kafka-consumer.go

发布时间: 2023-07-13 19:32:59 作者: 木林coder

package consumer

import (
"fmt"
"sync"

"github.com/Shopify/sarama"

)

// CRead is a function type that takes no arguments and returns a string.
// NOTE(review): nothing in the visible code uses this type; presumably it is
// meant as a callback yielding a consumed message — confirm against callers.
type CRead func() string

// Run connects to the Kafka broker at 192.168.184.10:9092 and prints every
// message received on the "sun" topic to standard output.
//
// One partition consumer is created per partition of the topic, each starting
// at sarama.OffsetNewest and drained by its own goroutine. Partitions whose
// consumer cannot be started are reported and skipped. Run blocks until every
// draining goroutine has returned (that is, until each partition's message
// channel has been closed) and then closes the top-level consumer.
//
// All failures are reported on standard output; Run returns nothing.
func Run() {
	consumer, err := sarama.NewConsumer([]string{"192.168.184.10:9092"}, nil)
	if err != nil {
		fmt.Printf("Failed to start consumer: %s\n", err)
		return
	}
	// Deferred so the consumer is also released on the early-return path
	// below, and only after all partition goroutines have finished.
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Println("Failed to close consumer: ", err)
		}
	}()

	// Look up every partition ID of the topic.
	partitionList, err := consumer.Partitions("sun")
	if err != nil {
		fmt.Println("Failed to get the list of partition: ", err)
		return
	}
	fmt.Println("partitionList:", partitionList)

	var wg sync.WaitGroup
	// Range over the values: the partition IDs are the slice elements, not
	// the slice indices.
	for _, partition := range partitionList {
		// Create a dedicated partition consumer for this partition.
		pc, err := consumer.ConsumePartition("sun", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			// pc is not usable after a failure; starting a reader on it
			// would panic on the nil interface.
			continue
		}

		wg.Add(1)
		// One goroutine per partition drains its message channel.
		go func(pc sarama.PartitionConsumer) {
			// Deferred calls run last-in-first-out: the partition consumer
			// is asked to close before the WaitGroup is released.
			defer wg.Done()
			defer pc.AsyncClose()

			// Blocks waiting for messages; the loop ends only when the
			// channel is closed.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
}

// Test prints a fixed identification line to standard output.
func Test() {
	const banner = "this is consumer"
	fmt.Println(banner)
}