Update client go sourcecode analyse doc: improve readability (#448)

* update client-go sourcecode analyse: improve readability

* remove unuse ref

* markdown format update: start from the second-level

* add title body

* add title image

* be careful: remove unuse ref please...

Co-authored-by: Jianlong Zhou <jianlong.zhou@shopee.com>
pull/450/head
jianlongzhou 2021-07-07 17:34:15 +08:00 committed by GitHub
parent 34b648c73f
commit 3989521766
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 227 additions and 123 deletions

View File

@ -4,28 +4,34 @@
![client-go informer](../images/client-go-informer.png)
## 以deployment controller为例分析其中client-go informer的用法
## 前言
Kubernetes作为新一代的基础设施系统其重要性已经不言而喻了。基于控制器模型实现的声明式API支持着集群中各类型的工作负载稳定高效的按照期望状态运转随着越来越多的用户选择kubernetes无论是为了深入了解kubernetes这一云原生操作系统的工作逻辑还是期待能够根据自己的特定业务需求对kubernetes进行二次开发了解控制器模型的实现机制都是非常重要的。kubernetes提供了client-go以方便使用go语言进行二次快发本文试图讲述client-go各模块如informer、reflector、cache等实现细节。
当我们需要利用client-go来实现自定义控制器时通常会使用informerFactory来管理控制器需要的多个资源对象的informer实例
```go
// 创建一个informer factory
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
controller := NewController(
kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments())
// factory已经为所有k8s的内置资源对象提供了创建对应informer实例的方法调用具体informer实例的Lister或Informer方法
// 就完成了将informer注册到factory的过程
deploymentLister := kubeInformerFactory.Apps().V1().Deployments().Lister()
// 启动注册到factory的所有informer
kubeInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
```
### SharedInformerFactory结构
使用sharedInformerFactory的好处比如很多个模块都需要使用pod对象没必要都创建一个pod informer用factor存储每种资源的一个informer这里的informer实现是shareIndexInformer
NewSharedInformerFactory调用了NewSharedInformerFactoryWithOptions将返回一个sharedInformerFactory对象
使用sharedInformerFactory可以统一管理控制器中需要的各资源对象的informer实例避免同一个资源创建多个实例这里的informer实现是shareIndexInformer
NewSharedInformerFactory调用了NewSharedInformerFactoryWithOptions将返回一个sharedInformerFactory对象
> kubeClient:clientset
> defaultResync:30s用于初始化持有的shareIndexInformer的resyncCheckPeriod和defaultEventHandlerResyncPeriod字段
> client: clientset支持直接请求api中各内置资源对象的restful group客户端集合
> namespace: factory关注的namespace默认All Namespaceinformer中的reflector将只会listAndWatch指定namespace的资源
> defaultResync: 用于初始化持有的shareIndexInformer的resyncCheckPeriod和defaultEventHandlerResyncPeriod字段用于定时的将local store同步到deltaFIFO
> customResync支持针对每一个informer来配置resync时间通过WithCustomResyncConfig这个Option配置否则就用指定的defaultResync
> informersfactory管理的informer集合
> startedInformers记录已经启动的informer集合
```go
type sharedInformerFactory struct {
@ -34,11 +40,8 @@ type sharedInformerFactory struct {
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration //前面传过来的时间如30s
customResync map[reflect.Type]time.Duration //针对每一个informer用户配置的resync时间通过WithCustomResyncConfig这个Option配置否则就用指定的defaultResync
customResync map[reflect.Type]time.Duration //自定义resync时间
informers map[reflect.Type]cache.SharedIndexInformer //针对每种类型资源存储一个informerinformer的类型是ShareIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool //每个informer是否都启动了
}
```
@ -50,7 +53,7 @@ sharedInformerFactory对象的关键方法
```go
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client, //clientsetdeployment资源来说这里就可以直接使用kube clientset
client: client, //clientset原生资源来说,这里可以直接使用kube clientset
namespace: v1.NamespaceAll, //可以看到默认是监听所有ns下的指定资源
defaultResync: defaultResync, //30s
//以下初始化map结构
@ -79,17 +82,17 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
```
#### 等待ShareIndexInformer的cache被同步
#### 等待informer的cache被同步
等待每一个ShareIndexInformer的cache被同步具体怎么算同步完成
等待每一个ShareIndexInformer的cache被同步具体怎么算同步完成
- sharedInformerFactory的WaitForCacheSync将会不断调用factory持有的所有informer的HasSynced方法直到返回true
- 而informer的HasSynced方法调用的自己持有的controller的HasSynced方法informer结构体包含controller对象下文会分析informer的结构
- 而informer的HasSynced方法调用的自己持有的controller的HasSynced方法informer结构持有controller对象下文会分析informer的结构
- informer中的controller的HasSynced方法则调用的是controller持有的deltaFIFO对象的HasSynced方法
也就说sharedInformerFactory的WaitForCacheSync方法判断informer的cache是否同步看的是informer中的deltaFIFO是否同步了deltaFIFO的结构下文将会分析
也就说sharedInformerFactory的WaitForCacheSync方法判断informer的cache是否同步最终看的是informer中的deltaFIFO是否同步了deltaFIFO的结构下文将会分析
```go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
@ -120,10 +123,11 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
只有向factory中添加informerfactory才有意义添加完成之后上面factory的start方法就可以启动了
> obj:如deployment{}
> newFunc:一个可以用来创建指定informer的方法k8s为每一个内置的对象都实现了这个方法比如创建deployment的ShareIndexInformer的方法
> obj: informer关注的资源如deployment{}
> newFunc: 一个知道如何创建指定informer的方法k8s为每一个内置的对象都实现了这个方法比如创建deployment的ShareIndexInformer的方法
```go
// 向factory中注册指定的informer
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
@ -146,7 +150,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
}
```
##### deployment的shareIndexInformer对应的newFunc的实现
##### shareIndexInformer对应的newFunc的实现
client-go中已经为所有内置对象都提供了NewInformerFunc
@ -163,17 +167,20 @@ func (v *version) Deployments() DeploymentInformer {
- 只要调用了factory.Apps().V1().Deployments()返回的deploymentInformer的Informer或Lister方法就完成了向factory中添加deployment informer
```go
//即会调用以下Deployments方法创建deploymentInformer对象具有defaultInformer、Informer、Lister方法
//可以看到创建deploymentInformer时传递了一个带索引的缓存附带了一个namespace索引后面可以了解带索引的缓存实现比如可以支持查询某个namespace下的所有pod
//用于创建对应的shareIndexInformer该方法提供给factory的InformerFor方法
// deploymentInformer对象具有defaultInformer、Informer、Lister方法
// 可以看到创建deploymentInformer时传递了一个带索引的缓存附带了一个namespace索引后面可以了解带索引的缓存实现比如可以支持查询某个namespace下的所有pod
// 用于创建对应的shareIndexInformer该方法提供给factory的InformerFor方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
//向factor中添加dpeloyment的shareIndexInformer并返回
// 向factor中添加dpeloyment的shareIndexInformer并返回
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
//返回dpeloyment的lister对象该lister中持有上面创建出的shareIndexInformer的cache的引用方便通过缓存获取对象
// 返回dpeloyment的lister对象该lister中持有上面创建出的shareIndexInformer的cache的引用方便通过缓存获取对象
func (f *deploymentInformer) Lister() v1.DeploymentLister {
return v1.NewDeploymentLister(f.Informer().GetIndexer())
}
@ -185,7 +192,7 @@ func (f *deploymentInformer) Lister() v1.DeploymentLister {
// 可先看看下面的shareIndexInformer结构
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
//定义对象的ListWatch方法这里直接用的是clientset中的方法
// 定义对象的ListWatch方法这里直接用的是clientset中的方法
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
@ -201,7 +208,7 @@ func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string
},
},
&appsv1beta1.Deployment{},
resyncPeriod, //创建factory是制定的时间,30s
resyncPeriod, //创建factory是指定的时间,如30s
indexers,
)
}
@ -260,7 +267,7 @@ sharedIndexInformer对象的关键方法
该方法初始化了controller对象并启动同时调用processor.run启动所有的listener用于回调用户配置的EventHandler
具体sharedIndexInformer中的processor中的listener是怎么添加的看下文shareProcessor的分析
> 具体sharedIndexInformer中的processor中的listener是怎么添加的看下文shareProcessor的分析
```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
@ -368,7 +375,11 @@ func (c *controller) processLoop() {
先看看controller怎么处理DeltaFIFO中的对象需要注意DeltaFIFO中的Deltas的结构是一个slice保存同一个对象的所有增量事件
![DeltaFIFO](../images/deltafifo.png)
![image](https://github.com/jianlongzhou/client-go-source-analysis/blob/main/deltaFIFO.png?raw=true)
sharedIndexInformer的HandleDeltas处理从deltaFIFO pod出来的增量时先尝试更新到本地缓存cache更新成功的话会调用processor.distribute方法向processor中的listener添加notificationlistener启动之后会不断获取notification回调用户的EventHandler方法
@ -516,6 +527,8 @@ func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVer
}
```
##### 定时触发resync
在ListAndWatch中还起了一个gorouting定时的进行resync动作
@ -607,10 +620,10 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error {
### 底层缓存的实现
shareIndexInformer中带有一个缓存indexer是一个支持索引的map优点是支持快速查询,参考类图,我们可以知道
shareIndexInformer中带有一个缓存indexer是一个支持索引的map优点是支持快速查询
- Indexer、Queue接口和cache结构体都实现了顶层的Store接口
- cache结构体持有threadSafeStore对象该结构体是线程安全的具备索引查找能力的map
- cache结构体持有threadSafeStore对象threadSafeStore是线程安全的并且具备自定义索引查找的能力
threadSafeMap的结构如下
@ -618,10 +631,6 @@ threadSafeMap的结构如下
> Indexers:一个map[string]IndexFunc结构其中key为索引的名称namespace字符串value则是一个具体的索引函数
> Indices:一个map[string]Index结构其中key也是索引的名称value是一个map[string]sets.String结构其中key是具体的namespace如default这个nsvlaue则是这个ns下的按照索引函数求出来的值的集合比如default这个ns下的所有pod对象名称
通过在向items插入对象的过程中遍历所有的Indexers中的索引函数根据索引函数存储索引key到value的集合关系以下图式结构可以很好的说明
![threadSafeMap](../images/threadsafemap.png)
```go
type threadSafeMap struct {
lock sync.RWMutex
@ -641,11 +650,17 @@ type Indices map[string]Index
type Index map[string]sets.String
```
#### 索引的维护
通过在向items插入对象的过程中遍历所有的Indexers中的索引函数根据索引函数存储索引key到value的集合关系以下图式结构可以很好的说明
![图片来源于网络](https://user-images.githubusercontent.com/41672087/116666278-5981ca00-a9cd-11eb-9570-8ee6eb447d05.png)
#### 缓存中增加对象
以向上面的结构中增加一个对象为例
> 所谓带索引的缓存其实就是在crud对象的时候维护对应的索引结构
在向threadSafeMap的items map中增加完对象后再通过updateIndices更新索引结构
```go
func (c *threadSafeMap) Add(key string, obj interface{}) {
@ -692,7 +707,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
}
```
#### MetaNamespaceIndexFunc索引函数
#### IndexFunc索引函数
一个典型的索引函数MetaNamespaceIndexFunc方便查询时可以根据namespace获取该namespace下的所有对象
@ -711,11 +726,8 @@ func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
提供利用索引来查询的能力Index方法可以根据索引名称和对象查询所有的关联对象
> 例如通过
> 例如通过 `Index(“namespace”, &metav1.ObjectMeta{Namespace: namespace})`获取指定ns下的所有对象具体可以参考tools/cache/listers.go#ListAllByNamespace
>
> Index(“namespace”, &metav1.ObjectMeta{Namespace: namespace})
>
> 获取指定ns下的所有对象具体可以参考tools/cache/listers.go#ListAllByNamespace
```go
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
@ -763,14 +775,27 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
shareIndexInformer.controller.reflector中的deltaFIFO实现
> items记录deltaFIFO中的对象注意map的value是一个Delta slice
> queue记录上面items中的key
> items记录deltaFIFO中的对象注意map的value是一个delta slice
> queue记录上面items中的key维护对象的fifo顺序
> populated队列中是否填充过数据LIST时调用Replace或调用Delete/Add/Update都会置为true
> initialPopulationCount前面首次List的时候获取到的数据就会调用Replace批量增加到队列同时设置initialPopulationCount为List到的对象数量每次Pop出来会减一于判断是否把首次批量插入的数据都POP出去了
> keyFunc知道怎么从对象中解析出对应key的函数
> initialPopulationCount首次List的时候获取到的数据就会调用Replace批量增加到队列同时设置initialPopulationCount为List到的对象数量每次Pop出来会减一于判断是否把首次批量插入的数据都POP出去了
> keyFunc知道怎么从对象中解析出对应key的函数如MetaNamespaceKeyFunc可以解析出namespace/name的形式
> knownObjects这个其实就是shareIndexInformer中的indexer底层缓存的引用可以认为和etcd中的数据一致
```go
// NewDeltaFIFO方法在前面分析的sharedIndexInformer的Run方法中调用
// fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
@ -820,11 +845,12 @@ DeltaFIFO关键的方法
#### 向deltaFIFO批量插入对象
批量向队列插入数据的方法注意knownObjects是本地缓存indexer的引用
批量向队列插入数据的方法注意knownObjects是informer中本地缓存indexer的引用
这里会更新deltaFIFO的initialPopulationCount为Replace list的对象总数加上list中相比knownObjects多出的对象数量。
因为Replace方法可能是reflector发生re-list的时候再次调用这个时候就会出现knownObjects中存在的对象不在Replace list的情况比如watch的delete事件丢失了这个时候是把这些对象筛选出来封装成DeletedFinalStateUnknown对象以Delete type类型再次加入到deltaFIFO中这样最终从detaFIFO处理这个DeletedFinalStateUnknown 增量时就可以更新本地缓存并且触发reconcile。
因为这个对象最终的结构确实找不到了所以只能用knownObjects里面的记录来封装所以叫做FinalStateUnknown。
> 因为Replace方法可能是reflector发生re-list的时候再次调用这个时候就会出现knownObjects中存在的对象不在Replace list的情况比如watch的delete事件丢失了这个时候是把这些对象筛选出来封装成DeletedFinalStateUnknown对象以Delete type类型再次加入到deltaFIFO中这样最终从detaFIFO处理这个DeletedFinalStateUnknown 增量时就可以更新本地缓存并且触发reconcile。
> 因为这个对象最终的结构确实找不到了所以只能用knownObjects里面的记录来封装delta所以叫做FinalStateUnknown。
```go
@ -839,13 +865,14 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return KeyError{item, err}
}
keys.Insert(key)
// 通过Replace添加到队列的Delta type都是Sync
// 调用deltaFIFO的queueActionLocked向deltaFIFO增加一个增量
// 可以看到Replace添加的Delta type都是Sync
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 底层的缓存不应该会是nil
// 底层的缓存不应该会是nil,可以忽略这种情况
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
@ -875,31 +902,28 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
}
// Detect deletions not already in the queue.
// 当reflector发生re-list时可能会出现knownObjects中存在的对象不在Replace list的情况
knownKeys := f.knownObjects.ListKeys()
// 记录这次替换相当于在缓存中删除多少对象
queuedDeletions := 0
// 枚举每一个缓存对象的key看看在不在即将用来替换delta队列的list中
// 枚举local store中的所有对象
for _, k := range knownKeys {
// 对象也在Replace list中所以跳过
if keys.Has(k) {
continue
}
// 对象在缓存但不在list中说明替换操作完成后这个对象相当于被删除了
// 注意这里的所谓替换对deltaFIFO来说是给队列中的对应对象增加一个delete增量queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj})
// 真正删除缓存是在shareIndexInformer中的HandleDeltas中
// 对象在缓存但不在list中说明替换操作完成后这个对象相当于被删除了
// 注意这里的所谓替换对deltaFIFO来说是给队列中的对应对象增加一个
// delete增量queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj})
// 真正删除缓存需要等到DeletedFinalStateUnknown增量被POP出来操作local store时
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 设置f.initialPopulationCount大于0表示首次插入的对象还没有全部pop出去
// 设置f.initialPopulationCount该值大于0表示首次插入的对象还没有全部pop出去
// informer WaitForCacheSync就是在等待该值为0
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
@ -909,16 +933,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
}
```
#### 从deltaFIFO pop出对象
从队列中Pop出一个方法并由函数process来处理就是shareIndexInformer的HandleDeltas
从队列中Pop出一个方法并由函数process来处理其实就是shareIndexInformer的HandleDeltas
> 每次从DeltaFIFO Pop出一个对象f.initialPopulationCount会减一初始值为List时的对象数量
> 前面的Informer的WaitForCacheSync最终就是调用了这个HasSynced方法
>
> 因为前面Pop出对象的处理方法HandleDeltas中会先调用indexder把对象存起来所以这个HasSynced相当于判断本地缓存是否首次同步完成
```go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
@ -963,11 +983,9 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
```
#### deltaFIFO是否同步完成
对应到前面遗留的没有串起来的问题factory的WaitForCacheSync是如何等待缓存同步完成
串连前面的问题factory的WaitForCacheSync是如何等待缓存同步完成
> factory的WaitForCacheSync方法调用informer的HasSync方法继而调用deltaFIFO的HasSync方法也就是判断从reflector list到的数据是否pop完
@ -979,35 +997,7 @@ func (f *DeltaFIFO) HasSynced() bool {
}
```
#### deltaFIFO增加一个对象
```go
//在队列中给指定的对象append一个Delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}
```
#### Resync方法
#### 同步local store到deltaFIFO
> 所谓的resync其实就是把knownObjects即缓存中的对象全部再通过queueActionLocked(Sync, obj)加到队列
@ -1021,6 +1011,7 @@ func (f *DeltaFIFO) Resync() error {
}
keys := f.knownObjects.ListKeys()
// 把local store中的对象都以Sync类型增量的形式重新放回到deltaFIFO
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
@ -1046,7 +1037,7 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error {
if len(f.items[id]) > 0 {
return nil
}
// 可以看到这里跟list一样增加到deltaFIFO的是一个Sync类型的增量
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
@ -1054,36 +1045,117 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error {
}
```
###
#### 在deltaFIFO增加一个对象
### shareProcess的实现
注意这里在append增量时的去重逻辑如果连续的两个增量类型都是Deleted那么就去掉一个正常情况确实不会出现这样且没必要优先去掉前面所说的因为re-list可能导致的api与local store不一致而增加的DeletedFinalStateUnknown类型的增量
shareIndexInformer中具有一个shareProcess结构用于分发deltaFIFO的对象调用用户配置的EventHandler处理
```go
//在队列中给指定的对象append一个Delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 把增量append到slice的后面
newDeltas := append(f.items[id], Delta{actionType, obj})
// 连续的两个Deleted delta将会去掉一个
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
// 维护queue队列
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}
```
当前认为只有连续的两个Delete delta才有必要去重
```go
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
// 每次取最后两个delta来判断
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
func isDup(a, b *Delta) *Delta {
// 当前认为只有连续的两个Delete delta才有必要去重
if out := isDeletionDup(a, b); out != nil {
return out
}
// TODO: Detect other duplicate situations? Are there any?
return nil
}
// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// Do more sophisticated checks, or is this sufficient?
// 优先去重DeletedFinalStateUnknown类型的Deleted delta
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
```
### sharedProcessor的实现
shareIndexInformer中的sharedProcess结构用于分发deltaFIFO的对象回调用户配置的EventHandler方法
可以看到shareIndexInformer中的process直接通过&sharedProcessor{clock: realClock}初始化
如下为sharedProcessor结构
> listenersStartedlisteners中包含的listener是否都已经启动了
> listeners已添加的listener列表用来处理watch到的数据
> syncingListeners已添加的listener列表用来处理list到的数据
```go
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// 初始化一个默认的processor
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
// cacheMutationDetector可以记录local store是否被外部修改
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
```
如下为sharedProcessor结构
> listenersStartedlisteners中包含的listener是否都已经启动了
> listeners已添加的listener列表用来处理watch到的数据
> syncingListeners已添加的listener列表用来处理list或者resync的数据
```go
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
@ -1094,6 +1166,18 @@ type sharedProcessor struct {
}
```
#### 理解listeners和syncingListeners的区别
processor可以支持listener的维度配置是否需要resync一个informer可以配置多个EventHandler而一个EventHandler对应processor中的一个listener每个listener可以配置需不需要resync如果某个listener需要resync那么添加到deltaFIFO的Sync增量最终也只会回到对应的listener
reflector中会定时判断每一个listener是否需要进行resync判断的依据是看配置EventHandler的时候指定的resyncPeriod0代表该listener不需要resync否则就每隔resyncPeriod看看是否到时间了
- listeners记录了informer添加的所有listener
- syncingListeners记录了informer中哪些listener处于sync状态
syncingListeners是listeners的子集syncingListeners记录那些开启了resync且时间已经到达了的listener把它们放在一个独立的slice是避免下面分析的distribute方法中把obj增加到了还不需要resync的listener中
#### 为sharedProcessor添加listener
在sharedProcessor中添加一个listener
@ -1101,12 +1185,13 @@ type sharedProcessor struct {
```go
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
// 同时添加到listeners和syncingListeners列表但其实添加的是同一个对象的引用
// 所以下面run启动的时候只需要启动listeners中listener就可以了
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
```
#### 启动sharedProcessor中的listener
#### 启动sharedProcessor中的listener
sharedProcessor启动所有的listener
是通过调用listener.run和listener.pop来启动一个listener两个方法具体作用看下文processorListener说明
@ -1117,7 +1202,9 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
// listener的run方法不断的从listener自身的缓冲区取出对象回调handler
p.wg.Start(listener.run)
// listener的pod方法不断的接收对象并暂存在自身的缓冲区中
p.wg.Start(listener.pop)
}
p.listenersStarted = true
@ -1134,14 +1221,22 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
#### sharedProcessor分发对象
distribute方法是在前面介绍[deltaFIFO pop出来的对象处理逻辑]时提到的把notification事件添加到listener中listener如何pop出notification回调EventHandler见下文listener分析
distribute方法是在前面介绍`[deltaFIFO pop出来的对象处理逻辑]`时提到的把notification事件添加到listener中listener如何pop出notification回调EventHandler见下文listener部分分析
当通过distribute分发从deltaFIFO获取的对象时如果delta type是Sync那么就会把对象交给sync listener来处理而Sync类型的delta只能来源于下面两种情况
- reflector list Replace到deltaFIFO的对象因为首次在sharedProcessor增加一个listener的时候是同时加在listeners和syncingListeners中的
- reflector定时触发resync local store到deltaFIFO的对象因为每次reflector调用processor的shouldResync时都会把达到resync条件的listener筛选出来重新放到p.syncingListeners
上面两种情况都可以在p.syncingListeners中准备好listener
```go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 如果是通过reflector list Replace到deltaFIFO的对象那么distribute到syncingListeners
// 如果是通过reflector list Replace到deltaFIFO的对象或者reflector定时触发resync到deltaFIFO的对象那么distribute到syncingListeners
if sync {
// 保证deltaFIFO Resync方法过来的delta obj只给开启了resync能力的listener
for _, listener := range p.syncingListeners {
listener.add(obj)
}
@ -1157,10 +1252,16 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
sharedProcessor中的listener具体的类型运转逻辑就是把用户通过addCh增加的事件发送到nextCh供run方法取出回调Eventhandler因为addCh和nectCh都是无缓冲channel所以中间引入ringBuffer做缓存
> processorListener是sharedIndexInformer调用AddEventHandler时创建并添加到sharedProcess
> 对于一个Informer可以多次调用AddEventHandler来添加多个listener但我们的一般的使用场景应该都只会添加一个作用都是类似enqueue到workQueue
> addCh和nextCh都将被初始化为无缓冲的chan
processorListener是sharedIndexInformer调用AddEventHandler时创建并添加到sharedProcessor对于一个Informer可以多次调用AddEventHandler来添加多个listener
> addCh无缓冲的chanlistener的pod方法不断从addCh取出对象丢给nextCh。addCh中的对象来源于listener的add方法如果nextCh不能及时消费则放入缓冲区pendingNotifications
> nextCh无缓冲的chanlistener的run方法不断从nextCh取出对象回调用户handler。nextCh的对象来源于addCh或者缓冲区
> pendingNotifications一个无容量限制的环形缓冲区可以理解为可以无限存储的队列用来存储deltaFIFO分发过来的消息
> nextResync由resyncPeriod和requestedResyncPeriod计算得出与当前时间now比较判断listener是否该进行resync了
> resyncPeriodlistener自身期待多长时间进行resync
> requestedResyncPeriodinformer希望listener多长时间进行resync
```go
type processorListener struct {
@ -1195,13 +1296,14 @@ shareProcessor中的distribute方法调用的是listener的add来向addCh增加
```go
func (p *processorListener) add(notification interface{}) {
// 虽然p.addCh是一个无缓冲的channel但是因为listener中存在ring buffer所以这里并不会一直阻塞
p.addCh <- notification
}
```
#### 判断是否需要resync
前面reflector中会调用这个shouldResync方法根据每个listener的resyncPeriod判断是否需要resync
如果resyncPeriod为0表示不需要resync否则判断当前时间now是否已经超过了nextResync是的话则返回true表示需要resync。其中nextResync在每次调用listener的shouldResync方法成功时更新
```go
// shouldResync queries every listener to determine if any of them need a resync, based on each
@ -1209,7 +1311,7 @@ func (p *processorListener) add(notification interface{}) {
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
// 这里每次都会先置空列表保证里面记录了当前需要resync的listener
p.syncingListeners = []*processorListener{}
resyncNeeded := false
@ -1219,6 +1321,7 @@ func (p *sharedProcessor) shouldResync() bool {
// listeners that are going to be resyncing.
if listener.shouldResync(now) {
resyncNeeded = true
// 达到resync条件的listener被加入syncingListeners
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
@ -1244,6 +1347,7 @@ func (p *processorListener) run() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
// 回调用户配置的handler
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)