rocketmq-client-go

发布时间:2023-06-15 11:53:30 作者:黑熊一只

关注几个配置项:

topic

groupName

tag

按需配置即可。

producer 和 consumer 的入口启动代码此处略去;在客户端层面,producer 和 consumer 都可以按照自身业务特点进行配置。

以下为simple样例。

生产者

 1 package producer
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "github.com/apache/rocketmq-client-go/v2"
 7     "github.com/apache/rocketmq-client-go/v2/primitive"
 8     "github.com/apache/rocketmq-client-go/v2/producer"
 9     "os"
10 )
11 
12 func ProTagSimple() {
13     p, _ := rocketmq.NewProducer(
14         producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
15         producer.WithRetry(2),
16         producer.WithGroupName("testGroup"),
17     )
18     err := p.Start()
19     if err != nil {
20         fmt.Printf("start producer error: %s", err.Error())
21         os.Exit(1)
22     }
23     tags := []string{"TagA", "TagB", "TagC"}
24     for i := 0; i < 3; i++ {
25         tag := tags[i%3]
26         msg := primitive.NewMessage("test",
27             []byte("Hello RocketMQ Go Client!"))
28         msg.WithTag(tag)
29 
30         res, err := p.SendSync(context.Background(), msg)
31         if err != nil {
32             fmt.Printf("send message error: %s\n", err)
33         } else {
34             fmt.Printf("send message success: result=%s\n", res.String())
35         }
36     }
37     err = p.Shutdown()
38     if err != nil {
39         fmt.Printf("shutdown producer error: %s", err.Error())
40     }
41 }

消费者

 1 package consumer
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "github.com/apache/rocketmq-client-go/v2"
 7     "github.com/apache/rocketmq-client-go/v2/consumer"
 8     "github.com/apache/rocketmq-client-go/v2/primitive"
 9     "os"
10     "time"
11 )
12 
13 func ConTagSimple() {
14     c, _ := rocketmq.NewPushConsumer(
15         consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
16         consumer.WithGroupName("testGroup"),
17     )
18     selector := consumer.MessageSelector{
19         Type:       consumer.TAG,
20         Expression: "TagA || TagC",
21     }
22     err := c.Subscribe("test", selector, func(ctx context.Context,
23         msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
24         fmt.Printf("subscribe callback: %v \n", msgs)
25         return consumer.ConsumeSuccess, nil
26     })
27     if err != nil {
28         fmt.Println(err.Error())
29     }
30     err = c.Start()
31     if err != nil {
32         fmt.Println(err.Error())
33         os.Exit(-1)
34     }
35     time.Sleep(time.Minute * 5)
36     err = c.Shutdown()
37     if err != nil {
38         fmt.Printf("shutdown Consumer error: %s", err.Error())
39     }
40 }