ai-pipeline-orchestration by bagelhole/devops-security-agent-skills
npx skills add https://github.com/bagelhole/devops-security-agent-skills --skill ai-pipeline-orchestration
Build reliable, observable AI workflows — from document ingestion to batch inference to model training pipelines.
Use this skill when:
| Tool | Best For | Complexity | GPU Jobs |
|---|---|---|---|
| Prefect | Modern Python-first; easy to adopt | Low | Good |
| Airflow | Complex DAGs; large teams; existing usage | High | Good |
| Dagster | Asset-centric; strong data lineage | Medium | Excellent |
| Temporal | Long-running workflows; reliability-first | Medium | Good |
pip install prefect prefect-kubernetes
# Start Prefect server (or use Prefect Cloud)
prefect server start
# In another terminal
prefect worker start --pool default-agent-pool
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import hashlib  # assumed to be used by the elided fetch logic for document hashes

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def fetch_documents(source_url: str) -> list[dict]:
    """Fetch documents from source; cached to avoid re-fetching."""
    logger = get_run_logger()
    logger.info(f"Fetching from {source_url}")
    # ... fetch logic
    return documents
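The fetch logic above is elided. One plausible shape, assuming plain HTTP sources and using hashlib to fingerprint each document so unchanged ones can be skipped downstream (the export endpoint and field names here are hypothetical):

import hashlib
import requests  # assumed HTTP client; swap in your source's SDK

def fetch_via_http(source_url: str) -> list[dict]:
    # Hypothetical: the source exposes a JSON export of its pages
    resp = requests.get(f"{source_url}/export.json", timeout=30)
    resp.raise_for_status()
    documents = []
    for page in resp.json():
        content = page["body"]
        documents.append({
            "url": page["url"],
            "content": content,
            # Stable fingerprint for change detection in later upserts
            "hash": hashlib.sha256(content.encode()).hexdigest(),
        })
    return documents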
@task(retries=3, retry_delay_seconds=30)
def chunk_and_embed(documents: list[dict]) -> list[dict]:
    """Chunk documents and generate embeddings with retry on failure."""
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("BAAI/bge-large-en-v1.5")
    chunks = []
    for doc in documents:
        doc_chunks = chunk_text(doc["content"])
        embeddings = model.encode(doc_chunks, batch_size=64)
        for chunk, emb in zip(doc_chunks, embeddings):
            chunks.append({"text": chunk, "embedding": emb.tolist(),
                           "source": doc["url"], "doc_hash": doc["hash"]})
    return chunks
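chunk_text is referenced above but never defined in the skill. A minimal sketch, assuming fixed-size character windows with overlap (a token-aware splitter is usually a better production choice):

def chunk_text(text: str, max_chars: int = 1500, overlap: int = 200) -> list[str]:
    # Slide a fixed-size window across the text; the overlap preserves
    # context across chunk boundaries.
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start : start + max_chars])
        start += max_chars - overlap
    return chunks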
@task(retries=2)
def upsert_to_vector_store(chunks: list[dict]) -> int:
    """Upsert embeddings to Qdrant, skip unchanged documents."""
    from qdrant_client import QdrantClient

    client = QdrantClient("http://qdrant:6333")
    client.upsert(collection_name="knowledge-base", points=[...])
    return len(chunks)
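The points payload above is elided. A sketch of how it might be built with qdrant-client's PointStruct, deriving deterministic IDs from the doc hash and chunk index so re-ingesting unchanged documents overwrites points in place rather than duplicating them:

import uuid
from qdrant_client.models import PointStruct

points = [
    PointStruct(
        # uuid5 is deterministic: the same doc_hash + index always yields the same ID
        id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{chunk['doc_hash']}:{i}")),
        vector=chunk["embedding"],
        payload={"text": chunk["text"], "source": chunk["source"]},
    )
    for i, chunk in enumerate(chunks)
]
client.upsert(collection_name="knowledge-base", points=points)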
@flow(name="rag-ingestion", log_prints=True)
def rag_ingestion_pipeline(sources: list[str]):
"""Full RAG ingestion flow — runs daily."""
logger = get_run_logger()
total = 0
for source in sources:
docs = fetch_documents(source)
chunks = chunk_and_embed(docs)
count = upsert_to_vector_store(chunks)
total += count
logger.info(f"Ingested {count} chunks from {source}")
logger.info(f"Pipeline complete: {total} total chunks indexed")
if __name__ == "__main__":
rag_ingestion_pipeline.serve(
name="daily-rag-ingestion",
cron="0 2 * * *", # 2 AM daily
parameters={"sources": ["https://docs.myapp.com", "https://api.myapp.com/kb"]},
)
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency  # async variant, required in async tasks
import asyncio
from openai import AsyncOpenAI

@task(retries=3, retry_delay_seconds=60)
async def process_batch(items: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
    """Process a batch of items through LLM with rate limit protection."""
    client = AsyncOpenAI()
    async with concurrency("openai-api", occupy=len(items)):  # rate limit
        tasks = [
            client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": item["prompt"]}],
                max_tokens=256,
            )
            for item in items
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for item, response in zip(items, responses):
        if isinstance(response, Exception):
            results.append({**item, "error": str(response), "output": None})
        else:
            results.append({**item, "output": response.choices[0].message.content})
    return results
@flow(name="batch-llm-inference")
async def batch_inference_flow(input_file: str, output_file: str, batch_size: int = 50):
import json
items = [json.loads(line) for line in open(input_file)]
batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
all_results = []
for batch in batches:
results = await process_batch(batch)
all_results.extend(results)
with open(output_file, "w") as f:
for result in all_results:
f.write(json.dumps(result) + "\n")
return len(all_results)
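A minimal invocation, assuming the input file is JSONL with one object per line carrying the "prompt" field that process_batch reads (file names here are illustrative). Note that the concurrency("openai-api") guard assumes a Prefect global concurrency limit with that name has already been created via the UI or CLI:

import asyncio, json

# Hypothetical input: one JSON object per line with a "prompt" field
with open("input.jsonl", "w") as f:
    f.write(json.dumps({"id": 1, "prompt": "Summarize: ..."}) + "\n")

count = asyncio.run(batch_inference_flow("input.jsonl", "output.jsonl", batch_size=50))
print(f"Processed {count} items")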
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s

@dag(
    dag_id="llm_fine_tuning",
    schedule="@weekly",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ai", "training"],
)
def llm_fine_tuning_dag():
    @task
    def prepare_dataset() -> str:
        """Download and preprocess training data."""
        # ... data prep logic
        return "s3://my-bucket/training-data/2025-03-01/"

    train = KubernetesPodOperator(
        task_id="train_model",
        name="llm-training-job",
        namespace="ml",
        image="nvcr.io/nvidia/pytorch:24.05-py3",
        cmds=["accelerate", "launch", "-m", "axolotl.cli.train", "/config/config.yaml"],
        # Modern cncf.kubernetes providers take container_resources, not resources
        container_resources=k8s.V1ResourceRequirements(
            limits={"nvidia.com/gpu": "4", "memory": "320Gi"},
            requests={"nvidia.com/gpu": "4"},
        ),
        node_selector={"nvidia.com/gpu.product": "A100-SXM4-80GB"},
        volumes=[...],
        volume_mounts=[...],
        get_logs=True,
        on_finish_action="delete_pod",  # replaces deprecated is_delete_operator_pod
    )

    @task
    def evaluate_model(dataset_path: str) -> dict:
        """Run evals; fail pipeline if quality drops."""
        metrics = run_evals()  # run_evals: user-supplied eval harness
        if metrics["accuracy"] < 0.85:
            raise ValueError(f"Model quality too low: {metrics}")
        return metrics

    @task
    def deploy_model(metrics: dict):
        """Push merged model to HF Hub and update vLLM config."""
        update_serving_config(new_model="org/fine-tuned-v2")

    # Wire: prepare_dataset -> train_model -> evaluate_model -> deploy_model
    dataset = prepare_dataset()
    train.set_upstream(dataset)
    eval_result = evaluate_model(dataset)
    eval_result.set_upstream(train)
    deploy_model(eval_result)

llm_fine_tuning_dag()
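One gap worth noting: the dataset path returned by prepare_dataset never actually reaches the training pod above. One way to close it, assuming the training entrypoint accepts a data-path flag (hypothetical; axolotl normally reads paths from its YAML config), is Jinja templating on the operator's templated arguments field:

train = KubernetesPodOperator(
    task_id="train_model",
    # ... image, resources, and volumes as above ...
    cmds=["accelerate", "launch", "-m", "axolotl.cli.train", "/config/config.yaml"],
    # 'arguments' is a templated field; pull the upstream return value from XCom
    arguments=["--data-path", "{{ ti.xcom_pull(task_ids='prepare_dataset') }}"],
)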
from dagster import asset, AssetExecutionContext, define_asset_job, ScheduleDefinition

@asset(description="Raw documents fetched from knowledge sources")
def raw_documents(context: AssetExecutionContext) -> list[dict]:
    context.log.info("Fetching documents...")
    return fetch_all_documents()

# Naming the parameter after the upstream asset both declares the dependency
# and passes its value; deps=[...] is for dependencies whose value you don't
# consume (as in rag_quality_metrics below), and combining both is an error.
@asset(description="Chunked and embedded document vectors")
def document_embeddings(context: AssetExecutionContext, raw_documents: list[dict]) -> int:
    chunks = process_and_embed(raw_documents)
    context.log.info(f"Generated {len(chunks)} embeddings")
    upsert_to_qdrant(chunks)
    return len(chunks)

@asset(
    deps=[document_embeddings],
    description="RAG system quality metrics",
)
def rag_quality_metrics(context: AssetExecutionContext) -> dict:
    metrics = evaluate_rag_system()
    context.add_output_metadata({"ragas_score": metrics["ragas_score"]})
    return metrics

# Schedule: refresh embeddings nightly
nightly_refresh = ScheduleDefinition(
    job=define_asset_job("rag_refresh_job", [raw_documents, document_embeddings]),
    cron_schedule="0 1 * * *",
)
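For Dagster to load these assets and the schedule, they need to be registered in a Definitions object; a standard registration looks like:

from dagster import Definitions

defs = Definitions(
    assets=[raw_documents, document_embeddings, rag_quality_metrics],
    schedules=[nightly_refresh],
)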
Use concurrency limits in Prefect or pool slots in Airflow to respect external rate limits.
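Temporal appears in the comparison table but gets no example above. A minimal sketch of the same ingestion pattern with the temporalio Python SDK (workflow and activity names are illustrative; a running Temporal server and worker are assumed):

from datetime import timedelta
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

@activity.defn
async def ingest_source(source_url: str) -> int:
    # Fetch, chunk, embed, upsert for one source; Temporal retries the
    # activity on failure and durably records completed steps. Placeholder body.
    return 0

@workflow.defn
class RagIngestionWorkflow:
    @workflow.run
    async def run(self, sources: list[str]) -> int:
        total = 0
        for source in sources:
            total += await workflow.execute_activity(
                ingest_source,
                source,
                start_to_close_timeout=timedelta(minutes=30),
                retry_policy=RetryPolicy(maximum_attempts=5),
            )
        return total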