Flink学习记录

发布时间 2023-10-04 16:54:52作者: 沈自在。

Flink 学习记录

1 简介

1.1 梗概

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。对比Spark来说,FLink是真正的流式计算框架,而不是像Spark的微批处理

1.2 工程搭建

<properties>
        <flink.version>1.13.0</flink.version>
        <slf4j.version>1.7.30</slf4j.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.1</version>
        </dependency>
    </dependencies>

log4j

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

1.3 HelloWorld

1.3.1 批处理

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment =
                ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> lineDataSource = environment.readTextFile("input/1.txt");

        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        UnsortedGrouping<Tuple2<String,Long>> wordAndOneUG = wordAndOne.groupBy(0);
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
        sum.sortPartition(1, Order.ANY);
        sum.print();
    }

}

1.3.2 流处理

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDataSource = environment.readTextFile("input/1.txt");
        
        //转换计算
        SingleOutputStreamOperator<Tuple2<String, Long>> returns = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        KeyedStream<Tuple2<String, Long>, String> wordAndOneGroup = returns.keyBy(data -> data.f0);

        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        sum.print();

        environment.execute();
    }
}

1.4 系统架构

​ 对于数据处理系统的架构,最简单的实现方式当然就是单节点。当数据量增大、处理计算更加复杂时,我们可以考虑增加 CPU 数量、加大内存,也就是让这一台机器变得性能更强大,从而提高吞吐量——这就是所谓的 SMP(Symmetrical Multi-Processing,对称多处理)架构。但是这样做问题非常明显:所有 CPU 是完全平等、共享内存和总线资源的,这就势必造成资源竞争;而且随着 CPU 核心数量的增加,机器的成本会指数增长,所以 SMP 的可扩展性是比较差的,无法应对海量数据的处理场景。

​ 于是人们提出了“不共享任何东西”(share-nothing)的分布式架构。从以 Greenplum 为代表的 MPP(Massively Parallel Processing,大规模并行处理)架构,到 Hadoop、Spark 为代表的批处理架构,再到 Storm、Flink 为代表的流处理架构,都是以分布式作为系统架构的基本形态的。我们已经知道,Flink 就是一个分布式的并行流处理系统。简单来说,它会由多个进程构成,这些进程一般会分布运行在不同的机器上。正如一个团队,人多了就会难以管理;对于一个分布式系统来说,也需要面对很多棘手的问题。其中的核心问题有:集群中资源的分配和管理、进程协调调度、持久化和高可用的数据存储,以及故障恢复。

​ 对于这些分布式系统的经典问题,业内已有比较成熟的解决方案和服务。所以 Flink 并不会自己去处理所有的问题,而是利用了现有的集群架构和服务,这样它就可以把精力集中在核心工作——分布式数据流处理上了。Flink 可以配置为独立(Standalone)集群运行,也可以方便地跟一些集群资源管理工具集成使用,比如 YARN、Kubernetes 和 Mesos。Flink 也不会自己去提供持久化的分布式存储,而是直接利用了已有的分布式文件系统(比如 HDFS)或者对象存储(比如 S3)。而对于高可用的配置,Flink 是依靠 Apache ZooKeeper 来完成的。我们所要重点了解的,就是在 Flink 中有哪些组件、是怎样具体实现一个分布式流处理系统的。如果大家对 Spark 或者 Storm 比较熟悉,那么稍后就会发现,Flink 其实有类似的概念和架构。

组件划分

Flink 的运行时架构中,最重要的就是两大组件:作业管理器(JobManger)和任务管理器(TaskManager)。对于一个提交执行的作业,JobManager 是真正意义上的“管理者”(Master),负责管理调度,所以在不考虑高可用的情况下只能有一个;而 TaskManager 是“工作者”(Worker、Slave),负责执行任务处理数据,所以可以有一个或多个。Flink 的作业提交和任务处理时的系统如下图所示。

20220916183136_image-20220916183135902

2 基础API

2.1 执行环境

获取执行环境

最智能的方式

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

创建本地环境

        LocalStreamEnvironment localEnvironment = 
                StreamExecutionEnvironment.createLocalEnvironment();

创建远程环境

   		 StreamExecutionEnvironment remoteEnvironment = 
                StreamExecutionEnvironment.createRemoteEnvironment();

设置执行模式

environment.setRuntimeMode(STREAMING);

2.2 源算子

在创建好执行环境之后,我们需要做的就是获取数据了,而这一过程在Flink中就是源算子的范畴了.

添加自定义数据源

environment.addSource()

该方法有多个重载,仔细查看后可以发现其中最重要的一个点是SourceFunction<OUT> function参数,这是一个函数式接口,在源码中有这样一段注释:

			public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
           private long count = 0L;
           private volatile boolean isRunning = true;
     
           private transient ListState<Long> checkpointedCount;
     
           public void run(SourceContext<T> ctx) {
              while (isRunning && count < 1000) {
                   // this synchronized block ensures that state checkpointing,
                   // internal state updates and emission of elements are an atomic operation
                   synchronized (ctx.getCheckpointLock()) {
                       ctx.collect(count);
                       count++;
                   }
               }
           }
     
           public void cancel() {
               isRunning = false;
           }
     
           public void initializeState(FunctionInitializationContext context) {
               this.checkpointedCount = context
                   .getOperatorStateStore()
                   .getListState(new ListStateDescriptor<>("count", Long.class));
     
               if (context.isRestored()) {
                   for (Long count : this.checkpointedCount.get()) {
                       this.count = count;
                   }
               }
           }
     
           public void snapshotState(FunctionSnapshotContext context) {
               this.checkpointedCount.clear();
               this.checkpointedCount.add(count);
           }
      }

我们可以通过重写run方法与cancel方法实现数据的获取去停止,回到刚才,添加数据源的返回值是DataStreamSource<OUT>这里的

DataStreamSource类继承自 SingleOutputStreamOperator 类,又进一步继承自 DataStream。所以很明显,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)。

尝试数据读取

        //从文件中获取
        DataStreamSource<String> streamSource =
                environment.readTextFile("input/clicks.txt");
        //从集合中获取
        ArrayList<Integer> arrayList = new ArrayList<>();
        arrayList.add(1);

        DataStreamSource<Integer> dataStreamSource = 
                environment.fromCollection(arrayList);
				//从元素获取数据

Socket 读取数据:

DataStream<String> stream = env.socketTextStream("localhost", 7777);

如何消费Kafka数据

Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选。

image-20220919125551915

开启 Kafka 发送数据

./kafka-console-producer --broker-list localhost:9092 --topic clicks

接收 Kafka 数据

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        config.setProperty("bootstrap.servers","localhost:9092");
        config.setProperty("group.id", "consumer-group");
        config.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> streamClick = environment.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                config
        ));

        streamClick.print("Kafka");

        environment.execute();

2.2.1 自定义Source

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class ClickSource implements SourceFunction<Event> {
	// 声明一个布尔变量,作为控制数据生成的标识位
	private Boolean running = true;
	@Override
	public void run(SourceContext<Event> ctx) throws Exception {
		Random random = new Random(); // 在指定的数据集中随机选取数据
    String[] users = {"Mary", "Alice", "Bob", "Cary"};
		String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
		while (running) {
			ctx.collect(new Event(
			users[random.nextInt(users.length)],
			urls[random.nextInt(urls.length)],
			Calendar.getInstance().getTimeInMillis()
		));
		// 隔 1 秒生成一个点击事件,方便观测
			Thread.sleep(1000);
		} }
		@Override
		public void cancel() {
			running = false;
		}
}

⚠️这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设置为大于 1 的并行度,则会抛出异常。

高并行度自定义数据源

    public static class ParallelCustomSource implements ParallelSourceFunction<Integer>{
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            //
        }

        @Override
        public void cancel() {
            //
        }
    }

2.2 基本数据类型

2.3 转换API

map

			  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> stream = env.fromElements(new Event("1", "11", System.currentTimeMillis()),
                new Event("2", "22", System.currentTimeMillis())
        );

        stream.setParallelism(1);

        SingleOutputStreamOperator<String> result = stream.map(data -> data.user);

        result.print();

        env.execute();

filter

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> stream = env.fromElements(new Event("1", "11", System.currentTimeMillis()),
                new Event("2", "22", System.currentTimeMillis())
        );

        env.setParallelism(1);

        SingleOutputStreamOperator<Event> filter = stream.filter(data -> data.user.equals("2"));

        filter.print();
        env.execute();

flatMap 扁平映射

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> stream = env.fromElements(new Event("1", "11", System.currentTimeMillis()),
                new Event("2", "22", System.currentTimeMillis())
        );

        SingleOutputStreamOperator<String> result = stream.flatMap((Event data, Collector<String> out) -> {
            out.collect(data.user);
            out.collect(data.url);
        })
                .returns(Types.STRING);

        result.print();

        env.execute();

2.4 聚合算子

​ 直观上看,基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合(Aggregation),也对应着 MapReduce 中的 reduce 操作。

keyBy 按键分区

​ 对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

​ keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

​ 基于不同的 key,流中的数据将被分配到不同的分区中去,如图 5-8 所示;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了。

image-20220920162433176

在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。

⚠️注意

需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。

KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它跟之前的转换操作得到的 SingleOutputStreamOperator 不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce);而且它可以将当前算子任务的状态(state)也按照 key 进行划分、限定为仅对当前 key 有效。

简单聚合

有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们

内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。

  • min():在输入流上,对指定的字段求最小值。

  • max():在输入流上,对指定的字段求最大值。

  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。

  • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。

2.5 归约聚合

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransReduceTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 这里的 ClickSource()使用了之前自定义数据源小节中的 ClickSource()
    env.addSource(new ClickSource())
      // 将 Event 数据类型转换成元组类型
      .map(new MapFunction<Event, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(Event e) throws Exception {
          return Tuple2.of(e.user, 1L);
        }
      })
      .keyBy(r -> r.f0) // 使用用户名来进行分流
      .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, 
                                           Tuple2<String, Long> value2) throws Exception {
          // 每到一条数据,用户 pv 的统计值加 1
          return Tuple2.of(value1.f0, value1.f1 + value2.f1);
        }
      })
      .keyBy(r -> true) // 为每一条数据分配同一个 key,将聚合结果发送到一条流中
      去
      .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, 
                                           Tuple2<String, Long> value2) throws Exception {
          // 将累加器更新为当前最大的 pv 统计值,然后向下游发送累加器的值
          return value1.f1 > value2.f1 ? value1 : value2;
        }
      })
      .print();
    env.execute();
  }

2.6 自定义函数

抛开简单的继承接口实现类以及匿名函数类外还有这样的方法:

富函数类

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。既然“富”,那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

注:生命周期的概念在编程中其实非常重要,到处都有体现。例如:对于 C 语言来说,我们需要手动管理内存的分配和回收,也就是手动管理内存的生命周期。分配内存而不回收,会造成内存泄漏,回收没有分配过的内存,会造成空指针异常。而在 JVM 中,虚拟机会自动帮助我们管理对象的生命周期。对于前端来说,一个页面也会有生命周期。数据库连接、网络连接以及文件描述符的创建和关闭,也都形成了生命周期。所以生命周期的概念在编程中是无处不在的,需要我们多加注意。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。。

  • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

public class RichMapTest extends RichMapFunction<Event, Integer> {

    @Override
    public void open(Configuration parameters) throws Exception {
        //
        super.open(parameters);
    }

    @Override
    public Integer map(Event value) throws Exception {
        //
        return null;
    }

    @Override
    public void close() throws Exception {
        //
        super.close();
    }
}

3 物理分区

随机分区

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

image-20220920182810499

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ShuffleTest {
  public static void main(String[] args) throws Exception {
    // 创建执行环境
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 读取数据源,并行度为 1
    DataStreamSource<Event> stream = env.addSource(new ClickSource());
    // 经洗牌后打印输出,并行度为 4
    stream.shuffle().print("shuffle").setParallelism(4);
    env.execute();
  } }

轮询分区

轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

image-20220920182849614

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class RebalanceTest {
  public static void main(String[] args) throws Exception {
    // 创建执行环境
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 读取数据源,并行度为 1
    DataStreamSource<Event> stream = env.addSource(new ClickSource());
    // 经轮询重分区后打印输出,并行度为 4
    stream.rebalance().print("rebalance").setParallelism(4);
    env.execute();
  } }

重缩放分区

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

image-20220920182949269

​ 当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。

​ 由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节点的网络传输必然影响效率;而如果我们配置的 task slot 数量合适,用 rescale 的方式进行“局部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络传输带来的损耗。

​ 从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
  org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class RescaleTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 这里使用了并行数据源的富函数版本
    // 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
    env
      .addSource(new RichParallelSourceFunction<Integer>() {
        @Override
        public void run(SourceContext<Integer> sourceContext) throws 
          Exception {
          for (int i = 0; i < 8; i++) {
            // 将奇数发送到索引为 1 的并行子任务
            // 将偶数发送到索引为 0 的并行子任务
            if ((i + 1) % 2 == 
                getRuntimeContext().getIndexOfThisSubtask()) {
              sourceContext.collect(i + 1);
            } } }
        @Override
        public void cancel() {
        }
      })
      .setParallelism(2)
      .rescale()
      .print().setParallelism(4);
    env.execute();
  } }

广播

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class BroadcastTest {
  public static void main(String[] args) throws Exception {
    // 创建执行环境
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 读取数据源,并行度为 1
    DataStreamSource<Event> stream = env.addSource(new ClickSource());
    // 经广播后打印输出,并行度为 4
    stream. broadcast().print("broadcast").setParallelism(4);
    env.execute();
  } }

全局分区

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

自定义分区

当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector。

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomPartitionTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 将自然数按照奇偶分区
    env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
      .partitionCustom(new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
          return key % 2;
        }
      }, new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) throws Exception {
          return value;
        }
      })
      .print().setParallelism(2);
    env.execute();
  } }

4 输出算子

文件体系数据下沉

        StreamingFileSink<String> sink = StreamingFileSink.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF_8"))

                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .build()
                ).build();

        stream.addSink(sink);

输出到 Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class SinkToKafkaTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "hadoop102:9092");
    DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
    stream
      .addSink(new FlinkKafkaProducer<String>(
        "clicks",
        new SimpleStringSchema(),
        properties
      ));
    env.execute();
  } }

输出到Redis

依赖

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import 
  org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfi
  g;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import 
  org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescrip
  tion;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkToRedisTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 创建一个到 redis 连接的配置
    FlinkJedisPoolConfig conf = new 
      FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();
    env.addSource(new ClickSource())
      .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
    env.execute();
  } }

输出到ES

依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
  org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
  ;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
public class SinkToEsTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<Event> stream = env.fromElements(
      new Event("Mary", "./home", 1000L),
      new Event("Bob", "./cart", 2000L),
      new Event("Alice", "./prod?id=100", 3000L),
      new Event("Alice", "./prod?id=200", 3500L),
      new Event("Bob", "./prod?id=2", 2500L),
      new Event("Alice", "./prod?id=300", 3600L),
      new Event("Bob", "./home", 3000L),
      new Event("Bob", "./prod?id=1", 2300L),
      new Event("Bob", "./prod?id=3", 3300L));
    ArrayList<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("hadoop102", 9200, "http"));
    // 创建一个 ElasticsearchSinkFunction
    ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new 
      ElasticsearchSinkFunction<Event>() {
      @Override
      public void process(Event element, RuntimeContext ctx, RequestIndexer 
                          indexer) {
        HashMap<String, String> data = new HashMap<>();
        data.put(element.user, element.url);
        IndexRequest request = Requests.indexRequest()
          .index("clicks")
          .type("type") // Es 6 必须定义 type
          .source(data);
        indexer.add(request);
      }
    };
    stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, 
                                                        elasticsearchSinkFunction).build());
    stream.addSink(esBuilder.build());
    env.execute();
  } }

输出到Mysql

5 时间和窗口

5.1 水位线

5.1.1 时间语义

处理时间 or 事件事件

5.1.1.1 处理时间

​ 处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

​ 如果我们以它作为衡量标准,那么数据属于哪个窗口就很明显了:只看窗口任务处理这条数据时,当前的系统时间。比如之前举的例子,数据 8 点 59 分 59 秒产生,而窗口计算时的时间是 9 点零 1 秒,那么这条数据就属于 9 点—10 点的窗口;如果数据传输非常快,9 点之前就到了窗口任务,那么它就属于 8 点—9 点的窗口了。每个并行的窗口子任务,就只按照自己的系统时钟划分窗口。假如我们在早上 8 点 10 分启动运行程序,那么接下来一直到 9 点以前处理的所有数据,都属于第一个窗口;9 点之后、10 点之前的所有数据就将属于第二个窗口。这种方法非常简单粗暴,不需要各个节点之间进行协调同步,也不需要考虑数据在流中的位置,简单来说就是“我的地盘听我的”。所以处理时间是最简单的时间语义。

5.1.1.2 事件时间

​ 事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

​ 数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)。

​ 在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。打个比方,这相当于任务处理的时候自己本身是没有时钟的,所以只好来一个数据就问一下“现在几点了”;而数据本身也没有表,只有一个自带的“出厂时间”,于是任务就基于这个时间来确定自己的时钟。由于流处理中数据是源源不断产生的,一般来说,先产生的数据也会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代表时间的推进。

​ 当然我们会发现,这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。关于水位线的概念和用法,我们会稍后介绍。

5.1.1.3 语义对比

​ 实际应用中,数据产生的时间和处理的时间可能是完全不同的。很长时间收集起来的数据,处理或许只要一瞬间;也有可能数据量过大、处理能力不足,短时间堆了大量数据处理不完,产生“背压”(back pressure)。

​ 通常来说,处理时间是我们计算效率的衡量标准,而事件时间会更符合我们的业务计算逻辑。所以更多时候我们使用事件时间;不过处理时间也不是一无是处。对于处理时间而言,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让我们的流处理延迟降到最低,效率达到最高。

​ 但是我们前面提到过,在分布式环境中,处理时间其实是不确定的,各个并行任务时钟不统一;而且由于网络延迟,导致数据到达各个算子任务的时间有快有慢,对于窗口操作就可能收集不到正确的数据了,数据处理的顺序也会被打乱。这就会影响到计算结果的正确性。所以处理时间语义,一般用在对实时性要求极高、而对计算准确性要求不太高的场景。而在事件时间语义下,水位线成为了时钟,可以统一控制时间的进度。这就保证了我们总可以将数据划分到正确的窗口中,比如 8 点 59 分 59 秒产生的数据,无论网络传输的延迟是多少,它永远属于 8 点~9 点的窗口,不会错分。但我们知道数据还可能是乱序的,要想让窗口正确地收集到所有数据,就必须等这些错乱的数据都到齐,这就需要一定的等待时间。所以整体上看,事件时间语义是以一定延迟为代价,换来了处理结果的正确性。由于网络延迟一般只有毫秒级,所以即使是事件时间语义,同样可以完成低延迟实时流处理的任务。

​ 另外,除了事件时间和处理时间,Flink 还有一个“摄入时间”(Ingestion Time)的概念,它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添加到数据里。这样一来,水位线(watermark)也就基于这个时间直接生成,不需要单独指定了。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事件时间非常像,可以当作特殊的事件时间来处理。在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12 版本开始,Flink 已经将事件时间作为了默认的时间语义。

5.1.2 如何生成水位线

​ 通过之前的分析我们可以知道水位线的存在是要保证数据可以正确到达指定位置,也就意味着一旦水位线出现了,那么就同事代表这个时间线之前的所有数据都已经全部到齐(之后也不会再出现了)。但是仔细想来现实中的网络传输难免骨感万分,所以我们只能尽可能的去保证水位线的正确性。

​ 因为,这里引入了时间延迟的概念,但是延迟归延迟,万一真的出现一个超级大延迟 的数据,岂不是直接寄了?在这里直观来看,我们的解决方案只能是增加延迟时间,这个时间越长越不容易遗漏数据,但是总不能设置为1年吧。。。。。要知道,在古时候延误战机可是要当斩的,过度延迟所带来的性能降低也是极其痛苦的,

​ 如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。对于这些 “漏网之鱼”,Flink 另外提供了窗口处理迟到数据的方法,我们会在后面介绍。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

​ 所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。接下来我们就具体了解一下水位线在代码中的使用。

水位线生成策略

其实从宏观角度来看,水位线的生成其实分为俩部分:

  • 获取时间戳
  • 标记水位线

DataStream的API也有这样一个API---stream.assignTimestampsAndWatermarks()他的返回值和转换算子是一样的 --> SingleOutputStreamOperator<T>, 接着看它所需要的参数,WatermarkStrategy, 而这个类中所包含的TimestampAssignerWatermarkGenerator 与我们前面所列的俩部分其实不谋而合。

  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。

  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。

  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作

  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。

env.getConfig().setAutoWatermarkInterval(60 * 1000L);

内置水位线生成器

​ WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了我们的痛苦,提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

  1. 有序流

​ 对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。

WatermarkStrategy.forMonotonousTimestamps()

demo

        stream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));

​ 上面代码中我们调用.withTimestampAssigner()方法,将数据中的 timestamp 字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。

  1. 乱序流

​ 由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })

        ).print()

​ 上面代码中,我们同样提取了 timestamp 字段作为时间戳,并且以 5 秒的延迟时间创建了处理乱序流的水位线生成器。

事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:

WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
  1. 自定义水位线
  • 周期性水位线生成器
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event>{
        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }

        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            };
        }
    }

    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event>{

        private Long delayTime = 5000L;
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L;

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            maxTs = Math.max(event.timestamp, maxTs);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            //发射水位线
            output.emitWatermark(new Watermark(maxTs - delayTime -1L));
        }
    }
  • 断点式水位线生成器
    public static class CustomPunctuatedGenerator implements WatermarkGenerator<Event>{
        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            //只有遇到特定事件后 才发出水位线
            if (event.user.equals("指定数据"))
                output.emitWatermark(new Watermark(event.timestamp - 1));
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            //在周期性处理中不做操作
        }
    }
  1. 自定义数据源中发送数据源

5.2 窗口计算

5.2.1 概述

image-20220923143645922

5.2.2 分类

驱动型窗口

5.2.2.1 时间窗口

​ 时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。

​ 用结束时间减去开始时间,得到这段时间的长度,就是窗口的大小(window size)。这里的时间可以是不同的语义,所以我们可以定义处理时间窗口和事件时间窗口。

​ Flink 中有一个专门的类来表示时间窗口,名称就叫作 TimeWindow。这个类只有两个私有属性:start 和 end,表示窗口的开始和结束的时间戳,单位为毫秒。

5.2.2.2 记数窗口

​ 计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。这相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小。

​ 计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。在 Flink 内部也并没有对应的类来表示计数窗口,底层是通过“全局窗口”(Global Window)来实现的。关于全局窗口,我们稍后讲解。

按照窗口分配数据的规则分配

5.2.2.3 滚动窗口

​ 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。

​ 滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。

image-20220923144913222

​ 如上图所示,小圆点表示流中的数据,我们对数据按照 userId 做了分区。当固定了窗口大小之后,所有分区的窗口划分都是一致的;窗口没有重叠,每个数据只属于一个窗口。滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它来实现。

5.2.2.4 滑动窗口

​ 与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。

image-20220923145737616

​ 我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决定。如图 6-18 所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据,应该同时属于[8 点, 9 点)和[8 点半, 9 点半)两个窗口;而对于 8 点 10 分的数据,则同时属于[8点, 9 点)和[7 点半, 8 点半)两个窗口。

​ 所以,滑动窗口其实是固定大小窗口的更广义的一种形式;换句话说,滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。当然,我们也可以定义滑动步长大于窗口大小,这样的话就会出现窗口不重叠、但会有间隔的情况;这时有些数据不属于任何一个窗口,就会出现遗漏统计。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。

5.2.2.5 会话窗口

​ 会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。这就好像我们打电话一样,如果时不时总能说点什么,那说明还没聊完;如果陷入了尴尬的沉默,半天都没话说,那自然就可以挂电话了。

​ 与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义,而没有“会话计数窗口”的概念。这很好理解,“会话”终止的标志就是“隔一段时间没有数据来”,如果不依赖时间而改成个数,就成了“隔几个数据没有数据来”,这完全是自相矛盾的说法。而同样是基于这个判断标准,这“一段时间”到底是多少就很重要了,必须明确指定。对于会话窗口而言,最重要的参数就是这段时间的长度(size),它表示会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔 gap 的值。

​ 考虑到事件时间语义下的乱序流,这里又会有一些麻烦。相邻两个数据的时间间隔 gap大于指定的 size,我们认为它们属于两个会话窗口,前一个窗口就关闭;可在数据乱序的情况下,可能会有迟到数据,它的时间戳刚好是在之前的两个数据之间的。这样一来,之前我们判断的间隔中就不是“一直没有数据”,而缩小后的间隔有可能会比 size 还要小——这代表三个数据本来应该属于同一个会话窗口。

​ 所以在 Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge)操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。

image-20220923150138760

​ 我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。如图 6-19 所示,会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。

​ 在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计。

5.2.2.6 全局窗口

​ 还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。关于触发器,我们会在后面的 6.3.6 小节进行讲解。

image-20220923150520839

5.2.3 API