SpringCloud之Stream消息驱动RocketMQ讲解

发布时间 2023-06-29 11:48:03作者: 上善若泪

1 Stream消息驱动

本文是以 RocketMQ 为例讲解,点击此处了解SpringBoot整合RocketMQ

1.1 简介

1.1.1 定义

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring BootSpring Integration,实现了一套轻量级的消息驱动的微服务框架。

1.1.2 抽象模型

我们都知道市面上有很多消息中间件,Sping Cloud Stream 为了可以集成各种各样的中间件,它抽象出了 Binder 的概念,每个消息中间件都需要有对应自己的 Binder。这样它就可以根据不同的 Binder 集成不同的中间件。下图的input和outputchannelBinder则是消息中间件和通道之间的桥梁
在这里插入图片描述

1.1.3 绑定器

通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQKafka 的自动化配置。
Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。

Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

1.2 操作实操

1.2.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>RocketMQDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spring.boot.version>2.6.11</spring.boot.version>
        <spring.cloud.version>2021.0.4</spring.cloud.version>
        <spring.cloud.alibaba>2021.0.4.0</spring.cloud.alibaba>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
            <version>4.9.4</version>
        </dependency>

    </dependencies>

    <dependencyManagement>
    <dependencies>
        <!--springboot父依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!--springcloud父依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!--springcloudalibaba父依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring.cloud.alibaba}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

    </dependencies>
    </dependencyManagement>

</project>

1.2.2 操作实体

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class UserEntity {
    private String name;//账号
    private String pass;//密码
}

1.3 Stream 3.x 之前操作

虽然在 SpringCloudStream 3.x 版本后是可以看到 @StreamListener@EnableBinding 都打上了@Deprecated 注解,但是不妨碍我们测试学习

1.3.1 自定义通道

package cn.mq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannel {

    String INPUT = "test-input";
    String OUTPUT = "test-output";

    /**
     * 这两个通道可能定义在两个不同的通道里面,这里为了方便放在同一个项目中演示
     */
    // 收(订阅频道/消息消费者)
    @Input(INPUT)
    SubscribableChannel input();
    // 发(消息生产者)
    @Output(OUTPUT)
    MessageChannel output();
}

1.3.2 消费消息

此处可以使用我们自定义的通道,也可以使用原装的 Sink.class

package cn.mq;

import cn.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@Slf4j
//@EnableBinding(Sink.class)
@EnableBinding(MyChannel.class)
public class ReceiveMQ {
    @StreamListener(MyChannel.INPUT)
    public void receive(UserEntity entity){
        log.info("收到消费消息:{}",entity.toString());
    }
}

默认情况下,如果消费者是一个集群,此时,一条消息会被多次消费。通过消息分组,我们可以解决这个问题。

添加如下配置分组,放入组 g1:

spring.cloud.stream.bindings.test-input.group=g1
spring.cloud.stream.bindings.test-output.group=g1

1.3.3 发送消息

package cn.controller;

import cn.entity.UserEntity;
import cn.mq.MyChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MQController {
    @Autowired
    private MyChannel myChannel;
    @GetMapping("/test")
    public void test(){
        UserEntity userEntity = new UserEntity("hello", "pass");
        boolean send = myChannel.output().send(MessageBuilder.withPayload(userEntity).build());
        log.info("发送消息:{},结果:{}",userEntity.toString(),send);
    }
}

其中,MessageBuilderSpring Integration中用于创建消息的工具类。以下是createMessage, fromMessagewithPayload方法的区别:

  • createMessage:这是一个静态方法,用于创建一个新的消息。你需要提供消息的负载(payload)和消息头(header)。
    例如:Message<String> message = MessageBuilder.createMessage("Hello World", new MessageHeaders(headers));
  • fromMessage:这个方法用于从一个已存在的消息创建一个新的消息。新的消息将会有相同的负载和消息头。这个方法通常在你想修改一个已存在消息的部分属性但保持其他部分不变时使用。
    例如:Message<String> newMessage = MessageBuilder.fromMessage(oldMessage).setHeader("newHeader", "newValue").build();
  • withPayload:这个方法用于设置消息的负载。你可以链式地调用其他方法(如setHeader)来设置消息头。
    例如:Message<String> message = MessageBuilder.withPayload("Hello World").setHeader("headerKey", "headerValue").build();

总的来说,这三个方法提供了灵活的方式来创建和修改消息,你可以根据具体的需求来选择使用哪一个。

1.3.4 配置文件

spring:
  application:
    name: rokcet-mq-demo
  cloud:
    stream:
      bindings: # 配置消息通道的信息
        test-input: # 自定义消费 通道
          destination: test-optic
          group: test
          binder: rocketmq
        test-output: # 自定义发送 通道
          destination: test-optic
          group: test
          binder: rocketmq
      rocketmq:
        binder:
          name-server: ip:port
          group: test #此处定义整体消费者组名字

1.4 Stream 3.x 之后操作

1.4.1 Stream 3.x 之后讲解

由于 SpringCloudStream 3.x 版本后是 可以看到 @StreamListener@EnableBinding 都打上了@Deprecated 注解。后续的版本更新中会逐渐替换成函数式的方式实现。
既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?
通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic
不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 Inout 两个通道:

  • 输入 - <functionName> + -in- + < index >
    myTopic-in-0
  • 输出 - <functionName> + -out- + < index >
    myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致

1.4.2 消费消息

package cn.mq;

import cn.entity.UserEntity;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import java.util.function.Consumer;

@Configuration
public class ReceiveMQ {
    @Bean
    public Consumer<Message<UserEntity>> myTopicC(){
        return (data)->{
            UserEntity user = data.getPayload();
            MessageHeaders headers = data.getHeaders();
            System.out.println("myTopicC 接收一条记录:" + user);
            System.out.println("getHeaders headerFor:" + headers.get("for"));
        };
    }
  }

1.4.3 发送消息

1.4.3.1 自动发送

package cn.mq;

import cn.entity.UserEntity;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

@Configuration
public class SendMQ {
    Integer i = 1;
    @Bean
    public Supplier<Message<UserEntity>> myTopicP() {
        return () -> {
            UserEntity entity = new UserEntity();
            entity.setPass(i++ + "");
            entity.setName(Thread.currentThread().getName());
            System.out.println("myTopicP 发送一条记录:" + entity);
            return MessageBuilder
                    .withPayload(entity)
                    .build();
        };
    }
}

这种方式定义 suppelier 会 默认1000ms 发送一次记录
可以修改:spring.cloud.stream.poller:fixedDelay: 延迟毫秒值

1.4.3.2 手动触发

通过 StreamBridge 触发

package cn.controller;

import cn.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MQController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/test")
    public void sendMsg() {
        UserEntity entity = new UserEntity("hello","world");
        System.out.println("sendMsg 发送一条记录:" + entity);
        streamBridge
                .send(
                        "myTopicP-out-0",
                        MessageBuilder.withPayload(entity)
                                .setHeader("for", "这是一个请求头~")
                                .build());
    }
}

1.4.4 配置文件

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
# -------------- 分割线 ---------------
      function:
      # 组装和绑定
      # 手动时把 myTopicP 去掉
        definition: myTopicC;myTopicP
      bindings:
        myTopicC-in-0:
          destination: my-topic
          group: test
       myTopicP-out-0:
          destination: my-topic

1.4.5 中转函数Function

Function< String,String > 范型中有两个参数 :一个入参,一个出参,所以在Stream中可以用来作于一个消息中转站来使用。相当于 top-1 接受到消息 但是我不想处理 我对其数据进行一次处理 发送到 top-2 通道,交给top-2 进行数据的最终处理。

采用手动触发示例,在上面改造测试:

@Bean
public Consumer<UserEntity> testFunctionQ(){
    return (data)->{
        System.out.println("testFunctionQ 消息中转后接收一条记录:" + data);
    };
}

@Bean
public Function<UserEntity, UserEntity> testFunction() {
    return value -> {
        System.out.println("中转 testFunction: " + value);
        value.setPass(value.getPass().toUpperCase());
        value.setName(value.getName().toUpperCase());
        return value;
    };
}

配置文件:

spring:
  application:
    name: rokcet-mq-demo
  cloud:
    stream:
      bindings:
        myTopicP-out-0:
          destination: test-topic
        testFunction-in-0:
          destination: test-topic
          group: my_input_group
        testFunction-out-0:
          destination: test-topic-Q
        testFunctionQ-in-0:
          destination: test-topic-Q
          group: my_input_group-Q

      rocketmq:
        binder:
          name-server: localhost:9876
          group: test
      function:
        definition: testFunction;testFunctionQ

1.5 配置文件讲解

1.5.1 spring.cloud.function.definition

spring.cloud.function.definition 是一个配置属性,用于指定 Spring Cloud Function 应用程序中的函数定义。
这个属性的值是一个以 逗号分隔(如果用逗号分隔有顺序问题,还是最好用分号分隔)的字符串,表示要使用的函数、消费者(Consumer)或生产者(Supplier)的名称。
Spring Cloud Stream 中,这个属性用于将函数、消费者或生产者与消息队列(如 RabbitMQKafka 等)进行绑定。当指定为 Supplier 时,它将作为消息队列的生产者,负责生成并发送消息;当指定为 Consumer 时,它将作为消息队列的消费者,负责接收并处理消息。
例如,假设有一个名为 process 的函数,你可以通过以下配置将其作为消费者与消息队列进行绑定:

spring.cloud.function.definition=process

这样,process 函数将作为消息队列的消费者,接收并处理来自队列的消息。同样,可以将 Supplier 与消息队列进行绑定,作为生产者生成并发送消息。

1.5.2 spring.cloud.stream.binders和bindings区别

spring.cloud.stream.bindersspring.cloud.stream.bindings都是Spring Cloud Stream的配置属性,但它们的用途是不同的。

  • spring.cloud.stream.binders用于配置消息中间件的连接信息。
    例如,如果使用的是 RabbitMQ,你需要在这里配置 RabbitMQ 的主机名、端口、用户名和密码等信息。可以配置多个binder,每个binder对应一个消息中间件。
  • spring.cloud.stream.bindings用于配置消息通道的信息。在Spring Cloud Stream中,消息通道是消息生产者和消费者之间的桥梁。可以在这里配置通道的名称、目标(对应消息中间件中的队列或主题名)、分区等信息。

简单来说,spring.cloud.stream.binders是用来配置消息中间件的,而spring.cloud.stream.bindings是用来配置消息通道的。

spring:
  cloud:
    stream:
      # 如果你项目里只对接一个中间件,那么不用定义binders
      # 当系统要定义多个不同消息中间件的时候,使用binders定义
      binders:
        my-rabbit:
          type: rabbit # 消息中间件类型
          environment: # 连接信息
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 添加coupon - Producer
        addCoupon-out-0:
          destination: request-coupon-topic
          content-type: application/json
          binder: my-rabbit
        # 添加coupon - Consumer
        addCoupon-in-0:
          destination: request-coupon-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit  

1.5.3 消费分组

Spring Cloud Stream中,发送者(Producer)不需要分组,只有消费者(Consumer)需要分组。
分组的主要目的是为了实现消息的广播或者分区。当多个消费者在同一个组中时,消息会被任何一个消费者消费,但不会被同一组的所有消费者消费,这就实现了消息的负载均衡。如果每个消费者有自己的组,那么每个消费者都会收到一份消息的拷贝,这就实现了消息的广播。

1.5.4 spring.cloud.stream.rocketmq.binder.group和spring.cloud.stream.bindings.通道名字.group两个属性区别

spring.cloud.stream.rocketmq.binder.group 是全局配置,用于设置默认的消费组名。如果没有在具体的通道中设置消费组名,那么就会使用这个全局配置。

spring.cloud.stream.bindings.通道名字.group 是针对具体通道的配置,用于设置该通道的消费组名。如果在这里设置了消费组名,那么就会覆盖全局配置。

总的来说,这两个属性都是用于设置消费组名的,但是作用范围不同,一个是全局的,一个是针对具体通道的。

报错Property 'group' is required - producerGroup
这时候就需要在 spring.cloud.stream.rocketmq.binder.group属性中设置值,就不会报错了