前言
看别人的代码也是对自己思维和经验的学习丰富过程
作为一个初学者,之前写过的代码量较少,很少涉及到完整的项目开发,看完redis-py库后,get到其中的
- 连接保活机制
- 连接池的实现
- 开辟buffer存入从socket接收来的数据及buffer管理
- 熟悉了RESP协议
- 多进程多线程的情况下,利用锁确保连接池数据结构安全
我读的过程中觉得值得注意的是,_in_use_connections
使用集合结构;此连接池不是在初始化时创建好一定数量的tcp连接;其中用了两个互斥锁,一个保护连接池,一个保护多进程的池;较多的连接重连,确保连接可用
执行过程
该库主要有Redis、Connection、ConnectionPool、PythonParse、SocketBuffer几个类,下面大概理了一下redis-py的执行过程
开始使用redis-py客户端
class Redis实例化
可以关注下面几个参数:
- socket_timeout=None,
- socket_connect_timeout=None,
- socket_keepalive=None,
- socket_keepalive_options=None,
- connection_pool=None,
- retry_on_timeout=False,
- max_connections=None,
- single_connection_client=False, # 是否单个连接,不用连接池
- health_check_interval=0,
class ConnectionPool连接池初始化
ConnectionPool()
,此时尚未创建连接# 此处定义连接池最大连接数 max_connections = max_connections or 2 ** 31 # fork_safe,在_checkpid()方法中用到,保护临界区的锁。这个锁是在进程id改变时获得的。比如fork出一个子进程后,子进程id和池对象中保存的id不一致,那么子进程中的多个线程都可能会先获取此锁,第一个获得锁的线程将重置此池的数据结构并最终释放锁对象,后续的线程再执行时,pid已于子线程池中的pid熟悉一致,不再做其他操作,在下面也会有提到 self._fork_lock = threading.Lock() # 定义了并初始化已创建连接数、使用中的链接、可用的连接等数据结构 self._lock = threading.Lock() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set()
注意此处
_in_use_connections
使用了集合存储池中的连接对象,这个与python数据类型时间复杂度有关,可点此参考官网,集合的内部实现与字典极为相似,此集合对象只用到两个操作,add
和remove
,时间复杂度均为O(1),(有误烦请指正🤡🤡🤡)初始化Redis-Client状态信息完毕,此时还没有任何连接被创建
假设开始执行
r.set('foo', 'baiqi')
,此方法返回r.excute_command()
的结果首先尝试从池中获取一个Connection对象
pool.get_connection(command_name, **options)
获取时得先执行下
_checkpid()
方法,再执行get_connection()
在现代操作系统下保证ConnectionPool fork-safe,连接池的所有方法都会先调用此方法再来操纵连接池的状态。如果当前pid和池对象中保存的pid不一致,可以假设当前进程是fork出来的子进程,子进程不能用父进程的文件描述符(比如sockets),因此它会继续调用
self.reset()
方法重新初始化当前进程的连接池;如果pids都一致那么直接pass。而
self._fork_lock
就是确保了在子进程中的多个线程不会多次执行self.reset()
方法。因为在子进程中,第一个调用_checkpid
的线程调用了reset()
方法使得self.pid
置为当前子进程的id。从池中获取连接时加锁保护,如果池中
_available_connections.pop()
没有连接,那么开始创建连接make_connection()
,并将此连接加入到池的已使用连接集合中,即self._in_use_connections
集合创建连接时得先确认下连接数是不是超过了最大连接数配置,没有则继续创建,返回
Connection
对象。所以这里的tcp连接数不是在应用一初始化就按照池配置中的连接数来直接一次性创建好多少个连接
Connection对象初始化,此时尚未建立连接
一些可以关注下的参数,部分参数从连接池那边传递过来
class Connection: "Manages TCP communication to and from a Redis server" def __init__(self, parser_class=DefaultParser, socket_read_size=65536, health_check_interval=0): ... self.socket_timeout = socket_timeout self.socket_connect_timeout = socket_connect_timeout or socket_timeout self.socket_keepalive = socket_keepalive self.socket_keepalive_options = socket_keepalive_options or {} self.socket_type = socket_type self.retry_on_timeout = retry_on_timeout # 此处其实是有一个协议缺陷,由用户自己决定是否在与redis-server交互失败时要进行重试 self.health_check_interval = health_check_interval # 是否应用层连接保活 self.next_health_check = 0 self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None self._parser = parser_class(socket_read_size=socket_read_size) self._connect_callbacks = [] self._buffer_cutoff = 6000 # 大数据包分段的时候用到,用以截断大包挨个发送 ...
开始建立与redis-server的连接
connection.connect()
,已连接即self._sock
不为false则跳过,否则就开始执行_connect()
动作_connect()
中设置好socket相关的一些参数,然后连接到server端,返回socksock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 如果开启了tcp_keepalive sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) ...
建立tcp连接后,执行
Connection.on_connect()
初始化此连接相关的一些redis属性,比如认证,选择db,此方法中调用了PythonParser.on_connect()
,初始化了编码器Encoder
类和SocketBuffer()
SocketBuffer
开辟了一块内存空间io.BytesIO()
,socket接收过来的数据会先被放到此buffer中认证,上面这些初始化完成后,开始与redis-server认证,调用
Connection.send_command()
,首先对认证命令调用Connection.pack_command()
按照RESP协议进行封装,返回output
列表此处需要注意buffer的大小,定义了buffer_cutoff长度为6000,较大字符串会进行分片赋给一个列表
封装完了后调用
Connection.send_packed_command()
发送封装后的包,类似[b'*2\r\n$4\r\nAUTH\r\n$6\r\n123456\r\n']
这样,发包前检查是否有连接Connection,是否进行check_health()
,然后迭代发送output
里面的元素,即Connection._sock.sendall(item)
发完包后开始读取响应内容
Connection.read_response()
,此方法里面调用的是PythonParse.read_response()
方法,从之前初始化好的SocketBuffer中读取数据从此buffer读取的时候,如果没有以
\r\n
结尾就说明包不完整,继续从socket接口去拿数据写入到buffer中,如果写入到buffer中的数据长度和已经从buffer中读走的数据长度一样,那么就清理一下buffer。如果定义了health_check_interval,那么记录一下下次检查的时间戳
选择db,如果指定了db,执行
Connection.send_command()
发送select db
,重复上面发送操作(此处还没看懂怎么执行到这一步的)执行
Connection.can_read()
,轮询socket是否有数据可以读开始执行redis的指令,例如
r.set('foo', 'baiqi')
首先判断是否健康检查,发送ping指令
发送set指令,重复上述发送动作
成功后,执行
ConnectionPool.release()
,操作池的时候同样用锁保护,然后从_in_use_connections
集合中移除,并放到_available_connections
列表中发送get指令,如果没有
Connection
,那么就从ConnectionPool
中去拿,并确保此连接可用Connection.connect()
,并且此连接中没有数据Connection.can_read()
,之后进行之前一样的发送操作
redis-py客户端的缺陷
当 redis-py client向server发送请求之后可能会出现由于各种因素导致的连接断开,可能发生在请求发送阶段,也可能发生在server响应阶段。请求阶段呢server未收到请求,响应阶段server已处理过请求,client不清楚到底是哪个阶段出问题了,那么这时候是重试呢还是不重试呢?重试可能会导致server的数据被覆盖或重复执行,不重试可能出现响应丢失。
在redis-py中,这种情况主要会抛出两种异常,ConnectionError
和TimeoutError
,连接异常毫无疑问得重连。TimeoutError
有两种读超时raise TimeoutError("Timeout reading from socket")
和写超时raise TimeoutError("Timeout writing to socket")
。
写超时:
内核为socket开辟的内存空间满了,这个可能与网络因素有关,需要了解tcp协议的三个指针(滑动窗口)
- 写给socket的数据,可能由于网络原因没到server
- server端迟迟没有读取数据,那么也就不会响应ack
- server发的ack由于网络等原因到不了client
这种情况下,套接字内存空间不会清理未消费的数据,那么就会导致套接字内存空间满,引起各种超时。
这时候,仔细想一下,几种情况我们客户端都无法预料,无法判断是否要重试请求
读超时:
请求内容已写入套接字缓冲区,但是读取套接字recv的时候可能消息不完整,或者压根啥都读不到。可能呢server端已处理响应还没过来,也可能server端没收到请求。如果收到了一部分消息,那么说明server端已经处理过请求了,在redis-py中没有做这方面的判断。