k8s的事件告警

对kubernetes的事件进行收集、存储并报警

背景:

随着企业内越来越多的服务迁移到了kubernetes上,用户对kubernetes相关信息的获取更加迫切,他们想知道自己的服务实例都“经历过”什么,比如在有业务异常的时候,他们想要知道自己的实例是否被重启过,在上线失败的时候他们也希望第一时间知道原因。更进一步,他们希望这些都能实时通知并且能事后查看。但通常他们又无法直接接触集群服务器,包括kubectl等。因此需要有一个好的解决方案。

需求拆解:

上述需求可以拆解为如下四个基本功能:

  1. 能够实时获取kubernetes上的事件信息
  2. 能够对异常事件做实时报警通知
  3. 能够对事件做更持久的保留以便问题追溯
  4. 能够对保存的事件进行查询

这些功能听起来都停合理的,而且kubernetes.io里有一篇文章Events in Stackdriver也讲到了事件的存储。

它提到:“事件作为kubernetes的一个对象资源,有助于排错,但大量的事件如果都存储在etcd中,会带来较大的性能与容量压力,所以etcd中默认只保存最近1小时的。因此需要一个第三方的工具来更持久的保存它们。”

具体实现而言,就是对于1.7+版本,在Google Kubernetes Engine上启用了cloud logging后,默认会在集群内部署一个event-exporter,它将使用最低优先级且尽可能少的资源(通常0.1核的CPU和0.1G的内存)。

听起来很不错,但只有功能1,3,4没有对应的报警。而且很显然未使用GKE的也无法直接使用。

再次查了一些开源项目,如kubewatch 以及它背后的实现自定义控制器 在这里快速回顾一下如何自己实现一个控制器:

  1. 通过cache.NewListWatchFromClient新建一个listwatcher
  2. 通过cache.NewSharedIndexInformer+ListWatcher新建一个informer
  3. 通过workqueue新建一个RateLimitingQueue或其他类型的队列
  4. 对informer添加handler,如OnAdd等将事件信息放入到队列中
  5. 启动informer
  6. 等待数据同步到本地缓存完成
  7. 启动worker持续消费queue中的事件,进行处理如过滤、报警和存储等。

但kubewatch并不能直接满足要求,它监听的是各种资源类型如pod,service,deployment等,但唯独却缺少Event类型,对不同类型它都启动一个informer并且不同类型资源的事件格式都不同,为兼容它只传递了通用信息,字段很少。因此对其再简化处理,只监听Event类型即可并对所有字段都做解析。 如:

	lw := cache.NewListWatchFromClient(
		kubeClient.CoreV1().RESTClient(), // 客户端
		"events",                         // 被监控资源类型
		"",                               // 被监控命名空间,为空则是所有
		fields.Everything())              // 选择器,这里全要

	informer := cache.NewSharedIndexInformer(lw, &api_v1.Event{}, 0, cache.Indexers{})
	c := controller.NewResourceController(kubeClient, informer, config)
func NewResourceController(client kubernetes.Interface, informer cache.SharedIndexInformer, config config.Config) *Controller {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	var newEvent Event
	var err error
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			evt := obj.(*api_v1.Event)
			newEvent.Key, err = cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				logrus.Debugf("准备写入队列,类型为:CreateEvent,详情为:%+v", evt)
				queue.Add(newEvent.Key)
			}
		},
		UpdateFunc: func(old, new interface{}) {
			oldEvt := old.(*api_v1.Event)
			newEvt := new.(*api_v1.Event)
			if !reflect.DeepEqual(newEvt.Source, oldEvt.Source) && oldEvt.Reason != newEvt.Reason {
				newEvent.Obj = newEvt
				newEvent.Key, err = cache.MetaNamespaceKeyFunc(old)
				if err == nil {
					queue.Add(newEvent.Key)
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			evt := obj.(*api_v1.Event)
			flog.Log().Infof("%+v", evt)
			newEvent.Obj = evt
			newEvent.Type = DeleteEvent
			newEvent.Key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				logrus.Debugf("不写入队列,类型为:DeleteEvent,详情为:%+v", evt)
			}
		},
	})
	return &Controller{
		clientset: client,
		informer:  informer,
		queue:     queue,
		config:    config,
	}
}
// Run starts the kubewatch controller
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	go c.informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	wait.Until(c.runWorker, time.Second, stopCh)
}

这里需要说明的是对于event对象的删除事件,通常是对过期事件的清理,直接丢弃即可。