RocketMQ源码阅读

发布时间 2023-08-14 15:36:20作者: yudoge

img

RocketMQ中的领域概念,RTFM!!!!

作为一个MQ的设计者,如何路由用户的消息

img

项目中肯定不止一台MQ broker在服务,如果生产者和我一样路痴,它会迷路的!如何路由生产者消息?

我们貌似也需要一个类似注册中心的东西,它需要

  1. 记录每个broker上的topic
  2. 了解各个broker的状态
  3. 负载均衡,如果有多个可以投递的broker,选择最优者

所以MQ系统中肯定有一种用于在集群中路由消息的角色,在RocketMQ中,这玩意儿就是NameServer

MQ 解决办法
RabbitMQ 抽象出Exchanger用于消息路由,集群设备间共享Exchanger元数据,集群内部的机器根据共享的元数据可以帮助路由消息。客户端持有所有MQ实例地址。
Kafka 之前使用Zookeeper来管理集群broker信息,后面我也不知道了…
RocketMQ NameServer服务器

你会怎么设计Namesrv?

img

现在MQ里有了这些组件,快给他们连上线吧!它们至少要按如下方式协作

  1. Broker启动时肯定要和Namesrv上报自己的topic信息
  2. Producer以及Consumer肯定要从Namesrv拉取各个broker中的topic信息,知道自己该把消息投送到哪
  3. Namesrv不应该是单体的,它存在的目的就是提供单体broker无法提供的高可用和高负载,它不应该成为瓶颈

如何协作才合理?

  1. Producer和Consumer不可能每条消息发送前都找Namesrv拿最新的信息吧,应该以固定的频率(30s)
  2. 这样一来Producer肯定也得知道Broker的状态吧,它是不是已经死了
  3. Broker肯定也要按固定时间向Namesrv上报自己的最新状态(30s)
  4. Namesrv之间的数据暂时不一致也不影响啥吧,它们甚至没必要相互通信同步数据,Producer拿到失效的数据导致投送失败,再试一次别的呗

在MQ的设计里,broker和namesrv没必要(有些时候也可以说是没办法)解决所有问题,可以适当摆烂,并把问题推给Producer和Consumer(在SDK中提供)

可以适当看点Namesrv的代码了

rocketmq中和NameServer相关的代码都在namesrv这个模块里,它的代码很简单

img

看下主类有的方法,大概知道有做一些命令行参数和配置文件的解析,以及NameServer服务器的启停,而且用到了Netty:

img

我想干活我想干活先不看了先不看了

这个类没啥可看的,不是核心逻辑,看到NamesrvController

// 初始化流程
public boolean initialize() {
      loadConfig();  // 加载配置
      initiateNetworkComponents(); // 初始化网络组件
      initiateThreadExecutors(); // 初始化线程执行器(线程池)
      registerProcessor(); // 注册处理器
      startScheduleService(); // 开始定时任务
      initiateSslContext(); // 初始化SSL上下文
      initiateRpcHooks(); // 初始化Rpc钩子
      return true;
  }
// 注册处理器
private void registerProcessor() {
    // 如果开启了集群测试功能,注册对应的processor
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);
    } else {
        // 注册客户端请求处理器来处理GET_ROUTEINTO_BY_TOPIC请求
        ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
        // 注册默认的请求处理器
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
    }
}

看代码只看自己目前关注的,所以只需要注意到ClientRequestProcessor用来处理topic路由,DefaultRequestProcessor处理其它到Namesrv的请求即可,RequestCode 中可以查看所有的请求类型。

private void startScheduleService() {
    // 扫描非活跃broker的定时任务
    this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);

    // maybe周期性打印kv配置的定时任务?
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
        1, 10, TimeUnit.MINUTES);

    // maybe周期性打印水位信息的定时任务?
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            NamesrvController.this.printWaterMark();
        } catch (Throwable e) {
            LOGGER.error("printWaterMark error.", e);
        }
    }, 10, 1, TimeUnit.SECONDS);
}

这里有一个需要关注的:扫描非活跃broker的方法:routeInfoManager::scanNotActiveBroker ,它会被一个周期调度的Executor定时执行。

这里我们就针对处理用户的路由请求以及扫描非活跃broker两个点来阅读代码,在DefaultRequestProcessor 中的请求处理方法processRequest 里有针对各种类型消息的处理函数调用,和我们要研究的功能相关的大概有下面几个:

case RequestCode.REGISTER_BROKER:
    return this.registerBroker(ctx, request);
case RequestCode.UNREGISTER_BROKER:
    return this.unregisterBroker(ctx, request);
case RequestCode.BROKER_HEARTBEAT:
    return this.brokerHeartbeat(ctx, request);
case RequestCode.GET_BROKER_MEMBER_GROUP:
    return this.getBrokerMemberGroup(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
    return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
    return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
    return this.addWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
    return this.getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
    return this.deleteTopicInNamesrv(ctx, request);
case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
		return this.registerTopicToNamesrv(ctx, request);

BROKER_HEARTBEAT 请求是broker会给namesrv按频率发送的心跳请求,namesrv可以根据此来维护一个brokerLiveTable

public RemotingCommand brokerHeartbeat(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final BrokerHeartbeatRequestHeader requestHeader =
        (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);

    // RouteInfoManager中的brokerLiveTable维护了
		// broker的存活状态,这里更新brokerLiveTable中的broker信息
    this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getClusterName(), requestHeader.getBrokerAddr());

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

// RouteInfoManager,实际上就是设置最后更新的时间戳
public void updateBrokerInfoUpdateTimestamp(final String clusterName, final String brokerAddr) {
    BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
    BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
    if (prev != null) {
        prev.setLastUpdateTimestamp(System.currentTimeMillis());
    }
}

REGIST_TOPIC_TO_NAMESRV 是broker向mq注册自己的topic的请求,和heartbeat差不多,都是委托namesrvControllerRouteInfoManager去维护实际内容,所以直接进入RouteInfoManager ,其中维护了一个topicQueueTable 保存了如下二级映射,Topic名→{brokername→queuedata}。用人话说就是每一个Topic下有一些队列,但是这些队列可能来自不同的broker,所以是一个broker到queuedata的映射。

img

// 该方法拿到的是topic名和所有要注册到其名下的数据
public void registerTopic(final String topic, List<QueueData> queueDatas) {
    if (queueDatas == null || queueDatas.isEmpty()) {
        return;
    }
    //

    try {
        // 上写锁
        this.lock.writeLock().lockInterruptibly();
        // 若已经有该topic的数据了
        if (this.topicQueueTable.containsKey(topic)) {
            Map<String, QueueData> queueDataMap  = this.topicQueueTable.get(topic);
            for (QueueData queueData : queueDatas) {
                // 如果该broker已经注册过
                if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
                    log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
                    return;
                }
                // 向queueDataMap中插入queueData
                queueDataMap.put(queueData.getBrokerName(), queueData);
            }
            log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic));
        } else {
            // 构造queueDataMap
            Map<String, QueueData> queueDataMap = new HashMap<>();
            for (QueueData queueData : queueDatas) {
                if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
                    log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
                    return;
                }
                queueDataMap.put(queueData.getBrokerName(), queueData);
            }

            // 向topicQueueTable中新增
            this.topicQueueTable.put(topic, queueDataMap);
            log.info("Register topic route:{}, {}", topic, queueDatas);
        }
    } catch (Exception e) {
        log.error("registerTopic Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}

GET_ALL_TOPIC_LIST_FROM_NAMESERVER 是客户端向namesrv发起的请求,用于获取topic列表(也是只看RouteInfoManager的:

**public TopicList getAllTopicList() {
    TopicList topicList = new TopicList();
    try {
        this.lock.readLock().lockInterruptibly();
        topicList.getTopicList().addAll(this.topicQueueTable.keySet());
    } catch (Exception e) {
        log.error("getAllTopicList Exception", e);
    } finally {
        this.lock.readLock().unlock();
    }

    return topicList;
}**

从代码中可以看出,rocketmq维护了几张表来做这些工作:

  1. brokerAddrTable:broker的地址到broker的一些元信息的映射
  2. brokerLiveTable:broker存活信息
  3. topicQueueTable:主题到队列数据的映射

还没有看如何断定一个broker非活跃呢,现在只是把这些数据注册了进去。在最开始的时候,NamesrvController 里注册了三个定时任务,其中的第一个是:

private void startScheduleService() {
    // 扫描非活跃broker的定时任务
    this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
}

该任务会周期性扫描brokerLiveTable ,然后判断它上一次heartbeat时间与当前时间的差是否大于设置好的间隔时间,如果是,就destroy它的channel:

public void scanNotActiveBroker() {
    try {
        log.info("start scanNotActiveBroker");
        for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
            long last = next.getValue().getLastUpdateTimestamp();
            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
            if ((last + timeoutMillis) < System.currentTimeMillis()) {
                RemotingHelper.closeChannel(next.getValue().getChannel());
                log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
                this.onChannelDestroy(next.getKey());
            }
        }
    } catch (Exception e) {
        log.error("scanNotActiveBroker exception", e);
    }
}

onChannelDestroy 会给BatchUnregistrationService提交一个UnRegisterBroker请求,这个类管理所有的这种请求,包括broker主动的unregist。该类维护了一个阻塞队列,然后用一个线程不断的从阻塞队列中拿Unregist请求,调用RouteInfoManager 进行解注册

@Override
public void run() {
    while (!this.isStopped()) {
        try {
            final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
            Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
            unregistrationQueue.drainTo(unregistrationRequests);

            // Add polled request
            unregistrationRequests.add(request);

            this.routeInfoManager.unRegisterBroker(unregistrationRequests);
        } catch (Throwable e) {
            log.error("Handle unregister broker request failed", e);
        }
    }

关于为啥这里非要一个阻塞队列,我想该类的注释中的这句话就是原因:BatchUnregistrationServer provides a mechanism to unregister brokers in batch manner, which speeds up broker-offline process.

RouteInfoManager中,解注册的代码有点复杂,但总体来说就是将它维护的那几张表里关于该broker的信息移除掉,想干活不想看代码所以我就不继续分析了。

gracefully close

通过给runtime添加shutdownhook,实现优雅关闭

public static NamesrvController start(final NamesrvController controller) throws Exception {
		// 省略...
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));
    controller.start();
    return controller;
}

小结

  1. broker启动时会向namesrv发送一个registBroker消息,携带了它的topic信息,namesrv会向brokerAddrTable、brokerLiveTable、topicQueueTable等表中注册broker的信息
  2. broker每隔一段时间会向namesrv发送一个heartbeat消息,namesrv会更新brokerLiveTable(最近更新时间戳)
  3. namesrv中有一个定时任务(10秒一次),扫描brokerLiveTable中的数据,发现没有及时heartbeat的broker,unregist它
  4. producer每隔一段时间会向namesrv发送getAllTopic请求获取所有topic列表

Producer发送消息

你希望Producer如何发送消息?

  1. 消息发送可能因为如下原因失败,我希望无论如何我的消息都不会丢失,不会被发送多次:

    1. 选择的broker死了
    2. 发送时网络中的某一环节出了问题
    3. broker已经接到消息,但响应时网络出了问题,producer没接到成功响应,可能会重复发送消息,造成重复消费风险

    对于ab理想情况下,producer应该有重试机制,确保当ab发生时消息也可以被发送。c怎么解决?

    如果MQ是单体的,只有一个broker,那就只需要给消息加一个唯一id即可,broker发现这条消息已经接受过了,就会知道这是上一次我给producer返回的“发送成功”没被接到,然后再给producer返回一个“发送成功”并忽略这条冗余消息即可(回到了网络课),但在集群环境中,producer第一次给一个broker发送消息,无响应,它重试时几乎不会再选择该broker发送,它会投递到其他broker,对于其它broker来说,这个id是新的,这样一个消息就在mq集群中重复了。所以指望MQ来解决这个问题几乎没啥可能,如果需要一个精确消费的语义,一般都是在业务上做。

  2. 有时我希望消息的实时性,所以我想立即发送消息

  3. 有时我希望消息的吞吐量,所以我想批量发送消息

可以看一点Producer的代码了

producer家族的结构如下:

img

MQProducer有如下行为:

img

MQAdmin有如下行为:

img

所以,大体上来看,Producer:

  1. 有自己的生命周期(start、shutdown)
  2. 可以以多种形式发送消息(同步、异步、oneway、批量发送、指定mq发送)
  3. request好像是给rpc用的,我不理解是什么样的rpc
  4. 可以获取一个主题下的消息队列列表
  5. 具备创建主题、获取offset等mq管理功能

我们将主要精力放在DefaultMQProducer的同步消息发送上

MQProducer并没有直接实现和broker通信的逻辑,而是委托DefaultMQImplDefaultMQImpl又分别委托MQClientAPI来做发消息的工作,委托MQAdminImpl来做MQ管理相关的工作

img

  1. MQClientAPIImpl:封装了到broker的rpc调用,它不关心如何选择broker,不关心失败重试等各种机制,只用来rpc调用
  2. DefaultMQProducerImpl:封装消息发送的主要逻辑,如失败重试

sendMessage的核心:RPC调用

MQClientAPIImpl中有一个核心方法,所有发送消息的工作最终都会走到该方法,它封装了不同形式的消息对broker的rpc调用:

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 获取消息的一些属性
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    // 根据是否是回复消息、是否是批量消息、是否是智能消息,创建不同的request对象
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    request.setBody(msg.getBody());

    // 根据通信模式,调用不同的接口
    switch (communicationMode) {
        case ONEWAY:
            // 如果是oneway,就是发完后啥也不用管,直接rpc调用
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            // 异步消息,先检测刚刚的初始化过程是否已经导致消息超时,如果超时,就抛出异常
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // 调用sendMessageAsync
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            // 同步消息,也先检测超时
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // 调用sendMessageSync
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

oneway消息不关心结果,所以返回null;异步消息通过SendCallback返回结果,同步消息返回sendMessageSync调用的结果,同步消息的代码如下:

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // 到broker的rpc调用
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    // 响应必须不为null
    assert response != null;
    // 对响应进行处理,并返回给调用者
    return this.processSendResponse(brokerName, msg, response, addr);
}

重试机制

DefaultMQProducerImpl中封装了消息发送的重试机制:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 查找消息对应的主题信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 同步消息 timesTotal=1+重试次数次
        // 其它消息 timesTotal=1
        // 这不代表异步消息没有重试,它只是不在这个循环里重试
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        // 每次尝试发送的broker列表
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // 上次发送的broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 选择一个消息队列(传入上一次的broker名,因为上一次发送失败这次大概率还是失败,避免选择到它)
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                // 填充broker列表
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    // 发送消息
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // 更新FaultItem
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // 其它消息直接跳出,同步消息需要根据发送是否成功来判断是否重试
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                // 出错,重试,或者某些情况下向上抛出异常
                } catch (RemotingException | MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    exception = e;
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }

                        throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    throw e;
                }
            } else {
                break;
            }
        }
        // 同步正确返回
        if (sendResult != null) {
            return sendResult;
        }
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        // 同步发送异常,根据exception封装异常代码
        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

选择消息队列

selectOneMessageQueue怎么选择要发送的消息队列?

下面是selectOneMessageQueue的部分代码,我们分两个阶段看,第一个阶段是先从所有该topic的消息队列中轮询选择一个,并且尽量避免选择lastBrokerName(因为它已经失败了):

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 如果开启了失败延迟规避机制
    if (this.sendLatencyFaultEnable) {
        try {
            // 阶段1:
            // 先从tpInfoGetMessageQueueList轮询选择
            // 不选择上一次选过的broker(因为它失败了),并且选择一个latencyFaultTolerance认为可用的broker
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = index++ % tpInfo.getMessageQueueList().size();
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    return mq;
                }
            }

            // 省略第二阶段...
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

比较有趣的是,这个代码在rocketmq的早期版本(至少在4.7.0)不是这样的,而是这样的:

int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
            return mq;
    }
}

这段轮询的最终目的就是判断上一次失败的broker是否已经available,如果是,就选中它,否则直到循环结束,进入下一阶段。认同早期写法的人们的观点是应该尽量保证各个broker的loadbalance,即如果上次失败的broker已经available,就优先选择它。

但不论如何,首先最内层的if判断被认为是冗余代码而被删除掉了,最后逻辑也改成了尽量不选上次失败的broker,而不管它是否已经可用。所以这一段代码先后被打上冗余代码和bug的标签,即使从最开始的视角来看好像逻辑也确实没什么问题(也没什么必要)。这些争论在rocketmq官方github repo下有很多issue,这里提供其中一个

回到主线,第一个阶段以选择一个所在broker可用的mq并规避lastBrokerName为目标,但它最终有可能以选不出mq告终,这就来到了第二个阶段。第二个阶段的代码有些看不懂,需要先来了解下LatencyFaultTolerance的工作模式。

LatencyFaultTolerance的默认实现中维护了这样一个Map,它是brokerName到FaultItem的映射,而FaultItem中包含当前延迟以及开始时间戳两个字段:

private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);

class FaultItem implements Comparable<FaultItem> {
    private final String name;
    private volatile long currentLatency;
    private volatile long startTimestamp;

    // 当前时间-开始时间戳 >= 0 --> available
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
}

根据上面的return公式,大概可以断定开始时间戳的定义是开始认为broker已经可用的时间,所以当当前时间超过这个时间就认为可用,updateFaultItem中的逻辑也大致可以看出来,其参数notAvailableDuration是一个外部传入的计算值,即预估多久后broker会available。

// 会在每次消息投递时,根据投递情况被调用,用于根据当前投递延迟更新一个预估的broker可用时间
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

先不管外面是咋计算的,只能说isAvailable()真了,broker也不一定为真,所以第一阶段老版本那个写法还挺抽象的,它大概率会引发bug(即总向一个不可用的broker重复投递)。

回到重复投递时选择消息队列的第二阶段:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 如果开启了失败延迟规避机制
    if (this.sendLatencyFaultEnable) {
        try{
            // 省略第一阶段...
            
            // 第二阶段
            // 先让latencytFaultTolerance根据手中的各种延迟情况以及预估的可用情况
            // 选出一个broker,latencyFaultTolerance的逻辑是在比较优秀的前一半里
            // 随机选一个出来,这里不深入。所以这个变量叫notBestBroker,它不一定是最好的
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // 如果写队列数大于0
            int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                // 使用这个notBestBroker中的一个writeQueue发送
                // 这段代码写的很奇怪,让tpInfo选一个mq,然后把这个mq实体改写成notBestBroker
                // 中的一个mq
                // 可能是MessageQueue没有一个合适的创建方式吧,用这种方法创建一个?那原来的不就被改变了吗??
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                // 写队列数小于等于0,在latencyFaultTolerance中移除它
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        // 如果第二阶段还没选出来,就tpInfo选一个
        return tpInfo.selectOneMessageQueue();
    }
    // 如果没开失败延迟规避,直接选一个,但规避上一次失败的
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

小结

RocketMQ通过在Producer端引入重试机制来避免消息投递过程中发生问题导致投递失败的情况

todo

  1. 定时/延时消息
  2. 顺序消息
  3. 事务消息

如何安全投递消息

如何安全消费消息

IO存储机制

mmap

namesrv设计