AI Agent编排范式
AI Agent编排范式 范式主要整理自Eino框架,文中有些概念仅属于Eino框架 引言 AI应用的核心任务极为简单: 接收指令 调用大模型 安全地执行工具(运行命令、操作文件、控制浏览器) 之后将结果反馈给你 三个关键原则,是解决一切花哨问题的基础: 执行是能控制的 状态是能追溯的 失败是能复盘的 AI应用的核心任务,即ReAct模式,通过让 ChatModel 进行显式的、一步一步的"思考"来解决复杂问题,在Eino ADK中将其直接抽象为ChatModelAgent了。在不同的领域、不同的场景下有各种各样的模型,也有各种各样的Agent来处理专有领域的任务,如Coding模型、视觉模型等。一个复杂的任务通常需要调用各种能力的模型和各种工具来完成任务,这就需要把这些拥有专有能力的Agent编排起来组成一个功能完善、强大的应用。 AI Agent编排范式分类 AI Agent编排范式大概分为以下几类: mindmap root((AI Agent编排范式)) Eino框架原生范式 基础编排层 Chain链式编排: Graph的封装,除了 "环" 之外,Chain 暴露了几乎所有 Graph 的能力。线性单向流/简单任务 Graph图编排: 支持pregel和dag模式/分支/并行/循环/复杂逻辑 Workflow工作流编排: DAG模式,字段级别映射,控制流/数据流分离 图嵌套:Eino中的chain、graph、workflow都可以嵌套进图 Agent编排 ChatModelAgent: 智能决策大脑,实现ReAct模式,LLM交互核心 Workflow Agents: 流程协调 Sequential Agent: 串行执行按顺序流转,如CI/CD流水线,数据ETL Parallel Agent: 并行执行/共享输入/结果聚合,如多源数据采集、多渠道推送 Loop Agent: 循环执行/结果积累/条件退出,数据同步、压力测试 Custom Agent: 可高度定制 预构建Multi-Agent Plan-Execute Agent: 规划-执行-重规划,结构化解决复杂任务,如复杂任务研究,智能助理任务执行 Supervisor Agent: 监督者统筹/子Agent分工/汇总决策 DeepAgents: 规划驱动集中协作,强化拆解与上下文隔离 通用行业主流范式 经典协作模式 Master-Slave(主从): 主Agent分配任务/从Agent执行/无自主决策 Swarm(蜂群): 无中心/Agent自主协作/去中心化/高容错 Hierarchical(层级): 多层级Agent/上层决策/下层执行/复杂系统 Peer-to-Peer(对等): 平级Agent/自主协商/资源共享/无核心节点 详细编排范式解析 1. 基础编排层 Chain(链式编排) 定义: Graph的封装,除了"环"之外,Chain暴露了几乎所有Graph的能力,适用于线性单向流/简单任务 使用场景: 数据处理流水线(如ETL过程) 顺序执行的任务链 简单的工作流程 Graph(图编排) 定义: 支持Pregel和DAG模式,能够处理分支、并行、循环等复杂逻辑 使用场景: 复杂业务逻辑处理 需要条件分支和循环的任务 并行计算任务 Workflow(工作流编排) 定义: DAG模式,不支持环,支持字段级别映射,强调控制流和数据流分离 使用场景: 数据集成和转换 需要精确字段映射的场景 复杂的数据处理管道 2. Agent编排层 ChatModelAgent 定义: 智能决策大脑,实现ReAct模式,是LLM交互核心 使用场景: 需要推理和决策的场景 自然语言交互 复杂问题分解 Workflow Agents Sequential Agent 定义: 串行执行按顺序流转 使用场景: 研究报告撰写流程 CI/CD流水线 数据提取、转换和加载(ETL) flowchart LR subgraph one[Sequential Agent] PlanAgent[Plan Agent<br/>制定研究计划] --> WriteAgent[Write Agent<br/>撰写学术报告] end Start[开始研究主题] --> one one --> End[结束] Parallel Agent 定义: 并行执行/共享输入/结果聚合 使用场景: 多源数据采集 并行数据分析 多渠道信息收集 flowchart LR subgraph p[彼此之间无需交互,功能边界清晰] a[Stock Data Collection Agent<br/>股票数据采集] b[News Data Collection Agent<br/>新闻数据采集] c[Social Media Info Collection Agent<br/>社媒信息采集] d[聚合] end Start[开始] --> a Start --> b Start --> c a --> d b --> d c --> d d --> End[结束] Loop Agent 定义: 循环执行/结果积累/条件退出 使用场景: 解决方案优化迭代 数据同步、压力测试 反馈循环改进 flowchart LR subgraph loop[反思迭代] a[Main Agent<br/>生成初步解决方案] --> b[Critique Agent<br/>质量审查/反馈改进] b -->|结果不满意<br/>MaxIteration=5| a end Start[开始] --> loop loop -->|结果满足| End[结束] 3. 预构建Multi-Agent范式 Plan-Execute Agent 定义: 规划-执行-重规划,适合需要多步骤推理、动态调整和工具集成的复杂任务场景 使用场景: 复杂研究任务 需要多步规划的项目 动态任务调整 flowchart TD A[1. UserInput] --> B[Planner] B -->|2. 生成具体的计划步骤| C[Plan Steps<br/>1. ...<br/>2. ...<br/>3. ...] C -->|3. 执行计划的第一步| D[Executor] D -->|4. 传达当前的计划和第一步运行的结果| E[Replanner] E -->|5a. 评估并生成已调整的新计划<br/>交给Executor执行新计划的第一步| D E -->|5b. 评估任务执行的完成情况<br/>或达到最大迭代次数| F[End] Supervisor Agent 定义: 监督者统筹/子Agent分工/汇总决策 使用场景: 多Agent任务分配和管理 动态任务路由 Agent间的协调 flowchart LR subgraph subagents a[Agent1] b[Agent2] c[Agent3] end s[Supervisor Agent] <--> |dispatch mission</br>return result| a s <--> |dispatch mission</br>return result| b s <--> |dispatch mission</br>return result| c s --> Exit Layered-Supervisor 使用场景: 多层级任务管理和动态路由 flowchart LR TopSupervisor[Supervisor Agent<br/>顶层监督者-动态路由] --> Research[Research Agent<br/>信息检索] TopSupervisor --> MathSuper[Math Agent<br/>中层监督者-数学运算] MathSuper --> Subtract[Subtract Agent<br/>减法运算] MathSuper --> Multiply[Multiply Agent<br/>乘法运算] MathSuper --> Divide[Divide Agent<br/>除法运算] DeepAgents 定义:规划驱动的集中式协作,Main Agent 统一协调下的 Multi-Agent 模式 流程: 通过 WriteTodos 将用户目标拆解为结构化待办并记录进度 通过统一入口 TaskTool 选择并调用对应的 SubAgent 执行子任务;主/子代理上下文隔离,避免中间步骤污染主流程。 汇总各子代理返回的结果;必要时再次调用 WriteTodos 更新进度或进行重规划,直至完成。 适用场景: 多角色协作的复杂业务流程,集中委派子任务并统一汇总 严格上下文隔离的执行环境 flowchart TD subgraph MainAgent ChatModel[ChatModel] subgraph Tools WriteTodos[WriteTodos] TaskTool[TaskTool] CustomTools[CustomTools] end end subgraph SubAgents GeneralPurpose[GeneralPurpose] CustomSubAgents[CustomSubAgents] end ChatModel -->|Reasoning & Action| Tools Tools --> ChatModel TaskTool -->|Delegate Tasks| SubAgents SubAgents -->|Return Results| TaskTool 4. 其他主流范式 Master-Slave(主从模式) 定义: 主Agent分配任务/从Agent执行/无自主决策 使用场景: 任务分配明确的系统 标准化执行任务 控制权集中的场景 Swarm(蜂群模式) 定义: 无中心/Agent自主协作/去中心化/高容错 使用场景: 去中心化系统 高容错需求 自适应环境 Hierarchical(层级模式) 定义: 多层级Agent/上层决策/下层执行/复杂系统 使用场景: 大型复杂系统 分层决策机制 组织化任务分配 Peer-to-Peer(对等模式) 定义: 平级Agent/自主协商/资源共享/无核心节点 使用场景: 平等协作环境 资源共享 无单点故障需求 选择编排范式 任务复杂度 简单 → Chain/Sequential 中等 → Graph/Workflow 复杂 → Plan-Execute/Supervisor 执行模式 顺序 → Chain/Sequential 并行 → Parallel 迭代 → Loop 动态调整 → Plan-Execute 总结 AI Agent编排范式的选择需要根据具体的应用场景、任务复杂度和系统需求进行综合考虑。无论选择哪种范式,都需要遵循"执行可控、状态可追溯、失败可复盘"的原则,确保系统的稳定性和可维护性。 ...
基于Eino框架理解大模型工具调用
函数调用 Function Calling Function Calling 是一种将大模型与外部工具和 API 相连的关键功能,大模型能够将用户的自然语言智能地转化为对特定工具或 API 的调用,从而高效满足各种场景需求,如动态信息查询、任务自动化等 工具调用的一般步骤: 应用程序将用户问题和tools列表一起发送给大模型,tools列表表明模型可以调用的工具 LLM 对用户意图进行分析,决定是否需要使用工具以及使用哪些工具 a. 无需工具则生成回答响应给应用程序 b. 需要调用工具输出工具名和参数信息响应给应用程序 应用程序解析模型响应 有工具调用,则调用工具并将调用结果和之前的消息记录一并发给模型,继续处理 无工具调用,继续处理程序逻辑或直接给用户 循环上面步骤,达到结束条件则会话完成 火山引擎文档中有一张图多轮工具调用的逻辑图,可以辅助理解 多轮工具调用 MCP 官网 https://modelcontextprotocol.io/ MCP(Model Context Protocol)即模型上下文协议,与 function calling(函数调用)都是实现大语言模型与外部系统交互的关键技术概念 MCP 主要负责规范化函数的具体执行过程,为 AI 模型和外部数据源或工具之间建立统一的通信接口。 二者的关系表现为 function calling 是 MCP 生态下的一种具体功能实现形式。function calling 为 MCP 提供了函数调用的指令来源,而 MCP 则为 function calling 生成的指令提供了标准化的执行框架,确保这些指令能够在不同的外部系统中可靠地执行。 MCP也可以简单理解为function的共享,因此MCP开源社区在最近几个月都非常活跃。 MCP遵循CS架构(Client-Server),几个核心概念: 主机(Host):通常是发起连接的 LLM 应用程序,如 Claude Desktop、IDE 插件等,负责管理客户端实例和安全策略 客户端(Client):位于主机内,是主机与服务器之间的桥梁,与服务器建立 1:1 会话,处理协议协商和消息路由等 服务器(Server):是独立运行的轻量级服务,通过标准化接口提供特定功能,如文件系统访问、数据库查询等 核心架构这块参考官方文档 https://modelcontextprotocol.io/docs/concepts/architecture 传输机制 MCP的client-server间传输层协议当前有两种,都使用JSON-RPC2.0作为消息交换格式: Stdio 进程间通信 适用于命令行等同服务器通信 Client将Server作为子进程启动,Server从其标准输入 (stdin) 读取 JSON-RPC 消息,并将消息发送到其标准输出 (stdout)。Server可以将 UTF-8 字符串写入其标准错误 (stderr) 以进行日志记录,Client可以捕获、转发或忽略此日志记录。Server不能向其 stdout 写入任何不是有效 MCP 消息的内容,Client不能向Server的 stdin 写入任何不是有效 MCP 消息的内容。 ...
实现一个MutatingAdmissionWebhook
官方文档 概述 在 Kubernetes 中,准入控制是保障 API 请求安全和合规性的重要机制。它在 API 请求流程中扮演着关键角色,拦截即将发送到 Kubernetes APIServer的请求,在持久化之前,但在身份验证和授权之后。其位置如下图所示: 图片来源https://sysdig.com/blog/kubernetes-admission-controllers/ 准入控制器适用于创建、删除或修改对象的请求,同时也可以阻止自定义动作。读操作会绕过准入控制层,不会受到准入控制器的影响。当有多个准入控制器存在时,它们会依次被调用,只要其中一个准入控制器拒绝请求,整个请求就会被立即驳回。 准入控制常常被用来自动化设置默认资源请求和限制、应用标签和注释以及强制命名约定等管理任务。 准入控制分两个阶段: 变更准入控制器,用于在请求处理过程中对对象进行修改。 验证准入控制器,负责验证请求是否符合特定规则。 需要注意的是,部分控制器兼具变更准入和验证准入的功能。 启用准入 1 kube-apiserver --enable-admission-plugins=NamespaceLifecycle,LimitRanger ... 关闭准入 1 kube-apiserver --disable-admission-plugins=PodNodeSelector,AlwaysDeny ... 准入的两种类型: 静态准入:由 Kubernetes 内置提供,无需额外配置。 动态准入:允许用户根据自身需求进行扩展。 动态准入控制器: MutatingAdmissionWebhook 此准入控制器调用任何与请求匹配的变更(Mutating) Webhook。匹配的 Webhook 将被顺序调用 ValidatingAdmissionWebhook 此准入控制器调用与请求匹配的所有验证性 Webhook。 匹配的 Webhook 将被并行调用。如果其中任何一个拒绝请求,则整个请求将失败。 该准入控制器仅在验证(Validating)阶段运行 ValidatingAdmissionPolicy 验证准入策略使用通用表达语言 (Common Expression Language,CEL) 来声明策略的验证规则,是一种声明式的、进程内的验证准入 Webhook 方案 MutatingAdmissionPolicy 提供了一种声明式的、进程内的方案, 可以用来替代变更性准入 Webhook 实现 步骤概览 实现一个 MutatingAdmissionWebhook 主要包括以下几个步骤: 生成证书,Kubernetes 和 Webhook 服务使用 TLS 加密通信 APIServer对Webhook的认证 Webhook对APIServer的认证(可选) 实现一个接口,接收AdmissionReview请求,解析并填充response字段,响应相同版本的AdmissionReview 部署上面接口,集群内外都可以 注册准入控制器 监控 Admission Webhook 以下面需求为例: ...
OAuth 2.0 与 OpenID Connect
在现代身份认证与授权体系中,OAuth 2.0 和 OpenID Connect(OIDC) 经常被一起提及,但它们解决的是不同层面的问题。理解两者的区别和协作关系,是构建安全、标准的身份系统的关键。 是什么? OAuth 2.0 不是身份验证协议,而是一个授权框架,允许第三方应用在用户授权下访问受保护资源,而无需获取用户密码。 OpenID Connect(OIDC)是一种基于OAuth 2.0协议构建的身份认证标准,旨在为现代应用程序提供简单、安全且可互操作的用户身份验证方式。它通过在OAuth 2.0的授权框架之上添加一个身份层,解决了OAuth 2.0本身不完善的身份认证问题。 重要区分: OAuth 2.0 = 授权 OpenID Connect = 身份认证+授权 OAuth2.0授权流程 OAuth2 定义了好几种授权模式,有Authorization Code、Client Credentials、Device Code、Implicit Flow和Password Credentials,隐式流和密码凭据模式被认为是不安全的,不推荐使用。 核心角色: 角色 描述 Resource Owner(资源所有者) 通常是用户自己 Client(客户端) 请求访问用户资源的应用(Web、移动、单页应用SPA 等) Authorization Server(授权服务器) 颁发令牌(如 Casdoor、Keycloak、Google OAuth) Resource Server(资源服务器) 托管用户数据(如 API 服务),接受并验证访问令牌,以确定请求是否合法 授权码模式(Authorization Code Flow) 适用于:服务器端 Web 应用、移动 App、单页应用SPA(配合 PKCE 使用) sequenceDiagram participant User as 用户 participant Client as 客户端 participant AS as 授权服务器 participant RS as 资源服务器 User->>Client: 访问应用 Client-->>User: 重定向至授权服务器<br>(client_id, redirect_uri, scope, state, code_challenge) User->>AS: 登录并同意授权 AS-->>User: 重定向回客户端<br>(?code=xxx&state=yyy) User->>Client: 浏览器携带授权码 Client->>AS: POST /token<br>(code, client_id, client_secret*, code_verifier) AS-->>Client: 返回 access_token (+ refresh_token) Client->>RS: GET /api/data<br>Authorization: Bearer <access_token> RS-->>Client: 返回受保护资源 流程 ...
磁盘IO类型
仅作为个人笔记,不保证完全的准确性和正确性,请自行甄别 顺序写 文件IO中的“顺序写”通常指的是对文件进行连续写入操作的过程,即从文件的一个位置开始,依次向后写入数据,即追加。可以提高写入效率,尤其是在传统的机械硬盘(HDD)上,因为不需要频繁地在不同的位置之间切换,减少了寻道时间和旋转延迟。 需要注意的是,“顺序写”并不直接等同于物理磁盘上的连续空间写入。虽然理想情况下,操作系统和文件系统会尽量将文件的数据块分配到物理上连续的存储空间中,以提高读写性能,但实际上由于多种因素(如文件系统的碎片、先前删除文件留下的空洞、以及其他文件的存在等),很难保证文件的所有部分都能被分配到完全连续的物理空间中。因此,通常说的“顺序写”,更多是指逻辑上的连续写入,即按照文件内部的偏移量顺序写入数据,而不是指物理磁盘上的连续写入。 即,“顺序写”主要关注的是逻辑层面的连续性,而物理层面的连续性则是文件系统和操作系统尽力优化的结果。 示例 顺序写: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package main import ( "fmt" "os" ) func main() { fileName := "t.bin" fileSize := 1 << 30 // 1GB 1073741824 // fileSize := 1073741825 // 测试不按照块的倍数写 // 创建并截断文件(直接分配空间) file, err := os.Create(fileName) if err != nil { fmt.Println("Error creating file:", err) return } defer file.Close() fInfo, err := file.Stat() if err != nil { fmt.Println("Error getting file size:", err) } fmt.Println("create: ", fInfo.Size()) err = file.Truncate(int64(fileSize)) if err != nil { fmt.Println("Error truncating file:", err) return } fInfo, _ = file.Stat() fmt.Println("truncate: ", fInfo.Size()) randomBlock := make([]byte, 0x1<<10<<2) // 4kB for i := 0; i < len(randomBlock); i++ { randomBlock[i] = 0x0 } bytesWritten := 0 for i := 0; i < fileSize; i += len(randomBlock) { n, err := file.Write(randomBlock) if err != nil { fmt.Println("Error writing to file:", err) } fmt.Println(n) bytesWritten += n } file.Sync() fmt.Println("written: ", bytesWritten) fInfo, err = file.Stat() if err != nil { fmt.Println("Error getting file size:", err) } fmt.Println("finally: ", fInfo.Size()) // 文件指针在末尾 n, err := file.Read(randomBlock) fmt.Println("read nums: ", n) fmt.Println("err: ", err) } 输出: ...