flink中配置kafka

发布时间 2023-04-01 17:27:38作者: 曹军

  Flink 提供了 Apache Kafka 连接器,用于从 Kafka topic 中读取或者向其中写入数据,可提供精确一次的处理语义。

一:简单使用

1.pom

        <!--Flink Connector KAFKA-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

 

2.创建properties

  具体的配置,写在配置文件中

    private static Properties kp;
    private static Properties kc;
    private static Properties kcc;

    public static Properties getProducer() {
        if (kp == null) {
            kp = new Properties();
            Config load = ConfigFactory.load("kafka.properties");
            kp.put("bootstrap.servers",load.getString("kafka.config.bootstrap.servers"));
            kp.put("acks", load.getString("kafka.config.acks"));
            kp.put("retries", load.getInt("kafka.config.retries"));
            kp.put("batch.size", load.getInt("kafka.config.batch.size"));
        }
        return kp;
    }


    public static Properties getConsumer(String groupId) {
        if (kc == null) {
            kc = new Properties();
            Config load = ConfigFactory.load("kafka.properties");
            kc.put("bootstrap.servers", load.getString("kafka.config.bootstrap.servers"));
            kc.put("group.id", StringUtils.isNotEmpty(groupId) ? groupId : "exceed-group");
            kc.put("enable.auto.commit", load.getString("kafka.config.enable.auto.commit"));
            kc.put("auto.commit.interval.ms",load.getString("kafka.config.auto.commit.interval.ms"));
            kc.put("session.timeout.ms",load.getString("kafka.config.session.timeout.ms"));
            kc.put("key.deserializer",load.getString("kafka.config.key.deserializer"));
            kc.put("value.deserializer",load.getString("kafka.config.value.deserializer"));
            kc.put("auto.offset.reset",load.getString("kafka.config.auto.offset.reset"));
        }

        return kc;
    }

 

3.代码使用

  消费者:

FlinkKafkaConsumer<String> stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(DataVerifyConst.Topics.LOG_TOPIC, new SimpleStringSchema(), KafkaConfigUtil.getConsumer(ConsumerGroup.getNowOneHourOfflineGroup()));

  其中,注意点是groupId

  重启之后,之前的数据不要了。

public class ConsumerGroup {

    public static String getMin(){
        Calendar cal = Calendar.getInstance();
        StringBuffer sb=new StringBuffer();
        sb.append(cal.get(Calendar.YEAR));
        sb.append(cal.get(Calendar.MONTH+1));
        sb.append(cal.get(Calendar.DATE));
        sb.append(cal.get(Calendar.HOUR));
        sb.append(cal.get(Calendar.MINUTE));
        return sb.toString();
    }

    public static String getNowOneHourOfflineGroup(){
        StringBuffer sb=new StringBuffer();
        sb.append("verify-offline-");
        sb.append(getMin());
        return sb.toString();
    }
}

 

  生产者:

Properties props = KafkaConfigUtil.getProducer();
FlinkKafkaProducer<MonitoringIndex> producer = new FlinkKafkaProducer<>("verify-alarm", new MonitorIndexSchema(), props);

  配置对应的schema序列化

public class MonitorIndexSchema implements DeserializationSchema<MonitoringIndex>, SerializationSchema<MonitoringIndex> {

    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public MonitorIndexSchema() {
        this(StandardCharsets.UTF_8);
    }

    public MonitorIndexSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }

    @Override
    public MonitoringIndex deserialize(byte[] message) throws IOException {
        String json = new String(message, this.charset == null ? StandardCharsets.UTF_8 : this.charset);
        return JacksonJsonUtil.json2Obj(json, MonitoringIndex.class);
    }

    @Override
    public boolean isEndOfStream(MonitoringIndex monitoringIndex) {
        return false;
    }

    @Override
    public byte[] serialize(MonitoringIndex monitoringIndex) {
        String json = JacksonJsonUtil.obj2Json(monitoringIndex);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<MonitoringIndex> getProducedType() {
        return TypeInformation.of(new TypeHint<MonitoringIndex>() {
        });
    }
}

 

二:Kafka Consumer

1.构造函数

  Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。

  构造函数接受以下参数:

  1. Topic 名称或者名称列表
  2. 用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema
  3. Kafka 消费者的属性。需要以下属性:
  • “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
  • “group.id” 消费组 ID

 

2.java中使用

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

 

3.反序列化

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会调用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。

 

4.配置 Kafka Consumer 开始消费的位置

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // 尽可能从最早的记录开始
myConsumer.setStartFromLatest();       // 从最新的记录开始
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets(); // 默认的方法
 
DataStream<String> stream = env.addSource(myConsumer);

 

5.Kafka Consumer 提交 Offset 的行为配置

  Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。

  配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。

  • 禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。

  • 启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。

 

 

三:Kafka Producter

1.构造函数

  Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

  构造器接收下列参数:

  1. 事件被写入的默认输出 topic
  2. 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
  3. Kafka client 的 Properties。下列 property 是必须的:
    • “bootstrap.servers” (逗号分隔 Kafka broker 列表)
  4. 容错语义
roperties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
 
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
        "my-topic",                  // 目标 topic
        new SimpleStringSchema()     // 序列化 schema
        properties,                  // producer 配置
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错
 
stream.addSink(myProducer);

 

四:解释

public TypeInformation<MonitoringIndex> getProducedType()

 

  实现这个接口的getProducedType方法就是获取此函数或输入格式产生的数据类型