Spring Cloud Stream

发布时间 2023-12-15 10:50:08作者: 安浩阳

Spring Cloud Stream

官方文档 用法参考文档 官方代码Demo

事件驱动架构(EDA)是一种软件架构范例。事件生产者和事件消费者是 EDA 的两个主要组成部分。生产者的责任是感知任何状态变化并将该状态呈现为事件消息。生产者不知道谁是该事件的消费者以及该事件的结果是什么。事件的传输将通过事件通道进行。

EDA 模型有两种类型:

  1. Pub/Sub模型:在该模型中,当事件发生或产生时,系统会将该事件作为消息放入事件流中,监听该流的订阅者将消费该事件。
  2. 事件流:在此模型中,事件被写入日志,事件使用者无需订阅事件流,而是可以随时从流的任何位置读取事件。

Spring Cloud Stream 是 Spring 库,它帮助开发人员专注于核心应用程序而不是样板连接器代码。它将许多流行的消息传递平台统一到一个易于使用的 API 背后,包括 RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure 事件中心和 Apache RocketMQ。Spring Cloud Stream支持WebFlux、Multi IO(多个输入和输出)、Multi Binder(您可以在单个应用程序中分别使用Kafka和RabbitMQ作为输入和输出通道)、Kafka Stream功能(例如KTable和KStream等)。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
package com.cloud.data.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
/**
 * Spring Boot entry point for the event-stream demo application.
 *
 * <p>{@code ContextFunctionCatalogAutoConfiguration} is excluded from auto-configuration.
 */
@SpringBootApplication(exclude=ContextFunctionCatalogAutoConfiguration.class)
public class EventStreamApplication {
    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that overrides such as
     *             {@code --server.port=8081} take effect instead of being silently dropped
     */
    public static void main(String[] args) {
        SpringApplication.run(EventStreamApplication.class, args);
    }
}
package com.cloud.data.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Declares the two message channels used by this application: one outbound channel for
 * publishing events and one inbound channel for receiving them.
 *
 * <p>The binding names defined here are the keys used to configure each channel.
 */
public interface EventStream {
	/** Binding name of the outbound (producer) channel. */
	String OUTBOUND = "event-producer";

	/** Binding name of the inbound (consumer) channel. */
	String INBOUND = "event-consumer";

	/**
	 * Returns the channel that events are published to.
	 *
	 * @return the outbound message channel bound under {@link #OUTBOUND}
	 */
	@Output(OUTBOUND)
	MessageChannel producer();

	/**
	 * Returns the channel that events are received from.
	 *
	 * @return the inbound subscribable channel bound under {@link #INBOUND}
	 */
	@Input(INBOUND)
	SubscribableChannel consumer();
}
package com.cloud.data.stream;

import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * Enables the channel bindings declared by {@link EventStream} via {@code @EnableBinding}.
 *
 * <p>The class has no members; it exists only to carry the annotation.
 *
 * <p>NOTE(review): the annotation-based binding model is believed to be deprecated in newer
 * Spring Cloud Stream releases in favour of the functional model; confirm against the version
 * this project targets.
 */
@EnableBinding(EventStream.class)
public class EventStreamConfig {

}
# Spring Cloud Stream settings: Kafka binder connection plus the two channel bindings.
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Kafka broker address used by the binder.
          brokers: localhost:9092
      bindings:
        # Binding keys match the EventStream.INBOUND / EventStream.OUTBOUND constants.
        event-consumer:
          destination: data_stream
          contentType: application/json
        # Uses the same destination as event-consumer, so this application's consumer
        # binding reads from the destination its own producer writes to.
        event-producer:
          destination: data_stream
          contentType: application/json
package com.cloud.data.stream;


/**
 * Event payload exchanged over the stream: an identifier, a name, the textual data and an
 * optional raw byte representation.
 *
 * <p>The byte payload is defensively copied on the way in and on the way out so that callers
 * cannot mutate this object's internal array through a shared reference.
 */
public class Message {
	private Integer id;
	private String name;
	private String data;
	private byte[] bytePayload;

	/** @return the event name, or {@code null} if not set */
	public String getName() {
		return name;
	}

	/** @param name the event name */
	public void setName(String name) {
		this.name = name;
	}

	/** @return the event identifier, or {@code null} if not set */
	public Integer getId() {
		return id;
	}

	/** @param id the event identifier */
	public void setId(Integer id) {
		this.id = id;
	}

	/** @return the textual event data, or {@code null} if not set */
	public String getData() {
		return data;
	}

	/** @param data the textual event data */
	public void setData(String data) {
		this.data = data;
	}

	/**
	 * Returns a copy of the byte payload.
	 *
	 * @return a fresh copy of the stored bytes, or {@code null} if no payload is set
	 */
	public byte[] getBytePayload() {
		// Copy so that callers cannot modify the internal array.
		return bytePayload == null ? null : bytePayload.clone();
	}

	/**
	 * Stores a copy of the given byte payload.
	 *
	 * @param bytePayload the bytes to store; {@code null} clears the payload
	 */
	public void setBytePayload(byte[] bytePayload) {
		// Copy so that later changes to the caller's array do not leak into this object.
		this.bytePayload = bytePayload == null ? null : bytePayload.clone();
	}

}
package com.cloud.data.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint under {@code /api/event} that hands incoming events to
 * {@link EventStreamService} for publishing.
 */
@RestController
@RequestMapping("/api/event")
public class EventController {
	@Autowired
	private EventStreamService eventStreamService;

	/**
	 * Handles {@code POST /api/event/produce}: publishes the request body as an event.
	 *
	 * @param msg the event deserialized from the request body
	 * @return the result reported by {@link EventStreamService#produceEvent(Message)}
	 * @throws Exception declared for callers; propagated from the service if raised
	 */
	@PostMapping("/produce")
	public Boolean sendEvent(@RequestBody Message msg) throws Exception {
		final Boolean sent = eventStreamService.produceEvent(msg);
		return sent;
	}

}
package com.cloud.data.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

/**
 * Publishes {@link Message} events to the outbound channel exposed by {@link EventStream}.
 */
@Service
public class EventStreamService {
	@Autowired
	private EventStream eventStream;

	/**
	 * Fills in the message's byte payload from its textual data and sends the message on the
	 * producer channel with a JSON content-type header.
	 *
	 * @param msg the event to publish; its byte payload is overwritten by this call
	 * @return the value returned by {@code MessageChannel.send} for the built message
	 */
	public Boolean produceEvent(Message msg) {
		System.out.println("Producing events --> id: "+ msg.getId() +" name: "+msg.getName()+" Actual message: "+ msg.getData());
		String data = msg.getData();
		// A request without "data" must not fail with a NullPointerException; it simply
		// carries no byte payload. UTF-8 is explicit so the bytes do not depend on the
		// platform default charset (fully qualified to avoid needing a new import).
		msg.setBytePayload(data == null ? null : data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
		MessageChannel messageChannel = eventStream.producer();
		return messageChannel.send(MessageBuilder.withPayload(msg)
				.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
	}

}
package com.cloud.data.stream;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Payload;
/**
 * Listens on the inbound binding declared by {@link EventStream} and prints each received
 * event to standard output.
 */
@Configuration
public class EventConsumer {
	/**
	 * Prints the fields of a received event.
	 *
	 * @param msg the event payload converted from the inbound message
	 */
	@StreamListener(EventStream.INBOUND)
	public void consumeEvent(@Payload Message msg) {
		// Concatenating a byte[] directly prints only its identity (e.g. "[B@1a2b3c"), so
		// render the contents instead. Arrays.toString returns "null" for a null array.
		// Fully qualified to avoid needing a new import.
		System.out.println("Inbound message--> id: " + msg.getId() + " name: " + msg.getName() + " Actual message: "
				+ msg.getData() + " bytePayload: " + java.util.Arrays.toString(msg.getBytePayload()));
	}

}