ai-engineer-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill ai-engineer-expert —— 为实施 AI 系统、LLM 集成、提示工程和部署生产级 AI 应用程序提供专家指导。
from openai import AsyncOpenAI
from anthropic import Anthropic
from typing import List, Dict, Optional
import asyncio
class LLMClient:
    """Unified LLM client with provider fallback.

    Tries the primary provider first; on failure falls back to the secondary
    provider. Raises instead of silently returning None when no provider can
    serve the request (the original fell through and returned None).
    """

    def __init__(self, primary: str = "openai", fallback: str = "anthropic"):
        # NOTE(review): clients read API keys from the environment
        # (OPENAI_API_KEY / ANTHROPIC_API_KEY) -- confirm; never hard-code keys.
        self.openai_client = AsyncOpenAI()
        self.anthropic_client = Anthropic()
        self.primary = primary
        self.fallback = fallback

    async def chat_completion(self, messages: List[Dict],
                              model: str = "gpt-4-turbo",
                              temperature: float = 0.7,
                              max_tokens: int = 1000) -> str:
        """Chat completion with automatic fallback.

        Raises:
            RuntimeError: if neither the primary nor the fallback provider
                handled the request.
        """
        if self.primary == "openai":
            try:
                response = await self.openai_client.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    max_tokens=max_tokens
                )
                return response.choices[0].message.content
            except Exception as e:
                # Keep the original user-visible message unchanged.
                print(f"主要提供商失败: {e},尝试回退")
        if self.fallback == "anthropic":
            # NOTE(review): Anthropic() is a sync client, so this call blocks
            # the event loop. Consider AsyncAnthropic or asyncio.to_thread.
            response = self.anthropic_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.content[0].text
        # Bug fix: the original implicitly returned None here.
        raise RuntimeError("没有可用的 LLM 提供商")

    async def chat_completion_streaming(self, messages: List[Dict],
                                        model: str = "gpt-4-turbo"):
        """Yield response text chunks as they arrive (OpenAI only, no fallback)."""
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )
        async for chunk in stream:
            # delta.content is None on role/tool-call chunks; skip those.
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def function_calling(self, messages: List[Dict],
                               tools: List[Dict]) -> Dict:
        """One function-calling round trip, normalized to a small dict."""
        response = await self.openai_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        message = response.choices[0].message
        if message.tool_calls:
            # Only the first tool call is surfaced; parallel calls are dropped.
            return {
                "type": "function_call",
                "function": message.tool_calls[0].function.name,
                "arguments": message.tool_calls[0].function.arguments
            }
        return {
            "type": "message",
            "content": message.content
        }
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
class RAGSystem:
    """Retrieval-Augmented Generation over a persistent Chroma store."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = None  # populated by ingest_documents()
        self.persist_directory = persist_directory
        # temperature=0 for deterministic, extraction-style answers.
        self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

    def _require_vectorstore(self):
        """Raise if no documents have been ingested yet."""
        if not self.vectorstore:
            raise ValueError("未摄取任何文档")

    def ingest_documents(self, documents: List[str]):
        """Split raw document strings into chunks and index them.

        Args:
            documents: plain-text documents to index.
        """
        # Overlapping chunks preserve context across chunk boundaries.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.create_documents(documents)
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )

    def query(self, question: str, k: int = 4) -> Dict:
        """Answer a question with RAG; returns the answer and source texts."""
        self._require_vectorstore()
        retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": k}
        )
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",  # concatenates all k chunks into one prompt
            retriever=retriever,
            return_source_documents=True
        )
        result = qa_chain({"query": question})
        return {
            "answer": result["result"],
            "sources": [doc.page_content for doc in result["source_documents"]]
        }

    def similarity_search(self, query: str, k: int = 4) -> List[Dict]:
        """Raw similarity search returning content, score and metadata.

        Bug fix: now validates that documents were ingested, matching query();
        the original raised AttributeError on a None vectorstore instead.
        """
        self._require_vectorstore()
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "score": score,
                "metadata": doc.metadata
            }
            for doc, score in results
        ]
class PromptTemplate:
    """Reusable prompt-construction helpers (all static, no state)."""

    @staticmethod
    def chain_of_thought(question: str) -> str:
        """Build a chain-of-thought prompt for the given question."""
        return f"""让我们一步步解决这个问题:
问题:{question}
请仔细思考这个问题:
1. 首先,确定我们需要找到什么
2. 然后,将问题分解为更小的步骤
3. 解决每个步骤
4. 最后,合并结果
您的分步解决方案:"""

    @staticmethod
    def few_shot(task: str, examples: List[Dict], query: str) -> str:
        """Build a few-shot prompt from input/output example pairs."""
        rendered = (f"输入:{ex['input']}\n输出:{ex['output']}" for ex in examples)
        examples_text = "\n\n".join(rendered)
        return f"""任务:{task}
以下是一些示例:
{examples_text}
现在,请解决这个:
输入:{query}
输出:"""

    @staticmethod
    def system_message(role: str, constraints: List[str],
                       format_instructions: str) -> str:
        """Build a system message from a role, constraint list and format spec."""
        bullet_lines = [f"- {c}" for c in constraints]
        constraints_text = "\n".join(bullet_lines)
        return f"""您是一位 {role}。
约束条件:
{constraints_text}
输出格式:
{format_instructions}
请严格遵守这些准则。"""
from typing import Callable
import json
class Tool:
    """A callable tool that an agent can expose to the model.

    Args:
        name: identifier shown to the model.
        description: natural-language description for the model.
        function: callable executed when the model selects the tool.
        parameters: optional JSON-Schema dict describing the arguments;
            defaults to an empty object schema.
    """

    def __init__(self, name: str, description: str, function: Callable,
                 parameters: Optional[Dict] = None):
        self.name = name
        self.description = description
        self.function = function
        self.parameters = parameters

    def get_parameters(self) -> Dict:
        """Return the JSON-Schema for this tool's parameters.

        Bug fix: the original called self.get_parameters() without ever
        defining it, so to_openai_function() always raised AttributeError.
        """
        if self.parameters is not None:
            return self.parameters
        return {"type": "object", "properties": {}, "required": []}

    def to_openai_function(self) -> Dict:
        """Convert to the OpenAI tools / function-calling wire format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.get_parameters()
            }
        }
class AIAgent:
    """Tool-using agent driving an LLM function-calling loop."""

    def __init__(self, llm_client: LLMClient, tools: List[Tool]):
        self.llm = llm_client
        self.tools = {tool.name: tool for tool in tools}
        # Full message history; grows across run() calls (never trimmed).
        self.conversation_history = []

    async def run(self, user_input: str, max_iterations: int = 10) -> str:
        """Run the agent loop until it answers or max_iterations is reached."""
        self.conversation_history.append({
            "role": "user",
            "content": user_input
        })
        for _ in range(max_iterations):
            response = await self.llm.function_calling(
                messages=self.conversation_history,
                tools=[tool.to_openai_function() for tool in self.tools.values()]
            )
            if response["type"] == "message":
                # No tool requested: the agent is done.
                return response["content"]
            tool_name = response["function"]
            arguments = json.loads(response["arguments"])
            tool_result = await self.execute_tool(tool_name, arguments)
            # NOTE(review): the assistant message containing the tool call is
            # never appended, so the next request sends a bare "function"
            # message -- the OpenAI API may reject this history; confirm.
            self.conversation_history.append({
                "role": "function",
                "name": tool_name,
                "content": str(tool_result)
            })
        return "达到最大迭代次数"

    async def execute_tool(self, tool_name: str, arguments: Dict):
        """Execute a registered tool; supports sync and async tool functions.

        Raises:
            ValueError: if tool_name is not registered.
        """
        if tool_name not in self.tools:
            raise ValueError(f"未找到工具 {tool_name}")
        # Bug fix: the original unconditionally awaited the call result,
        # which raised TypeError whenever the tool function was synchronous.
        # (Also dropped the bogus `-> any` annotation, which referenced the
        # builtin any() rather than typing.Any.)
        result = self.tools[tool_name].function(**arguments)
        if asyncio.iscoroutine(result):
            result = await result
        return result
from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from circuitbreaker import circuit
import asyncio
# Module-level FastAPI application instance.
app = FastAPI()
class ChatRequest(BaseModel):
    # Request body for POST /chat.
    messages: List[Dict]  # OpenAI-style chat messages (role/content dicts)
    model: str = "gpt-4-turbo"  # model name forwarded to the LLM client
    stream: bool = False  # True => stream the response as server-sent events
class RateLimiter:
    """Sliding-window per-user rate limiter for the API."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps user_id -> timestamps of requests inside the current window.
        self.requests = {}

    async def check_limit(self, user_id: str) -> bool:
        """Record one request and report whether the user is within limits."""
        import time
        now = time.time()
        window_start = now - self.window_seconds
        # Drop timestamps that have aged out of the window.
        recent = [t for t in self.requests.get(user_id, []) if t > window_start]
        if len(recent) >= self.max_requests:
            self.requests[user_id] = recent
            return False
        recent.append(now)
        self.requests[user_id] = recent
        return True
# Shared process-wide singletons: one limiter and one LLM client.
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
llm_client = LLMClient()
# Circuit breaker: after 5 consecutive failures, reject calls for 60 s.
@circuit(failure_threshold=5, recovery_timeout=60)
async def call_llm(messages: List[Dict]) -> str:
    """LLM call guarded by a circuit breaker."""
    return await llm_client.chat_completion(messages)
@app.post("/chat")
async def chat(request: ChatRequest, user_id: str = Depends(get_user_id)):
    """Chat endpoint with per-user rate limiting.

    NOTE(review): get_user_id is not defined anywhere in this file --
    confirm it is provided elsewhere, otherwise this fails at import time.
    NOTE(review): request.model is never forwarded; both branches use the
    client's default model -- confirm whether that is intended.
    """
    # Reject early when the caller exceeded the sliding-window limit.
    if not await rate_limiter.check_limit(user_id):
        raise HTTPException(status_code=429, detail="超出速率限制")
    try:
        if request.stream:
            async def generate():
                async for chunk in llm_client.chat_completion_streaming(request.messages):
                    yield chunk
            # NOTE(review): exceptions raised while streaming happen after
            # this return and are NOT converted to a 500 by the except below.
            return StreamingResponse(generate(), media_type="text/event-stream")
        else:
            response = await call_llm(request.messages)
            return {"response": response}
    except Exception as e:
        # Broad catch: surfaces the raw error string to the client.
        raise HTTPException(status_code=500, detail=str(e))
❌ 没有错误处理或回退机制 ❌ 未经验证就暴露原始 LLM 输出 ❌ 没有速率限制或成本控制 ❌ 在代码中存储 API 密钥 ❌ 没有监控或日志记录 ❌ 忽略令牌限制 ❌ 不测试提示
每周安装数
77
仓库
GitHub 星标数
11
首次出现
2026年1月23日
安全审计
安装于
opencode: 63
codex: 62
gemini-cli: 62
cursor: 60
github-copilot: 57
kimi-cli: 49
Expert guidance for implementing AI systems, LLM integration, prompt engineering, and deploying production AI applications.
from openai import AsyncOpenAI
from anthropic import Anthropic
from typing import List, Dict, Optional
import asyncio
class LLMClient:
    """Unified LLM client with provider fallback.

    Tries the primary provider first; on failure falls back to the secondary
    provider. Raises instead of silently returning None when no provider can
    serve the request (the original fell through and returned None).
    """

    def __init__(self, primary: str = "openai", fallback: str = "anthropic"):
        # NOTE(review): clients read API keys from the environment
        # (OPENAI_API_KEY / ANTHROPIC_API_KEY) -- confirm; never hard-code keys.
        self.openai_client = AsyncOpenAI()
        self.anthropic_client = Anthropic()
        self.primary = primary
        self.fallback = fallback

    async def chat_completion(self, messages: List[Dict],
                              model: str = "gpt-4-turbo",
                              temperature: float = 0.7,
                              max_tokens: int = 1000) -> str:
        """Chat completion with automatic fallback.

        Raises:
            RuntimeError: if neither the primary nor the fallback provider
                handled the request.
        """
        if self.primary == "openai":
            try:
                response = await self.openai_client.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    max_tokens=max_tokens
                )
                return response.choices[0].message.content
            except Exception as e:
                # Keep the original user-visible message unchanged.
                print(f"Primary provider failed: {e}, trying fallback")
        if self.fallback == "anthropic":
            # NOTE(review): Anthropic() is a sync client, so this call blocks
            # the event loop. Consider AsyncAnthropic or asyncio.to_thread.
            response = self.anthropic_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.content[0].text
        # Bug fix: the original implicitly returned None here.
        raise RuntimeError("No LLM provider available")

    async def chat_completion_streaming(self, messages: List[Dict],
                                        model: str = "gpt-4-turbo"):
        """Yield response text chunks as they arrive (OpenAI only, no fallback)."""
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )
        async for chunk in stream:
            # delta.content is None on role/tool-call chunks; skip those.
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def function_calling(self, messages: List[Dict],
                               tools: List[Dict]) -> Dict:
        """One function-calling round trip, normalized to a small dict."""
        response = await self.openai_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        message = response.choices[0].message
        if message.tool_calls:
            # Only the first tool call is surfaced; parallel calls are dropped.
            return {
                "type": "function_call",
                "function": message.tool_calls[0].function.name,
                "arguments": message.tool_calls[0].function.arguments
            }
        return {
            "type": "message",
            "content": message.content
        }
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
class RAGSystem:
    """Retrieval-Augmented Generation over a persistent Chroma store."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = None  # populated by ingest_documents()
        self.persist_directory = persist_directory
        # temperature=0 for deterministic, extraction-style answers.
        self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

    def _require_vectorstore(self):
        """Raise if no documents have been ingested yet."""
        if not self.vectorstore:
            raise ValueError("No documents ingested")

    def ingest_documents(self, documents: List[str]):
        """Split raw document strings into chunks and index them.

        Args:
            documents: plain-text documents to index.
        """
        # Overlapping chunks preserve context across chunk boundaries.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.create_documents(documents)
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )

    def query(self, question: str, k: int = 4) -> Dict:
        """Answer a question with RAG; returns the answer and source texts."""
        self._require_vectorstore()
        retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": k}
        )
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",  # concatenates all k chunks into one prompt
            retriever=retriever,
            return_source_documents=True
        )
        result = qa_chain({"query": question})
        return {
            "answer": result["result"],
            "sources": [doc.page_content for doc in result["source_documents"]]
        }

    def similarity_search(self, query: str, k: int = 4) -> List[Dict]:
        """Raw similarity search returning content, score and metadata.

        Bug fix: now validates that documents were ingested, matching query();
        the original raised AttributeError on a None vectorstore instead.
        """
        self._require_vectorstore()
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "score": score,
                "metadata": doc.metadata
            }
            for doc, score in results
        ]
class PromptTemplate:
    """Reusable prompt-construction helpers (all static, no state)."""

    @staticmethod
    def chain_of_thought(question: str) -> str:
        """Build a chain-of-thought prompt for the given question."""
        return f"""Let's solve this step by step:
Question: {question}
Please think through this problem carefully:
1. First, identify what we need to find
2. Then, break down the problem into smaller steps
3. Solve each step
4. Finally, combine the results
Your step-by-step solution:"""

    @staticmethod
    def few_shot(task: str, examples: List[Dict], query: str) -> str:
        """Build a few-shot prompt from input/output example pairs."""
        rendered = (f"Input: {ex['input']}\nOutput: {ex['output']}" for ex in examples)
        examples_text = "\n\n".join(rendered)
        return f"""Task: {task}
Here are some examples:
{examples_text}
Now, please solve this:
Input: {query}
Output:"""

    @staticmethod
    def system_message(role: str, constraints: List[str],
                       format_instructions: str) -> str:
        """Build a system message from a role, constraint list and format spec."""
        bullet_lines = [f"- {c}" for c in constraints]
        constraints_text = "\n".join(bullet_lines)
        return f"""You are a {role}.
Constraints:
{constraints_text}
Output Format:
{format_instructions}
Remember to follow these guidelines strictly."""
from typing import Callable
import json
class Tool:
    """A callable tool that an agent can expose to the model.

    Args:
        name: identifier shown to the model.
        description: natural-language description for the model.
        function: callable executed when the model selects the tool.
        parameters: optional JSON-Schema dict describing the arguments;
            defaults to an empty object schema.
    """

    def __init__(self, name: str, description: str, function: Callable,
                 parameters: Optional[Dict] = None):
        self.name = name
        self.description = description
        self.function = function
        self.parameters = parameters

    def get_parameters(self) -> Dict:
        """Return the JSON-Schema for this tool's parameters.

        Bug fix: the original called self.get_parameters() without ever
        defining it, so to_openai_function() always raised AttributeError.
        """
        if self.parameters is not None:
            return self.parameters
        return {"type": "object", "properties": {}, "required": []}

    def to_openai_function(self) -> Dict:
        """Convert to the OpenAI tools / function-calling wire format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.get_parameters()
            }
        }
class AIAgent:
    """Tool-using agent driving an LLM function-calling loop."""

    def __init__(self, llm_client: LLMClient, tools: List[Tool]):
        self.llm = llm_client
        self.tools = {tool.name: tool for tool in tools}
        # Full message history; grows across run() calls (never trimmed).
        self.conversation_history = []

    async def run(self, user_input: str, max_iterations: int = 10) -> str:
        """Run the agent loop until it answers or max_iterations is reached."""
        self.conversation_history.append({
            "role": "user",
            "content": user_input
        })
        for _ in range(max_iterations):
            response = await self.llm.function_calling(
                messages=self.conversation_history,
                tools=[tool.to_openai_function() for tool in self.tools.values()]
            )
            if response["type"] == "message":
                # No tool requested: the agent is done.
                return response["content"]
            tool_name = response["function"]
            arguments = json.loads(response["arguments"])
            tool_result = await self.execute_tool(tool_name, arguments)
            # NOTE(review): the assistant message containing the tool call is
            # never appended, so the next request sends a bare "function"
            # message -- the OpenAI API may reject this history; confirm.
            self.conversation_history.append({
                "role": "function",
                "name": tool_name,
                "content": str(tool_result)
            })
        return "Max iterations reached"

    async def execute_tool(self, tool_name: str, arguments: Dict):
        """Execute a registered tool; supports sync and async tool functions.

        Raises:
            ValueError: if tool_name is not registered.
        """
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        # Bug fix: the original unconditionally awaited the call result,
        # which raised TypeError whenever the tool function was synchronous.
        # (Also dropped the bogus `-> any` annotation, which referenced the
        # builtin any() rather than typing.Any.)
        result = self.tools[tool_name].function(**arguments)
        if asyncio.iscoroutine(result):
            result = await result
        return result
from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from circuitbreaker import circuit
import asyncio
# Module-level FastAPI application instance.
app = FastAPI()
class ChatRequest(BaseModel):
    # Request body for POST /chat.
    messages: List[Dict]  # OpenAI-style chat messages (role/content dicts)
    model: str = "gpt-4-turbo"  # model name forwarded to the LLM client
    stream: bool = False  # True => stream the response as server-sent events
class RateLimiter:
    """Sliding-window per-user rate limiter for the API."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps user_id -> timestamps of requests inside the current window.
        self.requests = {}

    async def check_limit(self, user_id: str) -> bool:
        """Record one request and report whether the user is within limits."""
        import time
        now = time.time()
        window_start = now - self.window_seconds
        # Drop timestamps that have aged out of the window.
        recent = [t for t in self.requests.get(user_id, []) if t > window_start]
        if len(recent) >= self.max_requests:
            self.requests[user_id] = recent
            return False
        recent.append(now)
        self.requests[user_id] = recent
        return True
# Shared process-wide singletons: one limiter and one LLM client.
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
llm_client = LLMClient()
# Circuit breaker: after 5 consecutive failures, reject calls for 60 s.
@circuit(failure_threshold=5, recovery_timeout=60)
async def call_llm(messages: List[Dict]) -> str:
    """LLM call with circuit breaker"""
    return await llm_client.chat_completion(messages)
@app.post("/chat")
async def chat(request: ChatRequest, user_id: str = Depends(get_user_id)):
    """Chat endpoint with per-user rate limiting.

    NOTE(review): get_user_id is not defined anywhere in this file --
    confirm it is provided elsewhere, otherwise this fails at import time.
    NOTE(review): request.model is never forwarded; both branches use the
    client's default model -- confirm whether that is intended.
    """
    # Reject early when the caller exceeded the sliding-window limit.
    if not await rate_limiter.check_limit(user_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    try:
        if request.stream:
            async def generate():
                async for chunk in llm_client.chat_completion_streaming(request.messages):
                    yield chunk
            # NOTE(review): exceptions raised while streaming happen after
            # this return and are NOT converted to a 500 by the except below.
            return StreamingResponse(generate(), media_type="text/event-stream")
        else:
            response = await call_llm(request.messages)
            return {"response": response}
    except Exception as e:
        # Broad catch: surfaces the raw error string to the client.
        raise HTTPException(status_code=500, detail=str(e))
❌ No error handling or fallbacks ❌ Exposing raw LLM outputs without validation ❌ No rate limiting or cost controls ❌ Storing API keys in code ❌ No monitoring or logging ❌ Ignoring token limits ❌ No testing of prompts
Weekly Installs
77
Repository
GitHub Stars
11
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
opencode: 63
codex: 62
gemini-cli: 62
cursor: 60
github-copilot: 57
kimi-cli: 49
并行代理类型契约:防止AI代理代码实现中的类型重复与冲突
242 周安装
Windmill Rust 后端开发最佳实践:错误处理、SQLx、异步与代码规范指南
240 周安装
tldr-deep 函数深度分析工具:五层代码结构、控制流、数据流调试
241 周安装
morph-search:AI驱动的代码库搜索工具,比传统grep快20倍,支持正则表达式和文件编辑
242 周安装
Agentica-Claude代理集成指南:解决权限错误、幻觉问题与SSE流式传输
239 周安装
代码搜索工具决策树:语义搜索LEANN、结构搜索AST-grep、文本搜索Morph与Grep对比指南
240 周安装