分析 kubernetes 中的事件机制

2020-03-06 21:54:44 +08:00
 silenceper

我们通过 kubectl describe [资源] 命令,可以在看到 Event 输出,并且经常依赖 event 进行问题定位,从 event 中可以分析整个 POD 的运行轨迹,为服务的客观测性提供数据来源,由此可见,event 在 Kubernetes 中起着举足轻重的作用。

event 并不只是 kubelet 中都有的,关于 event 的操作被封装在client-go/tools/record包,我们完全可以在写入自定义的 event。

现在让我们来一步步揭开 event 的面纱。

Event 定义

其实 event 也是一个资源对象,并且通过 apiserver 将 event 存储在 etcd 中,所以我们也可以通过 kubectl get event 命令查看对应的 event 对象。

以下是一个 event 的 yaml 文件:

apiVersion: v1
count: 1
eventTime: null
firstTimestamp: "2020-03-02T13:08:22Z"
involvedObject:
  apiVersion: v1
  kind: Pod
  name: example-foo-d75d8587c-xsf64
  namespace: default
  resourceVersion: "429837"
  uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: "2020-03-02T13:08:22Z"
message: Pod sandbox changed, it will be killed and re-created.
metadata:
  creationTimestamp: "2020-03-02T13:08:30Z"
  name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
  namespace: default
  resourceVersion: "479466"
  selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
  uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: ""
reportingInstance: ""
source:
  component: kubelet
  host: minikube
type: Normal

**
主要字段说明:

event 字段定义可以看这里:types.go#L5078

接下来我们来看看,整个 event 是如何下入的。

写入事件

1、这里以 kubelet 为例,看看是如何进行事件写入的

2、文中代码以 Kubernetes 1.17.3 为例进行分析

先以一幅图来看下整个的处理流程

创建操作事件的客户端:
kubelet/app/server.go#L461

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
    //事件广播
	eventBroadcaster := record.NewBroadcaster()
    //创建 EventRecorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	//发送 event 至 log 输出
    eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
        //发送 event 至 apiserver
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

通过 makeEventRecorder 创建了 EventRecorder 实例,这是一个事件广播器,通过它提供了 StartLogging 和 StartRecordingToSink 两个事件处理函数,分别将 event 发送给 log 和 apiserver。
NewRecorder创建了 EventRecorder 的实例,它提供了 EventEventf 等方法供事件记录。

EventBroadcaster

我们来看下 EventBroadcaster 接口定义:event.go#L113

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
    //
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
	StartRecordingToSink(sink EventSink) watch.Interface
	StartLogging(logf func(format string, args ...interface{})) watch.Interface
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder

	Shutdown()
}

具体实现是通过 eventBroadcasterImpl struct 来实现了各个方法。

其中 StartLogging 和 StartRecordingToSink 其实就是完成了对事件的消费,EventRecorder 实现对事件的写入,中间通过 channel 实现了生产者消费者模型。

EventRecorder

我们先来看下EventRecorder 接口定义:event.go#L88,提供了一下 4 个方法

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

主要参数说明:

我们来看下当我们调用 Event(object runtime.Object, eventtype, reason, message string) 的整个过程。
发现最终都调用到了 generateEvent 方法:event.go#L316

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
    .....
	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source
	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

最终事件在一个 goroutine 中通过调用 recorder.Action 进入处理,这里保证了每次调用 event 方法都是非阻塞的。
其中 makeEvent 的作用主要是构造了一个 event 对象,事件 name 根据 InvolvedObject 中的 name 加上时间戳生成:

注意看:对于一些非 namespace 资源产生的 event,event 的 namespace 是 default

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}

进一步跟踪Action方法,apimachinery/blob/master/pkg/watch/mux.go#L188:23

// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}

将 event 写入到了一个 channel 里面。
注意:
这个 Action 方式是apimachinery包中的方法,因为实现的 sturt recorderImpl
*watch.Broadcaster 作为一个匿名 struct,并且在 NewRecorder 进行 Broadcaster 赋值,这个Broadcaster其实就是 eventBroadcasterImpl 中的Broadcaster

到此,基本清楚了 event 最终被写入到了 Broadcaster 中的 incoming channel 中,下面看下是怎么进行消费的。

消费事件

makeEventRecorder 调用的 StartLoggingStartRecordingToSink 其实就是完成了对事件的消费。

两个方法内部都调用了 StartEventWatcher 方法,并且传入一个 eventHandler 方法对 event 进行处理

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

其中 watcher.ResultChan 方法就拿到了事件,这里是在一个 goroutine 中通过func (m *Broadcaster) loop() ==>func (m *Broadcaster) distribute(event Event) 方法调用将 event 又写入了broadcasterWatcher.result

主要看下 StartRecordingToSink 提供的的eventHandlerrecordToSink 方法:

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// Randomize the first sleep so that various clients won't all be
		// synced up if the master goes down.
        // 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}

其中 event 被经过了一个 eventCorrelator.EventCorrelate(event) 方法做预处理,主要是聚合相同的事件(避免产生的事件过多,增加 etcd 和 apiserver 的压力,也会导致查看 pod 事件很不清晰)

下面一个 for 循环就是在进行重试,最大重试次数是 12 次,调用 recordEvent 方法才真正将 event 写入到了 apiserver。

事件处理

我们来看下EventCorrelate方法:

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

分别调用了 aggregator.EventAggregatelogger.eventObservefilterFunc 三个方法,分别作用是:

  1. aggregator.EventAggregate:聚合 event,如果在最近 10 分钟出现过 10 个相似的事件(除了 message 和时间戳之外其他关键字段都相同的事件),aggregator 会把它们的 message 设置为 (combined from similar events)+event.Message
  2. logger.eventObserve:它会把相同的事件以及包含 aggregator 被聚合了的相似的事件,通过增加 Count 字段来记录事件发生了多少次。
  3. filterFunc: 这里实现了一个基于令牌桶的限流算法,如果超过设定的速率则丢弃,保证了 apiserver 的安全。

我们主要来看下aggregator.EventAggregate方法:

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
    //eventKey 是将除了时间戳外所有字段结合在一起
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
    //aggregateKey 是除了 message 和时间戳外的字段结合在一起,localKey 是 message
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
    //从 cache 中根据 aggregateKey 查询是否存在,如果是相同或者相类似的事件会被放入 cache 中
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

    //判断上次事件产生的时间是否超过 10 分钟,如何操作则重新生成一个 localKeys 集合(集合中存放 message )
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
    //将 locakKey 也就是 message 放入集合中,如果 message 相同就是覆盖了
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	//判断 localKeys 集合中存放的类似事件是否超过 10 个,
    if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
        //这里会对 message 加个前缀:(combined from similar events):
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

aggregator.EventAggregate方法中其实就是判断了通过 cache 和 localKeys 判断事件是否相似,如果最近 10 分钟出现过 10 个相似的事件就合并并加上前缀,后续通过logger.eventObserve方法进行 count 累加,如果 message 也相同,肯定就是直接 count++。

总结

好了,event 处理的整个流程基本就是这样,我们可以概括一下,可以结合文中的图对比一起看下:

  1. 创建 EventRecorder 对象,通过其提供的 Event 等方法,创建好 event 对象
  2. 将创建出来的对象发送给 EventBroadcaster 中的 channel 中
  3. EventBroadcaster 通过后台运行的 goroutine,从管道中取出事件,并广播给提前注册好的 handler 处理
  4. 当输出 log 的 handler 收到事件就直接打印事件
  5. EventSink handler 收到处理事件就通过预处理之后将事件发送给 apiserver
  6. 其中预处理包含三个动作,1、限流 2、聚合 3、计数
  7. apiserver 收到事件处理之后就存储在 etcd 中

回顾 event 的整个流程,可以看到 event 并不是保证 100%事件写入(从预处理的过程来看),这样做是为了后端服务 etcd 的可用性,因为 event 事件在整个集群中产生是非常频繁的,尤其在服务不稳定的时候,而相比 Deployment,Pod 等其他资源,又没那么的重要。所以这里做了个取舍。

参考文档:

原文地址:https://silenceper.com/blog/202003/kubernetes-event/

3704 次点击
所在节点    Kubernetes
2 条回复
better0332
2020-03-08 22:36:54 +08:00
注意 k8s event 只保留 1 小时
silenceper
2020-03-18 22:13:52 +08:00
@better0332 是的

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/650548

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX