【Flink从入门到精通 05】Source&Sink

发布时间 2023-12-26 15:58:52作者: sunny123456

【Flink从入门到精通 05】Source&Sink

Flink用于处理有状态的流式计算,需要对Source端的数据进行加工处理,然后写入到Sink端,下图展示了在Flink中数据所经历的过程,今天就根据这张图分别给大家分享下。

img

01 Environment

Flink所有的程序都从这一步开始,只有创建了执行环境,才能开始下一步的编写。可以使用如下方式获取运行环境:

(1)getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文

  • 如果程序是独立调用的,则此方法返回本地执行环境
  • 如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境
  • 会根据查询运行的方式决定返回什么样的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1

如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1

parallelism.default:1

(2)createRemoteEnvironment

返回集群的执行环境,将Jar提交到远程服务器,需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar");
  • 1

常用的创建执行环境方式为getExecutionEnvironment

02 Source&Sink

Source即Flink中的数据源,Sink则为数据输出端,Flink通过Flink Streaming Connector来与外部存储系统连接,Flink主要通过四种方式完成数据交换:

  • Flink预定义的Source与Sink
  • Flink内部提供的Boundled Connectors
  • 第三方Apache Bahir项目中的连接器
  • 异步IO方式

下面主要对预定义内容及Boundled Connectors作为介绍,更多内容可以参考

(1) 预定义的Source&Sink

先来看一下Flink给我们提供的内置Source,这些方法都位于StreamExecutionEnvironment类中。image-20220220141411737

Flink中内置的Sink如下图,均位于DataStream类中。

image-20220220141817284

基于文件的 source 和 sink

  1. 从文本文件中读取数据
env.readTextFile(path)
  • 1
  1. 根据指定的 fileInputFormat 格式读取文件中的内容
env.readFile(fileInputFormat, path)
  • 1
  1. 将结果从文本或 csv 格式写出到文件中
dataStream.writeAsText(path) ;
dataStream.writeAsCsv(path);
  • 1
  • 2

基于Socket的Source和Sink

需要提供 Socket 的 hostname 及 port,可以直接用 StreamExecutionEnvironment 预定的接口

  • socketTextStream 创建基于 Socket 的 source,从该 socket 中 以文本的形式读取数据

  • writeToSocket将结果写出到Socket

env.socketTextStream("localhost",9999);
kafkaDStream.writeToSocket("localhost",9999,new SimpleStringSchema());
  • 1
  • 2

基于内存 Collections、Iterators 的 Source

用于连接基于内存中的集合或者迭代器,通常用于数据测试

  • 调用 StreamExecutionEnvironment fromCollection、fromElements 构建相应的 source

  • 结果数据直接 print、 printToError 的方式写出到标准输出或标准错误

env.fromCollection(Collection);
env.fromCollection(Iterator, Class);
env.fromElements(T ...);
  • 1
  • 2
  • 3

(2) Boundled Connectors

在官网中,给出了如下的Connectors:

在使用过程中,提交 Job 的时候需要注意, job 代码 jar 包中一定要将相应的 connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

(3) 自定义Source&Sink

除了上述的Source与Sink外,Flink还支持自定义Source与Sink。

自定义Source

  • 实现SourceFunction类
  • 重写run方法和cancel方法
  • 在主函数中通过addSource调用
public class MySource implements SourceFunction<String> {
    // 定义一个运行标志位,表示数据源是否运行
    Boolean flag = true;
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (flag){
            sourceContext.collect("当前时间为:" + System.currentTimeMillis());
            Thread.sleep(100);
        }
    }
@Override
public void cancel() {
    flag = false;
}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
env.addSource(new MySource());
  • 1

自定义Sink

  • 继承SinkFunction
  • 重写invoke方法

下面给出了自定义JDBC Sink的案例,可以参考:

public class MyJdbcSink extends RichSinkFunction<String> {
// 定义连接
Connection conn;

// 创建连接
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root");
}

// 关闭连接
@Override
public void close() throws Exception {
    super.close();
    conn.close();
}

// 调用连接执行SQL
@Override
public void invoke(String value, Context context) throws Exception {
    PreparedStatement preparedStatement = conn.prepareStatement(value);
    preparedStatement.execute();
    preparedStatement.close();
}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
env.addSink(new MyJdbcSink());
  • 1

03 Transform

关于转换算子,在之前的DataStreamAPI中有过详细介绍,这里不做过多讲解,有不清楚的可以跳转《Flink DataStream API》一文。

04 Kafka Connector

在生产实践中,Kafka Connector是最常用的Connector,下面针对Kafka Connector做详细介绍。

(1)Kafka Consumer

  • 反序列化数据
  • 设置消费起始位置
  • Topic和Partition动态发现
  • Commit Offset
  • Timestamp Extraction/Watermark生成
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);

DataStream<String> stream = env.addSource(myConsumer);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

反序列化数据

kafka 中数据是以二进制 byte 形式存储的,读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象

  • 实现一个 schema 类, 定义如何序列化和反序列数据
  • 反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数
  • 反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函数
  • 常用的序列化反序列化的 schema 类
    • SimpleStringSchema,按字符串方式进行序列化、反序列化
    • TypeInformationSerializationSchema,根据 Flink 的 TypeInformation 信息来推断出需要选择的 schema
    • JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息, 并返回 ObjectNode,可以使用 .get(“property”) 方法来访问相应字段

消费起始位置设置

对于数据源来说,能否重设消费位置关系到端到端一致性的保证。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // 尽可能从最早的记录开始
myConsumer.setStartFromLatest();       // 从最新的记录开始
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets(); // 默认的方法
// 为每个分区指定 consumer 应该开始消费的具体 offset
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream<String> stream = env.addSource(myConsumer);
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

FlinkKafkaConsumer 类提供了相应函数,设置合适的起始位置

  • setStartFromGroupOffsets(默认),从 group offset 位置读取数据,group offset 指的是 kafka broker 端记录的某个 group 的最后一次的消费位置,但是 kafka broker 端没有该 group 信息,会根据 kafka 的参数 “auto.offset.reset” 的设置来决定从哪个位置开始消费
  • setStartFromEarliest,从 kafka 最早的位置开始读取
  • setStartFromLatest,从 kafka 最新的位置开始读取
  • setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取,Kafka 时戳,是指 kafka 为每条消息增加另一个时间戳,可以表示消息在 proudcer 端生成时的时间、或进入到 kafka broker 时的时间
  • setStartFromSpecificOffsets,从指定分区的 offset 位置开始读取,如指定的 offsets 中不存某个分区,该分区从 group offset 位置开始读取。此时需要用户给定一个具体的分区、offset 的集合。

作业从 Checkpoint或Savepoint 恢复时,作业消费起始位置是从之前保存的状态中恢复,与上面提到跟 kafka 这些单独的配置无关

Topic 和分区发现

分区发现

Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消费它们。在初始检索分区元数据之后(即当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。

默认情况下,是禁用了分区发现的。若要启用它,需要在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。

FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。

Topic 发现

在更高的级别上,Flink Kafka Consumer 还能够使用正则表达式基于 Topic 名称的模式匹配来发现 Topic。

当 Job 开始运行时,Consumer 将订阅名称与指定正则表达式匹配的所有主题(以 test-topic 开头并以单个数字结尾)。

要允许 consumer 在作业开始运行后发现动态创建的主题,则要将 flink.partition-discovery.interval-millis 设置非负值,允许 consumer 发现名称与指定模式匹配的新主题的分区。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
DataStream<String> stream = env.addSource(myConsumer);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Commit Offset

Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证

  • 禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。
  • 启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,该场景中,Properties 中的自动定期 offset 提交设置会被完全忽略

Timestamp Extraction/Watermark生成

Flink Kafka Consumer 允许指定 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks,此时每个 partition 一个 watermark assigner,source 生成的时戳为多个 partition 时戳对齐后的最小时间戳。

此时在一个 source 读取多个 partition,并且 partition 之间数据时戳有一定差距的情况下,因为在 source 端 watermark 在 partition 级别有对齐,不 会导致数据读取较慢 partition 数据丢失。

image-20220129162754223

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer =
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<String> stream = env.addSource(myConsumer);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

(2)Kafka Producer

Flink Kafka Producer 即 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

构造器接收下列参数:

  1. kafka主题 topic
  2. 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
  3. Kafka client 的 Properties,其中“bootstrap.servers” (逗号分隔 Kafka broker 列表)配置是必须的
  4. 容错语义
  5. setWriteTimestampToKafka(boolean writeTimestampToKafka),给每条记录设置时间戳
  6. setLogFailuresOnly(boolean logFailuresOnly) ,设置是否在 Producer 发生异常时仅仅记录日志
  7. setTransactionalIdPrefix(String transactionalIdPrefix) ,设置自定义的 transactional.id 前缀
  8. ignoreFailuresAfterTransactionTimeout(),在恢复时忽略事务超时异常
DataStream<String> stream = ...;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
        "my-topic",                  // 目标 topic
        new SimpleStringSchema(),    // 序列化 schema
        properties,                  // producer 配置
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错
stream.addSink(myProducer);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Producer分区

使用 FlinkKafkaProducer 往 kafka 中写数据时,如果不单独设置 partition 策略,默认使用 FlinkFixedPartitioner,该 partitioner 分区的方式是 task 所在的并发 id 对 topic 总 partition 数取余:parallelInstanceId % partitions.length

  • 如果 sink 为 4,paritition 为 1,则 4 个 task 往同一个 partition 中写数据

  • sink task < partition 个数时会有部分 partition 没有数据写入,如 sink task 为 2,partition 总数为 4,则后面两个 partition 将没有数据写入

  • 如果构建 FlinkKafkaProducer 时,partition 设置为 null,此时会使用 kafka producer 默认分区方式,非 key 写入的情况下,使用 round-robin 的方式进行分区,每个 task 都会轮循的写下游的所有 partition。该方式下游的 partition 数据会比较均衡,但是缺点是 partition 个数过多的情况下需要维持过多的网络连接,即每个 task 都会维持跟所有 partition 所在 broker 的连接

容错

启用 Flink 的 checkpointing 后,FlinkKafkaProducer 可以提供精确一次的语义保证。

除了启用 Flink 的 checkpointing,也可以通过将适当的 semantic 参数传递给 FlinkKafkaProducer 来选择三种不同的操作模式:

  • Semantic.NONE:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。
  • Semantic.AT_LEAST_ONCE(默认设置):可以保证不会丢失任何记录(但是记录可能会重复)
  • Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 isolation.levelread_committedread_uncommitted - 后者是默认值)

Semantic.EXACTLY_ONCE 模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)

默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms 的值。

KafkaConsumerread_committed 模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后:

  1. 用户启动了 transaction1 并使用它写了一些记录
  2. 用户启动了 transaction2 并使用它编写了一些其他记录
  3. 用户提交了 transaction2

即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义:

  • 首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。
  • 其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。

注意Semantic.EXACTLY_ONCE 模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。

注意Semantic.EXACTLY_ONCE 会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 的值的话,是不安全的。同样,在这种情况使用 setTransactionalIdPrefix() 改变 transactional.id 也是不安全的,因为系统也不知道先前使用的 transactional.id 前缀

05 Function

(1) 函数类

Flink暴露了所有udf函数的接口(实现方式为接口或抽象类),如:MapFunction、FilterFunction、ProcessFunction等。

// 将函数实现为匿名类
kafkaDStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                if (s.contains("actions")) {
                    return true;
                }
                return false;
            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

(2) 富函数

富函数是DataStreamAPI提供的一个函数类的接口,所有Flink函数类都有其Rich版本

  • 可以获取运行环境的上下文
  • 拥有生命周期方法
    • open()方法:初始化方法,在算子调用之前会调用open方法
    • close()方法:生命周期中最后一个调用的方法,用于清理工作
    • getRuntimeContext()方法,提供函数的RuntimeContext的一些信息,如函数执行的并行度、任务的名字、state状态等
public class MyJdbcSink extends RichSinkFunction<String> {
    // 定义连接
    Connection conn;
    // 创建连接,open常用于初始化
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root");
    }
    // 关闭连接,常用于资源清理
    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
    }
    // 调用连接执行SQL
    @Override
    public void invoke(String value, Context context) throws Exception {
        PreparedStatement preparedStatement = conn.prepareStatement(value);
        preparedStatement.execute();
        preparedStatement.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

关于Flink的Source与Sink就分享到这了,如果对你有帮助,动动小手点个关注吧~
在这里插入图片描述

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览137888 人正在系统学习中
原文链接:https://blog.csdn.net/qq_36369061/article/details/123031450