Kafka In Depth (Part 1): The Producer

Published 2023-09-11 22:36:12 · Author: Stitches

1. Client Development

A producer client needs to provide the following capabilities:

  • Configure the producer client parameters and create the corresponding producer instance;
  • Build the messages to be sent;
  • Send the messages;
  • Close the producer instance.

A basic Kafka producer client looks like this:

public class KafkaProducerAnalysis { 
    public static final String brokerList = "localhost:9092"; 
    public static final String topic = "topic-demo"; 
 
    // Initialize the client configuration
    public static Properties initConfig(){ 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", brokerList); 
        props.put("key.serializer", 
                "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer", 
                "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("client.id", "producer.client.id.demo"); 
        return props; 
    } 
 
    public static void main(String[] args) { 
        Properties props = initConfig();
        // Create the producer instance 
        KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
        // Build the message
        ProducerRecord<String, String> record = 
                new ProducerRecord<>(topic, "hello, Kafka!"); 
        try { 
            // Send the message
            producer.send(record); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } finally { 
            // Close the producer instance 
            producer.close(); 
        } 
    } 
} 

A ProducerRecord carries several fields; the class is defined as follows:

public class ProducerRecord<K, V> { 
    private final String topic; // topic 
    private final Integer partition; // partition number 
    private final Headers headers; // message headers 
    private final K key; // key 
    private final V value; // value 
    private final Long timestamp; // message timestamp 
    // other members and constructors omitted 
}

Messages are first classified by topic and then sub-classified by key: messages with the same key are routed to the same partition. The value is the message body and is normally non-null; a null value denotes a tombstone message. The timestamp field carries the message timestamp and comes in two flavors, CreateTime and LogAppendTime: the former is the time the message was created, the latter the time it was appended to the log.
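For instance, on a log-compacted topic a tombstone for a given key can be produced by sending a record whose value is null (a minimal sketch; the topic and key names are illustrative):

// A non-null key with a null value is a tombstone: on a log-compacted
// topic it marks earlier messages with the same key for cleanup.
ProducerRecord<String, String> tombstone =
        new ProducerRecord<>("topic-demo", "user-1", null);
producer.send(tombstone);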

Some important producer parameters:

# The list of broker addresses the producer uses to bootstrap its connection to the
# Kafka cluster. One address is enough to discover the whole cluster, but listing at
# least two is recommended so that bootstrapping survives a single broker failure.
bootstrap.servers=host1:port1,host2:port2....

# Brokers store messages as byte arrays, so serializers must be specified
key.serializer
value.serializer

# The client id of this KafkaProducer; if unset, a default of the form
# `producer-1`, `producer-2`, ... is generated
client.id
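To avoid typos in these raw string keys, the client also exposes constants on ProducerConfig, so initConfig() can equivalently be written as (a sketch using the standard constants):

public static Properties initConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    // Pull the serializer class names from the classes themselves
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
    return props;
}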

Sending Messages

ProducerRecord offers the following constructors:

public ProducerRecord(String topic, Integer partition, Long timestamp,
        K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp,
        K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value,
        Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
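For example, a record carrying a custom header could be built with the full constructor like this (a minimal sketch; the header key and value are illustrative, and partition and timestamp are left null so Kafka fills them in):

List<Header> headers = Collections.singletonList(
        new RecordHeader("trace-id", "42".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record = new ProducerRecord<>(
        "topic-demo", null, null, "key-1", "hello, Kafka!", headers);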

Once a message is built, it can be sent. There are three sending modes: fire-and-forget, synchronous, and asynchronous.

  • Fire-and-forget: producer.send(record) — the message is handed off without any concern for whether it actually arrives;
  • Synchronous: producer.send(record).get() — send() returns a Future, and calling get() blocks until the result is available; get(long timeout, TimeUnit unit) blocks with a timeout instead (a synchronous sketch follows the callback example below);
  • Asynchronous: blocking on the Future returned by send() is a poor fit for asynchronous use, because when and how to call get() becomes a problem of its own — with messages being sent continuously, the per-message Future objects quickly become unmanageable. Registering a Callback is simpler: no Future has to be tracked per message, and the callback is invoked automatically when the response arrives, for example:
producer.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null) { 
            exception.printStackTrace(); 
        } else { 
            System.out.println(metadata.topic() + "-" + 
                    metadata.partition() + ":" + metadata.offset()); 
        } 
    } 
}); 
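For comparison, the synchronous variant simply blocks on the Future returned by send() (a minimal sketch):

try {
    // send() returns immediately; get() blocks until the broker responds
    RecordMetadata metadata = producer.send(record).get();
    System.out.println(metadata.topic() + "-"
            + metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}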

Kafka exceptions fall into two groups: retriable and non-retriable. Common retriable exceptions include NetworkException, LeaderNotAvailableException, UnknownTopicOrPartitionException, NotEnoughReplicasException, and NotCoordinatorException. Non-retriable exceptions include RecordTooLargeException, which signals that the message body is too large: Kafka will not retry and throws the exception directly. For retriable exceptions, if the retries parameter is configured and the send recovers within the configured number of retries, no exception surfaces. For example:

props.put(ProducerConfig.RETRIES_CONFIG, 10);


Serializers

The producer uses a serializer to turn objects into byte arrays before they go over the network to Kafka, and the consumer on the other side uses a matching deserializer to turn the byte arrays received from Kafka back into objects. For simplicity, both the message key and value above are strings, serialized with the client's built-in org.apache.kafka.common.serialization.StringSerializer. Besides the String serializer there are serializers for the ByteArray, ByteBuffer, Bytes, Double, Integer, and Long types, all implementing the org.apache.kafka.common.serialization.Serializer interface, which declares the following methods:

public void configure(Map<String, ?> configs, boolean isKey) 
public byte[] serialize(String topic, T data) 
public void close()

configure() configures the class, serialize() performs the serialization, and close() shuts the serializer down. StringSerializer is implemented as follows:

public class StringSerializer implements Serializer<String> { 
    private String encoding = "UTF8"; 
 
    // Determine the character encoding to use for serialization
    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
        String propertyName = isKey ? "key.serializer.encoding" : 
                "value.serializer.encoding"; 
        Object encodingValue = configs.get(propertyName); 
        if (encodingValue == null) 
            encodingValue = configs.get("serializer.encoding"); 
        if (encodingValue != null && encodingValue instanceof String) 
            encoding = (String) encodingValue; 
    } 
 
    // Serialize the string into bytes
    @Override 
    public byte[] serialize(String topic, String data) { 
        try { 
            if (data == null) 
                return null; 
            else 
                return data.getBytes(encoding); 
        } catch (UnsupportedEncodingException e) { 
            throw new SerializationException("Error when serializing " + 
                    "string to byte[] due to unsupported encoding " + encoding); 
        } 
    } 
 
    // Close the serializer (no resources to release here)
    @Override 
    public void close() { 
        // nothing to do 
    } 
} 

Beyond the built-in serializers, users can implement their own, for example on top of general-purpose tools such as Avro, JSON, Thrift, or ProtoBuf.
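As a sketch of what a hand-rolled serializer can look like, the following serializes a hypothetical Company POJO with two String fields, name and address, into a length-prefixed byte layout (the Company class and its fields are assumptions for illustration):

public class CompanySerializer implements Serializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null)
            return null;
        byte[] name = data.getName() == null ?
                new byte[0] : data.getName().getBytes(StandardCharsets.UTF_8);
        byte[] address = data.getAddress() == null ?
                new byte[0] : data.getAddress().getBytes(StandardCharsets.UTF_8);
        // Layout: [name length][name bytes][address length][address bytes]
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4 + address.length);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(address.length);
        buffer.put(address);
        return buffer.array();
    }

    @Override
    public void close() {}
}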

Partitioner

On its way from the producer's send() call to the broker, a message passes through the interceptors, the serializer, and the partitioner. The partitioner computes the concrete partition number from the message's topic, key, serialized key (keyBytes), value, serialized value (valueBytes), and the cluster metadata (cluster); the message is then sent to that partition.

Kafka's default partitioner is DefaultPartitioner, which implements the Partitioner interface:

public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * 1. Look up the partition list for the topic;
     * 2. if the serialized keyBytes is non-null, choose the partition with the
     *    MurmurHash2 algorithm (note: hashed across ALL partitions of the topic);
     * 3. if the serialized keyBytes is null, round-robin the message across the
     *    topic's AVAILABLE partitions (note: only the available ones, unless none is available).
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}
}
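A custom Partitioner implementation is plugged in through the partitioner.class parameter (DemoPartitioner below is a hypothetical implementation of the Partitioner interface):

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
        DemoPartitioner.class.getName());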

Interceptors

Kafka has two kinds of interceptors: producer interceptors and consumer interceptors. Producer interceptors are mainly used to do preparatory work before a message is sent, such as filtering out messages that fail some rule or modifying message contents, or to gather statistics in the acknowledgment callback.

The producer interceptor interface is ProducerInterceptor, whose core methods are:

public interface ProducerInterceptor<K, V> extends Configurable {
    // Called before the message is serialized and its partition is computed
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    
    // Called when the message is acknowledged, or when sending fails
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // Clean up resources when the interceptor is closed
    public void close();
}

Implementing ProducerInterceptor lets you build custom interceptors, for example one that prepends a fixed prefix to every outgoing message and tracks how many messages are sent successfully:

public class ProducerInterceptorPrefix implements
        ProducerInterceptor<String, String> {
    // onAcknowledgement() runs in the producer's single I/O thread,
    // so plain volatile counters are sufficient here
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(
            ProducerRecord<String, String> record) {
        String modifiedValue = "prefix1-" + record.value();
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
//        Alternative: filter out messages whose value is shorter than 5 characters
//        if (record.value().length() < 5) {
//            throw new RuntimeException();
//        }
//        return record;
    }

    @Override
    public void onAcknowledgement(
            RecordMetadata recordMetadata,
            Exception e) {
        if (e == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 发送成功率="
                + String.format("%f", successRatio * 100) + "%");
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
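The interceptor is then registered through the interceptor.classes parameter; several interceptors can be chained by listing their class names comma-separated, and they execute in the configured order:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptorPrefix.class.getName());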

2. Producer Implementation Internals

The client consists of two threads: the main thread and the Sender thread. The main thread creates messages through KafkaProducer and runs them through the interceptors, serializer, and partitioner before appending them to the RecordAccumulator (the message accumulator). The Sender thread fetches messages from the RecordAccumulator and sends them to Kafka.

Main-Thread Operations

Let's dig into the source starting from the KafkaProducer.send() method:

// KafkaProducer
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Run the message through the interceptor chain first
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    // Then hand it to doSend()
    return doSend(interceptedRecord, callback);
}

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // Serialize the key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // Invoke the partitioner to compute the partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // Append the message to the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

doSend() appends the record to the RecordAccumulator cache and returns a future for the append result. Inside the accumulator, each partition maps to a double-ended queue (Deque), and each queue holds ProducerBatch objects.

// RecordAccumulator

public RecordAppendResult append(TopicPartition tp,
                                    long timestamp,
                                    byte[] key,
                                    byte[] value,
                                    Header[] headers,
                                    Callback callback,
                                    long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // 1. Check whether this partition already has a usable ProducerBatch
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // 2. Try appending at the tail of the deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // 3. No usable ProducerBatch; create a new one
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // 4. If the message fits within batch.size, allocate the new ProducerBatch at
        //    batch.size so the BufferPool can recycle the buffer after use; otherwise
        //    allocate at the actual message size, and the buffer will not be reused.
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            // 5. Retry the append: another thread may have created a batch while we
            //    were allocating (no new ProducerBatch has been created yet at this point)
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // 6. Build the underlying MemoryRecordsBuilder
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);

            // 7. Create the new ProducerBatch
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;

            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

/**
 *  1. Peek at the last ProducerBatch in the deque;
 *  2. if one exists, try appending the message to it;
 *     2.1 if it has no room left, close it for further appends and return null,
 *         which tells the caller to allocate a new ProducerBatch.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                        Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}


// ProducerBatch
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // 1. Check whether this ProducerBatch has enough room left; return null if not
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 2. There is room: append the message and return a future for the result
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                timestamp, checksum,
                                                                key == null ? -1 : key.length,
                                                                value == null ? -1 : value.length);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

That covers the append path. Note that messages travel over the network as bytes, so memory must be allocated to hold the message data before sending. Kafka allocates and releases this memory through java.nio.ByteBuffer, but frequent allocation and release are expensive, so Kafka adds a BufferPool that pools ByteBuffers of one specific size and does not recycle buffers of any other size. That size is set by the batch.size parameter and defaults to 16 KB.

public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

    private final long totalMemory;

    // Size of a poolable buffer (batch.size)
    private final int poolableSize;
    private final ReentrantLock lock;

    // Pooled buffers that have been released and can be reused
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    
    // Non-pooled memory that is still available for allocation
    // (total memory minus pooled buffers and memory already handed out)
    private long nonPooledAvailableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
    

    /** 
     *  Memory allocation algorithm:
     *  1. if the requested size equals the poolable buffer size and the free list
     *     is non-empty, hand out a buffer from the head of the free list;
     *  2. otherwise, check whether the pool's remaining memory can satisfy the request;
     *  3. if it cannot, block (with a timeout) until enough memory has been freed.
     */
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();
        try {
            // check if we have a free buffer of the right size pooled
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = freeSize() * this.poolableSize;
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                this.nonPooledAvailableMemory -= size;
            } else {
                // we are out of memory and will have to block
                int accumulated = 0;
                // Create a Condition to wait on
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    this.waiters.addLast(moreMemory);
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            // Block on Condition.await() with a timeout
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            this.waitTime.record(timeNs, time.milliseconds());
                        }

                        if (waitingTimeElapsed) {
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                    }
                    // Don't reclaim memory on throwable since nothing was thrown
                    accumulated = 0;
                } finally {
                    // When this loop was not able to successfully terminate don't lose available memory
                    this.nonPooledAvailableMemory += accumulated;
                    this.waiters.remove(moreMemory);
                }
            }
        } finally {
            // signal any additional waiters if there is more memory left
            // over for them
            try {
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                // Another finally... otherwise find bugs complains
                lock.unlock();
            }
        }

        if (buffer == null)
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }
}
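All of the sizes involved here are configurable: batch.size sets the poolable buffer size (poolableSize, default 16384 bytes), buffer.memory caps the accumulator's total memory (totalMemory, default 32 MB), and max.block.ms bounds how long a send may block, including this allocate() wait (default 60 s). A sketch of tuning them:

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // poolableSize
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // totalMemory
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);     // maxTimeToBlockMs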

Sender-Thread Operations

Once the main thread has successfully appended the message to the RecordAccumulator cache, it wakes up the Sender thread.

When the Sender fetches cached messages from the RecordAccumulator, it converts them from the <partition, Deque<ProducerBatch>> form into a <Node, List<ProducerBatch>> form, where Node is a broker node of the Kafka cluster. At the network level, the producer client connects to concrete broker nodes and sends messages to those nodes without caring which partition a message belongs to; at the application level, KafkaProducer only cares about which messages go to which partition. So a translation from the application-logic view to the network-I/O view is needed here.

Start from the Sender class's sendProducerData() method: it iterates over all partitions, determines the set of partitions with data ready to send, then performs the storage-format conversion, building a map from node id to lists of ProducerBatch.

The Sender then wraps these further into <Node, Request> form, so that a Request can be dispatched to each Node.

Before a request leaves the Sender thread for Kafka, it is also recorded in InFlightRequests, which stores entries as Map<NodeId, Deque<Request>>. Its main job is to cache requests that have been sent but not yet answered. InFlightRequests also enforces the per-connection cap max.in.flight.requests.per.connection (default 5): once a connection has 5 unanswered requests, no more requests are sent on it.
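Note that this parameter trades throughput for ordering: with retries enabled and more than one in-flight request per connection, a failed-then-retried batch can land after a later batch, reordering messages within a partition. Setting the cap to 1 avoids that:

props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);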

Metadata Updates

InFlightRequests can also be used to pick the leastLoadedNode, i.e. the node with the lowest load among all nodes. "Lowest load" here means the already-connected node with the fewest in-flight requests:

public Node leastLoadedNode(long now) {
    // Fetch the full node list
    List<Node> nodes = this.metadataUpdater.fetchNodes();
    int inflight = Integer.MAX_VALUE;
    Node found = null;

    int offset = this.randOffset.nextInt(nodes.size());
    for (int i = 0; i < nodes.size(); i++) {
        int idx = (offset + i) % nodes.size();
        Node node = nodes.get(idx);

        // Number of in-flight requests on this node
        int currInflight = this.inFlightRequests.count(node.idString());
        if (currInflight == 0 && isReady(node, now)) {
            // Connected with zero in-flight requests: best possible, return it
            log.trace("Found least loaded node {} connected with no in-flight requests", node);
            return node;
        } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
            // Otherwise keep track of the node with the fewest in-flight requests so far
            inflight = currInflight;
            found = node;
        } else if (log.isTraceEnabled()) {
            log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}",
                    node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
        }
    }

    if (found != null)
        log.trace("Found least loaded node {}", found);
    else
        log.trace("Least loaded node selection failed to find an available node");

    return found;
}

Sending a request to the leastLoadedNode lets it go out as quickly as possible, avoiding stalls caused by network congestion or other problems on busier nodes. The leastLoadedNode concept shows up in several places, such as metadata requests and consumer group protocol interactions. When we create a message with ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, Kafka!"), we only need to know the topic name. But before KafkaProducer can append the message to the leader replica of some partition of that topic, it must know the topic's partition count, compute the target partition, and then learn the address and port of the broker hosting that leader replica so it can connect and finally send the message. All of the information needed along the way is metadata.

Kafka's metadata is cluster metadata: which topics the cluster has, which partitions each topic has, which node hosts each partition's leader replica, which nodes host the follower replicas, and which replicas are in the AR and ISR sets. When the metadata needs updating, the client picks the leastLoadedNode and sends it a MetadataRequest to fetch fresh metadata:

// NetworkClient

public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;

    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);

    if (metadataTimeout > 0) {
        return metadataTimeout;
    }

    // Beware that the behavior of this method and the computation of timeouts for poll() are
    // highly dependent on the behavior of leastLoadedNode.
    // Pick the least loaded node
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }

    // Send the MetadataRequest through that node
    return maybeUpdate(now, node);
}
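The cadence of these refreshes is governed by metadata.max.age.ms (default 5 minutes): timeToNextUpdate() returns 0 once the cached metadata is older than this, or immediately when an update has been explicitly requested, for example after hitting an unknown topic or a leader change. A sketch of tuning it:

props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000L); // default: 5 minutes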