k8s controller leader election

Published 2023-09-16 11:10:50 · Author: 王景迁

Controller leader-election implementation

Possible states of a multi-instance controller (all three show up in the official example below):
1. Acquires the lock: runs the business logic as leader
2. Fails to acquire the lock: waits and keeps retrying
3. Releases the lock: exits

Official Kubernetes example
go.mod and the main flow

module controller-by-leader-election

go 1.19

require (
	github.com/google/uuid v1.3.0
	k8s.io/apimachinery v0.28.2
	k8s.io/client-go v0.28.2
	k8s.io/klog/v2 v2.100.1
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/emicklei/go-restful/v3 v3.9.0 // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect
	github.com/go-openapi/jsonreference v0.20.2 // indirect
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/protobuf v1.5.3 // indirect
	github.com/google/gnostic-models v0.6.8 // indirect
	github.com/google/go-cmp v0.5.9 // indirect
	github.com/google/gofuzz v1.2.0 // indirect
	github.com/imdario/mergo v0.3.6 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/spf13/pflag v1.0.5 // indirect
	golang.org/x/net v0.13.0 // indirect
	golang.org/x/oauth2 v0.8.0 // indirect
	golang.org/x/sys v0.10.0 // indirect
	golang.org/x/term v0.10.0 // indirect
	golang.org/x/text v0.11.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/appengine v1.6.7 // indirect
	google.golang.org/protobuf v1.31.0 // indirect
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/api v0.28.2 // indirect
	k8s.io/kube-openapi v0.0.0-20230905202853-d090da108d2f // indirect
	k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
	sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
	sigs.k8s.io/yaml v1.3.0 // indirect
)

// k8s.io/api => k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3
replace k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230915221524-64708d3e9048

main.go:

package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	// kubeconfig is the path to the Kubernetes cluster's kubeconfig file
	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	// identity of the lock holder; a random UUID is generated if none is given
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	// name of the lock, i.e. the name of the underlying Kubernetes resource
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	// namespace of the lock
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	// parse the command-line flags
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	// build the Kubernetes client; NewForConfigOrDie panics on failure
	client := clientset.NewForConfigOrDie(config)

	// stand-in for the controller's real logic
	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		// block forever
		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	// handle OS signals: after SIGINT/SIGTERM, cancel the context so the process shuts down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".

	// build the lock from the flags; a Lease resource serves as the lock here
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		// bind the lock to the Kubernetes cluster
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop

	// note: the election loop receives ctx; once the corresponding cancel
	// function is called, the election ends as well
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		// the lock used for the election
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		// voluntarily release leadership when ctx is canceled
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second, // lease term: if the leader fails to renew within 60s, other candidates may take over the lock
		RenewDeadline:   15 * time.Second, // deadline for the leader's renew attempts
		RetryPeriod:     5 * time.Second,  // interval at which the leader renews its leadership, and at which non-leaders retry acquiring the lock
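		// a note on these values (from client-go's validation, not the
		// original post): LeaseDuration must exceed RenewDeadline, and
		// RenewDeadline must exceed JitterFactor (1.2) * RetryPeriod,
		// otherwise RunOrDie panics at startup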

		// register the callbacks
		Callbacks: leaderelection.LeaderCallbacks{
			// called once this instance becomes the leader
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				// run the controller logic
				run(ctx)
			},
			OnStoppedLeading: func() {
				// we can do cleanup here
				// called when this instance loses leadership
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

Run two controller instances with the same lease-lock-name (c) and lease-lock-namespace (default) but different ids.

This instance acquires the lock:
./controller-by-leader-election --kubeconfig=/root/.kube/config -logtostderr=true -lease-lock-name=c -lease-lock-namespace=default -id=1

This instance fails to acquire the lock and waits:
./controller-by-leader-election --kubeconfig=/root/.kube/config -logtostderr=true -lease-lock-name=c -lease-lock-namespace=default -id=2

A Lease resource named c is created in the default namespace.

Inspect the Lease contents:
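For example, with kubectl; the field layout below follows coordination.k8s.io/v1, while the holder identity and timestamps are illustrative values matching the flags above:

kubectl -n default get lease c -o yaml

apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  name: c
  namespace: default
spec:
  acquireTime: "2023-09-16T03:10:00.000000Z"
  holderIdentity: "1"
  leaseDurationSeconds: 60
  leaseTransitions: 0
  renewTime: "2023-09-16T03:10:05.000000Z"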

After the id=1 controller exits, the id=2 controller acquires the lock.
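Because the example sets ReleaseOnCancel: true, a graceful shutdown (SIGINT/SIGTERM) releases the lock by clearing the holder identity, so the standby takes over on its next RetryPeriod attempt. If the leader dies without releasing the lock, the standby instead has to wait out the 60s LeaseDuration before it can acquire.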

Lock implementation

client-go supports Lease, ConfigMap, and Endpoints resources as the lock's backing object; Lease is preferred, and the ConfigMap and Endpoints variants are deprecated.
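The backend is selected when the lock is constructed. As a hedged sketch, instead of the LeaseLock literal used in the example above, the same lock can be built with the resourcelock.New constructor (reusing the example's flag variables and client):

lock, err := resourcelock.New(
	resourcelock.LeasesResourceLock, // "leases"; the deprecated backends have their own constants
	leaseLockNamespace,
	leaseLockName,
	client.CoreV1(),
	client.CoordinationV1(),
	resourcelock.ResourceLockConfig{Identity: id},
)
if err != nil {
	klog.Fatal(err)
}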

Conflicting writes are detected via optimistic concurrency: each Update carries the object's resourceVersion, and the apiserver rejects a write whose resourceVersion is stale.
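To make that concrete, here is a minimal sketch (a hypothetical helper, not part of client-go or the example above) showing how a stale resourceVersion surfaces as a Conflict error:

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)

// tryTakeLease is a hypothetical helper: it reads the Lease, rewrites the
// holder, and writes it back, losing cleanly if someone else wrote first.
func tryTakeLease(ctx context.Context, client clientset.Interface, ns, name, id string) error {
	// Get returns the record together with its current resourceVersion
	lease, err := client.CoordinationV1().Leases(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	lease.Spec.HolderIdentity = &id
	// Update sends that resourceVersion back; if another writer updated the
	// Lease in between, the apiserver rejects this write with a Conflict
	if _, err := client.CoordinationV1().Leases(ns).Update(ctx, lease, metav1.UpdateOptions{}); err != nil {
		if apierrors.IsConflict(err) {
			klog.Infof("lost the race: Lease %s/%s was updated concurrently", ns, name)
		}
		return err
	}
	return nil
}

With that in mind, client-go's core acquire/renew logic, tryAcquireOrRenew, reads as follows (comments translated):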

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
  now := metav1.Now()
  leaderElectionRecord := rl.LeaderElectionRecord{
    HolderIdentity:       le.config.Lock.Identity(),
    LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
    RenewTime:            now,
    AcquireTime:          now,
  }

  // 1. check whether the lock record already exists
  oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
  if err != nil {
    // Get failed: any error other than NotFound aborts this attempt; NotFound means no lock exists yet, so create it
    if !errors.IsNotFound(err) {
      klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
      return false
    }
    if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
      klog.Errorf("error initially creating leader election record: %v", err)
      return false
    }
    // record ourselves as the observed leader
    le.setObservedRecord(&leaderElectionRecord)

    return true
  }

  // 2. Record obtained, check the Identity & Time
  if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
    // the leader keeps renewing, so oldLeaderElectionRawRecord keeps changing; record it and refresh le.observedTime
    le.setObservedRecord(oldLeaderElectionRecord)
    le.observedRawRecord = oldLeaderElectionRawRecord
  }
  
  // the lock is held by someone else and has not yet expired: give up
  if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
    le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
    !le.IsLeader() {
    klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
    return false
  }

  // we are the leader: renew, preserving AcquireTime and LeaderTransitions
  if le.IsLeader() {
    leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
  } else {
    // we are not the leader: try to take over, bumping LeaderTransitions
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
  }
  
  // write the record back; a conflicting writer loses here on resourceVersion
  if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
    klog.Errorf("Failed to update lock: %v", err)
    return false
  }

  le.setObservedRecord(&leaderElectionRecord)
  return true
}
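For context (from the client-go implementation rather than the original post): RunOrDie calls tryAcquireOrRenew roughly every RetryPeriod (with jitter) until it returns true, then keeps calling it at the same period to renew; once renewing has failed for longer than RenewDeadline, the elector steps down and OnStoppedLeading fires.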

References

https://juejin.cn/post/7157648925078323207