The Indexer in client-go

What is the Indexer?

As shown in the figure, it is the LocalStore of the Informer in the lower-left corner.

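UML diagram:
From the implementation and composition relationships between the types, cache implements every method of the Store interface, so cache can be regarded as a Store; cache also implements every method of Indexer, so it can be regarded as an Indexer as well. As the diagram shows, understanding how threadSafeMap is implemented helps us understand cache.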

The Indexer interface definition

client-go/tools/cache/index.go

// Indexer is a storage interface that lets you list objects using multiple indexing functions.
// There are three kinds of strings here.
// One is a storage key, as defined in the Store interface.
// Another kind is a name of an index.
// The third kind of string is an "indexed value", which is produced by an
// IndexFunc and can be a field value or any other string computed from the object.
type Indexer interface {
	Store
	// Index returns the stored objects whose set of indexed values
	// intersects the set of indexed values of the given object, for
	// the named index
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the storage keys of the stored objects whose
	// set of indexed values for the named index includes the given
	// indexed value
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// ListIndexFuncValues returns all the indexed values of the given index
	ListIndexFuncValues(indexName string) []string
	// ByIndex returns the stored objects whose set of indexed values
	// for the named index includes the given indexed value
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Definition of the Store interface that it embeds: client-go/tools/cache/store.go

// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}

A classic implementation of the Indexer interface: cache

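cache is a classic implementation of Indexer that keeps all objects in memory. Because the type name cache starts with a lowercase letter, it is unexported: code outside the package cannot use it directly and must create it through dedicated constructor functions.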

The cache struct definition

It lives in the same file as the Store interface, client-go/tools/cache/store.go. cache is an unexported type and is constructed through NewStore or NewIndexer:

// cache responsibilities are limited to:
//	1. Computing keys for objects via keyFunc
//  2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc
}

// NewStore returns a Store implemented simply with a map and a lock.
// Together with NewIndexer below, this shows that cache implements both the Store and the Indexer interfaces.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

ThreadSafeStore, which cache depends on

client-go/tools/cache/thread_safe_store.go

// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
// The guarantees of thread safety provided by List/Get are only valid if the caller
// treats returned items as read-only. For example, a pointer inserted in the store
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
// on the same key and modify the pointer in a non-thread-safe way. Also note that
// modifying objects stored by the indexers (if any) will *not* automatically lead
// to a re-index. So it's not a good idea to directly modify the objects returned by
// Get/List, in general.
type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	Resync() error
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

Index keys and object keys

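  • ThreadSafeStore is similar to, but different from, the Store interface: the parameters of their add/delete/update/get methods differ. ThreadSafeStore has Add(key string, obj interface{}), while Store has Add(obj interface{}) error. The former takes an extra key, the object key, which corresponds to the key of the items map in threadSafeMap. The object key names an object uniquely within the store, and each object is stored under its key.

  • indexKey is an index key within the index named indexName. Index keys exist for fast lookup: instead of scanning every object in items, the object keys that belong to an index key are found with map lookups.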
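To make the distinction concrete, here is a minimal, dependency-free sketch. toyObject, objectKey and namespaceIndexKeys are hypothetical stand-ins, not client-go APIs; they only mimic the formats produced by MetaNamespaceKeyFunc and MetaNamespaceIndexFunc, which appear later in this article.

```go
package main

import "fmt"

// toyObject is a hypothetical stand-in for an object's metadata.
type toyObject struct {
	Namespace string
	Name      string
}

// objectKey mimics MetaNamespaceKeyFunc: "<namespace>/<name>", or just
// "<name>" for cluster-scoped objects. It identifies exactly one object.
func objectKey(o toyObject) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// namespaceIndexKeys mimics MetaNamespaceIndexFunc: the index key is the
// namespace, which many objects can share.
func namespaceIndexKeys(o toyObject) []string {
	return []string{o.Namespace}
}

func main() {
	a := toyObject{Namespace: "default", Name: "nginx"}
	b := toyObject{Namespace: "default", Name: "redis"}
	fmt.Println(objectKey(a), objectKey(b))                   // default/nginx default/redis
	fmt.Println(namespaceIndexKeys(a), namespaceIndexKeys(b)) // [default] [default]
}
```

The two objects get distinct object keys (their keys in items) but the same index key (their shared entry in the "namespace" index).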

How threadSafeMap implements add, update, delete and get

// client-go/tools/cache/thread_safe_store.go

func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
		c.deleteFromIndices(obj, key)
		delete(c.items, key)
	}
}

func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	item, exists = c.items[key]
	return item, exists
}

func (c *threadSafeMap) List() []interface{} {
	c.lock.RLock()
	defer c.lock.RUnlock()
	list := make([]interface{}, 0, len(c.items))
	for _, item := range c.items {
		list = append(list, item)
	}
	return list
}

func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.items = items

	// rebuild any index
	c.indices = Indices{}
	for key, item := range c.items {
		c.updateIndices(nil, item, key)
	}
}

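Note: calling a method on a Go interface value runs the concrete type's implementation of that method. Taking threadSafeMap as an example, when cache.Add executes "c.cacheStorage.Add(key, obj)", what actually runs is "(&threadSafeMap{items: map[string]interface{}{}, indexers: indexers, indices: indices}).Add(key, obj)".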

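Much of client-go is cleanly layered, for example:

  • index.go defines the index-related operations (interfaces)
  • store.go defines the storage interface and provides one implementation, cache; you can of course implement the Store interface yourself
  • thread_safe_store.go contains the thread-safe storage behind cache (threadSafeMap is unexported)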

In the end, client-go's indexer operates on threadSafeMap's methods and data. The call relationships are as follows:

The most confusing part is the various indexes inside threadSafeMap and how they relate to each other:

Diagram of the internal object relationships in threadSafeMap:

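indexers Indexers: map[string]IndexFunc

1. key: the index name (indexName), e.g. "namespace"
2. value: the IndexFunc
3. Indexers divides indexes into index types (indexName, e.g. namespace). For each type, the index function (indexFunc, e.g. MetaNamespaceIndexFunc) computes the object's index keys (indexKeys, e.g. its namespace). An object may have several index keys within one index type.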
	// client-go/tools/cache/index.go
	const (
		// NamespaceIndex is the lookup name for the most common index function, which is to index by the namespace field.
		NamespaceIndex string = "namespace"
	)

	// client-go/informers/apps/v1beta1/deployment.go
	func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
		return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
	}

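Here a cache.Indexers is passed in when calling NewFilteredDeploymentInformer. It is built with the constant NamespaceIndex ("namespace") as the key and cache.MetaNamespaceIndexFunc as the value (we look at this function later; it returns the object's namespace as a []string).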

indices Indices: map[string]Index

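  • indices is the fast-lookup table for finding objects. The map key is the indexName and the value is an Index, i.e. a map[string]sets.String.
  • It keeps one Index per index type (e.g. the Index covering objects in all namespaces), from which the object keys for a particular index key can be found (e.g. the object keys in one namespace).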

Index: map[string]sets.String

  • The key is the value computed by the indexFunc: MetaNamespaceIndexFunc returns the namespace, while indexByPodNodeName returns the node name.
  • The value is a sets.String whose elements are object keys.

items map[string]interface{}

  • Stores the actual objects, keyed by object key.
  • The key is generated by a function such as MetaNamespaceKeyFunc; the value is the runtime.Object itself.

The indexer does two things:

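  • Storage: it saves Kubernetes runtime.Objects into items; the actual objects live in the items field of threadSafeMap.
  • Indexing: it builds a three-level index for every object in items:
    1. The first level is the indices map, keyed by index type, e.g. 'namespace' or 'nodename';
    2. The second level is the Index map, keyed by the specific indexed value, e.g. 'namespace1', 'namespace2' or 'nodeName1', 'nodeName2';
    3. The third level is the set of object keys; only this level actually leads to the runtime.Objects (through items).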
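The three levels can be pictured with plain Go maps. The sketch below is a simplified, self-contained model rather than client-go's code: Index and Indices follow the shapes described above, but map[string]struct{} stands in for sets.String, strings stand in for runtime.Objects, and byIndex is a hypothetical helper that walks the levels in the way ByIndex is described.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the three levels described above.
type set map[string]struct{}  // level 3: object keys (stand-in for sets.String)
type Index map[string]set     // level 2: indexed value -> object keys
type Indices map[string]Index // level 1: index name -> Index

// byIndex walks indices -> Index -> set -> items and returns the objects
// stored under indexedValue, ordered by object key for stable output.
func byIndex(indices Indices, items map[string]string, indexName, indexedValue string) ([]string, error) {
	index, ok := indices[indexName]
	if !ok {
		return nil, fmt.Errorf("index %q does not exist", indexName)
	}
	keys := make([]string, 0, len(index[indexedValue]))
	for key := range index[indexedValue] {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	objs := make([]string, 0, len(keys))
	for _, key := range keys {
		objs = append(objs, items[key])
	}
	return objs, nil
}

func main() {
	// items: object key -> object (strings stand in for runtime.Object).
	items := map[string]string{
		"default/nginx":       "pod nginx",
		"default/redis":       "pod redis",
		"kube-system/coredns": "pod coredns",
	}
	// The first two levels are maps; each innermost set is the third level.
	indices := Indices{
		"namespace": {
			"default":     {"default/nginx": {}, "default/redis": {}},
			"kube-system": {"kube-system/coredns": {}},
		},
		"nodename": {
			"node1": {"default/nginx": {}, "kube-system/coredns": {}},
			"node2": {"default/redis": {}},
		},
	}
	objs, err := byIndex(indices, items, "namespace", "default")
	fmt.Println(objs, err) // [pod nginx pod redis] <nil>
}
```

Note that the same object key ("default/nginx") appears under both index types; the indexes only hold keys, and the object itself is stored once in items.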

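Example (client-go/tools/cache/thread_safe_store.go): the index maintenance threadSafeMap performs when resources are added, updated or deleted. Note that the objects themselves are added, updated and deleted in the Add, Update and Delete methods, not here. The main work is walking down the levels to the final index set and adding or removing an entry in it.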

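  • updateIndices is called by Add, Update, Replace and others. It works as follows:
  1. If an oldObj is given, it first removes oldObj's index entries;
  2. It iterates over the indexFunc map c.indexers: each entry gives the name of a first-level index in indices (the Index is created if it does not exist yet), and applying its indexFunc to the new object yields the second-level indexed values, indexValues (a slice);
  3. For each indexed value, if the third-level set in the Index is nil it creates an empty set, then inserts the object key into that set.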
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// if we got an old object, we need to remove it before we add it again
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
	for name, indexFunc := range c.indexers {
		// For each registered indexer, compute the object's indexed values; e.g. when indexFunc is MetaNamespaceIndexFunc, this is a slice holding the namespace
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}

		// Iterate over all indexed values (e.g. the namespace)
		for _, indexValue := range indexValues {
			// Fetch the set of object keys for this value, creating it if needed
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			// The set holds the object keys indexed under this value
			set.Insert(key)
		}
	}
}
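  • deleteFromIndices is called by Delete, updateIndices and others; it removes an object's key from indices. Its steps are:
  1. Iterate over the indexFunc map c.indexers to get each first-level index name in indices, and apply its indexFunc to the object to compute the second-level indexed values;
  2. For each indexed value in the second-level Index, delete the object key from the corresponding third-level set.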
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
	for name, indexFunc := range c.indexers {
		indexValues, err := indexFunc(obj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

		index := c.indices[name]
		if index == nil {
			continue
		}
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set != nil {
				set.Delete(key)
			}
		}
	}
}
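Why does updateIndices call deleteFromIndices before inserting? The simplified sketch below moves a pod from node1 to node2. It is a toy, single-goroutine model of threadSafeMap (no locking), and the pod type, the byNode index function and the "nodename" index name are hypothetical; map[string]struct{} again stands in for sets.String. Because the old object's index values are removed first, the pod no longer shows up under node1.

```go
package main

import "fmt"

// pod is a hypothetical stand-in for a Pod; only the node name matters here.
type pod struct{ Name, NodeName string }

// IndexFunc has the same shape as client-go's IndexFunc.
type IndexFunc func(obj interface{}) ([]string, error)

type set map[string]struct{} // stand-in for sets.String
type Index map[string]set

// store is a toy, single-goroutine model of threadSafeMap (locking omitted).
type store struct {
	items    map[string]interface{}
	indexers map[string]IndexFunc
	indices  map[string]Index
}

// byNode is a hypothetical IndexFunc that indexes pods by node name.
func byNode(obj interface{}) ([]string, error) {
	return []string{obj.(pod).NodeName}, nil
}

func (s *store) deleteFromIndices(obj interface{}, key string) {
	for name, indexFunc := range s.indexers {
		values, err := indexFunc(obj)
		if err != nil {
			panic(err)
		}
		for _, v := range values {
			delete(s.indices[name][v], key) // deleting from a nil map is a no-op
		}
	}
}

func (s *store) updateIndices(oldObj, newObj interface{}, key string) {
	if oldObj != nil {
		s.deleteFromIndices(oldObj, key) // drop the stale entries first
	}
	for name, indexFunc := range s.indexers {
		values, err := indexFunc(newObj)
		if err != nil {
			panic(err)
		}
		if s.indices[name] == nil {
			s.indices[name] = Index{}
		}
		for _, v := range values {
			if s.indices[name][v] == nil {
				s.indices[name][v] = set{}
			}
			s.indices[name][v][key] = struct{}{}
		}
	}
}

// Update mirrors threadSafeMap.Update: replace the item, then reindex.
func (s *store) Update(key string, obj interface{}) {
	oldObj := s.items[key]
	s.items[key] = obj
	s.updateIndices(oldObj, obj, key)
}

func main() {
	s := &store{
		items:    map[string]interface{}{},
		indexers: map[string]IndexFunc{"nodename": byNode},
		indices:  map[string]Index{},
	}
	s.Update("default/web", pod{Name: "web", NodeName: "node1"})
	s.Update("default/web", pod{Name: "web", NodeName: "node2"}) // rescheduled
	fmt.Println(len(s.indices["nodename"]["node1"]), len(s.indices["nodename"]["node2"])) // 0 1
}
```

Without the deleteFromIndices call, "default/web" would remain in the node1 set and a lookup by node1 would return a pod that is no longer there.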

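Take namespace as the index type. First, get the indexFunc that computes namespaces from indexers, then use it to compute all namespaces related to the given object. indices holds the object keys of every namespace, so you can get all object keys in a particular namespace; looking up a particular object key in items then yields the object itself. More generally, indexers are used to find resources related to a given object, for example the Secrets related to a Pod.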



The main index functions (IndexFunc) in Kubernetes

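  • MetaNamespaceIndexFunc, in client-go/tools/cache/index.go, returns the namespace field of the object's metadata, i.e. objects are indexed by namespace.
  • indexByPodNodeName, in kubernetes/pkg/controller/daemon/daemon_controller.go, returns the name of the node the Pod runs on (nodeName), and only for active Pods that have a nodeName set.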

Code

// client-go/tools/cache/index.go
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
	meta, err := meta.Accessor(obj)
	if err != nil {
		return []string{""}, fmt.Errorf("object has no meta: %v", err)
	}
	return []string{meta.GetNamespace()}, nil
}
//kubernetes/pkg/controller/daemon/daemon_controller.go
func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	// We are only interested in active pods with nodeName set
	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}
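An IndexFunc may also return more than one value, which is how a single object ends up under several index keys of the same index. The labelPod type and indexByLabels function below are hypothetical illustrations, not part of Kubernetes: the function emits one "key=value" index key per label, sorted so the result is stable.

```go
package main

import (
	"fmt"
	"sort"
)

// labelPod is a hypothetical stand-in for an object carrying labels.
type labelPod struct {
	Name   string
	Labels map[string]string
}

// indexByLabels has the IndexFunc shape func(interface{}) ([]string, error)
// and returns one index key per label, e.g. "app=web", "tier=frontend".
func indexByLabels(obj interface{}) ([]string, error) {
	p, ok := obj.(*labelPod)
	if !ok {
		// Like indexByPodNodeName, ignore objects of other types.
		return []string{}, nil
	}
	keys := make([]string, 0, len(p.Labels))
	for k, v := range p.Labels {
		keys = append(keys, k+"="+v)
	}
	sort.Strings(keys)
	return keys, nil
}

func main() {
	p := &labelPod{Name: "web-0", Labels: map[string]string{"tier": "frontend", "app": "web"}}
	keys, _ := indexByLabels(p)
	fmt.Println(keys) // [app=web tier=frontend]
}
```

Returning an empty slice for irrelevant objects, rather than an error, matters here: as the updateIndices code above shows, an error from an IndexFunc makes threadSafeMap panic.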

cache's keyFunc

As described above, cache's cacheStorage field is a ThreadSafeStore whose concrete data structure is threadSafeMap. cache's other field, keyFunc, generates the object key. The default keyFunc is:

// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
//
// TODO: replace key-as-string with a key-as-struct so that this
// packing/unpacking won't be necessary.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

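In short: compute the index keys of one object, then fetch every stored object that has any of those index keys. To make this easy to grasp, I tell myself something like "fetch all Pods on the same node as this Pod"; that makes it concrete, although Kubernetes itself may not use it this way. More abstractly, it returns all objects that share some characteristic with the given object, where the characteristic is whatever the chosen index function computes.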
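The "all Pods on the same node as this Pod" reading corresponds to the Index method, which, per its interface comment, returns the stored objects whose indexed values intersect those of the given object. Below is a simplified, self-contained model of that behavior; nodePod, nodeIndexer, indexValues and related are hypothetical names, only one "by node" index exists, and the locking done by threadSafeMap is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// nodePod is a hypothetical stand-in for a Pod: its object key and node.
type nodePod struct{ Key, NodeName string }

// nodeIndexer is a toy, lock-free model with a single "by node" index.
type nodeIndexer struct {
	items map[string]nodePod         // object key -> object
	index map[string]map[string]bool // node name -> set of object keys
}

// indexValues plays the role of an IndexFunc such as indexByPodNodeName:
// unscheduled pods get no index values.
func indexValues(p nodePod) []string {
	if p.NodeName == "" {
		return nil
	}
	return []string{p.NodeName}
}

func (ix *nodeIndexer) add(p nodePod) {
	ix.items[p.Key] = p
	for _, v := range indexValues(p) {
		if ix.index[v] == nil {
			ix.index[v] = map[string]bool{}
		}
		ix.index[v][p.Key] = true
	}
}

// related returns every stored object that shares at least one indexed
// value with obj (the union over obj's values), ordered by object key.
func (ix *nodeIndexer) related(obj nodePod) []nodePod {
	seen := map[string]bool{}
	for _, v := range indexValues(obj) {
		for key := range ix.index[v] {
			seen[key] = true
		}
	}
	keys := make([]string, 0, len(seen))
	for key := range seen {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	out := make([]nodePod, 0, len(keys))
	for _, key := range keys {
		out = append(out, ix.items[key])
	}
	return out
}

func main() {
	ix := &nodeIndexer{items: map[string]nodePod{}, index: map[string]map[string]bool{}}
	ix.add(nodePod{Key: "default/a", NodeName: "node1"})
	ix.add(nodePod{Key: "default/b", NodeName: "node1"})
	ix.add(nodePod{Key: "default/c", NodeName: "node2"})
	for _, p := range ix.related(nodePod{Key: "default/a", NodeName: "node1"}) {
		fmt.Println(p.Key) // default/a, then default/b
	}
}
```

The given Pod itself is part of the result when it is stored, since it trivially shares its own node name.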

cache's implementation

It is mostly a thin wrapper around the ThreadSafeStore described above, so it is not explained in detail here.
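As a rough illustration of that wrapping, here is a self-contained sketch. It assumes cache's two responsibilities are the ones listed in its struct comment: compute the key with keyFunc, then call the ThreadSafeStore through an interface (which is the interface dispatch described in the note earlier). ThreadSafeStorage, toyMap, toyCache and nameKeyFunc are hypothetical names, and only Add and GetByKey are modeled. client-go wraps key errors in its own KeyError type; this sketch simply wraps the error with fmt.Errorf.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ThreadSafeStorage is a trimmed-down, hypothetical version of the
// ThreadSafeStore interface: only the two methods this sketch needs.
type ThreadSafeStorage interface {
	Add(key string, obj interface{})
	Get(key string) (item interface{}, exists bool)
}

// toyMap plays the role of threadSafeMap: a map guarded by an RWMutex.
type toyMap struct {
	lock  sync.RWMutex
	items map[string]interface{}
}

func (m *toyMap) Add(key string, obj interface{}) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.items[key] = obj
}

func (m *toyMap) Get(key string) (interface{}, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	item, exists := m.items[key]
	return item, exists
}

// KeyFunc has the same shape as client-go's KeyFunc.
type KeyFunc func(obj interface{}) (string, error)

// toyCache mirrors cache: compute the object key, then delegate.
type toyCache struct {
	cacheStorage ThreadSafeStorage
	keyFunc      KeyFunc
}

func (c *toyCache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return fmt.Errorf("couldn't compute key for %v: %w", obj, err)
	}
	c.cacheStorage.Add(key, obj) // dispatches to (*toyMap).Add
	return nil
}

func (c *toyCache) GetByKey(key string) (interface{}, bool, error) {
	item, exists := c.cacheStorage.Get(key)
	return item, exists, nil
}

// nameKeyFunc is a hypothetical KeyFunc that only accepts strings.
func nameKeyFunc(obj interface{}) (string, error) {
	name, ok := obj.(string)
	if !ok {
		return "", errors.New("object is not a string")
	}
	return "default/" + name, nil
}

func main() {
	c := &toyCache{cacheStorage: &toyMap{items: map[string]interface{}{}}, keyFunc: nameKeyFunc}
	_ = c.Add("nginx")
	item, exists, _ := c.GetByKey("default/nginx")
	fmt.Println(item, exists)     // nginx true
	fmt.Println(c.Add(42) != nil) // true: the key could not be computed
}
```

Keeping thread safety inside the storage and key computation inside the wrapper is what lets cache stay this small.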

Summary:

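Every object in Kubernetes (Pod, Node, Service, Deployment, and so on) has attributes and labels. Values derived from these attributes and labels serve as index keys, and the Indexer puts objects with the same attribute or label value into one set; grouping those attributes and labels into categories (index names) is the core of the Indexer discussed in this article. In the simplest case, you can think of the Indexer as putting all objects of the same namespace into one set; Kubernetes retrieves objects based on attributes and labels.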
