在client-go源码中,processorListener对象里面定义了一个RingBuffer用于缓存所有尚未分发的事件通知,在此记录下这个RingBuffer。
RingBuffer一般用于数据的缓存机制,例如tcp协议里面数据包的缓冲就利用到了RingBuffer。
client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区。环是一个逻辑上的概念,有了环,此段内存空间就可以重复利用,不用频繁重新申请内存,本质上数据还是存在数组里面的,这个数组的大小可以按需进行倍数扩容,扩容后需要重新分配内存空间并拷贝未消费的数据到新数组来。因为它是数组,内存是预先分配的,数组是内存上连续的一段空间,它有一个容易预测的访问模式,因此对CPU高速缓存友好,垃圾回收(GC)在这种情况下也不用做什么。
在实现上,可以理解为两个指针:a) 读指针、b)写指针。在一段buffer上,读指针控制下一次该读数据的位置,写指针控制下一次该写数据的位置。在数组里面我们可以直接用数组下标。要遵循FIFO原则,读指针不能超过写指针,两指针重叠了要么buffer写满了,要么buffer为空。
参考这里的一张图
目前我们只需要考虑:
- 存储啥数据类型
- 数据如何存放
- 何时空间满了需要扩容
- 不能丢失未消费数据
k8s中的源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| // 源码路径k8s.io/utils/buffer/ring_growing.go
package buffer
// 非线程安全、可增长的环形缓冲区
type RingGrowing struct {
data []interface{} // 存任意数据的数组
n int // 缓冲区大小
beg int // 第一个可用的元素位置索引
readable int // 未消费的元素数量
}
// 初始化一个RingBuffer
func NewRingGrowing(initialSize int) *RingGrowing {
return &RingGrowing{
data: make([]interface{}, initialSize),
n: initialSize,
}
}
// 取出未消费元素中的第一个元素
func (r *RingGrowing) ReadOne() (data interface{}, ok bool) {
if r.readable == 0 {
return nil, false
}
r.readable--
element := r.data[r.beg]
r.data[r.beg] = nil // Remove reference to the object to help GC
if r.beg == r.n-1 {
// 这种情况就是读到数组最后一个元素了,下次读就得从列表的头部开始,以免越界
r.beg = 0
} else {
r.beg++
}
return element, true
}
// 在buffer尾部添加一个元素,buffer满了就扩容
func (r *RingGrowing) WriteOne(data interface{}) {
// 满了的情况
if r.readable == r.n {
newN := r.n * 2
// 新开辟一个两倍大小的数组
newData := make([]interface{}, newN)
to := r.beg + r.readable
if to <= r.n {
copy(newData, r.data[r.beg:to])
} else {
// 未消费的元素在数组两端的情况
copied := copy(newData, r.data[r.beg:])
copy(newData[copied:], r.data[:(to%r.n)])
}
r.beg = 0
r.data = newData
r.n = newN
}
r.data[(r.readable+r.beg)%r.n] = data
r.readable++
}
|
client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区,因此,在此client-go ringBuffer基础上可以继续考虑下面几个问题:
- 无边界可能导致的oom
- 拷贝数组的开销
- 一次扩容到多大合适
- 是否缩容
- 线程安全
- 可以扩展实现io.ReaderWriter接口
- 可以扩展缓冲区阻塞功能
另外这个概念上的指针(区别于语言上的指针&)用途,类似的在tcp协议的滑动窗口流控里面就用到了,发送端使用了三个指针把缓冲区切割成四段来控制发送速率:
- 已发送包并已收到ACK
- 已发送包但未收到ACK确认
- 未发送但是可以发送包的空间
- 未发送但超过窗口范围了
接收端使用两个指针分成三段以便通知接收能力:
- 已接收并确认的空间段
- 未接收但可以接收的空间
- 未接收并不能接收数据