Async Testing Expert by rafaelkamimura/claude-tools
npx skills add https://github.com/rafaelkamimura/claude-tools --skill 'Async Testing Expert'基于一个包含 387 个测试的 FastAPI 后端测试套件的生产模式,提供使用 pytest 编写全面异步 Python 测试的专业指导。
在以下场景激活此技能:
tests/
├── conftest.py # 共享夹具(app、client、event_loop、faker)
├── fakes.py # 可重用的模拟对象(FakeConnection、FakeRecord)
├── test_<module>_dao.py # DAO 层测试
├── test_<module>_service.py # 服务层测试
├── test_<module>_router.py # API 端点测试
└── test_<module>_dto.py # DTO 验证测试
test_<module>_<layer>.pytest_<what>_<scenario>(例如:test_create_calls_execute、)广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
test_fetch_by_id_error_maps_to_500async def test_fetch_user_success(faker: Faker) -> None:
user_id: int = faker.random_int(1, 100)
conn: FakeConnection = FakeConnection()
# ...
import asyncio
import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
from faker import Faker
@pytest.fixture(scope='session')
def app():
"""创建用于测试的 FastAPI 应用实例。"""
from src.config.factory import create_app
return create_app()
@pytest.fixture(scope='session')
def client(app):
"""为 FastAPI 提供同步的 TestClient。"""
with TestClient(app) as c:
yield c
@pytest.fixture
async def async_client(app):
"""使用 ASGI 传输为 FastAPI 提供异步的 AsyncClient。"""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url='http://test') as ac:
yield ac
@pytest.fixture
def event_loop():
"""为每个测试创建新的事件循环。"""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def faker():
"""提供配置为巴西葡萄牙语的 Faker 实例。"""
return Faker('pt_BR') # 根据需要调整区域设置
class FakeRecord:
"""模拟具有 .result() 方法和可选 rowcount 的数据库记录。"""
def __init__(self, data, rowcount=None):
self._data = data
self.rowcount = rowcount if rowcount is not None else (
data if isinstance(data, int) else 1
)
def result(self):
return self._data
class FakeConnection:
"""模拟具有 execute、fetch、fetch_val 和 fetch_row 方法的 psqlpy/asyncpg 连接。"""
def __init__(self):
self.execute_return = None
self.fetch_return = None
self.fetch_row_return = None
self.fetch_val_return = None
self.execute_calls = []
self.fetch_calls = []
self.fetch_val_calls = []
def transaction(self):
return FakeTransactionContext(self)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def execute(self, stmt, parameters=None):
self.execute_calls.append((stmt, parameters))
if isinstance(self.execute_return, Exception):
raise self.execute_return
# 支持多次 execute 调用的返回值列表
if (isinstance(self.execute_return, list) and
len(self.execute_return) > 0 and
all(isinstance(item, list) for item in self.execute_return)):
return FakeRecord(self.execute_return.pop(0))
return FakeRecord(self.execute_return)
async def execute_many(self, stmt, parameters_list=None):
"""模拟批量操作的 execute_many。"""
if parameters_list is None:
parameters_list = []
self.execute_calls.append((stmt, parameters_list))
if isinstance(self.execute_return, Exception):
raise self.execute_return
total_rows = len(parameters_list) if parameters_list else 0
return FakeRecord(data=total_rows, rowcount=total_rows)
async def fetch(self, stmt, parameters=None):
self.fetch_calls.append((stmt, parameters))
return FakeRecord(self.fetch_return)
async def fetch_val(self, stmt, parameters=None):
self.fetch_val_calls.append((stmt, parameters))
if isinstance(self.fetch_val_return, Exception):
raise self.fetch_val_return
return self.fetch_val_return
async def fetch_row(self, stmt, parameters=None):
"""模拟获取单行数据。"""
self.fetch_calls.append((stmt, parameters))
if isinstance(self.fetch_row_return, Exception):
raise self.fetch_row_return
if self.fetch_row_return is not None:
return self.fetch_row_return
if isinstance(self.fetch_return, list) and len(self.fetch_return) > 0:
return FakeRecord(self.fetch_return.pop(0))
return FakeRecord(self.fetch_return)
class FakeTransaction:
"""模拟数据库事务上下文。"""
def __init__(self, connection):
self.connection = connection
async def execute(self, stmt, parameters=None):
return await self.connection.execute(stmt, parameters)
async def execute_many(self, stmt, parameters_list=None, parameters=None):
"""模拟 execute_many - 如果可用,则委托给连接的 execute_many。"""
params = parameters if parameters is not None else parameters_list
if hasattr(self.connection, 'execute_many'):
return await self.connection.execute_many(stmt, params)
# 回退方案:通过为每个参数集调用 execute 来模拟
if params is None:
params = []
results = []
for param_set in params:
result = await self.connection.execute(stmt, param_set)
results.append(result)
if results:
total_rowcount = sum(getattr(r, 'rowcount', 0) for r in results)
return FakeRecord(data=total_rowcount, rowcount=total_rowcount)
else:
return FakeRecord(data=0, rowcount=0)
async def fetch(self, stmt, parameters=None):
return await self.connection.fetch(stmt, parameters)
async def fetch_row(self, stmt, parameters=None):
return await self.connection.fetch_row(stmt, parameters)
async def fetch_val(self, stmt, parameters=None):
return await self.connection.fetch_val(stmt, parameters)
class FakeTransactionContext:
"""模拟由 conn.transaction() 返回的事务上下文管理器。"""
def __init__(self, connection):
self.connection = connection
self.transaction = FakeTransaction(connection)
async def __aenter__(self):
return self.transaction
async def __aexit__(self, exc_type, exc, tb):
return False
使用 __wrapped__ 绕过连接装饰器:
@pytest.mark.asyncio
async def test_create_calls_execute(faker):
"""测试 create 方法是否使用正确的 SQL 和参数调用 execute。"""
# 准备:准备测试数据
create_dto = UserDTO.Create(
name=faker.name(),
email=faker.email(),
cpf=faker.ssn()
)
conn = FakeConnection()
# 执行:使用 __wrapped__ 直接调用 DAO 方法
await UserDAO.create.__wrapped__(conn, create_dto)
# 断言:验证是否使用正确的 SQL 调用了 execute
assert len(conn.execute_calls) == 1
stmt, params = conn.execute_calls[0]
assert 'INSERT INTO users' in stmt
assert isinstance(params, list)
assert len(params) == len(create_dto.model_dump())
@pytest.mark.asyncio
async def test_fetch_by_id_error_maps_to_500():
"""测试数据库错误是否被正确映射到 DAOException。"""
conn = FakeConnection()
async def broken_fetch_row(stmt, parameters=None):
raise RustPSQLDriverPyBaseError('db fail')
conn.fetch_row = broken_fetch_row
with pytest.raises(DAOException) as exc:
await UserDAO.fetch_by_id.__wrapped__(conn, 1)
err = exc.value
assert err.status_code == 500
assert 'Erro ao buscar' in err.detail
为隔离测试创建虚拟依赖项:
class DummyUserAdapter:
"""用于测试服务层的模拟适配器。"""
def __init__(self, users):
self.users = users
self.called = False
async def get_users_by_permission(self, _permission_id, _auth_header, _permission_scope):
self.called = True
return self.users
class DummyUserDAO:
"""用于测试服务层的模拟 DAO。"""
def __init__(self):
self.fetch_called = False
self.create_called = False
async def fetch_all(self):
self.fetch_called = True
return [UserDTO.Read(id=1, name='Test User', email='test@example.com')]
async def create(self, dto):
self.create_called = (dto,)
@pytest.mark.asyncio
async def test_service_coordinates_dao_and_adapter():
"""测试服务是否在 DAO 和适配器之间正确协调。"""
adapter = DummyUserAdapter([])
dao = DummyUserDAO()
service = UserService(user_adapter=adapter, user_dao=dao)
result = await service.get_all_users()
assert dao.fetch_called
assert isinstance(result[0], UserDTO.Read)
@pytest.mark.asyncio
async def test_assign_with_dal_connection(monkeypatch, faker):
"""测试使用 DAL 连接包装器的方法。"""
from src.domain.dal import DAL
conn = FakeConnection()
# Monkeypatch 连接获取
async def fake_get_connection(cls):
return conn
monkeypatch.setattr(DAL, '_DAL__get_connection', classmethod(fake_get_connection))
# 存根其他依赖项
async def fake_verify_scope(id_, scope_type):
return None
monkeypatch.setattr(UserDAO, '_verify_scope', fake_verify_scope)
# 准备测试数据
dto = UserDTO.Assign(user_id=1, role_id=2)
# 调用实际的 DAO 方法(非 __wrapped__)
await UserDAO.assign(10, dto)
# 验证执行
assert len(conn.execute_calls) > 0
@pytest.mark.asyncio
async def test_sync_calls_execute_many(faker):
"""测试批量同步是否使用 execute_many 以提高效率。"""
items = [
UserDTO.Create(name=faker.name(), email=faker.email())
for _ in range(3)
]
conn = FakeConnection()
executed = []
async def fake_execute_many(stmt, parameters=None, **kwargs):
params = parameters if parameters is not None else kwargs.get('parameters_list')
executed.append((stmt, params))
# 修补事务的 execute_many
original_transaction = conn.transaction
async def patched_transaction():
t = await original_transaction().__aenter__()
t.execute_many = fake_execute_many
return t
class PatchedTransactionContext:
async def __aenter__(self):
return await patched_transaction()
async def __aexit__(self, exc_type, exc, tb):
return False
conn.transaction = lambda: PatchedTransactionContext()
await UserDAO.sync.__wrapped__(conn, items)
# 验证批量执行
assert len(executed) == 1
stmt, params = executed[0]
assert 'INSERT INTO users' in stmt
assert len(params[0]) == len(items)
@pytest.mark.asyncio
async def test_get_users_endpoint(async_client, monkeypatch):
"""测试 GET /users 端点是否返回正确的响应。"""
# 模拟服务层
async def mock_get_users():
return [UserDTO.Read(id=1, name='Test', email='test@example.com')]
monkeypatch.setattr('src.api.path.users.UserService.get_all', mock_get_users)
# 发起请求
response = await async_client.get('/users')
# 断言响应
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]['name'] == 'Test'
@pytest.mark.asyncio
async def test_multiple_queries_with_different_results(faker):
"""测试使用不同预期结果进行多次查询的方法。"""
conn = FakeConnection()
# 设置多个返回值(将按顺序弹出)
conn.execute_return = [
[{'id': 1, 'status': 'pending'}], # 第一次查询
[{'id': 2, 'status': 'approved'}] # 第二次查询
]
# 第一次调用获取第一个结果
result1 = await UserDAO.some_method.__wrapped__(conn, 1)
assert result1[0]['status'] == 'pending'
# 第二次调用获取第二个结果
result2 = await UserDAO.some_method.__wrapped__(conn, 2)
assert result2[0]['status'] == 'approved'
@pytest.mark.asyncio
@pytest.mark.parametrize('status,expected_count', [
('pending', 5),
('approved', 3),
('rejected', 2),
])
async def test_count_by_status(status, expected_count):
"""测试按不同状态值统计用户数。"""
conn = FakeConnection()
conn.fetch_val_return = expected_count
result = await UserDAO.count_by_status.__wrapped__(conn, status)
assert result == expected_count
assert len(conn.fetch_val_calls) == 1
test_<action>_<scenario>pytest tests/test_your_module.py -vpytest --cov=src/domain/dao/your_module tests/test_your_module.py__wrapped__:直接测试 DAO 方法时,需要绕过装饰器scope='session'scope='function'(默认)pytest -x 在首次失败时停止pytest tests/test_dao.py# 运行所有测试并计算覆盖率
pytest --cov=src --cov-report=html --cov-report=term
# 仅运行单元测试(快速)
pytest tests/ -m "not integration"
# 运行并显示详细输出
pytest -v --tb=short
# 运行特定的测试文件
pytest tests/test_user_dao.py -v
# 运行匹配模式的测试
pytest -k "test_create" -v
"""UserDAO 数据访问层的测试。"""
from datetime import datetime
import pytest
from src.domain.dal.dao.user import UserDAO
from src.domain.dal.dao.exception import DAOException
from src.domain.dto.user import UserDTO
from tests.fakes import FakeConnection, FakeRecord
@pytest.mark.asyncio
async def test_create_inserts_user(faker):
"""测试 create 方法是否使用正确的参数插入用户。"""
create_dto = UserDTO.Create(
name=faker.name(),
email=faker.email(),
cpf=faker.ssn()
)
conn = FakeConnection()
await UserDAO.create.__wrapped__(conn, create_dto)
assert len(conn.execute_calls) == 1
stmt, params = conn.execute_calls[0]
assert 'INSERT INTO users' in stmt
assert params[0] == create_dto.name
@pytest.mark.asyncio
async def test_fetch_by_id_returns_user(faker):
"""测试 fetch_by_id 是否返回正确格式的 UserDTO。"""
fake_row = {
'id': faker.random_int(1, 100),
'name': faker.name(),
'email': faker.email(),
'created_at': faker.date_time()
}
conn = FakeConnection()
conn.fetch_row_return = FakeRecord(fake_row)
result = await UserDAO.fetch_by_id.__wrapped__(conn, fake_row['id'])
assert result.id == fake_row['id']
assert result.name == fake_row['name']
assert isinstance(result, UserDTO.Read)
@pytest.mark.asyncio
async def test_fetch_by_id_raises_on_db_error():
"""测试数据库错误是否被正确处理和映射。"""
conn = FakeConnection()
async def broken_fetch_row(stmt, parameters=None):
raise Exception('Connection lost')
conn.fetch_row = broken_fetch_row
with pytest.raises(DAOException) as exc:
await UserDAO.fetch_by_id.__wrapped__(conn, 1)
assert exc.value.status_code == 500
# 运行单个测试
pytest tests/test_user_dao.py::test_create_inserts_user -v
# 运行文件中的所有测试
pytest tests/test_user_dao.py -v
# 为特定模块运行测试并计算覆盖率
pytest --cov=src/domain/dao/user tests/test_user_dao.py
# 在首次失败时停止
pytest -x tests/
# 失败时显示局部变量
pytest --showlocals tests/
# 运行上次失败的测试
pytest --lf tests/
此技能提供了经过生产验证的异步 Python 测试模式:
如有疑问,请遵循 "Arrange-Act-Assert" 模式,并始终验证正常路径和错误场景。
每周安装次数
–
代码仓库
GitHub 星标数
5
首次出现时间
–
安全审计
Expert guidance for writing comprehensive async Python tests using pytest, based on production patterns from a 387-test FastAPI backend test suite.
Activate this skill when:
tests/
├── conftest.py # Shared fixtures (app, client, event_loop, faker)
├── fakes.py # Reusable mock objects (FakeConnection, FakeRecord)
├── test_<module>_dao.py # DAO layer tests
├── test_<module>_service.py # Service layer tests
├── test_<module>_router.py # API endpoint tests
└── test_<module>_dto.py # DTO validation tests
test_<module>_<layer>.pytest_<what>_<scenario> (e.g., test_create_calls_execute, test_fetch_by_id_error_maps_to_500)async def test_fetch_user_success(faker: Faker) -> None:
user_id: int = faker.random_int(1, 100)
conn: FakeConnection = FakeConnection()
# ...
import asyncio
import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
from faker import Faker
@pytest.fixture(scope='session')
def app():
"""Create a FastAPI app instance for testing."""
from src.config.factory import create_app
return create_app()
@pytest.fixture(scope='session')
def client(app):
"""Provides a synchronous TestClient for FastAPI."""
with TestClient(app) as c:
yield c
@pytest.fixture
async def async_client(app):
"""Provides an asynchronous AsyncClient for FastAPI using ASGI transport."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url='http://test') as ac:
yield ac
@pytest.fixture
def event_loop():
"""Create a new event loop for each test."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def faker():
"""Provide a Faker instance configured for Brazilian Portuguese."""
return Faker('pt_BR') # Adjust locale as needed
class FakeRecord:
"""Simulate a database record with a .result() method and optional rowcount."""
def __init__(self, data, rowcount=None):
self._data = data
self.rowcount = rowcount if rowcount is not None else (
data if isinstance(data, int) else 1
)
def result(self):
return self._data
class FakeConnection:
"""Simulate a psqlpy/asyncpg Connection with execute, fetch, fetch_val, and fetch_row."""
def __init__(self):
self.execute_return = None
self.fetch_return = None
self.fetch_row_return = None
self.fetch_val_return = None
self.execute_calls = []
self.fetch_calls = []
self.fetch_val_calls = []
def transaction(self):
return FakeTransactionContext(self)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def execute(self, stmt, parameters=None):
self.execute_calls.append((stmt, parameters))
if isinstance(self.execute_return, Exception):
raise self.execute_return
# Support list of return values for multiple execute calls
if (isinstance(self.execute_return, list) and
len(self.execute_return) > 0 and
all(isinstance(item, list) for item in self.execute_return)):
return FakeRecord(self.execute_return.pop(0))
return FakeRecord(self.execute_return)
async def execute_many(self, stmt, parameters_list=None):
"""Simulate execute_many for bulk operations."""
if parameters_list is None:
parameters_list = []
self.execute_calls.append((stmt, parameters_list))
if isinstance(self.execute_return, Exception):
raise self.execute_return
total_rows = len(parameters_list) if parameters_list else 0
return FakeRecord(data=total_rows, rowcount=total_rows)
async def fetch(self, stmt, parameters=None):
self.fetch_calls.append((stmt, parameters))
return FakeRecord(self.fetch_return)
async def fetch_val(self, stmt, parameters=None):
self.fetch_val_calls.append((stmt, parameters))
if isinstance(self.fetch_val_return, Exception):
raise self.fetch_val_return
return self.fetch_val_return
async def fetch_row(self, stmt, parameters=None):
"""Simulate fetching a single row."""
self.fetch_calls.append((stmt, parameters))
if isinstance(self.fetch_row_return, Exception):
raise self.fetch_row_return
if self.fetch_row_return is not None:
return self.fetch_row_return
if isinstance(self.fetch_return, list) and len(self.fetch_return) > 0:
return FakeRecord(self.fetch_return.pop(0))
return FakeRecord(self.fetch_return)
class FakeTransaction:
"""Simulate a database transaction context."""
def __init__(self, connection):
self.connection = connection
async def execute(self, stmt, parameters=None):
return await self.connection.execute(stmt, parameters)
async def execute_many(self, stmt, parameters_list=None, parameters=None):
"""Simulate execute_many - delegate to connection's execute_many if available."""
params = parameters if parameters is not None else parameters_list
if hasattr(self.connection, 'execute_many'):
return await self.connection.execute_many(stmt, params)
# Fallback: simulate by calling execute for each parameter set
if params is None:
params = []
results = []
for param_set in params:
result = await self.connection.execute(stmt, param_set)
results.append(result)
if results:
total_rowcount = sum(getattr(r, 'rowcount', 0) for r in results)
return FakeRecord(data=total_rowcount, rowcount=total_rowcount)
else:
return FakeRecord(data=0, rowcount=0)
async def fetch(self, stmt, parameters=None):
return await self.connection.fetch(stmt, parameters)
async def fetch_row(self, stmt, parameters=None):
return await self.connection.fetch_row(stmt, parameters)
async def fetch_val(self, stmt, parameters=None):
return await self.connection.fetch_val(stmt, parameters)
class FakeTransactionContext:
"""Simulate the transaction context manager returned by conn.transaction()."""
def __init__(self, connection):
self.connection = connection
self.transaction = FakeTransaction(connection)
async def __aenter__(self):
return self.transaction
async def __aexit__(self, exc_type, exc, tb):
return False
Use__wrapped__ to bypass connection decorators:
@pytest.mark.asyncio
async def test_create_calls_execute(faker):
"""Test that create method calls execute with correct SQL and parameters."""
# Arrange: Prepare test data
create_dto = UserDTO.Create(
name=faker.name(),
email=faker.email(),
cpf=faker.ssn()
)
conn = FakeConnection()
# Act: Call DAO method directly with __wrapped__
await UserDAO.create.__wrapped__(conn, create_dto)
# Assert: Verify execute was called with correct SQL
assert len(conn.execute_calls) == 1
stmt, params = conn.execute_calls[0]
assert 'INSERT INTO users' in stmt
assert isinstance(params, list)
assert len(params) == len(create_dto.model_dump())
@pytest.mark.asyncio
async def test_fetch_by_id_error_maps_to_500():
"""Test that database errors are properly mapped to DAOException."""
conn = FakeConnection()
async def broken_fetch_row(stmt, parameters=None):
raise RustPSQLDriverPyBaseError('db fail')
conn.fetch_row = broken_fetch_row
with pytest.raises(DAOException) as exc:
await UserDAO.fetch_by_id.__wrapped__(conn, 1)
err = exc.value
assert err.status_code == 500
assert 'Erro ao buscar' in err.detail
Create dummy dependencies for isolated testing:
class DummyUserAdapter:
"""Mock adapter for testing service layer."""
def __init__(self, users):
self.users = users
self.called = False
async def get_users_by_permission(self, _permission_id, _auth_header, _permission_scope):
self.called = True
return self.users
class DummyUserDAO:
"""Mock DAO for testing service layer."""
def __init__(self):
self.fetch_called = False
self.create_called = False
async def fetch_all(self):
self.fetch_called = True
return [UserDTO.Read(id=1, name='Test User', email='test@example.com')]
async def create(self, dto):
self.create_called = (dto,)
@pytest.mark.asyncio
async def test_service_coordinates_dao_and_adapter():
"""Test that service properly coordinates between DAO and adapter."""
adapter = DummyUserAdapter([])
dao = DummyUserDAO()
service = UserService(user_adapter=adapter, user_dao=dao)
result = await service.get_all_users()
assert dao.fetch_called
assert isinstance(result[0], UserDTO.Read)
@pytest.mark.asyncio
async def test_assign_with_dal_connection(monkeypatch, faker):
"""Test method that uses DAL connection wrapper."""
from src.domain.dal import DAL
conn = FakeConnection()
# Monkeypatch connection acquisition
async def fake_get_connection(cls):
return conn
monkeypatch.setattr(DAL, '_DAL__get_connection', classmethod(fake_get_connection))
# Stub other dependencies
async def fake_verify_scope(id_, scope_type):
return None
monkeypatch.setattr(UserDAO, '_verify_scope', fake_verify_scope)
# Prepare test data
dto = UserDTO.Assign(user_id=1, role_id=2)
# Call the actual DAO method (not __wrapped__)
await UserDAO.assign(10, dto)
# Verify execution
assert len(conn.execute_calls) > 0
@pytest.mark.asyncio
async def test_sync_calls_execute_many(faker):
"""Test that bulk sync uses execute_many for efficiency."""
items = [
UserDTO.Create(name=faker.name(), email=faker.email())
for _ in range(3)
]
conn = FakeConnection()
executed = []
async def fake_execute_many(stmt, parameters=None, **kwargs):
params = parameters if parameters is not None else kwargs.get('parameters_list')
executed.append((stmt, params))
# Patch transaction's execute_many
original_transaction = conn.transaction
async def patched_transaction():
t = await original_transaction().__aenter__()
t.execute_many = fake_execute_many
return t
class PatchedTransactionContext:
async def __aenter__(self):
return await patched_transaction()
async def __aexit__(self, exc_type, exc, tb):
return False
conn.transaction = lambda: PatchedTransactionContext()
await UserDAO.sync.__wrapped__(conn, items)
# Verify batch execution
assert len(executed) == 1
stmt, params = executed[0]
assert 'INSERT INTO users' in stmt
assert len(params[0]) == len(items)
@pytest.mark.asyncio
async def test_get_users_endpoint(async_client, monkeypatch):
"""Test GET /users endpoint returns proper response."""
# Mock the service layer
async def mock_get_users():
return [UserDTO.Read(id=1, name='Test', email='test@example.com')]
monkeypatch.setattr('src.api.path.users.UserService.get_all', mock_get_users)
# Make request
response = await async_client.get('/users')
# Assert response
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]['name'] == 'Test'
@pytest.mark.asyncio
async def test_multiple_queries_with_different_results(faker):
"""Test method that makes multiple queries with different expected results."""
conn = FakeConnection()
# Set up multiple return values (will be popped in order)
conn.execute_return = [
[{'id': 1, 'status': 'pending'}], # First query
[{'id': 2, 'status': 'approved'}] # Second query
]
# First call gets first result
result1 = await UserDAO.some_method.__wrapped__(conn, 1)
assert result1[0]['status'] == 'pending'
# Second call gets second result
result2 = await UserDAO.some_method.__wrapped__(conn, 2)
assert result2[0]['status'] == 'approved'
@pytest.mark.asyncio
@pytest.mark.parametrize('status,expected_count', [
('pending', 5),
('approved', 3),
('rejected', 2),
])
async def test_count_by_status(status, expected_count):
"""Test counting users by different status values."""
conn = FakeConnection()
conn.fetch_val_return = expected_count
result = await UserDAO.count_by_status.__wrapped__(conn, status)
assert result == expected_count
assert len(conn.fetch_val_calls) == 1
test_<action>_<scenario>pytest tests/test_your_module.py -vpytest --cov=src/domain/dao/your_module tests/test_your_module.pyscope='session' for expensive fixtures (app creation)scope='function' (default) for mutable fixturespytest -x to stop on first failure during developmentpytest tests/test_dao.py# Run all tests with coverage
pytest --cov=src --cov-report=html --cov-report=term
# Run only unit tests (fast)
pytest tests/ -m "not integration"
# Run with verbose output
pytest -v --tb=short
# Run specific test file
pytest tests/test_user_dao.py -v
# Run tests matching pattern
pytest -k "test_create" -v
"""Tests for UserDAO database access layer."""
from datetime import datetime
import pytest
from src.domain.dal.dao.user import UserDAO
from src.domain.dal.dao.exception import DAOException
from src.domain.dto.user import UserDTO
from tests.fakes import FakeConnection, FakeRecord
@pytest.mark.asyncio
async def test_create_inserts_user(faker):
"""Test that create method inserts user with correct parameters."""
create_dto = UserDTO.Create(
name=faker.name(),
email=faker.email(),
cpf=faker.ssn()
)
conn = FakeConnection()
await UserDAO.create.__wrapped__(conn, create_dto)
assert len(conn.execute_calls) == 1
stmt, params = conn.execute_calls[0]
assert 'INSERT INTO users' in stmt
assert params[0] == create_dto.name
@pytest.mark.asyncio
async def test_fetch_by_id_returns_user(faker):
"""Test that fetch_by_id returns properly formatted UserDTO."""
fake_row = {
'id': faker.random_int(1, 100),
'name': faker.name(),
'email': faker.email(),
'created_at': faker.date_time()
}
conn = FakeConnection()
conn.fetch_row_return = FakeRecord(fake_row)
result = await UserDAO.fetch_by_id.__wrapped__(conn, fake_row['id'])
assert result.id == fake_row['id']
assert result.name == fake_row['name']
assert isinstance(result, UserDTO.Read)
@pytest.mark.asyncio
async def test_fetch_by_id_raises_on_db_error():
"""Test that database errors are properly handled and mapped."""
conn = FakeConnection()
async def broken_fetch_row(stmt, parameters=None):
raise Exception('Connection lost')
conn.fetch_row = broken_fetch_row
with pytest.raises(DAOException) as exc:
await UserDAO.fetch_by_id.__wrapped__(conn, 1)
assert exc.value.status_code == 500
# Run single test
pytest tests/test_user_dao.py::test_create_inserts_user -v
# Run all tests in file
pytest tests/test_user_dao.py -v
# Run with coverage for specific module
pytest --cov=src/domain/dao/user tests/test_user_dao.py
# Stop on first failure
pytest -x tests/
# Show local variables on failure
pytest --showlocals tests/
# Run last failed tests
pytest --lf tests/
This skill provides production-proven patterns for async Python testing:
When in doubt, follow the "Arrange-Act-Assert" pattern and always verify both the happy path and error scenarios.
Weekly Installs
–
Repository
GitHub Stars
5
First Seen
–
Security Audits
lark-cli 共享规则:飞书资源操作指南与权限配置详解
19,900 周安装