azure-cosmosdb by alinaqi/claude-bootstrap
npx skills add https://github.com/alinaqi/claude-bootstrap --skill azure-cosmosdb
加载方式:base.md + [typescript.md | python.md]
Azure Cosmos DB 是一个全球分布式、多模型数据库,保证低延迟、弹性可扩展性和多种一致性模型。
资料来源: Cosmos DB 文档 | 分区 | SDK
明智选择分区键,为访问模式设计,理解一致性权衡。
Cosmos DB 将数据分布在多个分区中。分区键的选择决定了可扩展性、性能和成本。设计时要考虑均匀分布和查询效率。
| API | 使用场景 |
|---|---|
| NoSQL (Core) | 文档数据库,最灵活 |
| MongoDB | 兼容 MongoDB 有线协议 |
| PostgreSQL | 分布式 PostgreSQL (Citus) |
| Apache Cassandra | 宽列存储 |
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| Apache Gremlin | 图数据库 |
| Table | 键值存储(兼容 Azure Table Storage) |
本技能重点介绍 NoSQL (Core) API - 最常见的选择。
| 概念 | 描述 |
|---|---|
| 容器 | 项的集合(类似表) |
| 项 | 单个文档/记录 (JSON) |
| 分区键 | 决定数据分布 |
| 逻辑分区 | 具有相同分区键的项 |
| 物理分区 | 存储单元(最大 50GB,10K RU/s) |
| RU (请求单位) | 吞吐量货币 |
// 高基数,均匀分布,在查询中使用
// 电子商务:用户数据的 userId
{ "id": "order-123", "userId": "user-456", ... } // PK: /userId
// 多租户:tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... } // PK: /tenantId
// IoT:遥测数据的 deviceId
{ "id": "reading-1", "deviceId": "device-789", ... } // PK: /deviceId
// 日志:合成键(日期 + 类别)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... } // PK: /partitionKey
// 用于多级分布(例如,租户 → 用户)
// 容器创建时使用:/tenantId, /userId
{
"id": "order-123",
"tenantId": "acme-corp",
"userId": "user-456",
"items": [...]
}
// 在租户和用户内高效查询
// 避免:
// - 低基数(状态、类型、布尔值)
// - 单调递增(时间戳、自增)
// - 频繁更新的字段
// - 查询中不使用的字段
// 不良:只有 3 个值 → 热分区
{ "status": "pending" | "completed" | "cancelled" }
// 不良:所有写入都进入最新分区
{ "timestamp": "2024-01-15T10:30:00Z" }
npm install @azure/cosmos
// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';
const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;
const client = new CosmosClient({ endpoint, key });
// 或使用连接字符串
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);
export const database: Database = client.database(databaseId);
/**
 * Looks up a container handle in the configured database.
 *
 * @param containerId - Id of the container to reference.
 * @returns The container reference obtained from `database.container`.
 */
export function getContainer(containerId: string): Container {
  const container = database.container(containerId);
  return container;
}
// types/cosmos.ts
/** Fields shared by every item type declared in this file. */
export interface BaseItem {
id: string;
_ts?: number; // Auto-generated timestamp
_etag?: string; // Used for optimistic concurrency control
}
/** A user document; `userId` is the partition key. */
export interface User extends BaseItem {
userId: string; // Partition key
email: string;
name: string;
createdAt: string;
updatedAt: string;
}
/** An order document; `userId` is the partition key. */
export interface Order extends BaseItem {
userId: string; // Partition key
orderId: string;
items: OrderItem[];
total: number;
status: 'pending' | 'paid' | 'shipped' | 'delivered';
createdAt: string;
}
/** One line of an order, as held in `Order.items`. */
export interface OrderItem {
productId: string;
name: string;
quantity: number;
price: number;
}
import { getContainer } from './cosmosdb';
import { User } from './types';
const usersContainer = getContainer('users');
/**
 * Inserts a new user document with a generated id and identical
 * createdAt/updatedAt timestamps.
 *
 * @param data - User fields other than `id`, `createdAt` and `updatedAt`.
 * @returns The resource returned by the create call, cast to `User`.
 */
async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
  const timestamp = new Date().toISOString();
  const document: User = {
    id: crypto.randomUUID(),
    ...data,
    createdAt: timestamp,
    updatedAt: timestamp
  };
  const response = await usersContainer.items.create(document);
  return response.resource as User;
}
/**
 * Point read of a single user — the most efficient read, because it supplies
 * both the item id and the partition key.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @returns The user, or `null` when the read yields no resource or fails with code 404.
 * @throws Rethrows any error whose `code` is not 404.
 */
async function getUser(userId: string, id: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(id, userId).read<User>();
    return resource ?? null;
  } catch (error: unknown) {
    // Narrow the caught value instead of trusting `any`: only an object
    // carrying `code === 404` is treated as "not found".
    const code =
      typeof error === 'object' && error !== null
        ? (error as { code?: unknown }).code
        : undefined;
    if (code === 404) return null;
    throw error;
  }
}
/**
 * Point read for containers where the item id equals the partition key value.
 *
 * @param userId - Value used as both item id and partition key.
 * @returns The user, or `null` when nothing is returned or the read fails with code 404.
 */
async function getUserById(userId: string): Promise<User | null> {
  try {
    const response = await usersContainer.item(userId, userId).read<User>();
    return response.resource || null;
  } catch (error: any) {
    if (error.code !== 404) {
      throw error;
    }
    return null;
  }
}
/**
 * Lists one user's orders, newest first, querying inside a single partition.
 *
 * @param userId - Partition key value whose orders are returned.
 * @returns Every matching order (all pages are fetched).
 */
async function getUserOrders(userId: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');
  const { resources } = await ordersContainer.items
    .query<Order>(
      {
        query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
        parameters: [{ name: '@userId', value: userId }]
      },
      // Pin the query to the user's partition explicitly, so the SDK does not
      // have to derive the target partition from the WHERE clause.
      { partitionKey: userId }
    )
    .fetchAll();
  return resources;
}
/**
 * Finds orders by status. The filter is not the partition key, so this is a
 * cross-partition query — use sparingly.
 *
 * @param status - Status value to match against `c.status`.
 * @returns Every matching order (all pages are fetched).
 */
async function getOrdersByStatus(status: string): Promise<Order[]> {
  const querySpec = {
    query: 'SELECT * FROM c WHERE c.status = @status',
    parameters: [{ name: '@status', value: status }]
  };
  const iterator = getContainer('orders').items.query<Order>(querySpec);
  const everything = await iterator.fetchAll();
  return everything.resources;
}
/**
 * Fetches one page of a user's orders, newest first.
 *
 * @param userId - User whose orders are listed.
 * @param pageSize - Maximum number of items requested for the page (default 10).
 * @param continuationToken - Token from a previous page, if resuming.
 * @returns The page's items plus the token reported by `fetchNext()`.
 */
async function getOrdersPaginated(
  userId: string,
  pageSize: number = 10,
  continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
  const querySpec = {
    query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
    parameters: [{ name: '@userId', value: userId }]
  };
  const feedOptions = { maxItemCount: pageSize, continuationToken };
  const page = await getContainer('orders')
    .items.query<Order>(querySpec, feedOptions)
    .fetchNext();
  return {
    items: page.resources,
    continuationToken: page.continuationToken
  };
}
/**
 * Replaces the whole user item: reads the current document, overlays
 * `updates`, refreshes `updatedAt`, and writes the result back.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @param updates - Fields to overwrite on the stored document.
 * @returns The resource returned by the replace call, cast to `User`.
 * @throws Error('User not found') when no document exists.
 */
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
  const current = await getUser(userId, id);
  if (!current) {
    throw new Error('User not found');
  }
  const merged: User = { ...current, ...updates, updatedAt: new Date().toISOString() };
  const response = await usersContainer.item(id, userId).replace(merged);
  return response.resource as User;
}
/**
 * Partial update: applies a list of patch operations to one user item.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @param operations - Patch operations forwarded unchanged to `patch()`.
 * @returns The resource returned by the patch call, cast to `User`.
 */
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
  const target = usersContainer.item(id, userId);
  const patched = await target.patch(operations);
  return patched.resource as User;
}
// 用法:
await patchUser('user-123', 'user-123', [
{ op: 'set', path: '/name', value: 'New Name' },
{ op: 'set', path: '/updatedAt', value: new Date().toISOString() },
{ op: 'incr', path: '/loginCount', value: 1 }
]);
/**
 * Deletes one user item.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 */
async function deleteUser(userId: string, id: string): Promise<void> {
  const target = usersContainer.item(id, userId);
  await target.delete();
}
/**
 * Replaces a user item only if its ETag still matches `etag`
 * (optimistic concurrency).
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @param updates - Fields to overwrite on the stored document.
 * @param etag - ETag sent as the `IfMatch` access condition.
 * @returns The resource returned by the replace call, cast to `User`.
 * @throws Error('User not found') when no document exists; a dedicated Error
 *   when the replace fails with code 412; any other error is rethrown as-is.
 */
async function updateUserWithETag(
  userId: string,
  id: string,
  updates: Partial<User>,
  etag: string
): Promise<User> {
  const current = await getUser(userId, id);
  if (!current) {
    throw new Error('User not found');
  }
  const next: User = { ...current, ...updates, updatedAt: new Date().toISOString() };
  try {
    const response = await usersContainer.item(id, userId).replace(next, {
      accessCondition: { type: 'IfMatch', condition: etag }
    });
    return response.resource as User;
  } catch (error: any) {
    // Code 412 is reported to the caller as a concurrent-modification error.
    const preconditionFailed = error.code === 412;
    throw preconditionFailed ? new Error('文档已被其他进程修改') : error;
  }
}
| 级别 | 保证 | 延迟 | 使用场景 |
|---|---|---|---|
| 强一致性 | 可线性化读取 | 最高 | 金融、库存 |
| 有限过期一致性 | 在界限内一致 | 高 | 排行榜、计数器 |
| 会话一致性 | 读取自己的写入 | 中等 | 用户会话(默认) |
| 一致前缀一致性 | 有序读取 | 低 | 社交动态 |
| 最终一致性 | 无顺序保证 | 最低 | 分析、日志 |
// Override the default consistency level for a single point read.
// NOTE(review): a per-request override can presumably only relax consistency
// relative to the account default, so 'Strong' may not take effect on an
// account configured for Session — confirm against the account settings.
const { resource } = await usersContainer.item(id, userId).read<User>({
consistencyLevel: 'Strong'
});
// The same option applied to a query
const { resources } = await container.items.query(
{ query: 'SELECT * FROM c' },
{ consistencyLevel: 'BoundedStaleness' }
).fetchAll();
/**
 * Creates an order and its related item documents in one batch scoped to the
 * user's partition.
 *
 * @param userId - Partition key value passed to the batch.
 * @param order - Order document to create.
 * @param items - Extra documents; each is stamped with `userId` and `order.orderId`.
 * @throws Error when the batch returns no per-operation results or any
 *   operation reports a status code of 400 or above.
 */
async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
  const ordersContainer = getContainer('orders');
  const operations = [
    { operationType: 'Create' as const, resourceBody: order },
    ...items.map(item => ({
      operationType: 'Create' as const,
      resourceBody: { ...item, userId, orderId: order.orderId }
    }))
  ];
  const { result } = await ordersContainer.items.batch(operations, userId);
  // `result` is optional on the response. Without per-operation results success
  // cannot be confirmed, so that case is reported as a failure instead of
  // crashing with a TypeError on `.some`.
  if (!result || result.some(r => r.statusCode >= 400)) {
    throw new Error('批处理操作失败');
  }
}
/**
 * Imports users with non-transactional bulk creates, 100 operations per call.
 *
 * Every chunk is attempted; failures are collected and reported once at the
 * end, because `bulk()` hands back per-operation responses that would
 * otherwise go unchecked.
 *
 * @param users - User documents to create; `userId` is used as partition key.
 * @throws Error when at least one operation reports a status code of 400 or above.
 */
async function bulkImportUsers(users: User[]): Promise<void> {
  const operations = users.map(user => ({
    operationType: 'Create' as const,
    resourceBody: user,
    partitionKey: user.userId
  }));
  // Process in chunks
  const chunkSize = 100;
  const failedStatusCodes: number[] = [];
  for (let i = 0; i < operations.length; i += chunkSize) {
    const chunk = operations.slice(i, i + chunkSize);
    const responses = await usersContainer.items.bulk(chunk);
    for (const response of responses) {
      if (response.statusCode >= 400) {
        failedStatusCodes.push(response.statusCode);
      }
    }
  }
  if (failedStatusCodes.length > 0) {
    const distinct = [...new Set(failedStatusCodes)].join(', ');
    throw new Error(
      `Bulk import failed for ${failedStatusCodes.length} of ${operations.length} users (status codes: ${distinct})`
    );
  }
}
import { ChangeFeedStartFrom } from '@azure/cosmos';
/**
 * Polls the change feed of the 'orders' container from the beginning and logs
 * every changed item. The loop runs for as long as the iterator reports
 * `hasMoreResults`.
 */
async function processChangeFeed(): Promise<void> {
  const container = getContainer('orders');
  // `ChangeFeedStartFrom` is an option of the pull-model iterator, so the
  // iterator is obtained from getChangeFeedIterator() and advanced with readNext().
  const changeFeedIterator = container.items.getChangeFeedIterator({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
  });
  while (changeFeedIterator.hasMoreResults) {
    const { result: items, statusCode } = await changeFeedIterator.readNext();
    if (statusCode === 304) {
      // No new changes: wait one second before polling again.
      await new Promise<void>(resolve => setTimeout(resolve, 1000));
      continue;
    }
    for (const item of items) {
      console.log('更改的项:', item);
      // Process the change...
    }
  }
}
// For production, use a change feed processor backed by a lease container.
// NOTE(review): the `changeFeed.for(...).withLeaseContainer(...).build()` chain
// below could not be matched to anything imported in this file — confirm the
// installed @azure/cosmos version exposes it before relying on this snippet.
/**
 * Builds and starts a processor over the 'orders' container that logs each
 * change, passing the 'leases' container as its lease container.
 */
async function startChangeFeedProcessor(): Promise<void> {
const sourceContainer = getContainer('orders');
const leaseContainer = getContainer('leases');
const changeFeedProcessor = sourceContainer.items.changeFeed
.for(item => {
// Handle each change
console.log('正在处理:', item);
})
.withLeaseContainer(leaseContainer)
.build();
await changeFeedProcessor.start();
}
pip install azure-cosmos
# cosmos_db.py
import os
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from typing import Optional, List
from datetime import datetime
import uuid
# 初始化客户端
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']
client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)
def get_container(container_name: str):
    """Return a client for the named container in the configured database."""
    container_client = database.get_container_client(container_name)
    return container_client
# CRUD 操作
users_container = get_container('users')
def create_user(email: str, name: str, user_id: str = None) -> dict:
    """Create a user document whose id is also its partition key value.

    Args:
        email: Email stored on the document.
        name: Display name stored on the document.
        user_id: Id to use; a random UUID4 string is generated when falsy.

    Returns:
        Whatever ``create_item`` returns for the new document.
    """
    if not user_id:
        user_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()
    document = dict(
        id=user_id,
        userId=user_id,  # partition key
        email=email,
        name=name,
        createdAt=timestamp,
        updatedAt=timestamp,
    )
    return users_container.create_item(document)
def get_user(user_id: str) -> Optional[dict]:
    """Point-read a user by id (which is also the partition key).

    Returns:
        The document, or ``None`` when the read raises
        ``CosmosResourceNotFoundError``.
    """
    try:
        found = users_container.read_item(item=user_id, partition_key=user_id)
    except CosmosResourceNotFoundError:
        return None
    return found
def query_users(email_domain: str) -> List[dict]:
    """Return all users whose email contains ``email_domain``.

    Runs as a cross-partition query and materialises every result.
    """
    matches = users_container.query_items(
        query="SELECT * FROM c WHERE CONTAINS(c.email, @domain)",
        parameters=[{'name': '@domain', 'value': email_domain}],
        enable_cross_partition_query=True,
    )
    return [document for document in matches]
def update_user(user_id: str, **updates) -> dict:
    """Read a user, overlay ``updates``, refresh ``updatedAt`` and replace it.

    Raises:
        ValueError: If no user with ``user_id`` is found.
    """
    existing = get_user(user_id)
    if not existing:
        raise ValueError('User not found')
    for field, value in updates.items():
        existing[field] = value
    existing['updatedAt'] = datetime.utcnow().isoformat()
    return users_container.replace_item(item=user_id, body=existing)
def delete_user(user_id: str) -> None:
    """Delete the user whose id and partition key both equal ``user_id``."""
    users_container.delete_item(partition_key=user_id, item=user_id)
def get_users_paginated(page_size: int = 10, continuation_token: str = None):
    """Return one page of users ordered by ``createdAt`` descending.

    Args:
        page_size: Maximum number of items requested per page.
        continuation_token: Token from a previous call, or ``None`` for the
            first page.

    Returns:
        A dict with ``items`` (the page, possibly empty) and
        ``continuation_token`` (as reported by the pager).
    """
    query = "SELECT * FROM c ORDER BY c.createdAt DESC"
    items = users_container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size,
    )
    # The resume token belongs to the pager, not to query_items().
    # NOTE(review): confirm the installed azure-cosmos version supports
    # continuation tokens for cross-partition ORDER BY queries.
    pager = items.by_page(continuation_token)
    try:
        results = list(next(pager))
    except StopIteration:
        # No page at all (empty result set): return an empty page instead of
        # leaking StopIteration to the caller.
        results = []
    return {
        'items': results,
        'continuation_token': pager.continuation_token
    }
{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/userId/?" },
    { "path": "/status/?" },
    { "path": "/createdAt/?" }
  ],
  "excludedPaths": [
    { "path": "/*" },
    { "path": "/content/*" },
    { "path": "/_etag/?" }
  ],
  "compositeIndexes": [
    [
      { "path": "/userId", "order": "ascending" },
      { "path": "/createdAt", "order": "descending" }
    ]
  ]
}
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
indexingPolicy: {
indexingMode: 'consistent',
includedPaths: [
{ path: '/userId/?' },
{ path: '/status/?' },
{ path: '/createdAt/?' }
],
excludedPaths: [
{ path: '/*' } // 默认排除所有
]
}
});
// 容器级别
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
throughput: 1000 // RU/s
});
// Scale throughput
const container = database.container('orders');
// Provisioned throughput is held on the container's offer: read the current
// offer, then replace it with the new RU/s value.
const { resource: currentOffer } = await container.readOffer();
if (!currentOffer || !currentOffer.content) {
  throw new Error('Container has no dedicated throughput offer to scale');
}
await container.database.client.offer(currentOffer.id).replace({
  ...currentOffer,
  content: { ...currentOffer.content, offerThroughput: 2000 }
});
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
maxThroughput: 10000 // 自动缩放 10% 到 100%
});
// Container created without any throughput setting.
// Pay per request (good for dev/test and intermittent workloads).
// NOTE(review): serverless looks like an account-level capacity mode rather
// than something selected by omitting `throughput` here — confirm how the
// account was created before relying on per-request billing.
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] }
// No throughput specified on this request
});
# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
# The container command needs the resource group too, like the commands above.
az cosmosdb sql container create \
  --account-name myaccount \
  --resource-group mygroup \
  --database-name mydb \
  --name orders \
  --partition-key-path /userId \
  --throughput 400
# Query
# NOTE(review): confirm that `az cosmosdb sql query` exists in the installed
# Azure CLI version; if it does not, run the query through an SDK instead.
az cosmosdb sql query --account-name myaccount --database-name mydb \
  --container-name orders --query "SELECT * FROM c"
# Keys
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings
| 策略 | 影响 |
|---|---|
| 正确的分区键 | 避免热分区(浪费 RU) |
| 仅索引查询内容 | 降低写入 RU 成本 |
| 使用点读取 | 1 RU 对比查询的 3+ RU |
| 开发/测试使用无服务器 | 按请求付费 |
| 生产环境使用自动缩放 | 低流量时缩减 |
| 临时数据使用 TTL | 自动删除旧项 |
// 在容器上启用 TTL
await database.containers.createIfNotExists({
id: 'sessions',
partitionKey: { paths: ['/userId'] },
defaultTtl: 3600 // 1 小时
});
// 每项 TTL
const session = {
id: 'session-123',
userId: 'user-456',
ttl: 1800 // 覆盖:30 分钟
};
每周安装次数
62
代码库
GitHub 星标数
531
首次出现
2026年1月20日
安全审计
安装于
claude-code: 50
opencode: 49
gemini-cli: 46
codex: 44
cursor: 41
antigravity: 39
Load with: base.md + [typescript.md | python.md]
Azure Cosmos DB is a globally distributed, multi-model database with guaranteed low latency, elastic scalability, and multiple consistency models.
Sources: Cosmos DB Docs | Partitioning | SDK
Choose partition key wisely, design for your access patterns, understand consistency tradeoffs.
Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.
| API | Use Case |
|---|---|
| NoSQL (Core) | Document database, most flexible |
| MongoDB | MongoDB wire protocol compatible |
| PostgreSQL | Distributed PostgreSQL (Citus) |
| Apache Cassandra | Wide-column store |
| Apache Gremlin | Graph database |
| Table | Key-value (Azure Table Storage compatible) |
This skill focuses on NoSQL (Core) API - the most common choice.
| Concept | Description |
|---|---|
| Container | Collection of items (like a table) |
| Item | Single document/record (JSON) |
| Partition Key | Determines data distribution |
| Logical Partition | Items with same partition key |
| Physical Partition | Storage unit (max 50GB, 10K RU/s) |
| RU (Request Unit) | Throughput currency |
// High cardinality, even distribution, used in queries
// E-commerce: userId for user data
{ "id": "order-123", "userId": "user-456", ... } // PK: /userId
// Multi-tenant: tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... } // PK: /tenantId
// IoT: deviceId for telemetry
{ "id": "reading-1", "deviceId": "device-789", ... } // PK: /deviceId
// Logs: synthetic key (date + category)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... } // PK: /partitionKey
// For multi-level distribution (e.g., tenant → user)
// Container created with: /tenantId, /userId
{
"id": "order-123",
"tenantId": "acme-corp",
"userId": "user-456",
"items": [...]
}
// Query within tenant and user efficiently
// Avoid:
// - Low cardinality (status, type, boolean)
// - Monotonically increasing (timestamp, auto-increment)
// - Frequently updated fields
// - Fields not used in queries
// Bad: Only 3 values → hot partitions
{ "status": "pending" | "completed" | "cancelled" }
// Bad: All writes go to latest partition
{ "timestamp": "2024-01-15T10:30:00Z" }
npm install @azure/cosmos
// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';
const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;
const client = new CosmosClient({ endpoint, key });
// Or with connection string
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);
export const database: Database = client.database(databaseId);
/**
 * Looks up a container handle in the configured database.
 *
 * @param containerId - Id of the container to reference.
 * @returns The container reference obtained from `database.container`.
 */
export function getContainer(containerId: string): Container {
  const container = database.container(containerId);
  return container;
}
// types/cosmos.ts
/** Fields shared by every item type declared in this file. */
export interface BaseItem {
id: string;
_ts?: number; // Auto-generated timestamp
_etag?: string; // For optimistic concurrency
}
/** A user document; `userId` is the partition key. */
export interface User extends BaseItem {
userId: string; // Partition key
email: string;
name: string;
createdAt: string;
updatedAt: string;
}
/** An order document; `userId` is the partition key. */
export interface Order extends BaseItem {
userId: string; // Partition key
orderId: string;
items: OrderItem[];
total: number;
status: 'pending' | 'paid' | 'shipped' | 'delivered';
createdAt: string;
}
/** One line of an order, as held in `Order.items`. */
export interface OrderItem {
productId: string;
name: string;
quantity: number;
price: number;
}
import { getContainer } from './cosmosdb';
import { User } from './types';
const usersContainer = getContainer('users');
/**
 * Inserts a new user document with a generated id and identical
 * createdAt/updatedAt timestamps.
 *
 * @param data - User fields other than `id`, `createdAt` and `updatedAt`.
 * @returns The resource returned by the create call, cast to `User`.
 */
async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
  const timestamp = new Date().toISOString();
  const document: User = {
    id: crypto.randomUUID(),
    ...data,
    createdAt: timestamp,
    updatedAt: timestamp
  };
  const response = await usersContainer.items.create(document);
  return response.resource as User;
}
/**
 * Point read of a single user — the most efficient read, because it supplies
 * both the item id and the partition key.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @returns The user, or `null` when the read yields no resource or fails with code 404.
 * @throws Rethrows any error whose `code` is not 404.
 */
async function getUser(userId: string, id: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(id, userId).read<User>();
    return resource ?? null;
  } catch (error: unknown) {
    // Narrow the caught value instead of trusting `any`: only an object
    // carrying `code === 404` is treated as "not found".
    const code =
      typeof error === 'object' && error !== null
        ? (error as { code?: unknown }).code
        : undefined;
    if (code === 404) return null;
    throw error;
  }
}
/**
 * Point read for containers where the item id equals the partition key value.
 *
 * @param userId - Value used as both item id and partition key.
 * @returns The user, or `null` when nothing is returned or the read fails with code 404.
 */
async function getUserById(userId: string): Promise<User | null> {
  try {
    const response = await usersContainer.item(userId, userId).read<User>();
    return response.resource || null;
  } catch (error: any) {
    if (error.code !== 404) {
      throw error;
    }
    return null;
  }
}
/**
 * Lists one user's orders, newest first, querying inside a single partition.
 *
 * @param userId - Partition key value whose orders are returned.
 * @returns Every matching order (all pages are fetched).
 */
async function getUserOrders(userId: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');
  const { resources } = await ordersContainer.items
    .query<Order>(
      {
        query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
        parameters: [{ name: '@userId', value: userId }]
      },
      // Pin the query to the user's partition explicitly, so the SDK does not
      // have to derive the target partition from the WHERE clause.
      { partitionKey: userId }
    )
    .fetchAll();
  return resources;
}
/**
 * Finds orders by status. The filter is not the partition key, so this is a
 * cross-partition query — use sparingly.
 *
 * @param status - Status value to match against `c.status`.
 * @returns Every matching order (all pages are fetched).
 */
async function getOrdersByStatus(status: string): Promise<Order[]> {
  const querySpec = {
    query: 'SELECT * FROM c WHERE c.status = @status',
    parameters: [{ name: '@status', value: status }]
  };
  const iterator = getContainer('orders').items.query<Order>(querySpec);
  const everything = await iterator.fetchAll();
  return everything.resources;
}
/**
 * Fetches one page of a user's orders, newest first.
 *
 * @param userId - User whose orders are listed.
 * @param pageSize - Maximum number of items requested for the page (default 10).
 * @param continuationToken - Token from a previous page, if resuming.
 * @returns The page's items plus the token reported by `fetchNext()`.
 */
async function getOrdersPaginated(
  userId: string,
  pageSize: number = 10,
  continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
  const querySpec = {
    query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
    parameters: [{ name: '@userId', value: userId }]
  };
  const feedOptions = { maxItemCount: pageSize, continuationToken };
  const page = await getContainer('orders')
    .items.query<Order>(querySpec, feedOptions)
    .fetchNext();
  return {
    items: page.resources,
    continuationToken: page.continuationToken
  };
}
/**
 * Replaces the whole user item: reads the current document, overlays
 * `updates`, refreshes `updatedAt`, and writes the result back.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @param updates - Fields to overwrite on the stored document.
 * @returns The resource returned by the replace call, cast to `User`.
 * @throws Error('User not found') when no document exists.
 */
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
  const current = await getUser(userId, id);
  if (!current) {
    throw new Error('User not found');
  }
  const merged: User = { ...current, ...updates, updatedAt: new Date().toISOString() };
  const response = await usersContainer.item(id, userId).replace(merged);
  return response.resource as User;
}
/**
 * Partial update: applies a list of patch operations to one user item.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @param operations - Patch operations forwarded unchanged to `patch()`.
 * @returns The resource returned by the patch call, cast to `User`.
 */
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
  const target = usersContainer.item(id, userId);
  const patched = await target.patch(operations);
  return patched.resource as User;
}
// Usage:
await patchUser('user-123', 'user-123', [
{ op: 'set', path: '/name', value: 'New Name' },
{ op: 'set', path: '/updatedAt', value: new Date().toISOString() },
{ op: 'incr', path: '/loginCount', value: 1 }
]);
/**
 * Deletes one user item.
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 */
async function deleteUser(userId: string, id: string): Promise<void> {
  const target = usersContainer.item(id, userId);
  await target.delete();
}
/**
 * Replaces a user item only if its ETag still matches `etag`
 * (optimistic concurrency).
 *
 * @param userId - Partition key value of the user.
 * @param id - Item id of the user document.
 * @param updates - Fields to overwrite on the stored document.
 * @param etag - ETag sent as the `IfMatch` access condition.
 * @returns The resource returned by the replace call, cast to `User`.
 * @throws Error('User not found') when no document exists; a dedicated Error
 *   when the replace fails with code 412; any other error is rethrown as-is.
 */
async function updateUserWithETag(
  userId: string,
  id: string,
  updates: Partial<User>,
  etag: string
): Promise<User> {
  const current = await getUser(userId, id);
  if (!current) {
    throw new Error('User not found');
  }
  const next: User = { ...current, ...updates, updatedAt: new Date().toISOString() };
  try {
    const response = await usersContainer.item(id, userId).replace(next, {
      accessCondition: { type: 'IfMatch', condition: etag }
    });
    return response.resource as User;
  } catch (error: any) {
    // Code 412 is reported to the caller as a concurrent-modification error.
    const preconditionFailed = error.code === 412;
    throw preconditionFailed ? new Error('Document was modified by another process') : error;
  }
}
| Level | Guarantees | Latency | Use Case |
|---|---|---|---|
| Strong | Linearizable reads | Highest | Financial, inventory |
| Bounded Staleness | Consistent within bounds | High | Leaderboards, counters |
| Session | Read your writes | Medium | User sessions (default) |
| Consistent Prefix | Ordered reads | Low | Social feeds |
| Eventual | No ordering guarantee | Lowest | Analytics, logs |
// Override the default consistency level for a single point read.
// NOTE(review): a per-request override can presumably only relax consistency
// relative to the account default, so 'Strong' may not take effect on an
// account configured for Session — confirm against the account settings.
const { resource } = await usersContainer.item(id, userId).read<User>({
consistencyLevel: 'Strong'
});
// The same option applied to a query
const { resources } = await container.items.query(
{ query: 'SELECT * FROM c' },
{ consistencyLevel: 'BoundedStaleness' }
).fetchAll();
/**
 * Creates an order and its related item documents in one batch scoped to the
 * user's partition.
 *
 * @param userId - Partition key value passed to the batch.
 * @param order - Order document to create.
 * @param items - Extra documents; each is stamped with `userId` and `order.orderId`.
 * @throws Error when the batch returns no per-operation results or any
 *   operation reports a status code of 400 or above.
 */
async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
  const ordersContainer = getContainer('orders');
  const operations = [
    { operationType: 'Create' as const, resourceBody: order },
    ...items.map(item => ({
      operationType: 'Create' as const,
      resourceBody: { ...item, userId, orderId: order.orderId }
    }))
  ];
  const { result } = await ordersContainer.items.batch(operations, userId);
  // `result` is optional on the response. Without per-operation results success
  // cannot be confirmed, so that case is reported as a failure instead of
  // crashing with a TypeError on `.some`.
  if (!result || result.some(r => r.statusCode >= 400)) {
    throw new Error('Batch operation failed');
  }
}
/**
 * Imports users with non-transactional bulk creates, 100 operations per call.
 *
 * Every chunk is attempted; failures are collected and reported once at the
 * end, because `bulk()` hands back per-operation responses that would
 * otherwise go unchecked.
 *
 * @param users - User documents to create; `userId` is used as partition key.
 * @throws Error when at least one operation reports a status code of 400 or above.
 */
async function bulkImportUsers(users: User[]): Promise<void> {
  const operations = users.map(user => ({
    operationType: 'Create' as const,
    resourceBody: user,
    partitionKey: user.userId
  }));
  // Process in chunks
  const chunkSize = 100;
  const failedStatusCodes: number[] = [];
  for (let i = 0; i < operations.length; i += chunkSize) {
    const chunk = operations.slice(i, i + chunkSize);
    const responses = await usersContainer.items.bulk(chunk);
    for (const response of responses) {
      if (response.statusCode >= 400) {
        failedStatusCodes.push(response.statusCode);
      }
    }
  }
  if (failedStatusCodes.length > 0) {
    const distinct = [...new Set(failedStatusCodes)].join(', ');
    throw new Error(
      `Bulk import failed for ${failedStatusCodes.length} of ${operations.length} users (status codes: ${distinct})`
    );
  }
}
import { ChangeFeedStartFrom } from '@azure/cosmos';
/**
 * Polls the change feed of the 'orders' container from the beginning and logs
 * every changed item. The loop runs for as long as the iterator reports
 * `hasMoreResults`.
 */
async function processChangeFeed(): Promise<void> {
  const container = getContainer('orders');
  // `ChangeFeedStartFrom` is an option of the pull-model iterator, so the
  // iterator is obtained from getChangeFeedIterator() and advanced with readNext().
  const changeFeedIterator = container.items.getChangeFeedIterator({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
  });
  while (changeFeedIterator.hasMoreResults) {
    const { result: items, statusCode } = await changeFeedIterator.readNext();
    if (statusCode === 304) {
      // No new changes: wait one second before polling again.
      await new Promise<void>(resolve => setTimeout(resolve, 1000));
      continue;
    }
    for (const item of items) {
      console.log('Changed item:', item);
      // Process the change...
    }
  }
}
// For production, use a change feed processor backed by a lease container.
// NOTE(review): the `changeFeed.for(...).withLeaseContainer(...).build()` chain
// below could not be matched to anything imported in this file — confirm the
// installed @azure/cosmos version exposes it before relying on this snippet.
/**
 * Builds and starts a processor over the 'orders' container that logs each
 * change, passing the 'leases' container as its lease container.
 */
async function startChangeFeedProcessor(): Promise<void> {
const sourceContainer = getContainer('orders');
const leaseContainer = getContainer('leases');
const changeFeedProcessor = sourceContainer.items.changeFeed
.for(item => {
// Handle each change
console.log('Processing:', item);
})
.withLeaseContainer(leaseContainer)
.build();
await changeFeedProcessor.start();
}
pip install azure-cosmos
# cosmos_db.py
import os
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from typing import Optional, List
from datetime import datetime
import uuid
# Initialize client
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']
client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)
def get_container(container_name: str):
    """Return a client for the named container in the configured database."""
    container_client = database.get_container_client(container_name)
    return container_client
# CRUD Operations
users_container = get_container('users')
def create_user(email: str, name: str, user_id: str = None) -> dict:
    """Create a user document whose id is also its partition key value.

    Args:
        email: Email stored on the document.
        name: Display name stored on the document.
        user_id: Id to use; a random UUID4 string is generated when falsy.

    Returns:
        Whatever ``create_item`` returns for the new document.
    """
    if not user_id:
        user_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()
    document = dict(
        id=user_id,
        userId=user_id,  # partition key
        email=email,
        name=name,
        createdAt=timestamp,
        updatedAt=timestamp,
    )
    return users_container.create_item(document)
def get_user(user_id: str) -> Optional[dict]:
    """Point-read a user by id (which is also the partition key).

    Returns:
        The document, or ``None`` when the read raises
        ``CosmosResourceNotFoundError``.
    """
    try:
        found = users_container.read_item(item=user_id, partition_key=user_id)
    except CosmosResourceNotFoundError:
        return None
    return found
def query_users(email_domain: str) -> List[dict]:
    """Return all users whose email contains ``email_domain``.

    Runs as a cross-partition query and materialises every result.
    """
    matches = users_container.query_items(
        query="SELECT * FROM c WHERE CONTAINS(c.email, @domain)",
        parameters=[{'name': '@domain', 'value': email_domain}],
        enable_cross_partition_query=True,
    )
    return [document for document in matches]
def update_user(user_id: str, **updates) -> dict:
    """Read a user, overlay ``updates``, refresh ``updatedAt`` and replace it.

    Raises:
        ValueError: If no user with ``user_id`` is found.
    """
    existing = get_user(user_id)
    if not existing:
        raise ValueError('User not found')
    for field, value in updates.items():
        existing[field] = value
    existing['updatedAt'] = datetime.utcnow().isoformat()
    return users_container.replace_item(item=user_id, body=existing)
def delete_user(user_id: str) -> None:
    """Delete the user whose id and partition key both equal ``user_id``."""
    users_container.delete_item(partition_key=user_id, item=user_id)
def get_users_paginated(page_size: int = 10, continuation_token: str = None):
    """Return one page of users ordered by ``createdAt`` descending.

    Args:
        page_size: Maximum number of items requested per page.
        continuation_token: Token from a previous call, or ``None`` for the
            first page.

    Returns:
        A dict with ``items`` (the page, possibly empty) and
        ``continuation_token`` (as reported by the pager).
    """
    query = "SELECT * FROM c ORDER BY c.createdAt DESC"
    items = users_container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size,
    )
    # The resume token belongs to the pager, not to query_items().
    # NOTE(review): confirm the installed azure-cosmos version supports
    # continuation tokens for cross-partition ORDER BY queries.
    pager = items.by_page(continuation_token)
    try:
        results = list(next(pager))
    except StopIteration:
        # No page at all (empty result set): return an empty page instead of
        # leaking StopIteration to the caller.
        results = []
    return {
        'items': results,
        'continuation_token': pager.continuation_token
    }
{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/userId/?" },
    { "path": "/status/?" },
    { "path": "/createdAt/?" }
  ],
  "excludedPaths": [
    { "path": "/*" },
    { "path": "/content/*" },
    { "path": "/_etag/?" }
  ],
  "compositeIndexes": [
    [
      { "path": "/userId", "order": "ascending" },
      { "path": "/createdAt", "order": "descending" }
    ]
  ]
}
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
indexingPolicy: {
indexingMode: 'consistent',
includedPaths: [
{ path: '/userId/?' },
{ path: '/status/?' },
{ path: '/createdAt/?' }
],
excludedPaths: [
{ path: '/*' } // Exclude all by default
]
}
});
// Container level
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
throughput: 1000 // RU/s
});
// Scale throughput
const container = database.container('orders');
// Provisioned throughput is held on the container's offer: read the current
// offer, then replace it with the new RU/s value.
const { resource: currentOffer } = await container.readOffer();
if (!currentOffer || !currentOffer.content) {
  throw new Error('Container has no dedicated throughput offer to scale');
}
await container.database.client.offer(currentOffer.id).replace({
  ...currentOffer,
  content: { ...currentOffer.content, offerThroughput: 2000 }
});
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
maxThroughput: 10000 // Auto-scales 10% to 100%
});
// Container created without any throughput setting.
// Pay per request (good for dev/test, intermittent workloads)
// NOTE(review): serverless looks like an account-level capacity mode rather
// than something selected by omitting `throughput` here — confirm how the
// account was created before relying on per-request billing.
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] }
// No throughput specified on this request
});
# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
# The container command needs the resource group too, like the commands above.
az cosmosdb sql container create \
  --account-name myaccount \
  --resource-group mygroup \
  --database-name mydb \
  --name orders \
  --partition-key-path /userId \
  --throughput 400
# Query
# NOTE(review): confirm that `az cosmosdb sql query` exists in the installed
# Azure CLI version; if it does not, run the query through an SDK instead.
az cosmosdb sql query --account-name myaccount --database-name mydb \
  --container-name orders --query "SELECT * FROM c"
# Keys
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings
| Strategy | Impact |
|---|---|
| Right partition key | Avoid hot partitions (wasted RUs) |
| Index only what you query | Reduce write RU cost |
| Use point reads | 1 RU vs 3+ RU for queries |
| Serverless for dev/test | Pay per request |
| Autoscale for production | Scale down during low traffic |
| TTL for temporary data | Auto-delete old items |
// Enable TTL on container
await database.containers.createIfNotExists({
id: 'sessions',
partitionKey: { paths: ['/userId'] },
defaultTtl: 3600 // 1 hour
});
// Per-item TTL
const session = {
id: 'session-123',
userId: 'user-456',
ttl: 1800 // Override: 30 minutes
};
Weekly Installs
62
Repository
GitHub Stars
531
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
claude-code: 50
opencode: 49
gemini-cli: 46
codex: 44
cursor: 41
antigravity: 39
Supabase Postgres 最佳实践指南 - 8大类别性能优化规则与SQL示例
78,800 周安装