langchain-framework by bobmatnyc/claude-mpm-skills
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill langchain-framework使用 | 操作符构建链的现代可组合语法。
基础链 :
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 组件
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
output_parser = StrOutputParser()
# 使用 LCEL 组合
chain = prompt | llm | output_parser
# 调用
result = chain.invoke({"topic": "programming"})
为何使用 LCEL :
提示词 :
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# 简单模板
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("user", "{input}")
])
# 包含消息历史
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("user", "{input}")
])
# 少样本示例
from langchain_core.prompts import FewShotChatMessagePromptTemplate
examples = [
{"input": "2+2", "output": "4"},
{"input": "3*5", "output": "15"}
]
example_prompt = ChatPromptTemplate.from_messages([
("human", "{input}"),
("ai", "{output}")
])
few_shot_prompt = FewShotChatMessagePromptTemplate(
example_prompt=example_prompt,
examples=examples
)
LLMs :
# Anthropic Claude
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
temperature=0.7,
max_tokens=1024,
timeout=60.0
)
# OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="gpt-4-turbo-preview",
temperature=0.7
)
# 流式传输
for chunk in llm.stream("Tell me a story"):
print(chunk.content, end="", flush=True)
输出解析器 :
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
# 字符串解析器
str_parser = StrOutputParser()
# JSON 解析器
json_parser = JsonOutputParser()
# 结构化输出
class Person(BaseModel):
name: str = Field(description="Person's name")
age: int = Field(description="Person's age")
parser = PydanticOutputParser(pydantic_object=Person)
prompt = ChatPromptTemplate.from_template(
"Extract person info.\n{format_instructions}\n{query}"
)
chain = prompt | llm | parser
from langchain_community.document_loaders import (
TextLoader,
PyPDFLoader,
DirectoryLoader,
WebBaseLoader
)
# 文本文件
loader = TextLoader("document.txt")
docs = loader.load()
# PDF 文件
loader = PyPDFLoader("document.pdf")
docs = loader.load()
# 文件目录
loader = DirectoryLoader(
"./docs",
glob="**/*.md",
show_progress=True
)
docs = loader.load()
# 网页
loader = WebBaseLoader("https://example.com")
docs = loader.load()
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter
)
# 递归分割器(推荐)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
chunks = text_splitter.split_documents(docs)
# 基于 Token 的分割
from langchain.text_splitter import TokenTextSplitter
splitter = TokenTextSplitter(
chunk_size=512,
chunk_overlap=50
)
from langchain_community.vectorstores import Chroma, FAISS, Pinecone
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
# 嵌入模型
embeddings = OpenAIEmbeddings()
# Chroma(本地,持久化)
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
# FAISS(本地,内存中)
vectorstore = FAISS.from_documents(
documents=chunks,
embedding=embeddings
)
vectorstore.save_local("./faiss_index")
# Pinecone(云端)
from langchain_community.vectorstores import Pinecone
import pinecone
pinecone.init(api_key="your-key", environment="us-west1-gcp")
vectorstore = Pinecone.from_documents(
documents=chunks,
embedding=embeddings,
index_name="langchain-index"
)
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# 创建检索器
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
# RAG 提示词
template = """Answer based on context:
Context: {context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
# 格式化文档
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# RAG 链
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 查询
answer = rag_chain.invoke("What is LangChain?")
# 多查询检索
from langchain.retrievers.multi_query import MultiQueryRetriever
retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(),
llm=llm
)
# 上下文压缩
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=vectorstore.as_retriever()
)
# 父文档检索器
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
store = InMemoryStore()
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=store,
child_splitter=text_splitter
)
from langchain.tools import tool
from langchain_core.tools import Tool
# 装饰器方式
@tool
def search_wikipedia(query: str) -> str:
"""Search Wikipedia for information."""
# 实现
return f"Results for: {query}"
# 类方式
from langchain.tools import BaseTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression")
class CalculatorTool(BaseTool):
name = "calculator"
description = "Useful for math calculations"
args_schema = CalculatorInput
def _run(self, expression: str) -> str:
return str(eval(expression))
# 预构建工具
from langchain_community.tools import (
DuckDuckGoSearchRun,
WikipediaQueryRun,
PythonREPLTool
)
search = DuckDuckGoSearchRun()
wikipedia = WikipediaQueryRun()
python_repl = PythonREPLTool()
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
# ReAct 智能体(推荐)
prompt = hub.pull("hwchase17/react")
tools = [search_wikipedia, CalculatorTool()]
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=3,
handle_parsing_errors=True
)
result = agent_executor.invoke({"input": "What is 2+2 and who invented addition?"})
# 结构化聊天智能体(函数调用)
from langchain.agents import create_structured_chat_agent
agent = create_structured_chat_agent(llm, tools, prompt)
# OpenAI 函数智能体
from langchain.agents import create_openai_functions_agent
agent = create_openai_functions_agent(llm, tools, prompt)
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True
)
# 对话循环
while True:
user_input = input("You: ")
if user_input.lower() == "exit":
break
response = agent_executor.invoke({"input": user_input})
print(f"Agent: {response['output']}")
from langchain.memory import (
ConversationBufferMemory,
ConversationBufferWindowMemory,
ConversationSummaryMemory,
ConversationSummaryBufferMemory
)
# 完整对话历史
memory = ConversationBufferMemory(return_messages=True)
# 最近 K 条消息
memory = ConversationBufferWindowMemory(k=5, return_messages=True)
# 摘要化历史
memory = ConversationSummaryMemory(llm=llm, return_messages=True)
# 摘要 + 最近缓冲区
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=100,
return_messages=True
)
from langchain.chains import ConversationChain
conversation = ConversationChain(
llm=llm,
memory=ConversationBufferMemory()
)
conversation.predict(input="Hi, I'm Alice")
conversation.predict(input="What's my name?") # "Alice"
# 带记忆的自定义提示词
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | llm | StrOutputParser()
# 手动管理记忆
from langchain_core.messages import HumanMessage, AIMessage
history = []
def chat(user_input):
response = chain.invoke({"input": user_input, "history": history})
history.append(HumanMessage(content=user_input))
history.append(AIMessage(content=response))
return response
from langchain.chains import SequentialChain, LLMChain
# 步骤 1:生成概要
synopsis_chain = LLMChain(
llm=llm,
prompt=ChatPromptTemplate.from_template("Write synopsis for: {title}"),
output_key="synopsis"
)
# 步骤 2:生成评论
review_chain = LLMChain(
llm=llm,
prompt=ChatPromptTemplate.from_template("Review this synopsis: {synopsis}"),
output_key="review"
)
# 组合
overall_chain = SequentialChain(
chains=[synopsis_chain, review_chain],
input_variables=["title"],
output_variables=["synopsis", "review"],
verbose=True
)
result = overall_chain({"title": "AI Revolution"})
from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
# 定义专用提示词
physics_template = """You are a physics expert. Answer: {input}"""
math_template = """You are a math expert. Answer: {input}"""
prompt_infos = [
{
"name": "physics",
"description": "Good for physics questions",
"prompt_template": physics_template
},
{
"name": "math",
"description": "Good for math questions",
"prompt_template": math_template
}
]
# 创建路由器
from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE
router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format(destinations="\n".join(
[f"{p['name']}: {p['description']}" for p in prompt_infos]
))
router_prompt = ChatPromptTemplate.from_template(router_template)
router_chain = LLMRouterChain.from_llm(llm, router_prompt)
# 构建多提示词链
chain = MultiPromptChain(
router_chain=router_chain,
destination_chains={
"physics": LLMChain(llm=llm, prompt=ChatPromptTemplate.from_template(physics_template)),
"math": LLMChain(llm=llm, prompt=ChatPromptTemplate.from_template(math_template))
},
default_chain=LLMChain(llm=llm, prompt=ChatPromptTemplate.from_template("{input}")),
verbose=True
)
from langchain_core.runnables import RunnableParallel
# 并行执行多个链
parallel_chain = RunnableParallel(
summary=summary_chain,
translation=translation_chain,
sentiment=sentiment_chain
)
result = parallel_chain.invoke({"text": "Long article text..."})
# 返回: {"summary": "...", "translation": "...", "sentiment": "..."}
import asyncio
# 异步调用
async def process():
result = await chain.ainvoke({"input": "Hello"})
return result
# 异步流式传输
async def stream():
async for chunk in chain.astream({"input": "Tell me a story"}):
print(chunk, end="", flush=True)
# 异步批处理
async def batch():
results = await chain.abatch([
{"input": "Question 1"},
{"input": "Question 2"}
])
return results
# 运行
asyncio.run(process())
from langchain_core.runnables import RunnablePassthrough
async def process_documents(docs):
# 并发处理多个文档
tasks = [chain.ainvoke({"doc": doc}) for doc in docs]
results = await asyncio.gather(*tasks)
return results
# 带速率限制
from langchain.callbacks import get_openai_callback
async def process_with_limits(docs, max_concurrent=5):
semaphore = asyncio.Semaphore(max_concurrent)
async def process_one(doc):
async with semaphore:
return await chain.ainvoke({"doc": doc})
tasks = [process_one(doc) for doc in docs]
return await asyncio.gather(*tasks)
import os
# 启用 LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# 追踪自动捕获所有 LangChain 操作
result = chain.invoke({"input": "Hello"})
# 查看追踪: https://smith.langchain.com
from langsmith import trace
@trace
def my_function(input_text):
# 自定义函数追踪
result = chain.invoke({"input": input_text})
return result
# 添加元数据
from langchain.callbacks import LangChainTracer
tracer = LangChainTracer(
project_name="my-project",
metadata={"environment": "production", "version": "1.0"}
)
chain.invoke({"input": "Hello"}, config={"callbacks": [tracer]})
from langsmith import Client
from langchain.evaluation import load_evaluator
client = Client()
# 创建数据集
dataset = client.create_dataset("my-dataset")
client.create_examples(
inputs=[{"input": "What is AI?"}],
outputs=[{"output": "Artificial Intelligence..."}],
dataset_id=dataset.id
)
# 评估
def predict(input_dict):
return chain.invoke(input_dict)
# 运行评估
results = client.run_on_dataset(
dataset_name="my-dataset",
llm_or_chain_factory=lambda: chain,
evaluation=load_evaluator("qa"),
project_name="my-evaluation"
)
from langchain_core.runnables import RunnableWithFallbacks
# 备用链
primary_llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
fallback_llm = ChatOpenAI(model="gpt-4-turbo-preview")
chain = (prompt | primary_llm).with_fallbacks([prompt | fallback_llm])
# 重试逻辑
from langchain_core.runnables import RunnableRetry
chain_with_retry = chain.with_retry(
retry_if_exception_type=(RateLimitError,),
wait_exponential_jitter=True,
stop_after_attempt=3
)
# 错误处理
try:
result = chain.invoke({"input": "Hello"})
except Exception as e:
logger.error(f"Chain failed: {e}")
# 优雅处理
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
# 内存缓存
set_llm_cache(InMemoryCache())
# 持久化缓存
set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Redis 缓存
from langchain.cache import RedisCache
import redis
set_llm_cache(RedisCache(redis_=redis.Redis()))
# 语义缓存
from langchain.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
set_llm_cache(RedisSemanticCache(
redis_url="redis://localhost:6379",
embedding=OpenAIEmbeddings(),
score_threshold=0.8
))
from langchain.llms.base import BaseLLM
from ratelimit import limits, sleep_and_retry
class RateLimitedLLM(BaseLLM):
@sleep_and_retry
@limits(calls=50, period=60) # 每分钟 50 次调用
def _call(self, prompt, stop=None, **kwargs):
return self.llm._call(prompt, stop, **kwargs)
# Token 预算追踪
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = chain.invoke({"input": "Hello"})
print(f"Tokens used: {cb.total_tokens}")
print(f"Cost: ${cb.total_cost}")
from langchain.callbacks.base import BaseCallbackHandler
class MetricsCallback(BaseCallbackHandler):
def on_llm_start(self, serialized, prompts, **kwargs):
# 记录 LLM 开始
logger.info(f"LLM started: {prompts}")
def on_llm_end(self, response, **kwargs):
# 记录 LLM 完成
logger.info(f"LLM completed: {response}")
def on_llm_error(self, error, **kwargs):
# 记录错误
logger.error(f"LLM error: {error}")
# 使用回调
chain.invoke({"input": "Hello"}, config={"callbacks": [MetricsCallback()]})
# chains.py
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
def create_summarization_chain():
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
prompt = ChatPromptTemplate.from_template("Summarize: {text}")
return prompt | llm
# main.py
from chains import create_summarization_chain
chain = create_summarization_chain()
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
anthropic_api_key: str
langsmith_api_key: str | None = None
langsmith_project: str = "default"
model_name: str = "claude-3-5-sonnet-20241022"
temperature: float = 0.7
class Config:
env_file = ".env"
settings = Settings()
# 在链中使用
llm = ChatAnthropic(
model=settings.model_name,
temperature=settings.temperature,
anthropic_api_key=settings.anthropic_api_key
)
import pytest
from langchain_core.prompts import ChatPromptTemplate
def test_chain_output():
# 使用模拟 LLM 进行测试
from langchain.llms.fake import FakeListLLM
llm = FakeListLLM(responses=["Mocked response"])
prompt = ChatPromptTemplate.from_template("Test: {input}")
chain = prompt | llm
result = chain.invoke({"input": "test"})
assert result == "Mocked response"
# 集成测试
@pytest.mark.integration
def test_real_chain():
chain = create_summarization_chain()
result = chain.invoke({"text": "Long text..."})
assert len(result) > 0
# ❌ 错误:内存无限增长
memory = ConversationBufferMemory()
while True:
chain.invoke({"input": user_input}) # 历史记录从未清除
# ✅ 正确:使用窗口化记忆
memory = ConversationBufferWindowMemory(k=10)
# 或定期清理
if len(memory.chat_memory.messages) > 100:
memory.clear()
# ❌ 错误:检索过多文档
retriever = vectorstore.as_retriever(search_kwargs={"k": 100})
# ✅ 正确:优化 k 值
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# ✅ 更好:使用 MMR 增加多样性
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 4, "fetch_k": 20}
)
# ❌ 错误:在异步上下文中阻塞
async def process():
result = chain.invoke({"input": "test"}) # 阻塞事件循环
# ✅ 正确:使用异步方法
async def process():
result = await chain.ainvoke({"input": "test"})
# ❌ 错误:解析字符串输出
result = chain.invoke({"input": "Extract name and age"})
# 然后:手动解析结果字符串
# ✅ 正确:使用结构化输出
from langchain.output_parsers import PydanticOutputParser
parser = PydanticOutputParser(pydantic_object=Person)
chain = prompt | llm | parser
result = chain.invoke({"input": "Extract name and age"})
# 返回: Person(name="John", age=30)
# ❌ 错误:每次发送完整上下文
for question in questions:
chain.invoke({"context": long_document, "question": question})
# ✅ 正确:使用 RAG 检索
for question in questions:
relevant_docs = retriever.get_relevant_documents(question)
chain.invoke({"context": relevant_docs, "question": question})
安装 :
pip install langchain langchain-anthropic langchain-openai
pip install chromadb faiss-cpu # 向量存储
pip install langsmith # 可观测性
核心导入 :
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
基础链 : prompt | llm | parser
RAG 链 : retriever | format_docs | prompt | llm
带记忆 : 在提示词中使用 MessagesPlaceholder + 记忆对象
异步 : 将 invoke 替换为 ainvoke,stream 替换为 astream
调试 : 设置 verbose=True 或启用 LangSmith 追踪
生产环境 : 添加错误处理、缓存、速率限制、监控
每周安装量
79
代码仓库
GitHub 星标数
20
首次出现
Jan 23, 2026
安全审计
安装于
opencode61
gemini-cli60
codex59
cursor58
claude-code57
github-copilot55
Modern composable syntax for building chains with | operator.
Basic Chain :
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Components
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
output_parser = StrOutputParser()
# Compose with LCEL
chain = prompt | llm | output_parser
# Invoke
result = chain.invoke({"topic": "programming"})
Why LCEL :
Prompts :
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# Simple template
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("user", "{input}")
])
# With message history
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("user", "{input}")
])
# Few-shot examples
from langchain_core.prompts import FewShotChatMessagePromptTemplate
examples = [
{"input": "2+2", "output": "4"},
{"input": "3*5", "output": "15"}
]
example_prompt = ChatPromptTemplate.from_messages([
("human", "{input}"),
("ai", "{output}")
])
few_shot_prompt = FewShotChatMessagePromptTemplate(
example_prompt=example_prompt,
examples=examples
)
LLMs :
# Anthropic Claude
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
temperature=0.7,
max_tokens=1024,
timeout=60.0
)
# OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="gpt-4-turbo-preview",
temperature=0.7
)
# Streaming
for chunk in llm.stream("Tell me a story"):
print(chunk.content, end="", flush=True)
Output Parsers :
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
# String parser
str_parser = StrOutputParser()
# JSON parser
json_parser = JsonOutputParser()
# Structured output
class Person(BaseModel):
name: str = Field(description="Person's name")
age: int = Field(description="Person's age")
parser = PydanticOutputParser(pydantic_object=Person)
prompt = ChatPromptTemplate.from_template(
"Extract person info.\n{format_instructions}\n{query}"
)
chain = prompt | llm | parser
from langchain_community.document_loaders import (
TextLoader,
PyPDFLoader,
DirectoryLoader,
WebBaseLoader
)
# Text files
loader = TextLoader("document.txt")
docs = loader.load()
# PDFs
loader = PyPDFLoader("document.pdf")
docs = loader.load()
# Directory of files
loader = DirectoryLoader(
"./docs",
glob="**/*.md",
show_progress=True
)
docs = loader.load()
# Web pages
loader = WebBaseLoader("https://example.com")
docs = loader.load()
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter
)
# Recursive splitter (recommended)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
chunks = text_splitter.split_documents(docs)
# Token-aware splitting
from langchain.text_splitter import TokenTextSplitter
splitter = TokenTextSplitter(
chunk_size=512,
chunk_overlap=50
)
from langchain_community.vectorstores import Chroma, FAISS, Pinecone
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
# Embeddings
embeddings = OpenAIEmbeddings()
# Chroma (local, persistent)
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
# FAISS (local, in-memory)
vectorstore = FAISS.from_documents(
documents=chunks,
embedding=embeddings
)
vectorstore.save_local("./faiss_index")
# Pinecone (cloud)
from langchain_community.vectorstores import Pinecone
import pinecone
pinecone.init(api_key="your-key", environment="us-west1-gcp")
vectorstore = Pinecone.from_documents(
documents=chunks,
embedding=embeddings,
index_name="langchain-index"
)
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# Create retriever
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
# RAG prompt
template = """Answer based on context:
Context: {context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
# Format documents
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# RAG chain
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# Query
answer = rag_chain.invoke("What is LangChain?")
# Multi-query retrieval
from langchain.retrievers.multi_query import MultiQueryRetriever
retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(),
llm=llm
)
# Contextual compression
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=vectorstore.as_retriever()
)
# Parent document retriever
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
store = InMemoryStore()
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=store,
child_splitter=text_splitter
)
from langchain.tools import tool
from langchain_core.tools import Tool
# Decorator approach
@tool
def search_wikipedia(query: str) -> str:
"""Search Wikipedia for information."""
# Implementation
return f"Results for: {query}"
# Class approach
from langchain.tools import BaseTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression")
class CalculatorTool(BaseTool):
name = "calculator"
description = "Useful for math calculations"
args_schema = CalculatorInput
def _run(self, expression: str) -> str:
return str(eval(expression))
# Pre-built tools
from langchain_community.tools import (
DuckDuckGoSearchRun,
WikipediaQueryRun,
PythonREPLTool
)
search = DuckDuckGoSearchRun()
wikipedia = WikipediaQueryRun()
python_repl = PythonREPLTool()
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
# ReAct agent (recommended)
prompt = hub.pull("hwchase17/react")
tools = [search_wikipedia, CalculatorTool()]
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=3,
handle_parsing_errors=True
)
result = agent_executor.invoke({"input": "What is 2+2 and who invented addition?"})
# Structured chat agent (function calling)
from langchain.agents import create_structured_chat_agent
agent = create_structured_chat_agent(llm, tools, prompt)
# OpenAI functions agent
from langchain.agents import create_openai_functions_agent
agent = create_openai_functions_agent(llm, tools, prompt)
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True
)
# Conversational loop
while True:
user_input = input("You: ")
if user_input.lower() == "exit":
break
response = agent_executor.invoke({"input": user_input})
print(f"Agent: {response['output']}")
from langchain.memory import (
ConversationBufferMemory,
ConversationBufferWindowMemory,
ConversationSummaryMemory,
ConversationSummaryBufferMemory
)
# Full conversation history
memory = ConversationBufferMemory(return_messages=True)
# Last K messages
memory = ConversationBufferWindowMemory(k=5, return_messages=True)
# Summarized history
memory = ConversationSummaryMemory(llm=llm, return_messages=True)
# Summary + recent buffer
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=100,
return_messages=True
)
from langchain.chains import ConversationChain
conversation = ConversationChain(
llm=llm,
memory=ConversationBufferMemory()
)
conversation.predict(input="Hi, I'm Alice")
conversation.predict(input="What's my name?") # "Alice"
# Custom prompt with memory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | llm | StrOutputParser()
# Manual memory management
from langchain_core.messages import HumanMessage, AIMessage
history = []
def chat(user_input):
response = chain.invoke({"input": user_input, "history": history})
history.append(HumanMessage(content=user_input))
history.append(AIMessage(content=response))
return response
from langchain.chains import SequentialChain, LLMChain
# Step 1: Generate synopsis
synopsis_chain = LLMChain(
llm=llm,
prompt=ChatPromptTemplate.from_template("Write synopsis for: {title}"),
output_key="synopsis"
)
# Step 2: Generate review
review_chain = LLMChain(
llm=llm,
prompt=ChatPromptTemplate.from_template("Review this synopsis: {synopsis}"),
output_key="review"
)
# Combine
overall_chain = SequentialChain(
chains=[synopsis_chain, review_chain],
input_variables=["title"],
output_variables=["synopsis", "review"],
verbose=True
)
result = overall_chain({"title": "AI Revolution"})
from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
# Define specialized prompts
physics_template = """You are a physics expert. Answer: {input}"""
math_template = """You are a math expert. Answer: {input}"""
prompt_infos = [
{
"name": "physics",
"description": "Good for physics questions",
"prompt_template": physics_template
},
{
"name": "math",
"description": "Good for math questions",
"prompt_template": math_template
}
]
# Create router
from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE
router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format(destinations="\n".join(
[f"{p['name']}: {p['description']}" for p in prompt_infos]
))
router_prompt = ChatPromptTemplate.from_template(router_template)
router_chain = LLMRouterChain.from_llm(llm, router_prompt)
# Build multi-prompt chain
chain = MultiPromptChain(
router_chain=router_chain,
destination_chains={
"physics": LLMChain(llm=llm, prompt=ChatPromptTemplate.from_template(physics_template)),
"math": LLMChain(llm=llm, prompt=ChatPromptTemplate.from_template(math_template))
},
default_chain=LLMChain(llm=llm, prompt=ChatPromptTemplate.from_template("{input}")),
verbose=True
)
from langchain_core.runnables import RunnableParallel
# Execute multiple chains in parallel
parallel_chain = RunnableParallel(
summary=summary_chain,
translation=translation_chain,
sentiment=sentiment_chain
)
result = parallel_chain.invoke({"text": "Long article text..."})
# Returns: {"summary": "...", "translation": "...", "sentiment": "..."}
import asyncio
# Async invoke
async def process():
result = await chain.ainvoke({"input": "Hello"})
return result
# Async streaming
async def stream():
async for chunk in chain.astream({"input": "Tell me a story"}):
print(chunk, end="", flush=True)
# Async batch
async def batch():
results = await chain.abatch([
{"input": "Question 1"},
{"input": "Question 2"}
])
return results
# Run
asyncio.run(process())
from langchain_core.runnables import RunnablePassthrough
async def process_documents(docs):
# Process multiple documents concurrently
tasks = [chain.ainvoke({"doc": doc}) for doc in docs]
results = await asyncio.gather(*tasks)
return results
# With rate limiting
from langchain.callbacks import get_openai_callback
async def process_with_limits(docs, max_concurrent=5):
semaphore = asyncio.Semaphore(max_concurrent)
async def process_one(doc):
async with semaphore:
return await chain.ainvoke({"doc": doc})
tasks = [process_one(doc) for doc in docs]
return await asyncio.gather(*tasks)
import os
# Enable LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# Trace automatically captures all LangChain operations
result = chain.invoke({"input": "Hello"})
# View trace at: https://smith.langchain.com
from langsmith import trace
@trace
def my_function(input_text):
# Custom function tracing
result = chain.invoke({"input": input_text})
return result
# Add metadata
from langchain.callbacks import LangChainTracer
tracer = LangChainTracer(
project_name="my-project",
metadata={"environment": "production", "version": "1.0"}
)
chain.invoke({"input": "Hello"}, config={"callbacks": [tracer]})
from langsmith import Client
from langchain.evaluation import load_evaluator
client = Client()
# Create dataset
dataset = client.create_dataset("my-dataset")
client.create_examples(
inputs=[{"input": "What is AI?"}],
outputs=[{"output": "Artificial Intelligence..."}],
dataset_id=dataset.id
)
# Evaluate
def predict(input_dict):
return chain.invoke(input_dict)
# Run evaluation
results = client.run_on_dataset(
dataset_name="my-dataset",
llm_or_chain_factory=lambda: chain,
evaluation=load_evaluator("qa"),
project_name="my-evaluation"
)
from langchain_core.runnables import RunnableWithFallbacks
# Fallback chain
primary_llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
fallback_llm = ChatOpenAI(model="gpt-4-turbo-preview")
chain = (prompt | primary_llm).with_fallbacks([prompt | fallback_llm])
# Retry logic
from langchain_core.runnables import RunnableRetry
chain_with_retry = chain.with_retry(
retry_if_exception_type=(RateLimitError,),
wait_exponential_jitter=True,
stop_after_attempt=3
)
# Error handling
try:
result = chain.invoke({"input": "Hello"})
except Exception as e:
logger.error(f"Chain failed: {e}")
# Handle gracefully
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
# In-memory cache
set_llm_cache(InMemoryCache())
# Persistent cache
set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Redis cache
from langchain.cache import RedisCache
import redis
set_llm_cache(RedisCache(redis_=redis.Redis()))
# Semantic cache
from langchain.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
set_llm_cache(RedisSemanticCache(
redis_url="redis://localhost:6379",
embedding=OpenAIEmbeddings(),
score_threshold=0.8
))
from langchain.llms.base import BaseLLM
from ratelimit import limits, sleep_and_retry
class RateLimitedLLM(BaseLLM):
@sleep_and_retry
@limits(calls=50, period=60) # 50 calls per minute
def _call(self, prompt, stop=None, **kwargs):
return self.llm._call(prompt, stop, **kwargs)
# Token budget tracking
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = chain.invoke({"input": "Hello"})
print(f"Tokens used: {cb.total_tokens}")
print(f"Cost: ${cb.total_cost}")
from langchain.callbacks.base import BaseCallbackHandler
class MetricsCallback(BaseCallbackHandler):
def on_llm_start(self, serialized, prompts, **kwargs):
# Log LLM start
logger.info(f"LLM started: {prompts}")
def on_llm_end(self, response, **kwargs):
# Log LLM completion
logger.info(f"LLM completed: {response}")
def on_llm_error(self, error, **kwargs):
# Log errors
logger.error(f"LLM error: {error}")
# Use callback
chain.invoke({"input": "Hello"}, config={"callbacks": [MetricsCallback()]})
# chains.py
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
def create_summarization_chain():
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
prompt = ChatPromptTemplate.from_template("Summarize: {text}")
return prompt | llm
# main.py
from chains import create_summarization_chain
chain = create_summarization_chain()
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
anthropic_api_key: str
langsmith_api_key: str | None = None
langsmith_project: str = "default"
model_name: str = "claude-3-5-sonnet-20241022"
temperature: float = 0.7
class Config:
env_file = ".env"
settings = Settings()
# Use in chains
llm = ChatAnthropic(
model=settings.model_name,
temperature=settings.temperature,
anthropic_api_key=settings.anthropic_api_key
)
import pytest
from langchain_core.prompts import ChatPromptTemplate
def test_chain_output():
# Use mock LLM for testing
from langchain.llms.fake import FakeListLLM
llm = FakeListLLM(responses=["Mocked response"])
prompt = ChatPromptTemplate.from_template("Test: {input}")
chain = prompt | llm
result = chain.invoke({"input": "test"})
assert result == "Mocked response"
# Integration test
@pytest.mark.integration
def test_real_chain():
chain = create_summarization_chain()
result = chain.invoke({"text": "Long text..."})
assert len(result) > 0
# ❌ WRONG: Memory grows unbounded
memory = ConversationBufferMemory()
while True:
chain.invoke({"input": user_input}) # History never cleared
# ✅ CORRECT: Use windowed memory
memory = ConversationBufferWindowMemory(k=10)
# Or clear periodically
if len(memory.chat_memory.messages) > 100:
memory.clear()
# ❌ WRONG: Retrieving too many documents
retriever = vectorstore.as_retriever(search_kwargs={"k": 100})
# ✅ CORRECT: Optimize k value
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# ✅ BETTER: Use MMR for diversity
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 4, "fetch_k": 20}
)
# ❌ WRONG: Blocking in async context
async def process():
result = chain.invoke({"input": "test"}) # Blocks event loop
# ✅ CORRECT: Use async methods
async def process():
result = await chain.ainvoke({"input": "test"})
# ❌ WRONG: Parsing string outputs
result = chain.invoke({"input": "Extract name and age"})
# Then: parse result string manually
# ✅ CORRECT: Use structured output
from langchain.output_parsers import PydanticOutputParser
parser = PydanticOutputParser(pydantic_object=Person)
chain = prompt | llm | parser
result = chain.invoke({"input": "Extract name and age"})
# Returns: Person(name="John", age=30)
# ❌ WRONG: Sending full context every time
for question in questions:
chain.invoke({"context": long_document, "question": question})
# ✅ CORRECT: Use RAG retrieval
for question in questions:
relevant_docs = retriever.get_relevant_documents(question)
chain.invoke({"context": relevant_docs, "question": question})
Installation :
pip install langchain langchain-anthropic langchain-openai
pip install chromadb faiss-cpu # Vector stores
pip install langsmith # Observability
Essential Imports :
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
Basic Chain : prompt | llm | parser
RAG Chain : retriever | format_docs | prompt | llm
With Memory : Use MessagesPlaceholder in prompt + memory object
Async : Replace invoke with ainvoke, stream with astream
Debugging : Set verbose=True or enable LangSmith tracing
Production : Add error handling, caching, rate limiting, monitoring
Weekly Installs
79
Repository
GitHub Stars
20
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykWarn
Installed on
opencode61
gemini-cli60
codex59
cursor58
claude-code57
github-copilot55
超能力技能使用指南:AI助手技能调用优先级与工作流程详解
52,100 周安装