Integrating RocketMQ with Spring Boot

Published: 2023-04-08 21:12:59 | Author: 周文豪

Integrating RocketMQ with Spring Boot simplifies working with RocketMQ.

Create a Spring Boot project.

1. Import the dependencies

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>
    </dependencies>

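Note: the rocketmq-spring-boot-starter dependency used here could not be downloaded directly from the central repository; it has to be built from source and installed into the local repository.

First download the source: https://github.com/apache/rocketmq-spring

Change into the source directory and run the following command to install it into the local repository: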

mvn clean install

2. Write the application.properties configuration file

server.port=8888
spring.rocketmq.nameServer=114.xxx.xxx.xxx:9876
spring.rocketmq.producer.group=my-group

3. Sending messages from a producer

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SpringProducer {

    // Inject the RocketMQ template
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send a message.
     *
     * @param topic the destination topic
     * @param msg   the message payload
     */
    public void sendMsg(String topic, String msg) {
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }

}

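convertAndSend returns void. To obtain a SendResult, use the synchronous syncSend method, which returns it, or the asynchronous asyncSend method, which passes it to a SendCallback.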

4. Consuming messages

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = "spring-my-topic",
        consumerGroup = "spring-consumer-group",
        selectorExpression = "*",
        consumeMode = ConsumeMode.CONCURRENTLY
)
public class SpringConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("接收到消息 -> " + msg); // prints "message received -> " followed by the payload
    }
}

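consumeMode: controls the consume mode. Messages can be received concurrently (CONCURRENTLY) or in order (ORDERLY).

messageModel: controls the message model. In broadcasting mode, every consumer receives every message. In clustering mode, each message is delivered to only one consumer in the group, no matter how many consumers the group has.

selectorExpression: selects which messages are consumed; "*" matches all messages.

consumerGroup: the consumer group. Consumers that share the same value belong to the same group.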
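
To make the difference between the two messageModel values concrete, here is a small plain-Java simulation. It does not use RocketMQ at all: the class MessageModelDemo, the method dispatch, and the round-robin assignment are illustrative assumptions (the real client balances queues, not individual messages, across the consumers of a group).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names come from RocketMQ.
final class MessageModelDemo {

    enum Model { CLUSTERING, BROADCASTING }

    // Returns, for each consumer of one group, the list of messages it receives.
    static List<List<String>> dispatch(Model model, int consumers, List<String> messages) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (int m = 0; m < messages.size(); m++) {
            if (model == Model.BROADCASTING) {
                // Broadcasting: every consumer in the group gets every message.
                for (List<String> inbox : inboxes) {
                    inbox.add(messages.get(m));
                }
            } else {
                // Clustering: each message goes to exactly one consumer in the group
                // (round-robin is a simplification of the real load balancing).
                inboxes.get(m % consumers).add(messages.get(m));
            }
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println(dispatch(Model.CLUSTERING, 2, msgs));   // [[m1, m3], [m2, m4]]
        System.out.println(dispatch(Model.BROADCASTING, 2, msgs)); // [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

With two consumers in one group, clustering splits the four messages between them, while broadcasting hands all four to both.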

5. Write a test case

import com.zwh.rocketmq.SpringProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

    @Autowired
    private SpringProducer springProducer;

    @Test
    public void testSendMsg(){
        String msg = "我的第2个SpringRocketMQ消息!"; // "My 2nd SpringRocketMQ message!"
        this.springProducer.sendMsg("spring-my-topic", msg);
        System.out.println("发送成功"); // "sent successfully"
    }
}

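Start the Spring Boot application first, then run the test case. The consumer receives the message sent by the producer.

Console output (the line reads "message received -> My 2nd SpringRocketMQ message!"):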

接收到消息 -> 我的第2个SpringRocketMQ消息!

6. Transactional messages

Define TransactionListenerImpl

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.HashMap;
import java.util.Map;

@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
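    private static final Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();
    /**
     * Execute the local business logic.
     *
     * @param message the message being sent in the transaction
     * @param o       the extra argument passed to sendMessageInTransaction
     * @return the local transaction state
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        try {
            System.out.println("执行操作1"); // "performing operation 1"
            Thread.sleep(500); // simulate a service call
            System.out.println("执行操作2"); // "performing operation 2"
            Thread.sleep(800); // simulate a service call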
            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
        }
        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.ROLLBACK;
    }

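    /**
     * Transaction check-back, invoked by the broker while the state is still unknown.
     *
     * @param message the message whose transaction state is being checked
     * @return the recorded local transaction state, or UNKNOWN if none was recorded
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState state = STATE_MAP.get(transId);
        System.out.println("回查消息 -> transId = " + transId + ", state = " + state); // "check-back -> ..."
        // Report UNKNOWN instead of returning null when no state was recorded for this ID
        return state != null ? state : RocketMQLocalTransactionState.UNKNOWN;
    }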
}

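The @RocketMQTransactionListener annotation is itself annotated with @Component, so there is no need to add @Component separately.

Note: the transaction ID is obtained differently here than with the plain RocketMQ client. There, Message is org.apache.rocketmq.common.message.Message and the ID comes from Message.getTransactionId(). Here, Message is org.springframework.messaging.Message, so the ID has to be read from the headers via getHeaders().get(RocketMQHeaders.TRANSACTION_ID).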
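
The listener above keeps transaction outcomes in a static HashMap that is written in executeLocalTransaction and read in checkLocalTransaction. Those two callbacks can run on different threads, and a check-back can arrive for an ID that was never recorded (for example after a restart). A minimal plain-Java sketch of that bookkeeping follows. TxStateStore, record, and lookup are hypothetical names, and TxState is a local stand-in for RocketMQLocalTransactionState, not the library type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java illustration; TxState only mirrors the three values of
// RocketMQLocalTransactionState and is not the library enum.
final class TxStateStore {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // A concurrent map, because writes and reads may come from different threads.
    private final Map<String, TxState> states = new ConcurrentHashMap<>();

    // Remember the outcome of a local transaction.
    void record(String transId, TxState state) {
        states.put(transId, state);
    }

    // Look up an outcome; an ID with no recorded outcome yields UNKNOWN, never null.
    TxState lookup(String transId) {
        if (transId == null) {
            return TxState.UNKNOWN;
        }
        return states.getOrDefault(transId, TxState.UNKNOWN);
    }

    public static void main(String[] args) {
        TxStateStore store = new TxStateStore();
        store.record("tx-1", TxState.COMMIT);
        System.out.println(store.lookup("tx-1")); // COMMIT
        System.out.println(store.lookup("tx-2")); // UNKNOWN
    }
}
```

An in-memory map is still lost on restart; a durable store would be needed if check-backs must survive one.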

Define the producer

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class SpringTransactionProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * Send a transactional message.
     *
     * @param topic the destination topic
     * @param msg   the message payload
     */
    public void sendMsg(String topic, String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // "myTransactionGroup" must match txProducerGroup in @RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
        this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup",
                topic,
                message,
                null);
        System.out.println("发送消息成功"); // "message sent successfully"
    }
}

Note: myTransactionGroup must match the value defined in @RocketMQTransactionListener(txProducerGroup = "myTransactionGroup").

Also note that Message here is org.springframework.messaging.Message, not the Message class from the RocketMQ client API.

Consumer (unchanged)

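import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component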
@RocketMQMessageListener(topic = "spring-tx-my-topic",
        consumerGroup = "haoke-consumer",
        selectorExpression = "*")
public class SpringTxConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("接收到消息 -> " + msg);
    }
}

Write the test case

import com.zwh.rocketmq.transaction.SpringTransactionProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {
    @Autowired
    private SpringTransactionProducer springTransactionProducer;
    @Test
    public void testSendMsg2(){
        this.springTransactionProducer.sendMsg("spring-tx-my-topic", "第5个Spring事务消息");
    }
}

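Start the project, then send a message. The test-case console prints the following (the lines read "performing operation 1", "performing operation 2", and "message sent successfully"):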

执行操作1
执行操作2
发送消息成功

The MyApplication console prints:

接收到消息 -> 第5个Spring事务消息

Next, we force an exception to simulate the rollback case:

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

import java.util.HashMap;
import java.util.Map;

@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

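    private static final Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();

    /**
     * Execute the local business logic.
     *
     * @param message the message being sent in the transaction
     * @param o       the extra argument passed to sendMessageInTransaction
     * @return the local transaction state
     */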
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

        try {
            System.out.println("执行操作1");
            Thread.sleep(500);
            System.out.println(1/0);
            System.out.println("执行操作2");
            Thread.sleep(800);

            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);

            return RocketMQLocalTransactionState.COMMIT;

        } catch (Exception e) {
            e.printStackTrace();
        }

        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.ROLLBACK;

    }

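    /**
     * Transaction check-back, invoked by the broker while the state is still unknown.
     *
     * @param message the message whose transaction state is being checked
     * @return the recorded local transaction state, or UNKNOWN if none was recorded
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState state = STATE_MAP.get(transId);

        System.out.println("回查消息 -> transId = " + transId + ", state = " + state);

        // Report UNKNOWN instead of returning null when no state was recorded for this ID
        return state != null ? state : RocketMQLocalTransactionState.UNKNOWN;
    }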
}

Start the project, then send a message. The test-case console prints:

执行操作1
java.lang.ArithmeticException: / by zero
    at com.zwh.rocketmq.transaction.TransactionListenerImpl.executeLocalTransaction(TransactionListenerImpl.java:31)
    at org.apache.rocketmq.spring.support.RocketMQUtil$1.executeLocalTransaction(RocketMQUtil.java:43)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction(DefaultMQProducerImpl.java:1156)
    at org.apache.rocketmq.client.producer.TransactionMQProducer.sendMessageInTransaction(TransactionMQProducer.java:79)
    at org.apache.rocketmq.spring.core.RocketMQTemplate.sendMessageInTransaction(RocketMQTemplate.java:535)
    at com.zwh.rocketmq.transaction.SpringTransactionProducer.sendMsg(SpringTransactionProducer.java:24)
    at com.zwh.TestSpringRocketMQ.testSendMsg2(TestSpringRocketMQ.java:23)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
发送消息成功

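Because the transaction was rolled back, the message is never delivered to the consumer, so the MyApplication console prints nothing.

Next, we return the UNKNOWN state to trigger a transaction check-back: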

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

import java.util.HashMap;
import java.util.Map;

@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

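    private static final Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();

    /**
     * Execute the local business logic.
     *
     * @param message the message being sent in the transaction
     * @param o       the extra argument passed to sendMessageInTransaction
     * @return the local transaction state
     */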
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

        try {
            System.out.println("执行操作1");
            Thread.sleep(500);
            System.out.println("执行操作2");
            Thread.sleep(800);

            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); // the later check-back reads this value and commits the message

            return RocketMQLocalTransactionState.UNKNOWN;

        } catch (Exception e) {
            e.printStackTrace();
        }

        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.ROLLBACK;

    }

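    /**
     * Transaction check-back, invoked by the broker while the state is still unknown.
     *
     * @param message the message whose transaction state is being checked
     * @return the recorded local transaction state, or UNKNOWN if none was recorded
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState state = STATE_MAP.get(transId);

        System.out.println("回查消息 -> transId = " + transId + ", state = " + state);

        // Report UNKNOWN instead of returning null when no state was recorded for this ID
        return state != null ? state : RocketMQLocalTransactionState.UNKNOWN;
    }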
}

Start the project, then send a message. The test-case console prints:

执行操作1
执行操作2
发送消息成功

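The MyApplication console prints nothing. The reason is that the unit test exits right after sending, so the check-back can no longer reach it. To work around this, let the unit test sleep after sending the message: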

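import com.zwh.rocketmq.transaction.SpringTransactionProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)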
@SpringBootTest
public class TestSpringRocketMQ {
    @Autowired
    private SpringTransactionProducer springTransactionProducer;


    @Test
    public void testSendMsg2(){
        this.springTransactionProducer.sendMsg("spring-tx-my-topic1", "第5个Spring事务消息");

        try {
            Thread.sleep(9999999L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
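
Sleeping for a fixed, very long time keeps the JVM alive, but the test never finishes on its own. One alternative, sketched here in plain Java, is to wait on a CountDownLatch with a timeout and release it when the awaited event happens. BoundedWaitDemo, waitForCallback, and the simulated callback thread are hypothetical; in the real test the latch would have to be counted down from wherever the check-back or the consumed message is observed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of a bounded wait; not tied to RocketMQ or JUnit.
final class BoundedWaitDemo {

    // Starts a background thread that stands in for the asynchronous callback,
    // then waits at most timeoutMillis for it. Returns true if the callback
    // arrived in time, false if the wait timed out or was interrupted.
    static boolean waitForCallback(long callbackDelayMillis, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread callback = new Thread(() -> {
            try {
                Thread.sleep(callbackDelayMillis); // simulated delay before the callback fires
                done.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        callback.setDaemon(true); // do not keep the JVM alive after a timeout
        callback.start();
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForCallback(50, 2000)); // callback arrives well before the timeout
        System.out.println(waitForCallback(2000, 50)); // timeout expires before the callback
    }
}
```

The timeout bounds how long a test can hang, and the latch lets it finish as soon as the awaited event has occurred.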

Start the project, then send a message. The test-case console prints:

执行操作1
执行操作2
发送消息成功
回查消息 -> transId = C0A81FA73D6018B4AAC228869B190000, state = null
回查消息 -> transId = C0A81FA7013018B4AAC22884B9FC0000, state = null
回查消息 -> transId = C0A81FA7573C18B4AAC228913F390000, state = COMMIT

The MyApplication console prints:

回查消息 -> transId = C0A81FA7013018B4AAC22884B9FC0000, state = null
回查消息 -> transId = C0A81FA7573C18B4AAC228913F390000, state = null
回查消息 -> transId = C0A81FA73D6018B4AAC228869B190000, state = null
回查消息 -> transId = C0A81FA73D6018B4AAC228869B190000, state = null
接收到消息 -> 第5个Spring事务消息

The results match those obtained with the plain (non-Spring) RocketMQ client.
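
The three outcomes demonstrated in this section (commit, rollback, and UNKNOWN followed by a check-back) can be summarised in a small plain-Java model. This is a simplified sketch, not broker code: HalfMessageDemo, resolve, and the maxChecks parameter are assumptions made for illustration, and treating a message that is still unresolved after the last check as discarded is likewise a simplification.

```java
import java.util.function.Supplier;

// Plain-Java illustration of how the local transaction state decides
// whether a transactional message reaches consumers.
final class HalfMessageDemo {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // initial   : state returned by the local transaction
    // checker   : stands in for the check-back callback
    // maxChecks : how many times the check-back is attempted (assumed parameter)
    // Returns "DELIVERED" if the message becomes visible to consumers, else "DISCARDED".
    static String resolve(TxState initial, Supplier<TxState> checker, int maxChecks) {
        TxState state = initial;
        int checks = 0;
        while (state == TxState.UNKNOWN && checks < maxChecks) {
            TxState answer = checker.get();                     // ask the producer again
            state = (answer == null) ? TxState.UNKNOWN : answer; // no answer counts as UNKNOWN
            checks++;
        }
        return state == TxState.COMMIT ? "DELIVERED" : "DISCARDED";
    }

    public static void main(String[] args) {
        System.out.println(resolve(TxState.COMMIT, () -> TxState.UNKNOWN, 3));  // DELIVERED
        System.out.println(resolve(TxState.ROLLBACK, () -> TxState.COMMIT, 3)); // DISCARDED
        System.out.println(resolve(TxState.UNKNOWN, () -> TxState.COMMIT, 3));  // DELIVERED
        System.out.println(resolve(TxState.UNKNOWN, () -> null, 3));            // DISCARDED
    }
}
```

The third call corresponds to the last experiment above: the local transaction reports UNKNOWN, the check-back later finds COMMIT, and only then does the consumer see the message.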