RocketMQ安装教程

发布时间 2023-06-25 11:29:59作者: 哎哟,不错哦

  RocketMQ是阿里巴巴在2012年开发的分布式消息中间件,专为万亿级超大规模的消息处理而设计,具有高吞吐量、低延迟、海量堆积、顺序收发等特点。它是阿里巴巴双十一购物狂欢节和众多大规模互联网业务场景的必备基础设施。在同一年,阿里巴巴正式开源了RocketMQ的第一个版本。

  2015年,RocketMQ在消息传递方面迎来了一批重量级功能发布,包括事务消息、SQL过滤、轨迹追踪、定时消息、高可用多活等,以满足阿里巴巴日益丰富的业务场景。由于这些优势,RocketMQ 取代了阿里巴巴自主研发的另一款MQ产品Notify,成为阿里巴巴的首选消息中间件,实现了内部应用的百分百接入。在2016年,RocketMQ在阿里云上开发了首个全托管服务,帮助大量数字化转型的企业构建现代应用,并开始体验大规模的云计算实践。同年,RocketMQ被捐赠给Apache基金会,并入选孵化器项目,旨在未来为更多开发者服务。

  2017年从Apache基金会毕业后,RocketMQ被指定为顶级项目(TLP)。

  一:RocketMQ安装

  1、下载

  从apache文件库下载:https://archive.apache.org/dist/rocketmq/

  2、配置环境变量

 

 

 

  3、启动

  进入到刚刚解压完的文件bin目录下,先启动NAMESERVER

  命令:start mqnamesrv.cmd,启动成功后不要关闭窗口

 

  启动BROKER

  start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  同样启动成功后不要关闭窗口

 

  二:RocketMQ图形化界面

  1、下载console

  地址:https://github.com/apache/rocketmq-dashboard

  2、将项目导入开发工具修改application.yml文件,将rocketmq地址修改为自己的地址。启动项目

   3、访问控制台

 

  三:整合项目

  1、引入jar包

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>    

  2、编写测试类

package com.xiaoyu.provider.order.util;


import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WJY
 */
public class ProducerExample {

    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws MQClientException {
        final DefaultMQProducer producer = new DefaultMQProducer (  "test_producer_group");
        // 设置nameServer地圳
        producer.setNamesrvAddr ("127.0.0.1:9876") ;
        producer.start() ;
        for (int i = 0 ; i < 1000; i ++){
            try {
                Message message = new Message("TestTopic", "tagA", ("生产者发送消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                System.out.println("生产者发送消息"+i);
                // 调用发送消息将消息传递给代理
                SendResult sendResult = producer.send(message);
                System.out.printf("%s%n",sendResult);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        // 一旦生产者实例不再使用,就立即关闭
        producer.shutdown();
    }

}
package com.xiaoyu.provider.order.util;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * WJY
 */
public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);


    public static void main(String[] args) throws MQClientException {
        // 使用指定的使用者组名称实例化
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_producer_group");
        // 指定名称服务器地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 指定从哪里开始,以防指定的消费群体是一个全新的消费群体
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //   订阅主题 tag="*"代表订阅 TopicTest主题下所有子主题消息
        consumer.subscribe("TestTopic", "tagA");
        // 注册回调函数,以便在从代理获取的消息到达时执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消费者消费数据:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        // 如果配置信息错误或者当前服务不可用,报错MQClientException
        consumer.fetchSubscribeMessageQueues("test_producer_group");
    }

}

  分别启动ProducerExample和PushConsumerExample类