中间件(middleware)是本次更新中最亮眼的特性,诸多新功能均藉由中间件实现,比如人机交互、动态系统提示词、动态注入上下文等等。中间件是一种钩子函数。通过向工作流中预埋中间件,能够实现工作流的高效拓展和可定制化。
LangChain 通过 装饰器 创建 自定义中间件。
装饰器类型(点击展开)
| DECORATOR | DESCRIPTION |
|---|---|
@before_agent | 在 Agent 执行前执行逻辑 |
@after_agent | 在 Agent 执行后执行逻辑 |
@before_model | 在每次模型调用前执行逻辑 |
@after_model | 在每次模型收到响应后执行逻辑 |
@wrap_model_call | 控制模型的调用过程 |
@wrap_tool_call | 控制工具的调用过程 |
@dynamic_prompt | 动态生成系统提示词 |
@hook_config | 配置钩子行为 |
装饰器类型 决定中间件的执行位置。比如使用 @before_model 装饰器,能够在模型调用前执行特定逻辑。被装饰函数 负责这段特定逻辑的具体实现。这么说可能有点抽象。没关系,本节提供了四个例子,看完你一定能够领悟到中间件的使用方法:
预算控制
消息截断
敏感词过滤
PII 检测
(个人隐私信息检测)
一、预算控制¶
随着对话轮次增加,对话记录也越来越长,从而导致请求费用上升。为了控制预算,可以设定在对话轮次超过某个阈值后,切换到低费率模型。下面我们用自定义中间件实现这个功能。
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import HumanMessage
from langgraph.graph import MessagesState
# 加载模型配置
_ = load_dotenv()
# 低费率模型
basic_model = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-coder-plus",
)
# 高费率模型
advanced_model = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-max",
)由于我们的修改涉及模型推理,@before_model 和 @after_model 在这里已经不够用了。我们选用可以干涉模型调用的 @wrap_model_call 装饰器。具体逻辑由函数 dynamic_model_selection 实现:当历史对话超过 5 条时,自动切换到低费率模型。
@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
"""Choose model based on conversation complexity."""
message_count = len(request.state["messages"])
if message_count > 5:
# Use a basic model for longer conversations
model = basic_model
else:
model = advanced_model
request.model = model
print(f"message_count: {message_count}")
print(f"model_name: {model.model_name}")
return handler(request)
agent = create_agent(
model=advanced_model, # Default model
middleware=[dynamic_model_selection]
)从下面的例子可以看出,当历史对话数 message_count 超过 5 条时,确实从高费率模型 qwen3-max 切换到低费率模型 qwen3-coder-plus。我们成功实现了预算控制功能!
state: MessagesState = {"messages": []}
items = ['汽车', '飞机', '摩托车', '自行车']
for idx, i in enumerate(items):
print(f"\n=== Round {idx+1} ===")
state["messages"] += [HumanMessage(content=f"{i}有几个轮子,请简单回答")]
result = agent.invoke(state)
state["messages"] = result["messages"]
print(f'content: {result["messages"][-1].content}')
=== Round 1 ===
message_count: 1
model_name: qwen3-max
content: 普通汽车通常有4个轮子。
=== Round 2 ===
message_count: 3
model_name: qwen3-max
content: 飞机的轮子数量因机型而异,常见的客机一般有3个起落架(前1个、主起落架2个),总共6到10个轮子。例如,波音737有6个轮子,而大型客机如空客A380有22个轮子。
=== Round 3 ===
message_count: 5
model_name: qwen3-max
content: 摩托车通常有2个轮子。
=== Round 4 ===
message_count: 7
model_name: qwen3-coder-plus
content: 自行车有2个轮子。
二、消息截断¶
LLM 的上下文存在长度限制。一旦超过限制,就需要对上下文进行压缩。在众多处理方案中,消息截断是最简单的。下面我们通过 @before_model 装饰器实现消息截断功能。
from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any我们尝试一种截断策略:在保留最近消息的同时,额外保留第一条消息。在下面的例子中,由于我们在第一条消息中就告诉智能体「我是 bob」,因此它记得我是 bob.
@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Keep only the last few messages to fit context window."""
messages = state["messages"]
if len(messages) <= 3:
return None # No changes needed
first_msg = messages[0]
recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
new_messages = [first_msg] + recent_messages
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*new_messages
]
}
agent = create_agent(
basic_model,
middleware=[trim_messages],
checkpointer=InMemorySaver(),
)
config: RunnableConfig = {"configurable": {"thread_id": "1"}}
def agent_invoke(agent):
agent.invoke({"messages": "hi, my name is bob"}, config)
agent.invoke({"messages": "write a short poem about cats"}, config)
agent.invoke({"messages": "now do the same but for dogs"}, config)
final_response = agent.invoke({"messages": "what's my name?"}, config)
final_response["messages"][-1].pretty_print()
agent_invoke(agent)================================== Ai Message ==================================
Your name is Bob! You introduced yourself to me earlier.
当然,这个表现不足以说明截断中间件真的生效了。若这个中间件从未生效,也会有这样的结果。为了证明它真的生效了,我们再次修改截断策略。这次只保留最后两条对话记录。如果智能体不记得我是 bob,说明截断中间件确实起作用了。
@before_model
def trim_without_first_message(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Keep only the last few messages to fit context window."""
messages = state["messages"]
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*messages[-2:]
]
}
agent = create_agent(
basic_model,
middleware=[trim_without_first_message],
checkpointer=InMemorySaver(),
)
agent_invoke(agent)================================== Ai Message ==================================
I don't have access to your name or personal information. I'm an AI assistant designed to help with questions and tasks, but I don't know who you are specifically. If you'd like to share your name, I'd be happy to use it in our conversation!
现在智能体不记得我是谁,说明中间件确实生效了!
三、敏感词过滤¶
护栏(Guardrails)是智能体提供的一类内容安全能力的统称。大模型本身具备一定的内容风控能力,但很容易被突破。搜索「大模型破甲」就能找到此类教程。智能体可以在模型之外,提供额外的安全保护。这是通过工程上的强制检查实现的。
在 LangGraph 中,护栏可以通过中间件轻松实现。下面我们实现一个简单的护栏:若用户的最新消息中包含某些敏感词,智能体将拒绝回答。
from typing import Any
from langchain.agents.middleware import before_agent, AgentState
from langgraph.runtime import Runtime
banned_keywords = ["hack", "exploit", "malware"]
@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
# Check for banned keywords
for keyword in banned_keywords:
if keyword in content:
# Block execution before any processing
return {
"messages": [{
"role": "assistant",
"content": "I cannot process requests containing inappropriate content. Please rephrase your request."
}],
"jump_to": "end"
}
return None
agent = create_agent(
model=basic_model,
middleware=[content_filter],
)
# This request will be blocked before any processing
result = agent.invoke({
"messages": [{"role": "user", "content": "How do I hack into a database?"}]
})for message in result["messages"]:
message.pretty_print()================================ Human Message =================================
How do I hack into a database?
================================== Ai Message ==================================
I cannot process requests containing inappropriate content. Please rephrase your request.
四、PII 检测¶
接下来,我们继续编写护栏。PII(Personally Identifiable Information)检测可以发现用户输入中的邮箱、IP、地址、银行卡等隐私信息,并做出处置。
下面的例子来源于生活。我们经常把报错复制给大模型,让它帮忙 debug。但报错中可能包含个人隐私信息。针对这种情况,采用以下两种方法进行处置:
拒绝回答问题
屏蔽隐私信息
from textwrap import dedent
from pydantic import BaseModel, Field
# 可信任的模型,一般是本地模型,为了方便,这里依然使用qwen
trusted_model = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-coder-plus",
)
# 用于格式化智能体输出,若发现敏感信息返回True,没发现返回False
class PiiCheck(BaseModel):
"""Structured output indicating whether text contains PII."""
is_pii: bool = Field(description="Whether the text contains PII")
def message_with_pii(pii_middleware):
agent = create_agent(
model=basic_model,
middleware=[pii_middleware],
)
# This request will be blocked before any processing
result = agent.invoke({
"messages": [{
"role": "user",
"content": dedent(
"""
File "/home/luochang/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
""").strip()
}]
})
return result🍉 处置方式一:如遇隐私信息,拒绝回复。
@before_agent(can_jump_to=["end"])
def content_blocker(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
prompt = (
"你是一个隐私保护助手。请识别下面文本中涉及个人可识别信息(PII),"
"例如:姓名、身份证号、护照号、电话号码、邮箱、住址、银行卡号、社交账号、车牌等。"
"特别注意,若代码、文件路径中包含用户名,也应被视为敏感信息。"
"若包含敏感信息,请返回{\"is_pii\": True},否则返回{\"is_pii\": False}。"
"请严格以 json 格式返回,并且只输出 json。文本如下:\n\n" + content
)
pii_agent = trusted_model.with_structured_output(PiiCheck)
result = pii_agent.invoke(prompt)
if result.is_pii is True:
# Block execution before any processing
return {
"messages": [{
"role": "assistant",
"content": "I cannot process requests containing inappropriate content. Please rephrase your request."
}],
"jump_to": "end"
}
else:
print("No PII found")
return Noneresult = message_with_pii(pii_middleware=content_blocker)
for message in result["messages"]:
message.pretty_print()================================ Human Message =================================
File "/home/luochang/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================
I cannot process requests containing inappropriate content. Please rephrase your request.
🏀 处置方式二:如遇敏感信息,使用一串 ***** 号屏蔽隐私信息。
@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
prompt = (
"你是一个隐私保护助手。请识别下面文本中涉及个人可识别信息(PII),"
"例如:姓名、身份证号、护照号、电话号码、邮箱、住址、银行卡号、社交账号、车牌等。"
"特别注意,若代码、文件路径中包含用户名,也应被视为敏感信息。"
"若包含敏感信息,请返回{\"is_pii\": True},否则返回{\"is_pii\": False}。"
"请严格以 json 格式返回,并且只输出 json。文本如下:\n\n" + content
)
pii_agent = trusted_model.with_structured_output(PiiCheck)
result = pii_agent.invoke(prompt)
if result.is_pii is True:
mask_prompt = (
"你是一个隐私保护助手。请将下面文本中的所有个人可识别信息(PII)用星号(*)替换。"
"仅替换敏感片段,其他文本保持不变。"
"只输出处理后的文本,不要任何解释或额外内容。文本如下:\n\n" + last_message.content
)
masked_message = basic_model.invoke(mask_prompt)
return {
"messages": [{
"role": "assistant",
"content": masked_message.content
}]
}
else:
print("No PII found")
return Noneresult = message_with_pii(pii_middleware=content_filter)
for message in result["messages"]:
message.pretty_print()================================ Human Message =================================
File "/home/luochang/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================
File "/home/********/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================
根据你提供的错误信息,报错位置在:
**文件路径**: `/home/luochang/proj/agent.py`
**行号**: 第 53 行
**函数**: `my_agent` 函数中
**具体代码**: `agent = create_react_agent(` 这一行
---
### 可能的错误原因
`create_react_agent` 是 LangChain 库中的一个函数,用于创建 ReAct(Reasoning + Action)风格的 Agent。报错可能由以下几种情况引起:
1. **缺少必要的参数**:
- `create_react_agent` 需要传入 `llm`(大语言模型)、`tools`(工具列表)等参数。
- 示例:
```python
from langchain.agents import create_react_agent
from langchain.llms import OpenAI
from langchain.tools import Tool
llm = OpenAI()
tools = [Tool(...)]
agent = create_react_agent(llm=llm, tools=tools)
```
2. **参数类型不匹配**:
- 确保传入的 `llm` 和 `tools` 类型正确。
3. **未导入依赖模块**:
- 检查是否已正确安装并导入 `langchain` 相关模块。
4. **LangChain 版本问题**:
- 如果使用的是旧版本或新版本,API 可能有变化,请确认当前版本支持该方法。
---
### 解决建议
请提供完整的报错信息(例如 TypeError、ImportError 等),这样可以更准确地定位问题。同时检查以下内容:
- 是否正确初始化了 LLM 和 Tools?
- 是否遗漏了某些必需参数?
如果你能贴出第 53 行附近的完整代码段和报错堆栈,我可以帮你进一步分析具体问题。