Flink之基于EventTime的WaterMark

发布时间 2023-04-21 11:01:32作者: 技术虫

什么是 Flink Watermark?

Flink Watermark 是一种用于处理事件时间(Event Time)的机制。Flink 先通过时间戳分配器为事件流中的每个事件提取事件时间戳,再在流中插入 Watermark 来标记事件时间的进度,从而决定何时可以对事件进行排序、分组和窗口计算。Watermark 为事件流提供了一个"时间边界",使 Flink 能够有效地控制事件流的处理进度:它确保 Flink 在处理事件时不会超前于实际的事件发生时间,同时还能容忍一定程度的乱序事件。

Flink Watermark 的核心思想是:根据目前已观察到的最大事件时间戳,减去允许的最大乱序延迟,得到当前的 Watermark。只有当 Watermark 越过某个窗口的结束时间时,Flink 才会触发该窗口,处理其中的全部事件。Watermark 的推进与事件处理是异步进行的,这意味着系统无需等待所有事件都到达,就可以持续地处理数据。Watermark 本身是插入到事件流中的一种特殊记录,它只携带一个时间戳,不携带业务数据。时间戳为 t 的 Watermark 表示:系统认为所有时间戳小于等于 t 的事件都已经到达,此后再到达的时间戳小于等于 t 的事件会被视为迟到数据。例如在新版 API(BoundedOutOfOrderness)中,若最大乱序延迟为 0,看到时间戳为 t 的事件后生成的 Watermark 为 t-1,这意味着系统认为所有时间戳小于 t 的事件都已经到达,可以开始执行相关操作。这种机制非常适合实时数据处理任务,因为它在结果的低延迟与完整性之间取得了平衡。

场景

     自定义一个数据源,每隔 1 秒发送一条数据,数据格式为"编号 时间戳(秒) 点击数"。定义一个 5 秒的滚动事件时间窗口,最大乱序延迟设为 0 秒,同时定义一个迟到数据的侧输出流,确保迟到数据不会被丢弃。

代码:

import java.util.Date;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import lombok.Data;

/**
 * One parsed input record.
 *
 * <p>Lombok's {@code @Data} generates the getters, setters, {@code equals}/{@code hashCode}
 * and {@code toString} for the fields below; the field order therefore also fixes the
 * order in which they appear in {@code toString}.
 */
@Data
class WaterSensor{
    // ID of the record (first field of an input line)
    String id;
    // Timestamp of the record.
    // NOTE(review): unit depends on the caller; Flink event-time timestamps are milliseconds — confirm
    Long ts;
    // Click count (third field of an input line)
    Integer vc;

}

/**
 * Demo source that replays a fixed list of {@code "id timestampSeconds clicks"} lines forever,
 * emitting one line per interval and wrapping around to the first line after the last one.
 *
 * <p>The loop is guarded by a {@code volatile} flag that {@link #cancel()} clears, so the
 * job can stop this source cleanly instead of it spinning forever.
 */
class MySource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    /**
     * Default replay data. The "3" and "4" entries follow larger timestamps on purpose, so the
     * stream is out of order.
     */
    private static final String[] DEFAULT_INPUT = {
            "1001 1 1", "1001 2 1", "1001 5 1", "1001 7 1", "1001 8 1",
            "1001 9 1", "1001 3 1", "1001 14 1", "1001 4 1"};

    /** Default pause between two emitted lines. */
    private static final long DEFAULT_INTERVAL_MILLIS = 1000L;

    /** Lines to replay, in order; never empty. */
    private final String[] input;

    /** Pause between two emitted lines, in milliseconds. */
    private final long intervalMillis;

    /** Cleared by cancel() from another thread, hence volatile. */
    private volatile boolean running = true;

    /** Creates a source that replays the built-in demo data once per second. */
    public MySource() {
        this(DEFAULT_INPUT, DEFAULT_INTERVAL_MILLIS);
    }

    /**
     * Creates a source that replays {@code input} cyclically.
     *
     * @param input lines to emit, in order; copied defensively
     * @param intervalMillis pause between two lines, in milliseconds
     * @throws IllegalArgumentException if {@code input} is null/empty or the interval is negative
     */
    public MySource(String[] input, long intervalMillis) {
        if (input == null || input.length == 0) {
            throw new IllegalArgumentException("input must contain at least one line");
        }
        if (intervalMillis < 0) {
            throw new IllegalArgumentException("intervalMillis must be >= 0 but was " + intervalMillis);
        }
        this.input = input.clone();
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int currentIndex = 0;
        while (running) {
            // Emit under the checkpoint lock so emission and checkpointing do not interleave.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(input[currentIndex]);
            }
            // Wrap around to the first line after the last one.
            currentIndex = (currentIndex + 1) % input.length;
            Thread.sleep(intervalMillis);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
/**
 * Demo job: groups the records from {@link MySource} into 5-second tumbling event-time windows.
 *
 * <p>Event time is taken from the timestamp field of each input line (seconds, converted to
 * milliseconds). Watermarks are generated periodically as "largest timestamp seen so far minus
 * the allowed out-of-orderness" (0 ms here). Records that arrive after their window has fired
 * go to a side output instead of being dropped.
 */
public class FlinkWindowWatermark {

    /** Factor converting the seconds in the input lines to the milliseconds Flink expects. */
    private static final long MILLIS_PER_SECOND = 1000L;

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        try {
            StreamExecutionEnvironment stream = StreamExecutionEnvironment.getExecutionEnvironment();
            stream.setParallelism(1);
            stream.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Custom demo source.
            DataStreamSource<String> addSource = stream.addSource(new MySource());
            SingleOutputStreamOperator<WaterSensor> waterSensorStream =
                    addSource.map((MapFunction<String, WaterSensor>) line -> {
                        System.out.println("收到的数据:" + line);
                        return parseWaterSensor(line);
                    });

            // Declare the side output up front so that late records are kept, not dropped.
            OutputTag<WaterSensor> lateData = new OutputTag<WaterSensor>("lateData") {};
            SingleOutputStreamOperator<String> process = waterSensorStream
                    // Periodic watermarks: extractTimestamp() runs per record,
                    // getCurrentWatermark() is polled periodically by Flink.
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WaterSensor>() {
                        /** Largest event timestamp seen so far, in milliseconds. */
                        private long currentMaxTimeStamp = 0L;
                        /** Maximum allowed out-of-orderness, in milliseconds. */
                        private final long maxOutOfOrderness = 0L;

                        @Override
                        public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
                            long timestamp = element.getTs();
                            currentMaxTimeStamp = Math.max(timestamp, currentMaxTimeStamp);
                            return timestamp;
                        }

                        @Override
                        public Watermark getCurrentWatermark() {
                            return new Watermark(currentMaxTimeStamp - maxOutOfOrderness);
                        }
                    })
                    .keyBy(ws -> ws.getId())
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sideOutputLateData(lateData)
                    .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                        /** Renders the window bounds followed by one line per record in the window. */
                        @Override
                        public void process(String key, Context context,
                                Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                            StringBuilder stringBuilder = new StringBuilder();
                            stringBuilder.append("时间窗口为:").append(context.window().getStart()).append("-")
                                    .append(context.window().getEnd()).append("\n");
                            for (WaterSensor sensor : elements) {
                                stringBuilder.append(sensor).append("\n");
                            }
                            out.collect(stringBuilder.toString());
                        }
                    });

            // Main (on-time) window results.
            process.print("Main:");
            // Late records routed to the side output.
            process.getSideOutput(lateData).print("lateData:");

            stream.execute();
        } catch (Exception e) {
            // Do not swallow job failures silently; surface them with the original cause.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Parses one {@code "id timestampSeconds clicks"} line into a {@link WaterSensor}.
     *
     * <p>The second field is the record's event time in seconds; it is converted to
     * milliseconds. Using the data's own timestamp, not the wall clock, is what makes
     * out-of-order input (and therefore late data) observable.
     *
     * @param line whitespace-separated input line
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException if the timestamp or click count is not numeric
     */
    private static WaterSensor parseWaterSensor(String line) {
        String[] split = line.trim().split("\\s+");
        if (split.length < 3) {
            throw new IllegalArgumentException("Expected 'id timestamp clicks' but got: " + line);
        }
        WaterSensor waterSensor = new WaterSensor();
        waterSensor.setId(split[0]);
        waterSensor.setTs(Long.parseLong(split[1]) * MILLIS_PER_SECOND);
        waterSensor.setVc(Integer.parseInt(split[2]));
        return waterSensor;
    }
}