Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

中间件

中间件(middleware)是本次更新中最亮眼的特性,诸多新功能均藉由中间件实现,比如人机交互、动态系统提示词、动态注入上下文等等。中间件是一种钩子函数。通过向工作流中预埋中间件,能够实现工作流的高效拓展和可定制化。

LangChain 通过 装饰器 创建 自定义中间件

装饰器类型(点击展开)
DECORATORDESCRIPTION
@before_agent在 Agent 执行前执行逻辑
@after_agent在 Agent 执行后执行逻辑
@before_model在每次模型调用前执行逻辑
@after_model在每次模型收到响应后执行逻辑
@wrap_model_call控制模型的调用过程
@wrap_tool_call控制工具的调用过程
@dynamic_prompt动态生成系统提示词
@hook_config配置钩子行为

装饰器类型 决定中间件的执行位置。比如使用 @before_model 装饰器,能够在模型调用前执行特定逻辑。被装饰函数 负责这段特定逻辑的具体实现。这么说可能有点抽象。没关系,本节提供了四个例子,看完你一定能够领悟到中间件的使用方法:

  • 预算控制

  • 消息截断

  • 敏感词过滤

  • PII 检测(个人隐私信息检测)

一、预算控制

随着对话轮次增加,对话记录也越来越长,从而导致请求费用上升。为了控制预算,可以设定在对话轮次超过某个阈值后,切换到低费率模型。下面我们用自定义中间件实现这个功能。

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import HumanMessage
from langgraph.graph import MessagesState

# 加载模型配置
_ = load_dotenv()

# 低费率模型
basic_model = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    model="qwen3-coder-plus",
)

# 高费率模型
advanced_model = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    model="qwen3-max",
)

由于我们的修改涉及模型推理,@before_model@after_model 在这里已经不够用了。我们选用可以干涉模型调用的 @wrap_model_call 装饰器。具体逻辑由函数 dynamic_model_selection 实现:当历史对话超过 5 条时,自动切换到低费率模型。

@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
    """Choose model based on conversation complexity."""
    message_count = len(request.state["messages"])

    if message_count > 5:
        # Use a basic model for longer conversations
        model = basic_model
    else:
        model = advanced_model

    request.model = model
    print(f"message_count: {message_count}")
    print(f"model_name: {model.model_name}")

    return handler(request)

agent = create_agent(
    model=advanced_model,  # Default model
    middleware=[dynamic_model_selection]
)

从下面的例子可以看出,当历史对话数 message_count 超过 5 条时,确实从高费率模型 qwen3-max 切换到低费率模型 qwen3-coder-plus。我们成功实现了预算控制功能!

state: MessagesState = {"messages": []}
items = ['汽车', '飞机', '摩托车', '自行车']
for idx, i in enumerate(items):
    print(f"\n=== Round {idx+1} ===")
    state["messages"] += [HumanMessage(content=f"{i}有几个轮子,请简单回答")]
    result = agent.invoke(state)
    state["messages"] = result["messages"]
    print(f'content: {result["messages"][-1].content}')

=== Round 1 ===
message_count: 1
model_name: qwen3-max
content: 普通汽车通常有4个轮子。

=== Round 2 ===
message_count: 3
model_name: qwen3-max
content: 飞机的轮子数量因机型而异,常见的客机一般有3个起落架(前1个、主起落架2个),总共6到10个轮子。例如,波音737有6个轮子,而大型客机如空客A380有22个轮子。

=== Round 3 ===
message_count: 5
model_name: qwen3-max
content: 摩托车通常有2个轮子。

=== Round 4 ===
message_count: 7
model_name: qwen3-coder-plus
content: 自行车有2个轮子。

二、消息截断

LLM 的上下文存在长度限制。一旦超过限制,就需要对上下文进行压缩。在众多处理方案中,消息截断是最简单的。下面我们通过 @before_model 装饰器实现消息截断功能。

from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any

我们尝试一种截断策略:在保留最近消息的同时,额外保留第一条消息。在下面的例子中,由于我们在第一条消息中就告诉智能体「我是 bob」,因此它记得我是 bob.

@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Keep only the last few messages to fit context window."""
    messages = state["messages"]

    if len(messages) <= 3:
        return None  # No changes needed

    first_msg = messages[0]
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages

    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *new_messages
        ]
    }

agent = create_agent(
    basic_model,
    middleware=[trim_messages],
    checkpointer=InMemorySaver(),
)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}

def agent_invoke(agent):
    agent.invoke({"messages": "hi, my name is bob"}, config)
    agent.invoke({"messages": "write a short poem about cats"}, config)
    agent.invoke({"messages": "now do the same but for dogs"}, config)
    final_response = agent.invoke({"messages": "what's my name?"}, config)
    
    final_response["messages"][-1].pretty_print()

agent_invoke(agent)
================================== Ai Message ==================================

Your name is Bob! You introduced yourself to me earlier.

当然,这个表现不足以说明截断中间件真的生效了。若这个中间件从未生效,也会有这样的结果。为了证明它真的生效了,我们再次修改截断策略。这次只保留最后两条对话记录。如果智能体不记得我是 bob,说明截断中间件确实起作用了。

@before_model
def trim_without_first_message(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Keep only the last few messages to fit context window."""
    messages = state["messages"]

    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *messages[-2:]
        ]
    }

agent = create_agent(
    basic_model,
    middleware=[trim_without_first_message],
    checkpointer=InMemorySaver(),
)

agent_invoke(agent)
================================== Ai Message ==================================

I don't have access to your name or personal information. I'm an AI assistant designed to help with questions and tasks, but I don't know who you are specifically. If you'd like to share your name, I'd be happy to use it in our conversation!

现在智能体不记得我是谁,说明中间件确实生效了!

三、敏感词过滤

护栏(Guardrails)是智能体提供的一类内容安全能力的统称。大模型本身具备一定的内容风控能力,但很容易被突破。搜索「大模型破甲」就能找到此类教程。智能体可以在模型之外,提供额外的安全保护。这是通过工程上的强制检查实现的。

在 LangGraph 中,护栏可以通过中间件轻松实现。下面我们实现一个简单的护栏:若用户的最新消息中包含某些敏感词,智能体将拒绝回答。

from typing import Any

from langchain.agents.middleware import before_agent, AgentState
from langgraph.runtime import Runtime

banned_keywords = ["hack", "exploit", "malware"]

@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Deterministic guardrail: Block requests containing banned keywords."""
    # Get the first user message
    if not state["messages"]:
        return None

    last_message = state["messages"][-1]
    if last_message.type != "human":
        return None

    content = last_message.content.lower()

    # Check for banned keywords
    for keyword in banned_keywords:
        if keyword in content:
            # Block execution before any processing
            return {
                "messages": [{
                    "role": "assistant",
                    "content": "I cannot process requests containing inappropriate content. Please rephrase your request."
                }],
                "jump_to": "end"
            }

    return None

agent = create_agent(
    model=basic_model,
    middleware=[content_filter],
)

# This request will be blocked before any processing
result = agent.invoke({
    "messages": [{"role": "user", "content": "How do I hack into a database?"}]
})
for message in result["messages"]:
    message.pretty_print()
================================ Human Message =================================

How do I hack into a database?
================================== Ai Message ==================================

I cannot process requests containing inappropriate content. Please rephrase your request.

四、PII 检测

接下来,我们继续编写护栏。PII(Personally Identifiable Information)检测可以发现用户输入中的邮箱、IP、地址、银行卡等隐私信息,并做出处置。

下面的例子来源于生活。我们经常把报错复制给大模型,让它帮忙 debug。但报错中可能包含个人隐私信息。针对这种情况,采用以下两种方法进行处置:

  1. 拒绝回答问题

  2. 屏蔽隐私信息

from textwrap import dedent
from pydantic import BaseModel, Field

# 可信任的模型,一般是本地模型,为了方便,这里依然使用qwen
trusted_model = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    model="qwen3-coder-plus",
)

# 用于格式化智能体输出,若发现敏感信息返回True,没发现返回False
class PiiCheck(BaseModel):
    """Structured output indicating whether text contains PII."""
    is_pii: bool = Field(description="Whether the text contains PII")

def message_with_pii(pii_middleware):
    agent = create_agent(
        model=basic_model,
        middleware=[pii_middleware],
    )

    # This request will be blocked before any processing
    result = agent.invoke({
        "messages": [{
            "role": "user",
            "content": dedent(
                """
                File "/home/luochang/proj/agent.py", line 53, in my_agent
                    agent = create_react_agent(
                ---
                报错位置在哪
                """).strip()
        }]
    })

    return result

🍉 处置方式一:如遇隐私信息,拒绝回复。

@before_agent(can_jump_to=["end"])
def content_blocker(state: AgentState,  runtime: Runtime) -> dict[str, Any] | None:
    """Deterministic guardrail: Block requests containing banned keywords."""
    # Get the first user message
    if not state["messages"]:
        return None

    last_message = state["messages"][-1]
    if last_message.type != "human":
        return None

    content = last_message.content.lower()
    prompt = (
        "你是一个隐私保护助手。请识别下面文本中涉及个人可识别信息(PII),"
        "例如:姓名、身份证号、护照号、电话号码、邮箱、住址、银行卡号、社交账号、车牌等。"
        "特别注意,若代码、文件路径中包含用户名,也应被视为敏感信息。"
        "若包含敏感信息,请返回{\"is_pii\": True},否则返回{\"is_pii\": False}。"
        "请严格以 json 格式返回,并且只输出 json。文本如下:\n\n" + content
    )

    pii_agent = trusted_model.with_structured_output(PiiCheck)
    result = pii_agent.invoke(prompt)

    if result.is_pii is True:
        # Block execution before any processing
        return {
            "messages": [{
                "role": "assistant",
                "content": "I cannot process requests containing inappropriate content. Please rephrase your request."
            }],
            "jump_to": "end"
        }
    else:
        print("No PII found")

    return None
result = message_with_pii(pii_middleware=content_blocker)

for message in result["messages"]:
    message.pretty_print()
================================ Human Message =================================

File "/home/luochang/proj/agent.py", line 53, in my_agent
    agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================

I cannot process requests containing inappropriate content. Please rephrase your request.

🏀 处置方式二:如遇敏感信息,使用一串 ***** 号屏蔽隐私信息。

@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState,  runtime: Runtime) -> dict[str, Any] | None:
    """Deterministic guardrail: Block requests containing banned keywords."""
    # Get the first user message
    if not state["messages"]:
        return None

    last_message = state["messages"][-1]
    if last_message.type != "human":
        return None

    content = last_message.content.lower()
    prompt = (
        "你是一个隐私保护助手。请识别下面文本中涉及个人可识别信息(PII),"
        "例如:姓名、身份证号、护照号、电话号码、邮箱、住址、银行卡号、社交账号、车牌等。"
        "特别注意,若代码、文件路径中包含用户名,也应被视为敏感信息。"
        "若包含敏感信息,请返回{\"is_pii\": True},否则返回{\"is_pii\": False}。"
        "请严格以 json 格式返回,并且只输出 json。文本如下:\n\n" + content
    )

    pii_agent = trusted_model.with_structured_output(PiiCheck)
    result = pii_agent.invoke(prompt)

    if result.is_pii is True:
        mask_prompt = (
            "你是一个隐私保护助手。请将下面文本中的所有个人可识别信息(PII)用星号(*)替换。"
            "仅替换敏感片段,其他文本保持不变。"
            "只输出处理后的文本,不要任何解释或额外内容。文本如下:\n\n" + last_message.content
        )
        masked_message = basic_model.invoke(mask_prompt)
        return {
            "messages": [{
                "role": "assistant",
                "content": masked_message.content
            }]
        }
    else:
        print("No PII found")

    return None
result = message_with_pii(pii_middleware=content_filter)

for message in result["messages"]:
    message.pretty_print()
================================ Human Message =================================

File "/home/luochang/proj/agent.py", line 53, in my_agent
    agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================

File "/home/********/proj/agent.py", line 53, in my_agent
    agent = create_react_agent(
---
报错位置在哪
================================== Ai Message ==================================

根据你提供的错误信息,报错位置在:

**文件路径**: `/home/luochang/proj/agent.py`  
**行号**: 第 53 行  
**函数**: `my_agent` 函数中  
**具体代码**: `agent = create_react_agent(` 这一行

---

### 可能的错误原因
`create_react_agent` 是 LangChain 库中的一个函数,用于创建 ReAct(Reasoning + Action)风格的 Agent。报错可能由以下几种情况引起:

1. **缺少必要的参数**:
   - `create_react_agent` 需要传入 `llm`(大语言模型)、`tools`(工具列表)等参数。
   - 示例:
     ```python
     from langchain.agents import create_react_agent
     from langchain.llms import OpenAI
     from langchain.tools import Tool

     llm = OpenAI()
     tools = [Tool(...)]
     agent = create_react_agent(llm=llm, tools=tools)
     ```

2. **参数类型不匹配**:
   - 确保传入的 `llm` 和 `tools` 类型正确。

3. **未导入依赖模块**:
   - 检查是否已正确安装并导入 `langchain` 相关模块。

4. **LangChain 版本问题**:
   - 如果使用的是旧版本或新版本,API 可能有变化,请确认当前版本支持该方法。

---

### 解决建议
请提供完整的报错信息(例如 TypeError、ImportError 等),这样可以更准确地定位问题。同时检查以下内容:

- 是否正确初始化了 LLM 和 Tools?
- 是否遗漏了某些必需参数?

如果你能贴出第 53 行附近的完整代码段和报错堆栈,我可以帮你进一步分析具体问题。