kafka(java客户端)生产者消费者不能连接虚拟机kafka

发布时间 2023-04-02 10:18:20作者: Realife

报错如下:

...:localhost:9092...
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148)
    at java.lang.Thread.run(Thread.java:745)

各软件使用的版本:
kafka版本:kafka_2.12-2.2.1
zookeeper版本:zookeeper-3.4.14
需要注意的是Kafka 2.8.0版本后脱离了zookeeper依赖, 用自管理的Quorum代替ZooKeeper管理元数据。

在Centos 7.9虚拟机上安装了kafka,zookeepe之后,没有修改过其中的任何配置文件,kafka默认端口是9029,zookeeper默认端口是2181,
分别运行以下两个命令,他两都能正常运行:

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

启动后,创建Topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

通过命令行工具(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到。

但是
java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息。
仔细检查了下代码中IP、端口都没有写错。

package com.heima.kafka.chapter1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Kafka 消息生产者
 */
public class ProducerFastStart {
    // Kafka集群地址
    private static final String brokerList = "localhost:9092";
    // 主题名称-之前已经创建
    private static final String topic = "test1";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置集群地址
        properties.put("bootstrap.servers", brokerList);
        // 设置key序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //另外一种写法
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 设置值序列化器
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer 线程安全
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
        try {
            producer.send(record);
            //RecordMetadata recordMetadata = producer.send(record).get();
            //System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic());
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

解决办法
将kafka/config/server.properties文件中advertised.listeners改为如下属性。192.168.230.66是我虚拟机的IP。改完后重启kafaka。
advertised.listeners=PLAINTEXT://192.168.75.137:9092

advertised.listeners上的注释是这样的:

#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

意思就是说:hostname、port都会广播给producer、consumer。如果你没有配置了这个属性的话,则使用listeners的值,如果listeners的值也没有配置的话,则使用
java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了)。

最后不要忘了修改Java代码:

// Kafka集群地址
private static final String brokerList = "192.168.230.66:9092";

运行Java生产者代码,好了,虚拟机端消费者可以接受到消息了。