client-go的informer的工作流程

以一个自定义controller项目为例,学习informer、controller的工作流程

分为三个阶段

  • 概览
  • 代码
  • 总结

概览,先看简单点的图,等代码看完后总结里会有更详细的图:

各组件的作用:

  • Reflector:称之为反射器,实现对 apiserver 指定类型对象的监控(ListAndWatch),其中反射实现的就是把监控的结果实例化成具体的对象,最终也是调用 Kubernetes 的 List/Watch API;
  • DeltaIFIFO Queue:一个增量队列,将 Reflector 监控变化的对象形成一个 FIFO 队列,此处的 Delta 就是变化;
  • LocalStore:就是 informer 的 cache,这里面缓存的是 apiserver 中的对象(其中有一部分可能还在DeltaFIFO 中),此时使用者再查询对象的时候就直接从 cache 中查找,减少了 apiserver 的压力,LocalStore 只会被 Lister 的 List/Get 方法访问。
  • WorkQueue:DeltaIFIFO 收到时间后会先将对象存储在自己的数据结构中,然后直接操作 Store 中存储的数据,更新完 store 后 DeltaIFIFO 会将该事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件会根据对应的类型触发对应的回调函数。

代码走一遍: k8swatch中使用informer的步骤:

1. 对特定资源的listwatch的创建

  lw := cache.NewListWatchFromClient(
      kubeClient,          // 客户端
      resource.Name,       // 被监控资源类型
      "",                  // 被监控命名空间
      fields.Everything()) // 选择器,减少匹配的资源数量

2. informer的创建(这里涉及Indexer的初始化)

  func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  	realClock := &clock.RealClock{}
  	sharedIndexInformer := &sharedIndexInformer{
  		processor:                       &sharedProcessor{clock: realClock},
  		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
  		listerWatcher:                   lw,
  		objectType:                      objType,
  		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
  		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
  		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
  		clock:                           realClock,
  	}
  	return sharedIndexInformer
  }

3. 自定义controller的创建,给informer添加handler,增加workqueue


func NewResourceController(eventHandlers []handlers.Handler, informer cache.SharedIndexInformer, config config.Config, resourceType string) *Controller {
	c := &Controller{
		resourceType:  resourceType,
		informer:      informer,
		config:        config,
		eventHandlers: eventHandlers,
	}
	c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourceType)
	var cacheMeta CacheMeta
	var err error
	c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cacheMeta.Key, err = cache.MetaNamespaceKeyFunc(obj)
			if err != nil {
				zlog.Error("cache.MetaNamespaceKeyFunc(obj) failed", zap.Error(err))
				return
			}
			cacheMeta.Action = event.CreateEvent
			cacheMeta.Kind = resourceType
			c.queue.Add(cacheMeta)
		},
		UpdateFunc: func(old, new interface{}) {
			oldEvent := event.New(old, event.UpdateEvent)
			newEvent := event.New(new, event.UpdateEvent)
			if oldEvent.ResourceVersion == newEvent.ResourceVersion {
				zlog.Warnf("ResourceVersion not change in UpdateEvent:%s/%s", newEvent.Namespace, newEvent.Name)
				return
			}
			cacheMeta.Key, err = cache.MetaNamespaceKeyFunc(new)
			if err != nil {
				zlog.Error("cache.MetaNamespaceKeyFunc(obj) failed", zap.Error(err))
				return
			}
			cacheMeta.Action = event.UpdateEvent
			cacheMeta.Kind = resourceType
			cacheMeta.ResourceVersion = newEvent.ResourceVersion
			c.queue.Add(cacheMeta)
      //...
		},
		DeleteFunc: func(obj interface{}) {
			cacheMeta.Key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err != nil {
				zlog.Error("cache.DeletionHandlingMetaNamespaceKeyFunc(obj) failed", zap.Error(err))
				return
			}
			cacheMeta.Action = event.DeleteEvent
			cacheMeta.Kind = resourceType
			c.queue.Add(cacheMeta)
			zlog.Debugf("DeleteFunc queue.add item :%+v c.queue.Len():%d", cacheMeta, c.queue.Len())
		},
	})
	return c
}


4. 自定义controller的启动:阶段一(生产):informer.Run;阶段二(消费):同步后runworker

// Run starts the k8swatch controller
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	go c.informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced, c.HasSynced) {
		zlog.Errorf("Timed out waiting for caches to sync,controller type:%s", c.resourceType)
		utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	zlog.Infof("%s controller synced and ready", c.resourceType)

	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	zlog.Infof("Started workers")
	<-stopCh
	zlog.Infof("Stopping %s Controller", c.resourceType)
}

func (c *Controller) runWorker() {
	for c.processNextItem() {}
}
func (c *Controller) processNextItem() bool {
	cacheMeta, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(cacheMeta)
	var err error
	go func() {
		err = c.process(cacheMeta.(CacheMeta))
	}()
	c.handleErr(err, cacheMeta)
	return true
}

以上便是业务代码编写的全部流程,非常简单,但想弄清背后的原理,还需不断看底层实现,这里尝试自顶向下梳理。

  • 第1步是listwatch的初始化,主要是返回一个针对某类资源的ListFunc和WatchFunc。
  • 第2步是informer的初始化,新建Indexer并将上述Listwatcher一同放入informer结构中
  • 第3步是给informer添加AddEventHandler(通常包含add update delete),并根据情况引入workqueue(可是默认的也可以是限速、延时等队列)
  • 第4步是总开关,先是启动informer.Run(),然后等待同步完成后是worker的启动,监听退出信号等待退出。

第1,2步比较简单看源码即可,往下看第3和4步是如何联动的?

AddEventHandler:

// client-go/tools/cache/shared_informer.go
// AddEventHandler
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()
  // 如果informer已关闭,则不再添加任何handler
	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}
	if resyncPeriod > 0 {
    // 如果新的同步周期小于当前的最小同步周期,则将其设置为最小同步周期。
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}
    // 如果同步周期小于已内部协调后的周期间隔,则看informer是否已启动,如果已启动,则按照当前的来。若没启动,则全体采用该同步周期作为全局最小周期以对齐。
		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}
  // 这里把外部传来的handler封装为listener
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
  // 如果informer还没启动,则直接添加。
	if !s.started {
		s.processor.addListener(listener)
		return
	}
  // informer若已启动,则要安全的添加handler——需首先停止发送add update delete notifications,具体实现就是通过s.blockDeltas.Lock()加锁
  // 然后添加listener
  // 再遍历缓存中的所有对象,全部都告诉listener,避免对象被处理过而忽视这个新来的listener,最后解锁。
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

通过AddEventHandler()添加的处理器(handler)最终就会封装成processorListener,然后通过sharedProcessor管理起来,通过processorListener的封装就可以达到所谓的”有事处理,没事挂起。”,如何实现?

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
    // listenser如已启动(初次运行run之后会将p.listensersStarted设置为true),则直接运行run pod方法,
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}
// 添加到listensers、syncingListeners组里
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
  // 定时同步和不定时同步处理器数组都添加了,这是因为没有定时同步的也会用默认的时间。
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

对于: p.wg.Start(listener.run)、p.wg.Start(listener.pop)即运行shared_informer.go中的processorListener的run()和pop()方法以实现事件的缓冲和处理的目的,往下看:

type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}
	handler ResourceEventHandler
	pendingNotifications buffer.RingGrowing
	requestedResyncPeriod time.Duration
	resyncPeriod time.Duration
	nextResync time.Time
	resyncLock sync.Mutex
}
// 等待被调用,被调用后,将事件添加到addCh中。下面的pop方法会从addCh中取出再放入nextCh,run方法则从nextCh中取出再做处理。
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop
	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
        // 判断事件类型,这里面的handler就是调用SharedInfomer.AddEventHandler()传入的
        // 理论上处理的不是Deltas么?怎么变成了其他类型,这是SharedInformer做的二次封装,后面会看到
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})
		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}
// client-go/tools/cache/controller.go

// OnAdd calls AddFunc if it's not nil. 即:p.handler.AddFunc() 即
// c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) {}
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)  // c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(){}
	}
}


先看processorListener的run()

  • wait.Until(func(){},1*time.Minute,stopCh)即1分钟检查一次内部函数是否在运行,若停止则运行它,
  • 这个函数里面用到wait.ExponentialBackoff(),它和wait.Until()类似,wait.Until()是无限循环,它是尝试几次后,每次等待时间指数上涨。
  • 该函数从p.nextCh中取通知对象,根据事件类型匹配到不同的handler里,即ResourceEventHandler接口的不同方法里(会被自动触发执行)

再看processorListener的pop()

  • 从for,select开始,如果nextCh尚未初始化,则被阻塞,当nextCh被赋值为p.nextCh这个无缓冲的channel,数据不发送成功也阻塞。
  • 如果从p.addCh读取出事件,开始消费,如果前面定义的notification为nil说明当前还没发送任何事件给处理器,就把从addCh中取出来的事件赋给notification,然后把 p.nextCh赋给nextCh
  • 如果notification不为空,说明上一个事件的处理还没发送成功,于是把新读取出的事件放入缓存(p.pendingNotifications)中,类似一个数组。

上面两个函数即processorListener的核心功能:事件的缓冲和处理,此处的处理就是使用者传入的函数。在没有事件的时候可以阻塞处理器,当事件较多时可以把事件缓冲起来,实现了事件分发器与处理器的异步处理。

processor会做几个事情

  1. 当apiserver反馈资源变化,DeltaFIFO输出Deltas,通过processorListener.add()将事件传入到channel中
  2. 缓冲处理器
  3. 阻塞处理器进而形成listener

回到4,informer

informer.Run(),这里涉及DeltaFIFO的初始化 内部controller的建立、processor和内部controller的运行

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
  // 新建DeltaFIFO
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
  // 新建cfg,用于建立informer内部的controller
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
		Process: s.HandleDeltas,
	}
  // TODO 这里为啥要放一个独立的func里执行?
  // 建立informer内部的controller
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
  • 建立DeltaFIFO,controller,wg启动processor.run,再启动controller.Run
  • 其中第一个wg启动的是用来做脏数据(数据突变)校验的,默认没开启,所以暂可不看。
  • 第二个wg启动的就是processor.run
  • 启动controller.run ,controller通过DeltaFIFO.Pop()函数弹出Deltas,并调用函数处理,这个处理函数就是sharedIndexInformer.HandleDeltas(),他把Deltas转换为sharedProcess需要的各种Notification类型。
  • 注意这里的一个提示:processor的关闭顺序必须严格在controller之后。

informer的创建与启动过程中先后完成了相关组件的创建(Indexer,DeltaFIFO,Reflector)

  1. cache.NewSharedIndexInformer()会新建NewIndexer,即Indexer(localstore)
  2. sharedIndexInformer.Run()会通过NewDeltaFIFO创建DeltaFIFO
  3. 先启动s.processor.run,即: &sharedProcessor{clock: realClock}.run
  4. 最后启动controller.Run,创建了Reflector

processor.run

// 启动所有listener的run pop,等待信号并结束。
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run) // listener.run 和pop在第三步详解里已介绍过
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

controller.run

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)
	wait.Until(c.processLoop, time.Second, stopCh)
}

做了三件事:

  1. 新建了Informer中的Reflector组件
  2. wg.StartWithChannel(stopCh, r.Run),即协程运行:reflector.Run
  3. wait.Until(c.processLoop, time.Second, stopCh),

wg.StartWithChannel(stopCh, r.Run)即开始listwatch,从apiserver获取事件,更新到DeltaFIFO队列中。

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}

处理循环则不断从队列中读取事件(queue即DeltaFIFO)

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

c.config.Process是啥呢,往回看代码:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	cfg := &Config{
    //...
		Process: s.HandleDeltas,
	}
  s.controller = New(cfg)
}

很明了,即c.config.Process就是s.HandleDeltas

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest 因为Deltas里包含了一个对象的多个增量操作,所以要从最老的Delta到最新的Delta遍历处理
	for _, d := range obj.(Deltas) {
		switch d.Type {
    // 根据不同的Delta做不同操作,大概分为对象添加、删除两大类。
    // 所有操作都要现在cache先同步,再通知处理器 以保持处理器和cache的状态一致。
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
      // 如果cache里有的对象,一律当做更新事件
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
        // 更新缓存中的对象
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
        // 通知handler处理事件
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

// 对应前面listener.pop listener.run俩方法,他们俩对事件进行缓冲以及处理。
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

p.listeners其实就是前面在AddEventHandler时注册的回调函数列表。

到此基本梳理清楚流程了:

DeltaFIFO 的r.syncwith() 和r.store.Resync()分别是做什么的?

  • syncwith()–>r.store.Replace()–>重建DeltaFIFO中的 queue([]string)、items(map[string]deltas)
  • r.store.Resync()–>根据DeltaFIFO中的items,同步删除本地缓存中的对象。啥时候添加——handledelta时。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var resourceVersion string
	options := metav1.ListOptions{ResourceVersion: "0"}
	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			if r.WatchListPageSize != 0 {
				pager.PageSize = r.WatchListPageSize
			}
			list, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
		}
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
		}
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
		}
		initTrace.Step("Objects extracted")
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		initTrace.Step("SyncWith done")
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()
	for {
		select {
		case <-stopCh:
			return nil
		default:
		}
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			TimeoutSeconds: &timeoutSeconds,
			AllowWatchBookmarks: true,
		}

		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
			}
      // 这里曾经出过一个bug,当apiserver重启后,watch端未能有效捕捉到异常导致return nil后重新出发listwatch,后被补上。
			if utilnet.IsConnectionRefused(err) {
				time.Sleep(time.Second)
				continue
			}
			return nil
		}
		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case apierrs.IsResourceExpired(err):
					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}

DeltaFIFO中的knownObjects到底是什么

type DeltaFIFO struct {
  // ...
  // knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter
}

在Replace、 Delete()被调用时用来检查哪些item已经被删除。分析其创建路径:


func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
    //...
	}
	return sharedIndexInformer
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
}

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	f.cond.L = &f.lock
	return f
}

NewIndexer()作为SharedIndexInformer的indexer,继而利用其创建NewDeltaFIFO,indexer作为DeltaFIFO的knownObjects

在listwatch环节,会更新DeltaFIFO中的queue和items,也会更新knownObjects(即Indexer)中要删除的对象。 而Indexer中常规的添加、更新、删除等操作则是在processor中执行的,即在handleDeltas方法里执行如:s.indexer.Add(d.Object)后再通过distribute发送到p.addCh中供 listener缓冲和消费。

总结

Informer整体工作流程:

  • Reflector使用ListAndWatch方法,先从apiserver中list某类资源的所有实例,拿到对象的最新版本,然后用watch方法监听该resourceversion之后的所有变化,若中途出现异常,reflector则会从断开的resourceversion处重新监听所有变化 一旦有Add、Del、Update动作,Reflector会收到更新通知,该事件及它所对应的API对象这个组合,被称为增量Delta,它会被放进DeltaFIFO中
  • Informer会不断从这个DeltaFIFO中读取增量,每拿出一个对象,Informer就会判断这个增量的事件类型,然后创建或更新本地的缓存。
  • DeltaFIFO再pop这个事件到controller中,controller会调用事先注册到ResourceEventHandler回调函数进行处理。

官方git上的流程图:

代码图:


bookmark 可以将 API Server 重启时需要重新同步的事件降低为原来的 3%(性能提高了几十倍),该功能有阿里云容器平台开发,已经发布到社区 Kubernetes-1.15 版本中。

场景: k8swatch重启后,会重新list&watch,会在list时拉取所有的信息。 但如果是apiserver重启后呢?k8swatch按计划只是watch,但对于apiserver的本地store的队列已经是空的了,k8swatch发来watch之后,版本号不一致,于是apiserver会触发client重新list所有数据,导致apiserver的压力陡增。

这时候 API Server 无法判断 5 与当前队列最小值(7)之间是否存在客户端需要感知的变更,因此返回 Client too old version err 触发 Client 重新 list 所有的数据。

参考:

其他

listwatch方法每秒调用一次,那么意味着每秒都要执行一次list和watch吗?(period默认为1秒即:time.Second)

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
    //...
		period:        time.Second,
	}
	return r
}
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}

这里其实是一开始对wait.Until有误解,它其中的方法如果没退出,是不会在下一秒继续执行的,只有等方法退出后,才会在下一秒中检测到退出并重新执行,因此这里其实是个监控(功能类似keepalived)。