AI Agent编排范式

AI Agent编排范式 范式主要整理自Eino框架,文中有些概念仅属于Eino框架 引言 AI应用的核心任务极为简单: 接收指令 调用大模型 安全地执行工具(运行命令、操作文件、控制浏览器) 之后将结果反馈给你 三个关键原则,是解决一切花哨问题的基础: 执行是能控制的 状态是能追溯的 失败是能复盘的 AI应用的核心任务,即ReAct模式,通过让 ChatModel 进行显式的、一步一步的"思考"来解决复杂问题,在Eino ADK中将其直接抽象为ChatModelAgent了。在不同的领域、不同的场景下有各种各样的模型,也有各种各样的Agent来处理专有领域的任务,如Coding模型、视觉模型等。一个复杂的任务通常需要调用各种能力的模型和各种工具来完成任务,这就需要把这些拥有专有能力的Agent编排起来组成一个功能完善、强大的应用。 AI Agent编排范式分类 AI Agent编排范式大概分为以下几类: mindmap root((AI Agent编排范式)) Eino框架原生范式 基础编排层 Chain链式编排: Graph的封装,除了 "环" 之外,Chain 暴露了几乎所有 Graph 的能力。线性单向流/简单任务 Graph图编排: 支持pregel和dag模式/分支/并行/循环/复杂逻辑 Workflow工作流编排: DAG模式,字段级别映射,控制流/数据流分离 图嵌套:Eino中的chain、graph、workflow都可以嵌套进图 Agent编排 ChatModelAgent: 智能决策大脑,实现ReAct模式,LLM交互核心 Workflow Agents: 流程协调 Sequential Agent: 串行执行按顺序流转,如CI/CD流水线,数据ETL Parallel Agent: 并行执行/共享输入/结果聚合,如多源数据采集、多渠道推送 Loop Agent: 循环执行/结果积累/条件退出,数据同步、压力测试 Custom Agent: 可高度定制 预构建Multi-Agent Plan-Execute Agent: 规划-执行-重规划,结构化解决复杂任务,如复杂任务研究,智能助理任务执行 Supervisor Agent: 监督者统筹/子Agent分工/汇总决策 DeepAgents: 规划驱动集中协作,强化拆解与上下文隔离 通用行业主流范式 经典协作模式 Master-Slave(主从): 主Agent分配任务/从Agent执行/无自主决策 Swarm(蜂群): 无中心/Agent自主协作/去中心化/高容错 Hierarchical(层级): 多层级Agent/上层决策/下层执行/复杂系统 Peer-to-Peer(对等): 平级Agent/自主协商/资源共享/无核心节点 详细编排范式解析 1. 基础编排层 Chain(链式编排) 定义: Graph的封装,除了"环"之外,Chain暴露了几乎所有Graph的能力,适用于线性单向流/简单任务 使用场景: 数据处理流水线(如ETL过程) 顺序执行的任务链 简单的工作流程 Graph(图编排) 定义: 支持Pregel和DAG模式,能够处理分支、并行、循环等复杂逻辑 使用场景: 复杂业务逻辑处理 需要条件分支和循环的任务 并行计算任务 Workflow(工作流编排) 定义: DAG模式,不支持环,支持字段级别映射,强调控制流和数据流分离 使用场景: 数据集成和转换 需要精确字段映射的场景 复杂的数据处理管道 2. Agent编排层 ChatModelAgent 定义: 智能决策大脑,实现ReAct模式,是LLM交互核心 使用场景: 需要推理和决策的场景 自然语言交互 复杂问题分解 Workflow Agents Sequential Agent 定义: 串行执行按顺序流转 使用场景: 研究报告撰写流程 CI/CD流水线 数据提取、转换和加载(ETL) flowchart LR subgraph one[Sequential Agent] PlanAgent[Plan Agent<br/>制定研究计划] --> WriteAgent[Write Agent<br/>撰写学术报告] end Start[开始研究主题] --> one one --> End[结束] Parallel Agent 定义: 并行执行/共享输入/结果聚合 使用场景: 多源数据采集 并行数据分析 多渠道信息收集 flowchart LR subgraph p[彼此之间无需交互,功能边界清晰] a[Stock Data Collection Agent<br/>股票数据采集] b[News Data Collection Agent<br/>新闻数据采集] c[Social Media Info Collection Agent<br/>社媒信息采集] d[聚合] end Start[开始] --> a Start --> b Start --> c a --> d b --> d c --> d d --> End[结束] Loop Agent 定义: 循环执行/结果积累/条件退出 使用场景: 解决方案优化迭代 数据同步、压力测试 反馈循环改进 flowchart LR subgraph loop[反思迭代] a[Main Agent<br/>生成初步解决方案] --> b[Critique Agent<br/>质量审查/反馈改进] b -->|结果不满意<br/>MaxIteration=5| a end Start[开始] --> loop loop -->|结果满足| End[结束] 3. 预构建Multi-Agent范式 Plan-Execute Agent 定义: 规划-执行-重规划,适合需要多步骤推理、动态调整和工具集成的复杂任务场景 使用场景: 复杂研究任务 需要多步规划的项目 动态任务调整 flowchart TD A[1. UserInput] --> B[Planner] B -->|2. 生成具体的计划步骤| C[Plan Steps<br/>1. ...<br/>2. ...<br/>3. ...] C -->|3. 执行计划的第一步| D[Executor] D -->|4. 传达当前的计划和第一步运行的结果| E[Replanner] E -->|5a. 评估并生成已调整的新计划<br/>交给Executor执行新计划的第一步| D E -->|5b. 评估任务执行的完成情况<br/>或达到最大迭代次数| F[End] Supervisor Agent 定义: 监督者统筹/子Agent分工/汇总决策 使用场景: 多Agent任务分配和管理 动态任务路由 Agent间的协调 flowchart LR subgraph subagents a[Agent1] b[Agent2] c[Agent3] end s[Supervisor Agent] <--> |dispatch mission</br>return result| a s <--> |dispatch mission</br>return result| b s <--> |dispatch mission</br>return result| c s --> Exit Layered-Supervisor 使用场景: 多层级任务管理和动态路由 flowchart LR TopSupervisor[Supervisor Agent<br/>顶层监督者-动态路由] --> Research[Research Agent<br/>信息检索] TopSupervisor --> MathSuper[Math Agent<br/>中层监督者-数学运算] MathSuper --> Subtract[Subtract Agent<br/>减法运算] MathSuper --> Multiply[Multiply Agent<br/>乘法运算] MathSuper --> Divide[Divide Agent<br/>除法运算] DeepAgents 定义:规划驱动的集中式协作,Main Agent 统一协调下的 Multi-Agent 模式 流程: 通过 WriteTodos 将用户目标拆解为结构化待办并记录进度 通过统一入口 TaskTool 选择并调用对应的 SubAgent 执行子任务;主/子代理上下文隔离,避免中间步骤污染主流程。 汇总各子代理返回的结果;必要时再次调用 WriteTodos 更新进度或进行重规划,直至完成。 适用场景: 多角色协作的复杂业务流程,集中委派子任务并统一汇总 严格上下文隔离的执行环境 flowchart TD subgraph MainAgent ChatModel[ChatModel] subgraph Tools WriteTodos[WriteTodos] TaskTool[TaskTool] CustomTools[CustomTools] end end subgraph SubAgents GeneralPurpose[GeneralPurpose] CustomSubAgents[CustomSubAgents] end ChatModel -->|Reasoning & Action| Tools Tools --> ChatModel TaskTool -->|Delegate Tasks| SubAgents SubAgents -->|Return Results| TaskTool 4. 其他主流范式 Master-Slave(主从模式) 定义: 主Agent分配任务/从Agent执行/无自主决策 使用场景: 任务分配明确的系统 标准化执行任务 控制权集中的场景 Swarm(蜂群模式) 定义: 无中心/Agent自主协作/去中心化/高容错 使用场景: 去中心化系统 高容错需求 自适应环境 Hierarchical(层级模式) 定义: 多层级Agent/上层决策/下层执行/复杂系统 使用场景: 大型复杂系统 分层决策机制 组织化任务分配 Peer-to-Peer(对等模式) 定义: 平级Agent/自主协商/资源共享/无核心节点 使用场景: 平等协作环境 资源共享 无单点故障需求 选择编排范式 任务复杂度 简单 → Chain/Sequential 中等 → Graph/Workflow 复杂 → Plan-Execute/Supervisor 执行模式 顺序 → Chain/Sequential 并行 → Parallel 迭代 → Loop 动态调整 → Plan-Execute 总结 AI Agent编排范式的选择需要根据具体的应用场景、任务复杂度和系统需求进行综合考虑。无论选择哪种范式,都需要遵循"执行可控、状态可追溯、失败可复盘"的原则,确保系统的稳定性和可维护性。 ...

February 6, 2026 · 3 min · 427 words · erpan

基于Eino框架理解大模型工具调用

函数调用 Function Calling Function Calling 是一种将大模型与外部工具和 API 相连的关键功能,大模型能够将用户的自然语言智能地转化为对特定工具或 API 的调用,从而高效满足各种场景需求,如动态信息查询、任务自动化等 工具调用的一般步骤: 应用程序将用户问题和tools列表一起发送给大模型,tools列表表明模型可以调用的工具 LLM 对用户意图进行分析,决定是否需要使用工具以及使用哪些工具 a. 无需工具则生成回答响应给应用程序 b. 需要调用工具输出工具名和参数信息响应给应用程序 应用程序解析模型响应 有工具调用,则调用工具并将调用结果和之前的消息记录一并发给模型,继续处理 无工具调用,继续处理程序逻辑或直接给用户 循环上面步骤,达到结束条件则会话完成 火山引擎文档中有一张图多轮工具调用的逻辑图,可以辅助理解 多轮工具调用 MCP 官网 https://modelcontextprotocol.io/ MCP(Model Context Protocol)即模型上下文协议,与 function calling(函数调用)都是实现大语言模型与外部系统交互的关键技术概念 MCP 主要负责规范化函数的具体执行过程,为 AI 模型和外部数据源或工具之间建立统一的通信接口。 二者的关系表现为 function calling 是 MCP 生态下的一种具体功能实现形式。function calling 为 MCP 提供了函数调用的指令来源,而 MCP 则为 function calling 生成的指令提供了标准化的执行框架,确保这些指令能够在不同的外部系统中可靠地执行。 MCP也可以简单理解为function的共享,因此MCP开源社区在最近几个月都非常活跃。 MCP遵循CS架构(Client-Server),几个核心概念: 主机(Host):通常是发起连接的 LLM 应用程序,如 Claude Desktop、IDE 插件等,负责管理客户端实例和安全策略 客户端(Client):位于主机内,是主机与服务器之间的桥梁,与服务器建立 1:1 会话,处理协议协商和消息路由等 服务器(Server):是独立运行的轻量级服务,通过标准化接口提供特定功能,如文件系统访问、数据库查询等 核心架构这块参考官方文档 https://modelcontextprotocol.io/docs/concepts/architecture 传输机制 MCP的client-server间传输层协议当前有两种,都使用JSON-RPC2.0作为消息交换格式: Stdio 进程间通信 适用于命令行等同服务器通信 Client将Server作为子进程启动,Server从其标准输入 (stdin) 读取 JSON-RPC 消息,并将消息发送到其标准输出 (stdout)。Server可以将 UTF-8 字符串写入其标准错误 (stderr) 以进行日志记录,Client可以捕获、转发或忽略此日志记录。Server不能向其 stdout 写入任何不是有效 MCP 消息的内容,Client不能向Server的 stdin 写入任何不是有效 MCP 消息的内容。 ...

April 18, 2025 · 7 min · 1372 words · erpan

golang context包用法理解

同时启很多个goroutine来完成一个任务,在一些必要的情况下如何跟踪或取消这些goroutine?常见的有下面几种方式: WaitGroup,goroutine之间的同步 for select 加上 stop channel来监听消息管理协程 context包 context包就是用来在goroutine之间传递上下文信息的,它提供了超时timeout和取消cancel机制,利用了channel进行信息传递,方便的管理具有复杂层级关系的多个goroutine,这些复杂的层级关系类似于一棵树,可以有多个分叉。Go标准库中的net/http,database/sql等都用到了context包。 接口 context包定义了两个接口 Context 1 2 3 4 5 6 type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} } Deadline 方法,第一个返回值是该上下文执行完的截止时间,第二个返回值是布尔值,表示是否设置了截止日期,没设置返回ok==false,否则为true。该方法幂等 Done,返回一个只读通道,在context被取消或者context到了deadline时,此通道会被关闭 WithCancel 上下文,当调用cancel后此通道关闭关闭 WithDeadline,到达deadline时间点后自动关闭此通道 WithTimeout,指定的时间超时后自动关闭此通道 Err,Done返回的通道未被关闭时,Err的返回值是nil,关闭后,返回 context 取消的原因,被取消时返回Canceled,到了截止时间返回 DeadlineExceeded Value,获取该Context上绑定的值,需要注意的是键和值都是any类型。以上四个方法都是幂等的 canceler ,私有接口,表示一个可以被取消cancel的context对象,包内的*cancelCtx 和 *timerCtx结构体实现了此接口 1 2 3 4 type canceler interface { cancel(removeFromParent bool, err, cause error) Done() <-chan struct{} } 创建 几个实现此接口的结构体关系: ...

February 15, 2022 · 3 min · 503 words · erpan

client-go RingGrowingBuffer 环形缓冲区

在client-go源码中,processorListener对象里面定义了一个RingBuffer用于缓存所有尚未分发的事件通知,在此记录下这个RingBuffer。 RingBuffer一般用于数据的缓存机制,例如tcp协议里面数据包的缓冲就利用到了RingBuffer。 client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区。环是一个逻辑上的概念,有了环,此段内存空间就可以重复利用,不用频繁重新申请内存,本质上数据还是存在数组里面的,这个数组的大小可以按需进行倍数扩容,扩容后需要重新分配内存空间并拷贝未消费的数据到新数组来。因为它是数组,内存是预先分配的,数组是内存上连续的一段空间,它有一个容易预测的访问模式,因此对CPU高速缓存友好,垃圾回收(GC)在这种情况下也不用做什么。 在实现上,可以理解为两个指针:a) 读指针、b)写指针。在一段buffer上,读指针控制下一次该读数据的位置,写指针控制下一次该写数据的位置。在数组里面我们可以直接用数组下标。要遵循FIFO原则,读指针不能超过写指针,两指针重叠了要么buffer写满了,要么buffer为空。 参考这里的一张图 目前我们只需要考虑: 存储啥数据类型 数据如何存放 何时空间满了需要扩容 不能丢失未消费数据 k8s中的源码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 // 源码路径k8s.io/utils/buffer/ring_growing.go package buffer // 非线程安全、可增长的环形缓冲区 type RingGrowing struct { data []interface{} // 存任意数据的数组 n int // 缓冲区大小 beg int // 第一个可用的元素位置索引 readable int // 未消费的元素数量 } // 初始化一个RingBuffer func NewRingGrowing(initialSize int) *RingGrowing { return &RingGrowing{ data: make([]interface{}, initialSize), n: initialSize, } } // 取出未消费元素中的第一个元素 func (r *RingGrowing) ReadOne() (data interface{}, ok bool) { if r.readable == 0 { return nil, false } r.readable-- element := r.data[r.beg] r.data[r.beg] = nil // Remove reference to the object to help GC if r.beg == r.n-1 { // 这种情况就是读到数组最后一个元素了,下次读就得从列表的头部开始,以免越界 r.beg = 0 } else { r.beg++ } return element, true } // 在buffer尾部添加一个元素,buffer满了就扩容 func (r *RingGrowing) WriteOne(data interface{}) { // 满了的情况 if r.readable == r.n { newN := r.n * 2 // 新开辟一个两倍大小的数组 newData := make([]interface{}, newN) to := r.beg + r.readable if to <= r.n { copy(newData, r.data[r.beg:to]) } else { // 未消费的元素在数组两端的情况 copied := copy(newData, r.data[r.beg:]) copy(newData[copied:], r.data[:(to%r.n)]) } r.beg = 0 r.data = newData r.n = newN } r.data[(r.readable+r.beg)%r.n] = data r.readable++ } client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区,因此,在此client-go ringBuffer基础上可以继续考虑下面几个问题: ...

October 11, 2021 · 2 min · 252 words · erpan

读redis-py客户端源码

前言 看别人的代码也是对自己思维和经验的学习丰富过程 作为一个初学者,之前写过的代码量较少,很少涉及到完整的项目开发,看完redis-py库后,get到其中的 连接保活机制 连接池的实现 开辟buffer存入从socket接收来的数据及buffer管理 熟悉了RESP协议 多进程多线程的情况下,利用锁确保连接池数据结构安全 我读的过程中觉得值得注意的是,_in_use_connections使用集合结构;此连接池不是在初始化时创建好一定数量的tcp连接;其中用了两个互斥锁,一个保护连接池,一个保护多进程的池;较多的连接重连,确保连接可用 执行过程 该库主要有Redis、Connection、ConnectionPool、PythonParse、SocketBuffer几个类,下面大概理了一下redis-py的执行过程 开始使用redis-py客户端 class Redis实例化 可以关注下面几个参数: socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, retry_on_timeout=False, max_connections=None, single_connection_client=False, # 是否单个连接,不用连接池 health_check_interval=0, class ConnectionPool连接池初始化ConnectionPool(),此时尚未创建连接 # 此处定义连接池最大连接数 max_connections = max_connections or 2 ** 31 # fork_safe,在_checkpid()方法中用到,保护临界区的锁。这个锁是在进程id改变时获得的。比如fork出一个子进程后,子进程id和池对象中保存的id不一致,那么子进程中的多个线程都可能会先获取此锁,第一个获得锁的线程将重置此池的数据结构并最终释放锁对象,后续的线程再执行时,pid已于子线程池中的pid熟悉一致,不再做其他操作,在下面也会有提到 self._fork_lock = threading.Lock() # 定义了并初始化已创建连接数、使用中的链接、可用的连接等数据结构 self._lock = threading.Lock() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set() 注意此处_in_use_connections使用了集合存储池中的连接对象,这个与python数据类型时间复杂度有关,可点此参考官网,集合的内部实现与字典极为相似,此集合对象只用到两个操作,add和remove,时间复杂度均为O(1),(有误烦请指正🤡🤡🤡) 初始化Redis-Client状态信息完毕,此时还没有任何连接被创建 假设开始执行 r.set('foo', 'baiqi'),此方法返回r.excute_command()的结果 首先尝试从池中获取一个Connection对象 pool.get_connection(command_name, **options) 获取时得先执行下_checkpid()方法,再执行get_connection() ...

March 6, 2020 · 1 min · 208 words · erpan