FastAPI Customer Support Tech Enablement by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill 'FastAPI Customer Support Tech Enablement'

此技能提供了使用 FastAPI 构建生产就绪的客户支持 API 的全面指导。FastAPI 是一个现代、快速(高性能)的 Web 框架,用于基于 Python 3.8+ 的标准 Python 类型提示构建 API。
FastAPI 因其以下特点而成为客户支持系统的理想选择:
FastAPI 构建在 Starlette(用于 Web 路由)和 Pydantic(用于数据验证)之上,为客户支持系统中常见的 I/O 密集型操作提供了出色的异步支持。
关键概念:
- 在执行数据库查询、外部 API 调用或文件操作时,路径操作使用 async def
- 对于 CPU 密集型操作或使用同步库时,使用 def

支持 API 的最佳实践:
# 数据库操作的异步处理(大多数支持 API)
@app.get("/tickets/{ticket_id}")
async def get_ticket(ticket_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
return ticket
# 无 I/O 的简单操作的同步处理
@app.get("/health")
def health_check():
return {"status": "healthy"}
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
FastAPI 的依赖注入功能强大,可用于管理数据库会话、身份验证和配置等共享资源。
数据库会话管理:
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/support_db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# 在端点中使用
@app.post("/tickets/")
async def create_ticket(
    ticket: TicketCreate,
    db: AsyncSession = Depends(get_db)
):
    """Persist a new support ticket and return the stored row.

    Args:
        ticket: Validated request payload.
        db: Request-scoped async session supplied by ``get_db``.

    Returns:
        The ``Ticket`` instance after commit and refresh.
    """
    # `attachments` is part of the request schema but the Ticket model has no
    # such column; passing it to the constructor would raise TypeError.
    values = ticket.dict(exclude={"attachments"})
    # The column is declared as Enum(TicketPriorityEnum), which stores member
    # names; the request schema uses the separate TicketPriority str-enum, so
    # map it across by value before handing it to the ORM.
    values["priority"] = TicketPriorityEnum(ticket.priority.value)
    db_ticket = Ticket(**values)
    db.add(db_ticket)
    await db.commit()
    await db.refresh(db_ticket)
    return db_ticket
身份验证依赖项:
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
result = await db.execute(select(User).where(User.username == username))
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return user
async def get_current_active_agent(
current_user: User = Depends(get_current_user)
) -> User:
if not current_user.is_active or current_user.role != "agent":
raise HTTPException(status_code=403, detail="Not authorized as support agent")
return current_user
Pydantic 模型确保整个支持系统的数据完整性。
客户支持的基础模型:
from pydantic import BaseModel, EmailStr, Field, validator
from datetime import datetime
from typing import Optional, List
from enum import Enum
class TicketPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class TicketStatus(str, Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
WAITING_CUSTOMER = "waiting_customer"
RESOLVED = "resolved"
CLOSED = "closed"
class TicketBase(BaseModel):
title: str = Field(..., min_length=3, max_length=200)
description: str = Field(..., min_length=10)
priority: TicketPriority = TicketPriority.MEDIUM
category: str = Field(..., max_length=50)
@validator('title')
def title_must_not_be_empty(cls, v):
if not v.strip():
raise ValueError('Title cannot be empty or whitespace')
return v.strip()
class TicketCreate(TicketBase):
customer_email: EmailStr
attachments: Optional[List[str]] = []
class TicketUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=3, max_length=200)
description: Optional[str] = None
status: Optional[TicketStatus] = None
priority: Optional[TicketPriority] = None
assigned_to: Optional[int] = None
class TicketResponse(TicketBase):
id: int
status: TicketStatus
customer_email: str
assigned_to: Optional[int]
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True # 用于 SQLAlchemy 模型
用于 PostgreSQL 操作的现代异步 SQLAlchemy 集成。
模型定义:
# Boolean is required by User.is_active and Comment.is_internal below.
from sqlalchemy import Boolean, Column, Integer, String, DateTime, ForeignKey, Enum, Text
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.sql import func
import enum

# Declarative base class that every ORM model in this section inherits from.
Base = declarative_base()
class TicketStatusEnum(enum.Enum):
    """Ticket statuses as used by the ORM ``status`` column."""

    OPEN = "open"
    IN_PROGRESS = "in_progress"
    WAITING_CUSTOMER = "waiting_customer"
    RESOLVED = "resolved"
    CLOSED = "closed"
class TicketPriorityEnum(enum.Enum):
    """Ticket priorities as used by the ORM ``priority`` column."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
full_name = Column(String(100))
role = Column(String(20), default="customer") # customer, agent, admin
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
assigned_tickets = relationship("Ticket", back_populates="assigned_agent")
comments = relationship("Comment", back_populates="author")
class Ticket(Base):
    """ORM model for a support ticket, stored in the ``tickets`` table."""

    __tablename__ = "tickets"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=False)
    status = Column(Enum(TicketStatusEnum), default=TicketStatusEnum.OPEN, index=True)
    priority = Column(Enum(TicketPriorityEnum), default=TicketPriorityEnum.MEDIUM, index=True)
    category = Column(String(50), index=True)
    customer_email = Column(String(100), index=True, nullable=False)
    # Nullable: a ticket may exist before any agent is assigned.
    assigned_to = Column(Integer, ForeignKey("users.id"), nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # server_default fills the column on INSERT as well. With onupdate alone it
    # stays NULL until the first UPDATE, which breaks consumers that treat it
    # as a required datetime (TicketResponse.updated_at is not Optional).
    updated_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )
    resolved_at = Column(DateTime(timezone=True), nullable=True)

    assigned_agent = relationship("User", back_populates="assigned_tickets")
    # Comments are removed together with their ticket.
    comments = relationship("Comment", back_populates="ticket", cascade="all, delete-orphan")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
ticket_id = Column(Integer, ForeignKey("tickets.id"), nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
content = Column(Text, nullable=False)
is_internal = Column(Boolean, default=False) # 内部代理备注
created_at = Column(DateTime(timezone=True), server_default=func.now())
ticket = relationship("Ticket", back_populates="comments")
author = relationship("User", back_populates="comments")
数据库初始化:
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@app.on_event("startup")
async def startup():
await init_db()
基于 JWT 的身份验证,用于支持门户访问。
密码哈希:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
令牌生成:
from datetime import datetime, timedelta
from jose import jwt
SECRET_KEY = "your-secret-key-here" # 在生产环境中使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the mapping is copied, not mutated.
        expires_delta: Token lifetime. When omitted, 15 minutes is used.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import: only datetime and timedelta are imported at module level.
    from datetime import timezone

    # Compare against None so an explicit zero-length lifetime is honoured
    # instead of silently falling back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # datetime.utcnow() is deprecated and returns a naive value; use an aware
    # UTC timestamp so the expiry is unambiguous.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
登录端点:
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/token")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(User).where(User.username == form_data.username))
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "role": user.role},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
客户和支持代理之间的实时聊天。
WebSocket 管理器:
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, List
class ConnectionManager:
    """Keep track of open WebSocket connections, grouped by ticket id."""

    def __init__(self):
        # ticket_id -> sockets currently connected for that ticket
        self.active_connections: Dict[int, List["WebSocket"]] = {}

    async def connect(self, websocket: "WebSocket", ticket_id: int):
        """Accept the socket and register it under ``ticket_id``."""
        await websocket.accept()
        self.active_connections.setdefault(ticket_id, []).append(websocket)

    def disconnect(self, websocket: "WebSocket", ticket_id: int):
        """Unregister the socket; a no-op if it is not registered."""
        connections = self.active_connections.get(ticket_id)
        if connections is None:
            return
        # list.remove raises ValueError for an unknown socket, so guard it to
        # keep repeated or late disconnects harmless.
        if websocket in connections:
            connections.remove(websocket)
        if not connections:
            del self.active_connections[ticket_id]

    async def send_message(self, message: str, ticket_id: int):
        """Send ``message`` to every socket registered for ``ticket_id``."""
        # Iterate over a snapshot: the list may change while a send is awaited.
        for connection in list(self.active_connections.get(ticket_id, ())):
            await connection.send_text(message)

    async def broadcast(self, message: str, ticket_id: int, exclude: "Optional[WebSocket]" = None):
        """Send ``message`` to every socket for ``ticket_id`` except ``exclude``."""
        for connection in list(self.active_connections.get(ticket_id, ())):
            if connection != exclude:
                await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/ticket/{ticket_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    ticket_id: int,
    token: str,
    db: AsyncSession = Depends(get_db)
):
    """Relay chat messages between all sockets connected to one ticket.

    The ``token`` is decoded as a JWT; the socket is closed with a policy
    violation code when decoding fails or the token carries no ``sub`` claim.
    Every text frame received is wrapped in a JSON envelope and broadcast to
    the sockets registered for ``ticket_id``.
    """
    # Local import: json is used below but not imported by this snippet.
    import json

    # Verify token
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    username = payload.get("sub")
    if username is None:
        # A correctly signed token without a subject identifies nobody.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket, ticket_id)
    try:
        while True:
            data = await websocket.receive_text()
            message = {
                "ticket_id": ticket_id,
                "username": username,
                "message": data,
                "timestamp": datetime.utcnow().isoformat()
            }
            await manager.broadcast(json.dumps(message), ticket_id)
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path, not only on a clean disconnect, so a
        # failed send does not leave a dead socket in the manager.
        manager.disconnect(websocket, ticket_id)
处理电子邮件通知和长时间运行的操作,而不阻塞响应。
电子邮件通知任务:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_email_notification(to_email: str, subject: str, body: str):
    """Send an HTML email; written to be run as a background task.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: HTML body.

    Failures are logged and swallowed (best effort), so the caller never sees
    an exception from this function.
    """
    import logging
    import os

    sender = "support@company.com"
    try:
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html'))
        # The timeout stops an unresponsive SMTP server from blocking the
        # worker indefinitely.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=10) as server:
            server.starttls()
            # Prefer a password from the environment; the literal remains only
            # as the placeholder fallback used by the original example.
            server.login(sender, os.environ.get("SMTP_PASSWORD", "password"))
            server.send_message(msg)
    except Exception:
        # Deliberately broad: this is the top-level boundary of a background
        # task. Log with traceback instead of printing.
        logging.getLogger(__name__).exception("Failed to send email to %s", to_email)
@app.post("/tickets/")
async def create_ticket(
    ticket: TicketCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Persist a new ticket and queue a confirmation email.

    Args:
        ticket: Validated request payload.
        background_tasks: FastAPI task queue used to send the email.
        db: Request-scoped async session supplied by ``get_db``.

    Returns:
        The ``Ticket`` instance after commit and refresh.
    """
    # `attachments` is part of the request schema but the Ticket model has no
    # such column; passing it to the constructor would raise TypeError.
    values = ticket.dict(exclude={"attachments"})
    # The column is declared as Enum(TicketPriorityEnum), which stores member
    # names; map the request's TicketPriority across by value.
    values["priority"] = TicketPriorityEnum(ticket.priority.value)
    db_ticket = Ticket(**values)
    db.add(db_ticket)
    await db.commit()
    await db.refresh(db_ticket)

    # Send confirmation email in background
    background_tasks.add_task(
        send_email_notification,
        ticket.customer_email,
        f"Ticket #{db_ticket.id} Created",
        "Your support ticket has been created. We'll respond within 24 hours."
    )
    return db_ticket
对于包含大量记录的支持票据列表至关重要。
分页依赖项:
from typing import Optional
class PaginationParams(BaseModel):
skip: int = Field(0, ge=0, description="要跳过的记录数")
limit: int = Field(10, ge=1, le=100, description="要返回的最大记录数")
class TicketFilters(BaseModel):
status: Optional[TicketStatus] = None
priority: Optional[TicketPriority] = None
category: Optional[str] = None
assigned_to: Optional[int] = None
search: Optional[str] = Field(None, description="在标题/描述中搜索")
@app.get("/tickets/", response_model=List[TicketResponse])
async def list_tickets(
    filters: TicketFilters = Depends(),
    pagination: PaginationParams = Depends(),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return one page of tickets matching the optional filters, newest first.

    Args:
        filters: Optional status/priority/category/assignee/search criteria.
        pagination: ``skip`` and ``limit`` values for the page.
        db: Request-scoped async session.
        current_user: Resolved by ``get_current_user``; requires a valid token.
    """
    query = select(Ticket)

    # Apply filters. The columns are typed with the ORM enums, while the
    # filter schema uses the separate pydantic enums, so map across by value.
    if filters.status is not None:
        query = query.where(Ticket.status == TicketStatusEnum(filters.status.value))
    if filters.priority is not None:
        query = query.where(Ticket.priority == TicketPriorityEnum(filters.priority.value))
    if filters.category:
        query = query.where(Ticket.category == filters.category)
    # Explicit None check so that a supplied id of 0 is not silently ignored.
    if filters.assigned_to is not None:
        query = query.where(Ticket.assigned_to == filters.assigned_to)
    if filters.search:
        search_term = f"%{filters.search}%"
        query = query.where(
            Ticket.title.ilike(search_term) | Ticket.description.ilike(search_term)
        )

    # Order first, then page.
    query = query.order_by(Ticket.created_at.desc())
    query = query.offset(pagination.skip).limit(pagination.limit)

    result = await db.execute(query)
    return result.scalars().all()
一致的错误响应和日志记录。
自定义异常处理器:
from fastapi.responses import JSONResponse
class TicketNotFoundError(Exception):
    """Raised when a ticket id does not match any ticket.

    Attributes are documented inline; ``ticket_id`` is the id that was looked up.
    """

    def __init__(self, ticket_id: int):
        # Pass a message to Exception so str(exc), logs and tracebacks are
        # informative instead of empty.
        super().__init__(f"Ticket with ID {ticket_id} not found")
        self.ticket_id = ticket_id
@app.exception_handler(TicketNotFoundError)
async def ticket_not_found_handler(request: Request, exc: TicketNotFoundError):
return JSONResponse(
status_code=404,
content={
"error": "ticket_not_found",
"message": f"Ticket with ID {exc.ticket_id} not found",
"ticket_id": exc.ticket_id
}
)
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError


# FastAPI raises RequestValidationError for invalid request data; a handler
# registered for pydantic's ValidationError is not invoked for those failures.
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Return request validation failures in the API's common error envelope."""
    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "message": "Invalid request data",
            # The error list can contain values that are not JSON-native;
            # encode it before handing it to JSONResponse.
            "details": jsonable_encoder(exc.errors())
        }
    )
日志记录中间件:
import time
import logging
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.time()
# 记录请求
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# 记录响应
process_time = time.time() - start_time
logger.info(
f"Response: {response.status_code} "
f"(took {process_time:.2f}s)"
)
response.headers["X-Process-Time"] = str(process_time)
return response
允许 Web 客户端访问 API。
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000", # React 开发服务器
"https://support.company.com" # 生产域名
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
FastAPI 自动生成交互式文档。
自定义文档:
from fastapi import FastAPI
# Tags used to group operations in the generated documentation.
tags_metadata = [
    {
        "name": "tickets",
        "description": "支持票据操作",
    },
    {
        "name": "users",
        "description": "用户和代理管理",
    },
    {
        "name": "chat",
        "description": "通过 WebSocket 进行实时聊天",
    },
]

# Build the application once. Creating a second FastAPI(...) instance just to
# pass openapi_tags would discard the title, description, version, contact and
# license metadata configured on the first one.
app = FastAPI(
    title="Customer Support API",
    description="用于管理客户支持票据、代理和实时聊天的 API",
    version="1.0.0",
    contact={
        "name": "Support Team",
        "email": "tech@company.com"
    },
    license_info={
        "name": "MIT"
    },
    openapi_tags=tags_metadata,
)
使用 pytest 和 httpx 进行全面的测试。
测试设置:
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
# NOTE(review): the fixtures use Base, which was never imported. This assumes
# main exposes it next to app and get_db — adjust if the models live elsewhere.
from main import Base, app, get_db

TEST_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/test_support_db"


@pytest.fixture
async def test_db():
    """Yield a session on a freshly created schema and drop it afterwards."""
    engine = create_async_engine(TEST_DATABASE_URL)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    TestSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with TestSessionLocal() as session:
            yield session
    finally:
        # Clean up even when the test fails, and release pooled connections.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
        await engine.dispose()


@pytest.fixture
async def client(test_db):
    """Yield an HTTP client bound to the app with get_db overridden."""
    async def override_get_db():
        yield test_db

    app.dependency_overrides[get_db] = override_get_db
    try:
        # The app= shortcut on AsyncClient is gone in recent httpx releases;
        # an explicit ASGI transport is the supported way to call the app.
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as ac:
            yield ac
    finally:
        app.dependency_overrides.clear()
@pytest.mark.anyio
async def test_create_ticket(client: AsyncClient):
response = await client.post(
"/tickets/",
json={
"title": "Test Ticket",
"description": "This is a test ticket",
"priority": "high",
"category": "technical",
"customer_email": "customer@example.com"
}
)
assert response.status_code == 200
data = response.json()
assert data["title"] == "Test Ticket"
assert data["status"] == "open"
使用预加载:
from sqlalchemy.orm import selectinload
@app.get("/tickets/{ticket_id}/full")
async def get_ticket_with_comments(
ticket_id: int,
db: AsyncSession = Depends(get_db)
):
query = select(Ticket).options(
selectinload(Ticket.comments),
selectinload(Ticket.assigned_agent)
).where(Ticket.id == ticket_id)
result = await db.execute(query)
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
return ticket
数据库连接池:
engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600
)
import redis.asyncio as redis
from fastapi import Depends
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
async def get_cached_ticket(ticket_id: int) -> Optional[dict]:
cached = await redis_client.get(f"ticket:{ticket_id}")
if cached:
return json.loads(cached)
return None
async def cache_ticket(ticket_id: int, data: dict, expire: int = 300):
await redis_client.setex(
f"ticket:{ticket_id}",
expire,
json.dumps(data)
)
@app.get("/tickets/{ticket_id}")
async def get_ticket(
ticket_id: int,
db: AsyncSession = Depends(get_db)
):
# 首先尝试缓存
cached = await get_cached_ticket(ticket_id)
if cached:
return cached
# 查询数据库
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
# 缓存结果
ticket_dict = {
"id": ticket.id,
"title": ticket.title,
"status": ticket.status.value,
# ... 其他字段
}
await cache_ticket(ticket_id, ticket_dict)
return ticket_dict
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
try:
# 检查数据库连接
await db.execute(text("SELECT 1"))
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
raise HTTPException(
status_code=503,
detail=f"Service unhealthy: {str(e)}"
)
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.observe(duration)
return response
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")
@app.post("/tickets/{ticket_id}/assign")
async def assign_ticket(
ticket_id: int,
agent_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_agent)
):
# 获取票据
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
# 验证代理是否存在且处于活动状态
agent_result = await db.execute(select(User).where(User.id == agent_id))
agent = agent_result.scalar_one_or_none()
if not agent or agent.role != "agent" or not agent.is_active:
raise HTTPException(status_code=400, detail="Invalid agent")
# 更新票据
ticket.assigned_to = agent_id
ticket.status = TicketStatusEnum.IN_PROGRESS
await db.commit()
return {"message": f"Ticket assigned to {agent.full_name}"}
from datetime import timedelta
def calculate_sla_breach(ticket: Ticket) -> dict:
    """Compute the SLA deadline for a ticket and whether it was breached.

    The deadline is ``created_at`` plus a per-priority number of hours. For a
    resolved or closed ticket the resolution time is compared with the
    deadline; otherwise the current time is used.

    Returns:
        A dict with ``sla_deadline``, ``breached`` and
        ``time_to_resolution_hours`` (None when no resolution time is known).
    """
    # Local import: timezone is not imported at module level in this snippet.
    from datetime import timezone

    sla_hours = {
        TicketPriorityEnum.URGENT: 4,
        TicketPriorityEnum.HIGH: 8,
        TicketPriorityEnum.MEDIUM: 24,
        TicketPriorityEnum.LOW: 48
    }
    sla_deadline = ticket.created_at + timedelta(hours=sla_hours[ticket.priority])

    resolution_time = None
    if ticket.status in (TicketStatusEnum.RESOLVED, TicketStatusEnum.CLOSED):
        resolution_time = ticket.resolved_at or ticket.updated_at

    if resolution_time is not None:
        breached = resolution_time > sla_deadline
        time_to_resolution = (resolution_time - ticket.created_at).total_seconds() / 3600
    else:
        # Open ticket, or a closed one with no recorded timestamp: judge it
        # against the current time instead of comparing None with a datetime.
        now = datetime.now(timezone.utc)
        if sla_deadline.tzinfo is None:
            # Naive timestamps are treated as UTC, as the original utcnow()
            # comparison did; aware and naive values cannot be compared.
            now = now.replace(tzinfo=None)
        breached = now > sla_deadline
        time_to_resolution = None

    return {
        "sla_deadline": sla_deadline,
        "breached": breached,
        "time_to_resolution_hours": time_to_resolution
    }
@app.post("/tickets/bulk-update")
async def bulk_update_tickets(
    ticket_ids: List[int],
    update_data: TicketUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_agent)
):
    """Apply status, priority and assignee changes to several tickets.

    Only ``status``, ``priority`` and ``assigned_to`` from ``update_data`` are
    applied; ``title`` and ``description`` are not touched by this endpoint.

    Raises:
        HTTPException: 404 when none of the requested ids exist.

    Returns:
        The number of tickets updated and their ids.
    """
    result = await db.execute(
        select(Ticket).where(Ticket.id.in_(ticket_ids))
    )
    tickets = result.scalars().all()
    if not tickets:
        raise HTTPException(status_code=404, detail="No tickets found")

    # The columns are typed with the ORM enums, while TicketUpdate carries the
    # separate pydantic enums; convert once, outside the loop.
    new_status = (
        TicketStatusEnum(update_data.status.value)
        if update_data.status is not None else None
    )
    new_priority = (
        TicketPriorityEnum(update_data.priority.value)
        if update_data.priority is not None else None
    )

    for ticket in tickets:
        if new_status is not None:
            ticket.status = new_status
        if new_priority is not None:
            ticket.priority = new_priority
        if update_data.assigned_to is not None:
            ticket.assigned_to = update_data.assigned_to
    await db.commit()

    # Report the ids that were actually found and updated, not the requested
    # list, which may contain ids that do not exist.
    return {
        "updated_count": len(tickets),
        "ticket_ids": [ticket.id for ticket in tickets],
    }
1. 数据库连接错误
postgresql+asyncpg://user:pass@host:port/db
pip install asyncpg
2. 异步/等待错误
async def / await
3. Pydantic 验证错误
4. JWT 令牌问题
5. WebSocket 连接断开
此 FastAPI 技能提供了构建生产就绪的客户支持 API 所需的一切:
对客户支持系统的关键优势:
每周安装次数
–
代码仓库
GitHub 星标数
47
首次出现
–
安全审计
This skill provides comprehensive guidance for building production-ready customer support APIs using FastAPI, the modern, fast (high-performance) web framework for building APIs with Python 3.8+ based on standard Python type hints.
FastAPI is ideal for customer support systems due to its:
FastAPI is built on top of Starlette for web routing and Pydantic for data validation, providing excellent async support for I/O-bound operations common in customer support systems.
Key Concepts:
- Use async def for path operations when making database queries, external API calls, or file operations
- Use def for CPU-bound operations or when using synchronous libraries

Best Practices for Support APIs:
# Async for database operations (most support APIs)
@app.get("/tickets/{ticket_id}")
async def get_ticket(ticket_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
return ticket
# Sync for simple operations without I/O
@app.get("/health")
def health_check():
return {"status": "healthy"}
FastAPI's dependency injection is powerful for managing shared resources like database sessions, authentication, and configuration.
Database Session Management:
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/support_db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# Use in endpoints
@app.post("/tickets/")
async def create_ticket(
    ticket: TicketCreate,
    db: AsyncSession = Depends(get_db)
):
    """Persist a new support ticket and return the stored row.

    Args:
        ticket: Validated request payload.
        db: Request-scoped async session supplied by ``get_db``.

    Returns:
        The ``Ticket`` instance after commit and refresh.
    """
    # `attachments` is part of the request schema but the Ticket model has no
    # such column; passing it to the constructor would raise TypeError.
    values = ticket.dict(exclude={"attachments"})
    # The column is declared as Enum(TicketPriorityEnum), which stores member
    # names; the request schema uses the separate TicketPriority str-enum, so
    # map it across by value before handing it to the ORM.
    values["priority"] = TicketPriorityEnum(ticket.priority.value)
    db_ticket = Ticket(**values)
    db.add(db_ticket)
    await db.commit()
    await db.refresh(db_ticket)
    return db_ticket
Authentication Dependencies:
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
result = await db.execute(select(User).where(User.username == username))
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return user
async def get_current_active_agent(
current_user: User = Depends(get_current_user)
) -> User:
if not current_user.is_active or current_user.role != "agent":
raise HTTPException(status_code=403, detail="Not authorized as support agent")
return current_user
Pydantic models ensure data integrity throughout your support system.
Base Models for Customer Support:
from pydantic import BaseModel, EmailStr, Field, validator
from datetime import datetime
from typing import Optional, List
from enum import Enum
class TicketPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class TicketStatus(str, Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
WAITING_CUSTOMER = "waiting_customer"
RESOLVED = "resolved"
CLOSED = "closed"
class TicketBase(BaseModel):
title: str = Field(..., min_length=3, max_length=200)
description: str = Field(..., min_length=10)
priority: TicketPriority = TicketPriority.MEDIUM
category: str = Field(..., max_length=50)
@validator('title')
def title_must_not_be_empty(cls, v):
if not v.strip():
raise ValueError('Title cannot be empty or whitespace')
return v.strip()
class TicketCreate(TicketBase):
customer_email: EmailStr
attachments: Optional[List[str]] = []
class TicketUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=3, max_length=200)
description: Optional[str] = None
status: Optional[TicketStatus] = None
priority: Optional[TicketPriority] = None
assigned_to: Optional[int] = None
class TicketResponse(TicketBase):
id: int
status: TicketStatus
customer_email: str
assigned_to: Optional[int]
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True # For SQLAlchemy models
Modern async SQLAlchemy integration for PostgreSQL operations.
Model Definitions:
# Boolean is required by User.is_active and Comment.is_internal below.
from sqlalchemy import Boolean, Column, Integer, String, DateTime, ForeignKey, Enum, Text
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.sql import func
import enum

# Declarative base class that every ORM model in this section inherits from.
Base = declarative_base()
class TicketStatusEnum(enum.Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
WAITING_CUSTOMER = "waiting_customer"
RESOLVED = "resolved"
CLOSED = "closed"
class TicketPriorityEnum(enum.Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
full_name = Column(String(100))
role = Column(String(20), default="customer") # customer, agent, admin
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
assigned_tickets = relationship("Ticket", back_populates="assigned_agent")
comments = relationship("Comment", back_populates="author")
class Ticket(Base):
    """ORM model for a support ticket, stored in the ``tickets`` table."""

    __tablename__ = "tickets"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=False)
    status = Column(Enum(TicketStatusEnum), default=TicketStatusEnum.OPEN, index=True)
    priority = Column(Enum(TicketPriorityEnum), default=TicketPriorityEnum.MEDIUM, index=True)
    category = Column(String(50), index=True)
    customer_email = Column(String(100), index=True, nullable=False)
    # Nullable: a ticket may exist before any agent is assigned.
    assigned_to = Column(Integer, ForeignKey("users.id"), nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # server_default fills the column on INSERT as well. With onupdate alone it
    # stays NULL until the first UPDATE, which breaks consumers that treat it
    # as a required datetime (TicketResponse.updated_at is not Optional).
    updated_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )
    resolved_at = Column(DateTime(timezone=True), nullable=True)

    assigned_agent = relationship("User", back_populates="assigned_tickets")
    # Comments are removed together with their ticket.
    comments = relationship("Comment", back_populates="ticket", cascade="all, delete-orphan")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
ticket_id = Column(Integer, ForeignKey("tickets.id"), nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
content = Column(Text, nullable=False)
is_internal = Column(Boolean, default=False) # Internal agent notes
created_at = Column(DateTime(timezone=True), server_default=func.now())
ticket = relationship("Ticket", back_populates="comments")
author = relationship("User", back_populates="comments")
Database Initialization:
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@app.on_event("startup")
async def startup():
await init_db()
JWT-based authentication for support portal access.
Password Hashing:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
Token Generation:
from datetime import datetime, timedelta
from jose import jwt
SECRET_KEY = "your-secret-key-here" # Use environment variable in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the mapping is copied, not mutated.
        expires_delta: Token lifetime. When omitted, 15 minutes is used.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import: only datetime and timedelta are imported at module level.
    from datetime import timezone

    # Compare against None so an explicit zero-length lifetime is honoured
    # instead of silently falling back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # datetime.utcnow() is deprecated and returns a naive value; use an aware
    # UTC timestamp so the expiry is unambiguous.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
Login Endpoint:
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/token")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(User).where(User.username == form_data.username))
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "role": user.role},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
Real-time chat between customers and support agents.
WebSocket Manager:
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, List
class ConnectionManager:
    """Keep track of open WebSocket connections, grouped by ticket id."""

    def __init__(self):
        # ticket_id -> sockets currently connected for that ticket
        self.active_connections: Dict[int, List["WebSocket"]] = {}

    async def connect(self, websocket: "WebSocket", ticket_id: int):
        """Accept the socket and register it under ``ticket_id``."""
        await websocket.accept()
        self.active_connections.setdefault(ticket_id, []).append(websocket)

    def disconnect(self, websocket: "WebSocket", ticket_id: int):
        """Unregister the socket; a no-op if it is not registered."""
        connections = self.active_connections.get(ticket_id)
        if connections is None:
            return
        # list.remove raises ValueError for an unknown socket, so guard it to
        # keep repeated or late disconnects harmless.
        if websocket in connections:
            connections.remove(websocket)
        if not connections:
            del self.active_connections[ticket_id]

    async def send_message(self, message: str, ticket_id: int):
        """Send ``message`` to every socket registered for ``ticket_id``."""
        # Iterate over a snapshot: the list may change while a send is awaited.
        for connection in list(self.active_connections.get(ticket_id, ())):
            await connection.send_text(message)

    async def broadcast(self, message: str, ticket_id: int, exclude: "Optional[WebSocket]" = None):
        """Send ``message`` to every socket for ``ticket_id`` except ``exclude``."""
        for connection in list(self.active_connections.get(ticket_id, ())):
            if connection != exclude:
                await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/ticket/{ticket_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    ticket_id: int,
    token: str,
    db: AsyncSession = Depends(get_db)
):
    """Relay chat messages between all sockets connected to one ticket.

    The ``token`` is decoded as a JWT; the socket is closed with a policy
    violation code when decoding fails or the token carries no ``sub`` claim.
    Every text frame received is wrapped in a JSON envelope and broadcast to
    the sockets registered for ``ticket_id``.
    """
    # Local import: json is used below but not imported by this snippet.
    import json

    # Verify token
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    username = payload.get("sub")
    if username is None:
        # A correctly signed token without a subject identifies nobody.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket, ticket_id)
    try:
        while True:
            data = await websocket.receive_text()
            message = {
                "ticket_id": ticket_id,
                "username": username,
                "message": data,
                "timestamp": datetime.utcnow().isoformat()
            }
            await manager.broadcast(json.dumps(message), ticket_id)
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path, not only on a clean disconnect, so a
        # failed send does not leave a dead socket in the manager.
        manager.disconnect(websocket, ticket_id)
Handle email notifications and long-running operations without blocking responses.
Email Notification Task:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_email_notification(
    to_email: str,
    subject: str,
    body: str,
    *,
    sender: str = "support@company.com",
    smtp_host: str = "smtp.gmail.com",
    smtp_port: int = 587,
    password: str = "",
    timeout: float = 30.0,
):
    """Send an HTML email over SMTP with STARTTLS (background task).

    Delivery is best-effort: any failure is logged with its traceback and
    swallowed, so a mail outage never propagates to the caller.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: HTML body.
        sender: ``From`` address, also used as the SMTP login name.
        smtp_host: SMTP server host.
        smtp_port: SMTP server port.
        password: SMTP password. When empty, the ``SMTP_PASSWORD`` environment
            variable is used.
        timeout: Socket timeout in seconds, so a stalled server cannot block
            the worker indefinitely.
    """
    import logging
    import os

    log = logging.getLogger(__name__)
    # Prefer an explicit argument, then the environment. The literal fallback
    # is the tutorial placeholder, kept only for backward compatibility.
    smtp_password = password or os.environ.get("SMTP_PASSWORD", "password")
    try:
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'html'))

        with smtplib.SMTP(smtp_host, smtp_port, timeout=timeout) as server:
            server.starttls()
            server.login(sender, smtp_password)
            server.send_message(msg)
    except Exception:
        # Deliberately broad: this is the outermost boundary of the task.
        log.exception("Failed to send email")
@app.post("/tickets/")
async def create_ticket(
    ticket: TicketCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Persist a new ticket and queue a confirmation email for the customer.

    The email is handed to ``background_tasks`` rather than sent inline.
    Returns the stored ticket after refreshing it from the database.
    """
    new_ticket = Ticket(**ticket.dict())
    db.add(new_ticket)
    await db.commit()
    # Refresh so database-generated values (the ID is used below) are loaded.
    await db.refresh(new_ticket)

    email_subject = f"Ticket #{new_ticket.id} Created"
    email_body = "Your support ticket has been created. We'll respond within 24 hours."
    background_tasks.add_task(
        send_email_notification,
        ticket.customer_email,
        email_subject,
        email_body,
    )
    return new_ticket
Essential for support ticket lists with many records.
Pagination Dependencies:
from typing import Optional
# Paging window for list endpoints: an offset plus a bounded page size.
class PaginationParams(BaseModel):
    # Offset into the result set; negative values are rejected (ge=0).
    skip: int = Field(default=0, ge=0, description="Number of records to skip")
    # Page size, constrained to the range 1..100.
    limit: int = Field(
        default=10,
        ge=1,
        le=100,
        description="Maximum records to return",
    )
# Optional ticket-list filters; a field left as None is not applied.
class TicketFilters(BaseModel):
    status: Optional[TicketStatus] = None
    priority: Optional[TicketPriority] = None
    category: Optional[str] = None
    assigned_to: Optional[int] = None
    # Free-text term matched against the ticket title and description.
    search: Optional[str] = Field(
        default=None,
        description="Search in title/description",
    )
@app.get("/tickets/", response_model=List[TicketResponse])
async def list_tickets(
    filters: TicketFilters = Depends(),
    pagination: PaginationParams = Depends(),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return one page of tickets, newest first, narrowed by optional filters.

    Every filter that is set adds one AND-ed condition. ``search`` is a
    case-insensitive substring match on either the title or the description.
    """
    conditions = []
    if filters.status:
        conditions.append(Ticket.status == filters.status)
    if filters.priority:
        conditions.append(Ticket.priority == filters.priority)
    if filters.category:
        conditions.append(Ticket.category == filters.category)
    if filters.assigned_to:
        conditions.append(Ticket.assigned_to == filters.assigned_to)
    if filters.search:
        pattern = f"%{filters.search}%"
        conditions.append(
            (Ticket.title.ilike(pattern)) | (Ticket.description.ilike(pattern))
        )

    query = select(Ticket)
    for condition in conditions:
        query = query.where(condition)

    # Page window and ordering.
    query = (
        query.offset(pagination.skip)
        .limit(pagination.limit)
        .order_by(Ticket.created_at.desc())
    )

    rows = await db.execute(query)
    return rows.scalars().all()
Consistent error responses and logging.
Custom Exception Handlers:
from fastapi.responses import JSONResponse
class TicketNotFoundError(Exception):
    """Raised when a ticket ID does not match any ticket."""

    def __init__(self, ticket_id: int):
        # Pass the ID through as the only positional argument so ``args``
        # still mirrors the constructor signature (keeps copy/pickle working).
        super().__init__(ticket_id)
        # ID that was looked up and not found.
        self.ticket_id = ticket_id

    def __str__(self) -> str:
        # Without this, str(exc) is just the bare ID, which is useless in logs.
        return f"Ticket with ID {self.ticket_id} not found"
@app.exception_handler(TicketNotFoundError)
async def ticket_not_found_handler(request: Request, exc: TicketNotFoundError):
    """Render a ``TicketNotFoundError`` as a 404 JSON error payload."""
    missing_id = exc.ticket_id
    payload = {
        "error": "ticket_not_found",
        "message": f"Ticket with ID {missing_id} not found",
        "ticket_id": missing_id,
    }
    return JSONResponse(status_code=404, content=payload)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    """Render a validation failure as a 422 JSON error payload.

    The ``details`` entry carries the per-field errors reported by the
    exception.
    """
    # NOTE(review): FastAPI reports request-parsing failures as
    # ``RequestValidationError``; confirm which ``ValidationError`` is imported
    # so this handler really covers the cases it is meant to.

    # Imported here so the handler is self-contained.
    from fastapi.encoders import jsonable_encoder

    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "message": "Invalid request data",
            # errors() may contain values the stock JSON encoder rejects, such
            # as exception objects in the error context; encode them first so
            # building the response cannot itself fail.
            "details": jsonable_encoder(exc.errors()),
        },
    )
Logging Middleware:
import time
import logging
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log each request and its response, and expose the handling time.

    Logs the method and URL before dispatch, then the status code and elapsed
    seconds afterwards. The elapsed time is also returned to the client in the
    ``X-Process-Time`` response header.
    """
    # perf_counter is monotonic; wall-clock time.time() can jump when the
    # system clock is adjusted, which would give wrong or negative durations.
    start_time = time.perf_counter()

    logger.info("Request: %s %s", request.method, request.url)

    response = await call_next(request)

    process_time = time.perf_counter() - start_time
    logger.info("Response: %s (took %.2fs)", response.status_code, process_time)
    response.headers["X-Process-Time"] = str(process_time)
    return response
Enable web clients to access the API.
from fastapi.middleware.cors import CORSMiddleware
# Browser origins that are allowed to call the API.
_allowed_origins = [
    "http://localhost:3000",  # React dev server
    "https://support.company.com",  # Production domain
]

# Any method and header is accepted from those origins, with credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
FastAPI automatically generates interactive documentation.
Customizing Documentation:
from fastapi import FastAPI
# Tag descriptions used to group operations in the generated docs.
tags_metadata = [
    {
        "name": "tickets",
        "description": "Operations with support tickets",
    },
    {
        "name": "users",
        "description": "User and agent management",
    },
    {
        "name": "chat",
        "description": "Real-time chat via WebSocket",
    },
]

# Build the application once. Creating a second FastAPI instance just to pass
# ``openapi_tags`` would rebind ``app`` and silently drop the title,
# description, version, contact and licence configured here.
app = FastAPI(
    title="Customer Support API",
    description="API for managing customer support tickets, agents, and real-time chat",
    version="1.0.0",
    contact={
        "name": "Support Team",
        "email": "tech@company.com"
    },
    license_info={
        "name": "MIT"
    },
    openapi_tags=tags_metadata,
)
Comprehensive testing with pytest and httpx.
Test Setup:
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from main import app, get_db
TEST_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/test_support_db"


@pytest.fixture
async def test_db():
    """Yield a session bound to a freshly created test schema.

    All tables are created before the test and dropped afterwards. The engine
    is disposed at the end so its pooled connections are not left open.
    """
    engine = create_async_engine(TEST_DATABASE_URL)
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        TestSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        try:
            async with TestSessionLocal() as session:
                yield session
        finally:
            # Drop the schema even when the session block exits with an error.
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.drop_all)
    finally:
        # Without this, every test leaves its own connection pool behind.
        await engine.dispose()
@pytest.fixture
async def client(test_db):
    """Yield an HTTP client wired to the app, with ``get_db`` overridden.

    Requests are dispatched to the ASGI app in-process, and every endpoint
    receives the ``test_db`` session instead of a real one.
    """
    # Imported here so the fixture is self-contained.
    from httpx import ASGITransport

    async def override_get_db():
        yield test_db

    app.dependency_overrides[get_db] = override_get_db
    try:
        # ``AsyncClient(app=...)`` is deprecated in httpx; an explicit ASGI
        # transport is the supported way to target an in-process app.
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as ac:
            yield ac
    finally:
        # Restore the app even if client setup or teardown raises.
        app.dependency_overrides.clear()
@pytest.mark.anyio
async def test_create_ticket(client: AsyncClient):
    """POSTing a ticket returns 200, echoes the title and reports it as open."""
    payload = {
        "title": "Test Ticket",
        "description": "This is a test ticket",
        "priority": "high",
        "category": "technical",
        "customer_email": "customer@example.com",
    }

    response = await client.post("/tickets/", json=payload)

    assert response.status_code == 200
    created = response.json()
    assert created["title"] == "Test Ticket"
    assert created["status"] == "open"
Use Eager Loading:
from sqlalchemy.orm import selectinload
@app.get("/tickets/{ticket_id}/full")
async def get_ticket_with_comments(
    ticket_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return one ticket with its comments and assigned agent loaded up front.

    Raises:
        HTTPException: 404 when no ticket has the given ID.
    """
    # Both relationships are loaded together with the ticket.
    eager_loads = (
        selectinload(Ticket.comments),
        selectinload(Ticket.assigned_agent),
    )
    query = select(Ticket).where(Ticket.id == ticket_id).options(*eager_loads)

    ticket = (await db.execute(query)).scalar_one_or_none()
    if ticket:
        return ticket
    raise HTTPException(status_code=404, detail="Ticket not found")
Database Connection Pooling:
# Connection-pool settings handed to the engine as keyword arguments.
_pool_settings = {
    "pool_size": 10,
    "max_overflow": 20,
    "pool_pre_ping": True,
    "pool_recycle": 3600,
}

engine = create_async_engine(DATABASE_URL, **_pool_settings)
import redis.asyncio as redis
from fastapi import Depends
# Module-level Redis client used by the ticket cache helpers.
redis_client = redis.Redis(
    host="localhost",
    port=6379,
    decode_responses=True,
)
async def get_cached_ticket(ticket_id: int) -> Optional[dict]:
    """Return the cached ticket dict for ``ticket_id``, or ``None`` on a miss.

    The entry is read from the ``ticket:<id>`` key and decoded from JSON.
    """
    raw = await redis_client.get(f"ticket:{ticket_id}")
    return json.loads(raw) if raw else None
async def cache_ticket(ticket_id: int, data: dict, expire: int = 300):
    """Store ``data`` as JSON under ``ticket:<id>`` with an expiry.

    ``expire`` is passed to ``setex`` as the time-to-live argument.
    """
    cache_key = f"ticket:{ticket_id}"
    serialized = json.dumps(data)
    await redis_client.setex(cache_key, expire, serialized)
@app.get("/tickets/{ticket_id}")
async def get_ticket(
    ticket_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return a ticket, served from the cache when possible.

    On a cache miss the ticket is read from the database, reduced to a plain
    dict, cached, and returned.

    Raises:
        HTTPException: 404 when no ticket has the given ID.
    """
    # Cache hit: return the stored dict as-is.
    hit = await get_cached_ticket(ticket_id)
    if hit:
        return hit

    # Cache miss: fall back to the database.
    lookup = select(Ticket).where(Ticket.id == ticket_id)
    ticket = (await db.execute(lookup)).scalar_one_or_none()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    # Only plain values are stored, so the entry is JSON-serializable.
    snapshot = {
        "id": ticket.id,
        "title": ticket.title,
        "status": ticket.status.value,
        # ... other fields
    }
    await cache_ticket(ticket_id, snapshot)
    return snapshot
# Container image for the API, built on the slim Python 3.11 base image.
FROM python:3.11-slim
# Relative paths in the instructions below resolve against /app.
WORKDIR /app
# The dependency list is copied and installed before the rest of the source,
# presumably so the install layer is reused when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the remaining build context into the image.
COPY . .
# Serve main:app with uvicorn on all interfaces, port 8000.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    """Report service health after a trivial database round-trip.

    Raises:
        HTTPException: 503 carrying the underlying error text when the probe
            fails.
    """
    try:
        # Probe the database connection with the cheapest possible statement.
        await db.execute(text("SELECT 1"))
        report = {
            "status": "healthy",
            "database": "connected",
            "timestamp": datetime.utcnow().isoformat(),
        }
        return report
    except Exception as e:
        failure_detail = f"Service unhealthy: {e}"
        raise HTTPException(status_code=503, detail=failure_detail)
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
# Request counter, labelled by HTTP method, endpoint and response status.
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    labelnames=["method", "endpoint", "status"],
)
# Unlabelled histogram of request handling time in seconds.
REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration",
)
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Record a request count and duration sample for every HTTP request."""
    # perf_counter is monotonic; wall-clock time.time() can jump when the
    # system clock is adjusted and would then record bogus durations.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # NOTE(review): the raw URL path is used as a label value, so paths that
    # embed IDs (e.g. /tickets/123) each create their own time series —
    # consider labelling by route template instead.
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    REQUEST_DURATION.observe(duration)
    return response
@app.get("/metrics")
async def metrics():
    """Expose the collected metrics as a plain-text response."""
    exposition = generate_latest()
    return Response(content=exposition, media_type="text/plain")
@app.post("/tickets/{ticket_id}/assign")
async def assign_ticket(
    ticket_id: int,
    agent_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_agent)
):
    """Assign a ticket to an active agent and mark it as in progress.

    Raises:
        HTTPException: 404 when the ticket does not exist; 400 when the target
            user is missing, is not an agent, or is inactive.
    """
    ticket_lookup = select(Ticket).where(Ticket.id == ticket_id)
    ticket = (await db.execute(ticket_lookup)).scalar_one_or_none()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    agent_lookup = select(User).where(User.id == agent_id)
    agent = (await db.execute(agent_lookup)).scalar_one_or_none()
    # The target must exist, hold the agent role, and be active.
    agent_is_valid = agent and agent.role == "agent" and agent.is_active
    if not agent_is_valid:
        raise HTTPException(status_code=400, detail="Invalid agent")

    ticket.assigned_to = agent_id
    ticket.status = TicketStatusEnum.IN_PROGRESS
    await db.commit()

    return {"message": f"Ticket assigned to {agent.full_name}"}
from datetime import timedelta
def calculate_sla_breach(ticket: Ticket) -> dict:
    """Compute a ticket's SLA deadline and whether it has been breached.

    The deadline is the creation time plus a per-priority allowance in hours.
    Resolved or closed tickets are judged by their resolution time
    (``resolved_at``, falling back to ``updated_at``); all other tickets are
    judged against the current UTC time.

    Returns:
        A dict with ``sla_deadline``, ``breached`` and
        ``time_to_resolution_hours`` (``None`` while the ticket is unfinished).
    """
    hours_by_priority = {
        TicketPriorityEnum.URGENT: 4,
        TicketPriorityEnum.HIGH: 8,
        TicketPriorityEnum.MEDIUM: 24,
        TicketPriorityEnum.LOW: 48,
    }
    allowance = timedelta(hours=hours_by_priority[ticket.priority])
    deadline = ticket.created_at + allowance
    now = datetime.utcnow()

    finished = ticket.status in [TicketStatusEnum.RESOLVED, TicketStatusEnum.CLOSED]
    if not finished:
        return {
            "sla_deadline": deadline,
            "breached": now > deadline,
            "time_to_resolution_hours": None,
        }

    finished_at = ticket.resolved_at or ticket.updated_at
    was_breached = finished_at > deadline
    elapsed_hours = (finished_at - ticket.created_at).total_seconds() / 3600
    return {
        "sla_deadline": deadline,
        "breached": was_breached,
        "time_to_resolution_hours": elapsed_hours,
    }
@app.post("/tickets/bulk-update")
async def bulk_update_tickets(
    ticket_ids: List[int],
    update_data: TicketUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_agent)
):
    """Apply the same status/priority/assignee change to several tickets.

    Only fields with a truthy value in ``update_data`` are applied. Requested
    IDs that match no ticket are skipped.

    Returns:
        ``updated_count`` and ``ticket_ids`` for the tickets actually updated,
        which may be fewer than the IDs requested.

    Raises:
        HTTPException: 404 when none of the requested IDs exist.
    """
    result = await db.execute(
        select(Ticket).where(Ticket.id.in_(ticket_ids))
    )
    tickets = result.scalars().all()
    if not tickets:
        raise HTTPException(status_code=404, detail="No tickets found")

    # Work out the requested changes once instead of per ticket.
    changes = {}
    for field in ("status", "priority", "assigned_to"):
        value = getattr(update_data, field)
        if value:
            changes[field] = value

    for ticket in tickets:
        for field, value in changes.items():
            setattr(ticket, field, value)

    # Collect the IDs before committing so no attribute is read after commit.
    # Report what was found, not what was asked for: echoing the request would
    # claim updates for IDs that do not exist.
    updated_ids = [ticket.id for ticket in tickets]
    await db.commit()
    return {"updated_count": len(updated_ids), "ticket_ids": updated_ids}
1. Database Connection Errors
- Check the connection URL format: postgresql+asyncpg://user:pass@host:port/db
- Install the async driver: pip install asyncpg
2. Async/Await Errors
- Use async def for async database operations
- Use await when calling async functions
3. Pydantic Validation Errors
4. JWT Token Issues
5. WebSocket Connection Drops
This FastAPI skill provides everything needed to build production-ready customer support APIs with:
Key advantages for customer support systems:
Weekly Installs
–
Repository
GitHub Stars
47
First Seen
–
Security Audits
agent-browser 浏览器自动化工具 - Vercel Labs 命令行网页操作与测试
147,400 周安装