服务注册与发现-etcd 遇见的问题

发布时间:2023-06-02 14:36:27 作者:尚墨

服务注册与发现-etcd 遇见的问题

问题现象

gRPC client 通过 etcd 提供的服务发现能力调用 server 时,客户端日志中出现了如下告警:

2023/06/02 11:25:33 scheme: etcd;
{"level":"warn","ts":"2023-06-02T11:25:33.444+0800","logger":"etcd-client","caller":"endpoints/endpoints_impl.go:91","msg":"unmarshal endpoint update failed","key":"etcd.mygrpc-svc","error":"invalid character '.' after top-level value"}
2023/06/02 11:25:33 gcli.grpcc:  etcd:///etcd.mygrpc-svc

// client/main.go 客户端代码
// newGrpcCli builds a grpcli whose gRPC connection resolves the target
// "etcd:///etcd.mygrpc-svc" through an etcd-backed resolver.
//
// host is stored on the returned value unchanged. The function never returns
// nil: on failure the error is recorded in the err field and grpcc is left
// unset, so callers must check err before using the connection.
func newGrpcCli(host []string) *grpcli {
	gcli := &grpcli{
		host: host,
	}
	cli, err := cliv3.NewFromURL("http://localhost:2379")
	if err != nil {
		gcli.err = err
		return gcli
	}
	// Release the etcd client if any later step fails; previously it leaked on
	// every error path below. On success the client must stay open because the
	// resolver keeps using it for the lifetime of the gRPC connection.
	defer func() {
		if gcli.err != nil {
			// Best effort: the original failure is the error worth reporting.
			_ = cli.Close()
		}
	}()

	etcdResolver, err := resolver.NewBuilder(cli)
	if err != nil {
		gcli.err = err
		return gcli
	}
	log.Printf("scheme: %s;", etcdResolver.Scheme())

	conn, err := grpc.DialContext(context.TODO(), "etcd:///etcd.mygrpc-svc",
		grpc.WithResolvers(etcdResolver),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
	if err != nil {
		gcli.err = err
		return gcli
	}
	gcli.grpcc = conn
	log.Println("gcli.grpcc: ", gcli.grpcc.Target())
	return gcli
}

问题分析

可以很明显看见报错,好像是解析一个 JSON 数据失败,unmarshal 一般是反序列化的时候用到的函数。那么问题很可能出在 etcd 中保存的 value 无法被反序列化为 JSON。下面先看一下 etcd 客户端 endpoints 包中的相关源码:

// DeleteEndpoint removes the endpoint stored under key by submitting a single
// delete update through Update. opts are forwarded to the delete update.
func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
	deletion := NewDeleteUpdateOpts(key, opts...)
	// NOTE: the article identifies the statement below as line 91 of the source file.
	return m.Update(ctx, []*UpdateWithOpts{deletion})
}

// Update is the function DeleteEndpoint delegates to (the article calls it the
// address-parsing function). It turns each entry of updates into an etcd
// operation and commits all of them in one transaction.
//
// Every update key must start with m.target + "/". An Add update is stored as
// the JSON encoding of an internal.Update; a Delete update removes the key.
// A bad key, an unknown op, or a marshal failure yields a
// codes.InvalidArgument status error and nothing is committed.
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) {
	requiredPrefix := m.target + "/"
	ops := make([]clientv3.Op, 0, len(updates))

	for _, u := range updates {
		if !strings.HasPrefix(u.Key, requiredPrefix) {
			return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, u.Key)
		}

		if u.Op == Delete {
			ops = append(ops, clientv3.OpDelete(u.Key, u.Opts...))
			continue
		}
		if u.Op != Add {
			return status.Error(codes.InvalidArgument, "endpoints: bad update op")
		}

		record := &internal.Update{
			Op:       internal.Add,
			Addr:     u.Endpoint.Addr,
			Metadata: u.Endpoint.Metadata,
		}
		// NOTE: the article marks this marshal step as the place where the
		// error was encountered and returned.
		encoded, marshalErr := json.Marshal(record)
		if marshalErr != nil {
			return status.Error(codes.InvalidArgument, marshalErr.Error())
		}
		ops = append(ops, clientv3.OpPut(u.Key, string(encoded), u.Opts...))
	}

	_, err = m.client.KV.Txn(ctx).Then(ops...).Commit()
	return err
}

我们来看一下 grpc server 服务注册代码

// main.go 
import (
    myetcdRgst "mygrpc/registry/etcd"
)
// main parses the command-line flags, builds the listen address from port,
// creates the etcd register and registers this server under the service name
// "etcd.mygrpc-svc". Parts of the body are elided in the article ("......").
func main() {
	flag.Parse()
	// Listen on all interfaces; port is defined outside this snippet.
	addr := fmt.Sprintf("0.0.0.0:%s", port)
        ......
	etcdCli, err := myetcdRgst.NewEtcdRegister()
	if err != nil {
		log.Println("etcd register: ", err)
		return
	}
	// Close the register when main returns.
	defer etcdCli.Close()

	sn := "etcd.mygrpc-svc"
        // The values passed here are key: etcd.mygrpc-svc, addr: 0.0.0.0:8080 (when port is 8080).
        // NOTE(review): the third argument (5) is presumably a lease TTL in seconds — confirm against RegisterServer.
	if err := etcdCli.RegisterServer(sn, addr, 5); err != nil {
		log.Println("register service: ", err)
		return
	}
        ......
}

// registry/etcd/etcd.go
// BindLease writes value under key in etcd, attaching the lease s.leaseId to
// the entry. The value is stored exactly as given. It logs the Put response on
// success and returns the Put error otherwise.
func (s *EtcdRegister) BindLease(key, value string) error {
	leaseOpt := clientv3.WithLease(s.leaseId)
	putResp, err := s.etcdCli.Put(s.ctx, key, value, leaseOpt)
	if err == nil {
		log.Println("BindLease: ", putResp)
	}
	return err
}

这时我们查看 etcd 中保存的数据格式

docker exec -it etcd-server /bin/bash
etcdctl get etcd.mygrpc-svc --prefix
# 输出
etcd.mygrpc-svc
0.0.0.0:8080

解决方式

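etcd 中保存的 value 是 0.0.0.0:8080,它不是合法的 JSON:解析器读完开头的数字后又遇到了多余的 `.`,这正好与报错信息 invalid character '.' after top-level value 相呼应。而 etcd 的 resolver 会把 value 当作 JSON 来反序列化,所以我选择修改一下 BindLease() 函数中的 value,把地址包装成 JSON 对象: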

// registry/etcd/etcd.go
// BindLease wraps value in a JSON object of the form {"addr":"<value>"} and
// writes it under key in etcd with the lease s.leaseId attached. It logs the
// Put response on success and returns the Put error otherwise.
//
// NOTE(review): value is concatenated into the JSON text without escaping, so
// it must not contain quotes or backslashes.
func (s *EtcdRegister) BindLease(key, value string) error {
	// Build the JSON payload that replaces the raw address string.
	payload := `{"addr":"` + value + `"}`
	putResp, err := s.etcdCli.Put(s.ctx, key, payload, clientv3.WithLease(s.leaseId))
	if err == nil {
		log.Println("BindLease: ", putResp)
	}
	return err
}
# 执行客户端代码,发现调用成功
cd client
go run main.go 
# 输出
2023/06/02 14:01:43 scheme: etcd;
2023/06/02 14:01:43 gcli.grpcc:  etcd:///etcd.mygrpc-svc                                                  
say:  Name:"My name is sober; my port: 8080"

如何更加优雅,且实现负载均衡

// 注册代码改造,新增随机哈希值
// randStr returns a string of 5 random ASCII letters (a-z, A-Z) chosen with
// rand.Intn. The result is a random suffix, not a hash; it is intended to tag
// a server instance.
func randStr() string {
	const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	const size = 5

	out := make([]byte, 0, size)
	for len(out) < size {
		// The alphabet is pure ASCII, so indexing bytes equals indexing runes.
		out = append(out, alphabet[rand.Intn(len(alphabet))])
	}
	return string(out)
}

// BindLease registers value (the server address) as a gRPC endpoint of the
// service named key, using etcd's endpoints manager.
//
// The endpoint is stored under "<key>/<random suffix>" so that several server
// instances can register under the same service prefix. The entry is bound to
// the lease s.leaseId, as the Put-based versions of this method did; without
// the lease the endpoint would stay in etcd after the server has gone away.
//
// It returns the error from creating the manager or from adding the endpoint.
func (s *EtcdRegister) BindLease(key, value string) error {
	em, err := endpoints.NewManager(s.etcdCli, key)
	if err != nil {
		return err
	}
	realKey := fmt.Sprintf("%s/%s", key, randStr())
	// Add this server to the endpoints list. s.ctx is used instead of
	// context.TODO() for consistency with the other etcd calls on the register.
	return em.AddEndpoint(s.ctx, realKey, endpoints.Endpoint{Addr: value}, clientv3.WithLease(s.leaseId))
}
# etcd 存储表现
docker exec -it etcd-server /bin/bash
etcdctl get etcd.mygrpc-svc --prefix
# 输出
etcd.mygrpc-svc/NshHo
{"Op":0,"Addr":"0.0.0.0:8080","Metadata":null}
etcd.mygrpc-svc/OJZbS
{"Op":0,"Addr":"0.0.0.0:8081","Metadata":null}

客户端再次访问的时候,etcd resolver 会解析出该前缀下注册的所有服务端地址,再配合客户端配置的 round_robin 策略,就实现了负载均衡,无需我们单独实现。

参考文献

gRPC naming and discovery