返回博客列表
技术文章AgentLangGraphReActPlan-ExecuteAILLM架构设计

Agent 的三种模式:Reactive、Deliberative 与 Hybrid

2026年06月🇨🇳 中文

本文所涉及的代码:https://github.com/LiuSandy/agent-playground/tree/tutorial-01

同一套 Agent 代码,跑在不同的任务上

上一篇文章我们用 LangGraph 搭了一个能计算、能搜索的 Agent。它的执行方式很简单:LLM 拿到用户输入 → 判断是否需要调用工具 → 需要就调、不需要就直接回答 → 调完工具拿到结果再判断 → 一直循环到能回答为止。代码量不多,跑起来也干净利落。

这套代码跑简单任务效果很好。问天气、算个数、搜个文档,一两轮就结束了。但如果换一个任务呢?

调研三个主流 Python Web 框架(Django、Flask、FastAPI),从性能、生态、学习成本三个维度对比,输出一份报告。

这时候问题来了。这个任务暗含多个子目标——先分别调研每个框架,再归纳出三个维度的对比,最后组织成报告格式——子目标之间有明确的结构依赖。而原来的 Agent 是走一步看一步的:它只看当前这一刻的状态,决定"下一步该做什么"。面对多子目标的复杂任务,这种策略很容易失控:

  • 搜了 Django → 觉得信息不够 → 又搜一次 Django → 搜到一半想起 FastAPI 还没搜 → 跳过去搜 FastAPI → 忘了 Django 查到哪了
  • 最后输出的可能只有第一个框架的零散信息,但 Token 已经烧了好几千

像一个下棋只看眼前一步的人——每一步也许都不算错,但十步之后可能偏离目标很远。

问题不在于 Agent 写得不好,而在于"走一步看一步"这个策略本身的视野太窄。 不同复杂度的任务,需要不同的决策模式来应对。Agent 的世界里不是只有一种做法,而是三种——这正是 Agent 架构设计中最核心的一组选择:

Reactive — 走一步看一步,快速反应,适合简单任务

Deliberative — 先整体规划再逐步执行,适合明确的多步复杂任务

Hybrid — 先判断任务复杂度,再选择 Reactive 或 Deliberative,适合入口任务复杂度不稳定的场景

三种模式分别对应不同的任务复杂度,这篇文章就逐一拆解它们的原理、LangGraph 实现和适用场景,帮你在面对具体任务时做出正确的选择。


Reactive 模式

什么是 Reactive Agent?

Reactive Agent 的定义很简单:感知当前状态 → 推理下一步行动 → 执行 → 观察结果 → 再次感知,如此循环。整个过程没有全局规划,每一步的决策只依赖当前可用的信息。

类比一下,打乒乓球就是典型的 Reactive 行为——对手怎么打我就怎么回,我并不会提前预测三个回合之后球会在哪。球来了 → 判断轨迹 → 挥拍 → 看对手反应 → 下一轮。每一步都很快,反馈周期极短。

在 LangGraph 中,Reactive 模式对应的图结构就是我们上一篇文章构建的:

START → call_model → 条件判断 → call_tool → call_model → ... → END

这是一个简单的"推理-工具"循环,没有规划节点,没有反思步骤。LLM 就是那个边走边看的人。

ReAct 循环:Reactive 模式的核心

ReAct 是 Reactive 模式最经典的实现,全称是 Reasoning + Acting。它把 Agent 的每一步决策拆成三个子步骤:

  1. Thought(思考):LLM 分析当前需要什么信息,决定是否调用工具
  2. Action(行动):调用选定的工具,传入参数
  3. Observation(观察):接收工具返回的结果,作为下一轮思考的输入

用一个具体例子来看。用户问"北京今天天气怎么样,适合跑步吗?"

Step 1:
  Thought: 用户想了解北京天气,我需要调用天气查询工具
  Action: get_weather("北京")

Step 2:
  Observation: 晴,25°C,PM2.5: 35
  Thought: 天气晴朗,温度适宜,空气质量良好,很适合户外跑步

Step 3:
  无需更多工具,最终回答:"北京今天晴,25°C,空气质量良好,非常适合跑步"

这个例子只用了两轮就结束了。关键点在于:Observation 是真实世界反馈——它不是 LLM 自己"脑补"的,而是工具真实返回的数据。每轮循环都在用外部数据纠正 LLM 可能产生的幻觉,这就是 ReAct 比纯生成更可靠的底层原因。

代码实现:把 ReAct 循环显式展示出来

在上一篇文章的代码基础上,我们把 ReAct 的三个步骤可视化出来。下面是完整的实现,文件结构沿用 agent-playground/ 项目:

# agent-playground/app/core/state.py
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

State 定义和上一篇文章完全一样,核心是 add_messages reducer 保证消息历史不会丢失。

# agent-playground/app/core/llm.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()


def create_llm(tools=None):
    llm = ChatOpenAI(
        model=os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
        base_url=os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1"),
        api_key=os.getenv("DEEPSEEK_API_KEY", ""),
        temperature=0,
    )
    if tools:
        llm = llm.bind_tools(tools)
    return llm

这里我们把 LLM 初始化抽成了一个工厂函数 create_llm,后面所有三种模式都复用它。

# agent-playground/app/core/graph.py
from langgraph.constants import END
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode
from .state import AgentState


def build_reactive_graph(llm, tools, max_steps=10):
    def call_model(state: AgentState) -> dict:
        response = llm.invoke(state["messages"])
        step = state.get("step_count", 0) + 1
        print(f"\n--- Step {step} ---")
        if hasattr(response, "tool_calls") and response.tool_calls:
            for tc in response.tool_calls:
                print(f"  Action: {tc['name']}({tc['args']})")
        else:
            print(f"  Response: {response.content[:100]}...")
        return {"messages": [response], "step_count": step}

    def should_continue(state: AgentState) -> str:
        last_message = state["messages"][-1]
        if state.get("step_count", 0) >= max_steps:
            return END
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "call_tool"
        return END

    def observe_result(state: AgentState) -> dict:
        last_message = state["messages"][-1]
        print(f"  Observation: {last_message.content[:100]}")
        return {}

    workflow = StateGraph(AgentState)
    workflow.add_node("call_model", call_model)
    workflow.add_node("call_tool", ToolNode(tools))
    workflow.add_node("observe", observe_result)

    workflow.add_edge(START, "call_model")
    workflow.add_conditional_edges("call_model", should_continue, {
        "call_tool": "call_tool",
        END: END,
    })
    workflow.add_edge("call_tool", "observe")
    workflow.add_edge("observe", "call_model")

    return workflow.compile()

这段代码在上一篇文章的基础上加了两个东西:

  1. 步骤计数step_count 跟踪执行了多少步,配合 max_steps 参数自动截断,防止死循环
  2. 观察节点observe 节点负责打印工具返回的结果,让 ReAct 的 Thought → Action → Observation 三步在终端里清晰可视

图结构变成了:

reactive.png

observe 是一个纯日志节点,不修改 State,只是为了在执行日志里把 Observation 单独标出来。

# agent-playground/app/core/agent.py
from langchain_core.messages import HumanMessage
from .llm import create_llm
from .graph import build_reactive_graph
from .tools import get_tools


def create_reactive_agent():
    tools = get_tools()
    llm = create_llm(tools)
    return build_reactive_graph(llm, tools)


def run_agent(agent, user_input: str):
    result = agent.invoke({"messages": [HumanMessage(content=user_input)]})
    return result["messages"][-1].content

create_reactive_agent 是 Reactive 模式的便利入口,把工具加载、LLM 初始化和图构建三步收拢到一起。

跑一下看看效果:

# agent-playground/main.py
from app.core.agent import create_reactive_agent, run_agent


agent = create_reactive_agent()
answer = run_agent(agent, "北京今天天气怎么样?")
print(f"\n最终回答: {answer}")

终端输出的日志会是这样的:

--- Step 1 ---
  Action: search_tool({'query': '北京今天天气'})
--- Step 2 ---
  Observation: 北京今天晴,气温22°C ~ 32°C,空气质量良
--- Step 3 ---
  Response: 北京今天晴,气温在22°C到32°C之间...

Thought → Action → Observation 三步清晰可见,这就是 ReAct 循环的完整面貌。

适用场景与局限性

适用场景:

  • 开放域问答、信息检索、天气查询、翻译等单步或少量步数就能完成的任务
  • 工具调用场景:API 调用、数据库查询、文件操作
  • 特点:反馈周期短、每一步都能看到效果,适合"边走边看"型任务

局限性:

  • 缺乏全局规划:走一步看一步,可能"见树不见林"。面对多子目标的复杂任务,没有整体策略就容易陷入局部搜索
  • 容易迷失方向:多步任务中可能出现重复劳动——反复调用同一个工具查相同的内容,或者在不同子目标之间无意义地跳来跳去
  • 不适合需要预先判断任务复杂度的场景:ReAct 对"这个任务大概需要几步能完成"没有任何概念,它只会一直循环直到 LLM 觉得可以回答了(或者被 max_steps 截断)

简单来说:Reactive 模式是 Agent 的默认选项。大多数日常任务用它就够,但当你发现 Agent 在面对复杂任务时反复绕圈、Token 烧了很多却答不到点子上——说明该考虑 Deliberative 了。


Deliberative 模式

什么是 Deliberative(Plan-and-Execute)?

Deliberative Agent 的核心思想:先整体规划,再逐步执行,执行中可反思和重规划

这个和 Reactive 最大的区别在于决策的"视野"。Reactive 只看下一步,Deliberative 先在脑子里过一遍全局。类比的话,下象棋就是典型的 Deliberative 行为——你不会只看眼前的这一步,而是想好几步之后的局面,走一步之后再根据对手的回应调整策略。

Deliberative 模式通常包含三个阶段:

  1. 规划阶段(Plan):LLM 分析用户需求,把大任务拆解成有序的步骤列表
  2. 执行阶段(Execute):按计划逐步执行每一步,每步可能有自己的工具调用
  3. 反思阶段(Replan):检查已执行步骤的结果,判断是否需要调整后续计划

流程示意:

User → Planner → [Step1, Step2, Step3, ...]
                    ↓
              Executor(逐步执行)
                    ↓
              Replanner → 结果 OK?→ 结束
                    ↓ 结果不行
              调整计划,重新执行

三个阶段的循环保证了 Agent 既有全局视野,又能在执行过程中根据实际情况灵活调整。

代码实现:规划、执行、反思三步图

Deliberative 模式的图结构比 Reactive 复杂不少——它有三个核心节点,加上条件路由在它们之间切换。

先定义新的 State。Deliberative Agent 除了消息历史,还需要存储执行计划

# agent-playground/app/core/state.py
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    step_count: int


class DeliberativeState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    plan: list[str]
    current_step: int
    step_results: dict[int, str]
    original_question: str
    tool_rounds: int

plan 是一个字符串列表,每个元素是一个可执行步骤。current_step 记录当前执行到哪一步。step_results 存储每步的执行结果,供反思阶段参考。original_question 保存用户的原始问题,避免执行某个子步骤时丢掉全局上下文;tool_rounds 用来限制单个步骤内部最多调用几轮工具,防止某一步陷入循环。

Planner 节点负责生成初始计划:

# agent-playground/app/core/graph.py(追加到文件末尾)

import json

from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
from langgraph.constants import END
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode
from .state import DeliberativeState

PLANNER_PROMPT = SystemMessage(content="""你是一个任务分解专家。请将用户的请求分解为可执行的步骤列表。

规则:
1. 每个步骤应该是一个独立、可执行的操作
2. 步骤之间应该有明确的先后顺序
3. 返回格式为 JSON 数组,每个元素是一个步骤描述字符串
4. 如果任务简单,可能只有 1-2 个步骤

示例:
用户:"调研 Django 和 Flask 的性能差异"
输出:["搜索 Django 性能数据", "搜索 Flask 性能数据", "对比两者的性能差异并输出结论"]
""")

EXECUTOR_PROMPT = SystemMessage(content="""你正在执行一个任务计划中的某一个步骤。
当前步骤是你唯一需要关注的任务,请使用可用的工具完成它。
完成后,简洁地输出执行结果。""")

REPLANNER_PROMPT = SystemMessage(content="""你是一个任务审核专家。请检查以下已完成步骤的结果,判断是否需要调整后续计划。

已完成的步骤及其结果:
{step_results}

剩余步骤:
{remaining_steps}

如果需要调整计划(比如某步骤的结果表明后续方向需要改变),请返回更新后的剩余步骤 JSON 数组。
如果不需要调整,请返回原来的剩余步骤。""")


def build_deliberative_graph(plan_llm, execute_llm, tools, max_tool_rounds_per_step=2):
    llm_with_tools = execute_llm.bind_tools(tools)

    def parse_plan(content: str) -> list[str]:
        if "```" in content:
            content = content.split("```")[1]
            if content.startswith("json"):
                content = content[4:]
        try:
            plan = json.loads(content)
            if isinstance(plan, list):
                return [str(step).strip() for step in plan if str(step).strip()]
        except json.JSONDecodeError:
            pass
        return [
            line.strip("- ").strip()
            for line in content.strip().split("\n")
            if line.strip()
        ]

    def current_tool_exchange(messages):
        for idx in range(len(messages) - 1, -1, -1):
            message = messages[idx]
            if hasattr(message, "tool_calls") and message.tool_calls:
                return messages[idx:]
        return []

    def current_tool_observations(messages) -> list[str]:
        return [
            message.content
            for message in current_tool_exchange(messages)
            if isinstance(message, ToolMessage)
        ]

    def planner(state: DeliberativeState) -> dict:
        user_msg = state["messages"][-1]
        response = plan_llm.invoke([PLANNER_PROMPT, user_msg])
        plan = parse_plan(response.content)
        if not plan:
            plan = [user_msg.content]
        print(f"\n[Plan] {' → '.join(plan)}")
        return {
            "plan": plan,
            "current_step": 0,
            "step_results": {},
            "original_question": user_msg.content,
            "tool_rounds": 0,
        }

    def executor(state: DeliberativeState) -> dict:
        step_idx = state["current_step"]
        step_desc = state["plan"][step_idx]
        print(f"\n[Execute] Step {step_idx + 1}/{len(state['plan'])}: {step_desc}")

        previous_results = "\n".join(
            f"{state['plan'][idx]}: {result}"
            for idx, result in sorted(state["step_results"].items())
        ) or "暂无"
        task_msg = HumanMessage(content=f"请完成以下任务步骤:{step_desc}")
        context_msg = HumanMessage(content=(
            f"用户原始问题:{state.get('original_question', '')}\n\n"
            f"已完成步骤结果:\n{previous_results}"
        ))
        messages = [EXECUTOR_PROMPT, context_msg, task_msg]
        if isinstance(state["messages"][-1], ToolMessage):
            messages.extend(current_tool_exchange(state["messages"]))

        response = llm_with_tools.invoke(messages)
        if hasattr(response, "tool_calls") and response.tool_calls:
            for tool_call in response.tool_calls:
                print(f"  Action: {tool_call['name']}({tool_call['args']})")
        else:
            print(f"  Result: {response.content[:100]}")
        return {"messages": [response]}

    def observe_tool_result(state: DeliberativeState) -> dict:
        tool_messages = [
            message for message in state["messages"]
            if isinstance(message, ToolMessage)
        ]
        if tool_messages:
            observations = [message.content for message in tool_messages[-2:]]
            print(f"  Observation: {' | '.join(observations)[:160]}")
        return {"tool_rounds": state.get("tool_rounds", 0) + 1}

    def record_step_result(state: DeliberativeState) -> dict:
        step_idx = state["current_step"]
        response = state["messages"][-1]
        new_results = dict(state["step_results"])
        if isinstance(response, ToolMessage):
            observations = current_tool_observations(state["messages"])
            content = "\n".join(observations) or response.content
            print(f"  Result: {content[:100]}")
        else:
            content = response.content
        new_results[step_idx] = content
        return {
            "step_results": new_results,
            "current_step": step_idx + 1,
            "tool_rounds": 0,
        }

    def replanner(state: DeliberativeState) -> dict:
        if state["current_step"] >= len(state["plan"]):
            return {}  # 全部完成,不需要重规划

        remaining = state["plan"][state["current_step"]:]
        results_str = "\n".join(
            f"Step {k+1}: {v}" for k, v in state["step_results"].items()
        )
        prompt = HumanMessage(content=REPLANNER_PROMPT.content.format(
            step_results=results_str,
            remaining_steps="\n".join(remaining),
        ))
        response = plan_llm.invoke([prompt])
        new_plan = parse_plan(response.content)
        if new_plan and new_plan != remaining:
            print(f"\n[Replan] 调整计划: {' → '.join(new_plan)}")
            updated_plan = (
                state["plan"][:state["current_step"]] + new_plan
            )
            return {"plan": updated_plan}
        return {}

    def should_continue_after_planning(state: DeliberativeState) -> str:
        if state["current_step"] >= len(state["plan"]):
            return "summarize"
        return "executor"

    def should_continue_after_executor(state: DeliberativeState) -> str:
        last_message = state["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "call_tool"
        return "record_step_result"

    def should_continue_after_observation(state: DeliberativeState) -> str:
        if state.get("tool_rounds", 0) >= max_tool_rounds_per_step:
            print("  Tool limit reached for this step; using current observations.")
            return "record_step_result"
        return "executor"

    def should_continue_after_step(state: DeliberativeState) -> str:
        if state["current_step"] >= len(state["plan"]):
            return "summarize"
        if state["current_step"] > 0 and state["current_step"] % 3 == 0:
            return "replanner"
        return "executor"

    def summarize(state: DeliberativeState) -> dict:
        results_str = "\n".join(
            f"{state['plan'][k]}: {v}"
            for k, v in sorted(state["step_results"].items())
        )
        summary_prompt = HumanMessage(content=(
            "请根据以下执行结果,生成最终回答:\n\n" + results_str
        ))
        response = plan_llm.invoke([summary_prompt])
        print(f"\n[Final] {response.content[:100]}")
        return {"messages": [response]}

    workflow = StateGraph(DeliberativeState)
    workflow.add_node("planner", planner)
    workflow.add_node("executor", executor)
    workflow.add_node("call_tool", ToolNode(tools))
    workflow.add_node("observe_tool_result", observe_tool_result)
    workflow.add_node("record_step_result", record_step_result)
    workflow.add_node("replanner", replanner)
    workflow.add_node("summarize", summarize)

    workflow.add_edge(START, "planner")
    workflow.add_conditional_edges("planner", should_continue_after_planning, {
        "executor": "executor",
        "summarize": "summarize",
    })
    workflow.add_conditional_edges("executor", should_continue_after_executor, {
        "call_tool": "call_tool",
        "record_step_result": "record_step_result",
    })
    workflow.add_edge("call_tool", "observe_tool_result")
    workflow.add_conditional_edges("observe_tool_result", should_continue_after_observation, {
        "executor": "executor",
        "record_step_result": "record_step_result",
    })
    workflow.add_conditional_edges("record_step_result", should_continue_after_step, {
        "executor": "executor",
        "replanner": "replanner",
        "summarize": "summarize",
    })
    workflow.add_edge("replanner", "executor")
    workflow.add_edge("summarize", END)

    return workflow.compile()

这段代码虽然长,但结构很清晰:

  1. Planner:用一个专门的 Prompt 让 LLM 拆解任务,输出 JSON 格式的步骤列表
  2. Executor:每次拿一个步骤描述,让绑定了工具的 LLM 判断是否需要调用工具
  3. ToolNode + Observation:如果 Executor 返回了 tool_callsToolNode 负责真正执行工具,observe_tool_result 负责打印观察结果
  4. Record Step Result:当当前步骤不再需要工具,或工具轮数达到上限时,把这一小步的结果写入 step_results
  5. Replanner:每执行完 3 个计划步骤触发一次,把已完成步骤的结果给 LLM 审查,如果发现需要调整方向就更新后续计划
  6. Summarize:全部步骤执行完后,把所有结果汇总成最终答案

这里有一个容易混淆的点:execute_llm.bind_tools(tools) 只是让模型知道"有哪些工具可以调用",并允许它返回 tool_calls;真正执行工具的是 ToolNode(tools)。所以 Deliberative 模式不是简单地"规划后每步调一次 LLM",而是外层按计划推进,内层每个步骤仍然允许一小段工具调用循环。

图结构可以理解成:

deliberative.png

和 Reactive 模式相比,关键区别在于多了两条"管理线":Planner 在开始时生成策略,Replanner 在中途调整策略。Executor 内部仍然保留工具调用能力,但它服务于"当前计划步骤",而不是无目标地一路往前走。

用起来也很简单,两个 LLM 分别负责规划和执行:

# agent-playground/app/core/agent.py(追加)
from .graph import build_deliberative_graph


def create_deliberative_agent():
    tools = get_tools()
    plan_llm = create_llm()     # 负责规划,不需要工具
    execute_llm = create_llm()  # 负责执行,工具在 graph 内部绑定
    return build_deliberative_graph(plan_llm, execute_llm, tools)

plan_llm 不需要绑工具——Planner 的工作是拆解任务,不是执行工具。execute_llm 需要工具能力,但绑定动作放在 build_deliberative_graph 内部完成:llm_with_tools = execute_llm.bind_tools(tools)。这样 graph 本身同时持有"会生成 tool_calls 的 LLM"和"真正执行工具的 ToolNode",职责更完整。

调用入口:

# agent-playground/main.py
from app.core.agent import create_deliberative_agent, run_agent


agent = create_deliberative_agent()
answer = run_agent(agent, "调研 Django 和 Flask 的性能差异")
print(f"\n最终回答: {answer}")

运行时会先看到 Planner 生成的步骤列表,然后 Executor 逐步执行。每个步骤内部,如果 LLM 判断需要外部信息,就会进入 call_tool → observe_tool_result → executor 的小循环;如果不再需要工具,就进入 record_step_result,推进到下一个计划步骤。每 3 个计划步骤触发一次 Replanner 检查方向,最后 Summarize 汇总输出。

适用场景与缺点

适用场景:

  • 复杂多步任务:代码生成 → 运行测试 → 根据报错修复,这种有明确步骤依赖的任务
  • 需要全局视野的任务:研究报告撰写、技术方案设计、竞品分析
  • 任务依赖关系复杂:步骤之间有先后顺序,前一步的结果直接影响后一步怎么做

缺点:

  • 规划本身有成本:Planner 节点会额外消耗 Token 和时间。对于简单任务(比如查个天气),先规划三步再执行是纯浪费
  • 对简单任务"小题大做":用户的输入是"1+1 等于几",结果 Planner 拆成了"搜索加法定义 → 调用计算器验证 → 输出结果",三行代码能搞定的事花了 20 秒和 500 Token
  • 计划可能本身就错了:如果 Planner 一开始就理解偏了,接下来的执行就是在错误的方向上努力,Replanner 也不一定能纠正回来

一句话总结:Deliberative 模式适合你明确知道这个任务很复杂的场景。如果任务的复杂度未知,还需要更灵活的模式——这就是 Hybrid 要解决的问题。


Hybrid 模式

什么是 Hybrid?

Hybrid 模式把 Reactive 的速度和 Deliberative 的全局规划能力放到同一张图里。它的核心思路不是"每个子任务都嵌套一个 Reactive Agent",而是:先判断任务复杂度,再选择合适的执行路径

工作流程:

  1. Router 先判断任务复杂度:简单问题走 Reactive,复杂任务走 Deliberative
  2. 简单任务直接响应:比如查天气、算表达式、问一个事实,不需要先规划
  3. 复杂任务再进入规划流程:比如调研、对比、分析、写方案,交给 Planner 拆步骤,再按 Deliberative 流程执行

类比一下,这像一个客服系统的分诊台。用户只是问"订单什么时候发货?",直接查一下就回答;如果用户说"帮我分析这批订单为什么延迟,并给一个优化方案",才需要进入规划、执行、复盘的复杂流程。Hybrid 的价值就在这里:简单任务不小题大做,复杂任务不硬靠 Reactive 瞎跑。

代码实现要点

Hybrid 模式的核心是一个路由结构:入口先经过 router,然后根据 mode 进入 Reactive 分支或 Deliberative 分支。下面的代码沿用前面 Deliberative 部分抽出来的节点构造函数:_create_deliberative_nodes_add_deliberative_nodes_add_deliberative_edges

# agent-playground/app/core/graph.py(追加,Hybrid 模式)

from langgraph.constants import END
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode
from .state import HybridState


def build_hybrid_graph(
    plan_llm,
    execute_llm,
    tools,
    max_steps=10,
    max_tool_rounds_per_step=2,
):
    reactive_llm_with_tools = execute_llm.bind_tools(tools)
    deliberative_nodes = _create_deliberative_nodes(
        plan_llm,
        execute_llm.bind_tools(tools),
        max_tool_rounds_per_step,
    )

    def router(state: HybridState) -> dict:
        user_msg = state["messages"][-1]
        content = user_msg.content.lower()
        complex_markers = [
            "调研", "研究", "分析", "比较", "对比", "规划", "计划",
            "总结", "多步", "步骤", "方案", "benchmark", "performance",
            "compare", "research", "analyze",
        ]
        is_complex = len(user_msg.content) > 40 or any(
            marker in content for marker in complex_markers
        )
        mode = "deliberative" if is_complex else "reactive"
        print(f"\n[Hybrid] Route: {mode}")
        return {"mode": mode, "original_question": user_msg.content}

    def choose_mode(state: HybridState) -> str:
        return state.get("mode", "reactive")

    def reactive_call_model(state: HybridState) -> dict:
        response = reactive_llm_with_tools.invoke(
            [SYSTEM_PROMPT] + state["messages"]
        )
        step = state.get("step_count", 0) + 1
        print(f"\n[Reactive] Step {step}")
        if hasattr(response, "tool_calls") and response.tool_calls:
            for tool_call in response.tool_calls:
                print(f"  Action: {tool_call['name']}({tool_call['args']})")
        else:
            print(f"  Response: {response.content[:100]}...")
        return {"messages": [response], "step_count": step}

    def reactive_observe(state: HybridState) -> dict:
        last_message = state["messages"][-1]
        print(f"  Observation: {last_message.content[:100]}")
        return {}

    def should_continue_reactive(state: HybridState) -> str:
        last_message = state["messages"][-1]
        if state.get("step_count", 0) >= max_steps:
            return END
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "reactive_call_tool"
        return END

    workflow = StateGraph(HybridState)
    workflow.add_node("router", router)
    workflow.add_node("reactive_call_model", reactive_call_model)
    workflow.add_node("reactive_call_tool", ToolNode(tools))
    workflow.add_node("reactive_observe", reactive_observe)
    _add_deliberative_nodes(workflow, deliberative_nodes, tools)

    workflow.add_edge(START, "router")
    workflow.add_conditional_edges("router", choose_mode, {
        "reactive": "reactive_call_model",
        "deliberative": "planner",
    })
    workflow.add_conditional_edges("reactive_call_model", should_continue_reactive, {
        "reactive_call_tool": "reactive_call_tool",
        END: END,
    })
    workflow.add_edge("reactive_call_tool", "reactive_observe")
    workflow.add_edge("reactive_observe", "reactive_call_model")
    _add_deliberative_edges(workflow, deliberative_nodes)

    return workflow.compile()

这段实现里,Hybrid 不是一个新的执行算法,而是把前面两种模式放进同一张图,再加一个入口路由。

  • Reactive 分支reactive_call_model → reactive_call_tool → reactive_observe → reactive_call_model,适合简单、短链路任务
  • Deliberative 分支:复用前面 Deliberative 的 planner / executor / call_tool / observe_tool_result / record_step_result / replanner / summarize
  • Router:根据输入长度和关键词做一个轻量判断,比如"调研、分析、对比、方案、research、compare"这类任务会被路由到 Deliberative

流程图如下

hybrid.png

适用场景

  • 任务复杂度不稳定:同一个入口既会收到"查一下天气",也会收到"调研三个框架并输出报告"
  • 想控制简单任务成本:简单任务不进入 Planner,直接走 Reactive,少一次规划开销
  • 想保留复杂任务质量:复杂任务自动切到 Deliberative,先拆计划再执行,不靠 Reactive 硬撑

对比总结与决策框架

三种模式不是"哪个更好"的关系,而是"哪个更适合当前任务"。

维度ReactiveDeliberativeHybrid
规划阶段有(详细步骤)简单任务无,复杂任务有
执行方式单步决策按计划逐步执行Router 选择 Reactive 或 Deliberative
反馈回路每一步每执行一轮取决于被路由到哪条分支
响应速度
Token 消耗
简单任务表现浪费好(走 Reactive)
复杂任务表现
实现复杂度

决策框架——拿到一个任务时,按这个顺序判断:

  1. 任务是简单的信息查询或单步操作? → 用 Reactive。不要多想,直接上 ReAct 循环
  2. 任务明显是多步骤的、步骤之间有明确的先后依赖? → 用 Deliberative。先让 Planner 拆出计划,再按计划执行
  3. 任务复杂度不确定,入口既可能收到简单问题也可能收到复杂任务? → 用 Hybrid。先路由,简单任务走 Reactive,复杂任务走 Deliberative
  4. 如果你不确定该选哪个——从 Reactive 开始,发现 Agent 在绕圈、迷失方向时再升级到 Hybrid

最后一句话总结:Reactive 是默认选项,当 Reactive 搞不定时再考虑更复杂的模式。不要为了"架构看起来高级"而选 Deliberative 或 Hybrid——你的 Token 和时间都是成本。

和"四种 Agent 架构"那篇是什么关系?

如果你读过前面的《四种主流的 Agent 架构:你写的是哪一种》,可能会发现一个问题:那篇文章讲的是 ReAct、Plan-Execute、Reflexion 和 Multi-Agent 四种架构,而这篇文章讲的是 Reactive、Deliberative 和 Hybrid 三种模式。它们不是两套互相冲突的分类,而是从不同维度看同一件事。

那篇文章更像是 Agent 架构全景图,关注的是 Agent 的循环机制怎么被扩展:ReAct 是最基础的"思考-行动-观察"循环;Plan-Execute 在循环前加了规划;Reflexion 在一次完整尝试后加了复盘和记忆;Multi-Agent 则把任务拆给多个角色协作。

这篇文章关注的是 单个 Agent 面对任务时的决策方式:Reactive 对应 ReAct,强调边走边看;Deliberative 对应 Plan-Execute,强调先规划再执行;Hybrid 则是工程实践里常见的折中——入口先判断任务复杂度,简单任务走 Reactive,复杂任务走 Deliberative。

所以可以这样理解:ReAct 和 Plan-Execute 基本能分别映射到 Reactive 和 Deliberative;Hybrid 是把两者组合起来的一种任务执行策略;而 Reflexion 和 Multi-Agent 更像可叠加的增强机制。你可以让一个 Reactive Agent 带 Reflexion,也可以让一个 Multi-Agent 系统里的每个角色分别采用 Reactive、Deliberative 或 Hybrid。