RocketMQ - 顺序消息/事务消息/延迟消息

发布时间 2023-08-07 17:32:17作者: archaique

顺序消息

生产端顺序生产

消费端顺序消费

一般都是局部顺序消息。生产端根据 shardingkey 对队列数量取模,把同一个 shardingkey 的消息发送到同一个队列

而消费端也要确保消费这个队列时是一个线程消费的

首先是 consumer 中注册的 Listener 来指定是顺序消息消费还是并发消费

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("C-CLOUD-UPSTREAM-YSS");
        consumer.subscribe("MEDIA_MESSAGE_UPSTREAM_YSS", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.setNamesrvAddr("10.10.168.3:10812");
        // MessageListenerOrderly 或 MessageListenerConcurrently
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

PushConsumer 拉取到的消息提交到线程池后,会根据注册的 Listener 类型来决定是由 ConsumeMessageConcurrentlyService 还是 ConsumeMessageOrderlyService 来处理

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
     this.consumeOrderly = true;
     this.consumeMessageService =
     new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
     this.consumeOrderly = false;
     // 并发消费服务
     this.consumeMessageService =
     new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
          
// 启动 并发消费/顺序消费 服务的定时任务线程池
this.consumeMessageService.start();

ConsumeMessageOrderlyService 

广播模式下,订阅组下所有的队列都要被所有的消费者消费,所以不存在问题

集群模式下,要保证消费时的顺序性,就要 同一时刻,一个消费队列只能被一个消费者中的一个线程消费。用加锁来实现:

  • 定时在 broker 对队列加 clientId 锁,保证rebalance时一个队列也只能被一个消费者消费
  • consumer 端对 MessageQueue 加锁,保证当前线程占有该队列
  • consumer 端对 ProcessQueue 加锁,保证当前线程占有该队列

注:

ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列

MessageQueue,ProcessQueue 存的都是当前消费者订阅的topic分配给当前消费者的队列,但是存的内容侧重点有所不同

MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系;

ProcessQueue: 存队列在消费者端的一些状态等

1.定时在 broker 对队列加 clientId 锁,保证rebalance时一个队列也只能被一个消费者消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start

初始化了一个定时线程池,然后在ConsumeMessageOrderlyService启动的时候,创建了一个任务,1s执行一次lockMQPeriodically这个方法就是给当前这个客户端所消费的所有队列去borker进行上锁

这里的客户端是指订阅了某个 topic 并且指定了顺序消费的客户端,全部队列是指这个 topic 下的所有队列中,分配给这个消费者的所有队列。

我们知道rocketmq集群模式一般都是队列粒度的负载均衡,已经是一个队列只能被一个消费者消费的,那为什么还要在 broker 对队列加上client 的锁呢?

分配给这个消费者的所有队列在发生 rebanlance 时可能会被指派给别的消费者,在 rebalance 时可能会有两个消费者,所以要去 broker 上锁,保证 rebalance 前后只有获取了独占锁的消费者才可以消费。

public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                    }
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

上锁过程:

  1. 将该消费者订阅的 topic 分配给当前消费者的所有队列,按照 broker 进行分组 HashMap<String, Set<MessageQueue>> brokerMqs 内容为 brockerName-该broker下该消费者消费的所有队列
  2. 遍历上面根据 broker 分组过的 brokerMqs ,按照不同的 broker,对一个 broker 下该消费者被分配的所有队列,一次性向这个 broker 申请加锁
  3. 如果加锁成功, ProcessQueue 的 isLocked 设为 true,否则设为 false

注:

ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列

MessageQueue,ProcessQueue 存的都是当前消费者订阅的topic分配给当前消费者的队列,但是存的内容侧重点有所不同

MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系;

ProcessQueue: 存队列在消费者端的一些状态等

public abstract class RebalanceImpl {
  public void lockAll() {
        // ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列
        // MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系;
        // ProcessQueue: 存队列在消费者端的一些状态等
        // 用这个 processQueueTable 构建 brokerMqs: brockerName-该broker下该消费者消费的所有队列
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();

        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        // 遍历 map ,即根据 broker 对这个 broker 分配给这个消费者的所有队列,一次性发送加锁请求
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            final String brokerName = entry.getKey();
            final Set<MessageQueue> mqs = entry.getValue();

            if (mqs.isEmpty()) {
                continue;
            }

            // 找到 broker
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                // 构建加锁请求体
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                requestBody.setConsumerGroup(this.consumerGroup);
                requestBody.setClientId(this.mQClientFactory.getClientId());
                // 对这个 broker 分配给这个消费者的所有队列,一次性发送加锁请求
                requestBody.setMqSet(mqs);

                try {
                    // 向 broker 请求加锁,返回的 lockOKMQSet 是加锁成功的 MessageQueue
                    Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

                    for (MessageQueue mq : mqs) {
                        // ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是当前消费者订阅的topic分配给当前消费者的所有队列
                        // MessageQueue: (topic,brokerName,queueId) 主要存队列与broker的关系;
                        // ProcessQueue: 存队列在消费者端的一些状态等
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        if (processQueue != null) {
                            // 如果加锁成功
                            if (lockOKMQSet.contains(mq)) {
                                if (!processQueue.isLocked()) {
                                    log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                                }
                                // processQueue Locked 状态为 true
                                processQueue.setLocked(true);
                                processQueue.setLastLockTimestamp(System.currentTimeMillis());
                            }
                            // 如果加锁失败
                            else {
                                // processQueue Locked 状态为 false
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }
}

broker 端:

下面这段代码,对里面的逻辑进行整合概括就是:或者执行时不再包含被重新分配的队列,也就不会对这些已被自己加锁的队列进行续期。那新分配的消费者判断队列锁已经过期,就可以该队列加锁成功。

  • 被当前请求的client加锁的队列:进行续期,更新最新加锁时间
  • 没有被任何client加锁的队列:加上当前请求的client锁,更新最新加锁时间
  • 被其它client加锁的队列:判断是否过期,如果过期,将加锁client替换为当前请求的client,并进行续期,更新最新加锁时间

上面的第三点就保证了,如果rebalance服务将某些队列分配了新的消费者,而旧的消费者不再每秒执行一次(挂掉时)定时去broker加队列锁任务lockMQPeriodically,或者执行时请求的队列不再包含被重新分配的队列,也就不会对这些之前被自己加锁的队列进行续期。那新分配的消费者判断这些队列上的队列锁已经过期,就可以对它们加锁成功。

public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
        final String clientId) {
        Set<MessageQueue> lockedMqs = new HashSet<>(mqs.size());
        Set<MessageQueue> notLockedMqs = new HashSet<>(mqs.size());

        // mqs 是 requestBody 的客户端所有请求加锁的队列
        for (MessageQueue mq : mqs) {
            // 进行分类:
            // this.isLocked 还会在里面对已经被clientId加锁了的,进行续期,更新最新加锁时间
            if (this.isLocked(group, mq, clientId)) {
                // 已经被请求的client加锁的队列
                lockedMqs.add(mq);
            } else {
                // 没有被请求的client加锁的队列,包括两类:
                // 1.没有被任何client加锁的队列
                // 2.被其它client而不是请求的client加锁的队列
                notLockedMqs.add(mq);
            }
        }


        if (!notLockedMqs.isEmpty()) {
            try {
                // 对下面这段代码加同步锁
                this.lock.lockInterruptibly();
                try {
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<>(32);
                        this.mqLockTable.put(group, groupValue);
                    }
                    // 遍历没有被请求的client加锁的队列,包括两类:
                    // 1.没有被任何client加锁的队列
                    // 2.被其它client而不是请求的client加锁的队列
                    for (MessageQueue mq : notLockedMqs) {
                        LockEntry lockEntry = groupValue.get(mq);

                        // 队列没有被任何client锁上,加上属于该client的锁
                        if (null == lockEntry) {
                            lockEntry = new LockEntry();
                            lockEntry.setClientId(clientId);
                            groupValue.put(mq, lockEntry);
                            log.info(
                                "RebalanceLockManager#tryLockBatch: lock a message which has not been locked yet, "
                                    + "group={}, clientId={}, mq={}", group, clientId, mq);
                        }


                        // 1.没有被任何client加锁的队列,在上面被当前请求的client加锁成功
                        // 这种情况需要进行续期,更新最新加锁时间
                        if (lockEntry.isLocked(clientId)) {
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }

                        String oldClientId = lockEntry.getClientId();

                        // 2.被其它client而不是请求的client加锁的队列
                        // 这种情况需要检查锁是否过期, 如果过期, 将client替换为当前请求的client,并更新加锁时间
                        if (lockEntry.isExpired()) {
                            lockEntry.setClientId(clientId);
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            log.warn(
                                "RebalanceLockManager#tryLockBatch: try to lock a expired message queue, group={}, "
                                    + "mq={}, old client id={}, new client id={}", group, mq, oldClientId, clientId);
                            lockedMqs.add(mq);
                            continue;
                        }

                        log.warn(
                            "RebalanceLockManager#tryLockBatch: message queue has been locked by other client, "
                                + "group={}, mq={}, locked client id={}, current client id={}", group, mq, oldClientId,
                            clientId);
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("RebalanceLockManager#tryBatch: unexpected error, group={}, mqs={}, clientId={}", group, mqs,
                    clientId, e);
            }
        }
        // 所以最后加锁成功的包含:
        // 1.没被任何client加锁,这里新被请求的client加锁 2.被其它client加锁,但是锁已经过期,这里将加锁的client替换为当前请求的client
        return lockedMqs;
    }

  private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    // 对已经被当前请求的client加了锁的队列进行续期
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }

                return locked;
            }
        }

        return false;
    }

processQueue 在 broker 获得了队列锁之后,是怎么用的呢?

集群模式在 ConsumeMessageOrderlyService 消费消息时,会判断只有 this.processQueue.isLocked() 才会消费。也就是说消费者获取了这个锁,才能消费消息:

class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        public ProcessQueue getProcessQueue() {
            return processQueue;
        }

        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
                    // .......
                    // 暂时省略中间的消费代码
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

    }

 

2.消费时 consumer 对消费代码块加锁,保证同一时间队列只能被获得broker队列锁的消费者的一个线程消费