Rabbit-分布式事务实例 20230406

发布时间 2023-04-06 16:27:08作者: cn2023

 

一、生产、消费者 流程

 

 

 

 

 

 

 

 

1、生产者(下单后生产 务必成功)
  派单队列:order_platoon_queue
  交换机:order_exchange_name
  绑交换机路由键:orderRoutingKey

  生产者=>采用confirm,确认应答机制
        Ack模式:成功
            失败则重试


2、消费者(platoon派单)
  派单队列:order_platoon_queue
  交换机:order_exchange_name
  绑交换机路由键:orderRoutingKey

       消费者=>重发Retry/清除Ack/丢弃Nack

 

 

 

 二、RabbitMQ

 

 

 

 

 

 

 

 

 

 

 

 

三、SB+Order+Platoon

 

                (一)、rabbitmq_order2050

 

  1、 pom.xml
    <dependencies>
     <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>runtime</scope>
      <version>8.0.22</version>

     </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.2.11</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--spring对amqp-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.3.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.9</version>
    </dependency>
   </dependencies>

 

  2、 application.properties

 

  spring.application.name=rabbitmq_order2050
  server.port=2050
  spring.rabbitmq.host=localhost
  spring.rabbitmq.username=guest
  spring.rabbitmq.password=guest
  spring.rabbitmq.port=5672
  # Enable publisher confirms (correlated with CorrelationData) and returns
  spring.rabbitmq.publisher-confirm-type=correlated
  spring.rabbitmq.publisher-returns=true

  # Data source name
  spring.datasource.name=test
  # JDBC connection URL
  spring.datasource.url=jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&serverTimezone=UTC
  # Database user name and password
  spring.datasource.username=root
  spring.datasource.password=root
  # Use the Druid connection pool
  spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
  # JDBC driver: Connector/J 8.x ships the driver as com.mysql.cj.jdbc.Driver;
  # the old com.mysql.jdbc.Driver name is deprecated.
  spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

 


  3、base
  3.1、ApiContains
  package com.sc.rabbitmq_order2050.base;

 

  /**
   * Shared response codes and default messages used when building API responses.
   */
  public interface ApiContains {

    /** Response code for a successful request. */
    Integer HTTP_RES_CODE_200 = 200;

    /** Response code for "QQ account not linked". */
    Integer HTTP_RES_CODE_201 = 201;

    /** Response code for a system error. */
    Integer HTTP_RES_CODE_500 = 500;

    /** Default message that accompanies a successful response. */
    String HTTP_RES_CODE_200_VALUE = "success";

    /** Default message that accompanies a system error. */
    String HTTP_RES_CODE_500_VALUE = "fail";
  }

 


  3.2、BaseApiService
  package com.sc.rabbitmq_order2050.base;
  import org.springframework.stereotype.Component;

 

  @Component
  public class BaseApiService {

 

    public ResponseBase setResultError(Integer code, String msg) {
      return setResult(code, msg, null);
    }

 

    //返回错识,可以传msg
    public ResponseBase setResultError(String msg) {
      return setResult(ApiContains.HTTP_RES_CODE_500, msg, null);
    }

 

    //返回成功,可以传data值
    public ResponseBase setResultSuccess(Object data) {
      return setResult(ApiContains.HTTP_RES_CODE_200, ApiContains.HTTP_RES_CODE_200_VALUE, data);
    }

 

    //返回成功,无data值
    public ResponseBase setResultSuccess() {
      return setResult(ApiContains.HTTP_RES_CODE_200, ApiContains.HTTP_RES_CODE_200_VALUE, null);
    }

 

    //返回成功,无data值
    public ResponseBase setResultSuccess(String msg) {
      return setResult(ApiContains.HTTP_RES_CODE_200, msg, null);
    }

 

    //通用封装
    public ResponseBase setResult(Integer code, String msg, Object data) {
      return new ResponseBase(code, msg, data);
    }
  }

 

  3.3、ResponseBase
  package com.sc.rabbitmq_order2050.base;

 

  import lombok.Data;
  @Data
  public class ResponseBase {

    private Integer rtnCode;
    private String msg;
    private Object data;

 

    public ResponseBase() {
    }

 

    public ResponseBase(Integer rtnCode, String msg, Object data) {
      super();
      this.rtnCode = rtnCode;
      this.msg = msg;
      this.data = data;
    }

 

    @Override
    public String toString() {
      return "ResponseBase[rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }
  }

 


  4、RabbitmqConfig
  package com.sc.rabbitmq_order2050.rabbitmq;

  import org.springframework.amqp.core.Binding;
  import org.springframework.amqp.core.BindingBuilder;
  import org.springframework.amqp.core.DirectExchange;
  import org.springframework.amqp.core.Queue;
  import org.springframework.context.annotation.Bean;
  import org.springframework.stereotype.Component;

 

  /**
  * 下单且派单、补单队列
  * 下单、派单交换机
  * 定义【下单且派单队列、补单队列】与交换机绑定
  */
  @Component
  public class RabbitmqConfig {

 

    //下单且派单队列
    public static final String ORDER_PLATOON_QUEUE = "order_platoon_queue";
    //补单队列,判断订单是否已经被创建
    //public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    //下单并且派单交换机
    public static final String ORDER_EXCHANGE_NAME= "order_exchange_name";

 

    //1、定义订单队列
    @Bean
    public Queue directOrderPlatoonQueue() {
      return new Queue(ORDER_PLATOON_QUEUE);
    }

    //2、定义补单队列
    @Bean
    public Queue directCreateOrderQueue() {
      return new Queue(ORDER_CREATE_QUEUE);
    }

 

    //2、定义交换机
    DirectExchange directOrderExchange() {
      return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

 

    //3、定义【下单且派单队列】与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
      return BindingBuilder.bind(directOrderPlatoonQueue()).to(directOrderExchange())
      .with("orderRoutingKey");
    }

 

    //3、定义补单队列与交换机绑定
    @Bean
      Binding bindingExchangeCreateOrderQueue() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange())
.        .with("orderRoutingKey");
    }
  }

 

 

 

  5、OrderEnity
  package com.sc.rabbitmq_order2050.entity;

  import lombok.Data;
  // Order row as stored in the order_info table; accessors are generated by Lombok.
  @Data
  public class OrderEnity {
    // Row id; OrderMapper.addOder fills it from the generated key.
    private Long id;
    // Order name
    private String name;
    // Order amount
    private Double orderMoney;
    // Order id (OrderService sets this to a random UUID string)
    private String orderId;
  }

 


  6、OrderMapper
  package com.sc.rabbitmq_order2050.mapper;

  import com.sc.rabbitmq_order2050.entity.OrderEnity;
  import org.apache.ibatis.annotations.Insert;
  import org.apache.ibatis.annotations.Options;
  import org.apache.ibatis.annotations.Param;
  import org.apache.ibatis.annotations.Select;

 

  public interface OrderMapper {

    @Insert(value = "insert into order_info values(#{id},#{name},#{orderMoney},#{orderId})")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
    public int addOder(OrderEnity orderEnity); 

    @Select("select id as id,name as name,order_money as orderMonty,orderId as orderId" +
    " from order_info where orderId=#{orderId};")
    public OrderEnity findOrderId(@Param("orderId") String orderId);

   }

 

  7、OrderService

  package com.sc.rabbitmq_order2050.service;

  import com.alibaba.fastjson.JSONObject;
  import com.sc.rabbitmq_order2050.base.BaseApiService;
  import com.sc.rabbitmq_order2050.base.ResponseBase;
  import com.sc.rabbitmq_order2050.entity.OrderEnity;
  import com.sc.rabbitmq_order2050.mapper.OrderMapper;
  import org.springframework.amqp.core.Message;
  import org.springframework.amqp.core.MessageBuilder;
  import org.springframework.amqp.core.MessageProperties;
  import org.springframework.amqp.rabbit.connection.CorrelationData;
  import org.springframework.amqp.rabbit.core.RabbitTemplate;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;
  import java.util.UUID;

  @Service
  public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

 

    public ResponseBase addOrderAndPlatoon() {
      //先下单 订单表插入数据
      OrderEnity orderEnity = new OrderEnity();
      orderEnity.setName("xxxx面");
      //价格是300元
      orderEnity.setOrderMoney(300d);
      //商品id
      String orderId = UUID.randomUUID().toString();
      orderEnity.setOrderId(orderId);
      //1、先下单,创建订单(往订单数据库中插入一条数据)
      int orderResult = orderMapper.addOder(orderEnity);
      System.out.println("orderResult:" + orderResult);
      if (orderResult <= 0) {
        return setResultError("下单失败!");
      }
      //2、订单表插入完数据后 订单发送 外卖小哥【派单】
      send(orderId);
      return setResultSuccess();
    }

 

    private void send(String orderId) {
      JSONObject jsonObject = new JSONObject();
      jsonObject.put("orderId", orderId);
      String msg = jsonObject.toJSONString();
      System.out.println("msg:" + msg);
      //封装信息
      Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
      .setContentEncoding("utf-8").setMessageId(orderId).build();
      //构建回调返回的数据
      CorrelationData correlationData = new CorrelationData(orderId);
      //发送信息
      this.rabbitTemplate.setMandatory(true);
      this.rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.convertAndSend("order_exchange_name",
      “orderRoutingKey", message, correlationData);
    }

 

    //生产信息确认机制 生产者往服务器端发送消息的时候 采用应答机制
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
      String orderId = correlationData.getId();//id都是相同 全局id
      System.out.println("消息id:" + correlationData.getId());
      if (ack) {//消息发送成功
        System.out.println("消息id:" + correlationData.getId());
      } else {
        //重试机制
        send(orderId);
        System.out.println("消息发送确认失败:" + s);
      }
    }

 

  }

 


  8、OrderController
  package com.sc.rabbitmq_order2050.controller; 

  import com.sc.rabbitmq_order2050.base.BaseApiService;
  import com.sc.rabbitmq_order2050.base.ResponseBase;
  import com.sc.rabbitmq_order2050.service.OrderService;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.web.bind.annotation.RequestMapping;
  import org.springframework.web.bind.annotation.RestController;

  @RestController
  public class OrderController extends BaseApiService {
  @Autowired
  private OrderService orderService;

 

  @RequestMapping("/addOrder")
    public ResponseBase addOrder() {
    return orderService.addOrderAndDispath();
    }
  }

 

  9、RabbitmqOrder2050Application

  package com.sc.rabbitmq_order2050;

  import org.mybatis.spring.annotation.MapperScan;
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  /**
   * Boot entry point of the order service; MyBatis mappers are scanned from the
   * {@code mapper} package.
   */
  @SpringBootApplication
  @MapperScan("com.sc.rabbitmq_order2050.mapper")
  public class RabbitmqOrder2050Application {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
      final Class<?> primarySource = RabbitmqOrder2050Application.class;
      SpringApplication.run(primarySource, args);
    }

  }

 

 

 

 

 

 

 

 

 

 

 

                (二)、rabbitmq_platoon2051

 

 

 

  1、 pom.xml
    <dependencies>
      <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>runtime</scope>
      <version>8.0.22</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.2.11</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--spring对amqp-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.3.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.9</version>
    </dependency>
  </dependencies>

 

  2、 application.properties

 

    spring.application.name=rabbitmq_platoon2051
    server.port=2051
    spring.rabbitmq.host=localhost
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
    # Retry the consumer when the listener throws an exception
    spring.rabbitmq.listener.simple.retry.enabled=true
    # Maximum number of delivery attempts
    spring.rabbitmq.listener.simple.retry.max-attempts=5
    # Interval between attempts, in milliseconds
    spring.rabbitmq.listener.simple.retry.initial-interval=3000
    # Manual acknowledgement: the listener must ack/nack every message itself
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    # Data source name
    spring.datasource.name=test
    # JDBC connection URL
    spring.datasource.url=jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&serverTimezone=UTC
    # Database user name and password
    spring.datasource.username=root
    spring.datasource.password=root
    # Use the Druid connection pool
    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    # JDBC driver: Connector/J 8.x ships the driver as com.mysql.cj.jdbc.Driver;
    # the old com.mysql.jdbc.Driver name is deprecated.
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

 


  3、RabbitmqConfig
    package com.sc.rabbitmq_platoon2051.rabbitmq;

 

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

 

    /**
    * 下单且派单队列
    * 下单、派单交换机
    * 定义【下单且派单队列】与交换机绑定
    */
    @Component
    public class RabbitmqConfig {

 

      //下单且派单队列
      public static final String ORDER_PLATOO_QUEUE = "order_platoon_queue";
      ///补单队列,判断订单是否已经被创建
      public static final String ORDER_CREATE_QUEUE = "order_create_queue";
      //下单、派单交换机
      public static final String ORDER_EXCHANGE_NAME= "order_exchange_name";

 

      //1、定义订单队列
      @Bean
      public Queue directOrderPlatoonQueue() {
        return new Queue(ORDER_PLATOO_QUEUE);
      }

 

 

      //2、定义补单队列
      /* @Bean
      public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
      }*/

 

      //2、定义交换机
      DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
      }

 

      //3、定义【下单且派单队列】与交换机绑定
      @Bean
      Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderPlatoonQueue()).to(directOrderExchange())
        .with("orderRoutingKey");
      }

 

      //3、定义补单队列与交换机绑定
      /*@Bean
      Binding bindingExchangeCreateOrderQueue() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange())
        .with("orderRoutingKey");
      }*/
    }

 

 

 

  4、PlatoonEntity
  package com.sc.rabbitmq_platoon2051.entity;

 

  import lombok.Data;

 

  // Dispatch task row for the platoon table; accessors are generated by Lombok.
  @Data
  public class PlatoonEntity {
    // Row id (PlatoonMapper.insertPlatoon inserts null for this column)
    private Long id;
    // Order number
    private String orderId;
    // Delivery person assigned to the order
    private Long takeoutUserId;
  }

 

 

 

  5、PlatoonMapper
  package com.sc.rabbitmq_platoon2051.mapper;

 

  import com.sc.rabbitmq_platoon2051.entity.PlatoonEntity;
  import org.apache.ibatis.annotations.Insert;

 

  /**
   * MyBatis mapper for the {@code platoon} table.
   */
  public interface PlatoonMapper {

    /**
     * Adds a dispatch task.
     *
     * @param platoon task to insert; its order id and delivery person id are stored
     * @return number of rows inserted
     */
    @Insert("insert into platoon values(null,#{orderId},#{takeoutUserId});")
    int insertPlatoon(PlatoonEntity platoon);
  }

 


  6、PlatoonConsumer

 

  package com.sc.rabbitmq_platoon2051.consumber;

 

  import com.alibaba.fastjson.JSONObject;
  import com.rabbitmq.client.Channel;
  import com.sc.rabbitmq_platoon2051.entity.PlatoonEntity;
  import com.sc.rabbitmq_platoon2051.mapper.PlatoonMapper;
  import com.sc.rabbitmq_platoon2051.rabbitmq.RabbitmqConfig;
  import org.apache.commons.lang3.StringUtils;
  import org.springframework.amqp.core.Message;
  import org.springframework.amqp.rabbit.annotation.RabbitListener;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.messaging.handler.annotation.Headers;
  import org.springframework.stereotype.Component;
  import java.io.IOException;
  import java.util.Map;

 

  /**
    * 派单服务
  */
  @Component
  public class PlatoonConsumer {

 

    @Autowired
    private PlatoonMapper platoonMapper;

 

    @RabbitListener(queues =RabbitmqConfig.ORDER_PLATOO_QUEUE)//下单且派单队列
    public void process(Message message, @Headers Map<String,Object> headers,
      Channel channel) throws IOException {
      String messageId=message.getMessageProperties().getMessageId();
      String msg=new String(message.getBody(),"UTF-8");
      System.out.println("派单平台"+msg+",消息id:"+messageId);
      JSONObject jsonObject=JSONObject.parseObject(msg);
      String orderId=jsonObject.getString("orderId");
      if(StringUtils.isEmpty(orderId)){
        //日志记录
        return;
      }
      PlatoonEntity platoonEntity=new PlatoonEntity();
      //订单id
      platoonEntity.setOrderId(orderId);
      //外卖员id
      platoonEntity.setTakeoutUserId(Long.valueOf("121"));
      try{
        int result=platoonMapper.insertPlatoon(platoonEntity);
        if(result>0){
          //手动签收信息,通知msg服务端删除该信息
          channel.basicAck(message.getMessageProperties().getDeliveryTag()
          ,false);
        }
      }catch (Exception e){
        e.printStackTrace();
        //丢该消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag()
        ,false,false);
      }
    }
  }

 


  7、RabbitmqPlatoon2051Application
  package com.sc.rabbitmq_platoon2051;

 

  import org.mybatis.spring.annotation.MapperScan;
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

 

  @MapperScan("com.sc.rabbitmq_platoon2051.mapper")
  @SpringBootApplication
  public class RabbitmqPlatoon2051Application {

 

    public static void main(String[] args) {
      SpringApplication.run(RabbitmqPlatoon2051Application.class, args);
    }

 

  }