From 3989521766e3263501f169d60701c83f2cf4e50a Mon Sep 17 00:00:00 2001 From: jianlongzhou <41672087+jianlongzhou@users.noreply.github.com> Date: Wed, 7 Jul 2021 17:34:15 +0800 Subject: [PATCH] Update client go sourcecode analyse doc: improve readability (#448) * update client-go sourcecode analyse: improve readability * remove unuse ref * markdown format update: start from the second-level * add title body * add title image * be careful: remove unuse ref please... Co-authored-by: Jianlong Zhou --- .../client-go-informer-sourcecode-analyse.md | 350 ++++++++++++------ 1 file changed, 227 insertions(+), 123 deletions(-) diff --git a/develop/client-go-informer-sourcecode-analyse.md b/develop/client-go-informer-sourcecode-analyse.md index 2f1860a9b..6a962466f 100644 --- a/develop/client-go-informer-sourcecode-analyse.md +++ b/develop/client-go-informer-sourcecode-analyse.md @@ -4,28 +4,34 @@ ![client-go informer](../images/client-go-informer.png) -## 以deployment controller为例分析其中client-go informer的用法 +## 前言 + +Kubernetes作为新一代的基础设施系统,其重要性已经不言而喻了。基于控制器模型实现的声明式API支持着集群中各类型的工作负载稳定高效的按照期望状态运转,随着越来越多的用户选择kubernetes,无论是为了深入了解kubernetes这一云原生操作系统的工作逻辑,还是期待能够根据自己的特定业务需求对kubernetes进行二次开发,了解控制器模型的实现机制都是非常重要的。kubernetes提供了client-go以方便使用go语言进行二次快发,本文试图讲述client-go各模块如informer、reflector、cache等实现细节。 + + +当我们需要利用client-go来实现自定义控制器时,通常会使用informerFactory来管理控制器需要的多个资源对象的informer实例 ```go +// 创建一个informer factory kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) -controller := NewController( - kubeClient, exampleClient, - kubeInformerFactory.Apps().V1().Deployments()) - +// factory已经为所有k8s的内置资源对象提供了创建对应informer实例的方法,调用具体informer实例的Lister或Informer方法 +// 就完成了将informer注册到factory的过程 +deploymentLister := kubeInformerFactory.Apps().V1().Deployments().Lister() +// 启动注册到factory的所有informer kubeInformerFactory.Start(stopCh) - -if err = controller.Run(2, stopCh); err != nil { - klog.Fatalf("Error running controller: %s", err.Error()) -} ``` ### SharedInformerFactory结构 -使用sharedInformerFactory的好处:比如很多个模块都需要使用pod对象,没必要都创建一个pod informer,用factor存储每种资源的一个informer,这里的informer实现是shareIndexInformer -NewSharedInformerFactory调用了NewSharedInformerFactoryWithOptions,将返回一个sharedInformerFactory对象。 +使用sharedInformerFactory可以统一管理控制器中需要的各资源对象的informer实例,避免同一个资源创建多个实例,这里的informer实现是shareIndexInformer +NewSharedInformerFactory调用了NewSharedInformerFactoryWithOptions,将返回一个sharedInformerFactory对象 -> kubeClient:clientset -> defaultResync:30s,用于初始化持有的shareIndexInformer的resyncCheckPeriod和defaultEventHandlerResyncPeriod字段 +> client: clientset,支持直接请求api中各内置资源对象的restful group客户端集合 +> namespace: factory关注的namespace(默认All Namespace),informer中的reflector将只会listAndWatch指定namespace的资源 +> defaultResync: 用于初始化持有的shareIndexInformer的resyncCheckPeriod和defaultEventHandlerResyncPeriod字段,用于定时的将local store同步到deltaFIFO +> customResync:支持针对每一个informer来配置resync时间,通过WithCustomResyncConfig这个Option配置,否则就用指定的defaultResync +> informers:factory管理的informer集合 +> startedInformers:记录已经启动的informer集合 ```go type sharedInformerFactory struct { @@ -34,11 +40,8 @@ type sharedInformerFactory struct { tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration //前面传过来的时间,如30s - customResync map[reflect.Type]time.Duration //针对每一个informer,用户配置的resync时间,通过WithCustomResyncConfig这个Option配置,否则就用指定的defaultResync - + customResync map[reflect.Type]time.Duration //自定义resync时间 informers map[reflect.Type]cache.SharedIndexInformer //针对每种类型资源存储一个informer,informer的类型是ShareIndexInformer - // startedInformers is used for tracking which informers have been started. - // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool //每个informer是否都启动了 } ``` @@ -50,7 +53,7 @@ sharedInformerFactory对象的关键方法: ```go func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ - client: client, //clientset,对deployment资源来说,这里就可以直接使用kube clientset + client: client, //clientset,对原生资源来说,这里可以直接使用kube clientset namespace: v1.NamespaceAll, //可以看到默认是监听所有ns下的指定资源 defaultResync: defaultResync, //30s //以下初始化map结构 @@ -79,17 +82,17 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } ``` -#### 等待ShareIndexInformer的cache被同步 +#### 等待informer的cache被同步 -等待每一个ShareIndexInformer的cache被同步,具体怎么算同步完成呢? +等待每一个ShareIndexInformer的cache被同步,具体怎么算同步完成? - sharedInformerFactory的WaitForCacheSync将会不断调用factory持有的所有informer的HasSynced方法,直到返回true -- 而informer的HasSynced方法调用的自己持有的controller的HasSynced方法(informer结构体包含controller对象,下文会分析informer的结构) +- 而informer的HasSynced方法调用的自己持有的controller的HasSynced方法(informer结构持有controller对象,下文会分析informer的结构) - informer中的controller的HasSynced方法则调用的是controller持有的deltaFIFO对象的HasSynced方法 -也就说sharedInformerFactory的WaitForCacheSync方法判断informer的cache是否同步,看的是informer中的deltaFIFO是否同步了,deltaFIFO的结构下文将会分析 +也就说sharedInformerFactory的WaitForCacheSync方法判断informer的cache是否同步,最终看的是informer中的deltaFIFO是否同步了,deltaFIFO的结构下文将会分析 ```go func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { @@ -120,10 +123,11 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref 只有向factory中添加informer,factory才有意义,添加完成之后,上面factory的start方法就可以启动了 -> obj:如deployment{} -> newFunc:一个可以用来创建指定informer的方法,k8s为每一个内置的对象都实现了这个方法,比如创建deployment的ShareIndexInformer的方法 +> obj: informer关注的资源如deployment{} +> newFunc: 一个知道如何创建指定informer的方法,k8s为每一个内置的对象都实现了这个方法,比如创建deployment的ShareIndexInformer的方法 ```go +// 向factory中注册指定的informer func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() @@ -146,7 +150,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal } ``` -##### deployment的shareIndexInformer对应的newFunc的实现 +##### shareIndexInformer对应的newFunc的实现 client-go中已经为所有内置对象都提供了NewInformerFunc @@ -163,17 +167,20 @@ func (v *version) Deployments() DeploymentInformer { - 只要调用了factory.Apps().V1().Deployments()返回的deploymentInformer的Informer或Lister方法,就完成了向factory中添加deployment informer ```go -//即会调用以下Deployments方法创建deploymentInformer对象具有defaultInformer、Informer、Lister方法 -//可以看到创建deploymentInformer时传递了一个带索引的缓存,附带了一个namespace索引,后面可以了解带索引的缓存实现,比如可以支持查询:某个namespace下的所有pod -//用于创建对应的shareIndexInformer,该方法提供给factory的InformerFor方法 +// deploymentInformer对象具有defaultInformer、Informer、Lister方法 +// 可以看到创建deploymentInformer时传递了一个带索引的缓存,附带了一个namespace索引,后面可以了解带索引的缓存实现,比如可以支持查询:某个namespace下的所有pod + +// 用于创建对应的shareIndexInformer,该方法提供给factory的InformerFor方法 func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } -//向factor中添加dpeloyment的shareIndexInformer并返回 + +// 向factor中添加dpeloyment的shareIndexInformer并返回 func (f *deploymentInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer) } -//返回dpeloyment的lister对象,该lister中持有上面创建出的shareIndexInformer的cache的引用,方便通过缓存获取对象 + +// 返回dpeloyment的lister对象,该lister中持有上面创建出的shareIndexInformer的cache的引用,方便通过缓存获取对象 func (f *deploymentInformer) Lister() v1.DeploymentLister { return v1.NewDeploymentLister(f.Informer().GetIndexer()) } @@ -185,7 +192,7 @@ func (f *deploymentInformer) Lister() v1.DeploymentLister { // 可先看看下面的shareIndexInformer结构 func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( - //定义对象的ListWatch方法,这里直接用的是clientset中的方法 + // 定义对象的ListWatch方法,这里直接用的是clientset中的方法 &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { @@ -201,7 +208,7 @@ func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string }, }, &appsv1beta1.Deployment{}, - resyncPeriod, //创建factory是制定的时间,30s + resyncPeriod, //创建factory是指定的时间,如30s indexers, ) } @@ -260,7 +267,7 @@ sharedIndexInformer对象的关键方法: 该方法初始化了controller对象并启动,同时调用processor.run启动所有的listener,用于回调用户配置的EventHandler -具体sharedIndexInformer中的processor中的listener是怎么添加的,看下文shareProcessor的分析 +> 具体sharedIndexInformer中的processor中的listener是怎么添加的,看下文shareProcessor的分析 ```go func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { @@ -368,7 +375,11 @@ func (c *controller) processLoop() { 先看看controller怎么处理DeltaFIFO中的对象,需要注意DeltaFIFO中的Deltas的结构,是一个slice,保存同一个对象的所有增量事件 -![DeltaFIFO](../images/deltafifo.png) +![image](https://github.com/jianlongzhou/client-go-source-analysis/blob/main/deltaFIFO.png?raw=true) + + + + sharedIndexInformer的HandleDeltas处理从deltaFIFO pod出来的增量时,先尝试更新到本地缓存cache,更新成功的话会调用processor.distribute方法向processor中的listener添加notification,listener启动之后会不断获取notification回调用户的EventHandler方法 @@ -516,6 +527,8 @@ func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVer } ``` + + ##### 定时触发resync 在ListAndWatch中还起了一个gorouting定时的进行resync动作 @@ -607,10 +620,10 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error { ### 底层缓存的实现 -shareIndexInformer中带有一个缓存indexer,是一个支持索引的map,优点是支持快速查询,参考类图,我们可以知道: +shareIndexInformer中带有一个缓存indexer,是一个支持索引的map,优点是支持快速查询: - Indexer、Queue接口和cache结构体都实现了顶层的Store接口 -- cache结构体持有threadSafeStore对象,该结构体是线程安全的,具备索引查找能力的map +- cache结构体持有threadSafeStore对象,threadSafeStore是线程安全的,并且具备自定义索引查找的能力 threadSafeMap的结构如下: @@ -618,10 +631,6 @@ threadSafeMap的结构如下: > Indexers:一个map[string]IndexFunc结构,其中key为索引的名称,如’namespace’字符串,value则是一个具体的索引函数 > Indices:一个map[string]Index结构,其中key也是索引的名称,value是一个map[string]sets.String结构,其中key是具体的namespace,如default这个ns,vlaue则是这个ns下的按照索引函数求出来的值的集合,比如default这个ns下的所有pod对象名称 -通过在向items插入对象的过程中,遍历所有的Indexers中的索引函数,根据索引函数存储索引key到value的集合关系,以下图式结构可以很好的说明 - -![threadSafeMap](../images/threadsafemap.png) - ```go type threadSafeMap struct { lock sync.RWMutex @@ -641,11 +650,17 @@ type Indices map[string]Index type Index map[string]sets.String ``` +#### 索引的维护 + +通过在向items插入对象的过程中,遍历所有的Indexers中的索引函数,根据索引函数存储索引key到value的集合关系,以下图式结构可以很好的说明: + +![图片来源于网络](https://user-images.githubusercontent.com/41672087/116666278-5981ca00-a9cd-11eb-9570-8ee6eb447d05.png) + + + #### 缓存中增加对象 -以向上面的结构中增加一个对象为例 - -> 所谓带索引的缓存,其实就是在crud对象的时候,维护对应的索引结构 +在向threadSafeMap的items map中增加完对象后,再通过updateIndices更新索引结构 ```go func (c *threadSafeMap) Add(key string, obj interface{}) { @@ -692,7 +707,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke } ``` -#### MetaNamespaceIndexFunc索引函数 +#### IndexFunc索引函数 一个典型的索引函数MetaNamespaceIndexFunc,方便查询时可以根据namespace获取该namespace下的所有对象 @@ -711,11 +726,8 @@ func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { 提供利用索引来查询的能力,Index方法可以根据索引名称和对象,查询所有的关联对象 -> 例如通过 +> 例如通过 `Index(“namespace”, &metav1.ObjectMeta{Namespace: namespace})`获取指定ns下的所有对象,具体可以参考tools/cache/listers.go#ListAllByNamespace > -> Index(“namespace”, &metav1.ObjectMeta{Namespace: namespace}) -> -> 获取指定ns下的所有对象,具体可以参考tools/cache/listers.go#ListAllByNamespace ```go func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { @@ -763,14 +775,27 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, shareIndexInformer.controller.reflector中的deltaFIFO实现 -> items:记录deltaFIFO中的对象,注意map的value是一个Delta slice -> queue:记录上面items中的key +> items:记录deltaFIFO中的对象,注意map的value是一个delta slice +> queue:记录上面items中的key,维护对象的fifo顺序 > populated:队列中是否填充过数据,LIST时调用Replace或调用Delete/Add/Update都会置为true -> initialPopulationCount:前面首次List的时候获取到的数据就会调用Replace批量增加到队列,同时设置initialPopulationCount为List到的对象数量,每次Pop出来会减一,由于判断是否把首次批量插入的数据都POP出去了 -> keyFunc:知道怎么从对象中解析出对应key的函数 +> initialPopulationCount:首次List的时候获取到的数据就会调用Replace批量增加到队列,同时设置initialPopulationCount为List到的对象数量,每次Pop出来会减一,用于判断是否把首次批量插入的数据都POP出去了 +> keyFunc:知道怎么从对象中解析出对应key的函数,如MetaNamespaceKeyFunc可以解析出namespace/name的形式 > knownObjects:这个其实就是shareIndexInformer中的indexer底层缓存的引用,可以认为和etcd中的数据一致 ```go +// NewDeltaFIFO方法在前面分析的sharedIndexInformer的Run方法中调用 +// fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) +func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { + f := &DeltaFIFO{ + items: map[string]Deltas{}, + queue: []string{}, + keyFunc: keyFunc, + knownObjects: knownObjects, + } + f.cond.L = &f.lock + return f +} + type DeltaFIFO struct { // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex @@ -820,11 +845,12 @@ DeltaFIFO关键的方法: #### 向deltaFIFO批量插入对象 -批量向队列插入数据的方法,注意knownObjects是本地缓存indexer的引用 +批量向队列插入数据的方法,注意knownObjects是informer中本地缓存indexer的引用 这里会更新deltaFIFO的initialPopulationCount为Replace list的对象总数加上list中相比knownObjects多出的对象数量。 -因为Replace方法可能是reflector发生re-list的时候再次调用,这个时候就会出现knownObjects中存在的对象不在Replace list的情况(比如watch的delete事件丢失了),这个时候是把这些对象筛选出来,封装成DeletedFinalStateUnknown对象以Delete type类型再次加入到deltaFIFO中,这样最终从detaFIFO处理这个DeletedFinalStateUnknown 增量时就可以更新本地缓存并且触发reconcile。 -因为这个对象最终的结构确实找不到了,所以只能用knownObjects里面的记录来封装,所以叫做FinalStateUnknown。 + +> 因为Replace方法可能是reflector发生re-list的时候再次调用,这个时候就会出现knownObjects中存在的对象不在Replace list的情况(比如watch的delete事件丢失了),这个时候是把这些对象筛选出来,封装成DeletedFinalStateUnknown对象以Delete type类型再次加入到deltaFIFO中,这样最终从detaFIFO处理这个DeletedFinalStateUnknown 增量时就可以更新本地缓存并且触发reconcile。 +> 因为这个对象最终的结构确实找不到了,所以只能用knownObjects里面的记录来封装delta,所以叫做FinalStateUnknown。 ```go @@ -839,13 +865,14 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { return KeyError{item, err} } keys.Insert(key) - // 通过Replace添加到队列的Delta type都是Sync + // 调用deltaFIFO的queueActionLocked向deltaFIFO增加一个增量 + // 可以看到Replace添加的Delta type都是Sync if err := f.queueActionLocked(Sync, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } -// 底层的缓存不应该会是nil + // 底层的缓存不应该会是nil,可以忽略这种情况 if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 @@ -875,31 +902,28 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { } // Detect deletions not already in the queue. + // 当reflector发生re-list时,可能会出现knownObjects中存在的对象不在Replace list的情况 knownKeys := f.knownObjects.ListKeys() // 记录这次替换相当于在缓存中删除多少对象 queuedDeletions := 0 - // 枚举每一个缓存对象的key,看看在不在即将用来替换delta队列的list中 + // 枚举local store中的所有对象 for _, k := range knownKeys { + // 对象也在Replace list中,所以跳过 if keys.Has(k) { continue } - // 对象在缓存,但不在list中,说明替换操作完成后,这个对象相当于被删除了 - // 注意这里的所谓替换,对deltaFIFO来说,是给队列中的对应对象增加一个delete增量queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}) - // 真正删除缓存是在shareIndexInformer中的HandleDeltas中 + // 对象在缓存,但不在list中,说明替换操作完成后,这个对象相当于被删除了 + // 注意这里的所谓替换,对deltaFIFO来说,是给队列中的对应对象增加一个 + // delete增量queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}) + // 真正删除缓存需要等到DeletedFinalStateUnknown增量被POP出来操作local store时 deletedObj, exists, err := f.knownObjects.GetByKey(k) - if err != nil { - deletedObj = nil - klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) - } else if !exists { - deletedObj = nil - klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) - } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } - // 设置f.initialPopulationCount,大于0表示首次插入的对象还没有全部pop出去 + // 设置f.initialPopulationCount,该值大于0表示首次插入的对象还没有全部pop出去 + // informer WaitForCacheSync就是在等待该值为0 if !f.populated { f.populated = true f.initialPopulationCount = len(list) + queuedDeletions @@ -909,16 +933,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { } ``` - - #### 从deltaFIFO pop出对象 -从队列中Pop出一个方法,并由函数process来处理,就是shareIndexInformer的HandleDeltas +从队列中Pop出一个方法,并由函数process来处理,其实就是shareIndexInformer的HandleDeltas > 每次从DeltaFIFO Pop出一个对象,f.initialPopulationCount会减一,初始值为List时的对象数量 > 前面的Informer的WaitForCacheSync最终就是调用了这个HasSynced方法 -> -> 因为前面Pop出对象的处理方法HandleDeltas中,会先调用indexder把对象存起来,所以这个HasSynced相当于判断本地缓存是否首次同步完成 ```go func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { @@ -963,11 +983,9 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { } ``` - - #### deltaFIFO是否同步完成 -对应到前面遗留的没有串起来的问题:factory的WaitForCacheSync是如何等待缓存同步完成: +串连前面的问题:factory的WaitForCacheSync是如何等待缓存同步完成 > factory的WaitForCacheSync方法调用informer的HasSync方法,继而调用deltaFIFO的HasSync方法,也就是判断从reflector list到的数据是否pop完 @@ -979,35 +997,7 @@ func (f *DeltaFIFO) HasSynced() bool { } ``` -#### deltaFIFO增加一个对象 - -```go -//在队列中给指定的对象append一个Delta -func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { - id, err := f.KeyOf(obj) - if err != nil { - return KeyError{obj, err} - } - - newDeltas := append(f.items[id], Delta{actionType, obj}) - newDeltas = dedupDeltas(newDeltas) - - if len(newDeltas) > 0 { - if _, exists := f.items[id]; !exists { - f.queue = append(f.queue, id) - } - f.items[id] = newDeltas - f.cond.Broadcast() - } else { - // We need to remove this from our map (extra items in the queue are - // ignored if they are not in the map). - delete(f.items, id) - } - return nil -} -``` - -#### Resync方法 +#### 同步local store到deltaFIFO > 所谓的resync,其实就是把knownObjects即缓存中的对象全部再通过queueActionLocked(Sync, obj)加到队列 @@ -1021,6 +1011,7 @@ func (f *DeltaFIFO) Resync() error { } keys := f.knownObjects.ListKeys() + // 把local store中的对象都以Sync类型增量的形式重新放回到deltaFIFO for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err @@ -1046,7 +1037,7 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error { if len(f.items[id]) > 0 { return nil } - + // 可以看到这里跟list一样,增加到deltaFIFO的是一个Sync类型的增量 if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } @@ -1054,36 +1045,117 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error { } ``` -### +#### 在deltaFIFO增加一个对象 -### shareProcess的实现 +注意这里在append增量时的去重逻辑:如果连续的两个增量类型都是Deleted,那么就去掉一个(正常情况确实不会出现这样,且没必要),优先去掉前面所说的因为re-list可能导致的api与local store不一致而增加的DeletedFinalStateUnknown类型的增量 -shareIndexInformer中具有一个shareProcess结构,用于分发deltaFIFO的对象,调用用户配置的EventHandler处理 +```go +//在队列中给指定的对象append一个Delta +func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { + id, err := f.KeyOf(obj) + if err != nil { + return KeyError{obj, err} + } + // 把增量append到slice的后面 + newDeltas := append(f.items[id], Delta{actionType, obj}) + // 连续的两个Deleted delta将会去掉一个 + newDeltas = dedupDeltas(newDeltas) + if len(newDeltas) > 0 { + // 维护queue队列 + if _, exists := f.items[id]; !exists { + f.queue = append(f.queue, id) + } + f.items[id] = newDeltas + f.cond.Broadcast() + } else { + // We need to remove this from our map (extra items in the queue are + // ignored if they are not in the map). + delete(f.items, id) + } + return nil +} +``` + +当前认为只有连续的两个Delete delta才有必要去重 + +```go +func dedupDeltas(deltas Deltas) Deltas { + n := len(deltas) + if n < 2 { + return deltas + } + // 每次取最后两个delta来判断 + a := &deltas[n-1] + b := &deltas[n-2] + if out := isDup(a, b); out != nil { + d := append(Deltas{}, deltas[:n-2]...) + return append(d, *out) + } + return deltas +} + +func isDup(a, b *Delta) *Delta { + // 当前认为只有连续的两个Delete delta才有必要去重 + if out := isDeletionDup(a, b); out != nil { + return out + } + // TODO: Detect other duplicate situations? Are there any? + return nil +} + +// keep the one with the most information if both are deletions. +func isDeletionDup(a, b *Delta) *Delta { + if b.Type != Deleted || a.Type != Deleted { + return nil + } + // Do more sophisticated checks, or is this sufficient? + // 优先去重DeletedFinalStateUnknown类型的Deleted delta + if _, ok := b.Object.(DeletedFinalStateUnknown); ok { + return a + } + return b +} +``` + + + + + + + +### sharedProcessor的实现 + +shareIndexInformer中的sharedProcess结构,用于分发deltaFIFO的对象,回调用户配置的EventHandler方法 可以看到shareIndexInformer中的process直接通过&sharedProcessor{clock: realClock}初始化 -如下为sharedProcessor结构: - -> listenersStarted:listeners中包含的listener是否都已经启动了 -> listeners:已添加的listener列表,用来处理watch到的数据 -> syncingListeners:已添加的listener列表,用来处理list到的数据 ```go // NewSharedIndexInformer creates a new instance for the listwatcher. func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ + // 初始化一个默认的processor processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, + // cacheMutationDetector:可以记录local store是否被外部修改 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer } +``` +如下为sharedProcessor结构: + +> listenersStarted:listeners中包含的listener是否都已经启动了 +> listeners:已添加的listener列表,用来处理watch到的数据 +> syncingListeners:已添加的listener列表,用来处理list或者resync的数据 + +```go type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex @@ -1094,6 +1166,18 @@ type sharedProcessor struct { } ``` +#### 理解listeners和syncingListeners的区别 + +processor可以支持listener的维度配置是否需要resync:一个informer可以配置多个EventHandler,而一个EventHandler对应processor中的一个listener,每个listener可以配置需不需要resync,如果某个listener需要resync,那么添加到deltaFIFO的Sync增量最终也只会回到对应的listener + +reflector中会定时判断每一个listener是否需要进行resync,判断的依据是看配置EventHandler的时候指定的resyncPeriod,0代表该listener不需要resync,否则就每隔resyncPeriod看看是否到时间了 + +- listeners:记录了informer添加的所有listener + +- syncingListeners:记录了informer中哪些listener处于sync状态 + +syncingListeners是listeners的子集,syncingListeners记录那些开启了resync且时间已经到达了的listener,把它们放在一个独立的slice是避免下面分析的distribute方法中把obj增加到了还不需要resync的listener中 + #### 为sharedProcessor添加listener 在sharedProcessor中添加一个listener @@ -1101,12 +1185,13 @@ type sharedProcessor struct { ```go func (p *sharedProcessor) addListenerLocked(listener *processorListener) { // 同时添加到listeners和syncingListeners列表,但其实添加的是同一个对象的引用 + // 所以下面run启动的时候只需要启动listeners中listener就可以了 p.listeners = append(p.listeners, listener) p.syncingListeners = append(p.syncingListeners, listener) } ``` -#### 启动sharedProcessor中的的listener +#### 启动sharedProcessor中的listener sharedProcessor启动所有的listener 是通过调用listener.run和listener.pop来启动一个listener,两个方法具体作用看下文processorListener说明 @@ -1117,7 +1202,9 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { + // listener的run方法不断的从listener自身的缓冲区取出对象回调handler p.wg.Start(listener.run) + // listener的pod方法不断的接收对象并暂存在自身的缓冲区中 p.wg.Start(listener.pop) } p.listenersStarted = true @@ -1134,14 +1221,22 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { #### sharedProcessor分发对象 -distribute方法是在前面介绍[deltaFIFO pop出来的对象处理逻辑]时提到的,把notification事件添加到listener中,listener如何pop出notification回调EventHandler见下文listener分析 +distribute方法是在前面介绍`[deltaFIFO pop出来的对象处理逻辑]`时提到的,把notification事件添加到listener中,listener如何pop出notification回调EventHandler见下文listener部分分析 + +当通过distribute分发从deltaFIFO获取的对象时,如果delta type是Sync,那么就会把对象交给sync listener来处理,而Sync类型的delta只能来源于下面两种情况: + +- reflector list Replace到deltaFIFO的对象:因为首次在sharedProcessor增加一个listener的时候是同时加在listeners和syncingListeners中的 +- reflector定时触发resync local store到deltaFIFO的对象:因为每次reflector调用processor的shouldResync时,都会把达到resync条件的listener筛选出来重新放到p.syncingListeners + +上面两种情况都可以在p.syncingListeners中准备好listener ```go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - // 如果是通过reflector list Replace到deltaFIFO的对象,那么distribute到syncingListeners + // 如果是通过reflector list Replace到deltaFIFO的对象或者reflector定时触发resync到deltaFIFO的对象,那么distribute到syncingListeners if sync { + // 保证deltaFIFO Resync方法过来的delta obj只给开启了resync能力的listener for _, listener := range p.syncingListeners { listener.add(obj) } @@ -1157,10 +1252,16 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { sharedProcessor中的listener具体的类型:运转逻辑就是把用户通过addCh增加的事件发送到nextCh供run方法取出回调Eventhandler,因为addCh和nectCh都是无缓冲channel,所以中间引入ringBuffer做缓存 -> processorListener是sharedIndexInformer调用AddEventHandler时创建并添加到sharedProcess -> 对于一个Informer,可以多次调用AddEventHandler来添加多个listener,但我们的一般的使用场景应该都只会添加一个,作用都是类似enqueue到workQueue -> addCh和nextCh:都将被初始化为无缓冲的chan +processorListener是sharedIndexInformer调用AddEventHandler时创建并添加到sharedProcessor,对于一个Informer,可以多次调用AddEventHandler来添加多个listener + +> addCh:无缓冲的chan,listener的pod方法不断从addCh取出对象丢给nextCh。addCh中的对象来源于listener的add方法,如果nextCh不能及时消费,则放入缓冲区pendingNotifications +> nextCh:无缓冲的chan,listener的run方法不断从nextCh取出对象回调用户handler。nextCh的对象来源于addCh或者缓冲区 > pendingNotifications:一个无容量限制的环形缓冲区,可以理解为可以无限存储的队列,用来存储deltaFIFO分发过来的消息 +> nextResync:由resyncPeriod和requestedResyncPeriod计算得出,与当前时间now比较判断listener是否该进行resync了 +> resyncPeriod:listener自身期待多长时间进行resync +> requestedResyncPeriod:informer希望listener多长时间进行resync + + ```go type processorListener struct { @@ -1195,13 +1296,14 @@ shareProcessor中的distribute方法调用的是listener的add来向addCh增加 ```go func (p *processorListener) add(notification interface{}) { + // 虽然p.addCh是一个无缓冲的channel,但是因为listener中存在ring buffer,所以这里并不会一直阻塞 p.addCh <- notification } ``` #### 判断是否需要resync -前面reflector中会调用这个shouldResync方法根据每个listener的resyncPeriod判断是否需要resync +如果resyncPeriod为0表示不需要resync,否则判断当前时间now是否已经超过了nextResync,是的话则返回true表示需要resync。其中nextResync在每次调用listener的shouldResync方法成功时更新 ```go // shouldResync queries every listener to determine if any of them need a resync, based on each @@ -1209,7 +1311,7 @@ func (p *processorListener) add(notification interface{}) { func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock() defer p.listenersLock.Unlock() - + // 这里每次都会先置空列表,保证里面记录了当前需要resync的listener p.syncingListeners = []*processorListener{} resyncNeeded := false @@ -1219,6 +1321,7 @@ func (p *sharedProcessor) shouldResync() bool { // listeners that are going to be resyncing. if listener.shouldResync(now) { resyncNeeded = true + // 达到resync条件的listener被加入syncingListeners p.syncingListeners = append(p.syncingListeners, listener) listener.determineNextResync(now) } @@ -1244,6 +1347,7 @@ func (p *processorListener) run() { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: + // 回调用户配置的handler p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj)