python by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill python
This skill uses a split structure for HIGH-RISK requirements:
| Gate | Status | Notes |
|---|---|---|
| 0.1 Domain Expertise | PASSED | Type safety, async, security, testing |
| 0.2 Vulnerability Research | PASSED | 5+ CVEs documented (2025-11-20) |
| 0.5 Hallucination Check | PASSED | Examples tested on Python 3.11+ |
| 0.11 File Organization | Split | HIGH-RISK, ~450 lines + references |
Risk Level: HIGH
Justification: Python backend services handle authentication, database access, file operations, and external API communication. Vulnerabilities in input validation, deserialization, command execution, and cryptography can lead to data breaches and system compromise.
You are an expert Python backend developer specializing in secure, maintainable, and performant services.
| Situation | Approach |
|---|---|
| User input | Validate with Pydantic, sanitize output |
| Database queries | Use ORM or parameterized queries, never format strings |
| File operations | Validate paths, use pathlib, check containment |
| Subprocess | Use list args, never shell=True with user input |
| Secrets | Load from environment or secret manager |
| Cryptography | Use cryptography library, never roll your own |

    import pytest
    from my_service import UserService, UserNotFoundError


    class TestUserService:
        @pytest.mark.asyncio
        async def test_get_user_returns_user_when_exists(self, db_session):
            service = UserService(db_session)
            user_id = await service.create_user("alice", "alice@example.com")
            user = await service.get_user(user_id)
            assert user.username == "alice"

        @pytest.mark.asyncio
        async def test_get_user_raises_when_not_found(self, db_session):
            service = UserService(db_session)
            with pytest.raises(UserNotFoundError):
                await service.get_user(99999)

        @pytest.mark.asyncio
        async def test_create_user_validates_email(self, db_session):
            service = UserService(db_session)
            with pytest.raises(ValueError, match="Invalid email"):
                await service.create_user("bob", "not-an-email")


    from sqlalchemy.ext.asyncio import AsyncSession

    # User is assumed to be the project's SQLAlchemy model.


    class UserNotFoundError(Exception):
        pass


    class UserService:
        def __init__(self, db: AsyncSession):
            self.db = db

        async def get_user(self, user_id: int) -> User:
            user = await self.db.get(User, user_id)
            if not user:
                raise UserNotFoundError(f"User {user_id} not found")
            return user

        async def create_user(self, username: str, email: str) -> int:
            if "@" not in email:
                raise ValueError("Invalid email format")
            # ... minimal implementation to pass tests


    pytest --cov=src              # All tests pass
    mypy src/ --strict            # Type check passes
    bandit -r src/ -ll            # Security scan passes
    pip-audit && safety check     # Dependencies clean


    # BAD: Sequential requests (slow)
    for url in urls:
        response = await client.get(url)  # Waits for each one

    # GOOD: Concurrent requests with gather
    tasks = [client.get(url) for url in urls]
    responses = await asyncio.gather(*tasks)  # All at once

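The concurrent pattern above can be tried without a network connection. The following is a minimal sketch, assuming a hypothetical `fetch` coroutine that sleeps in place of `client.get(url)`; the URLs and the delay are illustrative only.

```python
import asyncio


async def fetch(url: str, delay: float = 0.01) -> str:
    """Hypothetical stand-in for client.get(url): sleeps instead of doing I/O."""
    await asyncio.sleep(delay)
    return f"response from {url}"


async def fetch_all(urls: list[str]) -> list[str]:
    # gather schedules every coroutine at once and returns the results
    # in the same order as the inputs.
    return await asyncio.gather(*(fetch(u) for u in urls))


urls = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]
responses = asyncio.run(fetch_all(urls))
```

Because the coroutines overlap, the total wait is roughly one delay rather than one delay per URL. For a real client, a cap on in-flight requests (for example with `asyncio.Semaphore`) is usually worth considering.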

    from collections.abc import Iterator

    # BAD: Load all into memory
    def load_whole_file(filepath: str) -> list[dict]:
        with open(filepath) as f:
            return [process(line) for line in f.readlines()]  # OOM risk

    # GOOD: Generator yields one at a time
    def process_large_file(filepath: str) -> Iterator[dict]:
        with open(filepath) as f:
            for line in f:
                yield process(line)  # Memory efficient

    # BAD: List for membership testing - O(n)
    required in user_perms_list  # Slow for large lists

    # GOOD: Set for membership testing - O(1)
    required in user_perms_set  # Fast lookup

    # BAD: Repeated string concatenation
    result = ""
    for f in fields:
        result += f + ", "  # Creates new string each time

    # GOOD: Join for string building
    ", ".join(fields)  # Single allocation

    # BAD: New connection per request
    engine = create_async_engine(DATABASE_URL)  # Connection overhead each time

    # GOOD: Reuse pooled connections
    engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)
    async_session = sessionmaker(engine, class_=AsyncSession)

    async def get_user(user_id: int):
        async with async_session() as session:  # Reuses pooled connection
            return await session.get(User, user_id)

    # BAD: Individual inserts (N round trips)
    for user in users:
        db.add(User(**user))
        await db.commit()  # N commits = slow

    # GOOD: Batch insert (1 round trip)
    stmt = insert(User).values(users)
    await db.execute(stmt)
    await db.commit()  # Single commit

    # GOOD: Chunked for very large datasets
    for i in range(0, len(users), 1000):
        await db.execute(insert(User).values(users[i:i+1000]))
        await db.commit()

| Category | Version | Notes |
|---|---|---|
| Recommended | Python 3.11+ | Performance improvements, better errors |
| Minimum | Python 3.9 | Security support until Oct 2025 |
| Avoid | Python 3.8- | EOL, no security patches |
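A service can enforce the version table at startup. The following is a minimal sketch; the helper name `check_python_version` and the three labels it returns are assumptions made for illustration, not part of the skill.

```python
import sys

MIN_VERSION = (3, 9)           # "Minimum" row of the table
RECOMMENDED_VERSION = (3, 11)  # recommended row of the table


def check_python_version(version: tuple = sys.version_info[:2]) -> str:
    """Classify a (major, minor) interpreter version against the table."""
    if version < MIN_VERSION:
        return "unsupported"
    if version < RECOMMENDED_VERSION:
        return "minimum"
    return "recommended"
```

A startup hook could refuse to run when the result is `"unsupported"` and log a warning when it is `"minimum"`.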

    # pyproject.toml
    [project]
    dependencies = [
        "pydantic>=2.0", "email-validator>=2.0",    # Validation
        "cryptography>=41.0", "argon2-cffi>=21.0",  # Cryptography
        "PyJWT>=2.8", "sqlalchemy>=2.0", "asyncpg>=0.28",
        "httpx>=0.25", "bandit>=1.7",
    ]

    [project.optional-dependencies]
    dev = ["pytest>=7.0", "pytest-asyncio>=0.21", "hypothesis>=6.0", "safety>=2.0", "pip-audit>=2.0"]


    from pydantic import BaseModel, Field, field_validator, EmailStr
    from typing import Annotated
    import re


    class UserCreate(BaseModel):
        """Validated user creation request."""
        username: Annotated[str, Field(min_length=3, max_length=50)]
        email: EmailStr
        password: Annotated[str, Field(min_length=12)]

        @field_validator('username')
        @classmethod
        def validate_username(cls, v: str) -> str:
            # fullmatch: '$' with re.match would accept a trailing newline
            if not re.fullmatch(r'[a-zA-Z0-9_-]+', v):
                raise ValueError('Username may contain only letters, digits, "_" and "-"')
            return v

        @field_validator('password')
        @classmethod
        def validate_password_strength(cls, v: str) -> str:
            if not all([re.search(r'[A-Z]', v), re.search(r'[a-z]', v), re.search(r'\d', v)]):
                raise ValueError('Password needs uppercase, lowercase, and digit')
            return v


    from argon2 import PasswordHasher
    from argon2.exceptions import VerifyMismatchError

    ph = PasswordHasher(time_cost=3, memory_cost=65536, parallelism=4)


    def hash_password(password: str) -> str:
        return ph.hash(password)


    def verify_password(password: str, hashed: str) -> bool:
        # `hashed` avoids shadowing the built-in hash()
        try:
            ph.verify(hashed, password)
            return True
        except VerifyMismatchError:
            return False


    from sqlalchemy import select, text
    from sqlalchemy.ext.asyncio import AsyncSession

    # NEVER: f"SELECT * FROM users WHERE username = '{username}'"

    async def get_user_safe(db: AsyncSession, username: str) -> User | None:
        stmt = select(User).where(User.username == username)
        result = await db.execute(stmt)
        return result.scalar_one_or_none()

    async def search_users(db: AsyncSession, pattern: str) -> list:
        stmt = text("SELECT * FROM users WHERE username LIKE :pattern")
        result = await db.execute(stmt, {"pattern": f"%{pattern}%"})
        return result.fetchall()

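The same principle can be checked without a database server. The following is a minimal sketch that swaps SQLAlchemy for the standard-library `sqlite3` module; the `users` table layout and the `find_user` helper are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES (?)", ("alice",))


def find_user(conn: sqlite3.Connection, username: str):
    # The driver binds `username` as data; it is never spliced into the SQL text,
    # so quote characters in the input cannot change the query.
    cur = conn.execute(
        "SELECT id, username FROM users WHERE username = ?", (username,)
    )
    return cur.fetchone()
```

A classic payload such as `' OR '1'='1` is compared literally against the `username` column and matches no row.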

    from pathlib import Path


    def safe_read_file(base_dir: Path, user_filename: str) -> str:
        if '..' in user_filename or user_filename.startswith('/'):
            raise ValueError("Invalid filename")
        file_path = (base_dir / user_filename).resolve()
        if not file_path.is_relative_to(base_dir.resolve()):
            raise ValueError("Path traversal detected")
        return file_path.read_text()


    import subprocess

    ALLOWED_PROGRAMS = {'git', 'python', 'pip'}


    def run_command_safe(program: str, args: list[str]) -> str:
        if program not in ALLOWED_PROGRAMS:
            raise ValueError(f"Program not allowed: {program}")
        result = subprocess.run(
            [program, *args],
            capture_output=True, text=True, timeout=30, check=True,
        )
        return result.stdout

| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-12718 | CRITICAL | tarfile filter bypass | Python 3.12.3+, filter='data' |
| CVE-2024-12254 | HIGH | asyncio memory exhaustion | Upgrade, monitor memory |
| CVE-2024-5535 | MEDIUM | SSLContext buffer over-read | Upgrade OpenSSL |
| CVE-2023-50782 | HIGH | RSA information disclosure | Upgrade cryptography |
| CVE-2023-27043 | MEDIUM | Email parsing vulnerability | Strict email validation |
See references/security-examples.md for complete CVE details and mitigation code.
| Category | Risk | Key Mitigations |
|---|---|---|
| A01 Broken Access Control | HIGH | Validate permissions, decorators |
| A02 Cryptographic Failures | HIGH | cryptography lib, Argon2 |
| A03 Injection | CRITICAL | Parameterized queries, no shell=True |
| A04 Insecure Design | MEDIUM | Type safety, validation layers |
| A05 Misconfiguration | HIGH | Safe defaults, audit deps |
| A06 Vulnerable Components | HIGH | pip-audit, safety in CI |
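The "Validate permissions, decorators" mitigation for A01 has no example elsewhere in this document. The following is a minimal sketch of one way to do it; `CurrentUser`, `PermissionDenied`, `require_permission`, and the `"users:delete"` permission string are all hypothetical names, and a real service would take the caller from its framework's authentication layer.

```python
import functools
from dataclasses import dataclass, field
from typing import Callable


class PermissionDenied(Exception):
    """Raised when the caller lacks a required permission."""


@dataclass
class CurrentUser:  # hypothetical authenticated-user object
    username: str
    permissions: set = field(default_factory=set)


def require_permission(permission: str) -> Callable:
    """Decorator factory: the wrapped function's first argument must be
    a CurrentUser that holds `permission`."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(user: CurrentUser, *args, **kwargs):
            if permission not in user.permissions:
                raise PermissionDenied(f"{user.username} lacks {permission!r}")
            return func(user, *args, **kwargs)
        return wrapper
    return decorator


@require_permission("users:delete")
def delete_user(user: CurrentUser, target_id: int) -> str:
    # Placeholder body; a real handler would touch the database here.
    return f"deleted {target_id}"
```

Keeping the check in a decorator means the permission requirement sits next to the function signature, so a missing check is easy to spot in review.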

    from pydantic import BaseModel, field_validator
    import logging
    import os


    # Secure base model - reject unknown fields, strip whitespace
    class SecureInput(BaseModel):
        model_config = {'extra': 'forbid', 'str_strip_whitespace': True}

        @field_validator('*', mode='before')
        @classmethod
        def reject_null_bytes(cls, v):
            if isinstance(v, str) and '\x00' in v:
                raise ValueError('Null bytes not allowed')
            return v


    # Secrets from environment (NEVER hardcode)
    API_KEY = os.environ["API_KEY"]
    DB_URL = os.environ["DATABASE_URL"]


    # Safe error handling - log details, return safe message
    class AppError(Exception):
        def __init__(self, message: str, internal: str | None = None):
            super().__init__(message)
            self.message = message
            if internal:
                logging.error("%s: %s", message, internal)

        def to_response(self) -> dict:
            return {"error": self.message}

See references/advanced-patterns.md for secrets manager integration.

    bandit -r src/ -ll            # Static analysis
    pip-audit && safety check     # Dependency vulnerabilities
    mypy src/ --strict            # Type checking


    import pytest
    from pathlib import Path


    @pytest.mark.asyncio
    async def test_sql_injection_prevented(db):
        for payload in ["'; DROP TABLE users; --", "' OR '1'='1", "admin'--"]:
            # get_user_safe is a coroutine, so it must be awaited
            assert await get_user_safe(db, payload) is None


    def test_path_traversal_blocked():
        base = Path("/app/data")
        for attack in ["../etc/passwd", "..\\windows\\system32", "foo/../../etc/passwd"]:
            with pytest.raises(ValueError, match="traversal|Invalid"):
                safe_read_file(base, attack)


    def test_command_injection_blocked():
        with pytest.raises(ValueError, match="not allowed"):
            run_command_safe("rm", ["-rf", "/"])

See references/security-examples.md for comprehensive test patterns.
| Anti-Pattern | Bad | Good |
|---|---|---|
| SQL formatting | f"SELECT * WHERE id={id}" | select(User).where(User.id == id) |
| Pickle untrusted | pickle.loads(data) | json.loads(data) |
| Shell injection | subprocess.run(f"echo {x}", shell=True) | subprocess.run(["echo", x]) |
| Weak hashing | hashlib.md5(pw).hexdigest() | PasswordHasher().hash(pw) |
| Hardcoded secrets | API_KEY = "sk-123..." | API_KEY = os.environ["API_KEY"] |

- pytest --cov=src
- mypy src/ --strict
- bandit -r src/ -ll
- pip-audit && safety check

Create Python code that is type safe, secure, testable, and maintainable.

Security Essentials:

For attack scenarios and threat modeling, see references/threat-model.md.