Kubernetes事件是一种资源对象,用于记录和展示集群内发生的情况,kubernetes中的各个组件会将运行时发生的事件上报给kube-apiserver。然后我们可以通过 kubectl get events或者使用kubectl describe
由于Kubernetes是一种资源对象,它的所有信息都会存储在Etcd中,为了减少磁盘的开销,故强制执行保留策略,即在最后一次事件发生后,删除1小时之前发生的事件。
Kubernetes系统以Pod资源为核心,Deployment、StatefulSet、ReplicaSet、DaemonSet、CronJob等,最终都会创建出Pod。因此Kubernetes事件也是围绕Pod进行的,在Pod生命周期内的关键步骤中都会产生事件消息。Event资源数据结构体定义在core资源组下,代码示例如下(源码:staging\src\k8s.io\api\core\v1\types.go):
type Event struct {metav1.TypeMeta `json:",inline"`metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`}
Event资源数据结构体描述了当前时间段内发生了哪些关键性事件。事件有两种类型,分别为Normal和Warning,前者为正常事件,后者为警告事件。代码示例如下:
const (EventTypeNormal string = "Normal"EventTypeWarning string = "Warning")
其中:
- Normal是正常事件
- Warning是警告事件
EventBroadcaster
在kubernetes中,事件是通过EventBroadcaster来进行管理的。它主要包含:
- EventRecorder:事件(Event)生产者,也称为事件记录器。Kubernetes系统组件通过EventRecorder记录关键性事件。
- EventBroadcaster:事件(Event)消费者,也称为事件广播器。EventBroadcaster消费EventRecorder记录的事件并将其分发给目前所有已连接的broadcasterWatcher。分发过程有两种机制,分别是非阻塞(Non-Blocking)分发机制和阻塞(Blocking)分发机制。
- broadcasterWatcher:观察者(Watcher)管理,用于定义事件的处理方式,例如上报事件至Kubernetes API Server。
EventRecorder
EventRecorder是事件记录器,其接口代码如下(源码:staging\src\k8s.io\client-go\tools\events\interfaces.go):
type EventRecorder interface {Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})}
其中Eventf就是使用fmt.Sprintf格式化输出事件的格式。
EventBroadcaster
EventBroadcaster是事件消费者,消费EventRecorder记录的事件并将其分发给目前所有已连接的broadcasterWatcher。EventBroadcaster通过NewBroadcaster函数进行实例化(源码:staging\src\k8s.io\client-go\tools\events\event_broadcaster.go):
func NewBroadcaster(sink EventSink) EventBroadcaster {return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})}
在实例化过程中,会通过watch.NewBroadcaster函数在内部启动goroutine(即m.loop函数)来监控m.incoming,并将监控的事件通过m.distribute函数分发给所有已连接的broadcasterWatcher。
过程如下:
(1)调用newBroadcaster,代码如下(源码:staging\src\k8s.io\client-go\tools\events\event_broadcaster.go):
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {return &eventBroadcasterImpl{Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),eventCache: eventCache,sleepDuration: sleepDuration,sink: sink,}}
(2)然后watch.NewBroadcaster会启动goroutine,如下(源码:staging\src\k8s.io\apimachinery\pkg\watch\mux.go):
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {m := &Broadcaster{watchers: map[int64]*broadcasterWatcher{},incoming: make(chan Event, incomingQueueLength),watchQueueLength: queueLength,fullChannelBehavior: fullChannelBehavior,}m.distributing.Add(1)go m.loop()return m}
(3)m.loop()会监控m.incoming,获取事件,代码如下(源码:staging\src\k8s.io\apimachinery\pkg\watch\mux.go):
func (m *Broadcaster) loop() {// Deliberately not catching crashes here. Yes, bring down the process if there's a// bug in watch.Broadcaster.for event := range m.incoming {if event.Type == internalRunFunctionMarker {event.Object.(functionFakeRuntimeObject)()continue}m.distribute(event)}m.closeAll()m.distributing.Done()}
(4)获取到事件后就传递给m.distribute(event),再传递给broadcasterWatcher,代码如下:
func (m *Broadcaster) distribute(event Event) {m.lock.Lock()defer m.lock.Unlock()if m.fullChannelBehavior == DropIfChannelFull {for _, w := range m.watchers {select {case w.result <- event:case <-w.stopped:default: // Don't block if the event can't be queued.}}} else {for _, w := range m.watchers {select {case w.result <- event:case <-w.stopped:}}}}
其中m.watchers是map[int64]*broadcasterWatcher类型。
分发过程有两种机制,分别是非阻塞分发机制和阻塞分发机制。在非阻塞分发机制下使用DropIfChannelFull标识,在阻塞分发机制下使用WaitIfChannelFull标识,默认为DropIfChannelFull标识。
broadcasterWatcher
broadcasterWatcher是每个Kubernetes系统组件自定义处理事件的方式。它拥有两种自定义处理事件的函数,分别介绍如下:
- StartLogging:将事件写入日志中。
- StartRecordingToSink:将事件上报至Kubernetes API Server并存储至Etcd集群。
那么整个执行的流程如下:
