anthropic-sdk by bobmatnyc/claude-mpm-skills
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill anthropic-sdk
pip install anthropic
npm install @anthropic-ai/sdk
export ANTHROPIC_API_KEY='your-api-key-here'
从以下网址获取您的 API 密钥:https://console.anthropic.com/settings/keys
import anthropic
import os
client = anthropic.Anthropic(
api_key=os.environ.get("ANTHROPIC_API_KEY")
)
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explain quantum computing in simple terms"}
]
)
print(message.content[0].text)
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [
{ role: 'user', content: 'Explain quantum computing in simple terms' }
],
});
console.log(message.content[0].text);
# Python - 用于上下文的系统提示
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system="You are a helpful coding assistant specializing in Python and TypeScript.",
messages=[
{"role": "user", "content": "How do I handle errors in async functions?"}
]
)
// TypeScript - 系统提示
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: 'You are a helpful coding assistant specializing in Python and TypeScript.',
messages: [
{ role: 'user', content: 'How do I handle errors in async functions?' }
],
});
# 实时流式响应
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "Write a short poem about coding"}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
import asyncio
async def stream_response():
async with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explain recursion"}
]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
asyncio.run(stream_response())
// 使用事件处理程序的流式传输
const stream = await client.messages.stream({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [
{ role: 'user', content: 'Write a short poem about coding' }
],
});
for await (const chunk of stream) {
if (chunk.type === 'content_block_delta' &&
chunk.delta.type === 'text_delta') {
process.stdout.write(chunk.delta.text);
}
}
# 定义工具(函数)
tools = [
{
"name": "get_weather",
"description": "Get the current weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g., San Francisco, CA"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
"required": ["location"]
}
}
]
# 初始请求
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "What's the weather in San Francisco?"}
]
)
# 检查工具使用
if message.stop_reason == "tool_use":
tool_use = next(block for block in message.content if block.type == "tool_use")
tool_name = tool_use.name
tool_input = tool_use.input
# 执行函数(模拟示例)
if tool_name == "get_weather":
weather_result = {
"temperature": 72,
"unit": "fahrenheit",
"conditions": "sunny"
}
# 将结果发送回 Claude
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "What's the weather in San Francisco?"},
{"role": "assistant", "content": message.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(weather_result)
}
]
}
]
)
print(response.content[0].text)
// 定义工具
const tools: Anthropic.Tool[] = [
{
name: 'get_weather',
description: 'Get the current weather for a location',
input_schema: {
type: 'object',
properties: {
location: {
type: 'string',
description: 'City name, e.g., San Francisco, CA',
},
unit: {
type: 'string',
enum: ['celsius', 'fahrenheit'],
description: 'Temperature unit',
},
},
required: ['location'],
},
},
];
// 初始请求
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
tools,
messages: [
{ role: 'user', content: "What's the weather in San Francisco?" },
],
});
// 检查工具使用
if (message.stop_reason === 'tool_use') {
const toolUse = message.content.find(
(block): block is Anthropic.ToolUseBlock => block.type === 'tool_use'
);
if (toolUse && toolUse.name === 'get_weather') {
// 执行函数
const weatherResult = {
temperature: 72,
unit: 'fahrenheit',
conditions: 'sunny',
};
// 将结果发送回
const response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
tools,
messages: [
{ role: 'user', content: "What's the weather in San Francisco?" },
{ role: 'assistant', content: message.content },
{
role: 'user',
content: [
{
type: 'tool_result',
tool_use_id: toolUse.id,
content: JSON.stringify(weatherResult),
},
],
},
],
});
console.log(response.content[0].text);
}
}
import base64
# 加载图像
with open("image.jpg", "rb") as image_file:
image_data = base64.standard_b64encode(image_file.read()).decode("utf-8")
# 将图像发送给 Claude
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": image_data,
},
},
{
"type": "text",
"text": "Describe this image in detail"
}
],
}
],
)
print(message.content[0].text)
import * as fs from 'fs';
// 加载图像
const imageData = fs.readFileSync('image.jpg').toString('base64');
// 将图像发送给 Claude
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [
{
role: 'user',
content: [
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/jpeg',
data: imageData,
},
},
{
type: 'text',
text: 'Describe this image in detail',
},
],
},
],
});
console.log(message.content[0].text);
通过缓存重复的提示内容来降低成本。
# 缓存系统提示和长上下文
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=[
{
"type": "text",
"text": "You are an expert Python developer...",
"cache_control": {"type": "ephemeral"}
}
],
messages=[
{
"role": "user",
"content": "How do I use async/await?"
}
]
)
// TypeScript - 后续请求重用缓存的系统提示
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: [
{
type: 'text',
text: 'You are an expert TypeScript developer...',
cache_control: { type: 'ephemeral' },
},
],
messages: [
{ role: 'user', content: 'How do I use async/await?' },
],
});
缓存优势:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import os
app = FastAPI()
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
class ChatRequest(BaseModel):
message: str
stream: bool = False
@app.post("/chat")
async def chat(request: ChatRequest):
    """Chat endpoint.

    Returns the full reply as JSON, or — when ``request.stream`` is true —
    streams the reply text chunk by chunk as ``text/plain``.
    Maps any Anthropic API error to HTTP 500.
    """
    try:
        if request.stream:
            # BUG FIX: `client` is the synchronous anthropic.Anthropic client,
            # so client.messages.stream(...) is a *sync* context manager.  The
            # original `async with` / `async for` here raised at runtime.
            # A plain sync generator is correct: Starlette iterates sync
            # generators in a threadpool, keeping the event loop responsive.
            def generate():
                with client.messages.stream(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1024,
                    messages=[{"role": "user", "content": request.message}]
                ) as stream:
                    for text in stream.text_stream:
                        yield text
            return StreamingResponse(generate(), media_type="text/plain")
        else:
            # Non-streaming: one blocking SDK call (consider AsyncAnthropic or
            # run_in_threadpool for high-concurrency deployments).
            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[{"role": "user", "content": request.message}]
            )
            return {"response": message.content[0].text}
    except anthropic.APIError as e:
        # Surface SDK failures as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/chat/tools")
async def chat_with_tools(request: ChatRequest):
    """Single-shot tool-use endpoint exposing one `search_database` tool.

    Returns the raw content blocks plus stop_reason so the caller can detect
    `stop_reason == "tool_use"` and perform the follow-up round trip itself.
    """
    # JSON-schema tool definition passed to the Messages API.
    tools = [
        {
            "name": "search_database",
            "description": "Search the knowledge database",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"]
            }
        }
    ]
    message = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": request.message}]
    )
    # NOTE(review): sync SDK call inside an async endpoint blocks the event
    # loop for the duration of the request — confirm acceptable for this app.
    return {"response": message.content, "stop_reason": message.stop_reason}
import express from 'express';
import Anthropic from '@anthropic-ai/sdk';

const app = express();
app.use(express.json());

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

interface ChatRequest {
  message: string;
  stream?: boolean;
}

// POST /chat — returns the full reply, or streams it when `stream` is true.
app.post('/chat', async (req, res) => {
  const { message, stream }: ChatRequest = req.body;
  try {
    if (stream) {
      // Streaming response: forward text deltas as chunked plain text.
      res.setHeader('Content-Type', 'text/plain');
      res.setHeader('Transfer-Encoding', 'chunked');
      const streamResponse = await client.messages.stream({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 1024,
        messages: [{ role: 'user', content: message }],
      });
      for await (const chunk of streamResponse) {
        // Only text deltas carry printable content; other event kinds
        // (message_start, content_block_start, ...) are skipped.
        if (chunk.type === 'content_block_delta' &&
            chunk.delta.type === 'text_delta') {
          res.write(chunk.delta.text);
        }
      }
      res.end();
    } else {
      // Non-streaming response.
      const response = await client.messages.create({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 1024,
        messages: [{ role: 'user', content: message }],
      });
      // NOTE(review): content[0] is assumed to be a text block; with tools
      // enabled it could be a tool_use block — confirm before reusing.
      res.json({ response: response.content[0].text });
    }
  } catch (error) {
    // SDK errors surface their own message; everything else is opaque.
    if (error instanceof Anthropic.APIError) {
      res.status(500).json({ error: error.message });
    } else {
      res.status(500).json({ error: 'Internal server error' });
    }
  }
});

app.listen(3000, () => {
  console.log('Server running on port 3000');
});
from anthropic import (
APIError,
APIConnectionError,
RateLimitError,
APITimeoutError
)
import time
def chat_with_retry(message_content: str, max_retries: int = 3):
    """Send one user message to Claude, retrying transient failures.

    Rate-limit errors back off exponentially (1s, 2s, 4s, ...), connection
    errors retry after 1s, timeouts after 2s.  Other API errors are raised
    immediately.

    Args:
        message_content: Text of the user message.
        max_retries: Total number of attempts; must be >= 1.

    Returns:
        The text of Claude's first content block.

    Raises:
        RateLimitError / APIConnectionError / APITimeoutError: once retries
            are exhausted.
        APIError: immediately for non-transient failures.
        RuntimeError: if max_retries < 1 (the original silently returned None).
    """
    for attempt in range(max_retries):
        try:
            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[{"role": "user", "content": message_content}]
            )
            return message.content[0].text
        except RateLimitError:
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = 2 ** attempt
                print(f"Rate limit hit, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except APIConnectionError:
            if attempt < max_retries - 1:
                print("Connection error, retrying...")
                time.sleep(1)
            else:
                raise
        except APITimeoutError:
            if attempt < max_retries - 1:
                print("Timeout, retrying...")
                time.sleep(2)
            else:
                raise
        except APIError as e:
            # Other API errors (bad request, auth, ...) are not transient.
            print(f"API error: {e}")
            raise
    # Previously fell through to an implicit `return None`.
    raise RuntimeError("max_retries must be >= 1")
import Anthropic from '@anthropic-ai/sdk';
import Anthropic from '@anthropic-ai/sdk';

/**
 * Send one user message to Claude, retrying transient failures.
 * Rate-limit errors back off exponentially (1s, 2s, 4s, ...); connection
 * errors retry after 1s; anything else is rethrown immediately.
 */
async function chatWithRetry(
  messageContent: string,
  maxRetries: number = 3
): Promise<string> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const message = await client.messages.create({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 1024,
        messages: [{ role: 'user', content: messageContent }],
      });
      // NOTE(review): assumes content[0] is a TextBlock; strictly typed
      // builds may need a type guard here — confirm.
      return message.content[0].text;
    } catch (error) {
      if (error instanceof Anthropic.RateLimitError) {
        if (attempt < maxRetries - 1) {
          const waitTime = Math.pow(2, attempt) * 1000;
          console.log(`Rate limit hit, waiting ${waitTime}ms...`);
          await new Promise(resolve => setTimeout(resolve, waitTime));
        } else {
          throw error;
        }
      } else if (error instanceof Anthropic.APIConnectionError) {
        if (attempt < maxRetries - 1) {
          console.log('Connection error, retrying...');
          await new Promise(resolve => setTimeout(resolve, 1000));
        } else {
          throw error;
        }
      } else {
        // Other errors are not retried.
        throw error;
      }
    }
  }
  throw new Error('Max retries exceeded');
}
# 从响应中获取令牌使用情况
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
print(f"Input tokens: {message.usage.input_tokens}")
print(f"Output tokens: {message.usage.output_tokens}")
# 计算成本(示例费率)
INPUT_COST_PER_1K = 0.003 # 每百万令牌 $3
OUTPUT_COST_PER_1K = 0.015 # 每百万令牌 $15
input_cost = (message.usage.input_tokens / 1000) * INPUT_COST_PER_1K
output_cost = (message.usage.output_tokens / 1000) * OUTPUT_COST_PER_1K
total_cost = input_cost + output_cost
print(f"Total cost: ${total_cost:.6f}")
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: 'Hello!' }],
});
console.log(`Input tokens: ${message.usage.input_tokens}`);
console.log(`Output tokens: ${message.usage.output_tokens}`);
// 计算成本
const INPUT_COST_PER_1K = 0.003;
const OUTPUT_COST_PER_1K = 0.015;
const inputCost = (message.usage.input_tokens / 1000) * INPUT_COST_PER_1K;
const outputCost = (message.usage.output_tokens / 1000) * OUTPUT_COST_PER_1K;
const totalCost = inputCost + outputCost;
console.log(`Total cost: $${totalCost.toFixed(6)}`);
# 低温度 (0.0-0.3) 用于事实性、确定性的响应
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
temperature=0.1, # 更专注
messages=[{"role": "user", "content": "What is 2+2?"}]
)
# 较高温度 (0.7-1.0) 用于创造性响应
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2048,
temperature=0.9, # 更具创造性
messages=[{"role": "user", "content": "Write a creative story"}]
)
# Top-p (核采样)
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
top_p=0.9, # 考虑前 90% 的概率质量
messages=[{"role": "user", "content": "Brainstorm ideas"}]
)
from datetime import datetime, timedelta
from collections import deque
class RateLimiter:
    """Sliding-window client-side rate limiter.

    Keeps request timestamps in a deque; a request may proceed while fewer
    than ``max_requests`` timestamps fall within the trailing
    ``time_window`` seconds.
    """

    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # window length in seconds
        # Monotonic timestamps of recent requests, oldest first.
        # time.monotonic() is immune to wall-clock adjustments, unlike the
        # datetime.now() the original used.
        self.requests = deque()

    def can_proceed(self) -> bool:
        """Drop expired timestamps and report whether another request fits."""
        cutoff = time.monotonic() - self.time_window
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
        return len(self.requests) < self.max_requests

    def add_request(self) -> None:
        """Record a request at the current time; call after can_proceed()."""
        self.requests.append(time.monotonic())
# Usage: allow at most 50 requests per rolling 60-second window.
limiter = RateLimiter(max_requests=50, time_window=60)
if limiter.can_proceed():
    limiter.add_request()
    message = client.messages.create(...)
else:
    # Caller decides how to wait/queue; the limiter only reports capacity.
    print("Rate limit reached, waiting...")
# Multi-turn conversation: the whole history is replayed on every request.
conversation = []

def chat(user_message: str):
    """Append user_message to the history, query Claude with the full
    history, record the assistant reply, and return its text."""
    conversation.append({"role": "user", "content": user_message})
    reply = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=conversation
    )
    conversation.append({"role": "assistant", "content": reply.content})
    return reply.content[0].text

# Multi-turn usage
response1 = chat("What is Python?")
response2 = chat("Can you show me an example?")
response3 = chat("Explain the example in detail")
# Configure client with a custom timeout and SDK-managed retries.
client = anthropic.Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=60.0,  # per-request timeout, in seconds
    max_retries=2,  # SDK-level automatic retries for transient failures
)
# Async client for asyncio code paths.
async_client = anthropic.AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=60.0,
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def monitored_chat(user_message: str):
    """Send one message to Claude, logging latency and token usage on
    success and the failure reason on error (then re-raising)."""
    started = time.time()
    try:
        reply = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}]
        )
        elapsed = time.time() - started
        logger.info(
            f"Chat completed - "
            f"Duration: {elapsed:.2f}s, "
            f"Input tokens: {reply.usage.input_tokens}, "
            f"Output tokens: {reply.usage.output_tokens}"
        )
        return reply.content[0].text
    except Exception as e:
        logger.error(f"Chat failed: {e}")
        raise
import os
from typing import Optional
class Config:
    """Runtime configuration, read once from environment variables."""

    ANTHROPIC_API_KEY: str = os.getenv("ANTHROPIC_API_KEY", "")
    MODEL: str = os.getenv("ANTHROPIC_MODEL", "claude-3-5-sonnet-20241022")
    MAX_TOKENS: int = int(os.getenv("MAX_TOKENS", "1024"))
    TEMPERATURE: float = float(os.getenv("TEMPERATURE", "0.7"))
    TIMEOUT: float = float(os.getenv("API_TIMEOUT", "60.0"))

    @classmethod
    def validate(cls):
        """Fail fast when the mandatory API key is absent."""
        if cls.ANTHROPIC_API_KEY:
            return
        raise ValueError("ANTHROPIC_API_KEY not set")
# Initialize client with config; validate() raises ValueError when the API
# key is missing, so misconfiguration fails at startup rather than mid-request.
Config.validate()
client = anthropic.Anthropic(
    api_key=Config.ANTHROPIC_API_KEY,
    timeout=Config.TIMEOUT,
)
| 模型 | 上下文窗口 | 最佳用途 |
|---|---|---|
| claude-3-5-sonnet-20241022 | 200K 令牌 | 通用目的、推理、代码 |
| claude-3-5-haiku-20241022 | 200K 令牌 | 快速响应、成本效益高 |
| claude-3-opus-20240229 | 200K 令牌 | 复杂任务、最高能力 |
推荐: claude-3-5-sonnet-20241022 以获得速度、成本和能力的最佳平衡。
检查 stop_reason 并迭代处理工具使用
每周安装
137
仓库
GitHub 星标
18
首次出现
Jan 23, 2026
安全审计
安装于
claude-code114
opencode107
codex106
gemini-cli103
cursor101
github-copilot98
pip install anthropic
npm install @anthropic-ai/sdk
export ANTHROPIC_API_KEY='your-api-key-here'
Get your API key from: https://console.anthropic.com/settings/keys
import anthropic
import os
client = anthropic.Anthropic(
api_key=os.environ.get("ANTHROPIC_API_KEY")
)
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explain quantum computing in simple terms"}
]
)
print(message.content[0].text)
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [
{ role: 'user', content: 'Explain quantum computing in simple terms' }
],
});
console.log(message.content[0].text);
# Python - System prompt for context
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system="You are a helpful coding assistant specializing in Python and TypeScript.",
messages=[
{"role": "user", "content": "How do I handle errors in async functions?"}
]
)
// TypeScript - System prompt
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: 'You are a helpful coding assistant specializing in Python and TypeScript.',
messages: [
{ role: 'user', content: 'How do I handle errors in async functions?' }
],
});
# Real-time streaming responses
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "Write a short poem about coding"}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
import asyncio
async def stream_response():
async with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explain recursion"}
]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
asyncio.run(stream_response())
// Streaming with event handlers
const stream = await client.messages.stream({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [
{ role: 'user', content: 'Write a short poem about coding' }
],
});
for await (const chunk of stream) {
if (chunk.type === 'content_block_delta' &&
chunk.delta.type === 'text_delta') {
process.stdout.write(chunk.delta.text);
}
}
# Define tools (functions)
tools = [
{
"name": "get_weather",
"description": "Get the current weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g., San Francisco, CA"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
"required": ["location"]
}
}
]
# Initial request
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "What's the weather in San Francisco?"}
]
)
# Check for tool use
if message.stop_reason == "tool_use":
tool_use = next(block for block in message.content if block.type == "tool_use")
tool_name = tool_use.name
tool_input = tool_use.input
# Execute function (mock example)
if tool_name == "get_weather":
weather_result = {
"temperature": 72,
"unit": "fahrenheit",
"conditions": "sunny"
}
# Send result back to Claude
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "What's the weather in San Francisco?"},
{"role": "assistant", "content": message.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(weather_result)
}
]
}
]
)
print(response.content[0].text)
// Define tools
const tools: Anthropic.Tool[] = [
{
name: 'get_weather',
description: 'Get the current weather for a location',
input_schema: {
type: 'object',
properties: {
location: {
type: 'string',
description: 'City name, e.g., San Francisco, CA',
},
unit: {
type: 'string',
enum: ['celsius', 'fahrenheit'],
description: 'Temperature unit',
},
},
required: ['location'],
},
},
];
// Initial request
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
tools,
messages: [
{ role: 'user', content: "What's the weather in San Francisco?" },
],
});
// Check for tool use
if (message.stop_reason === 'tool_use') {
const toolUse = message.content.find(
(block): block is Anthropic.ToolUseBlock => block.type === 'tool_use'
);
if (toolUse && toolUse.name === 'get_weather') {
// Execute function
const weatherResult = {
temperature: 72,
unit: 'fahrenheit',
conditions: 'sunny',
};
// Send result back
const response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
tools,
messages: [
{ role: 'user', content: "What's the weather in San Francisco?" },
{ role: 'assistant', content: message.content },
{
role: 'user',
content: [
{
type: 'tool_result',
tool_use_id: toolUse.id,
content: JSON.stringify(weatherResult),
},
],
},
],
});
console.log(response.content[0].text);
}
}
import base64
# Load image
with open("image.jpg", "rb") as image_file:
image_data = base64.standard_b64encode(image_file.read()).decode("utf-8")
# Send image to Claude
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": image_data,
},
},
{
"type": "text",
"text": "Describe this image in detail"
}
],
}
],
)
print(message.content[0].text)
import * as fs from 'fs';
// Load image
const imageData = fs.readFileSync('image.jpg').toString('base64');
// Send image to Claude
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [
{
role: 'user',
content: [
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/jpeg',
data: imageData,
},
},
{
type: 'text',
text: 'Describe this image in detail',
},
],
},
],
});
console.log(message.content[0].text);
Reduce costs by caching repetitive prompt content.
# Cache system prompt and long context
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=[
{
"type": "text",
"text": "You are an expert Python developer...",
"cache_control": {"type": "ephemeral"}
}
],
messages=[
{
"role": "user",
"content": "How do I use async/await?"
}
]
)
// TypeScript - Subsequent requests reuse the cached system prompt
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: [
{
type: 'text',
text: 'You are an expert TypeScript developer...',
cache_control: { type: 'ephemeral' },
},
],
messages: [
{ role: 'user', content: 'How do I use async/await?' },
],
});
Caching Benefits:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import os
app = FastAPI()
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
class ChatRequest(BaseModel):
message: str
stream: bool = False
@app.post("/chat")
async def chat(request: ChatRequest):
    """Chat endpoint.

    Returns the full reply as JSON, or — when ``request.stream`` is true —
    streams the reply text chunk by chunk as ``text/plain``.
    Maps any Anthropic API error to HTTP 500.
    """
    try:
        if request.stream:
            # BUG FIX: `client` is the synchronous anthropic.Anthropic client,
            # so client.messages.stream(...) is a *sync* context manager.  The
            # original `async with` / `async for` here raised at runtime.
            # A plain sync generator is correct: Starlette iterates sync
            # generators in a threadpool, keeping the event loop responsive.
            def generate():
                with client.messages.stream(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1024,
                    messages=[{"role": "user", "content": request.message}]
                ) as stream:
                    for text in stream.text_stream:
                        yield text
            return StreamingResponse(generate(), media_type="text/plain")
        else:
            # Non-streaming: one blocking SDK call (consider AsyncAnthropic or
            # run_in_threadpool for high-concurrency deployments).
            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[{"role": "user", "content": request.message}]
            )
            return {"response": message.content[0].text}
    except anthropic.APIError as e:
        # Surface SDK failures as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/chat/tools")
async def chat_with_tools(request: ChatRequest):
tools = [
{
"name": "search_database",
"description": "Search the knowledge database",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"}
},
"required": ["query"]
}
}
]
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": request.message}]
)
return {"response": message.content, "stop_reason": message.stop_reason}
import express from 'express';
import Anthropic from '@anthropic-ai/sdk';
const app = express();
app.use(express.json());
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
interface ChatRequest {
message: string;
stream?: boolean;
}
app.post('/chat', async (req, res) => {
const { message, stream }: ChatRequest = req.body;
try {
if (stream) {
// Streaming response
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Transfer-Encoding', 'chunked');
const streamResponse = await client.messages.stream({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: message }],
});
for await (const chunk of streamResponse) {
if (chunk.type === 'content_block_delta' &&
chunk.delta.type === 'text_delta') {
res.write(chunk.delta.text);
}
}
res.end();
} else {
// Non-streaming response
const response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: message }],
});
res.json({ response: response.content[0].text });
}
} catch (error) {
if (error instanceof Anthropic.APIError) {
res.status(500).json({ error: error.message });
} else {
res.status(500).json({ error: 'Internal server error' });
}
}
});
app.listen(3000, () => {
console.log('Server running on port 3000');
});
from anthropic import (
APIError,
APIConnectionError,
RateLimitError,
APITimeoutError
)
import time
def chat_with_retry(message_content: str, max_retries: int = 3):
    """Send one user message to Claude, retrying transient failures.

    Rate-limit errors back off exponentially (1s, 2s, 4s, ...), connection
    errors retry after 1s, timeouts after 2s.  Other API errors are raised
    immediately.

    Args:
        message_content: Text of the user message.
        max_retries: Total number of attempts; must be >= 1.

    Returns:
        The text of Claude's first content block.

    Raises:
        RateLimitError / APIConnectionError / APITimeoutError: once retries
            are exhausted.
        APIError: immediately for non-transient failures.
        RuntimeError: if max_retries < 1 (the original silently returned None).
    """
    for attempt in range(max_retries):
        try:
            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[{"role": "user", "content": message_content}]
            )
            return message.content[0].text
        except RateLimitError:
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = 2 ** attempt
                print(f"Rate limit hit, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except APIConnectionError:
            if attempt < max_retries - 1:
                print("Connection error, retrying...")
                time.sleep(1)
            else:
                raise
        except APITimeoutError:
            if attempt < max_retries - 1:
                print("Timeout, retrying...")
                time.sleep(2)
            else:
                raise
        except APIError as e:
            # Other API errors (bad request, auth, ...) are not transient.
            print(f"API error: {e}")
            raise
    # Previously fell through to an implicit `return None`.
    raise RuntimeError("max_retries must be >= 1")
import Anthropic from '@anthropic-ai/sdk';
async function chatWithRetry(
messageContent: string,
maxRetries: number = 3
): Promise<string> {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: messageContent }],
});
return message.content[0].text;
} catch (error) {
if (error instanceof Anthropic.RateLimitError) {
if (attempt < maxRetries - 1) {
const waitTime = Math.pow(2, attempt) * 1000;
console.log(`Rate limit hit, waiting ${waitTime}ms...`);
await new Promise(resolve => setTimeout(resolve, waitTime));
} else {
throw error;
}
} else if (error instanceof Anthropic.APIConnectionError) {
if (attempt < maxRetries - 1) {
console.log('Connection error, retrying...');
await new Promise(resolve => setTimeout(resolve, 1000));
} else {
throw error;
}
} else {
// Don't retry on other errors
throw error;
}
}
}
throw new Error('Max retries exceeded');
}
# Get token usage from response
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
print(f"Input tokens: {message.usage.input_tokens}")
print(f"Output tokens: {message.usage.output_tokens}")
# Calculate cost (example rates)
INPUT_COST_PER_1K = 0.003 # $3 per million tokens
OUTPUT_COST_PER_1K = 0.015 # $15 per million tokens
input_cost = (message.usage.input_tokens / 1000) * INPUT_COST_PER_1K
output_cost = (message.usage.output_tokens / 1000) * OUTPUT_COST_PER_1K
total_cost = input_cost + output_cost
print(f"Total cost: ${total_cost:.6f}")
const message = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: 'Hello!' }],
});
console.log(`Input tokens: ${message.usage.input_tokens}`);
console.log(`Output tokens: ${message.usage.output_tokens}`);
// Calculate cost
const INPUT_COST_PER_1K = 0.003;
const OUTPUT_COST_PER_1K = 0.015;
const inputCost = (message.usage.input_tokens / 1000) * INPUT_COST_PER_1K;
const outputCost = (message.usage.output_tokens / 1000) * OUTPUT_COST_PER_1K;
const totalCost = inputCost + outputCost;
console.log(`Total cost: $${totalCost.toFixed(6)}`);
# Low temperature (0.0-0.3) for factual, deterministic responses
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
temperature=0.1, # More focused
messages=[{"role": "user", "content": "What is 2+2?"}]
)
# Higher temperature (0.7-1.0) for creative responses
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2048,
temperature=0.9, # More creative
messages=[{"role": "user", "content": "Write a creative story"}]
)
# Top-p (nucleus sampling)
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
top_p=0.9, # Consider top 90% probability mass
messages=[{"role": "user", "content": "Brainstorm ideas"}]
)
from datetime import datetime, timedelta
from collections import deque
class RateLimiter:
    """Sliding-window client-side rate limiter.

    Keeps request timestamps in a deque; a request may proceed while fewer
    than ``max_requests`` timestamps fall within the trailing
    ``time_window`` seconds.
    """

    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # window length in seconds
        # Monotonic timestamps of recent requests, oldest first.
        # time.monotonic() is immune to wall-clock adjustments, unlike the
        # datetime.now() the original used.
        self.requests = deque()

    def can_proceed(self) -> bool:
        """Drop expired timestamps and report whether another request fits."""
        cutoff = time.monotonic() - self.time_window
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
        return len(self.requests) < self.max_requests

    def add_request(self) -> None:
        """Record a request at the current time; call after can_proceed()."""
        self.requests.append(time.monotonic())
# Usage: allow at most 50 requests per rolling 60-second window.
limiter = RateLimiter(max_requests=50, time_window=60)
if limiter.can_proceed():
    limiter.add_request()
    message = client.messages.create(...)
else:
    # Caller decides how to wait/queue; the limiter only reports capacity.
    print("Rate limit reached, waiting...")
# Multi-turn conversation: the whole history is replayed on every request.
conversation = []

def chat(user_message: str):
    """Append user_message to the history, query Claude with the full
    history, record the assistant reply, and return its text."""
    conversation.append({"role": "user", "content": user_message})
    reply = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=conversation
    )
    conversation.append({"role": "assistant", "content": reply.content})
    return reply.content[0].text

# Multi-turn usage
response1 = chat("What is Python?")
response2 = chat("Can you show me an example?")
response3 = chat("Explain the example in detail")
# Configure client with a custom timeout and SDK-managed retries.
client = anthropic.Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=60.0,  # 60 second timeout
    max_retries=2,  # SDK-level automatic retries for transient failures
)
# For async operations
async_client = anthropic.AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=60.0,
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def monitored_chat(user_message: str):
    """Send one message to Claude, logging latency and token usage on
    success and the failure reason on error (then re-raising)."""
    started = time.time()
    try:
        reply = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}]
        )
        elapsed = time.time() - started
        logger.info(
            f"Chat completed - "
            f"Duration: {elapsed:.2f}s, "
            f"Input tokens: {reply.usage.input_tokens}, "
            f"Output tokens: {reply.usage.output_tokens}"
        )
        return reply.content[0].text
    except Exception as e:
        logger.error(f"Chat failed: {e}")
        raise
import os
from typing import Optional
class Config:
    """Runtime configuration, read once from environment variables."""

    ANTHROPIC_API_KEY: str = os.getenv("ANTHROPIC_API_KEY", "")
    MODEL: str = os.getenv("ANTHROPIC_MODEL", "claude-3-5-sonnet-20241022")
    MAX_TOKENS: int = int(os.getenv("MAX_TOKENS", "1024"))
    TEMPERATURE: float = float(os.getenv("TEMPERATURE", "0.7"))
    TIMEOUT: float = float(os.getenv("API_TIMEOUT", "60.0"))

    @classmethod
    def validate(cls):
        """Fail fast when the mandatory API key is absent."""
        if cls.ANTHROPIC_API_KEY:
            return
        raise ValueError("ANTHROPIC_API_KEY not set")
# Initialize client with config; validate() raises ValueError when the API
# key is missing, so misconfiguration fails at startup rather than mid-request.
Config.validate()
client = anthropic.Anthropic(
    api_key=Config.ANTHROPIC_API_KEY,
    timeout=Config.TIMEOUT,
)
| Model | Context Window | Best For |
|---|---|---|
| claude-3-5-sonnet-20241022 | 200K tokens | General purpose, reasoning, code |
| claude-3-5-haiku-20241022 | 200K tokens | Fast responses, cost-effective |
| claude-3-opus-20240229 | 200K tokens | Complex tasks, highest capability |
Recommended: claude-3-5-sonnet-20241022 for best balance of speed, cost, and capability.
Check stop_reason and handle tool use iteratively
Weekly Installs
137
Repository
GitHub Stars
18
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Pass
Installed on
claude-code114
opencode107
codex106
gemini-cli103
cursor101
github-copilot98
超能力技能使用指南:AI助手技能调用优先级与工作流程详解
47,800 周安装
CTA生成器 - 提升转化率的行动号召按钮设计与文案优化工具
303 周安装
SRED 工作汇总技能:自动聚合 GitHub PR、Notion 文档和 Linear 工单,生成年度工作报告
305 周安装
网站可抓取性优化指南:robots.txt、网站结构、内部链接与AI爬虫优化
304 周安装
TypeScript/JavaScript 开发技能 - 掌握 Metabase 开源项目开发流程与工具
306 周安装
Mapbox 店铺定位器开发指南:构建交互式地图搜索应用
301 周安装
customaize-agent:create-command - 创建与管理AI助手命令的元命令工具
309 周安装