ruoyi整合ActiveMQ-Stomp

发布时间 2023-04-21 19:26:18作者: binbinx

https://www.cnblogs.com/SjhCode/p/ruoyiActiveMQ.html

ActiveMQ 在若依中的配置,这里使用的传输协议是stomp协议

消费时第一时间要确定对方打开了端口,是可以连接的状态。(cmd命令行测试telnet, ping)
ActiveMQ
消费消息consumeMsg:在处理业务时,需要判断JSON里面每一条数据有没有存在,当遇到不存在的JSON,需要抛出异常或者跳过本次,及时结束消费,以免造成死循环。
需要换多几条json测试,以免消费不成功

本地如何测试:

https://activemq.apache.org/download.html    ;下载ActiveMQ 

在\apache-activemq-5.xx.x\bin\win64 ,启动activemq.bat,不能关闭窗口

浏览器访问http://127.0.0.1:8161/  ,用户名admin ,密码admin

创建Queue ,Queue Name就是通道名称,(下面代码中,QUEUE_FLIGHT_ASSIGNPARK=“”; 需要和配合这里修改)

然后就是写完java代码之后启动项目,通过网站的对应Queue, 点击sent to;输入Message body;往通道里面放消息;

如果此时java代码没报错,并且打印出对应的log,说明就消费成功了,网站上会显示消息已经被消费了,待消费数量为0,消费者数量为1,消息出队+1;

接入测试:

cmd命令行测试telnet

对方提前往队列里面加消息

根据三方提供的host,port,user,password,queuename对应修改,连上后会消费到消息则对接成功

 

导入ActiveMQ需要的jar包

 

        <!--ActiveMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.5</version>
        </dependency>    
 

配置文件

 
#activemq
  activemq:
    host: 127.0.0.1
    port: 61613
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 10
#  #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
#  pub-sub-domain: true
 

生产者

 
package com.ruoyi.web.controller.ActiveMQ;

import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;


import java.util.HashMap;
import java.util.UUID;
//生产者
@Component
public class JmsProducer {
    @Autowired
    JmsConsumer jmsConsumer;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.host}")
    private String host;

    @Value("${spring.activemq.port}")
    private Integer port;



    //连接
    public StompConnection connectionFactory() throws Exception {
        StompConnection conn = new StompConnection();
        conn.open(host, port);
        conn.connect(userName, password);
        return conn;
    }

    /**
     * 发送JMS消息
     *
     * @throws Exception exception
     */
    public void sendMessage(String queueName, String message)
        throws Exception {
        StompConnection stompConnection = connectionFactory();
        String tx = UUID.randomUUID().toString().replaceAll("-", "");
        HashMap<String, String> headers = new HashMap<>();
        headers.put(Stomp.Headers.Send.PERSISTENT, "true");
        stompConnection.begin(tx);
        stompConnection.send(queueName, message, tx, headers);
        stompConnection.commit(tx);
        stompConnection.disconnect();
    }


}
 

消费者

QUEUE_FLIGHT_ASSIGNPARK 是需要订阅通道名
package com.ruoyi.web.controller.ActiveMQ;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.admin.domain.entity.Dto.BullyingDto;
import com.ruoyi.admin.service.WarningService;
import com.ruoyi.common.websocket.WebSocket;
import com.ruoyi.web.controller.ActiveMQ.JmsProducer;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;


import javax.annotation.Resource;
//消费者
@Component
public class JmsConsumer implements ApplicationListener<ContextRefreshedEvent> {
    private static Logger LOG = LoggerFactory.getLogger(JmsConsumer.class);
    @Resource
    public JmsProducer jmsProducer;
    @Resource
    private WebSocket webSocket;
    @Autowired
    private WarningService warningService;

    private static final String QUEUE_FLIGHT_ASSIGNPARK = "test001";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            consumeMsg(QUEUE_FLIGHT_ASSIGNPARK);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }
    /**
     * 从mq中消费消息
     *
     * @param queueName
     * @throws Exception
     */

    public void consumeMsg(String queueName) throws Exception {
        LOG.info("*********从消息队列中获取结果、并推送给前端**************");
        new Thread(() -> {
            while (true) {
                StompFrame frame;
                String messageId = "";
                StompConnection stompConnection = null;
                try {
                    String ack = "client";
                    stompConnection = jmsProducer.connectionFactory();
                    stompConnection.subscribe(queueName, ack);
                    stompConnection.keepAlive();
                    // 注意,如果没有接收到消息,
                    // 这个消费者线程会停在这里,直到本次等待超时
                    long waitTimeOut = 30000;
                    frame = stompConnection.receive(waitTimeOut);
                    Map<String, String> headers = frame.getHeaders();
                    messageId = headers.get("message-id");
                    LOG.info("消息id:{}", messageId);
                    //具体的业务处理......
                    //取出推送信息中的msg
            String msg = frame.getBody(); LOG.info(msg);
            // 在ack是client标记的情况下,确认消息

            if ("client".equals(ack)) {
                      stompConnection.ack(messageId);
                      LOG.info("确认消息");
                    }
                } catch (SocketTimeoutException e) {
                    LOG.error(e.getMessage());
                    continue;
                } catch (JSONException ex) {
                    LOG.error("message_id:{},数据异常:{}", messageId, ex.getMessage());
                    continue;
                } catch (Exception e) {
                    LOG.error(e.getMessage());
                    continue;
                } finally {
                    try {
                        stompConnection.ack(messageId);
//                        LOG.info();
                        stompConnection.disconnect();
                    } catch (Exception e) {
                        LOG.error(e.getMessage());
                    }
                }
            }
        }).start();
    }
}