alembic by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill alembic

本技能为在客户支持环境中使用 Alembic 管理数据库迁移提供全面指导。它涵盖了从初始设置到复杂生产部署场景的所有内容,重点在于维护数据完整性并最大限度地减少支持操作的中断时间。
Alembic 是一个用于 SQLAlchemy 的轻量级数据库迁移工具。它提供了一种通过版本控制的迁移脚本来管理数据库模式随时间变化的方法。对于客户支持系统,这意味着:
# 安装 Alembic 及 PostgreSQL 支持
pip install alembic psycopg2-binary sqlalchemy
# 或添加到 requirements.txt
alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
# 初始化 Alembic(创建 alembic/ 目录和 alembic.ini)
alembic init alembic
# 对于多数据库支持
alembic init --template multidb alembic
这将创建:
这将创建:
- alembic/:包含迁移脚本的目录
- alembic/versions/:存放各个迁移文件的地方
- alembic/env.py:迁移环境配置
- alembic.ini:Alembic 配置文件

编辑 alembic.ini 以设置数据库 URL:
# 用于开发
sqlalchemy.url = postgresql://user:password@localhost/support_dev
# 用于生产(使用环境变量)
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
更好的方法 - 在 env.py 中使用环境变量:
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# 导入你的模型
from myapp.models import Base
# 这是 Alembic Config 对象
config = context.config
# 从环境变量覆盖 sqlalchemy.url
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url)
# 为自动生成设置目标元数据
target_metadata = Base.metadata
当需要精确控制时,手动创建迁移:
# 创建空的迁移文件
alembic revision -m "add ticket priority column"
这将生成类似 versions/abc123_add_ticket_priority_column.py 的文件:
"""add ticket priority column
Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Apply: add a nullable `priority` column with a server default, plus an index."""
    # server_default='normal' backfills existing rows at the database level.
    # NOTE(review): on PostgreSQL 11+ adding a column with a constant default
    # avoids a full table rewrite — confirm for your server version.
    op.add_column('tickets',
        sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
    )
    # Create index for performance (priority is a common filter in support queues)
    op.create_index('ix_tickets_priority', 'tickets', ['priority'])

def downgrade() -> None:
    """Revert: drop the index first (it depends on the column), then the column."""
    # Remove index first
    op.drop_index('ix_tickets_priority', 'tickets')
    # Remove column
    op.drop_column('tickets', 'priority')
让 Alembic 自动检测模式更改:
# 通过比较模型与数据库来生成迁移
alembic revision --autogenerate -m "add customer satisfaction table"
重要:始终审查自动生成的迁移!它们可能会遗漏:服务器端默认值的变化、CHECK 约束、列重命名(会被识别为"先删除再新增",从而丢失数据)以及表/列注释等。
自动生成的迁移示例:
"""add customer satisfaction table
Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the customer_satisfaction table and its two lookup indexes."""
    # Auto-generated - review before running!
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        # ondelete='CASCADE': removing a ticket also removes its ratings
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
    op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])

def downgrade() -> None:
    """Drop the indexes first, then the table (reverse of creation order)."""
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')
当需要转换现有数据时:
"""convert ticket status to new enum
Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = 'data001'
down_revision = 'xyz789'
def upgrade() -> None:
    """Convert tickets.status to the new uppercase vocabulary.

    Strategy: add a parallel column, copy values through a mapping,
    enforce NOT NULL, then drop the old column and rename the new one.
    """
    # Create new status column (nullable while data is copied)
    op.add_column('tickets',
        sa.Column('status_new', sa.String(50), nullable=True)
    )
    # Lightweight table construct for bulk UPDATEs (no ORM needed)
    tickets = table('tickets',
        column('status', sa.String),
        column('status_new', sa.String)
    )
    # Map old statuses to new ones
    # NOTE(review): any row whose status is NOT in this mapping keeps
    # status_new = NULL and the alter_column(nullable=False) below will
    # fail — confirm the mapping covers every value present in production.
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED'
    }
    connection = op.get_bind()
    for old_status, new_status in status_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == old_status
            ).values(status_new=new_status)
        )
    # Make new column non-nullable now that data is migrated
    op.alter_column('tickets', 'status_new', nullable=False)
    # Drop old column and rename new one
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')

def downgrade() -> None:
    """Reverse the migration: map uppercase statuses back to the old names."""
    op.add_column('tickets',
        sa.Column('status_old', sa.String(50), nullable=True)
    )
    # 'status' here holds the NEW (uppercase) values at downgrade time
    tickets = table('tickets',
        column('status', sa.String),
        column('status_old', sa.String)
    )
    # Reverse mapping
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed'
    }
    connection = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == new_status
            ).values(status_old=old_status)
        )
    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')
对于大型表,分批处理数据:
"""add computed resolution time to tickets
Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select
revision = 'data002'
down_revision = 'data001'
def upgrade() -> None:
    """Backfill tickets.resolution_time_seconds from created_at/resolved_at.

    Adds the column as nullable, computes the value for resolved tickets in
    bounded batches, zero-fills the rows that cannot be computed, and only
    then enforces NOT NULL.
    """
    # Add new column (nullable while the backfill runs)
    op.add_column('tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )
    connection = op.get_bind()
    tickets = table('tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )
    # Process in batches to bound memory use.
    batch_size = 1000
    while True:
        # BUG FIX: always read from the start (no OFFSET). Each updated row
        # stops matching `resolution_time_seconds IS NULL`, so the original
        # `offset += batch_size` advanced past rows that were never processed,
        # leaving NULLs behind and making the NOT NULL alter below fail.
        # Requiring created_at IS NOT NULL also prevents an infinite loop on
        # rows that can never be computed.
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.created_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).limit(batch_size)
        ).fetchall()
        if not batch:
            break
        # Update this batch row by row (values differ per row)
        for row in batch:
            resolution_time = (row.resolved_at - row.created_at).total_seconds()
            connection.execute(
                tickets.update().where(
                    tickets.c.id == row.id
                ).values(resolution_time_seconds=int(resolution_time))
            )
    # Unresolved tickets (or rows missing timestamps) are still NULL.
    # server_default only applies to FUTURE inserts — it does not backfill —
    # so zero-fill explicitly before enforcing NOT NULL.
    connection.execute(
        tickets.update().where(
            tickets.c.resolution_time_seconds.is_(None)
        ).values(resolution_time_seconds=0)
    )
    # Now make column non-nullable for future rows
    op.alter_column('tickets', 'resolution_time_seconds',
        nullable=False, server_default='0')

def downgrade() -> None:
    """Revert: drop the computed column (data is derivable, nothing is lost)."""
    op.drop_column('tickets', 'resolution_time_seconds')
# 升级到最新修订版本(head)
alembic upgrade head
# 查看将要执行的内容(仅 SQL,不运行)
alembic upgrade head --sql
# 一次升级一步
alembic upgrade +1
# 升级到特定修订版本
alembic upgrade abc123
# 降级一个修订版本
alembic downgrade -1
# 降级到特定修订版本
alembic downgrade abc123
# 降级到基础版本(空数据库)
alembic downgrade base
# 生成降级 SQL 而不执行
alembic downgrade -1 --sql
# 显示当前数据库修订版本
alembic current
# 显示当前修订版本详细信息
alembic current --verbose
# 显示迁移历史
alembic history
# 显示标记了当前修订版本的历史
alembic history --indicate-current
# 显示特定修订版本范围
alembic history -r base:head
在客户支持系统中,你可能会有:
# 为新分支创建基础
alembic revision -m "create reporting branch" \
--head=base \
--branch-label=reporting \
--version-path=alembic/versions/reporting
# 向特定分支添加迁移
alembic revision -m "add report tables" \
--head=reporting@head
分支结构示例:
base
├── main branch
│ ├── abc123: initial schema
│ ├── def456: add tickets
│ └── ghi789: add users
└── reporting branch
├── rep001: create reports table
└── rep002: add scheduled reports
# 显示所有分支头
alembic heads
# 显示分支点
alembic branches
# 升级特定分支
alembic upgrade reporting@head
# 升级所有分支
alembic upgrade heads
当功能准备好合并时:
# 合并两个分支
alembic merge -m "merge reporting into main" \
main@head reporting@head
生成的合并迁移:
"""merge reporting into main
Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'merge001'
down_revision = ('ghi789', 'rep002') # Multiple parents
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Merge point only — no schema changes of its own.

    Add reconciliation code here only if the merged branches made
    conflicting changes that need to be resolved.
    """
    pass

def downgrade() -> None:
    """Nothing to revert for a pure merge revision."""
    pass
当一个分支依赖于另一个分支时:
# 创建依赖于另一个分支特定修订版本的迁移
alembic revision -m "reporting needs user table" \
--head=reporting@head \
--depends-on=def456 # Revision from main branch
# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def alembic_config():
    """Provide Alembic configuration for testing.

    Loads the project's alembic.ini, then repoints sqlalchemy.url at the
    dedicated test database so tests never touch dev/prod data.
    """
    config = Config("alembic.ini")
    config.set_main_option(
        "sqlalchemy.url",
        "postgresql://localhost/support_test"
    )
    return config
@pytest.fixture
def test_db(alembic_config):
    """Create test database and apply migrations.

    Yields a SQLAlchemy engine bound to the fully migrated test database;
    teardown downgrades back to base and disposes the engine.
    """
    # Create engine
    engine = create_engine(
        alembic_config.get_main_option("sqlalchemy.url")
    )
    # Run migrations to head
    command.upgrade(alembic_config, "head")
    yield engine
    # Cleanup - downgrade to base
    command.downgrade(alembic_config, "base")
    engine.dispose()
def test_migration_creates_tickets_table(test_db):
    """Verify the migration chain creates the core support tables."""
    inspector = inspect(test_db)
    existing = set(inspector.get_table_names())
    for expected in ('tickets', 'users', 'customer_satisfaction'):
        assert expected in existing
def test_tickets_table_structure(test_db):
    """Test ticket table has correct columns and basic column types."""
    inspector = inspect(test_db)
    # Map column name -> reflected metadata for easy membership assertions
    columns = {col['name']: col for col in inspector.get_columns('tickets')}
    assert 'id' in columns
    assert 'priority' in columns
    assert 'status' in columns
    assert 'created_at' in columns
    assert 'resolution_time_seconds' in columns
    # Check column types (reflected SQL types expose their Python equivalent)
    assert columns['priority']['type'].python_type == str
    assert columns['status']['type'].python_type == str
def test_migration_upgrade_downgrade_cycle(alembic_config):
    """Test that upgrade -> downgrade -> upgrade works.

    Guards against downgrade() implementations that cannot round-trip
    (irreversible DDL, missing reverse steps).
    """
    # Start at base
    command.downgrade(alembic_config, "base")
    # Upgrade to head
    command.upgrade(alembic_config, "head")
    # Downgrade one step
    command.downgrade(alembic_config, "-1")
    # Upgrade back to head
    command.upgrade(alembic_config, "head")
    # Should complete without errors
def test_data_migration_preserves_data(test_db):
    """Test that data migrations don't lose data.

    Inserts a ticket, optionally runs a specific data migration (left
    commented as a template), and verifies the row is still retrievable.
    """
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket
    Session = sessionmaker(bind=test_db)
    session = Session()
    # Insert test data
    ticket = Ticket(
        title="Test ticket",
        status="OPEN",
        priority="high"
    )
    session.add(ticket)
    session.commit()
    ticket_id = ticket.id  # capture before closing; the instance detaches
    session.close()
    # Run a migration that modifies tickets table
    # (This would be a specific revision)
    # command.upgrade(alembic_config, "specific_revision")
    # Verify data still exists
    session = Session()
    retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
    assert retrieved is not None
    assert retrieved.title == "Test ticket"
    session.close()
# tests/test_migration_integration.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db):
    """Ensure all migrations are applied in test environment.

    Compares the revision heads recorded in the database against the heads
    known to the migration scripts; any mismatch means pending migrations.
    """
    script = ScriptDirectory.from_config(alembic_config)
    with test_db.connect() as connection:
        context = MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        script_heads = set(script.get_heads())
        assert current_heads == script_heads, \
            f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"
def test_migration_order_is_valid(alembic_config):
    """Verify migration chain has no gaps: every down_revision must resolve."""
    script = ScriptDirectory.from_config(alembic_config)
    # Get all revisions known to the script directory
    revisions = list(script.walk_revisions())
    # Check each revision has valid down_revision
    for revision in revisions:
        if revision.down_revision is not None:
            if isinstance(revision.down_revision, tuple):
                # Merge point: every parent must be a known revision
                for down_rev in revision.down_revision:
                    assert script.get_revision(down_rev) is not None
            else:
                assert script.get_revision(revision.down_revision) is not None
def test_check_command_detects_drift(alembic_config, test_db):
    """Fail with a clear message when the schema has drifted from the models.

    `command.check` raises when autogenerate would emit operations, i.e.
    the database and the models no longer agree. The original version
    carried a dead `assert True` on the success path; a clean return is
    already a passing test, so only the failure path needs handling.
    """
    try:
        command.check(alembic_config)
    except Exception as e:
        pytest.fail(f"Schema drift detected: {e}")
# tests/test_migration_performance.py
import time
import pytest
from alembic import command
def test_migration_completes_within_time_limit(alembic_config):
    """The full base -> head upgrade must finish within 60 seconds."""
    # Reset to an empty schema so the whole chain is timed
    command.downgrade(alembic_config, "base")
    started = time.time()
    command.upgrade(alembic_config, "head")
    duration = time.time() - started
    assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"
@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
    """Test data migration performance with realistic data volume."""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket
    Session = sessionmaker(bind=test_db)
    session = Session()
    # Create 10,000 test tickets
    tickets = [
        Ticket(
            title=f"Test ticket {i}",
            status="OPEN",
            priority="normal"
        )
        for i in range(10000)
    ]
    # bulk_save_objects skips per-object events/relationships — fine for fixtures
    session.bulk_save_objects(tickets)
    session.commit()
    session.close()
    # Run data migration and measure time
    # NOTE(review): test_db already upgrades to head, so upgrading to
    # "data002" here is likely a no-op unless the fixture is changed — confirm.
    start = time.time()
    command.upgrade(alembic_config, "data002")  # Specific data migration
    duration = time.time() - start
    # Should process 10k records in reasonable time
    assert duration < 30, f"Data migration took {duration}s for 10k records"
# .github/workflows/migrations.yml
#
# CI for database migrations: runs the up/down/up cycle and the migration
# test suite against a disposable PostgreSQL service, checks for schema
# drift and multiple heads, and posts the generated SQL on pull requests.
# (Indentation reconstructed — the scraped original had lost all YAML structure.)
name: Database Migrations

on:
  pull_request:
    paths:
      - 'alembic/versions/**'
      - 'myapp/models/**'
      - 'alembic.ini'
      - 'alembic/env.py'
  push:
    branches:
      - main
      - develop

jobs:
  test-migrations:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: support_test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run migration tests
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          # Test upgrade to head
          alembic upgrade head
          # Test downgrade to base
          alembic downgrade base
          # Test upgrade again
          alembic upgrade head
          # Run pytest for migration tests
          pytest tests/test_migrations.py -v
      - name: Check for schema drift
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          alembic check
      - name: Validate migration history
        run: |
          # Check for multiple heads (should be only one)
          HEADS_COUNT=$(alembic heads | wc -l)
          if [ "$HEADS_COUNT" -gt 1 ]; then
            echo "ERROR: Multiple heads detected. Please merge branches."
            alembic heads
            exit 1
          fi

  review-migration-sql:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Generate SQL for review
        run: |
          # Generate SQL without executing (offline mode)
          alembic upgrade head --sql > migration.sql
      - name: Upload SQL artifact
        uses: actions/upload-artifact@v3
        with:
          name: migration-sql
          path: migration.sql
      - name: Comment PR with SQL
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const sql = fs.readFileSync('migration.sql', 'utf8');
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
            });
#!/bin/bash
# scripts/deploy_migrations.sh
#
# Deploys pending Alembic migrations with a safety backup:
#   1) pg_dump backup  2) show status  3) run migrations with a timeout
#   4) verify the DB is at head, restoring the backup on failure.
set -e  # Exit on error

echo "Starting database migration deployment..."

# Environment variables (DB_PASSWORD must come from the environment/secrets)
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"

# Configuration
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"

# Create backup directory
mkdir -p "$BACKUP_DIR"

# 1. Backup database before migration
echo "Creating database backup..."
pg_dump "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"

# 2. Check current migration status
echo "Current migration status:"
alembic current

# 3. Show pending migrations
echo "Pending migrations:"
alembic history --verbose | grep -A 5 "head"

# 4. Run migrations with timeout
# NOTE(review): restoring a plain-format pg_dump into a partially migrated,
# non-empty database produces "already exists" errors. For a reliable
# rollback, restore into a fresh database or use a custom-format dump with
# `pg_restore --clean` — confirm your recovery procedure before relying on this.
echo "Running migrations..."
timeout 300 alembic upgrade head || {
    echo "ERROR: Migration failed or timed out!"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
}

# 5. Verify migration success
echo "Verifying migration status..."
# FIX: plain `alembic current` prints "<revision> (head)" — the "Rev:" label
# only appears with --verbose, so the original `grep "Rev:"` always produced
# an empty CURRENT_REV and the comparison below could spuriously trigger a
# restore. Parse the first field of the first non-empty line instead.
CURRENT_REV=$(alembic current | awk 'NF {print $1; exit}')
HEAD_REV=$(alembic heads | awk 'NF {print $1; exit}')

if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
    echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
fi

echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"

# 6. Cleanup old backups (keep last 10)
echo "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm

echo "Deployment complete!"
对于无法离线的重要支持系统:
阶段 1:纯添加式更改(additive changes,不破坏现有读写)
"""add new column (phase 1)
Revision ID: zd001
"""
def upgrade() -> None:
    """Zero-downtime phase 1: purely additive change.

    The column is added as nullable so existing writes keep working and no
    backfill happens during the deploy window.
    """
    # Add new column as nullable
    op.add_column('tickets',
        sa.Column('new_field', sa.String(100), nullable=True)
    )

def downgrade() -> None:
    """Revert phase 1: drop the added column."""
    op.drop_column('tickets', 'new_field')
阶段 2:数据迁移(后台)
"""populate new column (phase 2)
Revision ID: zd002
"""
def upgrade() -> None:
    """Zero-downtime phase 2: backfill `new_field` in small batches.

    Runs plain UPDATEs in a loop so no single statement holds a long lock
    on `tickets`, with a short sleep between batches to limit DB load.
    """
    import time

    connection = op.get_bind()
    batch_size = 100
    while True:
        # FIX 1: PostgreSQL does not support LIMIT on UPDATE — limit via a
        # sub-select of primary keys instead.
        # FIX 2: SQLAlchemy 2.0 requires textual SQL to be wrapped in
        # sa.text(); the batch size is passed as a bound parameter rather
        # than str.format()-interpolated into the statement.
        result = connection.execute(
            sa.text(
                """
                UPDATE tickets
                SET new_field = calculate_value(old_field)
                WHERE id IN (
                    SELECT id FROM tickets
                    WHERE new_field IS NULL
                    LIMIT :batch_size
                )
                """
            ),
            {"batch_size": batch_size},
        )
        if result.rowcount == 0:
            break
        # Small delay to reduce database load
        time.sleep(0.1)

def downgrade() -> None:
    """Revert phase 2: clear the backfilled values."""
    connection = op.get_bind()
    connection.execute(sa.text("UPDATE tickets SET new_field = NULL"))
阶段 3:设为必需
"""make new column required (phase 3)
Revision ID: zd003
"""
def upgrade() -> None:
    """Zero-downtime phase 3: enforce NOT NULL once the backfill is complete.

    NOTE(review): SET NOT NULL only succeeds if phase 2 left no NULLs;
    server_default covers future inserts, not existing rows — verify the
    backfill finished before running this.
    """
    op.alter_column('tickets', 'new_field',
        nullable=False,
        server_default='default_value'
    )

def downgrade() -> None:
    """Revert phase 3: relax the constraint and remove the default."""
    op.alter_column('tickets', 'new_field',
        nullable=True,
        server_default=None
    )
阶段 4:删除旧列(可选)
"""remove old column (phase 4)
Revision ID: zd004
"""
def upgrade() -> None:
    """Zero-downtime phase 4 (optional): drop the superseded column."""
    op.drop_column('tickets', 'old_field')

def downgrade() -> None:
    """Re-create the old column; its DATA is not recoverable after the drop."""
    op.add_column('tickets',
        sa.Column('old_field', sa.String(100), nullable=True)
    )
# alembic/env.py additions for error handling
from alembic import context
import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online():
    """Run migrations in 'online' mode with error handling.

    transaction_per_migration=True commits/rolls back each migration
    individually, so one failure does not undo earlier successful ones.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,  # no pooling: one short-lived connection
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,  # Rollback individual migrations
            compare_type=True,
            compare_server_default=True
        )
        try:
            with context.begin_transaction():
                context.run_migrations()
        except Exception as e:
            logger.error(f"Migration failed: {e}")
            logger.error("Rolling back transaction...")
            # Transaction automatically rolled back by the context manager
            raise
        else:
            logger.info("Migration completed successfully")
为你的组织创建自定义模板:
# alembic/script.py.mako
## Custom revision template for generated migration files.
## NOTE(review): ${author} and ${jira_ticket} are NOT variables Alembic
## supplies by default — they must be injected via custom generation hooks
## or template args, otherwise rendering will fail. Confirm before adopting.
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}

def upgrade() -> None:
    """Apply migration changes"""
    ${upgrades if upgrades else "pass"}

def downgrade() -> None:
    """Revert migration changes"""
    ${downgrades if downgrades else "pass"}
对于具有独立数据库的系统(例如,主数据库 + 分析数据库):
# alembic/env.py for multiple databases
def run_migrations_online():
    """Run migrations for multiple databases.

    Iterates over each configured database and runs its migrations on a
    dedicated connection/transaction, using per-database tokens so one
    migration file can carry separate sections per database.
    """
    # Configuration for each database
    engines = {
        'main': {
            'url': os.getenv('MAIN_DB_URL'),
            'target_metadata': main_metadata
        },
        'analytics': {
            'url': os.getenv('ANALYTICS_DB_URL'),
            'target_metadata': analytics_metadata
        }
    }
    # NOTE(review): the loop variable `config` shadows the module-level
    # Alembic `config` object — rename it (e.g. `db_cfg`) if the Alembic
    # config is ever needed inside this loop.
    for name, config in engines.items():
        logger.info(f"Running migrations for {name} database")
        engine = create_engine(config['url'])
        with engine.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=config['target_metadata'],
                upgrade_token=f"{name}_upgrade",
                downgrade_token=f"{name}_downgrade"
            )
            with context.begin_transaction():
                context.run_migrations(engine_name=name)
多个头错误
# 问题:"Multiple heads exist"
# 解决方案:合并分支
alembic merge heads -m "merge branches"
迁移不同步
# 问题:数据库修订版本与迁移历史不匹配
# 解决方案:将数据库标记到特定修订版本
alembic stamp head
# 或标记到特定修订版本
alembic stamp abc123
失败的迁移清理
# 问题:迁移中途失败
# 解决方案:手动清理
# 1. 检查当前状态
alembic current
# 2. 手动修复数据库问题
psql $DATABASE_URL
# 3. 标记到正确的修订版本
alembic stamp previous_working_revision
# 4. 再次尝试迁移
alembic upgrade head
循环依赖
# 问题:"Circular dependency detected"
# 解决方案:使用 depends_on 替代 down_revision
alembic revision -m "fix circular dependency" \
--head=branch_a@head \
--depends-on=branch_b_revision
本技能涵盖了客户支持系统中 Alembic 的全面使用:
始终记住:
更多示例,请参阅此技能包中的 EXAMPLES.md。
每周安装次数
99
代码仓库
GitHub 星标数
46
首次出现
Jan 22, 2026
安全审计
安装于
opencode82
codex80
gemini-cli79
github-copilot75
cursor
This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.
Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:
# Install Alembic with PostgreSQL support
pip install alembic psycopg2-binary sqlalchemy
# Or add to requirements.txt
alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
# Initialize Alembic (creates alembic/ directory and alembic.ini)
alembic init alembic
# For multiple database support
alembic init --template multidb alembic
This creates:
This creates:
- alembic/: Directory containing migration scripts
- alembic/versions/: Where individual migration files live
- alembic/env.py: Migration environment configuration
- alembic.ini: Alembic configuration file

Edit alembic.ini to set your database URL:
# For development
sqlalchemy.url = postgresql://user:password@localhost/support_dev
# For production (use environment variables)
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
Better approach - use environment variables in env.py:
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# Import your models
from myapp.models import Base
# This is the Alembic Config object
config = context.config
# Override sqlalchemy.url from environment
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url)
# Set up target metadata for autogenerate
target_metadata = Base.metadata
Create a migration manually when you need precise control:
# Create empty migration file
alembic revision -m "add ticket priority column"
This generates a file like versions/abc123_add_ticket_priority_column.py:
"""add ticket priority column
Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Apply: add a nullable `priority` column with a server default, plus an index."""
    # server_default='normal' backfills existing rows at the database level
    op.add_column('tickets',
        sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
    )
    # Create index for performance (priority is a common support-queue filter)
    op.create_index('ix_tickets_priority', 'tickets', ['priority'])

def downgrade() -> None:
    """Revert: drop the index first (it depends on the column), then the column."""
    # Remove index first
    op.drop_index('ix_tickets_priority', 'tickets')
    # Remove column
    op.drop_column('tickets', 'priority')
Let Alembic detect schema changes automatically:
# Generate migration by comparing models to database
alembic revision --autogenerate -m "add customer satisfaction table"
Important: Always review autogenerated migrations! They may miss: server-default changes, CHECK constraints, and column renames (detected as drop-and-add, which loses data), among other details.
Example autogenerated migration:
"""add customer satisfaction table
Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the customer_satisfaction table and its two lookup indexes."""
    # Auto-generated - review before running!
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        # ondelete='CASCADE': removing a ticket also removes its ratings
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
    op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])

def downgrade() -> None:
    """Drop the indexes first, then the table (reverse of creation order)."""
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')
When you need to transform existing data:
"""convert ticket status to new enum
Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = 'data001'
down_revision = 'xyz789'
def upgrade() -> None:
    """Convert tickets.status to the new uppercase vocabulary.

    Strategy: add a parallel column, copy values through a mapping,
    enforce NOT NULL, then drop the old column and rename the new one.
    """
    # Create new status column (nullable while data is copied)
    op.add_column('tickets',
        sa.Column('status_new', sa.String(50), nullable=True)
    )
    # Lightweight table construct for bulk UPDATEs (no ORM needed)
    tickets = table('tickets',
        column('status', sa.String),
        column('status_new', sa.String)
    )
    # Map old statuses to new ones
    # NOTE(review): rows whose status is NOT in this mapping keep
    # status_new = NULL and the alter_column(nullable=False) below fails —
    # confirm the mapping covers every value present in production.
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED'
    }
    connection = op.get_bind()
    for old_status, new_status in status_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == old_status
            ).values(status_new=new_status)
        )
    # Make new column non-nullable now that data is migrated
    op.alter_column('tickets', 'status_new', nullable=False)
    # Drop old column and rename new one
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')

def downgrade() -> None:
    """Reverse the migration: map uppercase statuses back to the old names."""
    op.add_column('tickets',
        sa.Column('status_old', sa.String(50), nullable=True)
    )
    # 'status' here holds the NEW (uppercase) values at downgrade time
    tickets = table('tickets',
        column('status', sa.String),
        column('status_old', sa.String)
    )
    # Reverse mapping
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed'
    }
    connection = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == new_status
            ).values(status_old=old_status)
        )
    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')
For large tables, process data in batches:
"""add computed resolution time to tickets
Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select
revision = 'data002'
down_revision = 'data001'
def upgrade() -> None:
    """Backfill tickets.resolution_time_seconds from created_at/resolved_at.

    Adds the column as nullable, computes the value for resolved tickets in
    bounded batches, zero-fills the rows that cannot be computed, and only
    then enforces NOT NULL.
    """
    # Add new column (nullable while the backfill runs)
    op.add_column('tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )
    connection = op.get_bind()
    tickets = table('tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )
    # Process in batches to bound memory use.
    batch_size = 1000
    while True:
        # BUG FIX: always read from the start (no OFFSET). Each updated row
        # stops matching `resolution_time_seconds IS NULL`, so the original
        # `offset += batch_size` advanced past rows that were never processed,
        # leaving NULLs behind and making the NOT NULL alter below fail.
        # Requiring created_at IS NOT NULL also prevents an infinite loop on
        # rows that can never be computed.
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.created_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).limit(batch_size)
        ).fetchall()
        if not batch:
            break
        # Update this batch row by row (values differ per row)
        for row in batch:
            resolution_time = (row.resolved_at - row.created_at).total_seconds()
            connection.execute(
                tickets.update().where(
                    tickets.c.id == row.id
                ).values(resolution_time_seconds=int(resolution_time))
            )
    # Unresolved tickets (or rows missing timestamps) are still NULL.
    # server_default only applies to FUTURE inserts — it does not backfill —
    # so zero-fill explicitly before enforcing NOT NULL.
    connection.execute(
        tickets.update().where(
            tickets.c.resolution_time_seconds.is_(None)
        ).values(resolution_time_seconds=0)
    )
    # Now make column non-nullable for future rows
    op.alter_column('tickets', 'resolution_time_seconds',
        nullable=False, server_default='0')

def downgrade() -> None:
    """Revert: drop the computed column (data is derivable, nothing is lost)."""
    op.drop_column('tickets', 'resolution_time_seconds')
# Upgrade to latest revision (head)
alembic upgrade head
# See what would be executed (SQL only, don't run)
alembic upgrade head --sql
# Upgrade one step at a time
alembic upgrade +1
# Upgrade to specific revision
alembic upgrade abc123
# Downgrade one revision
alembic downgrade -1
# Downgrade to specific revision
alembic downgrade abc123
# Downgrade to base (empty database)
alembic downgrade base
# Generate SQL for downgrade without executing
alembic downgrade -1 --sql
# Show current database revision
alembic current
# Show current revision with details
alembic current --verbose
# Show migration history
alembic history
# Show history with current revision marked
alembic history --indicate-current
# Show specific revision range
alembic history -r base:head
In customer support systems, you might have:
# Create base for new branch
alembic revision -m "create reporting branch" \
--head=base \
--branch-label=reporting \
--version-path=alembic/versions/reporting
# Add migration to specific branch
alembic revision -m "add report tables" \
--head=reporting@head
Example branch structure:
base
├── main branch
│ ├── abc123: initial schema
│ ├── def456: add tickets
│ └── ghi789: add users
└── reporting branch
├── rep001: create reports table
└── rep002: add scheduled reports
# Show all branch heads
alembic heads
# Show branch points
alembic branches
# Upgrade specific branch
alembic upgrade reporting@head
# Upgrade all branches
alembic upgrade heads
When features are ready to merge:
# Merge two branches
alembic merge -m "merge reporting into main" \
main@head reporting@head
Generated merge migration:
"""merge reporting into main
Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'merge001'
down_revision = ('ghi789', 'rep002') # Multiple parents
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Merge point only — no schema changes of its own."""
    # Usually empty for simple merges
    # Add code if you need to reconcile conflicting changes
    pass

def downgrade() -> None:
    """Nothing to revert for a pure merge revision."""
    pass
When one branch depends on another:
# Create migration that depends on specific revision from another branch
alembic revision -m "reporting needs user table" \
--head=reporting@head \
--depends-on=def456 # Revision from main branch
# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def alembic_config():
    """Provide an Alembic Config pointed at the dedicated test database.

    Overrides sqlalchemy.url so migration tests never touch dev/prod data.
    """
    config = Config("alembic.ini")
    config.set_main_option(
        "sqlalchemy.url",
        "postgresql://localhost/support_test"
    )
    return config
@pytest.fixture
def test_db(alembic_config):
    """Yield a SQLAlchemy engine for a fully migrated test database.

    Upgrades to head before the test and downgrades back to base afterwards,
    so every test sees a clean, fully migrated schema.
    """
    # Create engine from the same URL the Alembic config uses
    engine = create_engine(
        alembic_config.get_main_option("sqlalchemy.url")
    )
    # Run migrations to head
    command.upgrade(alembic_config, "head")
    yield engine
    # Cleanup - downgrade to base
    command.downgrade(alembic_config, "base")
    engine.dispose()
def test_migration_creates_tickets_table(test_db):
    """The migrated schema must contain the core support tables."""
    existing = inspect(test_db).get_table_names()
    for table in ('tickets', 'users', 'customer_satisfaction'):
        assert table in existing
def test_tickets_table_structure(test_db):
    """The tickets table must expose the expected columns and types."""
    inspector = inspect(test_db)
    columns = {col['name']: col for col in inspector.get_columns('tickets')}

    required = ('id', 'priority', 'status', 'created_at',
                'resolution_time_seconds')
    for name in required:
        assert name in columns

    # Both priority and status are stored as string columns
    for name in ('priority', 'status'):
        assert columns[name]['type'].python_type == str
def test_migration_upgrade_downgrade_cycle(alembic_config):
    """Test that upgrade -> downgrade -> upgrade works.

    Passing means every migration on the path is reversible; any exception
    from Alembic fails the test.
    """
    # Start at base
    command.downgrade(alembic_config, "base")
    # Upgrade to head
    command.upgrade(alembic_config, "head")
    # Downgrade one step
    command.downgrade(alembic_config, "-1")
    # Upgrade back to head
    command.upgrade(alembic_config, "head")
    # Should complete without errors
def test_data_migration_preserves_data(test_db):
    """Test that data migrations don't lose data.

    NOTE(review): the middle step is commented out, so as written this only
    verifies that an inserted ticket survives a round trip through the ORM —
    substitute a concrete revision to make it a real data-migration test.
    """
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()
    # Insert test data
    ticket = Ticket(
        title="Test ticket",
        status="OPEN",
        priority="high"
    )
    session.add(ticket)
    session.commit()
    ticket_id = ticket.id
    session.close()

    # Run a migration that modifies tickets table
    # (This would be a specific revision)
    # command.upgrade(alembic_config, "specific_revision")

    # Verify data still exists
    session = Session()
    retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
    assert retrieved is not None
    assert retrieved.title == "Test ticket"
    session.close()
# tests/test_migration_integration.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db):
    """Ensure all migrations are applied in the test environment.

    Compares the revision(s) stamped in the database against the head
    revision(s) of the migration scripts on disk.
    """
    script = ScriptDirectory.from_config(alembic_config)
    with test_db.connect() as connection:
        context = MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        script_heads = set(script.get_heads())
        assert current_heads == script_heads, \
            f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"
def test_migration_order_is_valid(alembic_config):
    """Verify the migration chain has no gaps or dangling references."""
    script = ScriptDirectory.from_config(alembic_config)
    # Get all revisions
    revisions = list(script.walk_revisions())
    # Check each revision has valid down_revision
    for revision in revisions:
        if revision.down_revision is not None:
            if isinstance(revision.down_revision, tuple):
                # Merge point: every parent revision must resolve
                for down_rev in revision.down_revision:
                    assert script.get_revision(down_rev) is not None
            else:
                assert script.get_revision(revision.down_revision) is not None
def test_check_command_detects_drift(alembic_config, test_db):
    """Fail the suite when the database schema drifts from the models.

    `alembic check` (command.check) raises when autogenerate would emit new
    operations, i.e. when the migrated schema no longer matches the model
    metadata; a clean return means DB and models agree.
    """
    try:
        command.check(alembic_config)
    except Exception as e:  # Alembic raises AutogenerateDiffsDetected on drift
        pytest.fail(f"Schema drift detected: {e}")
# tests/test_migration_performance.py
import time
import pytest
from alembic import command
def test_migration_completes_within_time_limit(alembic_config):
    """Ensure a full upgrade from base to head finishes within 60 seconds."""
    # Downgrade to base so the complete migration chain is timed
    command.downgrade(alembic_config, "base")
    # time.perf_counter() is a monotonic high-resolution clock; time.time()
    # is wall-clock and can jump (NTP, DST), skewing the measurement.
    start = time.perf_counter()
    command.upgrade(alembic_config, "head")
    duration = time.perf_counter() - start
    # Assert completes within 60 seconds
    assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"
@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
    """Test data-migration performance against a realistic data volume."""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()
    # Create 10,000 test tickets
    tickets = [
        Ticket(
            title=f"Test ticket {i}",
            status="OPEN",
            priority="normal"
        )
        for i in range(10000)
    ]
    session.bulk_save_objects(tickets)
    session.commit()
    session.close()

    # Time the data migration with a monotonic clock (perf_counter);
    # wall-clock time.time() can jump and corrupt the measurement.
    start = time.perf_counter()
    command.upgrade(alembic_config, "data002")  # Specific data migration
    duration = time.perf_counter() - start

    # Should process 10k records in reasonable time
    assert duration < 30, f"Data migration took {duration}s for 10k records"
# .github/workflows/migrations.yml
#
# CI for database migrations: runs the upgrade/downgrade cycle and the
# pytest migration suite against a throwaway Postgres service, and posts
# the generated SQL on pull requests for human review.
# (Indentation reconstructed — the scraped copy had lost all YAML nesting.)
name: Database Migrations

on:
  pull_request:
    paths:
      - 'alembic/versions/**'
      - 'myapp/models/**'
      - 'alembic.ini'
      - 'alembic/env.py'
  push:
    branches:
      - main
      - develop

jobs:
  test-migrations:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: support_test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run migration tests
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          # Test upgrade to head
          alembic upgrade head
          # Test downgrade to base
          alembic downgrade base
          # Test upgrade again
          alembic upgrade head
          # Run pytest for migration tests
          pytest tests/test_migrations.py -v
      - name: Check for schema drift
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          alembic check
      - name: Validate migration history
        run: |
          # Check for multiple heads (should be only one)
          HEADS_COUNT=$(alembic heads | wc -l)
          if [ "$HEADS_COUNT" -gt 1 ]; then
            echo "ERROR: Multiple heads detected. Please merge branches."
            alembic heads
            exit 1
          fi

  review-migration-sql:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Generate SQL for review
        run: |
          # Generate SQL without executing (Alembic offline mode).
          # NOTE(review): no DATABASE_URL is set in this job, so env.py must
          # fall back to a URL that at least selects the right dialect —
          # confirm before relying on this output.
          alembic upgrade head --sql > migration.sql
      - name: Upload SQL artifact
        uses: actions/upload-artifact@v3
        with:
          name: migration-sql
          path: migration.sql
      - name: Comment PR with SQL
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const sql = fs.readFileSync('migration.sql', 'utf8');
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
            });
#!/bin/bash
# scripts/deploy_migrations.sh
#
# Back up the production database, run Alembic migrations with a timeout,
# verify the result, and restore the backup on any failure.
set -euo pipefail  # exit on error, on unset variables, and on pipe failures

echo "Starting database migration deployment..."

# Environment variables (DB_PASSWORD must be supplied by the environment;
# with `set -u` an unset password fails fast here instead of producing a
# malformed URL later)
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"

# Configuration
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"

# Create backup directory
mkdir -p "$BACKUP_DIR"

# 1. Backup database before migration.
#    --clean --if-exists makes the dump restorable over a partially
#    migrated database; a plain dump would hit "already exists" conflicts.
echo "Creating database backup..."
pg_dump --clean --if-exists "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"

# 2. Check current migration status
echo "Current migration status:"
alembic current

# 3. Show pending migrations (informational; don't let a grep miss kill
#    the deploy under pipefail)
echo "Pending migrations:"
alembic history --verbose | grep -A 5 "head" || true

# 4. Run migrations with a 5-minute timeout; restore the backup on failure
echo "Running migrations..."
timeout 300 alembic upgrade head || {
    echo "ERROR: Migration failed or timed out!"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
}

# 5. Verify the database is now at head.
#    Plain `alembic current` prints "<revision> (head)" — the hash is the
#    first field. (The previous `grep "Rev:"` only matches --verbose
#    output, so CURRENT_REV was always empty and the check false-failed.)
echo "Verifying migration status..."
CURRENT_REV=$(alembic current | awk '{print $1}')
HEAD_REV=$(alembic heads | awk '{print $1}')
if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
    echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
fi

echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"

# 6. Cleanup old backups (keep the 10 most recent)
echo "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm

echo "Deployment complete!"
For critical support systems that can't go offline:
Phase 1: Additive Changes
"""add new column (phase 1)
Revision ID: zd001
"""
def upgrade() -> None:
# Add new column as nullable
op.add_column('tickets',
sa.Column('new_field', sa.String(100), nullable=True)
)
def downgrade() -> None:
op.drop_column('tickets', 'new_field')
Phase 2: Data Migration (Background)
"""populate new column (phase 2)
Revision ID: zd002
"""
def upgrade() -> None:
# Update in small batches during low-traffic periods
connection = op.get_bind()
batch_size = 100
while True:
result = connection.execute(
"""
UPDATE tickets
SET new_field = calculate_value(old_field)
WHERE new_field IS NULL
LIMIT {batch_size}
""".format(batch_size=batch_size)
)
if result.rowcount == 0:
break
# Small delay to reduce database load
import time
time.sleep(0.1)
def downgrade() -> None:
connection = op.get_bind()
connection.execute("UPDATE tickets SET new_field = NULL")
Phase 3: Make Required
"""make new column required (phase 3)
Revision ID: zd003
"""
def upgrade() -> None:
# Now that all rows have values, make it non-nullable
op.alter_column('tickets', 'new_field',
nullable=False,
server_default='default_value'
)
def downgrade() -> None:
op.alter_column('tickets', 'new_field',
nullable=True,
server_default=None
)
Phase 4: Remove Old Column (Optional)
"""remove old column (phase 4)
Revision ID: zd004
"""
def upgrade() -> None:
op.drop_column('tickets', 'old_field')
def downgrade() -> None:
op.add_column('tickets',
sa.Column('old_field', sa.String(100), nullable=True)
)
# alembic/env.py additions for error handling
from alembic import context
import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online():
    """Run migrations in 'online' mode with error handling.

    `config`, `engine_from_config`, `pool` and `target_metadata` are the
    standard Alembic env.py module globals (defined earlier in the file).
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,  # Rollback individual migrations
            compare_type=True,
            compare_server_default=True
        )
        try:
            with context.begin_transaction():
                context.run_migrations()
        except Exception as e:
            logger.error(f"Migration failed: {e}")
            logger.error("Rolling back transaction...")
            # Transaction automatically rolled back when the with-block
            # exits on an exception; re-raise so the CLI reports failure
            raise
        else:
            logger.info("Migration completed successfully")
Create custom template for your organization:
# alembic/script.py.mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
"""Apply migration changes"""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Revert migration changes"""
${downgrades if downgrades else "pass"}
For systems with separate databases (e.g., main DB + analytics):
# alembic/env.py for multiple databases
def run_migrations_online():
    """Run migrations for multiple databases (main + analytics).

    Iterates the configured databases, connecting to each in turn and
    running its migrations inside a transaction.  The per-database dict is
    named `db_config` so it does not shadow Alembic's module-level
    `config` object (the original loop variable `config` did).
    """
    # Configuration for each database
    engines = {
        'main': {
            'url': os.getenv('MAIN_DB_URL'),
            'target_metadata': main_metadata
        },
        'analytics': {
            'url': os.getenv('ANALYTICS_DB_URL'),
            'target_metadata': analytics_metadata
        }
    }
    for name, db_config in engines.items():
        logger.info(f"Running migrations for {name} database")
        engine = create_engine(db_config['url'])
        with engine.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=db_config['target_metadata'],
                # Tokens let one script file carry separate upgrade/
                # downgrade sections per database
                upgrade_token=f"{name}_upgrade",
                downgrade_token=f"{name}_downgrade"
            )
            with context.begin_transaction():
                context.run_migrations(engine_name=name)
        # Release the pool before moving to the next database
        engine.dispose()
Multiple Heads Error
# Problem: "Multiple heads exist"
# Solution: Merge the branches
alembic merge heads -m "merge branches"
Migration Out of Sync
# Problem: Database revision doesn't match migration history
# Solution: Stamp database to specific revision
alembic stamp head
# Or stamp to specific revision
alembic stamp abc123
Failed Migration Cleanup
# Problem: Migration failed midway
# Solution: Manual cleanup
# 1. Check current state
alembic current
# 2. Manually fix database issues
psql $DATABASE_URL
# 3. Stamp to correct revision
alembic stamp previous_working_revision
# 4. Try migration again
alembic upgrade head
Circular Dependencies
# Problem: "Circular dependency detected"
# Solution: Use depends_on instead of down_revision
alembic revision -m "fix circular dependency" \
--head=branch_a@head \
--depends-on=branch_b_revision
This skill covered comprehensive Alembic usage for customer support systems:
Always remember:
For more examples, see EXAMPLES.md in this skill package.
Weekly Installs
99
Repository
GitHub Stars
46
First Seen
Jan 22, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykPass
Installed on
opencode82
codex80
gemini-cli79
github-copilot75
cursor69
amp63
电子表格技能:使用Python自动化创建、编辑、分析与可视化Excel/CSV数据
265 周安装
故事分析诊断工具:系统性评估短篇小说与小说章节的叙事质量与改进机会
268 周安装
React Email JSON 渲染器 - 将 JSON 规范转换为 HTML/纯文本邮件 | Vercel Labs
277 周安装
Auto-Skill 自进化知识系统:AI助手自动化技能管理与知识库构建指南
271 周安装
阿里云产品API发现工具 - 自动化收集OpenAPI元数据与技能规划
275 周安装
lesson经验提取存储工具 - 从对话中自动捕获技术陷阱与决策原则
280 周安装