kube-scheduler startup analysis

Published: 2023-08-21 15:19:36, author: 偶尔发呆

First, a description of the Kubernetes scheduler:

The Kubernetes scheduler is a control plane process which assigns Pods to Nodes. 
The scheduler determines which Nodes are valid placements for each Pod in the scheduling queue according to constraints and available resources. 
The scheduler then ranks each valid Node and binds the Pod to a suitable Node. 
Multiple different schedulers may be used within a cluster; kube-scheduler is the reference implementation. 

In short: kube-scheduler watches pods and nodes, and is responsible for assigning pods to nodes.

Set a breakpoint in the Setup function at cmd/kube-scheduler/app/server.go:298 and step through the code.

// pkg/scheduler/scheduler.go:395

// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
    informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
    informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
    return informerFactory
}

The informerFactory manages and creates informers, and an informer is the watcher for a resource type such as node or pod. The informerFactory keeps a map that holds every informer it has created:

// vendor/k8s.io/client-go/informers/factory.go:164

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

At startup, the factory tries to fetch the podInformer from the map; since none is found, it creates one and stores it in the map.
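The get-or-create lookup described above can be sketched with the standard library alone. This is a simplified model, not the client-go implementation: the `informer`, `factory`, `pod`, and `node` types are hypothetical stand-ins, and resync handling is left out.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical stand-in for cache.SharedIndexInformer.
type informer struct{ name string }

// factory is a simplified model of sharedInformerFactory: one informer per object type.
type factory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
}

func newFactory() *factory {
	return &factory{informers: make(map[reflect.Type]*informer)}
}

// informerFor returns the cached informer for obj's type, or builds and caches one.
func (f *factory) informerFor(obj interface{}, newFunc func() *informer) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// pod and node are placeholder object types used only as map keys.
type pod struct{}
type node struct{}

func main() {
	f := newFactory()
	calls := 0
	newPod := func() *informer { calls++; return &informer{name: "pod"} }

	a := f.informerFor(&pod{}, newPod)
	b := f.informerFor(&pod{}, newPod) // same type: served from the map
	n := f.informerFor(&node{}, func() *informer { return &informer{name: "node"} })

	fmt.Println(a == b, calls, n.name, len(f.informers)) // true 1 node 2
}
```

Keying the map by `reflect.Type` is what lets every caller that asks for the same object type share one informer, and therefore one connection to the API server.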

 

After the informerFactory is created, informers are created for pod and node respectively; the pod informer is the one created by default.

Next, follow the creation of the podInformer:

// pkg/scheduler/scheduler.go:500

// newPodInformer creates a shared index informer that returns only non-terminal pods.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
    tweakListOptions := func(options *metav1.ListOptions) {
        options.FieldSelector = selector
    }
    return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, nil, tweakListOptions)
}

 

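The metav1.NamespaceAll argument shows that the resulting podInformer watches pods in all namespaces, while the field selector excludes pods whose phase is Succeeded or Failed.

Digging further, we finally reach the ListWatch object: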
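To make the filter concrete, here is a small sketch that builds the same selector string with local copies of the constants. `PodPhase`, `PodSucceeded`, `PodFailed`, and `NamespaceAll` are redeclared here so the example runs without the Kubernetes modules; their values are taken to match the upstream ones.

```go
package main

import "fmt"

// PodPhase mirrors the string-typed phase in k8s.io/api/core/v1.
type PodPhase string

const (
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

// NamespaceAll mirrors metav1.NamespaceAll, which is the empty string.
const NamespaceAll = ""

// nonTerminalSelector builds the field selector the same way newPodInformer does.
func nonTerminalSelector() string {
	return fmt.Sprintf("status.phase!=%v,status.phase!=%v", PodSucceeded, PodFailed)
}

func main() {
	fmt.Println(nonTerminalSelector())         // status.phase!=Succeeded,status.phase!=Failed
	fmt.Printf("namespace=%q\n", NamespaceAll) // namespace=""
}
```

Because the selector is sent with the list and watch requests, the filtering happens on the API server side, so terminal pods never reach the scheduler's cache.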

// vendor/k8s.io/client-go/informers/core/v1/pod.go:58

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

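The ListWatch object here wraps the list and watch operations on pods. The closures are only invoked later, so the concrete logic becomes visible only when stepping through the code at runtime.

Next, let's trace how list-watch actually runs. One of the steps in scheduler startup is to start all informers: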
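The shape of that wrapper can be modeled in a few lines: two closures capture a client and apply the same option tweak before delegating. This is only a sketch; `listOptions`, `listWatch`, and `fakeClient` are hypothetical stand-ins for the real client-go types, and the pod names are made up.

```go
package main

import "fmt"

// listOptions is a hypothetical stand-in for metav1.ListOptions.
type listOptions struct{ FieldSelector string }

// listWatch is a simplified model of cache.ListWatch: two closures over a client.
type listWatch struct {
	ListFunc  func(opts listOptions) ([]string, error)
	WatchFunc func(opts listOptions) (<-chan string, error)
}

// fakeClient records the options it receives (illustration only).
type fakeClient struct{ seen []string }

func (c *fakeClient) list(opts listOptions) ([]string, error) {
	c.seen = append(c.seen, "list:"+opts.FieldSelector)
	return []string{"pod-a", "pod-b"}, nil
}

func (c *fakeClient) watch(opts listOptions) (<-chan string, error) {
	c.seen = append(c.seen, "watch:"+opts.FieldSelector)
	ch := make(chan string, 1)
	ch <- "ADDED pod-c"
	close(ch)
	return ch, nil
}

// newListWatch applies tweak (if any) before delegating, like NewFilteredPodInformer.
func newListWatch(c *fakeClient, tweak func(*listOptions)) *listWatch {
	return &listWatch{
		ListFunc: func(opts listOptions) ([]string, error) {
			if tweak != nil {
				tweak(&opts)
			}
			return c.list(opts)
		},
		WatchFunc: func(opts listOptions) (<-chan string, error) {
			if tweak != nil {
				tweak(&opts)
			}
			return c.watch(opts)
		},
	}
}

func main() {
	c := &fakeClient{}
	lw := newListWatch(c, func(o *listOptions) { o.FieldSelector = "status.phase!=Succeeded" })

	pods, _ := lw.ListFunc(listOptions{})
	events, _ := lw.WatchFunc(listOptions{})
	fmt.Println(pods, <-events)
	fmt.Println(c.seen)
}
```

Nothing talks to the client when `newListWatch` returns; the requests are issued only when a caller invokes `ListFunc` or `WatchFunc`.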

// cmd/kube-scheduler/app/server.go:145

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    // To help debugging, immediately log version
    klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())

    klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

    // Config registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // Prepare the event broadcaster.
    cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

    // Setup healthz checks.
    var checks []healthz.HealthChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    waitingForLeader := make(chan struct{})
    isLeader := func() bool {
        select {
        case _, ok := <-waitingForLeader:
            // if channel is closed, we are leading
            return !ok
        default:
            // channel is open, we are waiting for a leader
            return false
        }
    }

    // Start up the healthz server.
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
        if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start secure server: %v", err)
        }
    }

    // Start all informers.
    cc.InformerFactory.Start(ctx.Done())
    // DynInformerFactory can be nil in tests.
    if cc.DynInformerFactory != nil {
        cc.DynInformerFactory.Start(ctx.Done())
    }

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())
    // DynInformerFactory can be nil in tests.
    if cc.DynInformerFactory != nil {
        cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
    }

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                close(waitingForLeader)
                sched.Run(ctx)
            },
            OnStoppedLeading: func() {
                select {
                case <-ctx.Done():
                    // We were asked to terminate. Exit 0.
                    klog.InfoS("Requested to terminate, exiting")
                    os.Exit(0)
                default:
                    // We lost the lock.
                    klog.ErrorS(nil, "Leaderelection lost")
                    klog.FlushAndExit(klog.ExitFlushTimeout, 1)
                }
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    close(waitingForLeader)
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}
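One detail of Run worth isolating is the isLeader probe, which uses a channel that is never written to, only closed. The sketch below reproduces that idiom on its own; `newLeaderCheck` is a hypothetical helper name introduced for this example.

```go
package main

import "fmt"

// newLeaderCheck returns a non-blocking probe that reports true only after ch is closed.
func newLeaderCheck(ch <-chan struct{}) func() bool {
	return func() bool {
		select {
		case _, ok := <-ch:
			// A receive on a closed channel succeeds immediately with ok == false.
			return !ok
		default:
			// The channel is still open and empty: not leading yet.
			return false
		}
	}
}

func main() {
	waitingForLeader := make(chan struct{})
	isLeader := newLeaderCheck(waitingForLeader)

	fmt.Println(isLeader()) // false
	close(waitingForLeader)
	fmt.Println(isLeader()) // true
}
```

Closing the channel acts as a one-way latch: once closed, every later probe sees the same answer, which suits a health handler that may be called at any time from another goroutine.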

 

Start iterates over the informers stored in the factory's map and launches each one that has not been started yet:

// vendor/k8s.io/client-go/informers/factory.go:128

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
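The startedInformers bookkeeping makes Start safe to call repeatedly: a second call launches only informers registered since the first. A minimal standard-library sketch of that behavior follows; the `runner` and `factory` types are hypothetical, and the WaitGroup exists only so the example can shut down cleanly.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical stand-in for an informer's Run method.
type runner func(stop <-chan struct{})

// factory is a simplified model of the start bookkeeping in sharedInformerFactory.
type factory struct {
	mu      sync.Mutex
	runners map[string]runner
	started map[string]bool
	wg      sync.WaitGroup // added for this sketch so main can wait for goroutines
}

// start launches every runner not yet started and reports how many it launched.
func (f *factory) start(stop <-chan struct{}) int {
	f.mu.Lock()
	defer f.mu.Unlock()

	launched := 0
	for name, run := range f.runners {
		if f.started[name] {
			continue
		}
		f.wg.Add(1)
		go func(run runner) {
			defer f.wg.Done()
			run(stop)
		}(run)
		f.started[name] = true
		launched++
	}
	return launched
}

func main() {
	block := func(stop <-chan struct{}) { <-stop }
	f := &factory{
		runners: map[string]runner{"pod": block, "node": block},
		started: map[string]bool{},
	}
	stop := make(chan struct{})

	fmt.Println(f.start(stop)) // 2
	fmt.Println(f.start(stop)) // 0: both are already running
	f.runners["pv"] = block
	fmt.Println(f.start(stop)) // 1: only the newly registered runner

	close(stop)
	f.wg.Wait()
}
```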

 

 

Let's look at what Run does for the podInformer instance:

// vendor/k8s.io/client-go/tools/cache/shared_informer.go:397, the Run method

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    if s.HasStarted() {
        klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
        return
    }
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process:           s.HandleDeltas,
        WatchErrorHandler: s.watchErrorHandler,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

 

Continue into s.controller.Run():

// vendor/k8s.io/client-go/tools/cache/controller.go:128
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler
    }

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
    wg.Wait()
}

 

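wg.StartWithChannel(stopCh, r.Run) enters the Reflector's Run method. The Reflector is the client that encapsulates the list-watch logic: it feeds objects into the queue, while c.processLoop consumes them.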
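The producer and consumer split in controller.Run can be pictured with a toy model: one goroutine lists the current state and then relays watch events into a queue, and a loop drains the queue. This is a rough sketch under loud assumptions; `delta`, `reflectInto`, and the "Listed" label are hypothetical, a plain channel replaces the DeltaFIFO, and resync, retries, and resource versions are ignored.

```go
package main

import "fmt"

// delta is a hypothetical queue item; the Type strings are illustrative labels.
type delta struct{ Type, Key string }

// reflectInto pushes the listed state first, then relays watch events, into queue.
func reflectInto(list []string, watch <-chan delta, queue chan<- delta, stop <-chan struct{}) {
	defer close(queue)

	// Phase 1: list. Every existing object enters the queue once.
	for _, key := range list {
		select {
		case queue <- delta{"Listed", key}:
		case <-stop:
			return
		}
	}

	// Phase 2: watch. Changes are forwarded in arrival order.
	for {
		select {
		case ev, ok := <-watch:
			if !ok {
				return
			}
			select {
			case queue <- ev:
			case <-stop:
				return
			}
		case <-stop:
			return
		}
	}
}

func main() {
	watch := make(chan delta, 2)
	watch <- delta{"Added", "pod-c"}
	watch <- delta{"Deleted", "pod-a"}
	close(watch)

	queue := make(chan delta)
	stop := make(chan struct{})
	go reflectInto([]string{"pod-a", "pod-b"}, watch, queue, stop)

	// Stand-in for processLoop: pop items and handle them one at a time.
	for d := range queue {
		fmt.Println(d.Type, d.Key)
	}
}
```

Listing before watching gives the consumer a complete starting snapshot, so later watch events can be applied as incremental changes.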