7.Kafka,构建TB级异步消息系统

发布时间 2023-12-29 21:00:30作者: 壹索007

1.阻塞队列

  • BlockingQueue
    • 解决线程通信的问题。
    • 阻塞方法:put、take。
  • 生产者消费者模式
    • 生产者:产生数据的线程。
    • 消费者:使用数据的线程。
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

 面试题:写一个生产者消费者实现

public class Test {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(10);
        Producer p = new Producer(queue);
        Consumer c = new Consumer(queue);
        new Thread(p,"producer").start();
        new Thread(c,"consumer").start();
    }
}

class Consumer implements Runnable {
    private BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while(true){
                Thread.sleep(20);
                System.out.println("消费者消费了:" + queue.take());
            }
        }catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
}

class Producer implements Runnable{
    private BlockingQueue<String> queue;
    public Producer(BlockingQueue<String> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                String tmp = "a product " + i + " from:" + Thread.currentThread().getName();
                System.out.println("生产者生产了:" + tmp);
                queue.put(tmp);
                Thread.sleep(20);
            }
        }catch (InterruptedException e) {
                e.printStackTrace();
        }
    }
}

2.kafka入门

  • Kafka简介
    • Kafka是一个分布式的消息队列。
    • 应用:消息系统、日志收集、用户行为追踪、流式处理。
  • Kafka特点
    • 高吞吐量、消息持久化、高可靠性、高扩展性。
  • Kafka术语
    • Broker:Kafka的服务器
    • Zookeeper:管理集群
    • Topic:点对点模式中每个消费者拿到的消息都不同,发布订阅模式中消费者可能拿到同一份消息。Kafka采用发布订阅模式,生产者把消息发布到的空间(位置)就叫Topic
    • Partition:是对Topic位置的分区,如下图:
      img
    • Offset:就是消息在分区中的索引
      img
    • Leader Replica:主副本,可以处理请求
    • Follower Replica:从副本,只是用作备份

Kafka相关链接:https://kafka.apache.org/

Windows下使用Kafka

  在2.8以前,kafka安装前需要安装zookeeper。如果不需要额外使用zookeeper其他功能,可以安装2.8以后的版本。

在启动前改一下相关配置:

(1)解压kafka压缩包

(2)config下的zookeeper.properties

(3)config下的server.properties

  注意启动的时候先zookeeper后kafka,停止的时候先kafka后zookeeper。

 (4) 启动zookeeper

cd E:\Software\Kafka\kafka_2.12-3.4.0
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

报错:INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider),需要改zookeeper.properties

(5)启动kafka

bin\windows\kafka-server-start.bat config\server.properties

启动完成后会出现配置的文件夹

(6) 创建主题

bin\windows\kafka-topics.bat --create --topic topicDemo --bootstrap-server localhost:9092

(7)显示所有topic列表

bin\windows\kafka-topics --list --bootstrap-server localhost:9092

 (8) 向主题发送消息

E:\Software\Kafka\kafka_2.12-3.4.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topicDemo

(9)消费消息

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topicDemo --from-beginning

3.Spring整合Kafka