在client-go源码中,processorListener对象里面定义了一个RingBuffer用于缓存所有尚未分发的事件通知,在此记录下这个RingBuffer。

RingBuffer一般用于数据的缓存机制,例如tcp协议里面数据包的缓冲就利用到了RingBuffer。

client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区。环是一个逻辑上的概念,有了环,此段内存空间就可以重复利用,不用频繁重新申请内存,本质上数据还是存在数组里面的,这个数组的大小可以按需进行倍数扩容,扩容后需要重新分配内存空间并拷贝未消费的数据到新数组来。因为它是数组,内存是预先分配的,数组是内存上连续的一段空间,它有一个容易预测的访问模式,因此对CPU高速缓存友好,垃圾回收(GC)在这种情况下也不用做什么。 在实现上,可以理解为两个指针:a) 读指针、b)写指针。在一段buffer上,读指针控制下一次该读数据的位置,写指针控制下一次该写数据的位置。在数组里面我们可以直接用数组下标。要遵循FIFO原则,读指针不能超过写指针,两指针重叠了要么buffer写满了,要么buffer为空。

参考这里的一张图 Circular_Buffer_Animation

目前我们只需要考虑:

  1. 存储啥数据类型
  2. 数据如何存放
  3. 何时空间满了需要扩容
  4. 不能丢失未消费数据

k8s中的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 源码路径k8s.io/utils/buffer/ring_growing.go

package buffer

// 非线程安全、可增长的环形缓冲区
type RingGrowing struct {
    data     []interface{} // 存任意数据的数组
    n        int // 缓冲区大小
    beg      int // 第一个可用的元素位置索引
    readable int // 未消费的元素数量
}

// 初始化一个RingBuffer
func NewRingGrowing(initialSize int) *RingGrowing {
    return &RingGrowing{
        data: make([]interface{}, initialSize),
        n:    initialSize,
    }
}

// 取出未消费元素中的第一个元素
func (r *RingGrowing) ReadOne() (data interface{}, ok bool) {
    if r.readable == 0 {
        return nil, false
    }
    r.readable--
    element := r.data[r.beg]
    r.data[r.beg] = nil // Remove reference to the object to help GC
    if r.beg == r.n-1 {
        // 这种情况就是读到数组最后一个元素了,下次读就得从列表的头部开始,以免越界
        r.beg = 0
    } else {
        r.beg++
    }
    return element, true
}

// 在buffer尾部添加一个元素,buffer满了就扩容
func (r *RingGrowing) WriteOne(data interface{}) {
    // 满了的情况
    if r.readable == r.n {
        newN := r.n * 2
        // 新开辟一个两倍大小的数组
        newData := make([]interface{}, newN)
        to := r.beg + r.readable
        if to <= r.n {
            copy(newData, r.data[r.beg:to])
        } else {
            // 未消费的元素在数组两端的情况
            copied := copy(newData, r.data[r.beg:])
            copy(newData[copied:], r.data[:(to%r.n)])
        }
        r.beg = 0
        r.data = newData
        r.n = newN
    }
    r.data[(r.readable+r.beg)%r.n] = data
    r.readable++
}

client-go中的这个buffer是非线程安全、可增长、无边界的先进先出环形缓冲区,因此,在此client-go ringBuffer基础上可以继续考虑下面几个问题:

  1. 无边界可能导致的oom
  2. 拷贝数组的开销
  3. 一次扩容到多大合适
  4. 是否缩容
  5. 线程安全
  6. 可以扩展实现io.ReaderWriter接口
  7. 可以扩展缓冲区阻塞功能

另外这个概念上的指针(区别于语言上的指针&)用途,类似的在tcp协议的滑动窗口流控里面就用到了,发送端使用了三个指针把缓冲区切割成四段来控制发送速率:

  1. 已发送包并已收到ACK
  2. 已发送包但未收到ACK确认
  3. 未发送但是可以发送包的空间
  4. 未发送但超过窗口范围了

接收端使用两个指针分成三段以便通知接收能力:

  1. 已接收并确认的空间段
  2. 未接收但可以接收的空间
  3. 未接收并不能接收数据