Integrating RocketMQ with Spring Boot: Producing and Consuming Messages

Published: 2023-10-21 01:01:54  Author: 向大海

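Preface:

This walkthrough uses two Spring Boot projects: one produces messages and the other consumes them.

RocketMQ itself must be installed separately. For a Windows installation, see this tutorial: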

https://blog.csdn.net/qq_63815371/article/details/131032508

 

Project 1 (message producer)

pom.xml

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
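            <version>1.2.74</version> <!-- fastjson releases before 1.2.83 have known deserialization vulnerabilities; consider upgrading -->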
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>



        <!--MQ-->

        <!-- RocketMQ Java Client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.1</version> <!-- pick a version compatible with your broker; the starter below also pulls in rocketmq-client transitively -->
        </dependency>

        <!-- Spring Boot Starter for RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version> <!-- pick a suitable RocketMQ Spring Boot Starter version -->
        </dependency>

application.properties

server.port=8888


# RocketMQ NameServer address
rocketmq.name-server=127.0.0.1:9876

# RocketMQ producer configuration (the send timeout is in milliseconds)
rocketmq.producer.group=producer_group
rocketmq.producer.send-message-timeout=3000

Sending a String message:

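import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test; // JUnit 5 assumed (the default since Spring Boot 2.2)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class 生产者 {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a String payload to the topic "gao".
     */
    @Test
    public void sendMessage() {
        rocketMQTemplate.convertAndSend("gao", "第一个测试消息"); // "first test message"
    }
}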

 

Sending a User object:

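import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test; // JUnit 5 assumed (the default since Spring Boot 2.2)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;
// also import the User entity shown further down

@SpringBootTest
public class 生产者 {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a User object to the topic "gao"; the template's
     * message converter serializes it (typically to JSON).
     */
    @Test
    public void sendMessage2() {
        User user = new User();
        user.setName("小高");
        user.setAge(20);
        user.setSex("男"); // "male"
        user.setCreateDate(new Date());

        rocketMQTemplate.convertAndSend("gao", user);
    }
}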

 

User entity:

import lombok.Data;

import java.util.Date;

@Data
public class User {
    private String name;
    private Integer age;
    private String sex;
    private Date createDate;

}

 

Sending JSON data:

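import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test; // JUnit 5 assumed (the default since Spring Boot 2.2)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;
// also import the User entity shown above

@SpringBootTest
public class 生产者 {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Serializes a User to a JSON string with fastjson and sends that string to the topic "gao".
     */
    @Test
    public void sendMessage3() {
        User user = new User();
        user.setName("小高");
        user.setAge(20);
        user.setSex("男"); // "male"
        user.setCreateDate(new Date());

        // Convert the object to JSON text and print it for inspection
        String s = JSON.toJSONString(user);
        System.out.println(s);

        rocketMQTemplate.convertAndSend("gao", s);
    }
}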
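Because the JSON text is built before sending, the payload handed to the template is an ordinary String, and a RocketMQ message body is ultimately a byte array. The sketch below uses only the JDK to show that round trip. It assumes UTF-8 encoding, and the names `JsonPayloadSketch`, `toBody`, and `fromBody` are hypothetical helpers for illustration, not part of RocketMQ or fastjson.

```java
import java.nio.charset.StandardCharsets;

/** JDK-only illustration of a JSON String travelling as message bytes. */
class JsonPayloadSketch {

    /** Encodes the JSON text into bytes (UTF-8 is an assumption of this sketch). */
    static byte[] toBody(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes received bytes back into the JSON text. */
    static String fromBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-written JSON resembling the User above; the name is written
        // with Unicode escapes so that this file stays ASCII-only.
        String json = "{\"age\":20,\"name\":\"\u5c0f\u9ad8\"}";

        byte[] body = toBody(json);
        String received = fromBody(body);

        System.out.println("body length in bytes: " + body.length);
        System.out.println("round trip preserved the text: " + received.equals(json));
    }
}
```

Non-ASCII characters take more than one byte in UTF-8, so the body length differs from the string length. What matters is that producer and consumer agree on the charset.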

 

Project 2 (consumer)

 

pom.xml

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
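            <version>1.2.74</version> <!-- fastjson releases before 1.2.83 have known deserialization vulnerabilities; consider upgrading -->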
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>




        <!-- RocketMQ Java Client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.1</version> <!-- pick a version compatible with your broker; the starter below also pulls in rocketmq-client transitively -->
        </dependency>

        <!-- Spring Boot Starter for RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version> <!-- pick a suitable RocketMQ Spring Boot Starter version -->
        </dependency>

 

 application.properties

server.port=8889


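# RocketMQ NameServer address
rocketmq.name-server=127.0.0.1:9876

# RocketMQ consumer configuration
# (the @RocketMQMessageListener classes below also declare their topic and group in the annotation)
rocketmq.consumer.group=consumer_group
rocketmq.consumer.topic=gao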

 

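Consuming String data

Note: the three listeners in this section use the same topic and the same consumerGroup, so they are meant to be tried one at a time. To run them in the same application, give each listener its own consumerGroup. Each listener also expects its own payload format, so send the matching message type when testing.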

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "gao", consumerGroup = "consumer_group")
public class 消费者String implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Consumer received String message: " + message);
    }

}

 

Consuming User data

import com.gao.entity.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "gao", consumerGroup = "consumer_group")
public class 消费者User implements RocketMQListener<User> {

    @Override
    public void onMessage(User user) {

        System.out.println("Consumer received User message: " + user);
    }
}

 

User entity:

import lombok.Data;

import java.util.Date;

@Data
public class User {
    private String name;
    private Integer age;
    private String sex;
    private Date createDate;

}

 

Consuming JSON data (in principle this is still String data)

import com.alibaba.fastjson.JSON;
import com.gao.entity.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "gao", consumerGroup = "consumer_group")
public class 消费者JSON implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        User user = JSON.parseObject(s, User.class);
        System.out.println("Consumer received JSON message: " + user);

    }
}
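All three listeners above subscribe to the topic gao with the same consumerGroup. In RocketMQ's default clustering mode, consumers within one group share the messages of a topic, while each distinct group receives its own copy of every message. The following JDK-only toy model illustrates just that rule. It is not RocketMQ code: the class `ConsumerGroupSketch` and its methods are hypothetical, and the per-message round-robin is a simplification (the real client assigns whole queues to consumers).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of clustering consumption on a single topic. */
class ConsumerGroupSketch {

    /** Group name mapped to the inboxes of the consumers in that group. */
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();

    /** Number of messages sent so far; drives the round-robin choice. */
    private int sent = 0;

    /** Registers one more consumer in the given group and returns its inbox. */
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    /** Delivers the message to exactly one consumer in every group. */
    void send(String message) {
        for (List<List<String>> consumers : groups.values()) {
            consumers.get(sent % consumers.size()).add(message);
        }
        sent++;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        List<String> first = topic.subscribe("consumer_group");
        List<String> second = topic.subscribe("consumer_group");
        List<String> other = topic.subscribe("another_group"); // hypothetical second group

        topic.send("m1");
        topic.send("m2");

        System.out.println("consumer_group #1: " + first);
        System.out.println("consumer_group #2: " + second);
        System.out.println("another_group:     " + other);
    }
}
```

In this model the two consumers of consumer_group split the messages between them, while another_group sees both. Giving each listener its own consumerGroup is the usual way to let every listener see every message.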