websocket by martinholovsky/claude-skills-generator
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill websocket
关卡 0.2 : 已通过(记录了 5 个以上漏洞)- CVE-2024-23898, CVE-2024-26135, CVE-2023-0957
风险等级 : 高
理由 : WebSocket 连接绕过了同源策略保护,使其容易受到跨站 WebSocket 劫持攻击。持久连接需要仔细的身份验证、会话管理和输入验证。
您是一位 WebSocket 安全专家,了解持久双向连接的独特漏洞。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
| 情况 | 方法 |
|---|---|
| 新连接 | 验证 Origin,要求身份验证令牌 |
| 每条消息 | 验证格式,检查操作授权 |
| 敏感操作 | 重新验证会话,记录操作 |
| 空闲连接 | 在非活动期后超时 |
| 错误情况 | 关闭连接,记录详细信息 |
| 组件 | 版本 | 备注 |
|---|---|---|
| FastAPI/Starlette | 0.115+ | WebSocket 支持 |
| websockets | 12.0+ | Python WebSocket 库 |
# Limits consumed by the WebSocket handlers shown in this document.
WEBSOCKET_CONFIG = {
    # NOTE(review): no handler shown in this document reads this key -
    # confirm the message-size cap is actually enforced somewhere.
    "max_message_size": 1024 * 1024, # 1MB
    # Read by SecureConnectionManager.connect() to cap connections per client IP.
    "max_connections_per_ip": 10,
    # Read by the receive loop as the asyncio.wait_for() timeout.
    "idle_timeout_seconds": 300,
    # Read by SecureConnectionManager.check_rate_limit() (60-second window).
    "messages_per_minute": 60,
}
# NEVER use "*" for origins
# Exact-match allowlist compared against the handshake's Origin header.
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient
# 首先测试安全边界
def test_origin_validation_rejects_invalid():
    """CSWSH prevention - a handshake from a non-allowlisted Origin is refused.

    This should fail until origin validation is implemented.
    """
    # Local import keeps the snippet self-contained.
    from fastapi import WebSocketDisconnect

    # httpx.AsyncClient has no websocket_connect(); Starlette's TestClient does,
    # and it is synchronous, so the test does not need to be a coroutine.
    with TestClient(app) as client:
        with pytest.raises(WebSocketDisconnect) as exc_info:
            with client.websocket_connect(
                "/ws?token=valid",
                headers={"Origin": "https://evil.com"},
            ):
                pass
    # 4003 is the close code validate_origin() sends; checking it proves the
    # origin check (not the token check, which uses 4001) did the rejecting.
    assert exc_info.value.code == 4003
def test_authentication_required():
    """Connections without a token must be rejected by the authentication step."""
    # Local import keeps the snippet self-contained.
    from fastapi import WebSocketDisconnect

    with TestClient(app) as client:
        with pytest.raises(WebSocketDisconnect) as exc_info:
            # Send an allowlisted Origin so the origin check passes; otherwise
            # the handshake is refused before authentication is ever reached
            # and this test would pass for the wrong reason.
            with client.websocket_connect(
                "/ws",
                headers={"Origin": "https://app.example.com"},
            ):
                pass
    # 4001 is the close code authenticate_websocket() uses.
    assert exc_info.value.code == 4001
def test_message_authorization():
    """Each message action must be authorized.

    NOTE(review): assumes the test setup makes "readonly_user" a token that
    authenticates a user lacking the "ws:send" permission - confirm.
    """
    with TestClient(app) as client:
        with client.websocket_connect(
            "/ws?token=readonly_user",
            headers={"Origin": "https://app.example.com"},
        ) as ws:
            # The payload must satisfy WebSocketMessage (an allowed action plus
            # a "data" object); otherwise the handler answers
            # "Invalid message format" before the permission check runs.
            ws.send_json({"action": "send", "data": {"id": "123"}})
            response = ws.receive_json()
    assert response.get("error") == "Permission denied"
# 仅实现通过测试所需的功能
async def validate_origin(websocket: WebSocket) -> bool:
    """Return True when the handshake's Origin header is on the allowlist.

    Otherwise the socket is closed with code 4003 and False is returned.
    """
    origin = websocket.headers.get("origin")
    if origin and origin in ALLOWED_ORIGINS:
        return True
    # Missing or unknown origin: refuse the connection.
    await websocket.close(code=4003, reason="Invalid origin")
    return False
# 运行所有 WebSocket 测试
pytest tests/websocket/ -v --asyncio-mode=auto
# 检查安全问题
bandit -r src/websocket/
# 验证没有回归
pytest tests/ -v
# 错误做法 - 为每个请求创建新连接
ws = await create_connection(user_id) # 开销大!
# 正确做法 - 从池中重用连接
class ConnectionPool:
    """Cache of one connection per user so repeat requests reuse it.

    Args:
        max_size: Upper bound on the number of pooled connections.
    """

    def __init__(self, max_size: int = 100):
        # Previously accepted but ignored, which left the pool unbounded.
        self.max_size = max_size
        self.connections: dict[str, WebSocket] = {}

    async def get_or_create(self, user_id: str) -> WebSocket:
        """Return the pooled connection for ``user_id``, creating it on first use.

        Raises:
            RuntimeError: If a new connection is needed but the pool already
                holds ``max_size`` connections.
        """
        if user_id in self.connections:
            return self.connections[user_id]
        if len(self.connections) >= self.max_size:
            raise RuntimeError("Connection pool is full")
        connection = await create_connection(user_id)
        self.connections[user_id] = connection
        return connection
# 错误做法 - 一次发送一条消息
for item in items:
await websocket.send_json({"type": "item", "data": item})
# 正确做法 - 批量发送消息以减少开销
await websocket.send_json({"type": "batch", "data": items[:50]})
# 错误做法 - 对高频数据使用 JSON(约 40 字节)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
# 正确做法 - 二进制格式(12 字节:3 个 4 字节浮点数)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
# 错误做法 - 固定频率的心跳
HEARTBEAT_INTERVAL = 5 # 每 5 秒一次
# 正确做法 - 基于活动性的自适应心跳
interval = 60 if (time() - last_activity) < 60 else 30
# 错误做法 - 在慢速客户端上阻塞
await ws.send_json(message)
# 正确做法 - 超时和有界队列
from collections import deque
queue = deque(maxlen=100) # 队列满时丢弃最旧的消息
try:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
except asyncio.TimeoutError:
pass # 客户端太慢
from fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
    """Validate the WebSocket Origin header against the allowlist.

    Returns True when the origin is allowed; otherwise closes the socket
    with code 4003 and returns False.
    """
    origin = websocket.headers.get("origin")
    if origin and origin in ALLOWED_ORIGINS:
        return True
    # Missing or unknown origin: refuse the connection.
    await websocket.close(code=4003, reason="Invalid origin")
    return False
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection only after the origin check has passed."""
    origin_ok = await validate_origin(websocket)
    if origin_ok:
        await websocket.accept()
from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
    """Authenticate via the ``token`` query parameter (not cookies - vulnerable to CSWSH).

    Returns:
        The user looked up from the token's ``sub`` claim, or ``None`` after
        closing the socket with code 4001 when the token is missing, fails
        to decode, has no ``sub`` claim, or names an unknown user.
    """

    async def _reject(reason: str) -> None:
        # Single place for the 4001 close so every failure path is uniform.
        await websocket.close(code=4001, reason=reason)

    token = websocket.query_params.get("token")
    if not token:
        await _reject("Authentication required")
        return None
    # Keep the try body to the one call whose JWTError we mean to handle.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except JWTError:
        await _reject("Invalid token")
        return None
    subject = payload.get("sub")
    if not subject:
        # A token without a subject must not reach the lookup as None.
        await _reject("Invalid token")
        return None
    user = await user_service.get(subject)
    if not user:
        await _reject("User not found")
        return None
    return user
from pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
    """Schema for an incoming message: an allowlisted action plus a data object."""

    action: str
    data: dict

    @field_validator('action')
    @classmethod
    def validate_action(cls, v):
        """Return the action unchanged if it is allowlisted, else raise ValueError."""
        allowed_actions = {'subscribe', 'unsubscribe', 'send', 'query'}
        if v in allowed_actions:
            return v
        raise ValueError(f'Invalid action: {v}')
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
    """Validate, authorize and dispatch one decoded client message.

    Sends an ``{"error": ...}`` reply and returns when the payload fails
    schema validation, the user lacks the ``ws:<action>`` permission, or no
    handler is registered for the action. Otherwise sends the handler's result.
    """
    try:
        # model_validate() reports non-object JSON (list, string, number) as a
        # validation error; WebSocketMessage(**raw_data) raised an uncaught
        # TypeError for those payloads.
        message = WebSocketMessage.model_validate(raw_data)
    except ValueError:
        await websocket.send_json({"error": "Invalid message format"})
        return
    if not user.has_permission(f"ws:{message.action}"):
        await websocket.send_json({"error": "Permission denied"})
        return
    try:
        handler = handlers[message.action]
    except KeyError:
        # The action passed validation but nothing is registered for it.
        await websocket.send_json({"error": "Unsupported action"})
        return
    result = await handler(user, message.data)
    await websocket.send_json(result)
from collections import defaultdict
from time import time
class SecureConnectionManager:
    """Track live connections and enforce per-IP and per-user limits."""

    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        # user_id -> timestamps of messages seen in the current window.
        self.message_counts: dict[str, list[float]] = defaultdict(list)
        # client IP -> number of currently open connections.
        self.connections_per_ip: dict[str, int] = defaultdict(int)

    async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
        """Accept and register the socket unless ``ip`` is at its connection cap.

        Returns:
            True if accepted; False after closing with code 4029 when the cap
            from ``WEBSOCKET_CONFIG["max_connections_per_ip"]`` is reached.
        """
        if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
            await websocket.close(code=4029, reason="Too many connections")
            return False
        await websocket.accept()
        # NOTE(review): a second connection by the same user replaces the first entry.
        self.connections[user_id] = websocket
        self.connections_per_ip[ip] += 1
        return True

    def disconnect(self, user_id: str, ip: str) -> None:
        """Release the bookkeeping created by ``connect``.

        The endpoint calls this from its ``finally`` block. It is safe to
        call for an unknown user or IP.
        """
        self.connections.pop(user_id, None)
        remaining = self.connections_per_ip.get(ip, 0) - 1
        if remaining > 0:
            self.connections_per_ip[ip] = remaining
        else:
            # Drop zeroed entries so the table does not grow per IP ever seen.
            self.connections_per_ip.pop(ip, None)
        # Keep still-fresh timestamps so reconnecting cannot reset the rate
        # limit; drop the entry once nothing in the window remains.
        cutoff = time() - 60
        recent = [ts for ts in self.message_counts.get(user_id, ()) if ts > cutoff]
        if recent:
            self.message_counts[user_id] = recent
        else:
            self.message_counts.pop(user_id, None)

    def check_rate_limit(self, user_id: str) -> bool:
        """Record a message for ``user_id`` and report whether it is allowed.

        Returns:
            False when the user already has
            ``WEBSOCKET_CONFIG["messages_per_minute"]`` messages in the last
            60 seconds; True otherwise (and the message is counted).
        """
        now = time()
        cutoff = now - 60
        recent = [ts for ts in self.message_counts[user_id] if ts > cutoff]
        self.message_counts[user_id] = recent
        if len(recent) >= WEBSOCKET_CONFIG["messages_per_minute"]:
            return False
        recent.append(now)
        return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Full endpoint: origin check, token auth, connection cap, receive loop.

    The loop ends when the peer disconnects, when no message arrives within
    ``WEBSOCKET_CONFIG["idle_timeout_seconds"]`` (closed with 4008), or when
    a frame is not valid JSON (closed with 1007). The manager's bookkeeping
    is released in every case.
    """
    if not await validate_origin(websocket):
        return
    user = await authenticate_websocket(websocket)
    if not user:
        return
    # Starlette documents websocket.client as optional; avoid an
    # AttributeError when the server supplies no client address.
    ip = websocket.client.host if websocket.client else "unknown"
    if not await manager.connect(websocket, user.id, ip):
        return
    try:
        while True:
            try:
                raw = await asyncio.wait_for(
                    websocket.receive_json(),
                    timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"],
                )
            except ValueError:
                # json.JSONDecodeError is a ValueError: the frame was not
                # valid JSON. Scoped to the receive call so handler errors
                # are not mislabelled.
                await websocket.close(code=1007, reason="Invalid JSON")
                break
            if not manager.check_rate_limit(user.id):
                await websocket.send_json({"error": "Rate limited"})
                continue
            await handle_message(websocket, user, raw)
    except WebSocketDisconnect:
        pass  # Peer already closed the connection; nothing left to send.
    except asyncio.TimeoutError:
        # Close explicitly so the idle client gets a definite close frame.
        await websocket.close(code=4008, reason="Idle timeout")
    finally:
        manager.disconnect(user.id, ip)
| CVE ID | 严重性 | 描述 | 缓解措施 |
|---|---|---|---|
| CVE-2024-23898 | 高 | Jenkins CSWSH - 命令执行 | 验证 Origin |
| CVE-2024-26135 | 高 | MeshCentral CSWSH - 配置泄露 | Origin + SameSite |
| CVE-2023-0957 | 严重 | Gitpod CSWSH - 账户接管 | Origin + 令牌身份验证 |
| 类别 | 缓解措施 |
|---|---|
| A01 访问控制 | 来源验证、每条消息的授权 |
| A02 加密失败 | 仅使用 TLS/WSS、签名令牌 |
| A03 注入 | 验证所有消息内容 |
| A07 身份验证失败 | 令牌身份验证、会话验证 |
async def secure_websocket_handler(websocket: WebSocket):
    """Checklist-ordered handler: origin, then token, then accept."""
    # 1. VALIDATE ORIGIN (Critical)
    origin = websocket.headers.get("origin")
    if origin not in ALLOWED_ORIGINS:
        await websocket.close(code=4003)
        return
    # 2. AUTHENTICATE with token (not cookies)
    token = websocket.query_params.get("token")
    user = await validate_token(token)
    if not user:
        await websocket.close(code=4001)
        return
    # 3. Accept only after validation
    await websocket.accept()
    # 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idle
# 切勿这样做 - 容易受到 CSWSH 攻击
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # 接受任何来源!
# 始终这样做 - 先验证来源
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# 切勿这样做 - cookie 在 CSWSH 攻击中会自动发送
session = websocket.cookies.get("session")
# 始终这样做 - 要求显式的令牌参数
token = websocket.query_params.get("token")
# 切勿这样做 - 假设连接等同于完全访问权限
if data["action"] == "delete":
await delete_resource(data["id"])
# 始终这样做 - 检查每个操作的权限
if not user.has_permission("delete"):
return {"error": "Permission denied"}
# 切勿这样做 - 信任 WebSocket 消息
await db.execute(f"SELECT * FROM {data['table']}") # SQL 注入!
# 始终这样做 - 使用 Pydantic 进行验证
message = WebSocketMessage(**data)
references/threat-model.md 中的威胁模型
pytest tests/websocket/ -v
bandit -r src/websocket/
pytest tests/ -v
安全目标 :
关键提醒 : 始终验证 Origin,使用令牌身份验证,授权每条消息,在生产环境中使用 WSS。
每周安装数
113
仓库
GitHub 星标数
29
首次出现
Jan 20, 2026
安全审计
安装于
codex90
gemini-cli89
opencode88
cursor85
github-copilot82
claude-code75
Gate 0.2 : PASSED (5+ vulnerabilities documented) - CVE-2024-23898, CVE-2024-26135, CVE-2023-0957
Risk Level : HIGH
Justification : WebSocket connections bypass Same-Origin Policy protections, making them vulnerable to Cross-Site WebSocket Hijacking (CSWSH). Persistent connections require careful authentication, session management, and input validation.
You are an expert in WebSocket security, understanding the unique vulnerabilities of persistent bidirectional connections.
| Situation | Approach |
|---|---|
| New connection | Validate Origin, require authentication token |
| Each message | Validate format, check authorization for action |
| Sensitive operations | Re-verify session, log action |
| Idle connection | Timeout after inactivity period |
| Error condition | Close connection, log details |
| Component | Version | Notes |
|---|---|---|
| FastAPI/Starlette | 0.115+ | WebSocket support |
| websockets | 12.0+ | Python WebSocket library |
# Limits consumed by the WebSocket handlers shown in this document.
WEBSOCKET_CONFIG = {
    # NOTE(review): no handler shown in this document reads this key -
    # confirm the message-size cap is actually enforced somewhere.
    "max_message_size": 1024 * 1024, # 1MB
    # Read by SecureConnectionManager.connect() to cap connections per client IP.
    "max_connections_per_ip": 10,
    # Read by the receive loop as the asyncio.wait_for() timeout.
    "idle_timeout_seconds": 300,
    # Read by SecureConnectionManager.check_rate_limit() (60-second window).
    "messages_per_minute": 60,
}
# NEVER use "*" for origins
# Exact-match allowlist compared against the handshake's Origin header.
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient
# Test security boundaries first
def test_origin_validation_rejects_invalid():
    """CSWSH prevention - a handshake from a non-allowlisted Origin is refused.

    This should fail until origin validation is implemented.
    """
    # Local import keeps the snippet self-contained.
    from fastapi import WebSocketDisconnect

    # httpx.AsyncClient has no websocket_connect(); Starlette's TestClient does,
    # and it is synchronous, so the test does not need to be a coroutine.
    with TestClient(app) as client:
        with pytest.raises(WebSocketDisconnect) as exc_info:
            with client.websocket_connect(
                "/ws?token=valid",
                headers={"Origin": "https://evil.com"},
            ):
                pass
    # 4003 is the close code validate_origin() sends; checking it proves the
    # origin check (not the token check, which uses 4001) did the rejecting.
    assert exc_info.value.code == 4003
def test_authentication_required():
    """Connections without a token must be rejected by the authentication step."""
    # Local import keeps the snippet self-contained.
    from fastapi import WebSocketDisconnect

    with TestClient(app) as client:
        with pytest.raises(WebSocketDisconnect) as exc_info:
            # Send an allowlisted Origin so the origin check passes; otherwise
            # the handshake is refused before authentication is ever reached
            # and this test would pass for the wrong reason.
            with client.websocket_connect(
                "/ws",
                headers={"Origin": "https://app.example.com"},
            ):
                pass
    # 4001 is the close code authenticate_websocket() uses.
    assert exc_info.value.code == 4001
def test_message_authorization():
    """Each message action must be authorized.

    NOTE(review): assumes the test setup makes "readonly_user" a token that
    authenticates a user lacking the "ws:send" permission - confirm.
    """
    with TestClient(app) as client:
        with client.websocket_connect(
            "/ws?token=readonly_user",
            headers={"Origin": "https://app.example.com"},
        ) as ws:
            # The payload must satisfy WebSocketMessage (an allowed action plus
            # a "data" object); otherwise the handler answers
            # "Invalid message format" before the permission check runs.
            ws.send_json({"action": "send", "data": {"id": "123"}})
            response = ws.receive_json()
    assert response.get("error") == "Permission denied"
# Implement only what's needed to pass the test
async def validate_origin(websocket: WebSocket) -> bool:
    """Return True when the handshake's Origin header is on the allowlist.

    Otherwise the socket is closed with code 4003 and False is returned.
    """
    origin = websocket.headers.get("origin")
    if origin and origin in ALLOWED_ORIGINS:
        return True
    # Missing or unknown origin: refuse the connection.
    await websocket.close(code=4003, reason="Invalid origin")
    return False
# Run all WebSocket tests
pytest tests/websocket/ -v --asyncio-mode=auto
# Check for security issues
bandit -r src/websocket/
# Verify no regressions
pytest tests/ -v
# BAD - Create new connection for each request
ws = await create_connection(user_id) # Expensive!
# GOOD - Reuse connections from pool
class ConnectionPool:
    """Cache of one connection per user so repeat requests reuse it.

    Args:
        max_size: Upper bound on the number of pooled connections.
    """

    def __init__(self, max_size: int = 100):
        # Previously accepted but ignored, which left the pool unbounded.
        self.max_size = max_size
        self.connections: dict[str, WebSocket] = {}

    async def get_or_create(self, user_id: str) -> WebSocket:
        """Return the pooled connection for ``user_id``, creating it on first use.

        Raises:
            RuntimeError: If a new connection is needed but the pool already
                holds ``max_size`` connections.
        """
        if user_id in self.connections:
            return self.connections[user_id]
        if len(self.connections) >= self.max_size:
            raise RuntimeError("Connection pool is full")
        connection = await create_connection(user_id)
        self.connections[user_id] = connection
        return connection
# BAD - Send messages one at a time
for item in items:
await websocket.send_json({"type": "item", "data": item})
# GOOD - Batch messages to reduce overhead
await websocket.send_json({"type": "batch", "data": items[:50]})
# BAD - JSON for high-frequency data (~40 bytes)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
# GOOD - Binary format (12 bytes: three 4-byte floats)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
# BAD - Fixed frequent heartbeats
HEARTBEAT_INTERVAL = 5 # Every 5 seconds
# GOOD - Adaptive heartbeats based on activity
interval = 60 if (time() - last_activity) < 60 else 30
# BAD - Blocks on slow clients
await ws.send_json(message)
# GOOD - Timeout and bounded queue
from collections import deque
queue = deque(maxlen=100) # Drop oldest when full
try:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
except asyncio.TimeoutError:
pass # Client too slow
from fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
    """Validate the WebSocket Origin header against the allowlist.

    Returns True when the origin is allowed; otherwise closes the socket
    with code 4003 and returns False.
    """
    origin = websocket.headers.get("origin")
    if origin and origin in ALLOWED_ORIGINS:
        return True
    # Missing or unknown origin: refuse the connection.
    await websocket.close(code=4003, reason="Invalid origin")
    return False
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection only after the origin check has passed."""
    origin_ok = await validate_origin(websocket)
    if origin_ok:
        await websocket.accept()
from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
    """Authenticate via the ``token`` query parameter (not cookies - vulnerable to CSWSH).

    Returns:
        The user looked up from the token's ``sub`` claim, or ``None`` after
        closing the socket with code 4001 when the token is missing, fails
        to decode, has no ``sub`` claim, or names an unknown user.
    """

    async def _reject(reason: str) -> None:
        # Single place for the 4001 close so every failure path is uniform.
        await websocket.close(code=4001, reason=reason)

    token = websocket.query_params.get("token")
    if not token:
        await _reject("Authentication required")
        return None
    # Keep the try body to the one call whose JWTError we mean to handle.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except JWTError:
        await _reject("Invalid token")
        return None
    subject = payload.get("sub")
    if not subject:
        # A token without a subject must not reach the lookup as None.
        await _reject("Invalid token")
        return None
    user = await user_service.get(subject)
    if not user:
        await _reject("User not found")
        return None
    return user
from pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
    """Schema for an incoming message: an allowlisted action plus a data object."""

    action: str
    data: dict

    @field_validator('action')
    @classmethod
    def validate_action(cls, v):
        """Return the action unchanged if it is allowlisted, else raise ValueError."""
        allowed_actions = {'subscribe', 'unsubscribe', 'send', 'query'}
        if v in allowed_actions:
            return v
        raise ValueError(f'Invalid action: {v}')
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
    """Validate, authorize and dispatch one decoded client message.

    Sends an ``{"error": ...}`` reply and returns when the payload fails
    schema validation, the user lacks the ``ws:<action>`` permission, or no
    handler is registered for the action. Otherwise sends the handler's result.
    """
    try:
        # model_validate() reports non-object JSON (list, string, number) as a
        # validation error; WebSocketMessage(**raw_data) raised an uncaught
        # TypeError for those payloads.
        message = WebSocketMessage.model_validate(raw_data)
    except ValueError:
        await websocket.send_json({"error": "Invalid message format"})
        return
    if not user.has_permission(f"ws:{message.action}"):
        await websocket.send_json({"error": "Permission denied"})
        return
    try:
        handler = handlers[message.action]
    except KeyError:
        # The action passed validation but nothing is registered for it.
        await websocket.send_json({"error": "Unsupported action"})
        return
    result = await handler(user, message.data)
    await websocket.send_json(result)
from collections import defaultdict
from time import time
class SecureConnectionManager:
    """Track live connections and enforce per-IP and per-user limits."""

    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        # user_id -> timestamps of messages seen in the current window.
        self.message_counts: dict[str, list[float]] = defaultdict(list)
        # client IP -> number of currently open connections.
        self.connections_per_ip: dict[str, int] = defaultdict(int)

    async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
        """Accept and register the socket unless ``ip`` is at its connection cap.

        Returns:
            True if accepted; False after closing with code 4029 when the cap
            from ``WEBSOCKET_CONFIG["max_connections_per_ip"]`` is reached.
        """
        if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
            await websocket.close(code=4029, reason="Too many connections")
            return False
        await websocket.accept()
        # NOTE(review): a second connection by the same user replaces the first entry.
        self.connections[user_id] = websocket
        self.connections_per_ip[ip] += 1
        return True

    def disconnect(self, user_id: str, ip: str) -> None:
        """Release the bookkeeping created by ``connect``.

        The endpoint calls this from its ``finally`` block. It is safe to
        call for an unknown user or IP.
        """
        self.connections.pop(user_id, None)
        remaining = self.connections_per_ip.get(ip, 0) - 1
        if remaining > 0:
            self.connections_per_ip[ip] = remaining
        else:
            # Drop zeroed entries so the table does not grow per IP ever seen.
            self.connections_per_ip.pop(ip, None)
        # Keep still-fresh timestamps so reconnecting cannot reset the rate
        # limit; drop the entry once nothing in the window remains.
        cutoff = time() - 60
        recent = [ts for ts in self.message_counts.get(user_id, ()) if ts > cutoff]
        if recent:
            self.message_counts[user_id] = recent
        else:
            self.message_counts.pop(user_id, None)

    def check_rate_limit(self, user_id: str) -> bool:
        """Record a message for ``user_id`` and report whether it is allowed.

        Returns:
            False when the user already has
            ``WEBSOCKET_CONFIG["messages_per_minute"]`` messages in the last
            60 seconds; True otherwise (and the message is counted).
        """
        now = time()
        cutoff = now - 60
        recent = [ts for ts in self.message_counts[user_id] if ts > cutoff]
        self.message_counts[user_id] = recent
        if len(recent) >= WEBSOCKET_CONFIG["messages_per_minute"]:
            return False
        recent.append(now)
        return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Full endpoint: origin check, token auth, connection cap, receive loop.

    The loop ends when the peer disconnects, when no message arrives within
    ``WEBSOCKET_CONFIG["idle_timeout_seconds"]`` (closed with 4008), or when
    a frame is not valid JSON (closed with 1007). The manager's bookkeeping
    is released in every case.
    """
    if not await validate_origin(websocket):
        return
    user = await authenticate_websocket(websocket)
    if not user:
        return
    # Starlette documents websocket.client as optional; avoid an
    # AttributeError when the server supplies no client address.
    ip = websocket.client.host if websocket.client else "unknown"
    if not await manager.connect(websocket, user.id, ip):
        return
    try:
        while True:
            try:
                raw = await asyncio.wait_for(
                    websocket.receive_json(),
                    timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"],
                )
            except ValueError:
                # json.JSONDecodeError is a ValueError: the frame was not
                # valid JSON. Scoped to the receive call so handler errors
                # are not mislabelled.
                await websocket.close(code=1007, reason="Invalid JSON")
                break
            if not manager.check_rate_limit(user.id):
                await websocket.send_json({"error": "Rate limited"})
                continue
            await handle_message(websocket, user, raw)
    except WebSocketDisconnect:
        pass  # Peer already closed the connection; nothing left to send.
    except asyncio.TimeoutError:
        # Close explicitly so the idle client gets a definite close frame.
        await websocket.close(code=4008, reason="Idle timeout")
    finally:
        manager.disconnect(user.id, ip)
| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-23898 | HIGH | Jenkins CSWSH - command execution | Validate Origin |
| CVE-2024-26135 | HIGH | MeshCentral CSWSH - config leak | Origin + SameSite |
| CVE-2023-0957 | CRITICAL | Gitpod CSWSH - account takeover | Origin + token auth |
| Category | Mitigations |
|---|---|
| A01 Access Control | Origin validation, per-message authz |
| A02 Crypto Failures | TLS/WSS only, signed tokens |
| A03 Injection | Validate all message content |
| A07 Auth Failures | Token auth, session validation |
async def secure_websocket_handler(websocket: WebSocket):
    """Checklist-ordered handler: origin, then token, then accept."""
    # 1. VALIDATE ORIGIN (Critical)
    origin = websocket.headers.get("origin")
    if origin not in ALLOWED_ORIGINS:
        await websocket.close(code=4003)
        return
    # 2. AUTHENTICATE with token (not cookies)
    token = websocket.query_params.get("token")
    user = await validate_token(token)
    if not user:
        await websocket.close(code=4001)
        return
    # 3. Accept only after validation
    await websocket.accept()
    # 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idle
# NEVER - vulnerable to CSWSH
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # Accepts any origin!
# ALWAYS - validate origin first
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# NEVER - cookies sent automatically in CSWSH attacks
session = websocket.cookies.get("session")
# ALWAYS - require explicit token parameter
token = websocket.query_params.get("token")
# NEVER - assumes connection = full access
if data["action"] == "delete":
await delete_resource(data["id"])
# ALWAYS - check permission for each action
if not user.has_permission("delete"):
return {"error": "Permission denied"}
# NEVER - trust WebSocket messages
await db.execute(f"SELECT * FROM {data['table']}") # SQL injection!
# ALWAYS - validate with Pydantic
message = WebSocketMessage(**data)
references/threat-model.md
pytest tests/websocket/ -v
bandit -r src/websocket/
pytest tests/ -v
Security Goals :
Critical Reminders : ALWAYS validate Origin, use token auth (not cookies), authorize EACH message, use WSS in production.
Weekly Installs
113
Repository
GitHub Stars
29
First Seen
Jan 20, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
codex90
gemini-cli89
opencode88
cursor85
github-copilot82
claude-code75
xdrop 文件传输脚本:Bun 环境下安全上传下载工具,支持加密分享
40,400 周安装
lp-agent:自动化流动性提供策略工具 | Hummingbot API 与 Solana DEX 集成
211 周安装
Encore.ts 声明式基础设施:PostgreSQL、Redis、Pub/Sub、定时任务、对象存储
213 周安装
SkyPilot 多云编排指南:跨 AWS/GCP/Azure 自动优化机器学习成本与分布式训练
213 周安装
Flash Attention优化指南:PyTorch快速内存高效注意力机制实现2-4倍加速
209 周安装
邮件序列设计指南:自动化营销策略、模板与最佳实践 | 提升转化率
214 周安装
网页转Markdown工具 - 支持JS渲染页面,一键转换并清理内容
212 周安装