中间件(middleware)是本次更新中最亮眼的特性,诸多新功能均藉由中间件实现,比如人机交互、动态系统提示词、动态注入上下文等等。中间件是一种钩子函数。通过向工作流中预埋中间件,能够实现工作流的高效拓展和可定制化。
LangChain 通过 装饰器 创建 自定义中间件。
装饰器类型(点击展开)
| DECORATOR | DESCRIPTION |
|---|---|
@before_agent | 在 Agent 执行前执行逻辑 |
@after_agent | 在 Agent 执行后执行逻辑 |
@before_model | 在每次模型调用前执行逻辑 |
@after_model | 在每次模型收到响应后执行逻辑 |
@wrap_model_call | 控制模型的调用过程 |
@wrap_tool_call | 控制工具的调用过程 |
@dynamic_prompt | 动态生成系统提示词 |
@hook_config | 配置钩子行为 |
装饰器类型 决定中间件的执行位置。比如使用 @before_model 装饰器,能够在模型调用前执行特定逻辑。被装饰函数 负责这段特定逻辑的具体实现。这么说可能有点抽象。没关系,本节提供了四个例子,看完你一定能够领悟到中间件的使用方法:
预算控制
消息截断
敏感词过滤
PII 检测
(个人隐私信息检测)
一、预算控制¶
随着对话轮次增加,对话记录也越来越长,从而导致请求费用上升。为了控制预算,可以设定在对话轮次超过某个阈值后,切换到低费率模型。下面我们用自定义中间件实现这个功能。
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import HumanMessage
from langgraph.graph import MessagesState
# 加载模型配置
_ = load_dotenv()
# 低费率模型
basic_model = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-coder-plus",
)
# 高费率模型
advanced_model = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-max",
)由于我们的修改涉及模型推理,@before_model 和 @after_model 在这里已经不够用了。我们选用可以干涉模型调用的 @wrap_model_call 装饰器。具体逻辑由函数 dynamic_model_selection 实现:当历史对话超过 5 条时,自动切换到低费率模型。
@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
"""Choose model based on conversation complexity."""
message_count = len(request.state["messages"])
if message_count > 5:
# Use a basic model for longer conversations
model = basic_model
else:
model = advanced_model
print(f"message_count: {message_count}")
print(f"model_name: {model.model_name}")
return handler(request.override(model=model))
agent = create_agent(
model=advanced_model, # Default model
middleware=[dynamic_model_selection]
)从下面的例子可以看出,当历史对话数 message_count 超过 5 条时,确实从高费率模型 qwen3-max 切换到低费率模型 qwen3-coder-plus。我们成功实现了预算控制功能!
state: MessagesState = {"messages": []}
items = ['汽车', '飞机', '摩托车', '自行车']
for idx, i in enumerate(items):
print(f"\n=== Round {idx+1} ===")
state["messages"] += [HumanMessage(content=f"{i}有几个轮子,请简单回答")]
result = agent.invoke(state)
state["messages"] = result["messages"]
print(f'content: {result["messages"][-1].content}')
=== Round 1 ===
message_count: 1
model_name: qwen3-max
content: 普通汽车通常有4个轮子。
=== Round 2 ===
message_count: 3
model_name: qwen3-max
content: 飞机轮子数量不固定,常见客机一般有6到10个轮子。
=== Round 3 ===
message_count: 5
model_name: qwen3-max
content: 摩托车通常有2个轮子。
=== Round 4 ===
message_count: 7
model_name: qwen3-coder-plus
content: 自行车通常有2个轮子。
二、消息截断¶
LLM 的上下文存在长度限制。一旦超过限制,就需要对上下文进行压缩。在众多处理方案中,消息截断是最简单的。下面我们通过 @before_model 装饰器实现消息截断功能。
from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any我们尝试一种截断策略:在保留最近消息的同时,额外保留第一条消息。在下面的例子中,由于我们在第一条消息中就告诉智能体「我是 bob」,因此它记得我是 bob.
@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Keep only the last few messages to fit context window."""
messages = state["messages"]
if len(messages) <= 3:
return None # No changes needed
first_msg = messages[0]
recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
new_messages = [first_msg] + recent_messages
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*new_messages
]
}
agent = create_agent(
basic_model,
middleware=[trim_messages],
checkpointer=InMemorySaver(),
)
config: RunnableConfig = {"configurable": {"thread_id": "1"}}
def agent_invoke(agent):
agent.invoke({"messages": "hi, my name is bob"}, config)
agent.invoke({"messages": "write a short poem about cats"}, config)
agent.invoke({"messages": "now do the same but for dogs"}, config)
final_response = agent.invoke({"messages": "what's my name?"}, config)
final_response["messages"][-1].pretty_print()
agent_invoke(agent)================================== Ai Message ==================================
Your name is Bob! You told me "hi, my name is bob" at the beginning of our conversation.
当然,这个表现不足以说明截断中间件真的生效了。若这个中间件从未生效,也会有这样的结果。为了证明它真的生效了,我们再次修改截断策略。这次只保留最后两条对话记录。如果智能体不记得我是 bob,说明截断中间件确实起作用了。
@before_model
def trim_without_first_message(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Keep only the last few messages to fit context window."""
messages = state["messages"]
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*messages[-2:]
]
}
agent = create_agent(
basic_model,
middleware=[trim_without_first_message],
checkpointer=InMemorySaver(),
)
agent_invoke(agent)================================== Ai Message ==================================
I don't have access to information about you personally, including your name. I only know what you choose to share with me in our conversations. If you'd like to tell me your name, I'd be happy to know it! Otherwise, I'll continue to refer to you as "you" or "my friend."
现在智能体不记得我是谁,说明中间件确实生效了!
三、敏感词过滤¶
护栏(Guardrails)是智能体提供的一类内容安全能力的统称。大模型本身具备一定的内容风控能力,但很容易被突破。搜索「大模型破甲」就能找到此类教程。智能体可以在模型之外,提供额外的安全保护。这是通过工程上的强制检查实现的。
在 LangGraph 中,护栏可以通过中间件轻松实现。下面我们实现一个简单的护栏:若用户的最新消息中包含某些敏感词,智能体将拒绝回答。
from typing import Any
from langchain.agents.middleware import before_agent, AgentState
from langgraph.runtime import Runtime
banned_keywords = ["hack", "exploit", "malware"]
@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
# Check for banned keywords
for keyword in banned_keywords:
if keyword in content:
# Block execution before any processing
return {
"messages": [{
"role": "assistant",
"content": "I cannot process requests containing inappropriate content. Please rephrase your request."
}],
"jump_to": "end"
}
return None
agent = create_agent(
model=basic_model,
middleware=[content_filter],
)
# This request will be blocked before any processing
result = agent.invoke({
"messages": [{"role": "user", "content": "How do I hack into a database?"}]
})for message in result["messages"]:
message.pretty_print()================================ Human Message =================================
How do I hack into a database?
================================== Ai Message ==================================
I cannot process requests containing inappropriate content. Please rephrase your request.
四、PII 检测¶
接下来,我们继续编写护栏。PII(Personally Identifiable Information)检测可以发现用户输入中的邮箱、IP、地址、银行卡等隐私信息,并做出处置。
下面的例子来源于生活。我们经常把报错复制给大模型,让它帮忙 debug。但报错中可能包含个人隐私信息。针对这种情况,采用以下两种方法进行处置:
拒绝回答问题
屏蔽隐私信息
from textwrap import dedent
from pydantic import BaseModel, Field
# 可信任的模型,一般是本地模型,为了方便,这里依然使用qwen
trusted_model = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-coder-plus",
)
# 用于格式化智能体输出,若发现敏感信息返回True,没发现返回False
class PiiCheck(BaseModel):
"""Structured output indicating whether text contains PII."""
is_pii: bool = Field(description="Whether the text contains PII")
def message_with_pii(pii_middleware):
agent = create_agent(
model=basic_model,
middleware=[pii_middleware],
)
# This request will be blocked before any processing
result = agent.invoke({
"messages": [{
"role": "user",
"content": dedent(
"""
File "/home/luochang/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
""").strip()
}]
})
return result🍉 处置方式一:如遇隐私信息,拒绝回复。
@before_agent(can_jump_to=["end"])
def content_blocker(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
prompt = (
"你是一个隐私保护助手。请识别下面文本中涉及个人可识别信息(PII),"
"例如:姓名、身份证号、护照号、电话号码、邮箱、住址、银行卡号、社交账号、车牌等。"
"特别注意,若代码、文件路径中包含用户名,也应被视为敏感信息。"
"若包含敏感信息,请返回{\"is_pii\": True},否则返回{\"is_pii\": False}。"
"请严格以 json 格式返回,并且只输出 json。文本如下:\n\n" + content
)
pii_agent = trusted_model.with_structured_output(PiiCheck)
result = pii_agent.invoke(prompt)
if result.is_pii is True:
# Block execution before any processing
return {
"messages": [{
"role": "assistant",
"content": "I cannot process requests containing inappropriate content. Please rephrase your request."
}],
"jump_to": "end"
}
else:
print("No PII found")
return Noneresult = message_with_pii(pii_middleware=content_blocker)
for message in result["messages"]:
message.pretty_print()================================ Human Message =================================
File "/home/luochang/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================
I cannot process requests containing inappropriate content. Please rephrase your request.
🏀 处置方式二:如遇敏感信息,使用一串 ***** 号屏蔽隐私信息。
@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
prompt = (
"你是一个隐私保护助手。请识别下面文本中涉及个人可识别信息(PII),"
"例如:姓名、身份证号、护照号、电话号码、邮箱、住址、银行卡号、社交账号、车牌等。"
"特别注意,若代码、文件路径中包含用户名,也应被视为敏感信息。"
"若包含敏感信息,请返回{\"is_pii\": True},否则返回{\"is_pii\": False}。"
"请严格以 json 格式返回,并且只输出 json。文本如下:\n\n" + content
)
pii_agent = trusted_model.with_structured_output(PiiCheck)
result = pii_agent.invoke(prompt)
if result.is_pii is True:
mask_prompt = (
"你是一个隐私保护助手。请将下面文本中的所有个人可识别信息(PII)用星号(*)替换。"
"仅替换敏感片段,其他文本保持不变。"
"只输出处理后的文本,不要任何解释或额外内容。文本如下:\n\n" + last_message.content
)
masked_message = basic_model.invoke(mask_prompt)
return {
"messages": [{
"role": "assistant",
"content": masked_message.content
}]
}
else:
print("No PII found")
return Noneresult = message_with_pii(pii_middleware=content_filter)
for message in result["messages"]:
message.pretty_print()================================ Human Message =================================
File "/home/luochang/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================
File "/home/*/*/proj/agent.py", line 53, in my_agent
agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================
根据你提供的错误信息,报错位置在:
**文件路径**:`/home/luochang/proj/agent.py`
**代码行数**:第53行
**函数名**:`my_agent`
**具体位置**:`create_react_agent(` 这个函数调用处
报错的具体位置是 `create_react_agent(` 函数调用的这一行。
不过你只提供了错误的位置信息,没有提供具体的错误内容。要完全解决这个问题,我需要知道:
1. **完整的错误信息**(包括错误类型和详细描述)
2. **第53行附近的完整代码**
常见的可能原因包括:
- 参数传递错误
- 导入模块问题
- 依赖库版本不兼容
- 变量未定义等
如果你能提供完整的错误信息和相关代码片段,我可以给出更准确的解决方案。