Full-Window and Incremental Window Functions

Published: 2023-04-01 16:10:21  Author: 曹军

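Full-window functions and incremental aggregate functions can be used together, and this combination is quite common in practice.

For example, after keyBy, each group's elements are first aggregated incrementally as they arrive, and the incremental result is then passed to a full-window function for a final computation over the whole window.

In such cases the two kinds of functions have to be combined.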


I: Window functions explained

1. The difference between apply and process


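  • Both apply and process compute over the full contents of a window, but process is what is normally used in practice.
  • process is lower-level and more powerful: ProcessWindowFunction is a rich function, so it has the open/close lifecycle methods and access to the RuntimeContext, and its Context additionally exposes the window metadata, per-window and global state, and side outputs.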
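To make "full-window" concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes a hypothetical WindowProcessor interface that only mimics the shape of ProcessWindowFunction.process: every element of the window is buffered, and the function is called once, when the window fires, with the key, the window bounds and all buffered elements. The names FullWindowSketch, WindowProcessor, fire and distinctDevices are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of full-window evaluation (process/apply style).
// WindowProcessor is a hypothetical stand-in for ProcessWindowFunction, NOT the Flink API.
class FullWindowSketch {

    // Receives the key, the window bounds and ALL elements of the window at once.
    interface WindowProcessor<IN, OUT, K> {
        void process(K key, long windowStart, long windowEnd, Iterable<IN> elements, List<OUT> out);
    }

    // Buffers every element of one window, then invokes the processor once when the window "fires".
    static <IN, OUT, K> List<OUT> fire(K key, long start, long end, List<IN> arrived,
                                       WindowProcessor<IN, OUT, K> processor) {
        List<IN> buffer = new ArrayList<>(arrived); // full-window state: every element is kept
        List<OUT> out = new ArrayList<>();
        processor.process(key, start, end, buffer, out);
        return out;
    }

    // Example processor: count the distinct device numbers seen in a one-hour window.
    static List<String> distinctDevices(List<String> devices) {
        return FullWindowSketch.<String, String, Long>fire(42L, 0L, 3_600_000L, devices,
                (key, s, e, elements, out) -> {
                    Set<String> distinct = new HashSet<>();
                    for (String d : elements) {
                        distinct.add(d);
                    }
                    out.add("customer=" + key + " window=[" + s + "," + e + ") distinct=" + distinct.size());
                });
    }

    public static void main(String[] args) {
        System.out.println(distinctDevices(List.of("d1", "d2", "d1")));
    }
}
```

The cost of this flexibility is visible in fire: the whole buffer has to be held until the window fires, which is exactly what the incremental functions below avoid.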


2. The difference between reduce and aggregate

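  • reduce takes two inputs of the same type and produces an output of that same type, so it has a single type parameter <T>.
  • maxBy, minBy and sum are all implemented on top of reduce.
  • For aggregate, the input, the intermediate (accumulator) and the output types can all differ, so it has three type parameters <T, ACC, R>.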
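The difference in type parameters can be shown with a small plain-Java sketch. The Agg interface below is a hypothetical local stand-in that only mirrors the method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge), and reduceAll/aggregateAll are illustrative helpers, not Flink calls. A reduce-style function must stay within one type (a running sum or max of Long), while an aggregate-style function can take Long inputs, keep a long[]{sum, count} accumulator and emit a Double average.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of the two incremental function shapes.
// Agg is a hypothetical stand-in mirroring Flink's AggregateFunction method names, NOT the Flink API.
class IncrementalSketch {

    // reduce shape: (T, T) -> T, a single type parameter.
    // Assumes at least one element, since a window only exists once an element has arrived.
    static <T> T reduceAll(List<T> elements, BinaryOperator<T> reducer) {
        T acc = elements.get(0);
        for (int i = 1; i < elements.size(); i++) {
            acc = reducer.apply(acc, elements.get(i));
        }
        return acc;
    }

    // aggregate shape: input IN, accumulator ACC and output OUT may all differ.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Average: IN = Long, ACC = long[]{sum, count}, OUT = Double.
    static final Agg<Long, long[], Double> AVERAGE = new Agg<>() {
        @Override
        public long[] createAccumulator() { return new long[]{0L, 0L}; }

        @Override
        public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }

        @Override
        public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }

        @Override
        public long[] merge(long[] a, long[] b) { a[0] += b[0]; a[1] += b[1]; return a; }
    };

    // Folds each element into the accumulator one at a time, as an incremental window would.
    static <IN, ACC, OUT> OUT aggregateAll(List<IN> elements, Agg<IN, ACC, OUT> agg) {
        ACC acc = agg.createAccumulator();
        for (IN e : elements) {
            acc = agg.add(e, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        List<Long> values = List.of(3L, 9L, 4L);
        System.out.println(reduceAll(values, Long::sum));  // sum-style reduce
        System.out.println(reduceAll(values, Long::max));  // max-style reduce
        System.out.println(aggregateAll(values, AVERAGE)); // average via aggregate
    }
}
```

An average cannot be written as a plain reduce over Long without losing the count, which is why the separate ACC type matters.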


II: Combining AggregateFunction with ProcessWindowFunction

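1. [Figure: the reduce and aggregate overloads; those combining an incremental function with a full-window function are marked with a red star]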

2. Combining them

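Both reduce and aggregate have an overload that combines an incremental function with a full-window function: reduce(ReduceFunction, ProcessWindowFunction) and aggregate(AggregateFunction, ProcessWindowFunction).

For each window, Flink applies the incremental function as elements arrive; when the window fires, it passes the incremental result to the ProcessWindowFunction as input for further processing. The Iterable handed to process() therefore contains a single element, the pre-aggregated result, while the Context still provides the window metadata.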

public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
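The data flow of this overload can be modeled in plain Java. The sketch below assumes a hypothetical WindowStep interface shaped like ProcessWindowFunction.process and a hypothetical fire driver for one key and one window; it is not the Flink API. Elements are summed as they arrive (the incremental step, so no element buffer is kept), and when the window fires only the final sum is passed on, wrapped in a one-element Iterable together with the window bounds.

```java
import java.util.Collections;
import java.util.List;

// Plain-Java model of aggregate(aggFunction, windowFunction) for ONE key and ONE window.
// WindowStep and fire are hypothetical stand-ins, NOT the Flink API.
class CombinedSketch {

    // Full-window step, shaped like ProcessWindowFunction<V, R, K, W>.process.
    interface WindowStep<V, R> {
        R process(String key, long start, long end, Iterable<V> elements);
    }

    static <R> R fire(String key, long start, long end, List<Long> arrivals, WindowStep<Long, R> step) {
        long sum = 0L;                // accumulator: the only per-window state kept
        for (long v : arrivals) {     // incremental step: fold each element on arrival
            sum += v;
        }
        // When the window fires, the pre-aggregated result is handed over as a one-element Iterable.
        return step.process(key, start, end, Collections.singletonList(sum));
    }

    // Example window step: attach window metadata to the pre-aggregated sum.
    static String describe(String key, long start, long end, List<Long> arrivals) {
        return CombinedSketch.<String>fire(key, start, end, arrivals, (k, s, e, elements) -> {
            int seen = 0;
            long total = 0L;
            for (Long v : elements) {
                seen++;
                total = v;
            }
            return k + " [" + s + "," + e + ") sum=" + total + " iterableSize=" + seen;
        });
    }

    public static void main(String[] args) {
        System.out.println(describe("customer-1", 0L, 60L, List.of(5L, 7L, 1L)));
    }
}
```

The design point is that the window step still gets key and window context, as a full-window function would, while the state kept per window is just one accumulator rather than every element.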


3. Example

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Slf4j
public class OffLineAlarmStrategy implements AlarmStrategy<MonitoringIndex, DataOfflineResult> {
    @Override
    public DataStream<DataOfflineResult> execute(DataStream<MonitoringIndex> dataInterruptTimesStream) {
        // Determine whether the data has gone offline
        SingleOutputStreamOperator<DataOfflineResult> aggregateDS = dataInterruptTimesStream.assignTimestampsAndWatermarks(WatermarkStrategy.<MonitoringIndex>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((SerializableTimestampAssigner<MonitoringIndex>) (element, recordTimestamp) -> {
                            if (element.getEventTime() > System.currentTimeMillis()) {
                                log.error("=======> the event time is larger than the current time,event time = {}", element.getEventTime());
                                return System.currentTimeMillis();
                            }
                            return element.getEventTime();
                        }))
                .keyBy((KeySelector<MonitoringIndex, Long>) MonitoringIndex::getCustomerId)

                .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(20)))
                .aggregate(new AggregateFunction<MonitoringIndex, MonitorAggregate, MonitorAggregate>() {

                    @Override
                    public MonitorAggregate createAccumulator() {
                        return new MonitorAggregate();
                    }

                    @Override
                    public MonitorAggregate add(MonitoringIndex value, MonitorAggregate accumulator) {
                        accumulator.getSignalSet().add(value.getCustomerId() + "::" + value.getSignalNo() + "::" + value.getDeviceNo());
                        return accumulator;
                    }

                    @Override
                    public MonitorAggregate getResult(MonitorAggregate accumulator) {
                        return accumulator;
                    }

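                    @Override
                    public MonitorAggregate merge(MonitorAggregate a, MonitorAggregate b) {
                        // Only invoked for merging windows such as session windows; union the two
                        // signal sets rather than returning null so a merge never loses data
                        a.getSignalSet().addAll(b.getSignalSet());
                        return a;
                    }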
                }, new ProcessWindowFunction<MonitorAggregate, DataOfflineResult, Long, TimeWindow>() {

                    @Override
                    public void process(Long customerId, ProcessWindowFunction<MonitorAggregate, DataOfflineResult, Long, TimeWindow>.Context context, Iterable<MonitorAggregate> elements, Collector<DataOfflineResult> out) throws Exception {
                        long windowStart = context.window().getStart();
                        long windowEnd = context.window().getEnd();
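                        log.info("============== window start: {}, window end: {}, customerId={}", windowStart, windowEnd, customerId);
                        // With aggregate(aggFunction, windowFunction), elements holds a single pre-aggregated MonitorAggregate
                        List<MonitorAggregate> monitoringIndexList = Lists.newArrayList(elements.iterator());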
                        Set<String> monitorAggregateSet = new HashSet<>();
                        if (!CollectionUtils.isEmpty(monitoringIndexList)) {
                            for (MonitorAggregate monitorAggregate : monitoringIndexList) {
                                monitorAggregateSet.addAll(monitorAggregate.getSignalSet());
                            }
                        }
                        if (!CollectionUtils.isEmpty(monitorAggregateSet)) {
                            List<SimpleMonitor> simpleMonitorList = new ArrayList<>();
                            for (String signalNoKey : monitorAggregateSet) {
                                String[] splitResult = signalNoKey.split("::");
                                SimpleMonitor simpleMonitor = new SimpleMonitor();
                                simpleMonitor.setCustomerId(Long.parseLong(splitResult[0]));
                                simpleMonitor.setSignalNo(splitResult[1]);
                                simpleMonitor.setDeviceNo(splitResult[2]);
                                simpleMonitorList.add(simpleMonitor);
                            }
                            if (!CollectionUtils.isEmpty(simpleMonitorList)) {
                                List<DataOfflineResult> dataOfflineResults = OneHourOffLineCalculate.getCalculateSignalStatus(customerId, simpleMonitorList, windowStart, windowEnd);
                                if (!CollectionUtils.isEmpty(dataOfflineResults)) {
                                    dataOfflineResults.forEach(out::collect);
                                }
                            }
                        }

                    }
                });
        return aggregateDS;
    }
}
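In this example, SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(20)) places every event in 60 / 20 = 3 overlapping windows, so each MonitoringIndex is added to three accumulators, and the ProcessWindowFunction runs once per customer for every window that fires. The plain-Java sketch below works through that assignment arithmetic. It is modeled on the start-alignment formula of Flink's sliding event-time assigner, assuming a zero window offset; SlidingWindowSketch and windowStarts are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Computes which sliding windows [start, start + size) contain a given event timestamp.
// Modeled on sliding event-time window assignment with offset 0; NOT a Flink API call.
class SlidingWindowSketch {

    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned window start that is <= timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 60 * minute;   // window size from the example: 60 minutes
        long slide = 20 * minute;  // slide from the example: 20 minutes
        // An event at minute 65 falls into three windows.
        for (long start : windowStarts(65 * minute, size, slide)) {
            System.out.println("[" + start / minute + ", " + (start + size) / minute + ") min");
        }
    }
}
```

For an event at minute 65 this yields the windows [60, 120), [40, 100) and [20, 80), which is why each offline check effectively sees every signal up to three times, once per overlapping hour.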