langchain-orchestration by manutej/luxor-claude-marketplace
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill langchain-orchestration
Complete guide for building production-grade LLM applications with LangChain, covering chains, agents, memory, RAG patterns, and advanced orchestration techniques.
LCEL is the declarative way to compose chains in LangChain, enabling streaming, async, and parallel execution.
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# Basic LCEL chain
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()
chain = prompt | llm | output_parser
result = chain.invoke({"topic": "quantum computing"})
Every component in LangChain implements the Runnable interface with standard methods:
from langchain_core.runnables import RunnablePassthrough
# Key methods: invoke, stream, batch, ainvoke, astream, abatch
chain = prompt | llm | output_parser
# Synchronous invoke
result = chain.invoke({"topic": "AI"})
# Streaming
for chunk in chain.stream({"topic": "AI"}):
print(chunk, end="", flush=True)
# Batch processing
results = chain.batch([{"topic": "AI"}, {"topic": "ML"}])
# Async variants (must be awaited inside an async function)
result = await chain.ainvoke({"topic": "AI"})
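The await above needs an async context; a minimal driver sketch:
import asyncio
async def main() -> str:
    return await chain.ainvoke({"topic": "AI"})
result = asyncio.run(main())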
Pass inputs directly through or apply transformations:
from langchain_core.runnables import RunnablePassthrough
# Pass through unchanged
chain = RunnablePassthrough() | llm | output_parser
# With transformation
def add_context(x):
    return {"text": x["input"], "context": "important"}
# assign() merges the computed key into the input dict; in practice a
# prompt consumes the enriched dict before it reaches the LLM
chain = RunnablePassthrough.assign(processed=add_context) | llm
Process data through multiple steps sequentially.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0)
# Step 1: Generate ideas
idea_prompt = ChatPromptTemplate.from_template(
"Generate 3 creative ideas for: {topic}"
)
idea_chain = idea_prompt | llm | StrOutputParser()
# Step 2: Evaluate ideas
eval_prompt = ChatPromptTemplate.from_template(
"Evaluate these ideas and pick the best one:\n{ideas}"
)
eval_chain = eval_prompt | llm | StrOutputParser()
# Combine into sequential chain
sequential_chain = (
{"ideas": idea_chain}
| RunnablePassthrough.assign(evaluation=eval_chain)
)
result = sequential_chain.invoke({"topic": "mobile app"})
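The output carries both stages, since assign() keeps the existing keys:
print(result["ideas"])       # the three generated ideas
print(result["evaluation"])  # the pick from the evaluation step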
Process multiple inputs in parallel and combine results.
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
# Define parallel processing
summary_prompt = ChatPromptTemplate.from_template(
"Summarize this text in one sentence: {text}"
)
keywords_prompt = ChatPromptTemplate.from_template(
"Extract 3 keywords from: {text}"
)
sentiment_prompt = ChatPromptTemplate.from_template(
"Analyze sentiment (positive/negative/neutral): {text}"
)
# Map: Process in parallel
map_chain = RunnableParallel(
summary=summary_prompt | llm | StrOutputParser(),
keywords=keywords_prompt | llm | StrOutputParser(),
sentiment=sentiment_prompt | llm | StrOutputParser()
)
# Reduce: Combine results
reduce_prompt = ChatPromptTemplate.from_template(
"""Combine the analysis:
Summary: {summary}
Keywords: {keywords}
Sentiment: {sentiment}
Provide a comprehensive report:"""
)
map_reduce_chain = map_chain | reduce_prompt | llm | StrOutputParser()
result = map_reduce_chain.invoke({
"text": "LangChain is an amazing framework for building LLM applications."
})
Route inputs to different chains based on conditions.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Define specialized chains
technical_prompt = ChatPromptTemplate.from_template(
"Provide a technical explanation of: {query}"
)
simple_prompt = ChatPromptTemplate.from_template(
"Explain in simple terms: {query}"
)
technical_chain = technical_prompt | llm | StrOutputParser()
simple_chain = simple_prompt | llm | StrOutputParser()
# Router function
def route_query(input_dict):
query = input_dict["query"]
complexity = input_dict.get("complexity", "simple")
if complexity == "technical":
return technical_chain
return simple_chain
# Create router chain
from langchain_core.runnables import RunnableLambda
router_chain = RunnableLambda(route_query)
# Use the router
result = router_chain.invoke({
"query": "quantum entanglement",
"complexity": "technical"
})
Execute chains based on conditions.
from langchain_core.runnables import RunnableBranch
# Define condition-based routing
classification_prompt = ChatPromptTemplate.from_template(
"Classify this as 'question', 'statement', or 'command': {text}"
)
question_handler = ChatPromptTemplate.from_template(
"Answer this question: {text}"
) | llm | StrOutputParser()
statement_handler = ChatPromptTemplate.from_template(
"Acknowledge this statement: {text}"
) | llm | StrOutputParser()
command_handler = ChatPromptTemplate.from_template(
"Execute this command: {text}"
) | llm | StrOutputParser()
# Create conditional branch
branch = RunnableBranch(
(lambda x: "question" in x["type"].lower(), question_handler),
(lambda x: "statement" in x["type"].lower(), statement_handler),
command_handler # default
)
# Full chain with classification (input shape: {"text": "..."})
from operator import itemgetter
full_chain = (
    {"text": itemgetter("text"), "type": classification_prompt | llm | StrOutputParser()}
    | branch
)
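A usage sketch with the input shape assumed above:
result = full_chain.invoke({"text": "What is LCEL used for?"})
print(result)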
Traditional chain format still supported:
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
prompt = PromptTemplate(
input_variables=["product"],
template="What is a good name for a company that makes {product}?"
)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(product="eco-friendly water bottles")
Combine documents into a single context:
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.documents import Document
prompt = ChatPromptTemplate.from_template(
"""Answer based on the following context:
<context>
{context}
</context>
Question: {input}"""
)
document_chain = create_stuff_documents_chain(llm, prompt)
docs = [
Document(page_content="LangChain supports multiple LLM providers."),
Document(page_content="Chains can be composed using LCEL.")
]
result = document_chain.invoke({
"input": "What does LangChain support?",
"context": docs
})
Reasoning and Acting agents that use tools iteratively.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import Tool
from langchain import hub
# Define tools
def search_tool(query: str) -> str:
"""Search for information"""
return f"Search results for: {query}"
def calculator_tool(expression: str) -> str:
    """Calculate mathematical expressions"""
    try:
        # NOTE: eval() is unsafe on untrusted input; prefer a math parser in production
        return str(eval(expression))
    except Exception:
        return "Invalid expression"
tools = [
Tool(
name="Search",
func=search_tool,
description="Useful for searching information"
),
Tool(
name="Calculator",
func=calculator_tool,
description="Useful for math calculations"
)
]
# Create ReAct agent
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5
)
result = agent_executor.invoke({
"input": "What is 25 * 4, and then search for that number's significance"
})
Modern approach using LangGraph for better control:
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
@tool
def retrieve(query: str) -> str:
"""Retrieve relevant information from the knowledge base"""
# Your retrieval logic here
return f"Retrieved information for: {query}"
@tool
def analyze(text: str) -> str:
"""Analyze text and provide insights"""
return f"Analysis of: {text}"
# Create agent with memory
memory = MemorySaver()
agent_executor = create_react_agent(
llm,
[retrieve, analyze],
checkpointer=memory
)
# Use with configuration
config = {"configurable": {"thread_id": "abc123"}}
for chunk in agent_executor.stream(
{"messages": [("user", "Find information about LangChain")]},
config=config
):
print(chunk)
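Because MemorySaver checkpoints by thread_id, a follow-up call with the same config resumes the conversation:
for chunk in agent_executor.stream(
    {"messages": [("user", "Summarize what you found above")]},
    config=config  # same thread_id, so earlier messages are restored
):
    print(chunk)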
Agent with built-in conversation memory:
from langchain.agents import create_conversational_retrieval_agent
from langchain_core.tools import Tool
tools = [
Tool(
name="Knowledge Base",
func=lambda q: f"KB result: {q}",
description="Search the knowledge base"
)
]
conversational_agent = create_conversational_retrieval_agent(
llm,
tools,
verbose=True
)
# Maintains conversation context
result1 = conversational_agent.invoke({
"input": "What is LangChain?"
})
result2 = conversational_agent.invoke({
"input": "Tell me more about its features"
})
Agent that works without examples:
from langchain.agents import AgentType, initialize_agent, load_tools
# Load pre-built tools
tools = load_tools(["serpapi", "llm-math"], llm=llm)
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
max_iterations=3
)
result = agent.run(
"What is the population of Tokyo and what is that number divided by 2?"
)
Agent that uses structured input/output:
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_core.tools import tool
from langchain import hub
# Define tools with structured schemas
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
query: str = Field(description="The search query")
max_results: int = Field(default=5, description="Maximum results")
@tool(args_schema=SearchInput)
def structured_search(query: str, max_results: int = 5) -> str:
"""Search with structured parameters"""
return f"Found {max_results} results for: {query}"
tools = [structured_search]
prompt = hub.pull("hwchase17/structured-chat-agent")
agent = create_structured_chat_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
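A usage sketch (the query text is illustrative):
result = agent_executor.invoke({"input": "Search for LangChain tutorials"})
print(result["output"])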
Modern agent using native tool calling:
from langchain_core.tools import tool
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b
@tool
def search_database(query: str, limit: int = 10) -> str:
"""Search the database"""
return f"Found {limit} results for {query}"
# Bind tools to LLM
llm_with_tools = llm.bind_tools([multiply, search_database])
# Create simple tool chain: take the first tool call's args and run the tool
tool_chain = llm_with_tools | (lambda x: x.tool_calls[0]["args"]) | multiply
result = tool_chain.invoke("What's four times 23")
Store complete conversation history:
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("placeholder", "{chat_history}"),
("human", "{input}")
])
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
# Conversation is automatically stored
response1 = chain.run(input="Hi, I'm Alice")
response2 = chain.run(input="What's my name?") # Will remember Alice
Keep only recent K interactions:
from langchain.memory import ConversationBufferWindowMemory
memory = ConversationBufferWindowMemory(
k=5, # Keep last 5 interactions
memory_key="chat_history",
return_messages=True
)
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
Summarize conversation history:
from langchain.memory import ConversationSummaryMemory
memory = ConversationSummaryMemory(
llm=llm,
memory_key="chat_history",
return_messages=True
)
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
# Long conversations are automatically summarized
for i in range(20):
chain.run(input=f"Tell me fact {i} about AI")
Hybrid approach: recent messages + summary:
from langchain.memory import ConversationSummaryBufferMemory
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=100, # When to trigger summarization
memory_key="chat_history",
return_messages=True
)
Semantic search over conversation history:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts([], embeddings)
memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
# Save context
memory.save_context(
{"input": "My favorite color is blue"},
{"output": "That's great!"}
)
# Retrieve relevant context
relevant = memory.load_memory_variables({"input": "What's my favorite color?"})
Structured memory with save and search:
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.tools import tool
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
@tool
def save_recall_memory(memory: str) -> str:
"""Save important information to long-term memory"""
recall_vector_store.add_texts([memory])
return f"Saved memory: {memory}"
@tool
def search_recall_memories(query: str) -> str:
"""Search long-term memories"""
docs = recall_vector_store.similarity_search(query, k=3)
return "\n".join([doc.page_content for doc in docs])
# Use with agent
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(
llm,
[save_recall_memory, search_recall_memories]
)
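A usage sketch; the agent decides when to call the save/search tools:
result = agent.invoke({
    "messages": [("user", "Please remember that my favorite color is blue")]
})
print(result["messages"][-1].content)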
Define custom state for memory:
from typing import List
from langgraph.graph import MessagesState, StateGraph, START, END
class State(MessagesState):
recall_memories: List[str]
def load_memories(state: State):
"""Load relevant memories before agent processes input"""
messages = state["messages"]
last_message = messages[-1].content if messages else ""
# Search for relevant memories
docs = recall_vector_store.similarity_search(last_message, k=3)
memories = [doc.page_content for doc in docs]
return {"recall_memories": memories}
# Add to graph
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_edge(START, "load_memories")
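A minimal sketch of completing the graph, assuming a simple answer node that prepends the recalled memories as system context (the node body is illustrative):
from langchain_core.messages import SystemMessage
def answer(state: State):
    """Answer using recalled memories as extra context"""
    context = "\n".join(state["recall_memories"])
    response = llm.invoke(
        [SystemMessage(content=f"Relevant memories:\n{context}")] + state["messages"]
    )
    return {"messages": [response]}
builder.add_node(answer)
builder.add_edge("load_memories", "answer")
builder.add_edge("answer", END)
graph = builder.compile()
result = graph.invoke({"messages": [("user", "What's my favorite color?")]})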
Fundamental retrieval-augmented generation:
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
# Setup vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
[
"LangChain supports multiple LLM providers including OpenAI, Anthropic, and more.",
"Chains can be composed using LangChain Expression Language (LCEL).",
"Agents can use tools to interact with external systems."
],
embedding=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# RAG prompt
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# Build RAG chain
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
result = rag_chain.invoke("What does LangChain support?")
Using built-in retrieval chain constructor:
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
prompt = ChatPromptTemplate.from_template(
"""Answer based on the context:
<context>
{context}
</context>
Question: {input}"""
)
document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever, document_chain)
response = retrieval_chain.invoke({
"input": "What is LCEL?"
})
# Returns: {"input": "...", "context": [...], "answer": "..."}
Conversational RAG with context:
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder
contextualize_prompt = ChatPromptTemplate.from_messages([
("system", "Given a chat history and the latest user question, "
"formulate a standalone question which can be understood "
"without the chat history."),
MessagesPlaceholder("chat_history"),
("human", "{input}")
])
history_aware_retriever = create_history_aware_retriever(
llm,
retriever,
contextualize_prompt
)
# Use in RAG chain
qa_chain = create_retrieval_chain(
history_aware_retriever,
document_chain
)
# First question
result1 = qa_chain.invoke({
"input": "What is LangChain?",
"chat_history": []
})
# Follow-up with context
result2 = qa_chain.invoke({
"input": "What are its main features?",
"chat_history": [
("human", "What is LangChain?"),
("ai", result1["answer"])
]
})
Generate multiple search queries for better retrieval:
from langchain.retrievers.multi_query import MultiQueryRetriever
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(),
llm=llm
)
# Automatically generates multiple query variations
rag_chain = (
{"context": multi_query_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
Improve relevance with reranking:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank
# Setup reranker
compressor = FlashrankRerank()
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
# Use in RAG chain
rag_chain = (
{"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
Retrieve larger parent documents for full context:
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Storage for parent documents
store = InMemoryStore()
# Splitters
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
parent_retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=store,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Add documents
parent_retriever.add_documents(documents)
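Queries match the small child chunks but return the larger parents (here documents is assumed to be a list of Document objects loaded elsewhere):
retrieved = parent_retriever.invoke("What does LangChain support?")
print(retrieved[0].page_content)  # a full parent chunk, not a 400-char child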
Natural language to structured queries:
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
metadata_field_info = [
AttributeInfo(
name="source",
description="The document source",
type="string",
),
AttributeInfo(
name="page",
description="The page number",
type="integer",
),
]
document_content_description = "Technical documentation"
self_query_retriever = SelfQueryRetriever.from_llm(
llm,
vectorstore,
document_content_description,
metadata_field_info,
)
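A usage sketch; the LLM turns the natural-language constraints into a structured metadata filter (this retriever also requires the lark package):
docs = self_query_retriever.invoke(
    "Find documentation about retrievers where page is greater than 10"
)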
from langchain_openai import ChatOpenAI, OpenAI
# Chat model
chat_model = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
max_tokens=500,
api_key="your-api-key"
)
# Completion model
completion_model = OpenAI(
model="gpt-3.5-turbo-instruct",
temperature=0.9
)
from langchain_anthropic import ChatAnthropic
claude = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
temperature=0,
max_tokens=1024,
api_key="your-api-key"
)
from langchain_huggingface import HuggingFaceEndpoint
llm = HuggingFaceEndpoint(
repo_id="meta-llama/Llama-2-7b-chat-hf",
huggingfacehub_api_token="your-token",
task="text-generation",
temperature=0.7
)
from langchain_google_vertexai import ChatVertexAI, VertexAI
# Chat model
chat_model = ChatVertexAI(
model_name="chat-bison",
temperature=0
)
# Completion model
completion_model = VertexAI(
model_name="gemini-1.0-pro-002"
)
from langchain_community.llms import Ollama
llm = Ollama(
model="llama2",
temperature=0.8
)
from langchain_core.tools import tool
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers together"""
return a * b
# Bind tools to model
llm_with_tools = llm.bind_tools([multiply])
# Model will return tool calls
response = llm_with_tools.invoke("What is 3 times 4?")
print(response.tool_calls)
Track chain execution:
from langchain_core.callbacks import StdOutCallbackHandler
from langchain.callbacks import get_openai_callback
# Standard output callback
callbacks = [StdOutCallbackHandler()]
chain = prompt | llm | StrOutputParser()
result = chain.invoke(
{"topic": "AI"},
config={"callbacks": callbacks}
)
# OpenAI cost tracking
with get_openai_callback() as cb:
result = chain.invoke({"topic": "AI"})
print(f"Total Tokens: {cb.total_tokens}")
print(f"Total Cost: ${cb.total_cost}")
Create custom callback handlers:
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict
class MyCustomCallback(BaseCallbackHandler):
def on_llm_start(self, serialized: Dict[str, Any], prompts: list[str], **kwargs):
print(f"LLM started with prompts: {prompts}")
def on_llm_end(self, response, **kwargs):
print(f"LLM finished with response: {response}")
def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
print(f"Chain started with inputs: {inputs}")
def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
print(f"Chain ended with outputs: {outputs}")
def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
print(f"Tool started with input: {input_str}")
def on_tool_end(self, output: str, **kwargs):
print(f"Tool ended with output: {output}")
# Use custom callback
custom_callback = MyCustomCallback()
result = chain.invoke(
{"topic": "AI"},
config={"callbacks": [custom_callback]}
)
Track and log to Argilla:
from langchain_community.callbacks import ArgillaCallbackHandler
argilla_callback = ArgillaCallbackHandler(
dataset_name="langchain-dataset",
api_url="http://localhost:6900",
api_key="your-api-key"
)
callbacks = [argilla_callback]
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
callbacks=callbacks
)
agent.run("Who was the first president of the United States?")
RAG evaluation and monitoring:
from langchain_community.callbacks import UpTrainCallbackHandler
uptrain_callback = UpTrainCallbackHandler(
key_type="uptrain",
api_key="your-api-key"
)
config = {"callbacks": [uptrain_callback]}
# Automatically evaluates context relevance, factual accuracy, completeness
result = rag_chain.invoke("What is LangChain?", config=config)
Production monitoring and debugging:
import os
# Set environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# All chains automatically traced
result = chain.invoke({"topic": "AI"})
# View traces at smith.langchain.com
Basic similarity search:
from langchain_community.vectorstores import FAISS, Chroma, Pinecone
# FAISS
faiss_retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# Maximum Marginal Relevance (MMR)
mmr_retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)
# Similarity with threshold
threshold_retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8, "k": 5}
)
Combine multiple retrievers:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# BM25 for keyword search (texts: a list of raw strings defined elsewhere)
bm25_retriever = BM25Retriever.from_texts(texts)
bm25_retriever.k = 5
# Combine with vector search
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, faiss_retriever],
weights=[0.5, 0.5]
)
docs = ensemble_retriever.get_relevant_documents("LangChain features")
Prioritize recent documents:
from langchain.retrievers import TimeWeightedVectorStoreRetriever
retriever = TimeWeightedVectorStoreRetriever(
vectorstore=vectorstore,
decay_rate=0.01, # Decay factor for older docs
k=5
)
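Documents are scored by similarity plus a recency bonus; a usage sketch:
from langchain_core.documents import Document
retriever.add_documents([Document(page_content="v2.0 release notes")])
docs = retriever.invoke("latest release")  # recently added/accessed docs rank higher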
Multiple vectors per document:
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore
store = InMemoryByteStore()
retriever = MultiVectorRetriever(
vectorstore=vectorstore,
byte_store=store,
id_key="doc_id"
)
# Add documents with multiple representations
retriever.add_documents(documents)
Stream tokens as they're generated:
from langchain_core.output_parsers import StrOutputParser
chain = prompt | llm | StrOutputParser()
# Stream method
for chunk in chain.stream({"topic": "AI"}):
print(chunk, end="", flush=True)
Handle streaming events:
from langchain_core.callbacks import StreamingStdOutCallbackHandler
streaming_llm = ChatOpenAI(
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
chain = prompt | streaming_llm | StrOutputParser()
result = chain.invoke({"topic": "AI"}) # Streams to stdout
Stream asynchronously:
async def stream_async():
async for chunk in chain.astream({"topic": "AI"}):
print(chunk, end="", flush=True)
# Run async
import asyncio
asyncio.run(stream_async())
Stream agent execution:
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(llm, tools)
for chunk in agent.stream(
{"messages": [("user", "Search for LangChain information")]},
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
Stream RAG responses:
retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| llm
| StrOutputParser()
)
# Stream the response
for chunk in retrieval_chain.stream("What is LangChain?"):
print(chunk, end="", flush=True)
Automatic retries on failure:
from langchain_core.runnables import RunnableRetry
# Add retry to chain
chain_with_retry = (prompt | llm | StrOutputParser()).with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
result = chain_with_retry.invoke({"topic": "AI"})
Use fallback on errors:
from langchain_core.runnables import RunnableWithFallbacks
primary_llm = ChatOpenAI(model="gpt-4")
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")
chain_with_fallback = (prompt | primary_llm).with_fallbacks(
[prompt | fallback_llm]
)
result = chain_with_fallback.invoke({"topic": "AI"})
Manual error handling:
from langchain_core.exceptions import OutputParserException
try:
result = chain.invoke({"topic": "AI"})
except OutputParserException as e:
print(f"Parsing failed: {e}")
result = chain.invoke({"topic": "AI"}) # Retry
except Exception as e:
print(f"Chain execution failed: {e}")
result = None
Set execution timeouts. RunnableConfig does not carry a timeout field, so set one on the model client or wrap the call:
import asyncio
# Client-level timeout (per API request)
llm_with_timeout = ChatOpenAI(model="gpt-4o-mini", timeout=10.0)
# Whole-chain timeout
async def invoke_with_timeout():
    try:
        return await asyncio.wait_for(chain.ainvoke({"topic": "AI"}), timeout=10.0)
    except asyncio.TimeoutError:
        print("Chain execution timed out")
        return None
Validate inputs and outputs:
from pydantic import BaseModel, Field, validator
class QueryInput(BaseModel):
topic: str = Field(..., min_length=1, max_length=100)
@validator("topic")
def topic_must_be_valid(cls, v):
if not v.strip():
raise ValueError("Topic cannot be empty")
return v.strip()
# Use with chain
def validate_and_invoke(topic: str):
try:
validated = QueryInput(topic=topic)
return chain.invoke({"topic": validated.topic})
except ValueError as e:
return f"Validation error: {e}"
Manage secrets securely:
import os
from dotenv import load_dotenv
load_dotenv()
# Use environment variables
llm = ChatOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
model=os.getenv("MODEL_NAME", "gpt-4o-mini")
)
# Vector store configuration
VECTOR_STORE_TYPE = os.getenv("VECTOR_STORE", "faiss")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
Cache LLM responses:
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
# In-memory cache
set_llm_cache(InMemoryCache())
# Persistent cache
set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Responses are cached automatically
result1 = llm.invoke("What is AI?") # Calls API
result2 = llm.invoke("What is AI?") # Uses cache
Control API usage:
from langchain_core.rate_limiters import InMemoryRateLimiter
rate_limiter = InMemoryRateLimiter(
requests_per_second=1,
check_every_n_seconds=0.1,
max_bucket_size=10
)
llm = ChatOpenAI(rate_limiter=rate_limiter)
Process multiple inputs efficiently:
# Batch invoke
inputs = [{"topic": f"Topic {i}"} for i in range(10)]
results = chain.batch(inputs, config={"max_concurrency": 5})
# Async batch
async def batch_process():
results = await chain.abatch(inputs)
return results
Production monitoring:
import logging
from langchain_core.callbacks import BaseCallbackHandler
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionCallback(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
logger.info(f"Chain started: {serialized.get('name', 'unknown')}")
def on_chain_end(self, outputs, **kwargs):
logger.info(f"Chain completed successfully")
def on_chain_error(self, error, **kwargs):
logger.error(f"Chain error: {error}")
# Use in production
production_callback = ProductionCallback()
config = {"callbacks": [production_callback]}
Unit test your chains:
import pytest
from langchain_core.messages import HumanMessage, AIMessage
def test_basic_chain():
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"topic": "testing"})
assert isinstance(result, str)
assert len(result) > 0
def test_rag_chain():
result = rag_chain.invoke("What is LangChain?")
assert "LangChain" in result
assert len(result) > 50
@pytest.mark.asyncio
async def test_async_chain():
result = await chain.ainvoke({"topic": "async"})
assert isinstance(result, str)
Optimize chain execution:
# Use appropriate chunk sizes for text splitting
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
# Limit retrieval results
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# Use smaller, faster models where appropriate
fast_llm = ChatOpenAI(model="gpt-4o-mini")
# Enable streaming for better UX
streaming_chain = prompt | fast_llm | StrOutputParser()
for chunk in streaming_chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)
Document your chains:
from langchain_core.runnables import RunnableConfig
class DocumentedChain:
"""
Production RAG chain for technical documentation.
Features:
- Multi-query retrieval for better coverage
- Reranking for improved relevance
- Streaming support
- Error handling with fallbacks
Usage:
chain = DocumentedChain()
result = chain.invoke("Your question here")
"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o-mini")
self.retriever = self._setup_retriever()
self.chain = self._build_chain()
def _setup_retriever(self):
# Setup logic
pass
def _build_chain(self):
# Chain construction
pass
def invoke(self, query: str, config: RunnableConfig = None):
"""Execute the chain with error handling"""
try:
return self.chain.invoke(query, config=config)
except Exception as e:
logger.error(f"Chain execution failed: {e}")
raise
This skill covers comprehensive LangChain orchestration patterns: LCEL chains, agents, memory, RAG, retrievers, callbacks, streaming, error handling, and production best practices.
For more examples and patterns, see EXAMPLES.md.