Flink event-time watermark delay does not put later-timestamped data into the previous window

Published: 2023-12-10 20:09:28 Author: aminor

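Let the tumbling window size be 5 ms and the watermark delay (bounded out-of-orderness) be 3 ms.
Suppose the events arrive in this order, each value being its own event timestamp: 0 1 2 5 6 7 3 4 8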

Then the two windows are:
window = TimeWindow{start=0, end=5}
0
1
2
3
4
window = TimeWindow{start=5, end=10}
5
6
7
8

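That is, the elements 5, 6, and 7 are not included in TimeWindow{start=0, end=5}, even though they arrive before that window fires. The watermark delay only postpones when the window fires, which is what lets the out-of-order elements 3 and 4 still get in; which window an element belongs to is decided solely by its timestamp.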
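
The behavior above can be reproduced without a cluster. The following is a minimal plain-Java sketch of the mechanism, not Flink's actual implementation. It assumes non-negative timestamps, a window offset of zero, no allowed lateness, and a watermark that advances after every element (in Flink the watermark is emitted periodically, so this matches the case where elements are typed in slowly). The class name WatermarkWindowSketch and the methods windowStart, simulate, and fire are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WatermarkWindowSketch {

    /** Start of the tumbling window that contains ts (assumes ts >= 0 and zero offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /**
     * Replays timestamps in arrival order and returns the fired windows,
     * keyed by window start, each holding its elements in arrival order.
     */
    static Map<Long, List<Long>> simulate(long[] arrivals, long size, long delay) {
        Map<Long, List<Long>> open = new TreeMap<>();  // windows still collecting elements
        Map<Long, List<Long>> fired = new TreeMap<>(); // windows that have already fired
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : arrivals) {
            long start = windowStart(ts, size);
            long windowMaxTs = start + size - 1;
            // An element is late (and dropped here) if the watermark already passed its window.
            if (windowMaxTs > watermark) {
                open.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            }
            // Bounded out-of-orderness: the watermark trails the largest timestamp seen.
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - delay - 1;
            fire(open, fired, size, watermark);
        }
        // End of input: the watermark jumps to the maximum and all remaining windows fire.
        fire(open, fired, size, Long.MAX_VALUE);
        return fired;
    }

    /** Moves every open window whose last timestamp is covered by the watermark to fired. */
    static void fire(Map<Long, List<Long>> open, Map<Long, List<Long>> fired,
                     long size, long watermark) {
        Iterator<Map.Entry<Long, List<Long>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + size - 1 <= watermark) {
                fired.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2, 5, 6, 7, 3, 4, 8};
        // prints {0=[0, 1, 2, 3, 4], 5=[5, 6, 7, 8]}
        System.out.println(simulate(arrivals, 5, 3));
    }
}
```

In this sketch the window [0, 5) fires only when 8 arrives (watermark = 8 - 3 - 1 = 4), so 3 and 4 are still accepted, while 5, 6, and 7 were placed in [5, 10) the moment they arrived.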

Verification program:

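import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkWindowExample {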

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the elements in arrival order and gives a single watermark.
        env.setParallelism(1);
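        // Read one integer per line from a socket on host worker1
        // (for example, one opened with `nc -lk 7777` on that host).
        DataStreamSource<String> ds = env.socketTextStream("worker1", 7777);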

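        // Use each integer as its own event timestamp (in ms). The watermark trails
        // the largest timestamp seen so far by 3 ms (plus 1 ms, as the generator
        // emits maxTimestamp - delay - 1).
        SingleOutputStreamOperator<Integer> watermarks = ds
                .map(Integer::parseInt)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Integer>forBoundedOutOfOrderness(Duration.ofMillis(3))
                        .withTimestampAssigner((event, timestamp) -> event));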

        // Non-keyed tumbling event-time windows of 5 ms: [0, 5), [5, 10), ...
        SingleOutputStreamOperator<Integer> windowedStream = watermarks
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(5)))
                .apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
                        // Print the window bounds, then emit every element assigned to this window.
                        System.out.println("window = " + window);
                        for (Integer value : values) {
                            out.collect(value);
                        }
                    }
                });

        windowedStream.print();

        env.execute("Flink Window Example");
    }
}