Kitex微服务开发实践(ETCD服务注册)

发布时间 2023-08-01 05:00:42作者: N3ptune

服务注册通常用于分布式系统或微服务架构中,是一种用于管理和发现这些分布式服务的机制。它的目标是让服务能够动态地找到其他服务,并能够与其进行通信,而无需显式地配置其位置信息

本文简单讲述使用etcd进行服务注册,基于kitex和hertz框架简单实现微服务应用

代码地址:https://github.com/T4t4KAU/Documents/tree/main/etcd-test

接口定义

使用thrift编写如下idl

add.thrift

namespace go api

struct AddRequest {
    1: i32 first
    2: i32 second
}

struct AddResponse {
    1: i32 sum
}

service AddService {
    AddResponse Add(AddRequest req)
}

echo.thrift

namespace go api

struct EchoRequest {
    1: string message
}

struct EchoResponse {
    2: string message
}

service EchoService {
    EchoResponse Echo(1:EchoRequest req)
}

使用Kitex生成代码(此处不作赘述):

.
├── add
│   ├── handler.go
│   └── main.go
├── build.sh
├── echo
│   ├── handler.go
│   └── main.go
├── go.mod
├── go.sum
├── idl
│   ├── add.thrift
│   └── echo.thrift
├── kitex_gen
│   └── api
│       ├── add.go
│       ├── addservice
│       │   ├── addservice.go
│       │   ├── client.go
│       │   ├── invoker.go
│       │   └── server.go
│       ├── echo.go
│       ├── echoservice
│       │   ├── client.go
│       │   ├── echoservice.go
│       │   ├── invoker.go
│       │   └── server.go
│       ├── k-add.go
│       ├── k-consts.go
│       └── k-echo.go
├── kitex_info.yaml
└── script
    └── bootstrap.sh

echo服务将参数message原样返回,add服务将参数中的两个整数求和后返回

代码实现

首先实现一个无服务注册的版本,在上述生成代码的基础上完善方法实现:

add/hander.go

// Add implements the AddServiceImpl interface.
func (s *AddServiceImpl) Add(ctx context.Context, req *api.AddRequest) (resp *api.AddResponse, err error) {
	resp = new(api.AddResponse)
	resp.Sum = req.First + req.Second
	return
}

add/main.go

package main

func main() {
    // 服务地址
	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9091")

	svr := api.NewServer(new(AddServiceImpl),
		server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "add"}),
		server.WithServiceAddr(addr),
	)

	err := svr.Run()

	if err != nil {
		log.Println(err.Error())
	}
}

echo/handler.go

// Echo implements the EchoServiceImpl interface.
func (s *EchoServiceImpl) Echo(ctx context.Context, req *api.EchoRequest) (resp *api.EchoResponse, err error) {
	resp = new(api.EchoResponse)
	resp.Message = req.Message
	return
}

echo/main.go

package main

func main() {
    // 服务地址
	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9092")

	svr := api.NewServer(new(EchoServiceImpl),
		server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo"}),
		server.WithServiceAddr(addr),
	)

	err := svr.Run()

	if err != nil {
		log.Println(err.Error())
	}
}

创建一个api包,作为两个服务的API网关,使用hertz来处理用户请求

目录结构:

.
├── add
│   ├── handler.go
│   └── main.go
├── api
│   ├── add.go
│   ├── echo.go
│   └── main.go
├── build.sh
├── echo
│   ├── handler.go
│   └── main.go
├── go.mod
├── go.sum
├── idl
│   ├── add.thrift
│   └── echo.thrift
├── kitex_gen
│   └── api
│       ├── add.go
│       ├── addservice
│       │   ├── addservice.go
│       │   ├── client.go
│       │   ├── invoker.go
│       │   └── server.go
│       ├── echo.go
│       ├── echoservice
│       │   ├── client.go
│       │   ├── echoservice.go
│       │   ├── invoker.go
│       │   └── server.go
│       ├── k-add.go
│       ├── k-consts.go
│       └── k-echo.go
├── kitex_info.yaml
└── script
    └── bootstrap.sh

实现web handler函数,在handler中使用RPC请求服务

add:

package main

var addClient addservice.Client

func AddHandler(ctx context.Context, c *app.RequestContext) {
	num1, _ := strconv.Atoi(c.Query("first"))
	num2, _ := strconv.Atoi(c.Query("second"))
    
    // RPC请求
	resp, err := addClient.Add(ctx, &api.AddRequest{
		First: int32(num1), Second: int32(num2),
	})
	if err != nil {
		c.JSON(http.StatusInternalServerError, utils.H{
			"message": err.Error(),
		})
		return
	}

	c.JSON(http.StatusOK, utils.H{
		"message": resp.Sum,
	})
}

echo:

package main

var echoClient echoservice.Client

func EchoHandler(ctx context.Context, c *app.RequestContext) {
    // RPC请求
	resp, err := echoClient.Echo(ctx, &api.EchoRequest{
		Message: c.Query("message"),
	})
	if err != nil {
		c.JSON(http.StatusInternalServerError, utils.H{
			"message": err.Error(),
		})
	}

	c.JSON(http.StatusOK, utils.H{
		"message": resp.Message,
	})
}

在主调函数中进行RPC客户端初始化和路由注册:

package main

func Init() {
	var err error
	addClient, err = addservice.NewClient("addservice",
		client.WithHostPorts("127.0.0.1:9091"))
	if err != nil {
		panic(err)
	}
	echoClient, err = echoservice.NewClient("echoservice",
		client.WithHostPorts("127.0.0.1:9092"))
	if err != nil {
		panic(err)
	}
}

func main() {
	Init()
	r := server.Default()
	r.POST("/add", AddHandler)
	r.GET("/echo", EchoHandler)

	r.Spin()
}

启动add和echo服务,再启动api,请求URL即可访问,此处省略测试过程

引入服务注册

在上述实现中,要显式的告诉API网关服务的地址,一旦这个地址发生变化,会导致服务不可用,接下来使用服务注册

使用docker启动etc:

docker run -d -p 10079:2379 --name etcd \
  -e ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 \
  -e ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 \
  -e ETCDCTL_API=3 \
  quay.io/coreos/etcd:v3.5.5

etcd服务器启动在10079端口

接下来修改服务代码:

add/main.go

package main

func main() {
    // 服务注册
	r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:10079"})
	if err != nil {
		panic(err)
	}

	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9091")

	svr := api.NewServer(new(AddServiceImpl),
		server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "add"}), // 指定服务名称
		server.WithServiceAddr(addr),
		server.WithRegistry(r),  // 设置服务注册
	)

	err = svr.Run()

	if err != nil {
		log.Println(err.Error())
	}
}

echo/main.go

package main
func main() {
    // 服务注册
	r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:10079"})
	if err != nil {
		panic(err)
	}

	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9092")

	svr := api.NewServer(new(EchoServiceImpl),
		server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo"}), // 指定服务名称
		server.WithServiceAddr(addr),
		server.WithRegistry(r),  // 设置服务注册
	)

	err = svr.Run()

	if err != nil {
		log.Println(err.Error())
	}
}

修改api代码,无须给出两个服务的地址了:

package main

func Init() {
	var err error

	r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:10079"})
	if err != nil {
		panic(err)
	}

	addClient, err = addservice.NewClient("add",
		client.WithResolver(r),
	)
	if err != nil {
		panic(err)
	}
	echoClient, err = echoservice.NewClient("echo",
		client.WithResolver(r),
	)
	if err != nil {
		panic(err)
	}
}

func main() {
	Init()
	r := server.Default()
	r.POST("/add", AddHandler)
	r.GET("/echo", EchoHandler)

	r.Spin()
}

启动之后,服务正常使用