funcNewSharedInformerFactoryWithOptions(clientkubernetes.Interface,defaultResynctime.Duration,options...SharedInformerOption)SharedInformerFactory{factory:=&sharedInformerFactory{client:client,namespace:v1.NamespaceAll,defaultResync:defaultResync,informers:make(map[reflect.Type]cache.SharedIndexInformer),startedInformers:make(map[reflect.Type]bool),customResync:make(map[reflect.Type]time.Duration),}// Apply all options
for_,opt:=rangeoptions{factory=opt(factory)}returnfactory}
// Run starts the shared informer and blocks until stopCh is closed.
// It wires a DeltaFIFO and a controller together, starts the cache
// mutation detector and the shared processor, then runs the controller.
// Shutdown order is controlled by the defers: the controller returns
// first, then processorStopCh is closed, then wg.Wait() waits for the
// processor goroutines to drain.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:             fifo,
		ListerWatcher:     s.listerWatcher,
		ObjectType:        s.objectType,
		FullResyncPeriod:  s.resyncCheckPeriod,
		RetryOnError:      false,
		ShouldResync:      s.processor.shouldResync,
		Process:           s.HandleDeltas, // HandleDeltas is invoked by Pop for each dequeued Deltas
		WatchErrorHandler: s.watchErrorHandler,
	}

	// Create the controller under startedLock so started/controller are
	// published atomically with respect to other methods that check them.
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	// Defers run in reverse order: close(processorStopCh) fires before wg.Wait().
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	// Blocks until stopCh is closed.
	s.controller.Run(stopCh)
}
// Pop blocks until an item is available (or the queue is closed), removes
// the oldest item, and calls process on it while still holding f.lock.
// If process returns an ErrRequeue, the item is re-added via
// addIfNotPresent and the inner error is returned alongside the item.
// Returns ErrFIFOClosed once the queue is closed and drained of waiters.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}
			// Releases f.lock while waiting; reacquires it before returning.
			f.cond.Wait()
		}
		// Dequeue the oldest key.
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// process runs under f.lock; a requeue request is honored here.
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
HandleDeltas
HandleDeltas 用来处理 DeltaFIFO Pop 出来的对象,Pop 中调用的 process 则是这里配置的 HandleDeltas 方法。主体逻辑是遍历 Deltas,根据不同的类型,往 indexer 中添加、更新、删除数据。同时,也通过 sharedProcessor 向 Listener 通知事件。
// HandleDeltas is the Process func handed to the controller's Config: it
// applies each delta (oldest first) to the indexer and distributes a
// matching notification to the listeners via the shared processor.
// Sync deltas (and Replaced deltas whose resourceVersion did not change)
// are distributed as sync events, which only resync-requesting listeners
// receive.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			// If the object already exists in the indexer this is an update,
			// otherwise an add.
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
sharedProcessor
sharedProcessor 负责启动已注册的 listener:为每个 listener 启动 run 和 pop 两个循环,并在收到停止信号后依次关闭它们。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// run starts the run and pop loops of every registered listener, then
// blocks until stopCh is closed. On shutdown it closes each listener's
// addCh (which cascades: pop drains and closes nextCh, which stops run)
// and waits for all listener goroutines to finish.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	// Start listeners under the read lock so registration is consistent;
	// the anonymous func scopes the lock to just the startup phase.
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
processorListener
processorListener 提供 add 和 pop 方法,上文说的 HandleDeltas 会调用 add 方法来通知事件。然后 pop 接收到事件,通过 nextCh 通道完成事件的回调。
// add hands a notification to this listener; it blocks if addCh is full.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

// pop shuttles notifications from addCh to nextCh, buffering overflow in
// pendingNotifications so that add never blocks indefinitely behind a slow
// handler. It uses the nil-channel select trick: when there is nothing to
// send, nextCh is set to nil, which disables the send case.
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			// addCh closed => shut down (sharedProcessor.run closed it).
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

// run receives notifications from nextCh and invokes the registered
// handler callbacks (OnAdd/OnUpdate/OnDelete) until nextCh is closed.
func (p *processorListener) run() {
	// this call blocks until the channel is closed. When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted. This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
// 截取的部分 list 代码
pager:=pager.New(pager.SimplePageFunc(func(optsmetav1.ListOptions)(runtime.Object,error){returnr.listerWatcher.List(opts)}))// 调用 API-Server 获取资源列表
list,paginatedResult,err=pager.List(context.Background(),options)// list 成功
r.setIsLastSyncResourceVersionUnavailable(false)// list was successful
initTrace.Step("Objects listed")listMetaInterface,err:=meta.ListAccessor(list)iferr!=nil{returnfmt.Errorf("unable to understand list result %#v: %v",list,err)}resourceVersion=listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")items,err:=meta.ExtractList(list)iferr!=nil{returnfmt.Errorf("unable to understand list result %#v (%v)",list,err)}initTrace.Step("Objects extracted")// 往 DeltaFIFO 更新数据
iferr:=r.syncWith(items,resourceVersion);err!=nil{returnfmt.Errorf("unable to sync list result: %v",err)}// sync 完成
initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")
List 通过调用 k8s api,获取资源列表,然后往 DeltaFIFO 更新数据。
接下来,看一下 Watch 部分。
// watchHandler consumes events from the watch w until the result channel
// closes, stopCh fires, or an error arrives on errc. Each event is type/GVK
// checked, applied to r.store (Add/Update/Delete), and its resourceVersion
// recorded. A watch that closes in under a second with zero events is
// reported as an error so the caller can back off.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				// Server closed the watch; exit the loop and evaluate duration below.
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			// Drop events whose object type/GVK doesn't match what this
			// reflector was configured to watch.
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			// Apply the event to the store; store errors are logged, not fatal.
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// Record the latest resourceVersion for the next list/watch cycle.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			if rvu, ok := r.store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(newResourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
controller 最终执行到 processLoop,processLoop 是一个无限循环,不停从 DeltaFIFO 中 Pop 数据,并调用之前 controller 配置的 process 方法 HandleDeltas 来处理出队的数据。如果执行失败且配置了 RetryOnError,则调用 AddIfNotPresent 重新入队。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func(c*controller)processLoop(){for{obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))iferr!=nil{iferr==ErrFIFOClosed{return}ifc.config.RetryOnError{// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)}}}}