使用etcd实现Master的选举功能

发布时间 2023-04-20 15:01:00作者: 我有酒,你有故事吗?

背景

说起master选举,最开始想到的可能就是zookeeper,但有些场景zookeeper的使用过于繁重和复杂,又由于etcd是基于Raft的分布式K/V存储,强一致性的K/V读写是核心。
所以造就了etcd可以用于master的选举的场景。

原理

etcd clientv3 concurrency对选举进行了封装

import github.com/coreos/etcd/clientv3/concurrency

上篇文章介绍了etcd使用txn实现分布式锁,而节点选举也要依靠Txn进行创建key的CAS操作
zookeeper是利用创建临时有序节点的方式,etcd也同样的是在prefix path下创建相应的key,并且key的revision也是有序。
同样也是通过watch机制进行通知。

应用

//封装
type etcdLeader struct {
	sess *concurrency.Session    //会话 客户端的租约会话,会做KeepAlive
	elec *concurrency.Election    //选举
}

func Campaign(ctx context.Context, client *clientv3.Client, prefix, value string) (*etcdLeader, error) {
	if value == "" {
		value = fmt.Sprintf("%s-%d", MyHostName, timestampMs())
	}
	s, err := concurrency.NewSession(client)
	if err != nil {
		return nil, fmt.Errorf("failed to generate session: %v", err)
	}
	prefix = "/openresty/" + "-concurrency/" + strings.TrimPrefix(prefix, "/")

	elec := concurrency.NewElection(s, prefix)
	return &etcdLeader{
		sess: s,
		elec: elec,
	}, elec.Campaign(ctx, value) // blocked until elected
}

//elec.Campaign 会参加选举,直到它被选中、发生错误或上下文被取消

// 调用elec.Proclaim 常用来检测当前主节点是否生效
func (l *etcdLeader) Proclaim(ctx context.Context, value string) error {
	if l.elec == nil {
		return fmt.Errorf("already closed")
	}
	if value == "" {
		value = fmt.Sprintf("%s-%d", MyHostName, timestampMs())
	}
	return l.elec.Proclaim(ctx, value)
}
//调用elec.Resign 退出master
func (l *etcdLeader) Resign(ctx context.Context) error {
	if l.elec == nil {
		return nil
	}
	defer l.sess.Close()
	defer func() {
		l.elec = nil
		l.sess = nil
	}()
	return l.elec.Resign(ctx)
}

测试

//模拟进程处理
func process(processID string) {
	cli, err := getEtcdCli("open")  //获得cli
	if err != nil {
		fmt.Println(err)
		panic(processID)
	}
	ctx := context.Background()
	maintainer, err := Campaign(context.Background(), cli, "process", "")  //会阻塞,直到该节点选为master
	if err != nil {
		fmt.Println("process " + processID + " Campaign err" + err.Error())
		return
	}
	fmt.Println(processID + "is maintainer")
	ticker := time.NewTicker(time.Second)
	count := 0
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			err = maintainer.Resign(context.Background())  //超时退出
			if err != nil {
				fmt.Printf("error occured when resign lfe_sync: %v", err)
			}
			return
		case <-ticker.C:
			err = maintainer.Proclaim(ctx, "") 
			if err != nil {
				fmt.Printf("error occured when proclaim lfe_sync: %v", err)
				maintainer.Resign(ctx)
				return
			}
			count++
			//do sth
			fmt.Println("processID ", processID+" ", count)
			if count == 5 {      //主动退出
				err = maintainer.Resign(context.Background())
				if err != nil {
					fmt.Printf("error occured when resign lfe_sync: %v", err)
				}
				return
			}
		}
	}
}

总结与思考

如果从Campaign源码可以看出,比起上一篇利用txn实现分布式锁的实现,选主增加watch机制,减少轮训获取锁的过程。在本场景中选主和分布式锁的使用并没有什么差别,只是选主减少轮训,性能更好。
本次代码地址:https://github.com/zhaoshoucheng/hodgepodge/blob/main/etcd/master.go