kafka代码示例

发布时间 2023-10-29 14:43:25作者: 乐之者v

安装kafka:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka配置:

在 application.properties 添加以下配置:

### kafka生产者
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### kafka消费者
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5

生产者代码:

  • KafkaProducerService :

生产者发送消息。

@Component
public class KafkaProducerService {


    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息,处理回调。
     * 在发送消息时会自动创建你设置的 topic。
     *
     */
    public void send()  {
        MyMsg myMsg = new MyMsg();
        myMsg.setName("lin");
        myMsg.setId("1234");

        //发送消息
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("myTopic1", "key", JSON.toJSONString(myMsg));
        //处理回调的结果,比如消息发送失败的处理。如果不需要回调,也可以不处理。
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息发送失败." + ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                RecordMetadata recordMetadata = result.getRecordMetadata();
                System.out.println("消息发送成功.producerRecord:"+ JSON.toJSONString(producerRecord)
                                + ",recordMetadata:" + JSON.toJSONString(recordMetadata));

            }

        });

    }


}

  • 调用生产者发送消息:
@RestController
@RequestMapping("/")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping(value = "/kafka/send")
    public void send()  {
        kafkaProducerService.send();
    }

}

消费者代码:

  • KafkaConsumerService:
@Component
public class KafkaConsumerService {


    /**
     * Kafka监听器,可以监听消息。
     * 指定需要监听的 kafka 主题 topics,可以是多个topic.
     * 指定消费者群组 groupId,可以不写.
     *
     */
    @KafkaListener( topics = {"myTopic1"} , groupId ="myGroup")
    public void consume(ConsumerRecord<String, String> consumerRecord)  {
        System.out.println("消费者接收到信息,内容为:" + consumerRecord.value());
        System.out.println("偏移量:" +  consumerRecord.offset());

    }


}

测试结果 :

调用生产者发送消息,消费者成功接收到消息,类似如下:

消费者接收到信息,内容为:{"id":"1234","name":"lin"}
偏移量:19