读redis-py客户端源码
前言 看别人的代码也是对自己思维和经验的学习丰富过程 作为一个初学者,之前写过的代码量较少,很少涉及到完整的项目开发,看完redis-py库后,get到其中的 连接保活机制 连接池的实现 开辟buffer存入从socket接收来的数据及buffer管理 熟悉了RESP协议 多进程多线程的情况下,利用锁确保连接池数据结构安全 我读的过程中觉得值得注意的是,_in_use_connections使用集合结构;此连接池不是在初始化时创建好一定数量的tcp连接;其中用了两个互斥锁,一个保护连接池,一个保护多进程的池;较多的连接重连,确保连接可用 执行过程 该库主要有Redis、Connection、ConnectionPool、PythonParse、SocketBuffer几个类,下面大概理了一下redis-py的执行过程 开始使用redis-py客户端 class Redis实例化 可以关注下面几个参数: socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, retry_on_timeout=False, max_connections=None, single_connection_client=False, # 是否单个连接,不用连接池 health_check_interval=0, class ConnectionPool连接池初始化ConnectionPool(),此时尚未创建连接 # 此处定义连接池最大连接数 max_connections = max_connections or 2 ** 31 # fork_safe,在_checkpid()方法中用到,保护临界区的锁。这个锁是在进程id改变时获得的。比如fork出一个子进程后,子进程id和池对象中保存的id不一致,那么子进程中的多个线程都可能会先获取此锁,第一个获得锁的线程将重置此池的数据结构并最终释放锁对象,后续的线程再执行时,pid已于子线程池中的pid熟悉一致,不再做其他操作,在下面也会有提到 self._fork_lock = threading.Lock() # 定义了并初始化已创建连接数、使用中的链接、可用的连接等数据结构 self._lock = threading.Lock() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set() 注意此处_in_use_connections使用了集合存储池中的连接对象,这个与python数据类型时间复杂度有关,可点此参考官网,集合的内部实现与字典极为相似,此集合对象只用到两个操作,add和remove,时间复杂度均为O(1),(有误烦请指正🤡🤡🤡) 初始化Redis-Client状态信息完毕,此时还没有任何连接被创建 假设开始执行 r.set('foo', 'baiqi'),此方法返回r.excute_command()的结果 首先尝试从池中获取一个Connection对象 pool.get_connection(command_name, **options) 获取时得先执行下_checkpid()方法,再执行get_connection() 在现代操作系统下保证ConnectionPool fork-safe,连接池的所有方法都会先调用此方法再来操纵连接池的状态。如果当前pid和池对象中保存的pid不一致,可以假设当前进程是fork出来的子进程,子进程不能用父进程的文件描述符(比如sockets),因此它会继续调用self.reset()方法重新初始化当前进程的连接池;如果pids都一致那么直接pass。 而self._fork_lock就是确保了在子进程中的多个线程不会多次执行self.reset()方法。因为在子进程中,第一个调用_checkpid的线程调用了reset()方法使得self.pid置为当前子进程的id。 从池中获取连接时加锁保护,如果池中_available_connections.pop()没有连接,那么开始创建连接make_connection(),并将此连接加入到池的已使用连接集合中,即self._in_use_connections集合 创建连接时得先确认下连接数是不是超过了最大连接数配置,没有则继续创建,返回Connection对象。所以这里的tcp连接数不是在应用一初始化就按照池配置中的连接数来直接一次性创建好多少个连接...