重要前提
安装AI Skills的关键前提是:必须科学上网,且开启TUN模式,这一点至关重要,直接决定安装能否顺利完成,在此郑重提醒三遍:科学上网,科学上网,科学上网。查看完整安装教程 →
pytest by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill pytest
pytest 是 Python 应用程序的行业标准测试框架,提供强大的功能,支持编写全面、可维护且可扩展的测试套件。本技能专门针对客户支持技术赋能,为测试后端支持系统、工单平台、知识库和客户数据平台提供模式和实践。
客户支持系统因其关键任务性质需要严格的测试。停机或错误会直接影响客户满意度、客服人员生产力和业务运营。本技能提供使用 pytest 测试支持系统所有方面的全面指导。
客户支持应用程序有特定的测试需求:
pytest 通过以下方式满足这些需求:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
Fixtures 是 pytest 的杀手级功能,提供可重用的设置/拆卸逻辑和依赖注入。对于客户支持系统,fixtures 管理数据库、API 客户端、测试数据和外部服务模拟。
import pytest
from app.models import Ticket, Customer, Agent
@pytest.fixture
def support_ticket():
    """Return a plain dict describing one open, high-priority support ticket."""
    ticket = dict(
        id=1,
        title="无法访问账户",
        description="用户重置密码后无法登录",
        status="open",
        priority="high",
        customer_email="user@example.com",
        category="authentication",
    )
    return ticket
def test_ticket_structure(support_ticket):
    """Check the status, priority and e-mail fields of the ``support_ticket`` dict."""
    allowed_priorities = ["low", "medium", "high", "critical"]

    assert support_ticket["status"] == "open"
    assert support_ticket["priority"] in allowed_priorities
    assert "@" in support_ticket["customer_email"]
使用作用域控制 fixtures 的创建和销毁时机:
function(默认):每个测试函数创建一次,测试完成后销毁
class:每个测试类创建一次,在所有方法间共享
module:每个模块文件创建一次,在文件中的所有测试间共享
package:每个包创建一次,在包中的所有测试间共享
session:整个测试会话创建一次,在所有测试间共享
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator


@pytest.fixture(scope="session")
def database_engine():
    """Build one SQLAlchemy engine for the whole test session.

    All tables registered on ``Base.metadata`` are created before the engine
    is handed out, and dropped again (followed by ``engine.dispose()``) once
    the session is over.
    """
    # Imported lazily, exactly as in the original snippet.
    from app.database import Base

    engine = create_engine(
        "postgresql://support_user:password@localhost/support_test",
        echo=False,
        pool_pre_ping=True,
        pool_size=10,
        max_overflow=20,
    )
    Base.metadata.create_all(engine)

    yield engine

    # Teardown: remove the schema, then release pooled connections.
    Base.metadata.drop_all(engine)
    engine.dispose()
@pytest.fixture(scope="function")
def db_session(database_engine) -> Generator[Session, None, None]:
    """Yield a fresh ``Session`` per test and discard everything it wrote.

    A plain ``session.rollback()`` in teardown cannot undo work a test has
    already ``commit()``-ed, so the session is bound to a single connection
    on which an outer transaction is opened first. Rolling that outer
    transaction back in teardown discards the test's writes, committed or not.

    NOTE(review): this follows SQLAlchemy's "joining a Session into an
    external transaction" recipe. On SQLAlchemy 2.0, pass
    ``join_transaction_mode="create_savepoint"`` to ``sessionmaker`` if tests
    also call ``session.rollback()`` themselves - confirm against the
    installed version.
    """
    connection = database_engine.connect()
    outer_transaction = connection.begin()
    session = sessionmaker(bind=connection)()
    try:
        yield session
    finally:
        session.close()
        # The test may already have ended the outer transaction.
        if outer_transaction.is_active:
            outer_transaction.rollback()
        connection.close()
无需显式请求即可自动运行的 fixtures:
@pytest.fixture(autouse=True)
def reset_caches():
    """Empty the ticket, customer and agent caches before and after each test."""
    from app.cache import ticket_cache, customer_cache, agent_cache

    caches = (ticket_cache, customer_cache, agent_cache)

    for cache in caches:
        cache.clear()

    yield

    # Clear again so nothing written by the test outlives it.
    for cache in caches:
        cache.clear()
@pytest.fixture(autouse=True, scope="session")
def configure_logging():
    """Set logging levels once for the test session.

    The root logger is configured at WARNING and the ``sqlalchemy.engine``
    logger is raised to ERROR.
    """
    import logging

    logging.basicConfig(level=logging.WARNING)
    sqlalchemy_logger = logging.getLogger("sqlalchemy.engine")
    sqlalchemy_logger.setLevel(logging.ERROR)
Fixtures 可以依赖其他 fixtures,创建依赖链:
@pytest.fixture
def database_url():
    """Return the connection URL of the test database."""
    url = "postgresql://test:test@localhost:5432/support_test"
    return url
@pytest.fixture
def engine(database_url):
    """Yield a SQLAlchemy engine built from ``database_url``.

    The engine is disposed after the test so its connection pool is not
    leaked; the original returned the engine and never released it.
    """
    from sqlalchemy import create_engine

    db_engine = create_engine(database_url, echo=False)
    yield db_engine
    db_engine.dispose()
@pytest.fixture
def session(engine):
    """Yield a session bound to ``engine`` and close it after the test."""
    from sqlalchemy.orm import sessionmaker

    db = sessionmaker(bind=engine)()
    yield db
    db.close()
@pytest.fixture
def sample_customer(session):
    """Add one ``Customer`` row through ``session``, commit it and return it."""
    attributes = {
        "email": "test@example.com",
        "name": "Test Customer",
        "tier": "premium",
        "company": "Acme Corp",
    }
    customer = Customer(**attributes)
    session.add(customer)
    session.commit()
    return customer
@pytest.fixture
def sample_ticket(session, sample_customer):
    """Add and commit one ``Ticket`` whose ``customer_id`` is ``sample_customer.id``."""
    attributes = {
        "title": "Test Ticket",
        "description": "Test description",
        "priority": "high",
        "status": "open",
        "customer_id": sample_customer.id,
    }
    ticket = Ticket(**attributes)
    session.add(ticket)
    session.commit()
    return ticket
创建返回工厂函数的 fixtures,用于灵活的测试数据创建:
@pytest.fixture
def ticket_factory(db_session):
    """Yield a callable that creates and commits ``Ticket`` rows on demand.

    Every ticket produced by the callable is remembered and deleted (with a
    single commit) when the test finishes.
    """
    tracked = []

    def _create_ticket(
        title="Default Ticket",
        description="Default description",
        priority="medium",
        status="open",
        **kwargs
    ):
        new_ticket = Ticket(
            title=title,
            description=description,
            priority=priority,
            status=status,
            **kwargs
        )
        db_session.add(new_ticket)
        db_session.commit()
        tracked.append(new_ticket)
        return new_ticket

    yield _create_ticket

    # Teardown: remove whatever this factory created.
    for stale in tracked:
        db_session.delete(stale)
    db_session.commit()
def test_multiple_tickets(ticket_factory):
    """Create two tickets through the factory and check their priorities."""
    urgent = ticket_factory(priority="high", title="Urgent Issue")
    minor = ticket_factory(priority="low", title="Minor Request")

    assert urgent.priority == "high"
    assert minor.priority == "low"
参数化允许使用不同的输入运行相同的测试,这对于全面测试具有各种优先级、状态、用户角色和边界情况的客户支持系统至关重要。
@pytest.mark.parametrize(
    ("priority", "expected_sla_hours"),
    [
        ("critical", 1),
        ("high", 4),
        ("medium", 24),
        ("low", 72),
    ],
)
def test_sla_calculation(priority, expected_sla_hours):
    """``calculate_sla_hours`` returns the expected hour count per priority."""
    from app.services.sla import calculate_sla_hours

    assert calculate_sla_hours(priority) == expected_sla_hours
@pytest.mark.parametrize(
    ("email", "is_valid"),
    [
        ("user@example.com", True),
        ("user.name+tag@example.co.uk", True),
        ("user@localhost", True),
        ("invalid.email", False),
        ("@example.com", False),
        ("user@", False),
        ("", False),
        ("user @example.com", False),
        ("user@example .com", False),
    ],
)
def test_email_validation(email, is_valid):
    """``is_valid_email`` accepts the valid samples and rejects the invalid ones."""
    from app.validators import is_valid_email

    verdict = is_valid_email(email)
    assert verdict == is_valid
@pytest.mark.parametrize(
    ("status", "priority", "assigned", "expected_urgent"),
    [
        ("open", "critical", False, True),         # unassigned critical -> urgent
        ("open", "high", False, True),             # unassigned high -> urgent
        ("open", "medium", False, False),          # unassigned medium -> not urgent
        ("open", "low", False, False),             # unassigned low -> not urgent
        ("in_progress", "critical", True, False),  # assigned critical -> not urgent
        ("in_progress", "high", True, False),      # assigned high -> not urgent
        ("closed", "critical", True, False),       # closed tickets are never urgent
        ("resolved", "high", True, False),         # resolved tickets are never urgent
    ],
)
def test_urgent_ticket_identification(status, priority, assigned, expected_urgent):
    """``Ticket.is_urgent()`` matches the expectation for each status/priority/assignment mix."""
    from app.models import Ticket

    candidate = Ticket(status=status, priority=priority)
    if assigned:
        # Any agent id marks the ticket as assigned.
        candidate.assigned_agent_id = 123

    assert candidate.is_urgent() == expected_urgent
@pytest.fixture(params=["sqlite", "postgresql"])
def database_type(request):
    """Parametrized fixture: yields each backend name in turn."""
    backend = request.param
    return backend
def test_database_connection(database_type):
    """Open a connection for each ``database_type`` value and check it reports connected."""
    from app.database import get_connection

    conn = get_connection(database_type)
    assert conn.is_connected()
    conn.close()
@pytest.fixture(
    params=[
        {"role": "admin", "can_delete": True, "can_reassign": True},
        {"role": "agent", "can_delete": False, "can_reassign": True},
        {"role": "viewer", "can_delete": False, "can_reassign": False},
    ]
)
def user_permissions(request):
    """Parametrized fixture: one dict of role plus expected permissions per run."""
    expectations = request.param
    return expectations
def test_permission_checks(user_permissions):
    """``check_permission`` agrees with the expected delete/reassign rights of each role."""
    from app.auth import check_permission

    role = user_permissions["role"]
    may_delete = check_permission(role, "delete_ticket")
    may_reassign = check_permission(role, "reassign_ticket")

    assert may_delete == user_permissions["can_delete"]
    assert may_reassign == user_permissions["can_reassign"]
@pytest.mark.parametrize(
    ("email", "expected_valid", "reason"),
    [
        pytest.param("user@example.com", True, "valid", id="valid-standard-email"),
        pytest.param("user+tag@example.com", True, "valid", id="valid-with-plus"),
        pytest.param("invalid", False, "missing-at", id="no-at-symbol"),
        pytest.param("@example.com", False, "no-user", id="missing-username"),
        pytest.param("user@", False, "no-domain", id="missing-domain"),
        pytest.param("user@.com", False, "invalid-domain", id="dot-at-domain-start"),
    ],
)
def test_email_validation_detailed(email, expected_valid, reason):
    """Check ``validate_email_detailed`` validity and, for rejects, its error code.

    ``reason`` is only compared (as a substring of the lower-cased
    ``error_code``) when the address is expected to be invalid.
    """
    from app.validators import validate_email_detailed

    outcome = validate_email_detailed(email)
    assert outcome.is_valid == expected_valid

    if expected_valid:
        return
    assert reason in outcome.error_code.lower()
@pytest.mark.parametrize(
    ("ticket_data", "expected_status"),
    [
        ({"priority": "critical", "auto_assign": True}, "assigned"),
        ({"priority": "high", "auto_assign": True}, "assigned"),
        ({"priority": "medium", "auto_assign": False}, "open"),
        ({"priority": "low", "auto_assign": False}, "open"),
    ],
    ids=["critical-auto", "high-auto", "medium-manual", "low-manual"],
)
def test_auto_assignment(ticket_data, expected_status):
    """``process_new_ticket`` yields the expected status for each priority/auto_assign pair."""
    from app.services.assignment import process_new_ticket

    processed = process_new_ticket(**ticket_data)
    assert processed.status == expected_status
模拟对于将代码单元与电子邮件服务、支付网关、CRM 系统和第三方 API 等外部依赖项隔离至关重要。pytest-mock 插件为 Python 的 unittest.mock 提供了一个简洁的接口。
def test_ticket_notification(mocker, db_session):
    """Creating a ticket via ``TicketService`` sends exactly one e-mail.

    ``EmailService.send_email`` is patched, so no real mail leaves the test.
    """
    send_email = mocker.patch(
        'app.services.email.EmailService.send_email',
        return_value={"status": "sent", "message_id": "msg_123"},
    )

    from app.services.ticket import TicketService

    created = TicketService(db_session).create_ticket(
        title="Login Issue",
        description="Cannot access account",
        customer_email="customer@example.com",
        priority="high",
    )

    # One call, addressed to the customer, mentioning the ticket title.
    send_email.assert_called_once()
    sent_kwargs = send_email.call_args.kwargs
    assert sent_kwargs['to'] == "customer@example.com"
    assert "Login Issue" in sent_kwargs['subject']
    assert created.id is not None
def test_customer_tier_lookup(mocker, api_client):
    """The tier endpoint relays the tier and support plan from a stubbed CRM reply.

    ``requests.get`` is patched to return a canned 200 response.
    NOTE(review): the ``api_client`` fixture is not defined in the visible
    part of this file.
    """
    crm_payload = {
        "customer_id": "CRM_12345",
        "tier": "enterprise",
        "support_plan": "24/7 premium",
        "account_manager": "jane@company.com",
    }
    crm_reply = mocker.Mock()
    crm_reply.status_code = 200
    crm_reply.json.return_value = crm_payload
    mocker.patch('requests.get', return_value=crm_reply)

    response = api_client.get("/api/customers/customer@example.com/tier")

    assert response.status_code == 200
    body = response.json()
    assert body["tier"] == "enterprise"
    assert body["support_plan"] == "24/7 premium"
def test_retry_logic_on_api_failure(mocker):
    """``call_with_retry`` succeeds on the third attempt after two ``ConnectionError``s."""
    from app.services.external import call_with_retry

    # Two failures followed by one successful payload.
    outcomes = [
        ConnectionError("Connection timeout"),
        ConnectionError("Connection timeout"),
        {"success": True, "data": "response"},
    ]
    external_api = mocker.patch(
        'app.services.external.call_external_api',
        side_effect=outcomes,
    )

    result = call_with_retry(max_attempts=3, backoff_seconds=0)

    assert result["success"] is True
    assert result["data"] == "response"
    assert external_api.call_count == 3
def test_email_fallback_on_primary_failure(mocker):
    """When the patched SendGrid sender raises, the patched SES sender's result is returned."""
    from app.services.email import send_email_with_fallback

    primary = mocker.patch(
        'app.services.email.sendgrid_send',
        side_effect=Exception("SendGrid API error"),
    )
    fallback = mocker.patch(
        'app.services.email.ses_send',
        return_value={"status": "sent", "provider": "ses"},
    )

    outcome = send_email_with_fallback(
        to="customer@example.com",
        subject="Ticket Update",
        body="Your ticket has been updated",
    )

    assert outcome["status"] == "sent"
    assert outcome["provider"] == "ses"
    # Each provider was tried exactly once.
    primary.assert_called_once()
    fallback.assert_called_once()
def test_cache_utilization(mocker, db_session):
    """Spy on ``ticket_cache`` to check one ``set`` and two ``get`` calls across two lookups."""
    from app.cache import ticket_cache

    get_spy = mocker.spy(ticket_cache, 'get')
    set_spy = mocker.spy(ticket_cache, 'set')

    from app.services.ticket import get_ticket

    # First lookup: expected cache miss, so the value gets stored.
    first = get_ticket(123)
    assert get_spy.call_count == 1
    assert set_spy.call_count == 1

    # Second lookup: expected cache hit, so no further ``set``.
    second = get_ticket(123)
    assert get_spy.call_count == 2
    assert set_spy.call_count == 1

    assert first.id == second.id
def test_ticket_creation_business_logic(mocker):
    """Exercise ``create_ticket_entity`` against a mocked session, with no database.

    The mocked ``refresh`` assigns ``id = 123`` to the object it receives.
    """
    def _assign_id(obj):
        setattr(obj, 'id', 123)

    fake_session = mocker.Mock()
    fake_session.add = mocker.Mock()
    fake_session.commit = mocker.Mock()
    fake_session.refresh = mocker.Mock(side_effect=_assign_id)

    from app.services.ticket import create_ticket_entity

    entity = create_ticket_entity(
        fake_session,
        title="Test Ticket",
        priority="high",
    )

    assert entity.id == 123
    assert entity.priority == "high"
    fake_session.add.assert_called_once()
    fake_session.commit.assert_called_once()
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_async_notification_service(mocker):
    """``NotificationService.notify_agent`` awaits the patched push sender once.

    ``send_push_notification`` is replaced by an ``AsyncMock`` returning a
    canned delivery receipt.
    """
    push = mocker.patch(
        'app.services.notification.send_push_notification',
        new_callable=AsyncMock,
    )
    push.return_value = {
        "delivered": True,
        "notification_id": "notif_789",
    }

    from app.services.notification import NotificationService

    receipt = await NotificationService().notify_agent(
        agent_id=456,
        message="New high-priority ticket assigned to you",
        priority="high",
    )

    assert receipt["delivered"] is True
    assert receipt["notification_id"] is not None
    push.assert_awaited_once()

    # Inspect the keyword arguments passed to the sender.
    sent = push.call_args.kwargs
    assert sent["agent_id"] == 456
    assert "high-priority" in sent["message"]
代码覆盖率衡量在测试期间执行了代码的百分比。对于客户支持系统,全面的覆盖率可确保可靠性并帮助识别未经测试的边界情况。
# 安装 pytest-cov
pip install pytest-cov
# 运行带覆盖率的测试
pytest --cov=app tests/
# 生成 HTML 报告
pytest --cov=app --cov-report=html tests/
# 显示未覆盖的行
pytest --cov=app --cov-report=term-missing tests/
# 如果覆盖率低于阈值则失败
pytest --cov=app --cov-fail-under=80 tests/
[pytest]
addopts =
--cov=app
--cov-report=html
--cov-report=term-missing
--cov-report=xml
--cov-fail-under=80
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
unit: Unit tests
integration: Integration tests
slow: Slow running tests
[run]
source = app
omit =
*/tests/*
*/migrations/*
*/__init__.py
*/config.py
*/venv/*
*/.venv/*
[report]
exclude_lines =
pragma: no cover
def __repr__
def __str__
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
if TYPE_CHECKING:
@abstractmethod
@abc.abstractmethod
except ImportError
precision = 2
show_missing = True
[html]
directory = htmlcov
title = Customer Support API Coverage Report
[run]
source = app
# 启用分支覆盖率
branch = True
[report]
show_missing = True
skip_covered = False
# 运行带分支覆盖率的测试
pytest --cov=app --cov-branch --cov-report=html
现代客户支持系统使用异步操作以获得更好的性能和可扩展性。测试异步代码需要特殊处理以正确等待协程。
import pytest
@pytest.mark.asyncio
async def test_async_ticket_creation():
    """Await ``create_ticket_async`` and check the id, status and priority of the result."""
    from app.services.ticket import create_ticket_async

    payload = {
        "title": "Async Test Ticket",
        "description": "Testing async operations",
        "priority": "high",
        "customer_email": "async@example.com",
    }
    created = await create_ticket_async(**payload)

    assert created.id is not None
    assert created.status == "open"
    assert created.priority == "high"
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
@pytest.fixture
async def async_db_session():
    """Yield an ``AsyncSession`` on a freshly created async engine.

    After the test the session is rolled back and the engine disposed.

    NOTE(review): a plain ``@pytest.fixture`` on an async generator relies on
    pytest-asyncio running in ``auto`` mode - confirm the project config.
    NOTE(review): the trailing rollback presumably does not undo work the
    test has already committed - verify if isolation matters.
    """
    async_engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/support_test",
        echo=False,
        future=True,
    )
    make_session = sessionmaker(
        async_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    async with make_session() as db:
        yield db
        await db.rollback()
        await async_engine.dispose()
@pytest.mark.asyncio
async def test_async_database_operation(async_db_session):
    """Insert a ticket through the async session, then read it back by title."""
    from app.models import Ticket
    from sqlalchemy import select

    # Write.
    async_db_session.add(Ticket(title="Async DB Test", priority="medium"))
    await async_db_session.commit()

    # Read back.
    query = select(Ticket).where(Ticket.title == "Async DB Test")
    rows = await async_db_session.execute(query)
    stored = rows.scalar_one()

    assert stored.id is not None
    assert stored.priority == "medium"
@pytest.mark.asyncio
async def test_background_email_task(mocker):
    """``process_ticket_notifications`` awaits the async e-mail sender once for the ticket.

    ``send_email_async`` is replaced by an ``AsyncMock``. The final assertion
    pins the forwarded ``ticket_id`` to 123. The original
    ``"ticket_id" in kw or kw.get("ticket_id") == 123`` passed for any
    ``ticket_id`` value, so it could not catch a wrong id.
    """
    from app.tasks.email import process_ticket_notifications

    send_email = mocker.patch(
        'app.tasks.email.send_email_async',
        new_callable=AsyncMock,
    )
    send_email.return_value = {"status": "sent", "message_id": "msg_456"}

    await process_ticket_notifications(ticket_id=123)

    send_email.assert_awaited_once()
    call_kwargs = send_email.call_args.kwargs
    assert call_kwargs.get("ticket_id") == 123
import asyncio
@pytest.mark.asyncio
async def test_concurrent_ticket_updates():
    """Run a status update, a comment and an agent assignment concurrently on one ticket."""
    from app.services.ticket import update_ticket_async, add_comment_async

    ticket_id = 1

    # asyncio.gather returns results in the order the awaitables were passed.
    status_result, comment_result, assign_result = await asyncio.gather(
        update_ticket_async(ticket_id, status="in_progress"),
        add_comment_async(ticket_id, "Working on this issue"),
        update_ticket_async(ticket_id, assigned_agent_id=42),
    )

    assert status_result["status"] == "in_progress"
    assert comment_result["comment_count"] >= 1
    assert assign_result["assigned_agent_id"] == 42
# 在 pyproject.toml 中
[tool.pytest.ini_options]
asyncio_mode = "auto" # 自动检测并运行异步测试
# 或在 pytest.ini 中
[pytest]
asyncio_mode = auto
FastAPI 广泛用于客户支持 API。测试端点可确保正确的请求/响应处理、验证、身份验证和业务逻辑。
from fastapi.testclient import TestClient
from app.main import app
@pytest.fixture
def client():
    """Return a ``TestClient`` wrapping the FastAPI ``app``."""
    test_client = TestClient(app)
    return test_client
def test_create_ticket_endpoint(client):
    """POST /api/v1/tickets answers 201 and echoes the submitted title and priority."""
    payload = {
        "title": "Cannot login to application",
        "description": "User receives 'invalid credentials' error",
        "priority": "high",
        "customer_email": "frustrated@example.com",
        "category": "authentication",
    }

    response = client.post("/api/v1/tickets", json=payload)

    assert response.status_code == 201
    body = response.json()
    assert body["title"] == "Cannot login to application"
    assert body["priority"] == "high"
    for generated_field in ("id", "created_at"):
        assert generated_field in body
def test_get_ticket_by_id(client, sample_tickets):
    """GET /api/v1/tickets/{id} returns the first sample ticket.

    NOTE(review): the ``sample_tickets`` fixture is not defined in the
    visible part of this file.
    """
    expected = sample_tickets[0]
    ticket_id = expected.id

    response = client.get(f"/api/v1/tickets/{ticket_id}")

    assert response.status_code == 200
    body = response.json()
    assert body["id"] == ticket_id
    assert body["title"] == expected.title
def test_update_ticket_status(client, sample_tickets):
    """PATCH /api/v1/tickets/{id} changes the status and keeps the id."""
    ticket_id = sample_tickets[0].id
    changes = {"status": "in_progress"}

    response = client.patch(f"/api/v1/tickets/{ticket_id}", json=changes)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "in_progress"
    assert body["id"] == ticket_id
可以覆盖 FastAPI 的依赖注入以进行测试:
from app.dependencies import get_db, get_current_user
from app.models import User
@pytest.fixture
def authenticated_client(client, mocker):
    """Yield ``client`` with ``get_current_user`` overridden to return a test agent.

    Teardown removes only this fixture's override. The original called
    ``app.dependency_overrides.clear()``, which also wiped overrides installed
    by other fixtures used in the same test.
    """
    mock_user = User(
        id=1,
        email="agent@support.com",
        role="agent",
        name="Test Agent",
    )

    async def override_get_current_user():
        return mock_user

    app.dependency_overrides[get_current_user] = override_get_current_user
    yield client
    app.dependency_overrides.pop(get_current_user, None)
def test_protected_endpoint(authenticated_client):
    """An authenticated GET on /api/v1/admin/users answers 200 with a JSON list."""
    response = authenticated_client.get("/api/v1/admin/users")

    assert response.status_code == 200
    users = response.json()
    assert isinstance(users, list)
@pytest.fixture
def admin_client(client, mocker):
    """Yield ``client`` with ``get_current_user`` overridden to return an admin user.

    Teardown removes only this fixture's override. The original called
    ``app.dependency_overrides.clear()``, which also wiped overrides installed
    by other fixtures used in the same test.
    """
    mock_admin = User(
        id=1,
        email="admin@support.com",
        role="admin",
        permissions=["read", "write", "delete", "admin"],
    )

    async def override_get_current_user():
        return mock_admin

    app.dependency_overrides[get_current_user] = override_get_current_user
    yield client
    app.dependency_overrides.pop(get_current_user, None)
def test_admin_only_endpoint(admin_client):
    """DELETE as admin answers 200/204 (deleted) or 404 (ticket 123 does not exist)."""
    acceptable = [200, 204, 404]

    response = admin_client.delete("/api/v1/tickets/123")

    assert response.status_code in acceptable
@pytest.fixture
def client_with_db(db_session):
    """Yield a ``TestClient`` whose ``get_db`` dependency hands out ``db_session``.

    The override does no cleanup of its own because ``db_session`` is owned
    and closed by its fixture; the original's ``try/finally: pass`` wrapper
    was a no-op and is dropped. Teardown removes only the ``get_db`` override
    instead of clearing every override on the app.
    """
    def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    client = TestClient(app)
    yield client
    app.dependency_overrides.pop(get_db, None)
def test_ticket_lifecycle(client_with_db, db_session):
    """Walk one ticket through create, read, update, DB check, delete and a final 404."""
    from app.models import Ticket

    api = client_with_db

    # Create.
    created = api.post(
        "/api/v1/tickets",
        json={
            "title": "Lifecycle Test",
            "description": "Testing full CRUD cycle",
            "priority": "medium",
        },
    )
    assert created.status_code == 201
    ticket_id = created.json()["id"]
    ticket_url = f"/api/v1/tickets/{ticket_id}"

    # Read.
    fetched = api.get(ticket_url)
    assert fetched.status_code == 200
    assert fetched.json()["title"] == "Lifecycle Test"

    # Update.
    updated = api.patch(ticket_url, json={"status": "resolved", "priority": "low"})
    assert updated.status_code == 200
    assert updated.json()["status"] == "resolved"

    # Confirm the update is visible through the database session.
    row = db_session.query(Ticket).filter_by(id=ticket_id).first()
    assert row is not None
    assert row.status == "resolved"
    assert row.priority == "low"

    # Delete.
    deleted = api.delete(ticket_url)
    assert deleted.status_code in [200, 204]

    # The ticket must be gone.
    missing = api.get(ticket_url)
    assert missing.status_code == 404
def test_invalid_ticket_creation(client):
    """Invalid ticket payloads are rejected with HTTP 422."""
    endpoint = "/api/v1/tickets"

    # Required field missing.
    missing_title = client.post(endpoint, json={"description": "Missing title"})
    assert missing_title.status_code == 422

    # Unknown priority value; the error detail must mention the field.
    bad_priority = client.post(
        endpoint,
        json={
            "title": "Test",
            "description": "Test",
            "priority": "invalid_priority",
        },
    )
    assert bad_priority.status_code == 422
    details = bad_priority.json()["detail"]
    assert any("priority" in str(item).lower() for item in details)

    # Malformed e-mail address.
    bad_email = client.post(
        endpoint,
        json={
            "title": "Test",
            "description": "Test",
            "customer_email": "not_an_email",
        },
    )
    assert bad_email.status_code == 422
测试数据库模型、查询、关系和操作可确保客户支持系统中的数据完整性。
from app.models import Ticket, Comment, User, Customer
def test_ticket_model_creation(db_session):
    """Persist a customer and a ticket that references it, then check the generated fields."""
    owner = Customer(
        email="customer@example.com",
        name="John Doe",
        tier="enterprise",
    )
    db_session.add(owner)
    # Flush so ``owner.id`` is available before the ticket is built.
    db_session.flush()

    ticket = Ticket(
        title="Database Connection Error",
        description="Cannot connect to production database",
        priority="critical",
        status="open",
        customer_id=owner.id,
        category="technical",
    )
    db_session.add(ticket)
    db_session.commit()

    assert ticket.id is not None
    assert ticket.created_at is not None
    assert ticket.customer.email == "customer@example.com"
def test_ticket_model_defaults(db_session):
    """A ticket saved with only title and description gets default status, priority and timestamps."""
    minimal = Ticket(title="Test", description="Test")
    db_session.add(minimal)
    db_session.commit()

    assert minimal.status == "open"      # default status
    assert minimal.priority == "medium"  # default priority
    assert minimal.created_at is not None
    assert minimal.updated_at is not None
def test_ticket_comments_relationship(db_session):
    """Two comments attached to one ticket are reachable through ``ticket.comments``."""
    ticket = Ticket(
        title="Test Ticket",
        description="Testing relationships",
        priority="low",
    )
    db_session.add(ticket)
    # Flush so ``ticket.id`` is available for the comments.
    db_session.flush()

    first = Comment(
        ticket_id=ticket.id,
        content="First comment",
        author_email="agent1@support.com",
    )
    second = Comment(
        ticket_id=ticket.id,
        content="Second comment",
        author_email="agent2@support.com",
    )
    db_session.add_all([first, second])
    db_session.commit()

    # Expire the ticket so the relationship is reloaded on next access.
    db_session.expire(ticket)
    loaded = ticket.comments
    assert len(loaded) == 2
    assert loaded[0].content == "First comment"
    assert loaded[1].content == "Second comment"
def test_customer_tickets_relationship(db_session):
"""测试客户和工单之间的一对多关系。"""
customer = Customer(
email="loyal@customer.com",
name="Loyal Customer"
)
db_session.add(customer)
db_session.flush()
# 为同一客户创建多个工单
for i in range(3):
ticket = Ticket(
title=f"Issue #{i+1}",
description=f"Description {
pytest is the industry-standard testing framework for Python applications, offering powerful features that enable comprehensive, maintainable, and scalable test suites. This skill focuses specifically on customer support tech enablement, providing patterns and practices for testing backend support systems, ticketing platforms, knowledge bases, and customer data platforms.
Customer support systems require rigorous testing due to their mission-critical nature. Downtime or bugs directly impact customer satisfaction, agent productivity, and business operations. This skill provides comprehensive guidance on testing all aspects of support systems using pytest.
Customer support applications have specific testing needs:
pytest addresses these needs through:
Fixtures are pytest's killer feature, providing reusable setup/teardown logic and dependency injection. For customer support systems, fixtures manage databases, API clients, test data, and external service mocks.
import pytest
from app.models import Ticket, Customer, Agent
@pytest.fixture
def support_ticket():
    """Return a plain dict describing one open, high-priority support ticket."""
    ticket = dict(
        id=1,
        title="Cannot access account",
        description="User unable to login after password reset",
        status="open",
        priority="high",
        customer_email="user@example.com",
        category="authentication",
    )
    return ticket
def test_ticket_structure(support_ticket):
    """Check the status, priority and e-mail fields of the ``support_ticket`` dict."""
    allowed_priorities = ["low", "medium", "high", "critical"]

    assert support_ticket["status"] == "open"
    assert support_ticket["priority"] in allowed_priorities
    assert "@" in support_ticket["customer_email"]
Control when fixtures are created and destroyed using scopes:
function (default): Created once per test function, destroyed after test completes
class: Created once per test class, shared across all methods
module: Created once per module file, shared across all tests in file
package: Created once per package, shared across all tests in package
session: Created once per entire test session, shared across all tests
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator


@pytest.fixture(scope="session")
def database_engine():
    """Build one SQLAlchemy engine for the whole test session.

    All tables registered on ``Base.metadata`` are created before the engine
    is handed out, and dropped again (followed by ``engine.dispose()``) once
    the session is over.
    """
    # Imported lazily, exactly as in the original snippet.
    from app.database import Base

    engine = create_engine(
        "postgresql://support_user:password@localhost/support_test",
        echo=False,
        pool_pre_ping=True,
        pool_size=10,
        max_overflow=20,
    )
    Base.metadata.create_all(engine)

    yield engine

    # Teardown: remove the schema, then release pooled connections.
    Base.metadata.drop_all(engine)
    engine.dispose()
@pytest.fixture(scope="function")
def db_session(database_engine) -> Generator[Session, None, None]:
    """Yield a fresh ``Session`` per test and discard everything it wrote.

    A plain ``session.rollback()`` in teardown cannot undo work a test has
    already ``commit()``-ed, so the session is bound to a single connection
    on which an outer transaction is opened first. Rolling that outer
    transaction back in teardown discards the test's writes, committed or not.

    NOTE(review): this follows SQLAlchemy's "joining a Session into an
    external transaction" recipe. On SQLAlchemy 2.0, pass
    ``join_transaction_mode="create_savepoint"`` to ``sessionmaker`` if tests
    also call ``session.rollback()`` themselves - confirm against the
    installed version.
    """
    connection = database_engine.connect()
    outer_transaction = connection.begin()
    session = sessionmaker(bind=connection)()
    try:
        yield session
    finally:
        session.close()
        # The test may already have ended the outer transaction.
        if outer_transaction.is_active:
            outer_transaction.rollback()
        connection.close()
Fixtures that run automatically without being explicitly requested:
@pytest.fixture(autouse=True)
def reset_caches():
    """Empty the ticket, customer and agent caches before and after each test."""
    from app.cache import ticket_cache, customer_cache, agent_cache

    caches = (ticket_cache, customer_cache, agent_cache)

    for cache in caches:
        cache.clear()

    yield

    # Clear again so nothing written by the test outlives it.
    for cache in caches:
        cache.clear()
@pytest.fixture(autouse=True, scope="session")
def configure_logging():
    """Set logging levels once for the test session.

    The root logger is configured at WARNING and the ``sqlalchemy.engine``
    logger is raised to ERROR.
    """
    import logging

    logging.basicConfig(level=logging.WARNING)
    sqlalchemy_logger = logging.getLogger("sqlalchemy.engine")
    sqlalchemy_logger.setLevel(logging.ERROR)
Fixtures can depend on other fixtures, creating dependency chains:
@pytest.fixture
def database_url():
    """Return the connection URL of the test database."""
    url = "postgresql://test:test@localhost:5432/support_test"
    return url
@pytest.fixture
def engine(database_url):
    """Yield a SQLAlchemy engine built from ``database_url``.

    The engine is disposed after the test so its connection pool is not
    leaked; the original returned the engine and never released it.
    """
    from sqlalchemy import create_engine

    db_engine = create_engine(database_url, echo=False)
    yield db_engine
    db_engine.dispose()
@pytest.fixture
def session(engine):
    """Yield a session bound to ``engine`` and close it after the test."""
    from sqlalchemy.orm import sessionmaker

    db = sessionmaker(bind=engine)()
    yield db
    db.close()
@pytest.fixture
def sample_customer(session):
    """Add one ``Customer`` row through ``session``, commit it and return it."""
    attributes = {
        "email": "test@example.com",
        "name": "Test Customer",
        "tier": "premium",
        "company": "Acme Corp",
    }
    customer = Customer(**attributes)
    session.add(customer)
    session.commit()
    return customer
@pytest.fixture
def sample_ticket(session, sample_customer):
    """Add and commit one ``Ticket`` whose ``customer_id`` is ``sample_customer.id``."""
    attributes = {
        "title": "Test Ticket",
        "description": "Test description",
        "priority": "high",
        "status": "open",
        "customer_id": sample_customer.id,
    }
    ticket = Ticket(**attributes)
    session.add(ticket)
    session.commit()
    return ticket
Create fixtures that return factory functions for flexible test data creation:
@pytest.fixture
def ticket_factory(db_session):
    """Yield a callable that creates and commits ``Ticket`` rows on demand.

    Every ticket produced by the callable is remembered and deleted (with a
    single commit) when the test finishes.
    """
    tracked = []

    def _create_ticket(
        title="Default Ticket",
        description="Default description",
        priority="medium",
        status="open",
        **kwargs
    ):
        new_ticket = Ticket(
            title=title,
            description=description,
            priority=priority,
            status=status,
            **kwargs
        )
        db_session.add(new_ticket)
        db_session.commit()
        tracked.append(new_ticket)
        return new_ticket

    yield _create_ticket

    # Teardown: remove whatever this factory created.
    for stale in tracked:
        db_session.delete(stale)
    db_session.commit()
def test_multiple_tickets(ticket_factory):
    """Create two tickets through the factory and check their priorities."""
    urgent = ticket_factory(priority="high", title="Urgent Issue")
    minor = ticket_factory(priority="low", title="Minor Request")

    assert urgent.priority == "high"
    assert minor.priority == "low"
Parametrization allows running the same test with different inputs, essential for comprehensive testing of customer support systems with various priority levels, statuses, user roles, and edge cases.
@pytest.mark.parametrize(
    ("priority", "expected_sla_hours"),
    [
        ("critical", 1),
        ("high", 4),
        ("medium", 24),
        ("low", 72),
    ],
)
def test_sla_calculation(priority, expected_sla_hours):
    """``calculate_sla_hours`` returns the expected hour count per priority."""
    from app.services.sla import calculate_sla_hours

    assert calculate_sla_hours(priority) == expected_sla_hours
@pytest.mark.parametrize(
    ("email", "is_valid"),
    [
        ("user@example.com", True),
        ("user.name+tag@example.co.uk", True),
        ("user@localhost", True),
        ("invalid.email", False),
        ("@example.com", False),
        ("user@", False),
        ("", False),
        ("user @example.com", False),
        ("user@example .com", False),
    ],
)
def test_email_validation(email, is_valid):
    """``is_valid_email`` accepts the valid samples and rejects the invalid ones."""
    from app.validators import is_valid_email

    verdict = is_valid_email(email)
    assert verdict == is_valid
@pytest.mark.parametrize(
    ("status", "priority", "assigned", "expected_urgent"),
    [
        ("open", "critical", False, True),         # unassigned critical -> urgent
        ("open", "high", False, True),             # unassigned high -> urgent
        ("open", "medium", False, False),          # unassigned medium -> not urgent
        ("open", "low", False, False),             # unassigned low -> not urgent
        ("in_progress", "critical", True, False),  # assigned critical -> not urgent
        ("in_progress", "high", True, False),      # assigned high -> not urgent
        ("closed", "critical", True, False),       # closed tickets are never urgent
        ("resolved", "high", True, False),         # resolved tickets are never urgent
    ],
)
def test_urgent_ticket_identification(status, priority, assigned, expected_urgent):
    """``Ticket.is_urgent()`` matches the expectation for each status/priority/assignment mix."""
    from app.models import Ticket

    candidate = Ticket(status=status, priority=priority)
    if assigned:
        # Any agent id marks the ticket as assigned.
        candidate.assigned_agent_id = 123

    assert candidate.is_urgent() == expected_urgent
@pytest.fixture(params=["sqlite", "postgresql"])
def database_type(request):
    """Parametrized fixture: yields each backend name in turn."""
    backend = request.param
    return backend
def test_database_connection(database_type):
    """Open a connection for each ``database_type`` value and check it reports connected."""
    from app.database import get_connection

    conn = get_connection(database_type)
    assert conn.is_connected()
    conn.close()
@pytest.fixture(
    params=[
        {"role": "admin", "can_delete": True, "can_reassign": True},
        {"role": "agent", "can_delete": False, "can_reassign": True},
        {"role": "viewer", "can_delete": False, "can_reassign": False},
    ]
)
def user_permissions(request):
    """Parametrized fixture: one dict of role plus expected permissions per run."""
    expectations = request.param
    return expectations
def test_permission_checks(user_permissions):
    """``check_permission`` agrees with the expected delete/reassign rights of each role."""
    from app.auth import check_permission

    role = user_permissions["role"]
    may_delete = check_permission(role, "delete_ticket")
    may_reassign = check_permission(role, "reassign_ticket")

    assert may_delete == user_permissions["can_delete"]
    assert may_reassign == user_permissions["can_reassign"]
@pytest.mark.parametrize(
    "email,expected_valid,reason",
    [
        pytest.param("user@example.com", True, "valid", id="valid-standard-email"),
        pytest.param("user+tag@example.com", True, "valid", id="valid-with-plus"),
        pytest.param("invalid", False, "missing-at", id="no-at-symbol"),
        pytest.param("@example.com", False, "no-user", id="missing-username"),
        pytest.param("user@", False, "no-domain", id="missing-domain"),
        pytest.param("user@.com", False, "invalid-domain", id="dot-at-domain-start"),
    ],
)
def test_email_validation_detailed(email, expected_valid, reason):
    """Check validity and, for rejected addresses, the reported error code."""
    from app.validators import validate_email_detailed

    outcome = validate_email_detailed(email)
    assert outcome.is_valid == expected_valid

    if expected_valid:
        # Accepted addresses carry no error code to inspect.
        return
    assert reason in outcome.error_code.lower()
@pytest.mark.parametrize(
    "ticket_data,expected_status",
    [
        ({"priority": "critical", "auto_assign": True}, "assigned"),
        ({"priority": "high", "auto_assign": True}, "assigned"),
        ({"priority": "medium", "auto_assign": False}, "open"),
        ({"priority": "low", "auto_assign": False}, "open"),
    ],
    ids=["critical-auto", "high-auto", "medium-manual", "low-manual"],
)
def test_auto_assignment(ticket_data, expected_status):
    """Check the status that ``process_new_ticket`` gives a new ticket."""
    from app.services.assignment import process_new_ticket

    processed = process_new_ticket(**ticket_data)
    resulting_status = processed.status
    assert resulting_status == expected_status
Mocking is essential for isolating units of code from external dependencies like email services, payment gateways, CRM systems, and third-party APIs. The pytest-mock plugin provides a clean interface to Python's unittest.mock.
def test_ticket_notification(mocker, db_session):
    """Creating a ticket should send exactly one email to the customer."""
    # Replace the real email sender with a stub that reports success.
    email_stub = mocker.patch(
        "app.services.email.EmailService.send_email",
        return_value={"status": "sent", "message_id": "msg_123"},
    )

    from app.services.ticket import TicketService

    new_ticket = TicketService(db_session).create_ticket(
        title="Login Issue",
        description="Cannot access account",
        customer_email="customer@example.com",
        priority="high",
    )

    # The notification must have gone out once, addressed to the customer.
    email_stub.assert_called_once()
    sent_kwargs = email_stub.call_args.kwargs
    assert sent_kwargs['to'] == "customer@example.com"
    assert "Login Issue" in sent_kwargs['subject']
    assert new_ticket.id is not None
def test_customer_tier_lookup(mocker, api_client):
    """The tier endpoint should surface the data returned by the patched CRM call."""
    # Canned CRM reply returned by the patched ``requests.get``.
    crm_reply = mocker.Mock(status_code=200)
    crm_reply.json.return_value = {
        "customer_id": "CRM_12345",
        "tier": "enterprise",
        "support_plan": "24/7 premium",
        "account_manager": "jane@company.com",
    }
    mocker.patch("requests.get", return_value=crm_reply)

    api_response = api_client.get("/api/customers/customer@example.com/tier")
    assert api_response.status_code == 200

    payload = api_response.json()
    assert payload["tier"] == "enterprise"
    assert payload["support_plan"] == "24/7 premium"
def test_retry_logic_on_api_failure(mocker):
    """``call_with_retry`` should succeed on the third attempt after two failures."""
    from app.services.external import call_with_retry

    # Two connection errors followed by a successful payload.
    scripted_outcomes = [
        ConnectionError("Connection timeout"),
        ConnectionError("Connection timeout"),
        {"success": True, "data": "response"},
    ]
    api_stub = mocker.patch(
        "app.services.external.call_external_api",
        side_effect=scripted_outcomes,
    )

    outcome = call_with_retry(max_attempts=3, backoff_seconds=0)

    assert outcome["success"] is True
    assert outcome["data"] == "response"
    assert api_stub.call_count == 3
def test_email_fallback_on_primary_failure(mocker):
    """When the primary sender raises, the fallback sender's result is returned."""
    from app.services.email import send_email_with_fallback

    # The primary provider always fails; the secondary reports success.
    primary_stub = mocker.patch(
        "app.services.email.sendgrid_send",
        side_effect=Exception("SendGrid API error"),
    )
    fallback_stub = mocker.patch(
        "app.services.email.ses_send",
        return_value={"status": "sent", "provider": "ses"},
    )

    delivery = send_email_with_fallback(
        to="customer@example.com",
        subject="Ticket Update",
        body="Your ticket has been updated",
    )

    assert delivery["status"] == "sent"
    assert delivery["provider"] == "ses"
    primary_stub.assert_called_once()
    fallback_stub.assert_called_once()
def test_cache_utilization(mocker, db_session):
    """Spy on the ticket cache to confirm a read-then-populate access pattern."""
    from app.cache import ticket_cache

    cache_reads = mocker.spy(ticket_cache, 'get')
    cache_writes = mocker.spy(ticket_cache, 'set')

    from app.services.ticket import get_ticket

    # First lookup: one cache read and one cache write are expected.
    first_lookup = get_ticket(123)
    assert cache_reads.call_count == 1
    assert cache_writes.call_count == 1

    # Second lookup: another read, but no additional write.
    second_lookup = get_ticket(123)
    assert cache_reads.call_count == 2
    assert cache_writes.call_count == 1

    assert first_lookup.id == second_lookup.id
def test_ticket_creation_business_logic(mocker):
    """Exercise ``create_ticket_entity`` against a mocked session, no database."""

    def _assign_id(entity):
        # Stand-in for what a refresh would populate after a real commit.
        entity.id = 123

    session_double = mocker.Mock(
        add=mocker.Mock(),
        commit=mocker.Mock(),
        refresh=mocker.Mock(side_effect=_assign_id),
    )

    from app.services.ticket import create_ticket_entity

    created = create_ticket_entity(
        session_double,
        title="Test Ticket",
        priority="high",
    )

    assert created.id == 123
    assert created.priority == "high"
    session_double.add.assert_called_once()
    session_double.commit.assert_called_once()
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_async_notification_service(mocker):
    """``NotificationService.notify_agent`` should await the push sender once."""
    push_stub = mocker.patch(
        "app.services.notification.send_push_notification",
        new_callable=AsyncMock,
    )
    push_stub.return_value = {"delivered": True, "notification_id": "notif_789"}

    from app.services.notification import NotificationService

    outcome = await NotificationService().notify_agent(
        agent_id=456,
        message="New high-priority ticket assigned to you",
        priority="high",
    )

    assert outcome["delivered"] is True
    assert outcome["notification_id"] is not None
    push_stub.assert_awaited_once()

    # Inspect the keyword arguments the sender was called with.
    sent_kwargs = push_stub.call_args.kwargs
    assert sent_kwargs["agent_id"] == 456
    assert "high-priority" in sent_kwargs["message"]
Code coverage measures what percentage of your code is executed during testing. For customer support systems, comprehensive coverage ensures reliability and helps identify untested edge cases.
# Install pytest-cov
pip install pytest-cov
# Run tests with coverage
pytest --cov=app tests/
# Generate HTML report
pytest --cov=app --cov-report=html tests/
# Show lines not covered
pytest --cov=app --cov-report=term-missing tests/
# Fail if coverage below threshold
pytest --cov=app --cov-fail-under=80 tests/
[pytest]
addopts =
--cov=app
--cov-report=html
--cov-report=term-missing
--cov-report=xml
--cov-fail-under=80
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
unit: Unit tests
integration: Integration tests
slow: Slow running tests
[run]
source = app
omit =
*/tests/*
*/migrations/*
*/__init__.py
*/config.py
*/venv/*
*/.venv/*
[report]
exclude_lines =
pragma: no cover
def __repr__
def __str__
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
if TYPE_CHECKING:
@abstractmethod
@abc.abstractmethod
except ImportError
precision = 2
show_missing = True
[html]
directory = htmlcov
title = Customer Support API Coverage Report
[run]
source = app
# Enable branch coverage
branch = True
[report]
show_missing = True
skip_covered = False
# Run with branch coverage
pytest --cov=app --cov-branch --cov-report=html
Modern customer support systems use asynchronous operations for better performance and scalability. Testing async code requires special handling to properly await coroutines.
import pytest
@pytest.mark.asyncio
async def test_async_ticket_creation():
    """Await ``create_ticket_async`` and check the fields of the returned ticket."""
    from app.services.ticket import create_ticket_async

    request_fields = {
        "title": "Async Test Ticket",
        "description": "Testing async operations",
        "priority": "high",
        "customer_email": "async@example.com",
    }
    created = await create_ticket_async(**request_fields)

    assert created.id is not None
    assert created.status == "open"
    assert created.priority == "high"
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
@pytest.fixture
async def async_db_session():
    """Provide an ``AsyncSession`` backed by a freshly created async engine.

    Yields:
        AsyncSession: a session created with ``expire_on_commit=False``.

    Teardown rolls back whatever is still pending on the session and then
    disposes the engine. The nested ``try``/``finally`` blocks guarantee the
    engine is disposed even if the rollback itself raises.

    NOTE(review): an async generator declared with plain ``@pytest.fixture``
    presumably relies on pytest-asyncio running in ``auto`` mode -- confirm
    against the project's pytest configuration.
    """
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/support_test",
        echo=False,
        future=True,
    )
    async_session_maker = sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    try:
        async with async_session_maker() as session:
            try:
                yield session
            finally:
                # Discard uncommitted work; rows the test already committed
                # are not undone by this rollback.
                await session.rollback()
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_async_database_operation(async_db_session):
    """Insert a ticket through the async session and read it back."""
    from app.models import Ticket
    from sqlalchemy import select

    # Persist a new ticket.
    async_db_session.add(Ticket(title="Async DB Test", priority="medium"))
    await async_db_session.commit()

    # Fetch it again by title.
    lookup = select(Ticket).where(Ticket.title == "Async DB Test")
    rows = await async_db_session.execute(lookup)
    stored = rows.scalar_one()

    assert stored.id is not None
    assert stored.priority == "medium"
@pytest.mark.asyncio
async def test_background_email_task(mocker):
    """``process_ticket_notifications`` should await the email sender once.

    The sender is replaced with an ``AsyncMock`` so no email is sent; the test
    then checks that the call carried the ticket id it was invoked with.
    """
    from app.tasks.email import process_ticket_notifications

    mock_send = mocker.patch(
        'app.tasks.email.send_email_async',
        new_callable=AsyncMock
    )
    mock_send.return_value = {"status": "sent", "message_id": "msg_456"}

    await process_ticket_notifications(ticket_id=123)

    mock_send.assert_awaited_once()
    call_kwargs = mock_send.call_args.kwargs
    # The previous form (``"ticket_id" in kw or kw.get("ticket_id") == 123``)
    # passed for any value as long as the key was present; pin the value.
    assert call_kwargs.get("ticket_id") == 123
import asyncio
@pytest.mark.asyncio
async def test_concurrent_ticket_updates():
    """Run a status change, a comment and an assignment concurrently on one ticket."""
    from app.services.ticket import update_ticket_async, add_comment_async

    ticket_id = 1

    # gather() returns results in the same order the awaitables are passed.
    status_result, comment_result, assignment_result = await asyncio.gather(
        update_ticket_async(ticket_id, status="in_progress"),
        add_comment_async(ticket_id, "Working on this issue"),
        update_ticket_async(ticket_id, assigned_agent_id=42),
    )

    assert status_result["status"] == "in_progress"
    assert comment_result["comment_count"] >= 1
    assert assignment_result["assigned_agent_id"] == 42
# In pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto" # Automatically detect and run async tests
# Or in pytest.ini
[pytest]
asyncio_mode = auto
FastAPI is widely used for customer support APIs. Testing endpoints ensures proper request/response handling, validation, authentication, and business logic.
from fastapi.testclient import TestClient
from app.main import app
@pytest.fixture
def client():
    """Build a ``TestClient`` wrapping the FastAPI ``app``."""
    test_client = TestClient(app)
    return test_client
def test_create_ticket_endpoint(client):
    """POSTing a valid ticket should return 201 and echo the stored fields."""
    new_ticket = {
        "title": "Cannot login to application",
        "description": "User receives 'invalid credentials' error",
        "priority": "high",
        "customer_email": "frustrated@example.com",
        "category": "authentication",
    }

    reply = client.post("/api/v1/tickets", json=new_ticket)
    assert reply.status_code == 201

    body = reply.json()
    assert body["title"] == "Cannot login to application"
    assert body["priority"] == "high"
    assert "id" in body
    assert "created_at" in body
def test_get_ticket_by_id(client, sample_tickets):
    """GET on a ticket URL should return that ticket's id and title."""
    expected = sample_tickets[0]
    ticket_id = expected.id

    reply = client.get(f"/api/v1/tickets/{ticket_id}")
    assert reply.status_code == 200

    body = reply.json()
    assert body["id"] == ticket_id
    assert body["title"] == sample_tickets[0].title
def test_update_ticket_status(client, sample_tickets):
    """PATCHing a new status should return 200 and the updated ticket."""
    ticket_id = sample_tickets[0].id
    changes = {"status": "in_progress"}

    reply = client.patch(f"/api/v1/tickets/{ticket_id}", json=changes)
    assert reply.status_code == 200

    body = reply.json()
    assert body["status"] == "in_progress"
    assert body["id"] == ticket_id
FastAPI's dependency injection can be overridden for testing:
from app.dependencies import get_db, get_current_user
from app.models import User
@pytest.fixture
def authenticated_client(client, mocker):
    """Provide the test client with ``get_current_user`` overridden to an agent.

    While the fixture is active, the app's ``get_current_user`` dependency
    resolves to a fixed ``User`` with ``role="agent"``.

    Yields:
        The value of the ``client`` fixture.
    """
    # ``mocker`` is requested by the signature but not used in this body.
    mock_user = User(
        id=1,
        email="agent@support.com",
        role="agent",
        name="Test Agent"
    )

    async def override_get_current_user():
        return mock_user

    app.dependency_overrides[get_current_user] = override_get_current_user
    yield client
    # Remove only the override installed here. ``clear()`` would also drop
    # overrides registered by any other fixture that is still active.
    app.dependency_overrides.pop(get_current_user, None)
def test_protected_endpoint(authenticated_client):
    """An authenticated request to the users endpoint should return a JSON list."""
    reply = authenticated_client.get("/api/v1/admin/users")
    assert reply.status_code == 200

    body = reply.json()
    assert isinstance(body, list)
@pytest.fixture
def admin_client(client, mocker):
    """Provide the test client with ``get_current_user`` overridden to an admin.

    While the fixture is active, the app's ``get_current_user`` dependency
    resolves to a fixed ``User`` with ``role="admin"`` and the full permission
    list below.

    Yields:
        The value of the ``client`` fixture.
    """
    # ``mocker`` is requested by the signature but not used in this body.
    mock_admin = User(
        id=1,
        email="admin@support.com",
        role="admin",
        permissions=["read", "write", "delete", "admin"]
    )

    async def override_get_current_user():
        return mock_admin

    app.dependency_overrides[get_current_user] = override_get_current_user
    yield client
    # Remove only the override installed here. ``clear()`` would also drop
    # overrides registered by any other fixture that is still active.
    app.dependency_overrides.pop(get_current_user, None)
def test_admin_only_endpoint(admin_client):
    """An admin DELETE should succeed or report that the ticket does not exist."""
    reply = admin_client.delete("/api/v1/tickets/123")

    # 200/204 mean the ticket was deleted; 404 means there was none to delete.
    acceptable_codes = [200, 204, 404]
    assert reply.status_code in acceptable_codes
@pytest.fixture
def client_with_db(db_session):
    """Provide a ``TestClient`` whose ``get_db`` dependency yields ``db_session``.

    Args:
        db_session: Session fixture that the application should use for every
            request made through the returned client.

    Yields:
        TestClient: client bound to ``app`` with the database override active.
    """
    def override_get_db():
        # The session's lifecycle belongs to the ``db_session`` fixture, so
        # there is nothing to clean up here after the request finishes.
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    client = TestClient(app)
    yield client
    # Remove only the override installed here. ``clear()`` would also drop
    # overrides registered by any other fixture that is still active.
    app.dependency_overrides.pop(get_db, None)
def test_ticket_lifecycle(client_with_db, db_session):
    """Walk a ticket through create, read, update and delete via the API."""
    from app.models import Ticket

    # Create
    created = client_with_db.post(
        "/api/v1/tickets",
        json={
            "title": "Lifecycle Test",
            "description": "Testing full CRUD cycle",
            "priority": "medium",
        },
    )
    assert created.status_code == 201
    ticket_id = created.json()["id"]
    ticket_url = f"/api/v1/tickets/{ticket_id}"

    # Read
    fetched = client_with_db.get(ticket_url)
    assert fetched.status_code == 200
    assert fetched.json()["title"] == "Lifecycle Test"

    # Update
    patched = client_with_db.patch(
        ticket_url,
        json={"status": "resolved", "priority": "low"},
    )
    assert patched.status_code == 200
    assert patched.json()["status"] == "resolved"

    # The change must also be visible through the shared session.
    stored = db_session.query(Ticket).filter_by(id=ticket_id).first()
    assert stored is not None
    assert stored.status == "resolved"
    assert stored.priority == "low"

    # Delete
    removed = client_with_db.delete(ticket_url)
    assert removed.status_code in [200, 204]

    # A deleted ticket can no longer be fetched.
    after_delete = client_with_db.get(ticket_url)
    assert after_delete.status_code == 404
def test_invalid_ticket_creation(client):
    """Invalid ticket payloads should be rejected with HTTP 422."""
    endpoint = "/api/v1/tickets"

    # Case 1: the required title is missing.
    missing_title = client.post(endpoint, json={"description": "Missing title"})
    assert missing_title.status_code == 422

    # Case 2: the priority value is not an accepted one.
    bad_priority = client.post(
        endpoint,
        json={
            "title": "Test",
            "description": "Test",
            "priority": "invalid_priority",
        },
    )
    assert bad_priority.status_code == 422
    problems = bad_priority.json()["detail"]
    assert any("priority" in str(problem).lower() for problem in problems)

    # Case 3: the customer email is malformed.
    bad_email = client.post(
        endpoint,
        json={
            "title": "Test",
            "description": "Test",
            "customer_email": "not_an_email",
        },
    )
    assert bad_email.status_code == 422
Testing database models, queries, relationships, and operations ensures data integrity in customer support systems.
from app.models import Ticket, Comment, User, Customer
def test_ticket_model_creation(db_session):
    """Persist a ticket linked to a customer and check the stored values."""
    owner = Customer(
        email="customer@example.com",
        name="John Doe",
        tier="enterprise",
    )
    db_session.add(owner)
    # Flush so the customer id is available for the foreign key below.
    db_session.flush()

    ticket_fields = dict(
        title="Database Connection Error",
        description="Cannot connect to production database",
        priority="critical",
        status="open",
        customer_id=owner.id,
        category="technical",
    )
    new_ticket = Ticket(**ticket_fields)
    db_session.add(new_ticket)
    db_session.commit()

    assert new_ticket.id is not None
    assert new_ticket.created_at is not None
    assert new_ticket.customer.email == "customer@example.com"
def test_ticket_model_defaults(db_session):
    """A ticket created with only title and description should get the model defaults."""
    bare_ticket = Ticket(title="Test", description="Test")
    db_session.add(bare_ticket)
    db_session.commit()

    # Fields that were never set explicitly should now hold their defaults.
    assert bare_ticket.status == "open"
    assert bare_ticket.priority == "medium"
    assert bare_ticket.created_at is not None
    assert bare_ticket.updated_at is not None
def test_ticket_comments_relationship(db_session):
    """Comments saved with a ticket's id should appear on ``ticket.comments``."""
    parent = Ticket(
        title="Test Ticket",
        description="Testing relationships",
        priority="low",
    )
    db_session.add(parent)
    db_session.flush()

    # Attach two comments to the flushed ticket.
    replies = [
        Comment(
            ticket_id=parent.id,
            content="First comment",
            author_email="agent1@support.com",
        ),
        Comment(
            ticket_id=parent.id,
            content="Second comment",
            author_email="agent2@support.com",
        ),
    ]
    db_session.add_all(replies)
    db_session.commit()

    # Expire the ticket so the relationship is reloaded on next access.
    db_session.expire(parent)
    loaded = parent.comments
    assert len(loaded) == 2
    assert loaded[0].content == "First comment"
    assert loaded[1].content == "Second comment"
def test_customer_tickets_relationship(db_session):
    """Tickets saved with a customer's id should appear on ``customer.tickets``."""
    owner = Customer(email="loyal@customer.com", name="Loyal Customer")
    db_session.add(owner)
    db_session.flush()

    # Three tickets that all belong to the same customer.
    db_session.add_all(
        [
            Ticket(
                title=f"Issue #{i+1}",
                description=f"Description {i+1}",
                customer_id=owner.id,
            )
            for i in range(3)
        ]
    )
    db_session.commit()

    # Expire the customer so the relationship is reloaded on next access.
    db_session.expire(owner)
    assert len(owner.tickets) == 3
    for linked in owner.tickets:
        assert linked.customer_id == owner.id
def test_filter_tickets_by_status(db_session):
    """``filter_by(status=...)`` should return only tickets with that status."""
    # Even indices are open and odd indices are closed, giving five of each.
    seeded = []
    for i in range(10):
        state = "open" if i % 2 == 0 else "closed"
        seeded.append(Ticket(title=f"Ticket {i}", status=state))
    db_session.add_all(seeded)
    db_session.commit()

    for wanted in ("open", "closed"):
        matches = db_session.query(Ticket).filter_by(status=wanted).all()
        assert len(matches) == 5
        assert all(found.status == wanted for found in matches)
def test_complex_ticket_query(db_session):
    """Query open tickets that are either critical or less than a day old."""
    from sqlalchemy import and_, or_
    from datetime import datetime, timedelta

    # Seed three open tickets that differ in priority and age.
    week_old_critical = Ticket(
        title="Old Critical",
        priority="critical",
        status="open",
        created_at=datetime.utcnow() - timedelta(days=7),
    )
    two_hour_old_high = Ticket(
        title="Recent High",
        priority="high",
        status="open",
        created_at=datetime.utcnow() - timedelta(hours=2),
    )
    month_old_low = Ticket(
        title="Old Low",
        priority="low",
        status="open",
        created_at=datetime.utcnow() - timedelta(days=30),
    )
    db_session.add_all([week_old_critical, two_hour_old_high, month_old_low])
    db_session.commit()

    # status == open AND (priority == critical OR created within the last day)
    cutoff = datetime.utcnow() - timedelta(days=1)
    is_open = Ticket.status == "open"
    critical_or_recent = or_(
        Ticket.priority == "critical",
        Ticket.created_at >= cutoff,
    )
    matched = db_session.query(Ticket).filter(and_(is_open, critical_or_recent)).all()

    assert len(matched) == 2
    matched_titles = [found.title for found in matched]
    assert "Old Critical" in matched_titles
    assert "Recent High" in matched_titles
    assert "Old Low" not in matched_titles
def test_transaction_rollback_on_error(db_session):
    """A failed commit followed by a rollback must leave the row count unchanged.

    The test commits one ticket, records the count, then stages a batch whose
    last two tickets share an ``external_id``. The commit is required to raise
    ``IntegrityError``; after rolling back, the count must match the baseline.
    """
    from sqlalchemy.exc import IntegrityError

    # Baseline: one committed ticket.
    ticket1 = Ticket(title="First Ticket", description="Test")
    db_session.add(ticket1)
    db_session.commit()
    initial_count = db_session.query(Ticket).count()

    # Require the failure explicitly. With a plain try/except, a missing
    # constraint would only show up later as an unexplained count mismatch.
    with pytest.raises(IntegrityError):
        ticket2 = Ticket(title="Second Ticket", description="Test")
        db_session.add(ticket2)
        db_session.flush()

        # Assumes a unique constraint on Ticket.external_id -- TODO confirm.
        ticket3 = Ticket(
            title="Third Ticket",
            external_id="DUPLICATE_ID"
        )
        db_session.add(ticket3)
        ticket4 = Ticket(
            title="Fourth Ticket",
            external_id="DUPLICATE_ID"
        )
        db_session.add(ticket4)
        db_session.commit()

    db_session.rollback()

    # Nothing from the failed batch may have been persisted.
    final_count = db_session.query(Ticket).count()
    assert final_count == initial_count
Well-designed database fixtures are crucial for testing customer support systems with complex data relationships.
import pytest
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
from app.database import Base
@pytest.fixture(scope="session")
def db_engine():
    """Create the test database engine once per test session.

    All tables in ``Base.metadata`` are created before the engine is handed
    out, then dropped and the engine disposed when the session ends.
    """
    test_engine = create_engine(
        "postgresql://test:test@localhost:5432/support_test",
        echo=False,
        pool_pre_ping=True,
    )

    schema = Base.metadata
    schema.create_all(test_engine)

    yield test_engine

    # Teardown: drop the schema and release the engine's connections.
    schema.drop_all(test_engine)
    test_engine.dispose()
@pytest.fixture
def db_session(db_engine):
    """Provide a session whose work is rolled back when the test finishes.

    The session is bound to one connection with an outer transaction open.
    A nested transaction is started before the session is handed out, and a
    listener starts a new one each time a nested transaction ends.
    """
    conn = db_engine.connect()
    outer_tx = conn.begin()
    scoped = sessionmaker(bind=conn)()

    def _reopen_savepoint(sess, ended_tx):
        # Only react when a first-level nested transaction has ended.
        if ended_tx.nested and not ended_tx._parent.nested:
            sess.expire_all()
            sess.begin_nested()

    event.listen(scoped, "after_transaction_end", _reopen_savepoint)

    scoped.begin_nested()
    yield scoped

    # Teardown: close the session, roll back the outer transaction, release the connection.
    scoped.close()
    outer_tx.rollback()
    conn.close()
@pytest.fixture
def sample_customers(db_session):
    """Insert one customer per tier (free, basic, premium, enterprise) and return them."""
    seed_rows = [
        ("free@example.com", "Free User", "free"),
        ("basic@example.com", "Basic User", "basic"),
        ("premium@example.com", "Premium User", "premium"),
        ("enterprise@example.com", "Enterprise User", "enterprise"),
    ]
    customers = [
        Customer(email=address, name=label, tier=level)
        for address, label, level in seed_rows
    ]
    db_session.add_all(customers)
    db_session.commit()
    return customers
@pytest.fixture
def sample_agents(db_session):
    """Insert two regular agents and one senior agent, and return them."""
    seed_rows = [
        ("agent1@support.com", "Agent One", "agent"),
        ("agent2@support.com", "Agent Two", "agent"),
        ("senior@support.com", "Senior Agent", "senior_agent"),
    ]
    agents = [
        User(email=address, name=label, role=position)
        for address, label, position in seed_rows
    ]
    db_session.add_all(agents)
    db_session.commit()
    return agents
@pytest.fixture
def sample_tickets(db_session, sample_customers):
    """Insert twelve tickets that cycle through priorities, statuses, customers and categories."""
    priorities = ["low", "medium", "high", "critical"]
    statuses = ["open", "in_progress", "resolved", "closed"]
    categories = ["technical", "billing", "account"]

    # The modulo indexing cycles through each list as ``i`` grows.
    tickets = [
        Ticket(
            title=f"Sample Ticket #{i+1}",
            description=f"Description for ticket {i+1}",
            priority=priorities[i % len(priorities)],
            status=statuses[i % len(statuses)],
            customer_id=sample_customers[i % len(sample_customers)].id,
            category=categories[i % len(categories)],
        )
        for i in range(12)
    ]
    db_session.add_all(tickets)
    db_session.commit()
    return tickets
@pytest.fixture
def customer_factory(db_session):
    """Yield a callable that creates and commits customers on demand.

    Every customer made through the callable is deleted again at teardown.
    """
    tracked = []

    def _create_customer(email=None, name=None, tier="basic", **kwargs):
        import uuid

        # Fill in random placeholders for whatever the caller left out.
        if not email:
            email = f"customer{uuid.uuid4().hex[:8]}@example.com"
        if not name:
            name = f"Customer {uuid.uuid4().hex[:8]}"

        record = Customer(email=email, name=name, tier=tier, **kwargs)
        db_session.add(record)
        db_session.commit()
        tracked.append(record)
        return record

    yield _create_customer

    # Teardown: delete everything this factory created.
    for record in tracked:
        db_session.delete(record)
    db_session.commit()
@pytest.fixture
def ticket_factory(db_session, customer_factory):
    """Yield a callable that creates and commits tickets on demand.

    When no customer is passed, one is created through ``customer_factory``.
    Every ticket made through the callable is deleted again at teardown.
    """
    tracked = []

    def _create_ticket(title=None, customer=None, priority="medium", status="open", **kwargs):
        import uuid

        owner = customer if customer is not None else customer_factory()
        if not title:
            title = f"Ticket {uuid.uuid4().hex[:8]}"
        # Take the description out of kwargs so it is not passed twice below.
        description = kwargs.pop("description", "Test ticket description")

        record = Ticket(
            title=title,
            description=description,
            customer_id=owner.id,
            priority=priority,
            status=status,
            **kwargs,
        )
        db_session.add(record)
        db_session.commit()
        tracked.append(record)
        return record

    yield _create_ticket

    # Teardown: delete everything this factory created.
    for record in tracked:
        db_session.delete(record)
    db_session.commit()
# Usage in tests
def test_with_factories(customer_factory, ticket_factory):
    """Build a customer and two of its tickets through the factory fixtures."""
    # One enterprise-tier customer...
    premium_customer = customer_factory(tier="enterprise", name="Big Corp")

    # ...that owns both a critical and a low-priority ticket.
    outage = ticket_factory(
        customer=premium_customer,
        priority="critical",
        title="Production outage",
    )
    wish = ticket_factory(
        customer=premium_customer,
        priority="low",
        title="Feature request",
    )

    owner_id = premium_customer.id
    assert outage.customer_id == owner_id
    assert wish.customer_id == owner_id
    assert premium_customer.tier == "enterprise"
Real-world testing scenarios specific to customer support systems.
def test_round_robin_assignment(db_session, sample_agents):
    """Nine tickets assigned round-robin should give each of three agents three."""
    from app.services.assignment import assign_ticket_round_robin

    assigned = []
    for i in range(9):
        pending = Ticket(
            title=f"Assignment Test {i}",
            priority="medium",
            status="open",
        )
        db_session.add(pending)
        db_session.flush()
        assign_ticket_round_robin(pending, db_session)
        assigned.append(pending)
    db_session.commit()

    # 9 tickets across 3 agents means 3 per agent.
    for agent in sample_agents:
        share = sum(1 for t in assigned if t.assigned_agent_id == agent.id)
        assert share == 3
def test_priority_based_assignment(db_session, sample_agents):
    """A critical ticket should be assigned to the senior agent."""
    from app.services.assignment import assign_ticket_by_priority

    urgent = Ticket(title="Critical Issue", priority="critical", status="open")
    db_session.add(urgent)
    db_session.flush()

    assign_ticket_by_priority(urgent, db_session)
    db_session.commit()

    # Pick the first agent whose role is senior_agent.
    seniors = (agent for agent in sample_agents if agent.role == "senior_agent")
    expected_owner = next(seniors)
    assert urgent.assigned_agent_id == expected_owner.id
from datetime import datetime, timedelta
def test_sla_breach_detection(db_session):
    """``find_sla_breaches`` should report only the two overdue, unassigned tickets."""
    from app.services.sla import find_sla_breaches

    now = datetime.utcnow()

    seeded = [
        # Critical, 2 hours old (SLA: 1 hour): breached.
        Ticket(
            title="Critical - Breached",
            priority="critical",
            status="open",
            created_at=now - timedelta(hours=2),
        ),
        # High, 5 hours old (SLA: 4 hours): breached.
        Ticket(
            title="High - Breached",
            priority="high",
            status="open",
            created_at=now - timedelta(hours=5),
        ),
        # Medium, 2 hours old (SLA: 24 hours): still within SLA.
        Ticket(
            title="Medium - OK",
            priority="medium",
            status="open",
            created_at=now - timedelta(hours=2),
        ),
        # Critical and 2 hours old, but already assigned: not a breach.
        Ticket(
            title="Critical - Assigned",
            priority="critical",
            status="in_progress",
            created_at=now - timedelta(hours=2),
            assigned_agent_id=1,
        ),
    ]
    db_session.add_all(seeded)
    db_session.commit()

    reported = find_sla_breaches(db_session)
    reported_titles = [entry.title for entry in reported]

    assert len(reported) == 2
    assert "Critical - Breached" in reported_titles
    assert "High - Breached" in reported_titles
    assert "Medium - OK" not in reported_titles
    assert "Critical - Assigned" not in reported_titles
def test_sla_time_remaining_calculation(db_session):
    """A high-priority ticket that is one hour old should have about three hours left."""
    from app.services.sla import calculate_sla_remaining

    created_at = datetime.utcnow() - timedelta(hours=1)

    # High priority, created 1 hour ago (SLA: 4 hours).
    subject = Ticket(
        title="SLA Test",
        priority="high",
        status="open",
        created_at=created_at,
    )
    db_session.add(subject)
    db_session.commit()

    remaining = calculate_sla_remaining(subject)

    # 4 hour SLA minus 1 hour elapsed, with a small tolerance for test runtime.
    hours_left = remaining.total_seconds() / 3600
    assert 2.9 <= hours_left <= 3.1
def test_automatic_escalation(db_session):
    """Only the ticket that has been open for days should be escalated."""
    from app.services.escalation import escalate_overdue_tickets

    now = datetime.utcnow()

    # Medium priority, open for 3 days with no assignment.
    stale = Ticket(
        title="Overdue Ticket",
        priority="medium",
        status="open",
        created_at=now - timedelta(days=3),
    )
    # Medium priority, only 2 hours old: should be left alone.
    fresh = Ticket(
        title="Recent Ticket",
        priority="medium",
        status="open",
        created_at=now - timedelta(hours=2),
    )
    db_session.add_all([stale, fresh])
    db_session.commit()

    escalated_count = escalate_overdue_tickets(db_session)

    # Reload both rows to see what the escalation run wrote.
    for row in (stale, fresh):
        db_session.refresh(row)

    assert escalated_count == 1
    assert stale.priority == "high"
    assert stale.escalated is True
    assert fresh.priority == "medium"
    assert fresh.escalated is False
def test_status_change_notification(mocker, db_session):
    """Changing a ticket's status should send one email to the customer."""
    from app.services.ticket import update_ticket_status

    email_stub = mocker.patch(
        "app.services.email.send_email",
        return_value={"status": "sent"},
    )

    subject = Ticket(
        title="Notification Test",
        customer_email="customer@example.com",
        status="open",
    )
    db_session.add(subject)
    db_session.commit()

    # Trigger the status change under test.
    update_ticket_status(subject.id, "resolved", db_session)

    # One email, to the customer, using a "resolved" template for this ticket.
    email_stub.assert_called_once()
    sent_kwargs = email_stub.call_args.kwargs
    assert sent_kwargs["to"] == "customer@example.com"
    assert "resolved" in sent_kwargs["template_name"].lower()
    assert sent_kwargs["template_data"]["ticket_id"] == subject.id
def test_assignment_notification(mocker, db_session, sample_agents):
    """Assigning a ticket should notify the chosen agent exactly once."""
    from app.services.ticket import assign_ticket

    notify_stub = mocker.patch("app.services.notification.notify_agent")

    subject = Ticket(title="Assignment Notification Test", status="open")
    db_session.add(subject)
    db_session.commit()

    chosen_agent = sample_agents[0]
    assign_ticket(subject.id, chosen_agent.id, db_session)

    notify_stub.assert_called_once()
    # Positional arguments: agent id first, message text second.
    positional = notify_stub.call_args[0]
    assert positional[0] == chosen_agent.id
    assert "assigned" in positional[1].lower()
Testing integrations with external services and systems.
@pytest.mark.integration
def test_crm_sync_integration(mocker, db_session):
    """Syncing a customer should return the CRM reply and store the CRM id."""
    from app.services.crm import sync_customer_to_crm

    # Canned CRM reply returned by the patched ``requests.post``.
    crm_reply = mocker.Mock(status_code=200)
    crm_reply.json.return_value = {
        "crm_customer_id": "CRM_98765",
        "sync_status": "success",
        "last_sync": "2025-01-15T10:30:00Z",
    }
    mocker.patch("requests.post", return_value=crm_reply)

    synced_customer = Customer(
        email="crm.test@example.com",
        name="CRM Test Customer",
        tier="enterprise",
    )
    db_session.add(synced_customer)
    db_session.commit()

    outcome = sync_customer_to_crm(synced_customer.id, db_session)
    assert outcome["sync_status"] == "success"
    assert outcome["crm_customer_id"] == "CRM_98765"

    # The local row should now carry the CRM identifier.
    db_session.refresh(synced_customer)
    assert synced_customer.crm_id == "CRM_98765"
def test_webhook_delivery_on_ticket_created(mocker, db_session):
    """Test webhook delivery when new ticket is created.

    An active subscription listening for ``ticket.created`` must receive
    exactly one POST carrying the event name and the ticket payload.
    """
    # Import only the entry point the test actually exercises.
    from app.services.webhooks import trigger_webhooks

    # Setup webhook subscription
    webhook = WebhookSubscription(
        url="https://customer-app.example.com/webhooks/support",
        events=["ticket.created", "ticket.updated"],
        active=True,
    )
    db_session.add(webhook)
    db_session.commit()

    # Intercept the outbound HTTP call; the stub reports a 200 status.
    mock_post = mocker.patch('requests.post')
    mock_post.return_value.status_code = 200

    ticket = Ticket(
        title="Webhook Test Ticket",
        priority="high",
    )
    db_session.add(ticket)
    db_session.commit()

    # Trigger webhook delivery
    trigger_webhooks("ticket.created", ticket.to_dict(), db_session)

    # Verify webhook called
    assert mock_post.call_count == 1
    call_kwargs = mock_post.call_args.kwargs
    assert call_kwargs["url"] == "https://customer-app.example.com/webhooks/support"
    assert call_kwargs["json"]["event"] == "ticket.created"
    assert call_kwargs["json"]["data"]["id"] == ticket.id
@pytest.mark.asyncio
async def test_background_export_job(db_session, sample_tickets):
    """A pending export job ends up completed with a populated result."""
    from app.tasks.export import export_tickets_job

    export_params = {
        "format": "csv",
        "date_range": "last_7_days",
        "filters": {"priority": ["high", "critical"]},
    }
    job = Job(type="export_tickets", status="pending", params=export_params)
    db_session.add(job)
    db_session.commit()

    # Run the job, then reload the row to see what it stored.
    outcome = await export_tickets_job(job.id, db_session)
    db_session.refresh(job)

    assert job.status == "completed"
    assert job.result is not None
    for expected_key in ("file_url", "row_count"):
        assert expected_key in job.result
    assert outcome["success"] is True
Structure tests to mirror application architecture:
tests/
├── unit/ # Fast, isolated unit tests
│ ├── test_models.py
│ ├── test_validators.py
│ ├── test_utils.py
│ └── services/
│ ├── test_ticket_service.py
│ ├── test_email_service.py
│ └── test_sla_service.py
├── integration/ # Tests with database/external deps
│ ├── test_api_tickets.py
│ ├── test_api_customers.py
│ ├── test_database_ops.py
│ └── test_external_apis.py
├── e2e/ # End-to-end workflow tests
│ ├── test_ticket_lifecycle.py
│ └── test_customer_journey.py
├── performance/ # Performance/load tests
│ └── test_api_performance.py
└── conftest.py # Shared fixtures
# Good - clear, specific test names: each one states the condition and the
# expected outcome, so a failure report is readable without opening the test.
def test_high_priority_ticket_sends_immediate_notification_to_assigned_agent():
    pass


def test_ticket_auto_closes_after_7_days_without_customer_response():
    pass


def test_sla_breach_alert_sent_to_team_lead_when_critical_ticket_unassigned_for_1_hour():
    pass


# Bad - vague test names: they name a feature area but neither the scenario
# nor the behaviour that is expected.
def test_ticket_notification():
    pass


def test_auto_close():
    pass


def test_sla():
    pass
def test_ticket_assignment():
    """Arrange-Act-Assert walkthrough for assigning a ticket to an agent."""
    # Arrange: build the agent and the open ticket the test works on
    support_agent = create_agent(name="Test Agent", role="agent")
    open_ticket = create_ticket(title="Test", priority="high", status="open")

    # Act: run the single operation under test
    assigned = assign_ticket(open_ticket.id, support_agent.id)

    # Assert: check every observable effect of the assignment
    assert assigned.assigned_agent_id == support_agent.id
    assert assigned.status == "assigned"
    assert assigned.assigned_at is not None
# Tests must not rely on one another: every test sets up what it needs.

# Bad - the second test only passes if the first one ran before it
def test_create_user():
    create_user("test@example.com")


def test_get_user():
    fetched = get_user("test@example.com")  # Depends on test_create_user having run
    assert fetched is not None


# Good - the data the test needs comes from a fixture, not from another test
@pytest.fixture
def test_user(db_session):
    """Add and commit a user through the session, then return it."""
    persisted = User(email="test@example.com")
    db_session.add(persisted)
    db_session.commit()
    return persisted


def test_get_user_by_email(db_session, test_user):
    looked_up = get_user(test_user.email, db_session)
    assert looked_up.email == test_user.email
# Good - one behaviour per test, so a failure points at exactly one rule
def test_ticket_creation_sets_default_status():
    created = create_ticket(title="Test")
    assert created.status == "open"


def test_ticket_creation_sets_created_timestamp():
    created = create_ticket(title="Test")
    assert created.created_at is not None


# Bad - several unrelated behaviours are checked in a single test
def test_ticket_creation():
    created = create_ticket(title="Test")
    assert created.status == "open"
    assert created.created_at is not None
    assert created.priority == "medium"
    assert created.category is None
2. Use fixtures for setup:
# Good - the setup lives in a reusable fixture
@pytest.fixture
def authenticated_user(db_session):
    """Add and commit an agent user through the session, then return it."""
    agent_user = User(email="auth@example.com", role="agent")
    db_session.add(agent_user)
    db_session.commit()
    return agent_user


def test_authenticated_endpoint(authenticated_user, api_client):
    reply = api_client.get("/api/tickets", user=authenticated_user)
    assert reply.status_code == 200


# Less ideal - the same setup is written inline in the test body
def test_authenticated_endpoint(api_client):
    agent_user = User(email="auth@example.com", role="agent")
    # ... more setup ...
    reply = api_client.get("/api/tickets", user=agent_user)
    assert reply.status_code == 200
3. Mock external dependencies:
def test_email_notification_on_ticket_creation(mocker):
    """Always mock external services like email."""
    # Stub the email sender and give it a canned "sent" result.
    email_stub = mocker.patch(
        'app.services.email.send_email',
        return_value={"status": "sent"},
    )

    created = create_ticket(title="Test", customer_email="user@example.com")

    email_stub.assert_called_once()
    assert created.id is not None
Tests affecting each other due to shared database state.
Solution: Use transaction rollback in fixtures.
@pytest.fixture
def db_session(db_engine):
    """Isolate database state per test with rollback.

    Opens a connection from ``db_engine``, begins a transaction on it and
    yields a session bound to that connection. After the test the session is
    closed and the transaction rolled back. The cleanup is wrapped in
    ``try/finally`` so the connection is closed even if an earlier cleanup
    step raises.
    """
    connection = db_engine.connect()
    try:
        transaction = connection.begin()
        session = sessionmaker(bind=connection)()
        try:
            yield session
        finally:
            session.close()
            transaction.rollback()  # Rollback all changes
    finally:
        # Runs even when closing the session or rolling back fails.
        connection.close()
Forgetting to mark async tests or improper event loop handling.
Solution: Always use @pytest.mark.asyncio and await coroutines.
# Wrong - without the asyncio marker pytest does not await the coroutine, so
# the assertions never run: the test is skipped or reported as failed with a
# warning, depending on the pytest version. (pytest-asyncio in "auto" mode
# runs unmarked async tests; its default "strict" mode requires the marker.)
async def test_async_operation():
    result = await async_function()
    assert result is not None


# Correct - the marker tells pytest-asyncio to run the coroutine
@pytest.mark.asyncio
async def test_async_operation():
    result = await async_function()
    assert result is not None
Session-scoped fixtures with mutable state causing test pollution.
Solution: Use appropriate scope and reset state.
# Problematic - one dict lives for the whole session, so entries written by
# one test are visible to every later test
@pytest.fixture(scope="session")
def user_cache():
    return {}  # Shared dict across all tests


# Better - function scope gives each test its own dict
@pytest.fixture(scope="function")
def user_cache():
    return {}  # New dict per test


# Or reset state - keep a single session-wide object but empty it after each
# test. Teardown of a session-scoped fixture runs only once, at the end of the
# session, so the reset has to live in a function-scoped fixture.
@pytest.fixture(scope="session")
def _user_cache_store():
    """Create the shared dict once per test session."""
    return {}


@pytest.fixture
def user_cache(_user_cache_store):
    """Yield the shared cache and clear it when the test finishes."""
    yield _user_cache_store
    _user_cache_store.clear()  # Clean up after each test
Mocking too much makes tests meaningless.
Solution: Only mock external boundaries.
# Over-mocked - the function under test is itself replaced by a mock
def test_ticket_creation(mocker):
    mocker.patch('app.services.create_ticket', return_value=Ticket(id=1))
    created = create_ticket(title="Test")
    assert created.id == 1  # Only checks the value the mock was told to return


# Better - run the real service against the test database session
def test_ticket_creation(db_session):
    created = create_ticket_service(db_session, title="Test", priority="high")
    assert created.id is not None
    assert created.status == "open"
    assert created.priority == "high"
Tests taking too long to run.
Solution: Use appropriate markers and parallel execution.
# Run only fast tests (deselects everything marked "slow")
pytest -m "not slow"
# Run tests in parallel
pytest -n auto  # Requires pytest-xdist
# Run specific test file
pytest tests/unit/test_models.py
后端测试指南:API端点、业务逻辑与数据库测试最佳实践
11,800 周安装