基于Eino框架理解大模型工具调用

函数调用 Function Calling Function Calling 是一种将大模型与外部工具和 API 相连的关键功能,大模型能够将用户的自然语言智能地转化为对特定工具或 API 的调用,从而高效满足各种场景需求,如动态信息查询、任务自动化等 工具调用的一般步骤: 应用程序将用户问题和tools列表一起发送给大模型,tools列表表明模型可以调用的工具 LLM 对用户意图进行分析,决定是否需要使用工具以及使用哪些工具 a. 无需工具则生成回答响应给应用程序 b. 需要调用工具输出工具名和参数信息响应给应用程序 应用程序解析模型响应 有工具调用,则调用工具并将调用结果和之前的消息记录一并发给模型,继续处理 无工具调用,继续处理程序逻辑或直接给用户 循环上面步骤,达到结束条件则会话完成 火山引擎文档中有一张图多轮工具调用的逻辑图,可以辅助理解 多轮工具调用 MCP 官网 https://modelcontextprotocol.io/ MCP(Model Context Protocol)即模型上下文协议,与 function calling(函数调用)都是实现大语言模型与外部系统交互的关键技术概念 MCP 主要负责规范化函数的具体执行过程,为 AI 模型和外部数据源或工具之间建立统一的通信接口。 二者的关系表现为 function calling 是 MCP 生态下的一种具体功能实现形式。function calling 为 MCP 提供了函数调用的指令来源,而 MCP 则为 function calling 生成的指令提供了标准化的执行框架,确保这些指令能够在不同的外部系统中可靠地执行。 MCP也可以简单理解为function的共享,因此MCP开源社区在最近几个月都非常活跃。 MCP遵循CS架构(Client-Server),几个核心概念: 主机(Host):通常是发起连接的 LLM 应用程序,如 Claude Desktop、IDE 插件等,负责管理客户端实例和安全策略 客户端(Client):位于主机内,是主机与服务器之间的桥梁,与服务器建立 1:1 会话,处理协议协商和消息路由等 服务器(Server):是独立运行的轻量级服务,通过标准化接口提供特定功能,如文件系统访问、数据库查询等 核心架构这块参考官方文档 https://modelcontextprotocol.io/docs/concepts/architecture 传输机制 MCP的client-server间传输层协议当前有两种,都使用JSON-RPC2.0作为消息交换格式: Stdio 进程间通信 适用于命令行等同服务器通信 Client将Server作为子进程启动,Server从其标准输入 (stdin) 读取 JSON-RPC 消息,并将消息发送到其标准输出 (stdout)。Server可以将 UTF-8 字符串写入其标准错误 (stderr) 以进行日志记录,Client可以捕获、转发或忽略此日志记录。Server不能向其 stdout 写入任何不是有效 MCP 消息的内容,Client不能向Server的 stdin 写入任何不是有效 MCP 消息的内容。...

April 18, 2025 · 6 min · 1171 words · erpan

golang context包用法理解

同时启很多个goroutine来完成一个任务,在一些必要的情况下如何跟踪或取消这些goroutine?常见的有下面几种方式: WaitGroup,goroutine之间的同步 for select 加上 stop channel来监听消息管理协程 context包 context包就是用来在goroutine之间传递上下文信息的,它提供了超时timeout和取消cancel机制,利用了channel进行信息传递,方便的管理具有复杂层级关系的多个goroutine,这些复杂的层级关系类似于一棵树,可以有多个分叉。Go标准库中的net/http,database/sql等都用到了context包。 接口 context包定义了两个接口 Context 1 2 3 4 5 6 type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} } Deadline 方法,第一个返回值是该上下文执行完的截止时间,第二个返回值是布尔值,表示是否设置了截止日期,没设置返回ok==false,否则为true。该方法幂等 Done,返回一个只读通道,在context被取消或者context到了deadline时,此通道会被关闭 WithCancel 上下文,当调用cancel后此通道关闭关闭 WithDeadline,到达deadline时间点后自动关闭此通道 WithTimeout,指定的时间超时后自动关闭此通道 Err,Done返回的通道未被关闭时,Err的返回值是nil,关闭后,返回 context 取消的原因,被取消时返回Canceled,到了截止时间返回 DeadlineExceeded Value,获取该Context上绑定的值,需要注意的是键和值都是any类型。以上四个方法都是幂等的 canceler ,私有接口,表示一个可以被取消cancel的context对象,包内的*cancelCtx 和 *timerCtx结构体实现了此接口 1 2 3 4 type canceler interface { cancel(removeFromParent bool, err, cause error) Done() <-chan struct{} } 创建 几个实现此接口的结构体关系:...

February 15, 2022 · 3 min · 507 words · erpan

client-go RingGrowingBuffer 环形缓冲区

在client-go源码中,processorListener对象里面定义了一个RingBuffer用于缓存所有尚未分发的事件通知,在此记录下这个RingBuffer。 RingBuffer一般用于数据的缓存机制,例如tcp协议里面数据包的缓冲就利用到了RingBuffer。 client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区。环是一个逻辑上的概念,有了环,此段内存空间就可以重复利用,不用频繁重新申请内存,本质上数据还是存在数组里面的,这个数组的大小可以按需进行倍数扩容,扩容后需要重新分配内存空间并拷贝未消费的数据到新数组来。因为它是数组,内存是预先分配的,数组是内存上连续的一段空间,它有一个容易预测的访问模式,因此对CPU高速缓存友好,垃圾回收(GC)在这种情况下也不用做什么。 在实现上,可以理解为两个指针:a) 读指针、b)写指针。在一段buffer上,读指针控制下一次该读数据的位置,写指针控制下一次该写数据的位置。在数组里面我们可以直接用数组下标。要遵循FIFO原则,读指针不能超过写指针,两指针重叠了要么buffer写满了,要么buffer为空。 参考这里的一张图 目前我们只需要考虑: 存储啥数据类型 数据如何存放 何时空间满了需要扩容 不能丢失未消费数据 k8s中的源码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 // 源码路径k8s....

October 11, 2021 · 2 min · 252 words · erpan

读redis-py客户端源码

前言 看别人的代码也是对自己思维和经验的学习丰富过程 作为一个初学者,之前写过的代码量较少,很少涉及到完整的项目开发,看完redis-py库后,get到其中的 连接保活机制 连接池的实现 开辟buffer存入从socket接收来的数据及buffer管理 熟悉了RESP协议 多进程多线程的情况下,利用锁确保连接池数据结构安全 我读的过程中觉得值得注意的是,_in_use_connections使用集合结构;此连接池不是在初始化时创建好一定数量的tcp连接;其中用了两个互斥锁,一个保护连接池,一个保护多进程的池;较多的连接重连,确保连接可用 执行过程 该库主要有Redis、Connection、ConnectionPool、PythonParse、SocketBuffer几个类,下面大概理了一下redis-py的执行过程 开始使用redis-py客户端 class Redis实例化 可以关注下面几个参数: socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, retry_on_timeout=False, max_connections=None, single_connection_client=False, # 是否单个连接,不用连接池 health_check_interval=0, class ConnectionPool连接池初始化ConnectionPool(),此时尚未创建连接 # 此处定义连接池最大连接数 max_connections = max_connections or 2 ** 31 # fork_safe,在_checkpid()方法中用到,保护临界区的锁。这个锁是在进程id改变时获得的。比如fork出一个子进程后,子进程id和池对象中保存的id不一致,那么子进程中的多个线程都可能会先获取此锁,第一个获得锁的线程将重置此池的数据结构并最终释放锁对象,后续的线程再执行时,pid已于子线程池中的pid熟悉一致,不再做其他操作,在下面也会有提到 self._fork_lock = threading.Lock() # 定义了并初始化已创建连接数、使用中的链接、可用的连接等数据结构 self._lock = threading.Lock() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set() 注意此处_in_use_connections使用了集合存储池中的连接对象,这个与python数据类型时间复杂度有关,可点此参考官网,集合的内部实现与字典极为相似,此集合对象只用到两个操作,add和remove,时间复杂度均为O(1),(有误烦请指正🤡🤡🤡) 初始化Redis-Client状态信息完毕,此时还没有任何连接被创建 假设开始执行 r.set('foo', 'baiqi'),此方法返回r.excute_command()的结果 首先尝试从池中获取一个Connection对象 pool.get_connection(command_name, **options) 获取时得先执行下_checkpid()方法,再执行get_connection() 在现代操作系统下保证ConnectionPool fork-safe,连接池的所有方法都会先调用此方法再来操纵连接池的状态。如果当前pid和池对象中保存的pid不一致,可以假设当前进程是fork出来的子进程,子进程不能用父进程的文件描述符(比如sockets),因此它会继续调用self.reset()方法重新初始化当前进程的连接池;如果pids都一致那么直接pass。 而self._fork_lock就是确保了在子进程中的多个线程不会多次执行self.reset()方法。因为在子进程中,第一个调用_checkpid的线程调用了reset()方法使得self.pid置为当前子进程的id。 从池中获取连接时加锁保护,如果池中_available_connections.pop()没有连接,那么开始创建连接make_connection(),并将此连接加入到池的已使用连接集合中,即self._in_use_connections集合 创建连接时得先确认下连接数是不是超过了最大连接数配置,没有则继续创建,返回Connection对象。所以这里的tcp连接数不是在应用一初始化就按照池配置中的连接数来直接一次性创建好多少个连接...

March 6, 2020 · 1 min · 208 words · erpan