spring rabbitmq RPC

发布时间 2023-05-17 14:50:45作者: Heng*
1. 配置
@Configuration
public static class YwtDataQueryConfig {

	/**
	 * 一网通数据查询队列
	 */
	public static final String QUEUE_YWT_DATA_QUERY = "QUEUE_YWT_DATA_QUERY";
	/**
	 * 一网通数据查询交换机
	 */
	public static final String DIRECT_EXCHANGE_YWT_DATA_QUERY = "DIRECT_EXCHANGE_YWT_DATA_QUERY";
	/**
	 * 一网通数据查询路由键
	 */
	public static final String ROUTING_KEY_YWT_DATA_QUERY = "ROUTING_KEY_YWT_DATA_QUERY";

	@Bean
	public Queue queueYwtDataQuery() {
		return new Queue(QUEUE_YWT_DATA_QUERY, true);
	}

	@Bean
	public DirectExchange directExchangeYwtDataQuery() {
		return new DirectExchange(DIRECT_EXCHANGE_YWT_DATA_QUERY, true, false, null);
	}

	@Bean
	public Binding bindingYwtDataQuery() {
		return BindingBuilder.bind(queueYwtDataQuery()).to(directExchangeYwtDataQuery()).with(ROUTING_KEY_YWT_DATA_QUERY);
	}
}
2. 监听消费
package com.dh.gssealorder.service.mq;

import com.alibaba.fastjson.JSON;
import com.dh.gssealorder.config.RabbitMqConfig;
import com.dh.gssealorder.service.mq.vo.YwtQueryParam;
import com.dh.gssealorder.service.view.YctGongaEBaseinfoService;
import com.dh.gssealorder.vo.ResultVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @Author: sh
 * @Description:
 * @Version:
 * @Date: 2023/5/16
 */
@Slf4j
@Service
public class YwtDataQueryListener {

    @Resource
    private YctGongaEBaseinfoService enterService;

    @RabbitListener(queues = RabbitMqConfig.YwtDataQueryConfig.QUEUE_YWT_DATA_QUERY)
    @SendTo
    public String onMessage(Message message) {
        try {
            byte[] body = message.getBody();
            String param = new String(body, StandardCharsets.UTF_8);
            YwtQueryParam queryParam = JSON.parseObject(param, YwtQueryParam.class);
            String queryType = queryParam.getQueryType();
            if ((YwtQueryParam.QueryType.FEEDBACK_RATIO.equals(queryType))) {
                ResultVO resultVO = enterService.feedbackRatio(queryParam);
                return JSON.toJSONString(resultVO);
            }
            if ((YwtQueryParam.QueryType.NOT_FEEDBACK_ENTERPRISE.equals(queryType))) {
                ResultVO resultVO = enterService.notFeedbackEnterprise(queryParam);
                return JSON.toJSONString(resultVO);
            }
        } catch (Exception exception) {
            log.error(exception.getMessage());
        }
        return null;
    }

}

3. 消息生产发送
public static final long YWT_REPLY_TIMEOUT = 15000L;
public static final String APPLICATION_JSON = "application/json";

public String sendToQueueYwtDataQuery(YwtQueryParam param) {
	try {
		String msg = JSON.toJSONString(param);
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(APPLICATION_JSON);
		messageProperties.setExpiration(String.valueOf(YWT_REPLY_TIMEOUT));
		rabbitTemplate.setReplyTimeout(YWT_REPLY_TIMEOUT);
		Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
		Message mm = rabbitTemplate.sendAndReceive(RabbitConfig.YwtDataQueryConfig.QUEUE_YWT_DATA_QUERY, message);
		if (null == mm) {
			return null;
		}
		String msgResult = new String(mm.getBody(), StandardCharsets.UTF_8);
		return msgResult;
	} catch (Exception e) {
		return e.getMessage();
	}

}