SQLAlchemy ORM Expert by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill 'SQLAlchemy ORM Expert'

本技能为在客户支持系统中使用 SQLAlchemy 2.0+ 提供全面指导,重点关注 ORM 模式、会话管理、查询优化、与 FastAPI 的异步操作以及 PostgreSQL 集成。内容涵盖从基础模型定义到高性能支持应用程序的高级模式。
构建客户支持系统时,需要健壮的数据模型来表示工单、用户、评论、附件及其关系。SQLAlchemy 带有类型提示的声明式映射提供了一种简洁、现代的方法。
基础模型设置:
from collections.abc import AsyncGenerator, Iterator
from datetime import datetime, timezone
from typing import Optional, List
import enum

from sqlalchemy import (
    Boolean,
    DateTime,
    Enum,
    ForeignKey,
    Integer,
    JSON,
    String,
    Text,
    inspect,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.sql import func
class Base(DeclarativeBase):
    """Base class for all ORM models (shared MetaData/registry)."""
    pass
class TicketStatus(enum.Enum):
    """Ticket lifecycle states; the string values are what gets persisted."""
    OPEN = "open"
    IN_PROGRESS = "in_progress"
    WAITING_ON_CUSTOMER = "waiting_on_customer"
    RESOLVED = "resolved"
    CLOSED = "closed"
class TicketPriority(enum.Enum):
    """Ticket priority levels, lowest to highest."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
用户模型:
class User(Base):
    """An account in the support system (both customers and staff)."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    full_name: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_staff: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    password_hash: Mapped[str] = mapped_column(String(255), nullable=False)

    # Timestamps — timezone-aware, populated by the database server
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )
    last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

    # Relationships
    # NOTE(review): "all, delete-orphan" means deleting a User also deletes
    # every ticket they created — confirm this retention policy is intended.
    tickets_created: Mapped[List["Ticket"]] = relationship(
        "Ticket",
        back_populates="creator",
        foreign_keys="Ticket.creator_id",
        cascade="all, delete-orphan"
    )
    tickets_assigned: Mapped[List["Ticket"]] = relationship(
        "Ticket",
        back_populates="assignee",
        foreign_keys="Ticket.assignee_id"
    )
    comments: Mapped[List["Comment"]] = relationship(
        "Comment",
        back_populates="author",
        cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, email='{self.email}', name='{self.full_name}')>"
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
工单模型:
class Ticket(Base):
    """A support ticket with creator/assignee, comments, attachments and tags."""

    __tablename__ = "tickets"

    id: Mapped[int] = mapped_column(primary_key=True)
    ticket_number: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    description: Mapped[str] = mapped_column(Text, nullable=False)

    # Status and priority (indexed — the most common filter columns)
    status: Mapped[TicketStatus] = mapped_column(
        Enum(TicketStatus),
        default=TicketStatus.OPEN,
        nullable=False,
        index=True
    )
    priority: Mapped[TicketPriority] = mapped_column(
        Enum(TicketPriority),
        default=TicketPriority.MEDIUM,
        nullable=False,
        index=True
    )

    # Foreign keys — both point at users; assignee is optional
    creator_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
    assignee_id: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"), index=True)

    # Soft delete marker: non-NULL means the ticket is hidden, not removed
    deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        index=True
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )
    resolved_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

    # Relationships — foreign_keys disambiguates the two User links
    creator: Mapped["User"] = relationship(
        "User",
        back_populates="tickets_created",
        foreign_keys=[creator_id]
    )
    assignee: Mapped[Optional["User"]] = relationship(
        "User",
        back_populates="tickets_assigned",
        foreign_keys=[assignee_id]
    )
    comments: Mapped[List["Comment"]] = relationship(
        "Comment",
        back_populates="ticket",
        cascade="all, delete-orphan",
        order_by="Comment.created_at"
    )
    attachments: Mapped[List["Attachment"]] = relationship(
        "Attachment",
        back_populates="ticket",
        cascade="all, delete-orphan"
    )
    tags: Mapped[List["Tag"]] = relationship(
        "Tag",
        secondary="ticket_tags",
        back_populates="tickets"
    )

    def __repr__(self) -> str:
        return f"<Ticket(id={self.id}, number='{self.ticket_number}', status={self.status.value})>"
一对多(工单上的评论):
class Comment(Base):
    """A comment authored by a user on a ticket."""

    __tablename__ = "comments"

    id: Mapped[int] = mapped_column(primary_key=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    # NOTE(review): presumably marks staff-only notes — confirm with callers
    is_internal: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    # Foreign keys
    ticket_id: Mapped[int] = mapped_column(ForeignKey("tickets.id"), nullable=False, index=True)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )

    # Relationships
    ticket: Mapped["Ticket"] = relationship("Ticket", back_populates="comments")
    author: Mapped["User"] = relationship("User", back_populates="comments")

    def __repr__(self) -> str:
        return f"<Comment(id={self.id}, ticket_id={self.ticket_id}, author_id={self.author_id})>"
多对多(工单上的标签):
from sqlalchemy import Table, Column

# Association table for the many-to-many Ticket <-> Tag relationship.
# Plain Core Table: rows are managed implicitly via the relationships.
ticket_tags = Table(
    "ticket_tags",
    Base.metadata,
    Column("ticket_id", ForeignKey("tickets.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
    Column("created_at", DateTime(timezone=True), server_default=func.now())
)

class Tag(Base):
    """A label that can be attached to any number of tickets."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
    color: Mapped[str] = mapped_column(String(7), nullable=False)  # hex color, e.g. "#ff0000"
    description: Mapped[Optional[str]] = mapped_column(String(500))

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )

    # Relationships
    tickets: Mapped[List["Ticket"]] = relationship(
        "Ticket",
        secondary=ticket_tags,
        back_populates="tags"
    )

    def __repr__(self) -> str:
        return f"<Tag(id={self.id}, name='{self.name}')>"
同步会话设置:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager

# Database URL
# NOTE(review): credentials belong in configuration/environment, not in
# source — kept inline here only to match the surrounding example.
DATABASE_URL = "postgresql://user:password@localhost:5432/support_db"

# Engine with connection pooling
engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,  # validate connections before use
    pool_size=10,        # connections kept open in the pool
    max_overflow=20,     # extra connections allowed when the pool is full
    echo=False,          # set True to log SQL
)

# Session factory
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False  # keep attributes readable after commit
)

@contextmanager
def get_db_session() -> Iterator[Session]:
    """Yield a session; commit on success, roll back on error, always close.

    FIX: the return annotation was ``Session`` — a ``@contextmanager``
    generator function yields sessions, so the correct annotation is
    ``Iterator[Session]`` (the decorator turns it into a context manager).
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
FastAPI 的异步会话设置:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# Async database URL (asyncpg driver)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/support_db"

# Async engine
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20,
    echo=False,
)

# Async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False
)

async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency: yield an async session, commit/rollback around it.

    FIX: the return annotation was ``AsyncSession`` — this is an async
    generator (it ``yield``s), so the correct annotation is
    ``AsyncGenerator[AsyncSession, None]``; FastAPI unwraps it.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
使用连接加载避免 N+1 查询:
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

async def get_tickets_with_details(
    session: AsyncSession,
    status: Optional[TicketStatus] = None
) -> List[Ticket]:
    """
    Fetch tickets with all related data in a bounded number of queries.

    Uses joinedload for single-row (many-to-one) relationships and
    selectinload for collections, avoiding N+1 query patterns.
    """
    stmt = (
        select(Ticket)
        .options(
            joinedload(Ticket.creator),    # many-to-one: join into the same query
            joinedload(Ticket.assignee),
            selectinload(Ticket.comments).joinedload(Comment.author),  # collections: separate IN query
            selectinload(Ticket.attachments),
            selectinload(Ticket.tags)
        )
        .where(Ticket.deleted_at.is_(None))  # exclude soft-deleted tickets
    )
    if status:
        stmt = stmt.where(Ticket.status == status)
    stmt = stmt.order_by(Ticket.created_at.desc())
    result = await session.execute(stmt)
    # unique() is required when joined eager loads are present
    return list(result.unique().scalars().all())
为获得更好性能使用选择加载:
async def get_user_with_tickets(session: AsyncSession, user_id: int) -> Optional[User]:
    """Load one user plus their created and assigned tickets.

    selectinload emits one extra IN-based query per collection, which
    scales better than a single giant JOIN on large collections.
    """
    query = (
        select(User)
        .where(User.id == user_id)
        .options(
            selectinload(User.tickets_created).selectinload(Ticket.comments),
            selectinload(User.tickets_assigned),
        )
    )
    rows = await session.execute(query)
    return rows.unique().scalar_one_or_none()
高级过滤:
from sqlalchemy import and_, or_, not_, func, case
from datetime import timedelta

async def search_tickets(
    session: AsyncSession,
    search_term: Optional[str] = None,
    status_list: Optional[List[TicketStatus]] = None,
    priority: Optional[TicketPriority] = None,
    assignee_id: Optional[int] = None,
    created_after: Optional[datetime] = None,
    tags: Optional[List[str]] = None,
    limit: int = 50,
    offset: int = 0
) -> tuple[List[Ticket], int]:
    """
    Advanced ticket search with multiple optional filters.

    Returns:
        (tickets, total): one page of tickets and the total match count.
    """
    # Base query (soft-deleted tickets excluded)
    stmt = (
        select(Ticket)
        .options(
            joinedload(Ticket.creator),
            joinedload(Ticket.assignee),
            selectinload(Ticket.tags)
        )
        .where(Ticket.deleted_at.is_(None))
    )
    # Apply filters
    if search_term:
        search_filter = or_(
            Ticket.title.ilike(f"%{search_term}%"),
            Ticket.description.ilike(f"%{search_term}%"),
            Ticket.ticket_number.ilike(f"%{search_term}%")
        )
        stmt = stmt.where(search_filter)
    if status_list:
        stmt = stmt.where(Ticket.status.in_(status_list))
    if priority:
        stmt = stmt.where(Ticket.priority == priority)
    if assignee_id:
        stmt = stmt.where(Ticket.assignee_id == assignee_id)
    if created_after:
        stmt = stmt.where(Ticket.created_at >= created_after)
    if tags:
        # FIX: joining to tags produces one row per *matching tag*, so a
        # ticket with several matching tags was counted multiple times and
        # could make pages come back short after unique() dedup. DISTINCT
        # collapses the duplicates before counting and pagination.
        stmt = stmt.join(Ticket.tags).where(Tag.name.in_(tags)).distinct()
    # Count on a subquery so LIMIT/OFFSET do not affect the total
    count_stmt = select(func.count()).select_from(stmt.subquery())
    count_result = await session.execute(count_stmt)
    total = count_result.scalar_one()
    # Ordering and pagination
    stmt = stmt.order_by(Ticket.created_at.desc()).limit(limit).offset(offset)
    result = await session.execute(stmt)
    tickets = list(result.unique().scalars().all())
    return tickets, total
工单统计:
from sqlalchemy import func, case, extract, literal_column
from typing import Dict, Any

async def get_ticket_statistics(
    session: AsyncSession,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> Dict[str, Any]:
    """
    Aggregate ticket statistics for an analytics dashboard.

    Returns a dict with per-status counts, per-priority counts, average
    resolution time in hours, and per-assignee stats.
    """
    # Base filter: exclude soft-deleted tickets, optionally bound by dates
    base_filter = Ticket.deleted_at.is_(None)
    if start_date:
        base_filter = and_(base_filter, Ticket.created_at >= start_date)
    if end_date:
        base_filter = and_(base_filter, Ticket.created_at <= end_date)

    # Count grouped by status
    status_stmt = (
        select(
            Ticket.status,
            func.count(Ticket.id).label("count")
        )
        .where(base_filter)
        .group_by(Ticket.status)
    )
    status_result = await session.execute(status_stmt)
    status_counts = {row[0].value: row[1] for row in status_result}

    # Count grouped by priority
    priority_stmt = (
        select(
            Ticket.priority,
            func.count(Ticket.id).label("count")
        )
        .where(base_filter)
        .group_by(Ticket.priority)
    )
    priority_result = await session.execute(priority_stmt)
    priority_counts = {row[0].value: row[1] for row in priority_result}

    # Average resolution time (seconds), resolved tickets only
    resolution_stmt = (
        select(
            func.avg(
                func.extract("epoch", Ticket.resolved_at - Ticket.created_at)
            ).label("avg_seconds")
        )
        .where(
            and_(
                base_filter,
                Ticket.resolved_at.is_not(None)
            )
        )
    )
    resolution_result = await session.execute(resolution_stmt)
    avg_resolution_seconds = resolution_result.scalar_one() or 0

    # Per-assignee ticket counts and average resolution time
    # (inner join on assignee drops unassigned tickets)
    assignee_stmt = (
        select(
            User.full_name,
            func.count(Ticket.id).label("ticket_count"),
            func.avg(
                case(
                    (Ticket.resolved_at.is_not(None),
                     func.extract("epoch", Ticket.resolved_at - Ticket.created_at))
                )
            ).label("avg_resolution_time")
        )
        .join(Ticket.assignee)
        .where(base_filter)
        .group_by(User.id, User.full_name)
        .order_by(func.count(Ticket.id).desc())
    )
    assignee_result = await session.execute(assignee_stmt)
    assignee_stats = [
        {
            "assignee": row[0],
            "ticket_count": row[1],
            "avg_resolution_hours": (row[2] / 3600) if row[2] else None
        }
        for row in assignee_result
    ]
    return {
        "status_counts": status_counts,
        "priority_counts": priority_counts,
        "avg_resolution_hours": avg_resolution_seconds / 3600,
        "assignee_stats": assignee_stats
    }
批量插入:
async def bulk_create_tickets(
    session: AsyncSession,
    tickets_data: List[Dict[str, Any]]
) -> List[Ticket]:
    """
    Create multiple tickets efficiently inside the caller's transaction.

    The caller is responsible for committing; flush() only sends the
    INSERTs so database-generated IDs become available.
    """
    tickets = [Ticket(**data) for data in tickets_data]
    session.add_all(tickets)
    await session.flush()  # obtain primary keys without committing
    return tickets
async def bulk_insert_with_return(
    session: AsyncSession,
    tickets_data: List[Dict[str, Any]]
) -> List[Ticket]:
    """
    Bulk insert using PostgreSQL's RETURNING clause.

    NOTE(review): executemany + RETURNING relies on SQLAlchemy 2.0's
    "insertmanyvalues" support for the backend — confirm the installed
    SQLAlchemy/asyncpg versions support it.
    """
    from sqlalchemy.dialects.postgresql import insert
    stmt = insert(Ticket).returning(Ticket)
    # Passing the list as parameters triggers an executemany-style insert
    result = await session.execute(stmt, tickets_data)
    tickets = list(result.scalars().all())
    return tickets
批量更新:
from sqlalchemy import update

async def bulk_update_ticket_status(
    session: AsyncSession,
    ticket_ids: List[int],
    new_status: TicketStatus
) -> int:
    """
    Update the status of many tickets in one statement; commits internally.

    Returns the number of rows updated. Core UPDATE bypasses ORM events
    and the column-level onupdate hook, hence the explicit updated_at.
    """
    stmt = (
        update(Ticket)
        .where(
            and_(
                Ticket.id.in_(ticket_ids),
                Ticket.deleted_at.is_(None)  # never touch soft-deleted rows
            )
        )
        .values(
            status=new_status,
            updated_at=func.now()
        )
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount
async def bulk_assign_tickets(
    session: AsyncSession,
    ticket_ids: List[int],
    assignee_id: int
) -> int:
    """Assign a batch of tickets to one user and mark them in progress.

    Commits internally and returns the number of rows updated.
    """
    assignment = update(Ticket).where(Ticket.id.in_(ticket_ids))
    assignment = assignment.values(
        assignee_id=assignee_id,
        status=TicketStatus.IN_PROGRESS,
        updated_at=func.now(),
    )
    outcome = await session.execute(assignment)
    await session.commit()
    return outcome.rowcount
软删除实现:
async def soft_delete_ticket(session: AsyncSession, ticket_id: int) -> bool:
    """
    Soft-delete a ticket by stamping deleted_at; commits internally.

    Returns True if a row was updated, False if the ticket was missing
    or already soft-deleted (the deleted_at IS NULL guard makes the
    operation idempotent).
    """
    stmt = (
        update(Ticket)
        .where(
            and_(
                Ticket.id == ticket_id,
                Ticket.deleted_at.is_(None)
            )
        )
        .values(deleted_at=func.now())
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount > 0
async def restore_ticket(session: AsyncSession, ticket_id: int) -> bool:
    """Undo a soft delete by clearing deleted_at; commits internally.

    Returns True when the ticket row existed and was updated.
    """
    undelete = (
        update(Ticket)
        .values(deleted_at=None)
        .where(Ticket.id == ticket_id)
    )
    outcome = await session.execute(undelete)
    await session.commit()
    return outcome.rowcount > 0
审计追踪模型:
class AuditLog(Base):
    """Append-only audit trail row for changes to any table."""

    __tablename__ = "audit_logs"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Polymorphic target reference: table name + record id (no FK on purpose)
    table_name: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
    record_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
    action: Mapped[str] = mapped_column(String(50), nullable=False)  # CREATE, UPDATE, DELETE
    user_id: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"), index=True)
    # NOTE(review): requires `from sqlalchemy import JSON` at the top of the
    # file — the original snippet never imported it.
    changes: Mapped[Optional[Dict]] = mapped_column(JSON)  # before/after values
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        index=True
    )
    user: Mapped[Optional["User"]] = relationship("User")
自动审计日志记录:
from sqlalchemy import event
from sqlalchemy.orm import Session
@event.listens_for(Ticket, "after_insert")
def log_ticket_created(mapper, connection, target):
    """Record ticket creation in the audit trail.

    FIX: the original opened a new ORM Session(bind=connection) and called
    add() without ever flushing or committing, so the audit row was never
    written. The documented pattern inside flush events is to execute a
    Core INSERT directly on the in-flight connection.
    """
    connection.execute(
        AuditLog.__table__.insert().values(
            table_name="tickets",
            record_id=target.id,
            action="CREATE",
            changes={"ticket_number": target.ticket_number, "status": target.status.value},
        )
    )
@event.listens_for(Ticket, "after_update")
def log_ticket_updated(mapper, connection, target):
    """Record changed tracked fields of a ticket in the audit trail.

    FIX 1: the original read attribute history via
    ``mapper.get_property(attr).impl.get_history(target, mapper)``, which is
    not a real SQLAlchemy API; the supported route is
    ``inspect(target).attrs[attr].history``.
    FIX 2: like the insert listener, it created an ORM Session and add()ed
    without flushing, so nothing persisted — write through the connection.
    """
    changes = {}
    state = inspect(target)
    for attr in ["status", "priority", "assignee_id"]:
        hist = state.attrs[attr].history
        if hist.has_changes():
            old = hist.deleted[0] if hist.deleted else None
            new = hist.added[0] if hist.added else None
            # Enum members are not JSON-serializable; store their values
            changes[attr] = {
                "old": old.value if isinstance(old, enum.Enum) else old,
                "new": new.value if isinstance(new, enum.Enum) else new,
            }
    if changes:
        connection.execute(
            AuditLog.__table__.insert().values(
                table_name="tickets",
                record_id=target.id,
                action="UPDATE",
                changes=changes,
            )
        )
计算属性:
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method

# NOTE(review): excerpt only — in a real module these hybrids must be added
# to the single Ticket class defined earlier; declaring a second
# ``class Ticket(Base)`` with the same __tablename__ raises at import time.
class Ticket(Base):
    # ... existing fields ...

    @hybrid_property
    def is_overdue(self) -> bool:
        """True when the ticket is still open after more than 7 days."""
        if self.status in [TicketStatus.RESOLVED, TicketStatus.CLOSED]:
            return False
        # FIX: created_at is timezone-aware (DateTime(timezone=True));
        # subtracting it from naive datetime.utcnow() raises TypeError.
        # Use an aware "now" instead.
        return (datetime.now(timezone.utc) - self.created_at).days > 7

    @is_overdue.expression
    def is_overdue(cls):
        """SQL-side equivalent of is_overdue, usable in WHERE clauses."""
        return and_(
            cls.status.notin_([TicketStatus.RESOLVED, TicketStatus.CLOSED]),
            func.date_part("day", func.now() - cls.created_at) > 7
        )

    @hybrid_property
    def response_time_hours(self) -> Optional[float]:
        """Hours from creation to the first comment, or None if uncommented."""
        if not self.comments:
            return None
        first_comment = min(self.comments, key=lambda c: c.created_at)
        delta = first_comment.created_at - self.created_at
        return delta.total_seconds() / 3600

    @hybrid_method
    def is_priority_escalation_needed(self, hours: int = 24) -> bool:
        """True when an OPEN ticket has been waiting longer than `hours`."""
        if self.status == TicketStatus.OPEN:
            # FIX: aware "now" to match timezone-aware created_at (see above)
            age_hours = (datetime.now(timezone.utc) - self.created_at).total_seconds() / 3600
            return age_hours > hours
        return False
优化的引擎配置:
from sqlalchemy.pool import QueuePool
# Production-grade engine configuration
production_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    # FIX: the original passed poolclass=QueuePool, which is invalid for an
    # async engine — create_async_engine requires the async-compatible
    # AsyncAdaptedQueuePool and selects it by default, so no poolclass
    # argument is needed (plain QueuePool raises ArgumentError).
    pool_size=20,        # 20 persistent connections
    max_overflow=40,     # up to 40 extra connections under load
    pool_timeout=30,     # wait up to 30s for a free connection
    pool_recycle=3600,   # recycle connections after 1 hour
    pool_pre_ping=True,  # validate connection health before use
    echo_pool=False,     # no pool logging in production
    echo=False,          # no SQL logging in production
    connect_args={
        "server_settings": {"jit": "off"},  # disable PostgreSQL JIT
        "command_timeout": 60,
        "timeout": 30,
    }
)
Pytest 夹具:
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# NOTE(review): async fixtures need the pytest-asyncio plugin (ideally
# declared via @pytest_asyncio.fixture); plain @pytest.fixture on an
# async def only works in auto mode — confirm the project's pytest config.
# Also verify loop scope: a session-scoped async fixture conflicts with
# pytest-asyncio's default function-scoped event loop.

@pytest.fixture(scope="session")
def test_database_url():
    """URL of the throwaway test database."""
    return "postgresql+asyncpg://test_user:test_pass@localhost:5432/test_support_db"

@pytest.fixture(scope="session")
async def async_engine(test_database_url):
    """Create the test engine; build the schema up front, drop it after."""
    engine = create_async_engine(test_database_url, echo=True)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()

@pytest.fixture
async def async_session(async_engine):
    """Per-test session; rolled back afterwards to isolate tests."""
    async_session_factory = async_sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False
    )
    async with async_session_factory() as session:
        yield session
        await session.rollback()

@pytest.fixture
async def test_user(async_session):
    """Insert and return a committed test user."""
    user = User(
        email="test@example.com",
        full_name="Test User",
        password_hash="hashed_password",
        is_active=True
    )
    async_session.add(user)
    await async_session.commit()
    await async_session.refresh(user)
    return user
迁移设置:
# 初始化 Alembic
alembic init alembic
# 创建迁移
alembic revision --autogenerate -m "Create support tables"
# 应用迁移
alembic upgrade head
# 回滚迁移
alembic downgrade -1
迁移模板:
"""Create support tables
Revision ID: 001
Create Date: 2025-01-15 10:00:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = '001'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create users table
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(length=255), nullable=False),
sa.Column('full_name', sa.String(length=255), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=False),
sa.Column('is_staff', sa.Boolean(), nullable=False),
sa.Column('password_hash', sa.String(length=255), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.Column('last_login', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'])
def downgrade() -> None:
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')
本技能为使用 SQLAlchemy 构建生产就绪的客户支持系统提供全面指导。遵循这些模式以构建可维护、高性能和可扩展的应用程序。
每周安装次数
–
代码仓库
GitHub 星标数
47
首次出现时间
–
安全审计
This skill provides comprehensive guidance for using SQLAlchemy 2.0+ in customer support systems, focusing on ORM patterns, session management, query optimization, async operations with FastAPI, and PostgreSQL integration. It covers everything from basic model definitions to advanced patterns for high-performance support applications.
When building customer support systems, you need robust data models that represent tickets, users, comments, attachments, and their relationships. SQLAlchemy's declarative mapping with type hints provides a clean, modern approach.
Base Model Setup:
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, DateTime, Text, ForeignKey, Enum, Boolean
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.sql import func
import enum
class Base(DeclarativeBase):
    """Base class for all ORM models (shared MetaData/registry)."""
    pass
class TicketStatus(enum.Enum):
    """Ticket status enumeration; string values are what gets persisted."""
    OPEN = "open"
    IN_PROGRESS = "in_progress"
    WAITING_ON_CUSTOMER = "waiting_on_customer"
    RESOLVED = "resolved"
    CLOSED = "closed"
class TicketPriority(enum.Enum):
    """Ticket priority levels, lowest to highest."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
User Model:
class User(Base):
    """An account in the support system (both customers and staff)."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    full_name: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_staff: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    password_hash: Mapped[str] = mapped_column(String(255), nullable=False)

    # Timestamps — timezone-aware, populated by the database server
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )
    last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

    # Relationships
    # NOTE(review): "all, delete-orphan" means deleting a User also deletes
    # every ticket they created — confirm this retention policy is intended.
    tickets_created: Mapped[List["Ticket"]] = relationship(
        "Ticket",
        back_populates="creator",
        foreign_keys="Ticket.creator_id",
        cascade="all, delete-orphan"
    )
    tickets_assigned: Mapped[List["Ticket"]] = relationship(
        "Ticket",
        back_populates="assignee",
        foreign_keys="Ticket.assignee_id"
    )
    comments: Mapped[List["Comment"]] = relationship(
        "Comment",
        back_populates="author",
        cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, email='{self.email}', name='{self.full_name}')>"
Ticket Model:
class Ticket(Base):
    """A support ticket with creator/assignee, comments, attachments and tags."""

    __tablename__ = "tickets"

    id: Mapped[int] = mapped_column(primary_key=True)
    ticket_number: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    description: Mapped[str] = mapped_column(Text, nullable=False)

    # Status and priority (indexed — the most common filter columns)
    status: Mapped[TicketStatus] = mapped_column(
        Enum(TicketStatus),
        default=TicketStatus.OPEN,
        nullable=False,
        index=True
    )
    priority: Mapped[TicketPriority] = mapped_column(
        Enum(TicketPriority),
        default=TicketPriority.MEDIUM,
        nullable=False,
        index=True
    )

    # Foreign keys — both point at users; assignee is optional
    creator_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
    assignee_id: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"), index=True)

    # Soft delete marker: non-NULL means the ticket is hidden, not removed
    deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        index=True
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )
    resolved_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))

    # Relationships — foreign_keys disambiguates the two User links
    creator: Mapped["User"] = relationship(
        "User",
        back_populates="tickets_created",
        foreign_keys=[creator_id]
    )
    assignee: Mapped[Optional["User"]] = relationship(
        "User",
        back_populates="tickets_assigned",
        foreign_keys=[assignee_id]
    )
    comments: Mapped[List["Comment"]] = relationship(
        "Comment",
        back_populates="ticket",
        cascade="all, delete-orphan",
        order_by="Comment.created_at"
    )
    attachments: Mapped[List["Attachment"]] = relationship(
        "Attachment",
        back_populates="ticket",
        cascade="all, delete-orphan"
    )
    tags: Mapped[List["Tag"]] = relationship(
        "Tag",
        secondary="ticket_tags",
        back_populates="tickets"
    )

    def __repr__(self) -> str:
        return f"<Ticket(id={self.id}, number='{self.ticket_number}', status={self.status.value})>"
One-to-Many (Comments on Tickets):
class Comment(Base):
    """A comment authored by a user on a ticket."""

    __tablename__ = "comments"

    id: Mapped[int] = mapped_column(primary_key=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    # NOTE(review): presumably marks staff-only notes — confirm with callers
    is_internal: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    # Foreign keys
    ticket_id: Mapped[int] = mapped_column(ForeignKey("tickets.id"), nullable=False, index=True)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )

    # Relationships
    ticket: Mapped["Ticket"] = relationship("Ticket", back_populates="comments")
    author: Mapped["User"] = relationship("User", back_populates="comments")

    def __repr__(self) -> str:
        return f"<Comment(id={self.id}, ticket_id={self.ticket_id}, author_id={self.author_id})>"
Many-to-Many (Tags on Tickets):
from sqlalchemy import Table, Column

# Association table for the many-to-many Ticket <-> Tag relationship.
# Plain Core Table: rows are managed implicitly via the relationships.
ticket_tags = Table(
    "ticket_tags",
    Base.metadata,
    Column("ticket_id", ForeignKey("tickets.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
    Column("created_at", DateTime(timezone=True), server_default=func.now())
)

class Tag(Base):
    """A label that can be attached to any number of tickets."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
    color: Mapped[str] = mapped_column(String(7), nullable=False)  # Hex color, e.g. "#ff0000"
    description: Mapped[Optional[str]] = mapped_column(String(500))

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )

    # Relationships
    tickets: Mapped[List["Ticket"]] = relationship(
        "Ticket",
        secondary=ticket_tags,
        back_populates="tags"
    )

    def __repr__(self) -> str:
        return f"<Tag(id={self.id}, name='{self.name}')>"
Synchronous Session Setup:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager

# Database URL
# NOTE(review): credentials belong in configuration/environment, not in
# source — kept inline here only to match the surrounding example.
DATABASE_URL = "postgresql://user:password@localhost:5432/support_db"

# Create engine with connection pooling
engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,  # Verify connections before using
    pool_size=10,        # Number of connections to maintain
    max_overflow=20,     # Additional connections when pool is full
    echo=False,          # Set to True for SQL logging
)

# Create session factory
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False  # Don't expire objects after commit
)

@contextmanager
def get_db_session() -> Iterator[Session]:
    """Yield a session; commit on success, roll back on error, always close.

    FIX: the return annotation was ``Session`` — a ``@contextmanager``
    generator function yields sessions, so the correct annotation is
    ``Iterator[Session]`` (the decorator turns it into a context manager).
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
Async Session Setup for FastAPI:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# Async database URL (using asyncpg)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/support_db"

# Create async engine
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20,
    echo=False,
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False
)

async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency: yield an async session, commit/rollback around it.

    FIX: the return annotation was ``AsyncSession`` — this is an async
    generator (it ``yield``s), so the correct annotation is
    ``AsyncGenerator[AsyncSession, None]``; FastAPI unwraps it.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
Avoiding N+1 Queries with Joined Load:
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

async def get_tickets_with_details(
    session: AsyncSession,
    status: Optional[TicketStatus] = None
) -> List[Ticket]:
    """
    Fetch tickets with all related data in a bounded number of queries.

    Uses joinedload for single-row relationships and selectinload for
    collections, avoiding N+1 query patterns.
    """
    stmt = (
        select(Ticket)
        .options(
            joinedload(Ticket.creator),    # One-to-one/many-to-one: use joinedload
            joinedload(Ticket.assignee),
            selectinload(Ticket.comments).joinedload(Comment.author),  # Collections: use selectinload
            selectinload(Ticket.attachments),
            selectinload(Ticket.tags)
        )
        .where(Ticket.deleted_at.is_(None))  # Soft delete filter
    )
    if status:
        stmt = stmt.where(Ticket.status == status)
    stmt = stmt.order_by(Ticket.created_at.desc())
    result = await session.execute(stmt)
    # unique() is required when joined eager loads are present
    return list(result.unique().scalars().all())
Select In Load for Better Performance:
async def get_user_with_tickets(session: AsyncSession, user_id: int) -> Optional[User]:
    """Load one user together with their created and assigned tickets.

    selectinload issues one extra IN-based query per collection — cheaper
    than JOIN-based eager loading when the collections are large.
    """
    query = (
        select(User)
        .where(User.id == user_id)
        .options(
            selectinload(User.tickets_created).selectinload(Ticket.comments),
            selectinload(User.tickets_assigned),
        )
    )
    rows = await session.execute(query)
    return rows.unique().scalar_one_or_none()
Advanced Filtering:
from sqlalchemy import and_, or_, not_, func, case
from datetime import timedelta

async def search_tickets(
    session: AsyncSession,
    search_term: Optional[str] = None,
    status_list: Optional[List[TicketStatus]] = None,
    priority: Optional[TicketPriority] = None,
    assignee_id: Optional[int] = None,
    created_after: Optional[datetime] = None,
    tags: Optional[List[str]] = None,
    limit: int = 50,
    offset: int = 0
) -> tuple[List[Ticket], int]:
    """
    Advanced ticket search with multiple optional filters.

    Returns:
        (tickets, total): one page of tickets and the total match count.
    """
    # Base query (soft-deleted tickets excluded)
    stmt = (
        select(Ticket)
        .options(
            joinedload(Ticket.creator),
            joinedload(Ticket.assignee),
            selectinload(Ticket.tags)
        )
        .where(Ticket.deleted_at.is_(None))
    )
    # Apply filters
    if search_term:
        search_filter = or_(
            Ticket.title.ilike(f"%{search_term}%"),
            Ticket.description.ilike(f"%{search_term}%"),
            Ticket.ticket_number.ilike(f"%{search_term}%")
        )
        stmt = stmt.where(search_filter)
    if status_list:
        stmt = stmt.where(Ticket.status.in_(status_list))
    if priority:
        stmt = stmt.where(Ticket.priority == priority)
    if assignee_id:
        stmt = stmt.where(Ticket.assignee_id == assignee_id)
    if created_after:
        stmt = stmt.where(Ticket.created_at >= created_after)
    if tags:
        # FIX: joining to tags produces one row per *matching tag*, so a
        # ticket with several matching tags was counted multiple times and
        # could make pages come back short after unique() dedup. DISTINCT
        # collapses the duplicates before counting and pagination.
        stmt = stmt.join(Ticket.tags).where(Tag.name.in_(tags)).distinct()
    # Count on a subquery so LIMIT/OFFSET do not affect the total
    count_stmt = select(func.count()).select_from(stmt.subquery())
    count_result = await session.execute(count_stmt)
    total = count_result.scalar_one()
    # Apply ordering and pagination
    stmt = stmt.order_by(Ticket.created_at.desc()).limit(limit).offset(offset)
    result = await session.execute(stmt)
    tickets = list(result.unique().scalars().all())
    return tickets, total
Ticket Statistics:
from sqlalchemy import func, case, extract, literal_column
from typing import Dict, Any
async def get_ticket_statistics(
    session: AsyncSession,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> Dict[str, Any]:
    """
    Aggregate ticket metrics for the analytics dashboard.

    Over non-deleted tickets (optionally restricted to a created_at
    window), computes: counts per status, counts per priority, the
    average resolution time in hours, and per-assignee workload and
    resolution stats.
    """
    # Shared WHERE clause for every aggregate below.
    conditions = [Ticket.deleted_at.is_(None)]
    if start_date:
        conditions.append(Ticket.created_at >= start_date)
    if end_date:
        conditions.append(Ticket.created_at <= end_date)
    base_filter = and_(*conditions)

    # Tickets grouped by status.
    by_status = await session.execute(
        select(Ticket.status, func.count(Ticket.id).label("count"))
        .where(base_filter)
        .group_by(Ticket.status)
    )
    status_counts = {status.value: count for status, count in by_status}

    # Tickets grouped by priority.
    by_priority = await session.execute(
        select(Ticket.priority, func.count(Ticket.id).label("count"))
        .where(base_filter)
        .group_by(Ticket.priority)
    )
    priority_counts = {prio.value: count for prio, count in by_priority}

    # Mean time-to-resolution in seconds, over resolved tickets only.
    avg_row = await session.execute(
        select(
            func.avg(
                func.extract("epoch", Ticket.resolved_at - Ticket.created_at)
            ).label("avg_seconds")
        )
        .where(and_(base_filter, Ticket.resolved_at.is_not(None)))
    )
    avg_resolution_seconds = avg_row.scalar_one() or 0

    # Per-assignee ticket volume and mean resolution time (resolved only,
    # via the CASE filter inside AVG).
    per_assignee = await session.execute(
        select(
            User.full_name,
            func.count(Ticket.id).label("ticket_count"),
            func.avg(
                case(
                    (Ticket.resolved_at.is_not(None),
                     func.extract("epoch", Ticket.resolved_at - Ticket.created_at))
                )
            ).label("avg_resolution_time")
        )
        .join(Ticket.assignee)
        .where(base_filter)
        .group_by(User.id, User.full_name)
        .order_by(func.count(Ticket.id).desc())
    )
    assignee_stats = [
        {
            "assignee": name,
            "ticket_count": n_tickets,
            "avg_resolution_hours": (avg_secs / 3600) if avg_secs else None
        }
        for name, n_tickets, avg_secs in per_assignee
    ]

    return {
        "status_counts": status_counts,
        "priority_counts": priority_counts,
        "avg_resolution_hours": avg_resolution_seconds / 3600,
        "assignee_stats": assignee_stats
    }
Bulk Insert:
async def bulk_create_tickets(
    session: AsyncSession,
    tickets_data: List[Dict[str, Any]]
) -> List[Ticket]:
    """
    Create many tickets within the current transaction.

    Instances are added to the session and flushed — not committed — so
    their database-generated IDs are populated while the caller keeps
    control of the transaction boundary.
    """
    new_tickets = [Ticket(**fields) for fields in tickets_data]
    session.add_all(new_tickets)
    # Flush assigns primary keys while leaving the transaction open.
    await session.flush()
    return new_tickets
async def bulk_insert_with_return(
    session: AsyncSession,
    tickets_data: List[Dict[str, Any]]
) -> List[Ticket]:
    """
    Bulk-insert tickets using PostgreSQL's INSERT ... RETURNING.

    Executes a single multi-row insert and maps the returned rows back
    to Ticket objects.
    """
    from sqlalchemy.dialects.postgresql import insert

    insert_stmt = insert(Ticket).returning(Ticket)
    insert_result = await session.execute(insert_stmt, tickets_data)
    return list(insert_result.scalars().all())
Bulk Update:
from sqlalchemy import update
async def bulk_update_ticket_status(
    session: AsyncSession,
    ticket_ids: List[int],
    new_status: TicketStatus
) -> int:
    """
    Set a new status on every non-deleted ticket in `ticket_ids`.

    Issues a single UPDATE, commits, and returns the number of rows
    actually changed.
    """
    outcome = await session.execute(
        update(Ticket)
        .where(and_(Ticket.id.in_(ticket_ids), Ticket.deleted_at.is_(None)))
        .values(status=new_status, updated_at=func.now())
    )
    await session.commit()
    return outcome.rowcount
async def bulk_assign_tickets(
    session: AsyncSession,
    ticket_ids: List[int],
    assignee_id: int
) -> int:
    """
    Assign every ticket in `ticket_ids` to one user.

    Also moves the tickets to IN_PROGRESS and touches updated_at.
    Commits and returns the number of rows changed.
    """
    assign_stmt = (
        update(Ticket)
        .where(Ticket.id.in_(ticket_ids))
        .values(
            assignee_id=assignee_id,
            status=TicketStatus.IN_PROGRESS,
            updated_at=func.now(),
        )
    )
    outcome = await session.execute(assign_stmt)
    await session.commit()
    return outcome.rowcount
Soft Delete Implementation:
async def soft_delete_ticket(session: AsyncSession, ticket_id: int) -> bool:
    """
    Mark a ticket deleted by stamping deleted_at with the database clock.

    Only a ticket that is not already soft-deleted is touched. Commits
    and returns True when a row was updated.
    """
    outcome = await session.execute(
        update(Ticket)
        .where(and_(Ticket.id == ticket_id, Ticket.deleted_at.is_(None)))
        .values(deleted_at=func.now())
    )
    await session.commit()
    return outcome.rowcount > 0
async def restore_ticket(session: AsyncSession, ticket_id: int) -> bool:
    """
    Undo a soft delete by clearing deleted_at.

    Commits and returns True when a row was updated.
    """
    outcome = await session.execute(
        update(Ticket).where(Ticket.id == ticket_id).values(deleted_at=None)
    )
    await session.commit()
    return outcome.rowcount > 0
Audit Trail Model:
class AuditLog(Base):
    """Append-only audit-trail row recording a change to another table's record."""
    __tablename__ = "audit_logs"
    id: Mapped[int] = mapped_column(primary_key=True)
    # Identifies the audited row: table name plus that row's primary key.
    table_name: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
    record_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)
    action: Mapped[str] = mapped_column(String(50), nullable=False)  # CREATE, UPDATE, DELETE
    # Acting user; nullable so system-generated changes can also be logged.
    user_id: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"), index=True)
    changes: Mapped[Optional[Dict]] = mapped_column(JSON)  # Store before/after values
    # Timestamp set by the database server at insert time.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        index=True
    )
    user: Mapped[Optional["User"]] = relationship("User")
Automatic Audit Logging:
from sqlalchemy import event
from sqlalchemy.orm import Session
@event.listens_for(Ticket, "after_insert")
def log_ticket_created(mapper, connection, target):
    """
    Mapper event: write an audit row whenever a Ticket is inserted.

    Writes through the event's Connection, as the SQLAlchemy event docs
    require for mapper-level events. The previous approach — creating a
    new Session bound to the connection and only add()ing the row — was
    never flushed or committed, so the audit record was silently lost.
    """
    connection.execute(
        AuditLog.__table__.insert().values(
            table_name="tickets",
            record_id=target.id,
            action="CREATE",
            changes={"ticket_number": target.ticket_number,
                     "status": target.status.value},
        )
    )
@event.listens_for(Ticket, "after_update")
def log_ticket_updated(mapper, connection, target):
    """
    Mapper event: write an audit row capturing changed tracked fields.

    Only status, priority and assignee_id are tracked; updates touching
    none of them produce no audit row.
    """
    from sqlalchemy.orm.attributes import get_history

    changes = {}
    for attr in ["status", "priority", "assignee_id"]:
        # get_history(instance, attr_name) is the supported change-history
        # API; the attribute-impl internals previously called here take
        # (state, dict) arguments and would fail at runtime.
        hist = get_history(target, attr)
        if hist.has_changes():
            changes[attr] = {
                "old": hist.deleted[0] if hist.deleted else None,
                "new": hist.added[0] if hist.added else None,
            }
    if changes:
        # Write via the event's Connection: a throwaway Session bound to
        # the connection is never flushed, dropping the audit row.
        connection.execute(
            AuditLog.__table__.insert().values(
                table_name="tickets",
                record_id=target.id,
                action="UPDATE",
                changes=changes,
            )
        )
Computed Properties:
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
class Ticket(Base):
    # ... existing fields ...

    @hybrid_property
    def is_overdue(self) -> bool:
        """True when a still-open ticket has been open for more than 7 days.

        created_at is timezone-aware (DateTime(timezone=True)), so the
        comparison clock must also be aware: the previous naive
        datetime.utcnow() raised TypeError when subtracting an aware
        datetime.
        """
        from datetime import timezone
        if self.status in [TicketStatus.RESOLVED, TicketStatus.CLOSED]:
            return False
        return (datetime.now(timezone.utc) - self.created_at).days > 7

    @is_overdue.expression
    def is_overdue(cls):
        """SQL form of is_overdue, evaluated server-side in queries."""
        return and_(
            cls.status.notin_([TicketStatus.RESOLVED, TicketStatus.CLOSED]),
            func.date_part("day", func.now() - cls.created_at) > 7
        )

    @hybrid_property
    def response_time_hours(self) -> Optional[float]:
        """Hours from ticket creation to its earliest comment, or None."""
        if not self.comments:
            return None
        first_comment = min(self.comments, key=lambda c: c.created_at)
        delta = first_comment.created_at - self.created_at
        return delta.total_seconds() / 3600

    @hybrid_method
    def is_priority_escalation_needed(self, hours: int = 24) -> bool:
        """True when an OPEN ticket is older than `hours` (default 24).

        Uses an aware "now" for the same naive-vs-aware reason as
        is_overdue.
        """
        from datetime import timezone
        if self.status == TicketStatus.OPEN:
            age_hours = (datetime.now(timezone.utc) - self.created_at).total_seconds() / 3600
            return age_hours > hours
        return False
Optimized Engine Configuration:
from sqlalchemy.pool import AsyncAdaptedQueuePool

# Production-grade engine configuration.
# NOTE: plain QueuePool cannot be used with an asyncio engine — SQLAlchemy
# raises at engine creation. AsyncAdaptedQueuePool is the async-compatible
# equivalent (and already the default for create_async_engine).
production_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=20,          # Keep 20 persistent connections
    max_overflow=40,       # Allow up to 40 extra connections under burst load
    pool_timeout=30,       # Wait up to 30 seconds for a free connection
    pool_recycle=3600,     # Recycle connections hourly to avoid stale sockets
    pool_pre_ping=True,    # Verify connection health before handing it out
    echo_pool=False,       # Disable pool logging in production
    echo=False,            # Disable SQL logging in production
    connect_args={
        "server_settings": {"jit": "off"},  # Disable JIT for PostgreSQL
        "command_timeout": 60,
        "timeout": 30,
    }
)
Pytest Fixtures:
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
@pytest.fixture(scope="session")
def test_database_url():
    """Connection URL for the throwaway test database."""
    url = "postgresql+asyncpg://test_user:test_pass@localhost:5432/test_support_db"
    return url
@pytest.fixture(scope="session")
async def async_engine(test_database_url):
    """Session-scoped engine: creates the schema up front, drops it after.

    NOTE(review): async generator fixtures require the pytest-asyncio
    plugin (ideally @pytest_asyncio.fixture) and, at session scope, a
    session-scoped event loop — confirm the project's pytest config.
    """
    test_engine = create_async_engine(test_database_url, echo=True)
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield test_engine
    # Teardown: drop all tables, then release the pool.
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await test_engine.dispose()
@pytest.fixture
async def async_session(async_engine):
    """Function-scoped AsyncSession; rolls back whatever the test left open."""
    factory = async_sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with factory() as db_session:
        yield db_session
        # Discard uncommitted test changes before the session closes.
        await db_session.rollback()
@pytest.fixture
async def test_user(async_session):
    """Persist and return one active, non-staff user for tests."""
    fixture_user = User(
        email="test@example.com",
        full_name="Test User",
        password_hash="hashed_password",
        is_active=True,
    )
    async_session.add(fixture_user)
    await async_session.commit()
    # Refresh to load server-generated columns (id, timestamps).
    await async_session.refresh(fixture_user)
    return fixture_user
Migration Setup:
# Initialize Alembic
alembic init alembic
# Create migration
alembic revision --autogenerate -m "Create support tables"
# Apply migration
alembic upgrade head
# Rollback migration
alembic downgrade -1
Migration Template:
"""Create support tables
Revision ID: 001
Create Date: 2025-01-15 10:00:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = '001'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the users table matching the User model."""
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('full_name', sa.String(length=255), nullable=False),
        sa.Column('is_active', sa.Boolean(), nullable=False),
        sa.Column('is_staff', sa.Boolean(), nullable=False),
        sa.Column('password_hash', sa.String(length=255), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
        sa.Column('last_login', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    # The model declares email with unique=True + index=True, which maps to a
    # single UNIQUE index — not the separate UniqueConstraint plus non-unique
    # index the original migration created.
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
def downgrade() -> None:
    """Reverse the migration: drop the email index, then the users table."""
    # Index first — it depends on the table.
    op.drop_index(op.f('ix_users_email'), table_name='users')
    op.drop_table('users')
This skill provides comprehensive guidance for building production-ready customer support systems with SQLAlchemy. Follow these patterns for maintainable, performant, and scalable applications.
Weekly Installs
–
Repository
GitHub Stars
47
First Seen
–
Security Audits
GSAP React 动画库使用指南:useGSAP Hook 与最佳实践
2,400 周安装