NSQ demo

发布时间 2023-08-22 15:03:59作者: kanx1

Docker

docker pull nsqio/nsq

nsqd

nsqd​ 是接收、队列和向客户端传递消息的守护进程。它可以独立运行,但通常在具有nsqlookupd​实例的集群中进行配置(这种情况下,他将会发布主题和频道以便发现)

配置及api:https://nsq.io/components/nsqd.html

docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.18.0.1

nsqlookup

nsqlookupd​ 是管理拓扑信息的守护进程。客户端可以查询 nsqlookupd​ 来发现特定主题的 nsqd​ 生产者,而 nsqd​ 节点会广播主题和通道的信息。

配置及api:https://nsq.io/components/nsqlookupd.html

nsqadmin

nsqadmin​ 是一个 Web 用户界面,可以实时查看汇总的集群统计信息,并执行各种管理任务。

docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin  --nsqd-http-address=172.18.0.1:4151

Demo

 

// producer.go
package main

import (
	"bufio"
	"fmt"
	"github.com/nsqio/go-nsq"
	"os"
	"strings"
)

var producer *nsq.Producer

func initProducer(str string) (err error) {
	config := nsq.NewConfig()
	producer, err = nsq.NewProducer(str, config)
	if err != nil {
		fmt.Println("create producer failed, err:", err)
		return
	}
	return nil
}

func main() {
	nsqAddress := "localhost:4150"
	err := initProducer(nsqAddress)
	if err != nil {
		fmt.Printf("init producer failed, err:%v\n", err)
		return
	}

	reader := bufio.NewReader(os.Stdin)
	for {
		data, err := reader.ReadString('\n')
		if err != nil {
			fmt.Printf("read string failed, err:%v\n", err)
			continue
		}
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q" {
			break
		}

		err = producer.Publish("topic_demo", []byte(data))
		if err != nil {
			fmt.Printf("publish msg to nsq failed, err:%v\n", err)
			continue
		}
	}
}

 

// consumer.go
package main

import (
	"fmt"
	"time"

	"github.com/nsqio/go-nsq"
)

type ConsumerT struct{}

func main() {
	InitConsumer("topic_demo", "localhost:4150")
	for {
		time.Sleep(1 * time.Second)
	}
}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

func InitConsumer(topic string, address string) {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 15 * time.Second
	c, err := nsq.NewConsumer(topic, "ch", cfg)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)
	c.AddHandler(&ConsumerT{})

	if err := c.ConnectToNSQD(address); err != nil {
		panic(err)
	}
}