go-zero 消息队列使用

发布时间 2023-09-13 17:24:23作者: lisus2000

消息队列对于大型微服务系统是必不可少的,主要是用来解决削峰、降低服务之间的耦合度以及异步能力。

go-queue 在 segmentio/kafka-go 这个包基础上,使用 go-zero 进行了上层统一封装,让开发人员更容易上手,将更多时间聚焦在开发业务上。https://github.com/zeromicro/go-queue

 

1.1 参数简介

 

 

Brokers: kafka 的多个 Broker 节点

Group:消费者组

Topic:订阅的 Topic 主题

Offset:如果新的 topic kafka 没有对应的 offset 信息,或者当前的 offset 无效了(历史数据被删除),那么需要指定从头(first)消费还是从尾(last)部消费

Conns: 一个 kafka queue 对应可对应多个 consumer,Conns 对应 kafka queue 数量,可以同时初始化多个 kafka queue,默认只启动一个

Consumers : go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)

Processors: 当 Consumers 中的多个 goroutine 将 kafka 消息拉取到进程内部的 channel 后,我们要真正消费消息写入我们自己逻辑,go-queue 内部通过此参数控制当前消费的并发 goroutine 数量

MinBytes: fetch 一次返回的最小字节数,如果不够这个字节数就等待.

MaxBytes: fetch 一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证 consumer 的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到 broker 的message.max.bytes限制,以及 topic 的max.message.bytes的限制

Username: kafka 的账号

Password:kafka 的密码

1.2 go-zero 中使用 go-queue 生产者 pusher

项目中首先要拉取 go-queue 的依赖

go get github.com/zeromicro/go-queue@latest

 在 etc/account.yaml 配置文件中添加当前的 kafka 配置信息

 在internal目录下的config下的config.go增加如下配置

 在 svc/serviceContext.go 中初始化 pusher 的 kq client

 在 logic 中写业务逻辑使用 go-queue 的 kq client 发送消息到 kafka

 启动RPC服务,通过RPC工具调用接口,这时候可以看到,消息已经写进去了

 

1.3 go-zero 中使用 go-queue 消费者 consumer

在 etc/user-api.yaml 配置文件中添加当前的 kafka 配置信息

 

 在 internal/config 下的 config.go 中定义 go 映射的配置

 

在 internal 下新建一个 mq文件夹,在 mq文件夹下新建一个UserMqSuccess 消费者 user_mq_succ.go

 

 在 mq 文件夹下新建一个文件 mqs.go 用来监听多个消费者,mqs.go 代码如下

 在 main.go 中启动 consumers 等待消费

 启动消费端

用RPC 调用接品生产消息,就可以看到消费消息了,如下图所示 

 已经看到消息,已经成功消费了

 注:要先在kafka上创建topic,如下图所示,是已经创建好了的了

 如果需要创建 的话,则手动创建一下,如下图所示

 windows下kafka启动如下