golang context包用法理解

同时启很多个goroutine来完成一个任务,在一些必要的情况下如何跟踪或取消这些goroutine?常见的有下面几种方式: WaitGroup,goroutine之间的同步 for select 加上 stop channel来监听消息管理协程 context包 context包就是用来在goroutine之间传递上下文信息的,它提供了超时timeout和取消cancel机制,利用了channel进行信息传递,方便的管理具有复杂层级关系的多个goroutine,这些复杂的层级关系类似于一棵树,可以有多个分叉。Go标准库中的net/http,database/sql等都用到了context包。 接口 context包定义了两个接口 Context 1 2 3 4 5 6 type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} } Deadline 方法,第一个返回值是该上下文执行完的截止时间,第二个返回值是布尔值,表示是否设置了截止日期,没设置返回ok==false,否则为true。该方法幂等 Done,返回一个只读通道,在context被取消或者context到了deadline时,此通道会被关闭 WithCancel 上下文,当调用cancel后此通道关闭关闭 WithDeadline,到达deadline时间点后自动关闭此通道 WithTimeout,指定的时间超时后自动关闭此通道 Err,Done返回的通道未被关闭时,Err的返回值是nil,关闭后,返回 context 取消的原因,被取消时返回Canceled,到了截止时间返回 DeadlineExceeded Value,获取该Context上绑定的值,需要注意的是键和值都是any类型。以上四个方法都是幂等的 canceler ,私有接口,表示一个可以被取消cancel的context对象,包内的*cancelCtx 和 *timerCtx结构体实现了此接口 1 2 3 4 type canceler interface { cancel(removeFromParent bool, err, cause error) Done() <-chan struct{} } 创建 几个实现此接口的结构体关系:...

February 15, 2022 · 3 min · 507 words · erpan

client-go RingGrowingBuffer 环形缓冲区

在client-go源码中,processorListener对象里面定义了一个RingBuffer用于缓存所有尚未分发的事件通知,在此记录下这个RingBuffer。 RingBuffer一般用于数据的缓存机制,例如tcp协议里面数据包的缓冲就利用到了RingBuffer。 client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区。环是一个逻辑上的概念,有了环,此段内存空间就可以重复利用,不用频繁重新申请内存,本质上数据还是存在数组里面的,这个数组的大小可以按需进行倍数扩容,扩容后需要重新分配内存空间并拷贝未消费的数据到新数组来。因为它是数组,内存是预先分配的,数组是内存上连续的一段空间,它有一个容易预测的访问模式,因此对CPU高速缓存友好,垃圾回收(GC)在这种情况下也不用做什么。 在实现上,可以理解为两个指针:a) 读指针、b)写指针。在一段buffer上,读指针控制下一次该读数据的位置,写指针控制下一次该写数据的位置。在数组里面我们可以直接用数组下标。要遵循FIFO原则,读指针不能超过写指针,两指针重叠了要么buffer写满了,要么buffer为空。 参考这里的一张图 目前我们只需要考虑: 存储啥数据类型 数据如何存放 何时空间满了需要扩容 不能丢失未消费数据 k8s中的源码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 // 源码路径k8s....

October 11, 2021 · 2 min · 252 words · erpan

读redis-py客户端源码

前言 看别人的代码也是对自己思维和经验的学习丰富过程 作为一个初学者,之前写过的代码量较少,很少涉及到完整的项目开发,看完redis-py库后,get到其中的 连接保活机制 连接池的实现 开辟buffer存入从socket接收来的数据及buffer管理 熟悉了RESP协议 多进程多线程的情况下,利用锁确保连接池数据结构安全 我读的过程中觉得值得注意的是,_in_use_connections使用集合结构;此连接池不是在初始化时创建好一定数量的tcp连接;其中用了两个互斥锁,一个保护连接池,一个保护多进程的池;较多的连接重连,确保连接可用 执行过程 该库主要有Redis、Connection、ConnectionPool、PythonParse、SocketBuffer几个类,下面大概理了一下redis-py的执行过程 开始使用redis-py客户端 class Redis实例化 可以关注下面几个参数: socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, retry_on_timeout=False, max_connections=None, single_connection_client=False, # 是否单个连接,不用连接池 health_check_interval=0, class ConnectionPool连接池初始化ConnectionPool(),此时尚未创建连接 # 此处定义连接池最大连接数 max_connections = max_connections or 2 ** 31 # fork_safe,在_checkpid()方法中用到,保护临界区的锁。这个锁是在进程id改变时获得的。比如fork出一个子进程后,子进程id和池对象中保存的id不一致,那么子进程中的多个线程都可能会先获取此锁,第一个获得锁的线程将重置此池的数据结构并最终释放锁对象,后续的线程再执行时,pid已于子线程池中的pid熟悉一致,不再做其他操作,在下面也会有提到 self._fork_lock = threading.Lock() # 定义了并初始化已创建连接数、使用中的链接、可用的连接等数据结构 self._lock = threading.Lock() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set() 注意此处_in_use_connections使用了集合存储池中的连接对象,这个与python数据类型时间复杂度有关,可点此参考官网,集合的内部实现与字典极为相似,此集合对象只用到两个操作,add和remove,时间复杂度均为O(1),(有误烦请指正🤡🤡🤡) 初始化Redis-Client状态信息完毕,此时还没有任何连接被创建 假设开始执行 r.set('foo', 'baiqi'),此方法返回r.excute_command()的结果 首先尝试从池中获取一个Connection对象 pool.get_connection(command_name, **options) 获取时得先执行下_checkpid()方法,再执行get_connection() 在现代操作系统下保证ConnectionPool fork-safe,连接池的所有方法都会先调用此方法再来操纵连接池的状态。如果当前pid和池对象中保存的pid不一致,可以假设当前进程是fork出来的子进程,子进程不能用父进程的文件描述符(比如sockets),因此它会继续调用self.reset()方法重新初始化当前进程的连接池;如果pids都一致那么直接pass。 而self._fork_lock就是确保了在子进程中的多个线程不会多次执行self.reset()方法。因为在子进程中,第一个调用_checkpid的线程调用了reset()方法使得self.pid置为当前子进程的id。 从池中获取连接时加锁保护,如果池中_available_connections.pop()没有连接,那么开始创建连接make_connection(),并将此连接加入到池的已使用连接集合中,即self._in_use_connections集合 创建连接时得先确认下连接数是不是超过了最大连接数配置,没有则继续创建,返回Connection对象。所以这里的tcp连接数不是在应用一初始化就按照池配置中的连接数来直接一次性创建好多少个连接...

March 6, 2020 · 1 min · 208 words · erpan