RocketMQ发送消息之同步异步单向

发布时间 2023-09-23 17:40:35作者: 自学Java笔记本

官网教程:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
基于双主双从异步方式开启的前提下,在maven项目中引入下列依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.1</version>
</dependency>

消息发送者步骤分析

  • 创建消息生产者producer,并制定生产者组名
  • 指定Nameserver地址
  • 启动producer
  • 创建消息对象,指定主题Topic、Tag和消息体
  • 发送消息
  • 关闭生产者producer

消息消费者步骤分析

  • 创建消费者Consumer,制定消费者组名
  • 指定Nameserver地址
  • 订阅主题Topic和Tag
  • 设置回调函数,处理消息
  • 启动消费者consumer

消息发送

发送同步消息

根据rockerMq提供的源码教程中文档所示,Producer端发送同步消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

/**
 * 发送同步消息
 */
public class SyncProducer {
	public static void main(String[] args) throws Exception {
		// 实例化消息生产者Producer,指定一个生产者组
		DefaultMQProducer producer = new DefaultMQProducer("group1");
		// 设置NameServer的地址
		producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
		// 启动Producer实例
		producer.start();
		for (int i = 0; i < 100; i++) {
			// 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
			Message msg = new Message("base",
					"Tag1",
					("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
			);
			// 发送消息到一个Broker
			SendResult sendResult = producer.send(msg);
			TimeUnit.SECONDS.sleep(1);
			// 通过sendResult返回消息是否成功送达
			System.out.printf("%s%n", sendResult);
		}
		// 如果不再发送消息,关闭Producer实例。
		producer.shutdown();
    }
}

SendResult即发送消息后的回调如下:

SendResult [sendStatus=SEND_OK, msgId=7F00000130A418B4AAC27429D3CA0063, offsetMsgId=C0A84E8100002A9F0000000000002DE2, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=11]

由于搭建的是双主双从架构,所以消息都发送到了主节点上,消息的消费是由从节点消费的。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
发送消息后的回调接口:

public interface SendCallback {
	// 消息成功发送的回调方法
    void onSuccess(final SendResult sendResult);
	// 消息发送失败的回调方法
    void onException(final Throwable e);
}

实现:

/**
 * 发送异步消息
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 1.创建消费者
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2.设置nameserver
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
            Message msg = new Message("base",
                    "Tag2",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 发送消息到一个Broker,采用异步发送
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                // 发送成功的回调函数
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果:"+sendResult);
                }
                // 发送失败的回调函数
                @Override
                public void onException(Throwable e) {
                    System.out.println("发送异常:"+e.getMessage());
                }
            });
            // 线程休眠一秒
            TimeUnit.SECONDS.sleep(1);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

发送单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

/**
发送单向消息
*/
public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
    	// 设置NameServer的地址
		producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
        	// 创建消息,并指定Topic,Tag和消息体
        	Message msg = new Message("base",
                "Tag3",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送单向消息,没有任何返回结果
        	producer.sendOneway(msg);

    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

消费消息

负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
image
从图上可以看出,所谓负载均衡就是多个消费者在消费同一个topic下的消息时,他们之间是轮询的方式消费的。

在消费者代码中,通过consumer.setMessageModel(MessageModel.CLUSTERING);既可设置负载均衡
完整代码如下:

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("base","Tag3");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

结果:
image

image

可以发现,确实以及实现了负载均衡了,而且在rockermq中,默认的消费模式就是负载均衡

广播模式

image
广播模式中,生产者发送的消息,多个消费者可以同时消费
在消费者代码中设置: consumer.setMessageModel(MessageModel.BROADCASTING);
完整代码:

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("base","Tag1");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费base下的所有消息
        //采用广播发送,也就是多个消费者可以消费同一个组中的base下的Tag1中的所有消息
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

结果:
image

image