kafka_demo

发布时间 2023-08-06 10:49:37作者: hemeiwolong

参考:

概念:https://zhuanlan.zhihu.com/p/74063251

代码运用:https://zhuanlan.zhihu.com/p/114209326

 

参考 kafka 在windows 平台的搭建和简单实用_一代键客的博客-CSDN博客,先验证本地是否能使用kafka成功生产消费消息,如果因为版本问题遇到报错“bootstrap-server is not a recognized option”,参考 Kafka踩坑记----bootstrap-server is not a recognized option如何解决_南望南山的博客-CSDN博客 、Kafka参数zookeeper和bootstrap-server的区别 - Clotho_Lee - 博客园 (cnblogs.com)解决。

启动zookeeper

 启动kafka

 创建topic,生产消息

 能成功消费到消息

 再用代码实现生产消费消息

Producer.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer implements Runnable {
    private KafkaProducer<String, String> producer;
    private String topic;

    public Producer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<String, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        int count = 0;
        while (count < 10) {
            String msg = "NO. " + count + " msg";
            System.out.println("produce NO." + count + " msg...");
            producer.send(new ProducerRecord<>(topic, "msg", msg));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            count++;
        }
        producer.close();
    }

    public static void main(String[] args) {
        new Thread(new Producer("kafka")).start();
    }
}

  

Consumer.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer implements Runnable {
    private KafkaConsumer<String, String> consumer;
    private String groundId;
    private String topic;
    private ConsumerRecords<String, String> msgList;

    public Consumer(String topic, String groundId) {
        this.topic = topic;
        this.groundId = groundId;
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groundId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        while (true) {
            msgList = consumer.poll(1000);
            System.out.println("poll msg,msgList size:" + msgList.count());
            if (msgList != null && msgList.count() > 0) {
                int idx = 0;
                for (ConsumerRecord<String, String> record : msgList) {
                    System.out.println("consume NO." + idx + " msg:key " + record.key() + " val " + record.value() +
                            " offset" + record.offset());
                    idx++;
                }
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new Consumer("kafka", "groundA")).start();
    }
}

  

先启动生产者,再启动消费者,运行结果: