Kafka快速使用与详解

发布时间 2023-03-25 17:11:01作者: 手可敲星辰脚驾七彩云

一、Kafka简介

1. 概念

 Kafka是一个分布式的、基于发布/订阅的消息队列,最初由LinkedIn开发,并于2011年成为Apache项目的一部分。Kafka具有高吞吐量可扩展性持久性容错性等特性,被广泛应用于大数据领域,如数据采集、数据传输、数据处理等场景。

Kafka主要由以下三个部分组成:

  • Producer:生产者,负责生产消息并将其发送到Kafka集群。

  • Broker:Kafka集群中的服务器,负责存储消息并进行消息的路由和传输。

  • Consumer:消费者,负责从Kafka集群中订阅并消费消息。

Kafka的消息模型基于发布/订阅模式,可以将消息分为多个主题(Topic),每个主题可以分为多个分区(Partition),每个分区可以分配到不同的Broker上进行存储。Kafka采用了基于时间的存储机制,即消息一旦被写入分区,就会被保留一定的时间,即使已经被消费也不会立即删除。

Kafka的使用步骤主要包括以下几个方面:

  1. 安装部署Kafka集群:根据实际需求安装部署Kafka集群。

  2. 创建Topic:使用Kafka提供的工具创建需要的Topic。

  3. 生产者生产消息:使用Kafka提供的API或者客户端工具,向指定Topic中生产消息。

  4. 消费者消费消息:使用Kafka提供的API或者客户端工具,从指定的Topic中消费消息。

  5. 监控和管理Kafka集群:使用Kafka提供的工具监控和管理Kafka集群,如监控集群健康状态、处理消息堆积等。

2. 简单示例
  1. 创建Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  1. 生产者生产消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  1. 消费者消费消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

二、详细示例

  1. 安装Kafka
    首先,需要下载并安装Kafka。在安装完成之后,需要启动Kafka集群和Zookeeper服务器。可以通过以下命令启动:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
  1. 创建Topic
    在使用Kafka发送和消费消息之前,需要创建一个Topic。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

上述命令将创建一个名为“test”的Topic,它有1个副本和1个分区。可以通过以下命令来查看已创建的Topic:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

  1. 发送消息
    可以使用以下命令向Topic发送消息:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在控制台输入消息后,按“Enter”键即可将消息发送到Topic中。
  1. 消费消息
    可以使用以下命令从Topic中消费消息:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

上述命令将从Topic“test”中消费消息,并将它们输出到控制台。如果要停止消费消息,请按“Ctrl-C”。
  1. 使用Java API发送消息
    可以使用Java API来发送消息。以下是一个发送消息的示例代码:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;

public class KafkaProducerExample {
   public static void main(String[] args) throws Exception{
      String topicName = "test";
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");         
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));

      System.out.println("Message sent successfully");
      producer.close();
   }
}

  1. 使用Java API消费消息
    可以使用Java API来消费消息。以下是一个消费消息的示例代码:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
   public static void main(String[] args) throws Exception {
      String topicName = "test";
      Properties props = new Properties();
      props