Spring Cloud Stream Kafka实践

发布时间:2023-07-19 18:22:51 作者:天晴修屋顶

Spring Cloud Stream是Spring Cloud提供的一个用于构建消息驱动微服务的框架。它简化了消息系统(如Kafka、RabbitMQ)的使用和集成,使开发者可以更专注于业务逻辑的实现。

项目结构如下

 

一、引入依赖

创建一个Spring Boot Web项目,并引入以下依赖:

 1 <properties>
 2     <java.version>1.8</java.version>
 3     <spring-cloud.version>Finchley.SR2</spring-cloud.version> <!-- referenced below as ${spring-cloud.version} -->
 4   </properties>
 5 
 6   <dependencyManagement>
 7     <dependencies>
 8       <dependency>
 9         <groupId>org.springframework.cloud</groupId>
10         <artifactId>spring-cloud-dependencies</artifactId>
11         <version>${spring-cloud.version}</version>
12         <type>pom</type>
13         <scope>import</scope> <!-- imports the dependency management of spring-cloud-dependencies into this project -->
14       </dependency>
15     </dependencies>
16   </dependencyManagement>
17 
18   <dependencies>
19     <dependency>
20       <groupId>org.springframework.boot</groupId>
21       <artifactId>spring-boot-starter-web</artifactId>
22     </dependency>
23 
24     <dependency>
25       <groupId>org.springframework.cloud</groupId>
26       <artifactId>spring-cloud-stream-binder-kafka</artifactId> <!-- no version element here; presumably managed by the imported spring-cloud-dependencies (verify) -->
27     </dependency>
28   </dependencies>

二、配置消息中间件

这里先以Kafka为例,需要事先准备好可用的Kafka服务(下面配置中使用的地址是192.168.3.100:9092)。

 1 # Binding configuration for both the producer (output) and the consumer (input)
 2 spring:
 3   kafka:
 4     bootstrap-servers: 192.168.3.100:9092  # Kafka broker address; adjust to your environment
 5   cloud:
 6     stream:
 7       bindings:
 8         output:
 9           destination: ${kafka.topic}  # placeholder resolved from kafka.topic defined at the bottom
10         input:
11           destination: ${kafka.topic}  # same placeholder as output, so both bindings use one destination
12 kafka:
13   topic: cloud-stream

 

三、消息生产者producer

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.beans.factory.annotation.Qualifier;
 3 import org.springframework.cloud.stream.annotation.EnableBinding;
 4 import org.springframework.cloud.stream.messaging.Source;
 5 import org.springframework.messaging.MessageChannel;
 6 import org.springframework.messaging.support.MessageBuilder;
 7 import org.springframework.stereotype.Component;
 8 
 9 /**
10  * @Classname MessageProducer
11  * @Created by Michael
12  * @Date 2023/7/19
13  * @Description Message producer: wraps a String in a Message and sends it through the output channel of the bound Source.
14  */
15 @Component
16 @EnableBinding(Source.class)
17 public class MessageProducer {
18   @Autowired
19   @Qualifier(Source.OUTPUT)
20   private MessageChannel messageChannel; // injected by qualifier Source.OUTPUT; only referenced by the commented-out alternative in send()
21 
22   @Autowired
23   private Source source; // send() publishes through source.output()
24 
25   /**
26    * Sends a message.
27    * @param message text used as the payload of the message that is sent
28    */
29   public void send(String message){
30     // Send the message through the message channel; the commented-out line is the alternative that uses the injected messageChannel directly.
31 //    messageChannel.send(MessageBuilder.withPayload(message).build());
32     source.output().send(MessageBuilder.withPayload(message).build());
33   }
34 }

四、消息消费者consumer

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.beans.factory.annotation.Qualifier;
 3 import org.springframework.cloud.stream.annotation.EnableBinding;
 4 import org.springframework.cloud.stream.annotation.StreamListener;
 5 import org.springframework.cloud.stream.messaging.Sink;
 6 import org.springframework.integration.annotation.ServiceActivator;
 7 import org.springframework.messaging.SubscribableChannel;
 8 import org.springframework.stereotype.Component;
 9 
10 import javax.annotation.PostConstruct;
11 
12 /**
13  * @Classname MessageConsumer
14  * @Created by Michael
15  * @Date 2023/7/19
16  * @Description Message consumer: prints String payloads received on Sink.INPUT, prefixed with the subscription style that handled them.
17  */
18 @Component
19 @EnableBinding({Sink.class})
20 public class MessageConsumer {
21   @Autowired
22   @Qualifier(Sink.INPUT)
23   private SubscribableChannel subscribableChannel; // only referenced by the commented-out 3.1 subscription below
24 
25   // There are 3 ways to subscribe.
26   // 3.1 Callback run after subscribableChannel has been injected; subscribes a handler on the channel directly. Commented out in this listing.
27 //  @PostConstruct
28 //  public void init() {
29 //    subscribableChannel.subscribe(message -> {
30 //      System.out.println(message.getPayload());
31 //    });
32 //  }
33 
34   // 3.2 Using @ServiceActivator
35   @ServiceActivator(inputChannel = Sink.INPUT)
36   public void messageActivator(String message) {
37     System.out.println("@ServiceActivator -> " + message);
38   }
39 
40   // 3.3 Using @StreamListener
41   @StreamListener(Sink.INPUT)
42   public void onMessage(String message) {
43     System.out.println("@StreamListener -> " + message);
44   }
45 
46 }

五、测试结果

创建一个controller,接收请求

 1 import com.mike.study.springcloudstreamkafka.producer.MessageProducer;
 2 import org.springframework.beans.factory.annotation.Autowired;
 3 import org.springframework.web.bind.annotation.GetMapping;
 4 import org.springframework.web.bind.annotation.PostMapping;
 5 import org.springframework.web.bind.annotation.RequestParam;
 6 import org.springframework.web.bind.annotation.RestController;
 7 
 8 /**
 9  * @Classname StreamController
10  * @Created by Michael
11  * @Date 2023/7/19
12  * @Description REST controller that passes the "message" request parameter to MessageProducer.send; GET and POST on send/msg do the same thing.
13  */
14 @RestController
15 public class StreamController {
16   @Autowired
17   MessageProducer messageProducer;
18 
19   @GetMapping("send/msg") // GET variant
20   public boolean sendMsg(@RequestParam("message") String message){
21     messageProducer.send(message);
22     return true; // always true; no failure path is handled here
23   }
24 
25   @PostMapping("send/msg") // POST variant; same body as sendMsg
26   public boolean sendMsg1(@RequestParam("message") String message){
27     messageProducer.send(message);
28     return true; // always true; no failure path is handled here
29   }
30 }

发起GET/POST请求,请求3次

 

 

 

控制台查看结果

从结果可以看到,3种订阅方式以轮询的方式依次消费消息:先是@StreamListener,然后是@ServiceActivator,最后是@PostConstruct。注意:上面MessageConsumer代码清单中的@PostConstruct订阅方式处于注释状态,需要取消注释后才会参与轮询。

总结:这里使用GET、POST两种请求都可以;同时producer和consumer中没有任何Kafka相关的代码,后期如果切换消息中间件,只需替换对应的binder依赖并修改配置文件即可,业务代码无需改动。