How Kubernetes implements list-watch under the hood

Published 2023-07-13 16:41:28, author: 偶尔发呆

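We all know that controller-manager, scheduler, and kubelet watch the objects they care about through apiserver. When the content or state of a watched object changes, the corresponding event is pushed to the watcher right away. Kubernetes depends on this event notification mechanism to run well. So how is the mechanism implemented and driven?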

1. etcd

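In k8s, apiserver is the gateway to the data in etcd: other components access the data stored in etcd through the interfaces that apiserver exposes. etcd provides a watch API; when a watched key changes, etcd notifies the client immediately.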

Open a terminal and watch the key stock1:

etcdctl watch stock1

Open another terminal and set a value:

etcdctl put stock1 1000

 

Open a terminal and watch every key with the prefix stock:

etcdctl watch stock --prefix

Open another terminal and set values:

etcdctl put stock1 10
etcdctl put stock2 20

 

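These two examples give a rough idea of how etcd watch is used. We can guess that apiserver likewise uses the watch API to watch the pods, deployments, and other data stored in etcd: etcd pushes changed data to apiserver, and apiserver then distributes it to the other system components.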
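To make the semantics of the two etcdctl examples concrete, here is a minimal sketch in Go of a key-value store with exact-key and prefix watches. It is a toy model only: Store, Event, Watch, and Put are hypothetical names invented for this illustration, and nothing here reflects how etcd is actually implemented.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Event is a simplified change notification (hypothetical type, not etcd's).
type Event struct {
	Key, Value string
	Rev        int64
}

type watcher struct {
	key    string
	prefix bool
	ch     chan Event
}

// Store is a toy in-memory key-value store with watch support.
type Store struct {
	mu       sync.Mutex
	rev      int64
	data     map[string]string
	watchers []watcher
}

func NewStore() *Store { return &Store{data: map[string]string{}} }

// Watch registers interest in one key, or in every key sharing the prefix.
func (s *Store) Watch(key string, prefix bool) <-chan Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan Event, 16)
	s.watchers = append(s.watchers, watcher{key, prefix, ch})
	return ch
}

// Put stores the value, bumps the revision, and notifies matching watchers.
func (s *Store) Put(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rev++
	s.data[key] = value
	ev := Event{Key: key, Value: value, Rev: s.rev}
	for _, w := range s.watchers {
		if w.key == key || (w.prefix && strings.HasPrefix(key, w.key)) {
			w.ch <- ev // buffered in this toy; slow watchers are not handled
		}
	}
}

func main() {
	s := NewStore()
	exact := s.Watch("stock1", false)
	byPrefix := s.Watch("stock", true)

	s.Put("stock1", "10")
	s.Put("stock2", "20")

	fmt.Println(<-exact)    // the stock1 change only
	fmt.Println(<-byPrefix) // the stock1 change
	fmt.Println(<-byPrefix) // the stock2 change
}
```

The exact watcher sees only stock1, while the prefix watcher sees both keys, which matches what the two etcdctl sessions print.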

2. apiserver and etcd

From a certain angle, apiserver is to etcd what MyBatis is to MySQL. This article uses a test case to describe how apiserver watches data.

// staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go:134
func TestDeleteTriggerWatch(t *testing.T) {
  // Start etcd
  ctx, store, _ := testSetup(t)
  // Write a pod object
  key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
  // Start watching
  w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
  if err != nil {
    t.Fatalf("Watch failed: %v", err)
  }
  // Delete the pod object
  if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); err != nil {
    t.Fatalf("Delete failed: %v", err)
  }
  // Check that the delete event is received
  testCheckEventType(t, watch.Deleted, w)
}

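The test case proceeds as follows: start etcd, write a pod, start watching its key, delete the pod, and check that a Deleted event is received.

The corresponding data structures are shown in the figure below:

The code execution flow is shown in the figure below:

In the figure above, the event interaction starts at watch and run. Here is the source in detail: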
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
  if wc.initialRev == 0 {
    if err := wc.sync(); err != nil {
      klog.Errorf("failed to sync with latest state: %v", err)
      wc.sendError(err)
      return
    }
  }
  opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
  if wc.recursive {
    opts = append(opts, clientv3.WithPrefix())
  }
  if wc.progressNotify {
    opts = append(opts, clientv3.WithProgressNotify())
  }
  wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
  for wres := range wch {
    if wres.Err() != nil {
      err := wres.Err()
      // If there is an error on server (e.g. compaction), the channel will return it before closed.
      logWatchChannelErr(err)
      wc.sendError(err)
      return
    }
    if wres.IsProgressNotify() {
      wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
      metrics.RecordEtcdBookmark(wc.watcher.objectType)
      continue
    }

    for _, e := range wres.Events {
      parsedEvent, err := parseEvent(e)
      if err != nil {
        logWatchChannelErr(err)
        wc.sendError(err)
        return
      }
      wc.sendEvent(parsedEvent)
    }
  }
  // When we come to this point, it's only possible that client side ends the watch.
  // e.g. cancel the context, close the client.
  // If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
  // We should notify the main thread that this goroutine has exited.
  close(watchClosedCh)
}

The call to wc.watcher.client.Watch watches the specified key. The for loop then ranges over the returned receive-only channel and parses each event that etcd sends back. Because the watch is opened with WithRev(wc.initialRev + 1), only changes made after the revision the caller already holds are delivered.
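The effect of starting a watch at initialRev + 1 can be illustrated with a small sketch. It assumes a simplified model in which every change is appended to a revision-ordered log; History, Append, and WatchFrom are hypothetical names for this illustration and are not part of etcd or apiserver.

```go
package main

import "fmt"

// Event is one entry in the toy revision log.
type Event struct {
	Rev  int64
	Type string // "PUT" or "DELETE"
	Key  string
}

// History is an append-only, revision-ordered event log (toy model).
type History struct{ events []Event }

// Append records a change under the next revision and returns that revision.
func (h *History) Append(typ, key string) int64 {
	rev := int64(len(h.events) + 1)
	h.events = append(h.events, Event{Rev: rev, Type: typ, Key: key})
	return rev
}

// WatchFrom returns every event for key whose revision is >= startRev.
func (h *History) WatchFrom(key string, startRev int64) []Event {
	var out []Event
	for _, e := range h.events {
		if e.Key == key && e.Rev >= startRev {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	h := &History{}
	h.Append("PUT", "/pods/foo")
	// The state at this revision is what the client has already read.
	initialRev := h.Append("PUT", "/pods/foo")
	// This change lands before the watch is opened.
	h.Append("DELETE", "/pods/foo")

	// Starting at initialRev+1 skips what was already seen and misses nothing after it.
	for _, e := range h.WatchFrom("/pods/foo", initialRev+1) {
		fmt.Printf("rev=%d type=%s key=%s\n", e.Rev, e.Type, e.Key)
	}
}
```

Only the DELETE at revision 3 is reported. This is the same situation as in TestDeleteTriggerWatch: the watch is opened with the resource version of the stored object, so the later delete is the first event delivered.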

func (wc *watchChan) sendEvent(e *event) {
  if len(wc.incomingEventChan) == incomingBufSize {
    klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType)
  }
  select {
  case wc.incomingEventChan <- e:
  case <-wc.ctx.Done():
  }
}

// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
  defer wg.Done()

  for {
    select {
    case e := <-wc.incomingEventChan:
      res := wc.transform(e)
      if res == nil {
        continue
      }
      if len(wc.resultChan) == outgoingBufSize {
        klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType)
      }
      // If user couldn't receive results fast enough, we also block incoming events from watcher.
      // Because storing events in local will cause more memory usage.
      // The worst case would be closing the fast watcher.
      select {
      case wc.resultChan <- *res:
      case <-wc.ctx.Done():
        return
      }
    case <-wc.ctx.Done():
      return
    }
  }
}

After passing through incomingEventChan, events finally arrive at resultChan. As you might expect, anyone interested in the events can simply consume the resultChan channel.

// watchChan implements watch.Interface.
type watchChan struct {
  // other fields omitted
  watcher           *watcher
  incomingEventChan chan *event
  resultChan        chan watch.Event
  errChan           chan error
}

func (wc *watchChan) ResultChan() <-chan watch.Event {
  return wc.resultChan
}

The ResultChan() method returns resultChan, so setting a breakpoint in this method reveals who is interested in the events. That leads us to the cacher data structure.

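3. cacher

Now we reach the cache. apiserver caches data in memory to avoid reading etcd excessively. Taking "kubectl get namespaces" as an example, set a breakpoint where apiserver reads the namespace cache; the call stack is as follows: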

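The comment on GetList shows that when a resource version is specified, the data is read from the cache, with the guarantee that the returned version is greater than or equal to the specified one; otherwise the read still goes to etcd. Following the call stack further into the cache's data structure makes it clear that the so-called cache is, in the end, just a map.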
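The read-path rule described by the GetList comment can be sketched as a small decision function. This is a toy under stated assumptions: snapshot and cachedStore are hypothetical types, the "backing" field plays the role of etcd, and the sketch fails fast when the cache is behind, which is a simplification chosen for this example rather than a description of what cacher does.

```go
package main

import (
	"fmt"
	"strconv"
)

// snapshot is a list of names valid at one revision (toy stand-in for stored objects).
type snapshot struct {
	rev   uint64
	items []string
}

// cachedStore holds the authoritative data plus a cache that may lag behind it.
type cachedStore struct {
	backing snapshot // plays the role of etcd
	cache   snapshot // plays the role of the in-memory cache
}

// GetList picks the source based on whether a resource version was given.
func (c *cachedStore) GetList(resourceVersion string) (snapshot, string, error) {
	if resourceVersion == "" {
		// No version requested: read the authoritative store.
		return c.backing, "backing store", nil
	}
	want, err := strconv.ParseUint(resourceVersion, 10, 64)
	if err != nil {
		return snapshot{}, "", fmt.Errorf("invalid resource version %q: %w", resourceVersion, err)
	}
	if c.cache.rev < want {
		// The toy fails fast; waiting for the cache to catch up would be the alternative.
		return snapshot{}, "", fmt.Errorf("cache at revision %d, need at least %d", c.cache.rev, want)
	}
	return c.cache, "cache", nil
}

func main() {
	s := &cachedStore{
		backing: snapshot{rev: 12, items: []string{"default", "kube-system", "demo"}},
		cache:   snapshot{rev: 10, items: []string{"default", "kube-system"}},
	}
	for _, rv := range []string{"", "10", "11"} {
		snap, source, err := s.GetList(rv)
		if err != nil {
			fmt.Printf("rv=%q: error: %v\n", rv, err)
			continue
		}
		fmt.Printf("rv=%q: served from %s at revision %d: %v\n", rv, source, snap.rev, snap.items)
	}
}
```

The point of the guarantee is that a result served from the cache is never older than the version the caller asked for, even though it may be older than what the backing store currently holds.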

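That is enough of the cache's runtime code. Next comes the creation of the cache; after some digging, the point at which the cache is created turns out to be: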

// vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:37
var ret RESTOptions
ret.Decorator = genericregistry.StorageWithCacher()

4. watch 

Open a terminal and run "kubectl get namespaces -w". This performs a list and a watch on namespaces; when namespaces change, the changes are pushed to the terminal. How is this implemented?

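The runtime event propagation mechanism is quite involved, one link chained to the next. Briefly: cacher consumes the watchChan.resultChan introduced in section 2 and so receives the events coming from etcd, then propagates them to cacheWatcher. Finally the HTTP handler converts each event and sends it to the HTTP client using HTTP chunked transfer encoding.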

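Open one terminal to watch namespaces and another to create a new namespace, and add a conditional breakpoint (event.Type == "ADDED") at the key code locations. Reading through the source shows that cacher consumes watchChan.resultChan, and the complete event path is: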

watchChan.resultChan -> Cacher.incoming 
  -> cacheWatcher.input -> cacheWatcher.result

The call stack of the method that reads the watchChan.resultChan channel is as follows:

The events then flow as follows:

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:749
func (c *Cacher) processEvent(event *watchCacheEvent) {
  if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
    // Monitor if this gets backed up, and how much.
    klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
  }
  c.incoming <- *event
}

// The event then passes through dispatchEvents
// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:757

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1204
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
  select {
  case c.input <- event:
    return true
  default:
    return false
  }
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1415
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
  // At this point we already start processing incoming watch events.
  // However, the init event can still be processed because their serialization
  // and sending to the client happens asynchrnously.
  // TODO: As describe in the KEP, we would like to estimate that by delaying
  //   the initialization signal proportionally to the number of events to
  //   process, but we're leaving this to the tuning phase.
  utilflowcontrol.WatchInitialized(ctx)

  for {
    select {
    case event, ok := <-c.input:
      if !ok {
        return
      }
      // only send events newer than resourceVersion
      if event.ResourceVersion > resourceVersion {
        c.sendWatchCacheEvent(event)
      }
    case <-ctx.Done():
      return
    }
  }
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1319
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
  watchEvent := c.convertToWatchEvent(event)
  if watchEvent == nil {
    // Watcher is not interested in that object.
    return
  }

  // We need to ensure that if we put event X to the c.result, all
  // previous events were already put into it before, no matter whether
  // c.done is close or not.
  // Thus we cannot simply select from c.done and c.result and this
  // would give us non-determinism.
  // At the same time, we don't want to block infinitely on putting
  // to c.result, when c.done is already closed.
  //
  // This ensures that with c.done already close, we at most once go
  // into the next select after this. With that, no matter which
  // statement we choose there, we will deliver only consecutive
  // events.
  select {
  case <-c.done:
    return
  default:
  }

  select {
  case c.result <- *watchEvent:
  case <-c.done:
  }
}

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
  return c.result
}

After the code above runs, the event has reached cacheWatcher.result. The HTTP handler consumes the events in cacheWatcher.result and sends the change events to the HTTP client.
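The fan-out step, in which one incoming event is offered to many watchers without letting a slow one stall the rest, can be sketched with the same select/default idiom that nonblockingAdd uses. The names below (event, subscriber, dispatch, drain) are hypothetical, and what the real dispatcher does with a watcher that cannot keep up is deliberately left out of this sketch.

```go
package main

import "fmt"

// event is a simplified watch event (hypothetical type).
type event struct {
	Type string
	Name string
	RV   uint64
}

// subscriber is a toy counterpart of cacheWatcher: a buffered input plus a filter.
type subscriber struct {
	name  string
	input chan event
	minRV uint64 // only events newer than this are of interest
}

// nonblockingAdd tries to enqueue without waiting and reports whether it succeeded.
func (s *subscriber) nonblockingAdd(e event) bool {
	select {
	case s.input <- e:
		return true
	default:
		return false
	}
}

// dispatch offers the event to every subscriber and returns the ones that were full.
func dispatch(subs []*subscriber, e event) (blocked []*subscriber) {
	for _, s := range subs {
		if !s.nonblockingAdd(e) {
			blocked = append(blocked, s)
		}
	}
	return blocked
}

// drain returns the buffered events that pass the resource version filter.
func (s *subscriber) drain() []event {
	var out []event
	for {
		select {
		case e := <-s.input:
			if e.RV > s.minRV {
				out = append(out, e)
			}
		default:
			return out
		}
	}
}

func main() {
	fast := &subscriber{name: "fast", input: make(chan event, 8), minRV: 1}
	slow := &subscriber{name: "slow", input: make(chan event, 1)}
	subs := []*subscriber{fast, slow}

	for rv := uint64(1); rv <= 3; rv++ {
		for _, s := range dispatch(subs, event{"ADDED", fmt.Sprintf("ns-%d", rv), rv}) {
			fmt.Printf("rv=%d: %s is not keeping up\n", rv, s.name)
		}
	}
	fmt.Println("fast received:", fast.drain())
	fmt.Println("slow received:", slow.drain())
}
```

The subscriber with the small buffer misses the second and third events, while the other one receives everything newer than its starting resource version. The filter in drain corresponds to the event.ResourceVersion > resourceVersion check in cacheWatcher.process.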

// vendor/k8s.io/apiserver/pkg/endpoints/handlers/watch.go:164
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
// or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  kind := s.Scope.Kind
  metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
  defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()

  if wsstream.IsWebSocketRequest(req) {
    w.Header().Set("Content-Type", s.MediaType)
    websocket.Handler(s.HandleWS).ServeHTTP(w, req)
    return
  }

  flusher, ok := w.(http.Flusher)
  if !ok {
    err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
    utilruntime.HandleError(err)
    s.Scope.err(errors.NewInternalError(err), w, req)
    return
  }

  framer := s.Framer.NewFrameWriter(w)
  if framer == nil {
    // programmer error
    err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
    utilruntime.HandleError(err)
    s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
    return
  }

  var e streaming.Encoder
  var memoryAllocator runtime.MemoryAllocator

  if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
    memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
    defer runtime.AllocatorPool.Put(memoryAllocator)
    e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
  } else {
    e = streaming.NewEncoder(framer, s.Encoder)
  }

  // ensure the connection times out
  timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
  defer cleanup()

  // begin the stream
  w.Header().Set("Content-Type", s.MediaType)
  w.Header().Set("Transfer-Encoding", "chunked")
  w.WriteHeader(http.StatusOK)
  flusher.Flush()

  var unknown runtime.Unknown
  internalEvent := &metav1.InternalEvent{}
  outEvent := &metav1.WatchEvent{}
  buf := &bytes.Buffer{}
  ch := s.Watching.ResultChan()
  done := req.Context().Done()

  embeddedEncodeFn := s.EmbeddedEncoder.Encode
  if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
    if memoryAllocator == nil {
      // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
      // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
      memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
      defer runtime.AllocatorPool.Put(memoryAllocator)
    }
    embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
      return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
    }
  }

  for {
    select {
    case <-done:
      return
    case <-timeoutCh:
      return
    case event, ok := <-ch:
      if !ok {
        // End of results.
        return
      }
      metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()

      obj := s.Fixup(event.Object)
      if err := embeddedEncodeFn(obj, buf); err != nil {
        // unexpected error
        utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
        return
      }

      // ContentType is not required here because we are defaulting to the serializer
      // type
      unknown.Raw = buf.Bytes()
      event.Object = &unknown
      metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))

      *outEvent = metav1.WatchEvent{}

      // create the external type directly and encode it.  Clients will only recognize the serialization we provide.
      // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
      // and we get the benefit of using conversion functions which already have to stay in sync
      *internalEvent = metav1.InternalEvent(event)
      err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
      if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
        // client disconnect.
        return
      }
      if err := e.Encode(outEvent); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
        // client disconnect.
        return
      }
      if len(ch) == 0 {
        flusher.Flush()
      }

      buf.Reset()
    }
  }
}

The corresponding call stack is shown in the figure below, where it is clear that:

// s.Watching.ResultChan() is cacheWatcher.result
ch := s.Watching.ResultChan()

To summarize, the whole process above can be represented by the figure below:

 

When a client performs list and watch, the watch part is essentially an HTTP GET request with the watch parameter set to true. The handling logic lives in:

// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go:169
func ListResource(...)

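The client and apiserver communicate over a long-lived HTTP connection: apiserver sets the Transfer-Encoding response header to chunked and keeps the connection open. Once the client has received the list data, subsequent change events are pushed to it as chunks. This resembles long polling, although strictly speaking it is HTTP streaming: the response stays open and carries many events instead of completing after the first one.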

 

These notes are also published on my WeChat official account, where hardly anyone reads them. Link: https://mp.weixin.qq.com/s?__biz=MzA3ODQwMDk5Mg==&mid=2247483730&idx=1&sn=02e7f5db7697b4a674371ad16272b67b&chksm=9f421ddda83594cb6aa89801dc62a2d22193e264ceefd0c039c301ca64a63265e8d36bf05620&token=1311797829&lang=zh_CN#rd