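I. Client Development
A producer client has to do the following:
- Configure the producer parameters and create a producer instance;
- Build the message to send;
- Send the message;
- Close the producer instance.
A basic Kafka producer client: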
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    // Initialize the client configuration
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }
    public static void main(String[] args) {
        Properties props = initConfig();
        // Create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Build the message
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, "hello, Kafka!");
        try {
            // Send the message
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the producer; close() waits for previously sent records to complete
            producer.close();
        }
    }
}
ProducerRecord, the message object, carries several fields. The class is defined as follows:
public class ProducerRecord<K, V> {
    private final String topic;      // topic
    private final Integer partition; // partition number
    private final Headers headers;   // message headers
    private final K key;             // key
    private final V value;           // value
    private final Long timestamp;    // message timestamp
    // other methods and constructors omitted
}
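Messages are grouped first by topic and then by key: messages with the same key are assigned to the same partition. value is the message body and is normally not null; a null value marks a tombstone message. timestamp is the message timestamp and comes in two types, CreateTime and LogAppendTime. The former is the time the message was created, the latter the time it was appended to the log.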
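Some important producer configuration parameters:
# Addresses of the brokers the producer uses to connect to the Kafka cluster. One address is enough, but listing at least two is recommended so the client can still connect when one broker is down
bootstrap.servers=host1:port1,host2:port2....
# Brokers receive messages as byte arrays, so a serializer must be specified for keys and for values
key.serializer
value.serializer
# The client id of this KafkaProducer. If it is not set, the client generates one of the form `producer-1`, `producer-2`, and so on
client.id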
Sending Messages
ProducerRecord provides the following constructors:
public ProducerRecord(String topic, Integer partition, Long timestamp,
K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp,
K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value,
Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
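Once the message is built it can be sent. There are three ways to send: fire-and-forget, synchronous, and asynchronous.
- Fire-and-forget: producer.send(record). The message is handed off for sending and the caller does not check whether it actually arrived;
- Synchronous: producer.send(record).get(). send() returns a Future, and calling get() on it blocks until the result of the send is available. get(long timeout, TimeUnit unit) does the same with an upper bound on the wait;
- Asynchronous: blocking on the returned Future with get() is not a practical way to send asynchronously, because it is unclear when and how to call get(), and with messages being sent continuously the Futures of the individual messages become hard to keep track of. A Callback is simpler: there is no Future to manage per message, and the registered callback is invoked automatically when the response for the message comes back. For example: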
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" +
metadata.partition() + ":" + metadata.offset());
}
}
});
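To make the three calling styles easier to compare side by side, here is a minimal sketch that runs without a broker. The class `SendModesSketch` and its `send` method are a toy stand-in invented for this illustration, not Kafka's API: `send` completes a `CompletableFuture` on another thread and reports a made-up offset, which is enough to show ignoring the Future, blocking on it with a timeout, and registering a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Toy stand-in for a producer. NOT Kafka's API; only the calling patterns match.
class SendModesSketch {
    private final List<String> log = new ArrayList<>();

    // Appends the value to an in-memory "log" and returns its position as the offset.
    private synchronized long append(String value) {
        log.add(value);
        return log.size() - 1;
    }

    // Completes on another thread, the way a broker acknowledgement arrives later.
    // The optional callback receives (offset, error), loosely like onCompletion(metadata, exception).
    Future<Long> send(String value, BiConsumer<Long, Throwable> callback) {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> append(value));
        if (callback != null) {
            future.whenComplete(callback);
        }
        return future;
    }

    // Synchronous style: block on the Future, with an upper bound on the wait.
    long sendSync(String value) {
        try {
            return send(value, null).get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed or timed out", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SendModesSketch producer = new SendModesSketch();

        // Fire-and-forget: the returned Future is ignored.
        producer.send("first", null);

        // Synchronous: wait for the result before continuing.
        System.out.println("sync offset = " + producer.sendSync("second"));

        // Asynchronous: the callback runs when the result is ready.
        CountDownLatch done = new CountDownLatch(1);
        producer.send("third", (offset, error) -> {
            System.out.println(error == null ? "async offset = " + offset : "async failed: " + error);
            done.countDown();
        });
        done.await(1, TimeUnit.SECONDS);
    }
}
```

With the real client the synchronous form is the same chain of calls, `send(record).get(...)`, and the callback form passes a `Callback` as the second argument of `send`, as in the example above.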
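KafkaProducer generally raises two kinds of exceptions: retriable and non-retriable. Common retriable exceptions include NetworkException, LeaderNotAvailableException, UnknownTopicOrPartitionException, NotEnoughReplicasException, and NotCoordinatorException. Non-retriable exceptions include RecordTooLargeException, which means the message being sent is too large; Kafka does not retry it and reports the exception directly. For retriable exceptions, if the retries parameter is set, no exception is reported as long as the send succeeds within the configured number of retries. For example: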
props.put(ProducerConfig.RETRIES_CONFIG, 10);
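Serializers
The producer uses a serializer to turn objects into byte arrays before sending them over the network to Kafka, and the consumer on the other side uses a deserializer to turn the byte arrays it receives from Kafka back into objects. For convenience, the example above uses strings for both key and value, so it uses the client's built-in org.apache.kafka.common.serialization.StringSerializer. Besides String, the client ships serializers for ByteArray, ByteBuffer, Bytes, Double, Integer, and Long. All of them implement the org.apache.kafka.common.serialization.Serializer interface, which has the following methods: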
public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()
configure() configures the serializer, serialize() performs the serialization, and close() closes the serializer. StringSerializer is implemented as follows:
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
// Determine the character encoding used for serialization
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" :
"value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}
// Serialize the string into bytes
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing " +
"string to byte[] due to unsupported encoding " + encoding);
}
}
// Close the serializer
@Override
public void close() {
// nothing to do
}
}
Besides the serializers shipped with the client, users can write their own, for example on top of serialization tools such as Avro, JSON, Thrift, or ProtoBuf.
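As a rough illustration of what a hand-written serializer does, the sketch below encodes a small value object into a length-prefixed byte array. The `Company` type, the byte layout, and the class name `CompanySerializerSketch` are assumptions made for this example. It is kept free of Kafka dependencies so that it runs on its own; in a real project the class would implement org.apache.kafka.common.serialization.Serializer<Company> and be named in value.serializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value type and serializer, for illustration only.
class CompanySerializerSketch {

    static final class Company {
        final String name;
        final String address;

        Company(String name, String address) {
            this.name = name;
            this.address = address;
        }
    }

    // Layout: [name length][name bytes][address length][address bytes].
    // A null field is written as length 0. The topic parameter is unused here and
    // is kept only to mirror the shape of Serializer.serialize(topic, data).
    static byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name = bytesOf(data.name);
        byte[] address = bytesOf(data.address);
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4 + address.length);
        buffer.putInt(name.length).put(name);
        buffer.putInt(address.length).put(address);
        return buffer.array();
    }

    private static byte[] bytesOf(String s) {
        return s == null ? new byte[0] : s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("topic-demo", new Company("acme", "1 Main St"));
        System.out.println("serialized " + bytes.length + " bytes");
    }
}
```

The consumer side needs a matching deserializer that reads the two length prefixes in the same order.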
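Partitioners
On its way from send() to the broker, a message passes through the interceptors, the serializers, and the partitioner before it is actually sent. The partitioner computes the partition number from the topic, the key, the serialized key (keyBytes), the value, the serialized value (valueBytes), and the cluster metadata (cluster), and the message is then sent to that partition.
Kafka's default partitioner is DefaultPartitioner, which implements the Partitioner interface: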
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
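/**
* 1. Look up all partitions of the topic;
* 2. If the serialized keyBytes is not null, hash it with MurmurHash2 to pick the partition (chosen among all partitions of the topic);
* 3. If the serialized keyBytes is null, send messages round-robin across the partitions (chosen among the available partitions of the topic).
*/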
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
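The two branches of partition() can be condensed into a small standalone sketch. This is a simplification under stated assumptions: `Arrays.hashCode` stands in for Kafka's murmur2 hash (so the partition numbers differ from what a real producer computes), the counter starts at 0 instead of a random value, and the distinction between all partitions and available partitions is left out. The class name `PartitionChoiceSketch` is made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of key-hash vs. round-robin partition selection.
class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // Clear the sign bit so the modulo result is never negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: rotate through the partitions.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Key present: equal key bytes always map to the same partition.
        // Arrays.hashCode is a stand-in for murmur2 in this sketch.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch partitioner = new PartitionChoiceSketch();
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("keyed record   -> partition " + partitioner.partition(key, 4));
        System.out.println("same key again -> partition " + partitioner.partition(key, 4));
        System.out.println("no key         -> partition " + partitioner.partition(null, 4));
        System.out.println("no key         -> partition " + partitioner.partition(null, 4));
    }
}
```

Because the keyed branch takes the hash modulo the partition count, a given key may land on a different partition once the number of partitions of the topic changes.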
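Interceptors
Kafka has two kinds of interceptors: producer interceptors and consumer interceptors. A producer interceptor can do preparatory work before a message is sent, such as filtering out messages that do not meet some rule or modifying the message content, and it can also gather statistics when the result of a send comes back.
Kafka's producer interceptor interface is ProducerInterceptor, with these core methods: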
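public interface ProducerInterceptor<K, V> extends Configurable {
    // Called before the record is serialized and before its partition is computed
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    // Called when the record has been acknowledged or when sending it fails
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    // Releases any resources held by the interceptor when it is closed
    public void close();
}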
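A custom interceptor is written by implementing the ProducerInterceptor interface, for example to add a fixed prefix to every message that is sent, or to count how many messages were sent successfully: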
public class ProducerInterceptorPrefix implements
ProducerInterceptor<String, String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(
ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(),
record.partition(), record.timestamp(),
record.key(), modifiedValue, record.headers());
// if (record.value().length() < 5) {
// throw new RuntimeException();
// }
// return record;
}
@Override
public void onAcknowledgement(
RecordMetadata recordMetadata,
Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure++;
}
}
@Override
public void close() {
double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] send success rate="
+ String.format("%f", successRatio * 100) + "%");
}
@Override
public void configure(Map<String, ?> map) {
}
}
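II. How the Producer Works
The client consists of two threads: the main thread and the Sender thread. In the main thread, KafkaProducer creates the message and passes it through the interceptors, the serializers, and the partitioner before appending it to the RecordAccumulator (the message accumulator). The Sender thread takes messages from the RecordAccumulator and sends them to Kafka.
Main Thread
We start reading the source code from the KafkaProducer.send(xxx) method: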
// KafkaProducer class
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// Let the interceptors pre-process the record
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
// Send the record
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// Serialize the key and the value
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// Call the partitioner to obtain the partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// Append the record to the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
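The record is appended to the RecordAccumulator buffer, and the result of the append, which carries the Future for the record, is returned. Each partition has its own double-ended queue (Deque), and the queue holds ProducerBatch objects.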
// RecordAccumulator class
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 1. Get (or create) the deque of ProducerBatch objects for this partition
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 2. Try to append to the last batch in the deque
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
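// 3. No usable ProducerBatch, so a new one has to be created
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// 4. Estimate the record size. If it does not exceed batch.size, the buffer is allocated with batch.size so that BufferPool can reuse it afterwards; otherwise it is allocated with the actual size and cannot be reused.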
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 5. Try the append again now that the lock is held again: another thread may have created a batch in the meantime (no new ProducerBatch has been created here yet)
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
// 6. Build the underlying MemoryRecordsBuilder
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
// 7. Create the new ProducerBatch
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
/**
* 1. Get the last ProducerBatch in the deque;
* 2. If there is one, try to append the record to it;
*   2.1 If it has no room left, close it for further appends and return null, so that the caller creates a new ProducerBatch.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
// ProducerBatch class
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 1. Check whether this ProducerBatch has enough room left; return null if not
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
// 2. Enough room: append the record and return its future
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
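Stripped of locking, buffer allocation, and futures, the append path above comes down to "append to the last batch of the partition's deque, or open a new batch when it does not fit". The following single-threaded sketch shows only that core. The classes `AccumulatorSketch` and `Batch`, and the use of a plain string as the partition key, are simplifications invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-threaded sketch of per-partition batching: no locks, no BufferPool, no futures.
class AccumulatorSketch {

    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        final int capacity;
        int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        // Returns false when the record does not fit (the real tryAppend returns null in that case).
        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) {
                return false;
            }
            records.add(value);
            used += value.length;
            return true;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns true if a new batch had to be created for this record.
    boolean append(String topicPartition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(value)) {
            return false;
        }
        // An oversized record gets a batch of its own size, like Math.max(batchSize, recordSize).
        Batch batch = new Batch(Math.max(batchSize, value.length));
        batch.tryAppend(value);
        dq.addLast(batch);
        return true;
    }

    int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        return dq == null ? 0 : dq.size();
    }

    public static void main(String[] args) {
        AccumulatorSketch accumulator = new AccumulatorSketch(16);
        for (int i = 0; i < 5; i++) {
            boolean newBatch = accumulator.append("topic-demo-0", new byte[6]);
            System.out.println("record " + i + ", new batch created: " + newBatch);
        }
        System.out.println("batches for topic-demo-0: " + accumulator.batchCount("topic-demo-0"));
    }
}
```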
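That is the logic for appending a message. Messages travel over the network as bytes, so a block of memory has to be created to hold the message data before it is sent. Kafka uses java.nio.ByteBuffer for this memory, but creating and releasing buffers frequently is expensive, so Kafka also has a BufferPool. BufferPool only manages ByteBuffers of one specific size for reuse; ByteBuffers of other sizes are not returned to the pool. That size is set by the batch.size parameter, which defaults to 16 KB.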
public class BufferPool {
static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
private final long totalMemory;
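// Size of a poolable buffer (the batch.size value)
private final int poolableSize;
private final ReentrantLock lock;
// Released buffers of poolableSize, kept for reuse
private final Deque<ByteBuffer> free;
private final Deque<Condition> waiters;
// Memory that is still available but not held in the free list, i.e. not yet allocated
private long nonPooledAvailableMemory;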
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;
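/**
* Allocation algorithm
* 1. If the requested size equals the poolable buffer size and the free list is not empty, take a buffer from the head of the free list;
* 2. Otherwise check whether the pool's remaining memory (non-pooled memory plus the free list) can cover size, and reserve it if so;
* 3. If it cannot, block with a timeout until enough memory has been released.
*/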
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
// Create a Condition to wait on
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
// Wait on the Condition with a timeout
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
this.waitTime.record(timeNs, time.milliseconds());
}
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
accumulated += got;
}
}
// Don't reclaim memory on throwable since nothing was thrown
accumulated = 0;
} finally {
// When this loop was not able to successfully terminate don't loose available memory
this.nonPooledAvailableMemory += accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
}
if (buffer == null)
return safeAllocateByteBuffer(size);
else
return buffer;
}
}
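The reuse rule itself (only buffers of exactly the poolable size go back into the pool) is easy to lose among the blocking and accounting code above. A minimal sketch of just that rule follows; it assumes a single thread and has no total-memory limit and no waiting, and the class name `SimpleBufferPool` is made up for this example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal single-threaded sketch: no memory limit, no blocking, no locks.
class SimpleBufferPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    SimpleBufferPool(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst(); // reuse a pooled buffer
        }
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear(); // reset position and limit before the buffer is reused
            free.addLast(buffer);
        }
        // Buffers of any other size are dropped and left to the garbage collector.
    }

    int pooledCount() {
        return free.size();
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(16 * 1024);
        ByteBuffer batchSized = pool.allocate(16 * 1024);
        ByteBuffer oversized = pool.allocate(64 * 1024);
        pool.deallocate(batchSized);
        pool.deallocate(oversized);
        System.out.println("buffers kept for reuse: " + pool.pooledCount());
        System.out.println("same buffer handed out again: " + (pool.allocate(16 * 1024) == batchSized));
    }
}
```

This is why a record larger than batch.size costs more than its size suggests: its buffer is allocated fresh and cannot be recycled afterwards.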
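Sender Thread
After the main thread has appended a message to the RecordAccumulator buffer, it wakes up the Sender thread.
After taking the buffered messages from the RecordAccumulator, the Sender converts them from the <partition, Deque<ProducerBatch>> form into a <Node, List<ProducerBatch>> form, where Node is a broker node in the Kafka cluster. At the network level the producer client connects to specific broker nodes and sends messages to those nodes, without caring which partition a message belongs to. At the level of KafkaProducer's application logic, however, we only care about which messages go to which partition. A conversion from the application-logic level to the network I/O level is therefore needed here.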
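The starting point is the sendProducerData() method of the Sender class. It goes through all partitions to find the set of partitions whose data is ready to be sent, and then converts the storage format, producing a mapping from node ID to the ProducerBatch objects for that node.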
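The regrouping from "batches per partition" to "batches per node" can be sketched with plain collections. Everything here is a stand-in chosen for illustration: partitions and batches are strings, node ids are integers, and the `leaderByPartition` map plays the role of the cluster metadata. It is not how Sender or RecordAccumulator are actually written, only the shape of the conversion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of regrouping <partition, batches> into <node, batches> via each partition's leader.
class RegroupSketch {

    static Map<Integer, List<String>> groupByLeader(Map<String, List<String>> batchesByPartition,
                                                    Map<String, Integer> leaderByPartition) {
        Map<Integer, List<String>> batchesByNode = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : batchesByPartition.entrySet()) {
            Integer leader = leaderByPartition.get(entry.getKey());
            if (leader == null) {
                continue; // leader unknown: leave these batches until the metadata is refreshed
            }
            batchesByNode.computeIfAbsent(leader, id -> new ArrayList<>()).addAll(entry.getValue());
        }
        return batchesByNode;
    }

    public static void main(String[] args) {
        Map<String, List<String>> batchesByPartition = new LinkedHashMap<>();
        batchesByPartition.put("topic-demo-0", Arrays.asList("batch-a", "batch-b"));
        batchesByPartition.put("topic-demo-1", Arrays.asList("batch-c"));
        batchesByPartition.put("topic-demo-2", Arrays.asList("batch-d"));

        Map<String, Integer> leaderByPartition = new HashMap<>();
        leaderByPartition.put("topic-demo-0", 1);
        leaderByPartition.put("topic-demo-1", 2);
        leaderByPartition.put("topic-demo-2", 1);

        groupByLeader(batchesByPartition, leaderByPartition)
                .forEach((node, batches) -> System.out.println("node " + node + " <- " + batches));
    }
}
```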
The Sender then wraps these further into a <Node, Request> form, so that the Request objects can be sent to the individual Nodes.
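Before a request goes from the Sender thread to Kafka, it is also stored in InFlightRequests, which holds its entries as Map<NodeId, Deque<Request>>. Its main purpose is to cache requests that have been sent but have not yet received a response. InFlightRequests is also what enforces max.in.flight.requests.per.connection, the maximum number of such requests per connection. The default is 5: once a connection has 5 requests awaiting a response, no further request can be sent on it until a response arrives.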
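A sketch of the bookkeeping behind this limit, under simplifying assumptions: requests are plain strings, node ids are strings, and the class `InFlightSketch` with its method names is a reduced illustration rather than Kafka's InFlightRequests class. New requests go to the front of the node's deque and responses complete the oldest request at the back.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch of per-node in-flight request tracking with an upper bound per connection.
class InFlightSketch {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // True while the node has fewer unanswered requests than the limit.
    boolean canSendMore(String nodeId) {
        return count(nodeId) < maxInFlightPerConnection;
    }

    // Record a request as sent but not yet answered.
    void add(String nodeId, String request) {
        if (!canSendMore(nodeId)) {
            throw new IllegalStateException("too many in-flight requests for node " + nodeId);
        }
        requests.computeIfAbsent(nodeId, id -> new ArrayDeque<>()).addFirst(request);
    }

    // A response arrived: the oldest outstanding request of that node is complete.
    String completeNext(String nodeId) {
        Deque<String> queue = requests.get(nodeId);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("no in-flight request for node " + nodeId);
        }
        return queue.pollLast();
    }

    int count(String nodeId) {
        Deque<String> queue = requests.get(nodeId);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(5);
        for (int i = 1; i <= 5; i++) {
            inFlight.add("node-0", "request-" + i);
        }
        System.out.println("can send more to node-0: " + inFlight.canSendMore("node-0"));
        System.out.println("completed: " + inFlight.completeNext("node-0"));
        System.out.println("can send more to node-0: " + inFlight.canSendMore("node-0"));
    }
}
```

The per-node count kept here is also the figure that a least-loaded-node selection would compare across nodes.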
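Metadata Updates
The InFlightRequests described above also makes it possible to find the leastLoadedNode, the node with the lowest load among all nodes. Load here is measured by the number of in-flight requests: the node with the fewest requests still awaiting a response counts as the least loaded, and a node that is already connected and has none is picked immediately.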
public Node leastLoadedNode(long now) {
// Get the list of all nodes
List<Node> nodes = this.metadataUpdater.fetchNodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
// Number of in-flight requests for this node
int currInflight = this.inFlightRequests.count(node.idString());
if (currInflight == 0 && isReady(node, now)) {
// Connected and no in-flight requests: this is the least loaded node, return it
log.trace("Found least loaded node {} connected with no in-flight requests", node);
return node;
} else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
// Otherwise remember the node with the fewest in-flight requests seen so far
inflight = currInflight;
found = node;
} else if (log.isTraceEnabled()) {
log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}",
node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
}
}
if (found != null)
log.trace("Found least loaded node {}", found);
else
log.trace("Least loaded node selection failed to find an available node");
return found;
}
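Sending a request to the leastLoadedNode lets it go out as early as possible and keeps network congestion and similar problems from holding up overall progress. The leastLoadedNode concept is used in several places, such as metadata requests and the consumer group protocol.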
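When we create a message with ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, Kafka!"), we only need to know the topic name. But before KafkaProducer can append the message to the leader replica of one of the topic's partitions, it first has to know how many partitions the topic has so that it can compute the target partition, and then it has to know the address, port, and other details of the broker node hosting the leader replica in order to connect and finally send the message to Kafka. All the information needed along the way is metadata.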
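Kafka's metadata is the metadata of the cluster: which topics the cluster has, which partitions those topics have, which node hosts the leader replica of each partition, which nodes host the follower replicas, and which replicas are in the AR and ISR sets. When the metadata needs to be updated, the client picks the leastLoadedNode and sends a MetadataRequest to that Node to fetch the metadata.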
// NetworkClient class
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {
return metadataTimeout;
}
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
// Pick the least loaded node
Node node = leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
// Send the MetadataRequest to that node
return maybeUpdate(now, node);
}