cloud-api-integration by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill cloud-api-integration

文件组织:分割结构。主 SKILL.md 用于核心模式。完整实现请参阅 references/ 目录。
风险等级 : 高 - 处理 API 凭证、处理不受信任的提示、网络暴露、数据隐私问题
您是云 AI API 集成专家,在 Anthropic Claude、OpenAI GPT-4 和 Google Gemini API 方面拥有深厚的专业知识。您的精通领域涵盖安全凭证管理、提示安全、速率限制、错误处理以及针对 LLM 特定漏洞的防护。
您擅长:
主要用例 :
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# tests/test_cloud_api.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from src.cloud_api import SecureClaudeClient, CloudAPIConfig
class TestSecureClaudeClient:
    """Test cloud API client with mocked external calls.

    All network interaction is mocked; no real API keys or requests are used.
    Depends on the project module ``src.cloud_api`` (not shown in this file).
    """

    @pytest.fixture
    def mock_config(self):
        # Dummy credentials only -- never a real key.
        return CloudAPIConfig(
            anthropic_key="test-key-12345",
            timeout=30.0
        )

    @pytest.fixture
    def mock_anthropic_response(self):
        """Mock Anthropic API response."""
        mock_response = MagicMock()
        # Mirrors the Messages API shape: content is a list of text blocks,
        # usage carries input/output token counts.
        mock_response.content = [MagicMock(text="Test response")]
        mock_response.usage.input_tokens = 10
        mock_response.usage.output_tokens = 20
        return mock_response

    @pytest.mark.asyncio
    async def test_generate_sanitizes_input(self, mock_config, mock_anthropic_response):
        """Test that prompts are sanitized before sending."""
        # NOTE(review): patches 'anthropic.Anthropic' at its source module;
        # assumes SecureClaudeClient resolves the class via that path -- confirm.
        with patch('anthropic.Anthropic') as mock_client:
            mock_client.return_value.messages.create.return_value = mock_anthropic_response
            client = SecureClaudeClient(mock_config)
            result = await client.generate("Test <script>alert('xss')</script>")
            # Verify sanitization was applied
            call_args = mock_client.return_value.messages.create.call_args
            assert "<script>" not in str(call_args)
            assert result == "Test response"

    @pytest.mark.asyncio
    async def test_rate_limiter_blocks_excess_requests(self):
        """Test rate limiting blocks requests over threshold."""
        from src.cloud_api import RateLimiter
        limiter = RateLimiter(rpm=2, daily_cost=100)
        await limiter.acquire(100)
        await limiter.acquire(100)
        # The third acquire exceeds rpm=2 and must be rejected.
        with pytest.raises(Exception):  # RateLimitError
            await limiter.acquire(100)

    @pytest.mark.asyncio
    async def test_multi_provider_fallback(self, mock_config):
        """Test fallback to secondary provider on failure."""
        from src.cloud_api import MultiProviderClient
        with patch('src.cloud_api.SecureClaudeClient') as mock_claude:
            with patch('src.cloud_api.SecureOpenAIClient') as mock_openai:
                # Primary provider fails ...
                mock_claude.return_value.generate = AsyncMock(
                    side_effect=Exception("Rate limited")
                )
                # ... secondary provider succeeds.
                mock_openai.return_value.generate = AsyncMock(
                    return_value="OpenAI response"
                )
                client = MultiProviderClient(mock_config)
                result = await client.generate("test prompt")
                assert result == "OpenAI response"
                mock_openai.return_value.generate.assert_called_once()
# src/cloud_api.py
class SecureClaudeClient:
    """Claude client that sanitizes prompts and filters model output.

    Expects ``config.anthropic_key`` to be a pydantic ``SecretStr``; the raw
    key is unwrapped only at client construction and never stored here.
    """

    def __init__(self, config: CloudAPIConfig):
        self.client = Anthropic(api_key=config.anthropic_key.get_secret_value())
        self.sanitizer = PromptSanitizer()

    async def generate(self, prompt: str) -> str:
        """Sanitize *prompt*, call the Messages API, return filtered text.

        Fix: the synchronous ``messages.create`` call previously ran directly
        inside this coroutine and blocked the event loop for the entire HTTP
        round trip; it is now dispatched to a worker thread.
        """
        import asyncio  # local import keeps the snippet self-contained

        sanitized = self.sanitizer.sanitize(prompt)
        # Run the blocking SDK call off the event loop.
        response = await asyncio.to_thread(
            self.client.messages.create,
            model="claude-sonnet-4-20250514",
            messages=[{"role": "user", "content": sanitized}],
        )
        # Output filtering mitigates insecure output handling (OWASP LLM02).
        return self._filter_output(response.content[0].text)
应用性能模式中的缓存、连接池和重试逻辑。
# Run all tests with coverage
pytest tests/test_cloud_api.py -v --cov=src.cloud_api --cov-report=term-missing
# Run security checks
bandit -r src/cloud_api.py
# Type checking
mypy src/cloud_api.py --strict
# Good: Reuse HTTP connections
import httpx
class CloudAPIClient:
    """HTTP client that keeps one pooled connection set for its lifetime."""

    def __init__(self):
        pool_limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        # A single shared AsyncClient means TCP/TLS connections are reused
        # across requests instead of being re-established per call.
        self._client = httpx.AsyncClient(limits=pool_limits, timeout=httpx.Timeout(30.0))

    async def request(self, endpoint: str, data: dict) -> dict:
        """POST *data* as JSON to *endpoint* and return the decoded body."""
        resp = await self._client.post(endpoint, json=data)
        return resp.json()

    async def close(self):
        """Release the pooled connections."""
        await self._client.aclose()
# Bad: Create new connection per request
# Anti-example: a fresh AsyncClient per call pays TCP/TLS setup every time.
async def bad_request(endpoint: str, data: dict):
    async with httpx.AsyncClient() as client: # New connection each time!
        return await client.post(endpoint, json=data)
# Good: Smart retry with backoff
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class CloudAPIClient:
    """Illustrates retry-with-exponential-backoff on transient API failures."""

    @retry(
        # Give up after three attempts total.
        stop=stop_after_attempt(3),
        # Exponential backoff between attempts, bounded to the 2-10s window.
        wait=wait_exponential(multiplier=1, min=2, max=10),
        # Retry only transient failures; other errors propagate immediately.
        # NOTE(review): RateLimitError/APIConnectionError are assumed to be
        # the provider SDK's exception types -- confirm the import source.
        retry=retry_if_exception_type((RateLimitError, APIConnectionError))
    )
    async def generate(self, prompt: str) -> str:
        return await self._make_request(prompt)
# Bad: No retry or fixed delay
# Anti-example: one blind retry with a fixed delay -- no backoff, no retry
# budget, and the broad except hides which error actually occurred.
async def bad_generate(prompt: str):
    try:
        return await make_request(prompt)
    except Exception:
        await asyncio.sleep(1) # Fixed delay, no backoff!
        return await make_request(prompt)
# Good: Cache repeated queries with TTL
from functools import lru_cache
import hashlib
from cachetools import TTLCache
class CachedCloudClient:
    """Wraps a cloud client with a TTL-bounded response cache."""

    def __init__(self):
        # Bounded cache; entries expire 300 seconds (5 minutes) after insert.
        self._cache = TTLCache(maxsize=1000, ttl=300)

    async def generate(self, prompt: str, **kwargs) -> str:
        """Return the cached response when present, otherwise call through."""
        key = self._make_key(prompt, kwargs)
        try:
            return self._cache[key]
        except KeyError:
            pass
        answer = await self._client.generate(prompt, **kwargs)
        self._cache[key] = answer
        return answer

    def _make_key(self, prompt: str, kwargs: dict) -> str:
        """Stable digest over the prompt plus sorted keyword arguments."""
        payload = f"{prompt}:{sorted(kwargs.items())}"
        return hashlib.sha256(payload.encode()).hexdigest()
# Bad: No caching
# Anti-example: identical prompts hit the paid API every time.
async def bad_generate(prompt: str):
    return await client.generate(prompt) # Repeated identical calls!
# Good: Batch multiple requests
import asyncio
class BatchCloudClient:
    """Fan-out helper: run many generations concurrently, bounded in flight."""

    async def generate_batch(self, prompts: list[str]) -> list[str]:
        """Process multiple prompts concurrently with rate limiting.

        Results are returned in the same order as *prompts*.
        """
        gate = asyncio.Semaphore(5)  # at most 5 requests in flight

        async def run_one(text: str) -> str:
            async with gate:
                return await self.generate(text)

        return await asyncio.gather(*(run_one(p) for p in prompts))
# Bad: Sequential processing
# Anti-example: awaiting each call serially -- total latency becomes the sum
# of all request latencies instead of roughly the slowest single request.
async def bad_batch(prompts: list[str]):
    results = []
    for prompt in prompts:
        results.append(await client.generate(prompt)) # One at a time!
    return results
# Good: Fully async with proper context management
class AsyncCloudClient:
    """Async context-managed client; the HTTP pool lives for the `with` body.

    NOTE(review): ``self.endpoint`` is never assigned in this snippet -- it
    is presumably set elsewhere (e.g. in __init__); confirm before use.
    """

    async def __aenter__(self):
        self._client = httpx.AsyncClient()
        return self

    async def __aexit__(self, *args):
        # Always close the pool, even when the body raised.
        await self._client.aclose()

    async def generate(self, prompt: str) -> str:
        response = await self._client.post(
            self.endpoint,
            json={"prompt": prompt},
            timeout=30.0
        )
        return response.json()["text"]
# Usage
async with AsyncCloudClient() as client:
result = await client.generate("Hello")
# Bad: Blocking calls in async context
# Anti-example: a synchronous requests call -- inside an event loop this
# blocks every other task until the HTTP round trip completes.
def bad_generate(prompt: str):
    response = requests.post(endpoint, json={"prompt": prompt}) # Blocks!
    return response.json()
集成云 AI API 时,您将:
| 供应商 | 生产环境 | 最低要求 | 备注 |
|---|---|---|---|
| Anthropic | anthropic>=0.40.0 | >=0.25.0 | 支持 Messages API |
| OpenAI | openai>=1.50.0 | >=1.0.0 | 结构化输出 |
| Gemini | google-generativeai>=0.8.0 | - | 最新功能 |
# requirements.txt
anthropic>=0.40.0
openai>=1.50.0
google-generativeai>=0.8.0
pydantic>=2.0 # Input validation
httpx>=0.27.0 # HTTP client with timeouts
tenacity>=8.0 # Retry logic
structlog>=23.0 # Secure logging
cryptography>=41.0 # Key encryption
cachetools>=5.0 # Response caching
from pydantic import BaseModel, SecretStr, Field, validator
from anthropic import Anthropic
import os, structlog
logger = structlog.get_logger()
class CloudAPIConfig(BaseModel):
    """Validated cloud API configuration.

    Keys are ``SecretStr`` so reprs and logs show a masked value rather
    than the raw key. NOTE(review): the annotations say ``SecretStr`` but
    the default is ``None`` -- ``Optional[SecretStr]`` would be the
    accurate type; confirm against the pydantic version in use.
    """

    anthropic_key: SecretStr = Field(default=None)
    openai_key: SecretStr = Field(default=None)
    # Clamp timeouts to a sane 5-120 second window.
    timeout: float = Field(default=30.0, ge=5, le=120)

    @validator('anthropic_key', 'openai_key', pre=True)
    def load_from_env(cls, v, field):
        # Fall back to an env var named after the field, e.g. ANTHROPIC_KEY.
        # (pydantic v1-style validator; v2 renamed this to @field_validator.)
        return v or os.environ.get(field.name.upper())

    class Config:
        # Never serialize real secret values.
        json_encoders = {SecretStr: lambda v: '***'}
完整实现请参阅 references/advanced-patterns.md。
| 漏洞 | 严重性 | 缓解措施 |
|---|---|---|
| 提示注入 | 高 | 输入净化,输出过滤 |
| API 密钥暴露 | 严重 | 环境变量,密钥管理器 |
| 数据泄露 | 高 | 限制网络访问 |
| OWASP ID | 类别 | 缓解措施 |
|---|---|---|
| LLM01 | 提示注入 | 净化所有输入 |
| LLM02 | 不安全的输出 | 使用前过滤 |
| LLM06 | 信息泄露 | 提示中不包含密钥 |
# NEVER: Hardcode API Keys
client = Anthropic(api_key="sk-ant-api03-xxxxx") # DANGEROUS
client = Anthropic() # SECURE - uses env var
# NEVER: Log API Keys
logger.info(f"Using API key: {api_key}") # DANGEROUS
logger.info("API client initialized", provider="anthropic") # SECURE
# NEVER: Trust External Content
content = fetch_url(url)
response = claude.generate(f"Summarize: {content}") # INJECTION VECTOR!
您的目标是创建满足以下条件的云 API 集成:
完整实现细节,请参阅 :
references/advanced-patterns.md - 缓存、流式传输、优化
references/security-examples.md - 完整的漏洞分析
references/threat-model.md - 攻击场景和缓解措施

每周安装次数
80
代码仓库
GitHub 星标数
33
首次出现
2026 年 1 月 20 日
安全审计
安装于
codex64
gemini-cli63
cursor63
opencode61
github-copilot60
cline52
File Organization: Split structure. Main SKILL.md for core patterns. See the
references/ directory for complete implementations.
Risk Level : HIGH - Handles API credentials, processes untrusted prompts, network exposure, data privacy concerns
You are an expert in cloud AI API integration with deep expertise in Anthropic Claude, OpenAI GPT-4, and Google Gemini APIs. Your mastery spans secure credential management, prompt security, rate limiting, error handling, and protection against LLM-specific vulnerabilities.
You excel at:
Primary Use Cases :
# tests/test_cloud_api.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from src.cloud_api import SecureClaudeClient, CloudAPIConfig
class TestSecureClaudeClient:
    """Test cloud API client with mocked external calls.

    All network interaction is mocked; no real API keys or requests are used.
    Depends on the project module ``src.cloud_api`` (not shown in this file).
    """

    @pytest.fixture
    def mock_config(self):
        # Dummy credentials only -- never a real key.
        return CloudAPIConfig(
            anthropic_key="test-key-12345",
            timeout=30.0
        )

    @pytest.fixture
    def mock_anthropic_response(self):
        """Mock Anthropic API response."""
        mock_response = MagicMock()
        # Mirrors the Messages API shape: content is a list of text blocks,
        # usage carries input/output token counts.
        mock_response.content = [MagicMock(text="Test response")]
        mock_response.usage.input_tokens = 10
        mock_response.usage.output_tokens = 20
        return mock_response

    @pytest.mark.asyncio
    async def test_generate_sanitizes_input(self, mock_config, mock_anthropic_response):
        """Test that prompts are sanitized before sending."""
        # NOTE(review): patches 'anthropic.Anthropic' at its source module;
        # assumes SecureClaudeClient resolves the class via that path -- confirm.
        with patch('anthropic.Anthropic') as mock_client:
            mock_client.return_value.messages.create.return_value = mock_anthropic_response
            client = SecureClaudeClient(mock_config)
            result = await client.generate("Test <script>alert('xss')</script>")
            # Verify sanitization was applied
            call_args = mock_client.return_value.messages.create.call_args
            assert "<script>" not in str(call_args)
            assert result == "Test response"

    @pytest.mark.asyncio
    async def test_rate_limiter_blocks_excess_requests(self):
        """Test rate limiting blocks requests over threshold."""
        from src.cloud_api import RateLimiter
        limiter = RateLimiter(rpm=2, daily_cost=100)
        await limiter.acquire(100)
        await limiter.acquire(100)
        # The third acquire exceeds rpm=2 and must be rejected.
        with pytest.raises(Exception):  # RateLimitError
            await limiter.acquire(100)

    @pytest.mark.asyncio
    async def test_multi_provider_fallback(self, mock_config):
        """Test fallback to secondary provider on failure."""
        from src.cloud_api import MultiProviderClient
        with patch('src.cloud_api.SecureClaudeClient') as mock_claude:
            with patch('src.cloud_api.SecureOpenAIClient') as mock_openai:
                # Primary provider fails ...
                mock_claude.return_value.generate = AsyncMock(
                    side_effect=Exception("Rate limited")
                )
                # ... secondary provider succeeds.
                mock_openai.return_value.generate = AsyncMock(
                    return_value="OpenAI response"
                )
                client = MultiProviderClient(mock_config)
                result = await client.generate("test prompt")
                assert result == "OpenAI response"
                mock_openai.return_value.generate.assert_called_once()
# src/cloud_api.py
class SecureClaudeClient:
    """Claude client that sanitizes prompts and filters model output.

    Expects ``config.anthropic_key`` to be a pydantic ``SecretStr``; the raw
    key is unwrapped only at client construction and never stored here.
    """

    def __init__(self, config: CloudAPIConfig):
        self.client = Anthropic(api_key=config.anthropic_key.get_secret_value())
        self.sanitizer = PromptSanitizer()

    async def generate(self, prompt: str) -> str:
        """Sanitize *prompt*, call the Messages API, return filtered text.

        Fix: the synchronous ``messages.create`` call previously ran directly
        inside this coroutine and blocked the event loop for the entire HTTP
        round trip; it is now dispatched to a worker thread.
        """
        import asyncio  # local import keeps the snippet self-contained

        sanitized = self.sanitizer.sanitize(prompt)
        # Run the blocking SDK call off the event loop.
        response = await asyncio.to_thread(
            self.client.messages.create,
            model="claude-sonnet-4-20250514",
            messages=[{"role": "user", "content": sanitized}],
        )
        # Output filtering mitigates insecure output handling (OWASP LLM02).
        return self._filter_output(response.content[0].text)
Apply caching, connection pooling, and retry logic from Performance Patterns.
# Run all tests with coverage
pytest tests/test_cloud_api.py -v --cov=src.cloud_api --cov-report=term-missing
# Run security checks
bandit -r src/cloud_api.py
# Type checking
mypy src/cloud_api.py --strict
# Good: Reuse HTTP connections
import httpx
class CloudAPIClient:
    """HTTP client that keeps one pooled connection set for its lifetime."""

    def __init__(self):
        pool_limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        # A single shared AsyncClient means TCP/TLS connections are reused
        # across requests instead of being re-established per call.
        self._client = httpx.AsyncClient(limits=pool_limits, timeout=httpx.Timeout(30.0))

    async def request(self, endpoint: str, data: dict) -> dict:
        """POST *data* as JSON to *endpoint* and return the decoded body."""
        resp = await self._client.post(endpoint, json=data)
        return resp.json()

    async def close(self):
        """Release the pooled connections."""
        await self._client.aclose()
# Bad: Create new connection per request
# Anti-example: a fresh AsyncClient per call pays TCP/TLS setup every time.
async def bad_request(endpoint: str, data: dict):
    async with httpx.AsyncClient() as client: # New connection each time!
        return await client.post(endpoint, json=data)
# Good: Smart retry with backoff
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class CloudAPIClient:
    """Illustrates retry-with-exponential-backoff on transient API failures."""

    @retry(
        # Give up after three attempts total.
        stop=stop_after_attempt(3),
        # Exponential backoff between attempts, bounded to the 2-10s window.
        wait=wait_exponential(multiplier=1, min=2, max=10),
        # Retry only transient failures; other errors propagate immediately.
        # NOTE(review): RateLimitError/APIConnectionError are assumed to be
        # the provider SDK's exception types -- confirm the import source.
        retry=retry_if_exception_type((RateLimitError, APIConnectionError))
    )
    async def generate(self, prompt: str) -> str:
        return await self._make_request(prompt)
# Bad: No retry or fixed delay
# Anti-example: one blind retry with a fixed delay -- no backoff, no retry
# budget, and the broad except hides which error actually occurred.
async def bad_generate(prompt: str):
    try:
        return await make_request(prompt)
    except Exception:
        await asyncio.sleep(1) # Fixed delay, no backoff!
        return await make_request(prompt)
# Good: Cache repeated queries with TTL
from functools import lru_cache
import hashlib
from cachetools import TTLCache
class CachedCloudClient:
    """Wraps a cloud client with a TTL-bounded response cache."""

    def __init__(self):
        # Bounded cache; entries expire 300 seconds (5 minutes) after insert.
        self._cache = TTLCache(maxsize=1000, ttl=300)

    async def generate(self, prompt: str, **kwargs) -> str:
        """Return the cached response when present, otherwise call through."""
        key = self._make_key(prompt, kwargs)
        try:
            return self._cache[key]
        except KeyError:
            pass
        answer = await self._client.generate(prompt, **kwargs)
        self._cache[key] = answer
        return answer

    def _make_key(self, prompt: str, kwargs: dict) -> str:
        """Stable digest over the prompt plus sorted keyword arguments."""
        payload = f"{prompt}:{sorted(kwargs.items())}"
        return hashlib.sha256(payload.encode()).hexdigest()
# Bad: No caching
# Anti-example: identical prompts hit the paid API every time.
async def bad_generate(prompt: str):
    return await client.generate(prompt) # Repeated identical calls!
# Good: Batch multiple requests
import asyncio
class BatchCloudClient:
    """Fan-out helper: run many generations concurrently, bounded in flight."""

    async def generate_batch(self, prompts: list[str]) -> list[str]:
        """Process multiple prompts concurrently with rate limiting.

        Results are returned in the same order as *prompts*.
        """
        gate = asyncio.Semaphore(5)  # at most 5 requests in flight

        async def run_one(text: str) -> str:
            async with gate:
                return await self.generate(text)

        return await asyncio.gather(*(run_one(p) for p in prompts))
# Bad: Sequential processing
# Anti-example: awaiting each call serially -- total latency becomes the sum
# of all request latencies instead of roughly the slowest single request.
async def bad_batch(prompts: list[str]):
    results = []
    for prompt in prompts:
        results.append(await client.generate(prompt)) # One at a time!
    return results
# Good: Fully async with proper context management
class AsyncCloudClient:
    """Async context-managed client; the HTTP pool lives for the `with` body.

    NOTE(review): ``self.endpoint`` is never assigned in this snippet -- it
    is presumably set elsewhere (e.g. in __init__); confirm before use.
    """

    async def __aenter__(self):
        self._client = httpx.AsyncClient()
        return self

    async def __aexit__(self, *args):
        # Always close the pool, even when the body raised.
        await self._client.aclose()

    async def generate(self, prompt: str) -> str:
        response = await self._client.post(
            self.endpoint,
            json={"prompt": prompt},
            timeout=30.0
        )
        return response.json()["text"]
# Usage
async with AsyncCloudClient() as client:
result = await client.generate("Hello")
# Bad: Blocking calls in async context
# Anti-example: a synchronous requests call -- inside an event loop this
# blocks every other task until the HTTP round trip completes.
def bad_generate(prompt: str):
    response = requests.post(endpoint, json={"prompt": prompt}) # Blocks!
    return response.json()
When integrating cloud AI APIs, you will:
| Provider | Production | Minimum | Notes |
|---|---|---|---|
| Anthropic | anthropic>=0.40.0 | >=0.25.0 | Messages API support |
| OpenAI | openai>=1.50.0 | >=1.0.0 | Structured outputs |
| Gemini | google-generativeai>=0.8.0 | - | Latest features |
# requirements.txt
anthropic>=0.40.0
openai>=1.50.0
google-generativeai>=0.8.0
pydantic>=2.0 # Input validation
httpx>=0.27.0 # HTTP client with timeouts
tenacity>=8.0 # Retry logic
structlog>=23.0 # Secure logging
cryptography>=41.0 # Key encryption
cachetools>=5.0 # Response caching
from pydantic import BaseModel, SecretStr, Field, validator
from anthropic import Anthropic
import os, structlog
logger = structlog.get_logger()
class CloudAPIConfig(BaseModel):
    """Validated cloud API configuration.

    Keys are ``SecretStr`` so reprs and logs show a masked value rather
    than the raw key. NOTE(review): the annotations say ``SecretStr`` but
    the default is ``None`` -- ``Optional[SecretStr]`` would be the
    accurate type; confirm against the pydantic version in use.
    """

    anthropic_key: SecretStr = Field(default=None)
    openai_key: SecretStr = Field(default=None)
    # Clamp timeouts to a sane 5-120 second window.
    timeout: float = Field(default=30.0, ge=5, le=120)

    @validator('anthropic_key', 'openai_key', pre=True)
    def load_from_env(cls, v, field):
        # Fall back to an env var named after the field, e.g. ANTHROPIC_KEY.
        # (pydantic v1-style validator; v2 renamed this to @field_validator.)
        return v or os.environ.get(field.name.upper())

    class Config:
        # Never serialize real secret values.
        json_encoders = {SecretStr: lambda v: '***'}
See references/advanced-patterns.md for complete implementations.
| Vulnerability | Severity | Mitigation |
|---|---|---|
| Prompt Injection | HIGH | Input sanitization, output filtering |
| API Key Exposure | CRITICAL | Environment variables, secret managers |
| Data Exfiltration | HIGH | Restrict network access |
| OWASP ID | Category | Mitigation |
|---|---|---|
| LLM01 | Prompt Injection | Sanitize all inputs |
| LLM02 | Insecure Output | Filter before use |
| LLM06 | Info Disclosure | No secrets in prompts |
# NEVER: Hardcode API Keys
client = Anthropic(api_key="sk-ant-api03-xxxxx") # DANGEROUS
client = Anthropic() # SECURE - uses env var
# NEVER: Log API Keys
logger.info(f"Using API key: {api_key}") # DANGEROUS
logger.info("API client initialized", provider="anthropic") # SECURE
# NEVER: Trust External Content
content = fetch_url(url)
response = claude.generate(f"Summarize: {content}") # INJECTION VECTOR!
Your goal is to create cloud API integrations that are:
For complete implementation details, see :
references/advanced-patterns.md - Caching, streaming, optimization
references/security-examples.md - Full vulnerability analysis
references/threat-model.md - Attack scenarios and mitigations

Weekly Installs
80
Repository
GitHub Stars
33
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Warn
Installed on
codex64
gemini-cli63
cursor63
opencode61
github-copilot60
cline52
Azure 升级评估与自动化工具 - 轻松迁移 Functions 计划、托管层级和 SKU
104,900 周安装
agent-memory 技能 - AI 助手记忆管理工具 | 提升开发效率与代码协作
1,100 周安装
Claude智能体开发指南:创建自主AI助手,掌握agent-development核心技巧
1,100 周安装
现代C#编码标准指南:最佳实践、性能优化与API设计
1,100 周安装
Flutter布局指南:构建响应式UI的约束规则与自适应设计模式
1,200 周安装
安全最佳实践指南:识别语言框架漏洞,编写安全代码与生成修复报告
1,200 周安装
Playwright 交互式测试技能:持久会话调试本地Web/Electron应用,无需重启工具链
1,200 周安装