QTT-nsqd主流程

发布时间 2023-03-30 14:25:36作者: 一束光

NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

nsq 有三个必要的组件nsqd、nsqlookupd、nsqadmin 

 

nsqd :
负责接收消息,存储队列和将消息发送给客户端

 

nsqlookupd
主要负责服务发现、负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态

 

nsqadmin:
nsqadmin是一个web管理界面

 

概念:

nsqd:

生产者和消费者都是通过直接与nsqd进行交互产生或者消费数据。

生产者可以通过tcp或者http将数据抛入nsqd。

消费者使用tcp与nsqd连接消费队列中的数据。

 

topic和channel:

 

nsqd中可以有多个topic,一个topic中可以有多个channel。

 

如果一个topic中有多个channel,每个channel都能获得一份消息的复制。channel之间相互独立。

 

当一个channel有多个连接者(消费者)时候,消息随机发给其中一个消费者,不会出现一个消息推送给多个消费者。

nsqd的启动流程:

  1. 调用nsqd.NewOptions()加载参数及参数的默认值。
  2. 捕获用户命令行启动参数并映射到NewOptions对应的参数。
  3. 解析配置文件。
  4. 参数合并。将参数用户命令行参数,配置文件内容合并到一个变量
  5. 初始化NSQD结构体。用变量初始化NSQD结构体
  6. 调用LoadMetadata(),主要是加载元数据信息。包括topic和channel
  7. 调用PersistMetadata(),主要将元数据持久化到文件中。
  8. 调用nsqd.Main(),启动nsqd服务

变量权重

// Values are resolved with the following priorities (highest to lowest):
//
// 1. Command line flag
// 2. Deprecated command line flag
// 3. Config file value
// 4. Get() value (if Getter)
// 5. Options struct default value

 

nsqd启动代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (p *program) Start() error {
   opts := nsqd.NewOptions()
 
   flagSet := nsqdFlagSet(opts)
   flagSet.Parse(os.Args[1:])
 
 
   var cfg config
   configFile := flagSet.Lookup("config").Value.String()
   cfg.Validate()
 
   options.Resolve(opts, flagSet, cfg)
   nsqd := nsqd.New(opts)
 
   nsqd.LoadMetadata()
   nsqd.PersistMetadata()
   nsqd.Main()
 
}

 

一、配置初始化及解析

   opts := nsqd.NewOptions()

   flagSet := nsqdFlagSet(opts)
   flagSet.Parse(os.Args[1:])

 

二、NSQD初始化

NSQD的初始化主要在nsq/nsqd/nsqd.go文件的New(opts *Options)函数中

主要作用是:初始化NSQD,并检验Options的配置信息

 

三、元数据加载和持久化

加载和持久化主要在

   nsqd.LoadMetadata()
   nsqd.PersistMetadata()

这两个函数中

看下LoadMetadata()方法

1.元数据以json格式保存在nsqd可执行文件目录下的nsqd.dat文件中。
2.读取文件中的json数据并映射成meta结构体,得到系统中存在的topic列表,遍历列表中的topic:
  (1)检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$),若不合法则忽略
  (2) 使用GetTopic()函数通过名字获得topic对象
  (3)判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic
  (4) 获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致
        (i)检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$),若不合法则忽略
        (ii)使用GetChannel函数通过名字获得channel对象
        (iii)判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel
至此,元数据的载入完成

func (n *NSQD) LoadMetadata() error {
    //上边几行说的是避免重复加载锁和校验新旧元数据文件
   var m meta
   json.Unmarshal(data, &m)
   for _, t := range m.Topics {
      if !protocol.IsValidTopicName(t.Name) {
         continue
      }
      topic := n.GetTopic(t.Name)
      for _, c := range t.Channels {
         if !protocol.IsValidChannelName(c.Name) {
            continue
         }
         channel := topic.GetChannel(c.Name)
      }
      topic.Start()
   }
   return nil
}
PersistMetadata()方法

topics:[{name,channels[{name,paused}],paused}]

转成json格式的字符串写入到元数据文件中nsqd.dat。

 

四、NSQD服务启动

 

1.开启一个tcp服务,用于监听tcp连接
     (1)当有新的客户端连接后,检测协议版本号
     (2)最后调用protocol_v2的IOLoop(conn net.Conn)进行客户端读写操作
2.开启一个协程,开启一个http的Serve,用于监听http请求
3.可选,如果配置tls信息,则新建一个https的服务,用于监听https请求


4.调用queueScanLoop()方法 作用:
   (1)定期   执行根据channel的数量控制worker Pool中worker的数量
   (2)定期   检测channel中是否有可以投递的消息,如果有,则投递消息


5.调用lookupLoop()方法 作用:
   (1)定时检测nsqd和nsqlookupd的连接信息(默认15s,执行一次PING命令来监听)
   (2)有Channel和Topic更新,则发送给所有配置的nsqlookupd
   (3)更新nsqlookupd配置


6.如果配置的StatsdAddress不为空,则调用statsdLoop,用于统计相关信息

func (n *NSQD) Main() {
   var err error
   ctx := &context{n}
 
   n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
   n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
   if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
      n.httpsListener, _ = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
 
   }
 
   tcpServer := &tcpServer{ctx: ctx}
   n.waitGroup.Wrap(func() {
      protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
   })
 
   httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
   n.waitGroup.Wrap(func() {
      http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
   })
 
 
//////////////////////////////
router.Handle("GET""/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET""/info", http_api.Decorate(s.doInfo, log, http_api.V1))
 
 
// v1 negotiate
router.Handle("POST""/pub", http_api.Decorate(s.doPUB, http_api.V1))
router.Handle("POST""/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
router.Handle("GET""/stats", http_api.Decorate(s.doStats, log, http_api.V1))
 
// only v1
router.Handle("POST""/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST""/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
router.Handle("POST""/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1))
router.Handle("POST""/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
router.Handle("POST""/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
router.Handle("POST""/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
router.Handle("POST""/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
router.Handle("POST""/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
router.Handle("POST""/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
router.Handle("POST""/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
router.Handle("GET""/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
router.Handle("PUT""/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
 
// debug
router.HandlerFunc("GET""/debug/pprof/", pprof.Index)
router.HandlerFunc("GET""/debug/pprof/cmdline", pprof.Cmdline)
router.HandlerFunc("GET""/debug/pprof/symbol", pprof.Symbol)
router.HandlerFunc("POST""/debug/pprof/symbol", pprof.Symbol)
router.HandlerFunc("GET""/debug/pprof/profile", pprof.Profile)
router.Handler("GET""/debug/pprof/heap", pprof.Handler("heap"))
router.Handler("GET""/debug/pprof/goroutine", pprof.Handler("goroutine"))
router.Handler("GET""/debug/pprof/block", pprof.Handler("block"))
router.Handle("PUT""/debug/setblockrate", http_api.Decorate(setBlockRateHandler, log, http_api.PlainText))
router.Handler("GET""/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
//////////////////////////////
 
 
   if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
      httpsServer := newHTTPServer(ctx, true, true)
      n.waitGroup.Wrap(func() {
         http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
      })
   }
 
   n.waitGroup.Wrap(n.queueScanLoop)
   n.waitGroup.Wrap(n.lookupLoop)
 
   if n.getOpts().StatsdAddress != "" {
      n.waitGroup.Wrap(n.statsdLoop)
   }
}

 

queueScanLoop 函数的实现

该函数使用若干个worker来扫描并处理当前在投递中以及等待重新投递的消息。worker的个数由配置和当前Channel数量共同决定。

func (n *NSQD) queueScanLoop() {
   workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
   responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
   closeCh := make(chan int)
 
   workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
   refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
 
   channels := n.channels()
   n.resizePool(len(channels), workCh, responseCh, closeCh)
 
   for {
      select {
      case <-workTicker.C:
         if len(channels) == 0 {
            continue
         }
      case <-refreshTicker.C:
         channels = n.channels()
         n.resizePool(len(channels), workCh, responseCh, closeCh)
         continue
      case <-n.exitChan:
         goto exit
      }
 
      num := n.getOpts().QueueScanSelectionCount
      if num > len(channels) {
         num = len(channels)
      }
 
   loop:
      for _, i := range util.UniqRands(num, len(channels)) {
         workCh <- channels[i]
      }
 
      numDirty := 0
      for i := 0; i < num; i++ {
         if <-responseCh {
            numDirty++
         }
      }
 
      if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
         goto loop
      }
   }
 
exit:
   n.logf(LOG_INFO, "QUEUESCAN: closing")
   close(closeCh)
   workTicker.Stop()
   refreshTicker.Stop()
}

 

 

首先,初始化3个gochannel:workCh、responseCh、closeCh,分别控制worker的输入、输出和销毁。

两个定时器 workTicker refreshTicker

然后获取当前的Channel集合,并且调用resizePool函数来启动指定数量的worker。

最后进入扫描的循环。在循环中,等待两个定时器,workTickerrefreshTicker,定时时间分别由由配置中的QueueScanIntervalQueueScanRefreshInterval决定。

这种由等待定时器触发的循环避免了函数持续的执行影响性能,而Golang的特性使得这种机制在写法上非常简洁。

  1. workTicker定时器触发扫描流程。 
    nsqd采用了Redis的probabilistic expiration算法来进行扫描。
    首先从所有Channel中随机选取部分Channel,投到workerChan中,并且等待反馈结果,结果有两种,dirty和非dirty,channel中有数据就认为是dirty,如果dirty的比例超过配置中设定的QueueScanDirtyPercent,那么不进入休眠,继续扫描,如果比例较低,则重新等待定时器触发下一轮扫描。
    这种机制可以在保证处理延时较低的情况下减少对CPU资源的浪费。
  2. refreshTicker定时器触发更新Channel列表流程。 
    这个流程比较简单,先获取一次Channel列表, 
    再调用resizePool重新分配worker。

resizePool的实现

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
   idealPoolSize := int(float64(num) * 0.25)
   if idealPoolSize < 1 {
      idealPoolSize = 1
   else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
      idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
   }
   for {
      if idealPoolSize == n.poolSize {
         break
      else if idealPoolSize < n.poolSize {
         // contract
         closeCh <- 1
         n.poolSize--
      else {
         // expand
         n.waitGroup.Wrap(func() {
            n.queueScanWorker(workCh, responseCh, closeCh)
         })
         n.poolSize++
      }
   }
}
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
   for {
      select {
      case c := <-workCh:
         now := time.Now().UnixNano()
         dirty := false
         if c.processInFlightQueue(now) {
            dirty = true
         }
         if c.processDeferredQueue(now) {
            dirty = true
         }
         responseCh <- dirty
      case <-closeCh:
         return
      }
   }
}

当需要的worker数量超过之前分配的数量时,通过向closeCh投递消息使多余的worker销毁,如果需要的数量比之前的多,则通过queueScanWorker创建新的worker。

queueScanWorker接收workCh发来的消息,处理,并且通过responseCh反馈消息。收到closeCh时则关闭。由于所有worker都监听相同的closeCh,所以当向closeCh发送消息时,随机关闭一个worker。且由于workChcloseCh的监听是串行的,所以不存在任务处理到一半时被关闭的可能。这也是nsq中优雅关闭gochannel的的一个例子。

workTicker计时器接收responseCh,决定是否继续扫描。

 

c.processInFlightQueue(now)

实现了一个过期消息的优先级队列。

优先级是按照超时时间来排序的,越靠近过期时间,将会越靠前。

func (c *Channel) processInFlightQueueprocessInFlightQueue(t int64) bool {
   c.exitMutex.RLock()
   defer c.exitMutex.RUnlock()
 
   if c.Exiting() {
      return false
   }
 
   dirty := false
   for {
      c.inFlightMutex.Lock()
      //过期消息
      msg, _ := c.inFlightPQ.PeekAndShift(t)
      c.inFlightMutex.Unlock()
 
      if msg == nil {
         goto exit
      }
      dirty = true
      _, err := c.popInFlightMessage(msg.clientID, msg.ID)//删除消息
      if err != nil {
         goto exit
      }
      atomic.AddUint64(&c.timeoutCount, 1)
      c.RLock()
      client, ok := c.clients[msg.clientID]
      c.RUnlock()
      if ok {
         client.TimedOutMessage()
      }
      c.put(msg) //优先级队列
   }
 
exit:
   return dirty
}

 

 

c.processInFlightQueue(now)

实现了一个过期消息的优先级队列。

优先级是按照超时时间来排序的,越靠近过期时间,将会越靠前。

 

func (c *Channel) processDeferredQueue(t int64) bool {
   c.exitMutex.RLock()
   defer c.exitMutex.RUnlock()
 
   if c.Exiting() {
      return false
   }
 
   dirty := false
   for {
      c.deferredMutex.Lock()
      item, _ := c.deferredPQ.PeekAndShift(t)
      c.deferredMutex.Unlock()
 
      if item == nil {
         goto exit
      }
      dirty = true
 
      msg := item.Value.(*Message)
      _, err := c.popDeferredMessage(msg.ID)
      if err != nil {
         goto exit
      }
      c.put(msg)
   }
 
exit:
   return dirty
}

 

两个函数processDeferredQueueprocessInFlightQueue的实现基本一致,那为什么相同的逻辑要实现两次呢。

两个队列,DeferredQueue 用 head 包实现, InFlightQueue 自己又实现了一次heap, 其实跟 DeferredQueue 不是一样的么?

  之前两个就真是是一样的, 后来有一个提交,里面的注释是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.

https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53

  意思就是说, 这些 队列里 存的是 Message 这个类型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而这个value 是一个 interface{} , 赋值 和 取值 都需要做类型推断 和 包装,那么作为 InFlightQueue 这个 “高负荷” 的队列, 减少这种 “类型推断和包装” , 有利于提高性能。

 

nsqd的退出

基于github.com/judwhite/go-svc/svc这个包做的守护进程项目。

 

func main() {
   prg := &program{}
   if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
      log.Fatal(err)
   }
}
 
 
func (p *program) Init(env svc.Environment) error {
}
 
 
func (p *program) Start() error {
}
 
 
func (p *program) Stop() error {
   if p.nsqd != nil {
      p.nsqd.Exit()
   }
   return nil
}

 

svc设置了两个信号量

SIGINT
程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出,用于通知前台进程组终止进程

SIGTERM
程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理。通常用来要求程序自己正常退出,shell命令kill缺省产生这个信号

p.nsqd.Exit()

1.关闭一个tcp服务,用于监听tcp连接

2.关闭一个协程,用于监听http请求

3.可选,如果存在tls信息,则关闭这个https的服务,用于监听https请求

4.将内存中的topics及其channels等信息持久到nsq.dat文件中。

5.关闭所有的topic停止生产信息

6.sync.WaitGroup.wait等待所有的协程执行完成