python-resource-management by wshobson/agents
npx skills add https://github.com/wshobson/agents --skill python-resource-management使用上下文管理器确定性管理资源。即使发生异常,也应可靠释放数据库连接、文件句柄和网络套接字等资源。
with 语句确保资源自动释放,即使在发生异常时也是如此。
同步资源管理使用 __enter__/__exit__,异步资源管理使用 __aenter__/__aexit__。
无论是否发生异常,__exit__ 始终运行。
从 返回 以抑制异常,返回 以传播异常。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
__exit__TrueFalsefrom contextlib import contextmanager
@contextmanager
def managed_resource():
resource = acquire_resource()
try:
yield resource
finally:
resource.cleanup()
with managed_resource() as r:
r.do_work()
为复杂资源实现上下文管理器协议。
class DatabaseConnection:
"""具有自动清理功能的数据库连接。"""
def __init__(self, dsn: str) -> None:
self._dsn = dsn
self._conn: Connection | None = None
def connect(self) -> None:
"""建立数据库连接。"""
self._conn = psycopg.connect(self._dsn)
def close(self) -> None:
"""如果连接打开则关闭它。"""
if self._conn is not None:
self._conn.close()
self._conn = None
def __enter__(self) -> "DatabaseConnection":
"""进入上下文:连接并返回自身。"""
self.connect()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""退出上下文:始终关闭连接。"""
self.close()
# 使用上下文管理器(推荐)
with DatabaseConnection(dsn) as db:
result = db.execute(query)
# 需要时进行手动管理
db = DatabaseConnection(dsn)
db.connect()
try:
result = db.execute(query)
finally:
db.close()
对于异步资源,实现异步协议。
class AsyncDatabasePool:
"""异步数据库连接池。"""
def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
self._dsn = dsn
self._min_size = min_size
self._max_size = max_size
self._pool: asyncpg.Pool | None = None
async def __aenter__(self) -> "AsyncDatabasePool":
"""创建连接池。"""
self._pool = await asyncpg.create_pool(
self._dsn,
min_size=self._min_size,
max_size=self._max_size,
)
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""关闭池中的所有连接。"""
if self._pool is not None:
await self._pool.close()
async def execute(self, query: str, *args) -> list[dict]:
"""使用池化连接执行查询。"""
async with self._pool.acquire() as conn:
return await conn.fetch(query, *args)
# 用法
async with AsyncDatabasePool(dsn) as pool:
users = await pool.execute("SELECT * FROM users WHERE active = $1", True)
对于简单情况,使用装饰器简化上下文管理器。
from contextlib import contextmanager, asynccontextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_block(name: str):
"""计时代码块的执行时间。"""
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))
# 用法
with timed_block("data_processing"):
process_large_dataset()
@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
"""管理数据库事务。"""
await conn.execute("BEGIN")
try:
yield conn
await conn.execute("COMMIT")
except Exception:
await conn.execute("ROLLBACK")
raise
# 用法
async with database_transaction(conn) as tx:
await tx.execute("INSERT INTO users ...")
await tx.execute("INSERT INTO audit_log ...")
始终在 __exit__ 中清理资源,无论是否发生异常。
class FileProcessor:
"""保证清理的文件处理器。"""
def __init__(self, path: str) -> None:
self._path = path
self._file: IO | None = None
self._temp_files: list[Path] = []
def __enter__(self) -> "FileProcessor":
self._file = open(self._path, "r")
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""无条件清理所有资源。"""
# 关闭主文件
if self._file is not None:
self._file.close()
# 清理任何临时文件
for temp_file in self._temp_files:
try:
temp_file.unlink()
except OSError:
pass # 尽力清理
# 返回 None/False 以传播任何异常
仅抑制特定的、有文档记录的异常。
class StreamWriter:
"""优雅处理管道断裂的写入器。"""
def __init__(self, stream) -> None:
self._stream = stream
def __enter__(self) -> "StreamWriter":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
"""清理,在关闭时抑制 BrokenPipeError。"""
self._stream.close()
# 抑制 BrokenPipeError(客户端断开连接)
# 这是预期行为,不是错误
if exc_type is BrokenPipeError:
return True # 异常被抑制
return False # 传播所有其他异常
在流式处理期间维护增量块和累积状态。
from collections.abc import Generator
from dataclasses import dataclass, field
@dataclass
class StreamingResult:
"""累积的流式处理结果。"""
chunks: list[str] = field(default_factory=list)
_finalized: bool = False
@property
def content(self) -> str:
"""获取累积内容。"""
return "".join(self.chunks)
def add_chunk(self, chunk: str) -> None:
"""将块添加到累加器。"""
if self._finalized:
raise RuntimeError("Cannot add to finalized result")
self.chunks.append(chunk)
def finalize(self) -> str:
"""标记流完成并返回内容。"""
self._finalized = True
return self.content
def stream_with_accumulation(
response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
"""流式处理响应同时累积内容。
Yields:
每个块的(累积内容,新块)元组。
Returns:
最终累积内容。
"""
result = StreamingResult()
for chunk in response.iter_content():
result.add_chunk(chunk)
yield result.content, chunk
return result.finalize()
避免累积时出现 O(n²) 的字符串连接。
def accumulate_stream(stream) -> str:
"""高效累积流内容。"""
# 不好:由于字符串不可变性导致 O(n²)
# content = ""
# for chunk in stream:
# content += chunk # 每次创建新字符串
# 好:使用列表和 join 实现 O(n)
chunks: list[str] = []
for chunk in stream:
chunks.append(chunk)
return "".join(chunks) # 单次分配
测量首字节时间和总流式处理时间。
import time
from collections.abc import Generator
def stream_with_metrics(
response: StreamingResponse,
) -> Generator[str, None, dict]:
"""流式处理响应同时收集指标。
Yields:
内容块。
Returns:
指标字典。
"""
start = time.perf_counter()
first_chunk_time: float | None = None
chunk_count = 0
total_bytes = 0
for chunk in response.iter_content():
if first_chunk_time is None:
first_chunk_time = time.perf_counter() - start
chunk_count += 1
total_bytes += len(chunk.encode())
yield chunk
total_time = time.perf_counter() - start
return {
"time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
"total_time_ms": round(total_time * 1000, 2),
"chunk_count": chunk_count,
"total_bytes": total_bytes,
}
干净地处理动态数量的资源。
from contextlib import ExitStack, AsyncExitStack
from pathlib import Path
def process_files(paths: list[Path]) -> list[str]:
"""处理多个文件并自动清理。"""
results = []
with ExitStack() as stack:
# 打开所有文件 - 它们都将在块退出时关闭
files = [stack.enter_context(open(p)) for p in paths]
for f in files:
results.append(f.read())
return results
async def process_connections(hosts: list[str]) -> list[dict]:
"""处理多个异步连接。"""
results = []
async with AsyncExitStack() as stack:
connections = [
await stack.enter_async_context(connect_to_host(host))
for host in hosts
]
for conn in connections:
results.append(await conn.fetch_data())
return results
__exit__ 即使在异常时也运行Falsewith 和手动管理每周安装量
2.9K
仓库
GitHub 星标数
32.2K
首次出现
Jan 30, 2026
安全审计
安装于
gemini-cli2.3K
opencode2.3K
codex2.2K
claude-code2.2K
cursor2.1K
github-copilot2.0K
Manage resources deterministically using context managers. Resources like database connections, file handles, and network sockets should be released reliably, even when exceptions occur.
The with statement ensures resources are released automatically, even on exceptions.
__enter__/__exit__ for sync, __aenter__/__aexit__ for async resource management.
__exit__ always runs, regardless of whether an exception occurred.
Return True from __exit__ to suppress exceptions, False to propagate them.
from contextlib import contextmanager
@contextmanager
def managed_resource():
resource = acquire_resource()
try:
yield resource
finally:
resource.cleanup()
with managed_resource() as r:
r.do_work()
Implement the context manager protocol for complex resources.
class DatabaseConnection:
"""Database connection with automatic cleanup."""
def __init__(self, dsn: str) -> None:
self._dsn = dsn
self._conn: Connection | None = None
def connect(self) -> None:
"""Establish database connection."""
self._conn = psycopg.connect(self._dsn)
def close(self) -> None:
"""Close connection if open."""
if self._conn is not None:
self._conn.close()
self._conn = None
def __enter__(self) -> "DatabaseConnection":
"""Enter context: connect and return self."""
self.connect()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit context: always close connection."""
self.close()
# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
result = db.execute(query)
# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
result = db.execute(query)
finally:
db.close()
For async resources, implement the async protocol.
class AsyncDatabasePool:
"""Async database connection pool."""
def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
self._dsn = dsn
self._min_size = min_size
self._max_size = max_size
self._pool: asyncpg.Pool | None = None
async def __aenter__(self) -> "AsyncDatabasePool":
"""Create connection pool."""
self._pool = await asyncpg.create_pool(
self._dsn,
min_size=self._min_size,
max_size=self._max_size,
)
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Close all connections in pool."""
if self._pool is not None:
await self._pool.close()
async def execute(self, query: str, *args) -> list[dict]:
"""Execute query using pooled connection."""
async with self._pool.acquire() as conn:
return await conn.fetch(query, *args)
# Usage
async with AsyncDatabasePool(dsn) as pool:
users = await pool.execute("SELECT * FROM users WHERE active = $1", True)
Simplify context managers with the decorator for straightforward cases.
from contextlib import contextmanager, asynccontextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_block(name: str):
"""Time a block of code."""
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))
# Usage
with timed_block("data_processing"):
process_large_dataset()
@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
"""Manage database transaction."""
await conn.execute("BEGIN")
try:
yield conn
await conn.execute("COMMIT")
except Exception:
await conn.execute("ROLLBACK")
raise
# Usage
async with database_transaction(conn) as tx:
await tx.execute("INSERT INTO users ...")
await tx.execute("INSERT INTO audit_log ...")
Always clean up resources in __exit__, regardless of exceptions.
class FileProcessor:
"""Process file with guaranteed cleanup."""
def __init__(self, path: str) -> None:
self._path = path
self._file: IO | None = None
self._temp_files: list[Path] = []
def __enter__(self) -> "FileProcessor":
self._file = open(self._path, "r")
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Clean up all resources unconditionally."""
# Close main file
if self._file is not None:
self._file.close()
# Clean up any temporary files
for temp_file in self._temp_files:
try:
temp_file.unlink()
except OSError:
pass # Best effort cleanup
# Return None/False to propagate any exception
Only suppress specific, documented exceptions.
class StreamWriter:
"""Writer that handles broken pipe gracefully."""
def __init__(self, stream) -> None:
self._stream = stream
def __enter__(self) -> "StreamWriter":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
"""Clean up, suppressing BrokenPipeError on shutdown."""
self._stream.close()
# Suppress BrokenPipeError (client disconnected)
# This is expected behavior, not an error
if exc_type is BrokenPipeError:
return True # Exception suppressed
return False # Propagate all other exceptions
Maintain both incremental chunks and accumulated state during streaming.
from collections.abc import Generator
from dataclasses import dataclass, field
@dataclass
class StreamingResult:
"""Accumulated streaming result."""
chunks: list[str] = field(default_factory=list)
_finalized: bool = False
@property
def content(self) -> str:
"""Get accumulated content."""
return "".join(self.chunks)
def add_chunk(self, chunk: str) -> None:
"""Add chunk to accumulator."""
if self._finalized:
raise RuntimeError("Cannot add to finalized result")
self.chunks.append(chunk)
def finalize(self) -> str:
"""Mark stream complete and return content."""
self._finalized = True
return self.content
def stream_with_accumulation(
response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
"""Stream response while accumulating content.
Yields:
Tuple of (accumulated_content, new_chunk) for each chunk.
Returns:
Final accumulated content.
"""
result = StreamingResult()
for chunk in response.iter_content():
result.add_chunk(chunk)
yield result.content, chunk
return result.finalize()
Avoid O(n²) string concatenation when accumulating.
def accumulate_stream(stream) -> str:
"""Efficiently accumulate stream content."""
# BAD: O(n²) due to string immutability
# content = ""
# for chunk in stream:
# content += chunk # Creates new string each time
# GOOD: O(n) with list and join
chunks: list[str] = []
for chunk in stream:
chunks.append(chunk)
return "".join(chunks) # Single allocation
Measure time-to-first-byte and total streaming time.
import time
from collections.abc import Generator
def stream_with_metrics(
response: StreamingResponse,
) -> Generator[str, None, dict]:
"""Stream response while collecting metrics.
Yields:
Content chunks.
Returns:
Metrics dictionary.
"""
start = time.perf_counter()
first_chunk_time: float | None = None
chunk_count = 0
total_bytes = 0
for chunk in response.iter_content():
if first_chunk_time is None:
first_chunk_time = time.perf_counter() - start
chunk_count += 1
total_bytes += len(chunk.encode())
yield chunk
total_time = time.perf_counter() - start
return {
"time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
"total_time_ms": round(total_time * 1000, 2),
"chunk_count": chunk_count,
"total_bytes": total_bytes,
}
Handle a dynamic number of resources cleanly.
from contextlib import ExitStack, AsyncExitStack
from pathlib import Path
def process_files(paths: list[Path]) -> list[str]:
"""Process multiple files with automatic cleanup."""
results = []
with ExitStack() as stack:
# Open all files - they'll all be closed when block exits
files = [stack.enter_context(open(p)) for p in paths]
for f in files:
results.append(f.read())
return results
async def process_connections(hosts: list[str]) -> list[dict]:
"""Process multiple async connections."""
results = []
async with AsyncExitStack() as stack:
connections = [
await stack.enter_async_context(connect_to_host(host))
for host in hosts
]
for conn in connections:
results.append(await conn.fetch_data())
return results
__exit__ runs even on exceptionFalse unless suppression is intentionalwith and manual managementWeekly Installs
2.9K
Repository
GitHub Stars
32.2K
First Seen
Jan 30, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykPass
Installed on
gemini-cli2.3K
opencode2.3K
codex2.2K
claude-code2.2K
cursor2.1K
github-copilot2.0K
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
102,200 周安装