Flink写文件

发布时间 2024-01-08 16:38:04作者: 粒子先生

简介

StreamingFileSink 提供了将数据分桶写入文件系统的功能。

如何分桶是可以配置,默认使用基于时间的分桶策略,每个小时创建一个新的桶,也可以自定义分桶策略。

文件滚动策略支持两种方式,基于时间和文件大小的DefaultRollingPolicy策略和基于Flink检查点的OnCheckpointRollingPolicy策略。

文件结构及写入方式

 

 

 

part file三种状态:

  • in-progress:表示当前part file正在被写入
  • pending:写入完成后等待提交的状态(flink写hdfs的二段提交的体现)
  • finished:写入完成状态,只有finished状态下的文件才会保证不会再有修改,下游可安全读取

part file默认命名规则如下:

  • In-progress / Pending: part--.inprogress.uid

  • Finished: part--

flink允许用户给part file自定义前缀和后缀,通过OutputFileConfig即可配置,比如使用"prefix"和"ext"分别作为前、后缀,得到的文件如下:

└── 2019-08-25–12
├── prefix-0-0.ext
├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
├── prefix-1-0.ext
└── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

OutputFileConfig config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build();
 
StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
 .forRowFormat((new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build();

StreamingFileSink提供了基于行、列两种文件写入格式

Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)
//
StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<T>())
.withBucketAssigner(new PaulAssigner<>()) //分桶策略
.withRollingPolicy(new PaulRollingPolicy<>()) //滚动策略
.withBucketCheckInterval(CHECK_INTERVAL) //检查分桶的时间间隔
.build();
  
//列 parquet
StreamingFileSink.forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(clazz))
.withBucketAssigner(new PaulBucketAssigner<>())
.withBucketCheckInterval(CHECK_INTERVAL)
.build();

RowFormatBuilder 允许用户指定:

  • Custom RollingPolicy :自定义滚动策略以覆盖默认的 DefaultRollingPolicy

  • bucketCheckInterval (默认为1分钟):毫秒间隔,用于基于时间的滚动策略。

bulkWriterFactory可以自定义实现,自己想要的格式。

滚动策略

滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。

处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

Flink 有两个内置的滚动策略:

行/列这两种写入格式除了文件格式的不同,另外一个很重要的区别就是文件滚动策略的不同,forRowFormat行写可基于文件大小、滚动时间、不活跃时间进行滚动,

但是对于forBulkFormat列写方式只能基于checkpoint机制进行文件滚动,即在执行snapshotState方法时滚动文件,如果基于大小或者时间滚动文件,

那么在任务失败恢复时就必须对处于in-processing状态的文件按照指定的offset进行truncate,由于列式存储是无法针对文件offset进行truncate的,

因此就必须在每次checkpoint使文件滚动,其使用的滚动策略实现是OnCheckpointRollingPolicy。

重要: 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时文件写入完成转为Finished状态。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。

重要: 批量编码模式仅支持 OnCheckpointRollingPolicy 策略, 在每次 checkpoint 的时候切割文件。

小文件处理

不管是Flink还是SparkStreaming写hdfs不可避免需要关注的一个点就是如何处理小文件,众多的小文件会带来两个影响:

  • Hdfs NameNode维护元数据成本增加
  • 下游hive/spark任务执行的数据读取成本增加

小文件产生原因:

这与文件滚动周期、checkpoint时间间隔设置相关,如果滚动周期较短、checkpoint时间也比较短或者数据流量有低峰期达到文件不活跃的时间间隔,很容易产生小文件。

几种处理小文件的方式:

  • 减少并行度

回顾一下文件生成格式:part + subtaskIndex + connter,其中subtaskIndex代表着任务并行度的序号,也就是代表着当前的一个写task,越大的并行度代表着越多的subtaskIndex,数据就越分散,

如果我们减小并行度,数据写入由更少的task来执行,写入就相对集中,这个在一定程度上减少的文件的个数,但是在减少并行的同时意味着任务的并发能力下降;

  • 增大checkpoint周期或者文件滚动周期

以parquet写分析为例,parquet写文件由processing状态变为pending状态发生在checkpoint的snapshotState阶段中,如果checkpoint周期时间较短,就会更快发生文件滚动,增大checkpoint周期,

那么文件就能积累更多数据之后发生滚动,但是这种增加时间的方式带来的是数据的一定延时;

  • 下游任务合并处理

待Flink将数据写入hdfs后,下游开启一个hive或者spark定时任务,通过改变分区的方式,将文件写入新的目录中,后续任务处理读取这个新的目录数据即可,同时还需要定时清理产生的小文件,

这种方式虽然增加了后续的任务处理成本,但是其即合并了小文件提升了后续任务分析速度,也将小文件清理了减小了对NameNode的压力,相对于上面两种方式更加稳定,因此也比较推荐这种方式。

通用注意事项

  • 重要提示 1

使用 Hadoop < 2.7 时,请使用 OnCheckpointRollingPolicy 滚动策略,该策略会在每次检查点时进行文件切割。 这样做的原因是如果部分文件的生命周期跨多个检查点,

当 StreamingFileSink 从之前的检查点进行恢复时会调用文件系统的 truncate() 方法清理 in-progress 文件中未提交的数据。 Hadoop 2.7 之前的版本不支持这个方法,因此 Flink 会报异常。

  • 重要提示 2

鉴于 Flink 的 sink 以及 UDF 通常不会区分作业的正常结束(比如有限流)和异常终止,因此正常结束作业的最后一批 in-progress 文件不会被转换到 “完成” 状态。

  • 重要提示 3

Flink 以及 StreamingFileSink 不会覆盖已经提交的数据。因此如果尝试从一个包含 in-progress 文件的旧 checkpoint/savepoint 恢复, 且这些 in-progress 文件会被接下来的成功 checkpoint 提交,

Flink 会因为无法找到 in-progress 文件而抛异常,从而恢复失败。

  • 重要提示 4

目前 StreamingFileSink 只支持三种文件系统: HDFS、S3和Local。如果配置了不支持的文件系统,在执行的时候 Flink 会抛出异常。

Demo

列模式写入

package com.jc.main;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jc.config.Config;
import com.jc.config.ConfigBuilder;
import com.jc.domain.WildAnimalNews;
import com.jc.utils.ParameterUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Properties;
 
/**
 * 读取Kafka数据,Sink到Hdfs
 *
 * @author libin
 * @date 2020-06-23
 */
public class Kafka2HdfsParquet {
    private static Logger logger = LoggerFactory.getLogger(Kafka2HdfsParquet.class);
 
    /**
     * 构建数据流
     *
     * @param env Flink运行环境
     * @return DataStream<JSONObject>
     */
    public DataStream<JSONObject> buildDataStream(StreamExecutionEnvironment env) {
        String topic = Config.getKafkaConfig().getKafkaTopic();
        Properties properties = Config.getKafkaConfig().getConsumerProperties();
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                topic,
                new SimpleStringSchema(),
                properties);
 
        DataStream<String> input = env.addSource(consumer.setStartFromEarliest()).setParallelism(1);
        return input.map(JSON::parseObject);
    }
 
    /**
     * 开始计算
     *
     * @throws Exception 异常抛出
     */
    public void startup(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C:\\tools\\hadoop");
        /*
         加载Flink运行环境
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        添加检查点
         */
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        ParameterUtil parameterUtil = ConfigBuilder.builder(args).setGlobalEvn(env).build();
        Config.init(parameterUtil);
 
        DataStream<JSONObject> dataStream = buildDataStream(env);
//        DateTimeBucketAssigner<WildAnimalNews> bucketAssigner = new DateTimeBucketAssigner<>(pathFormat, ZoneId.of(zone));
        BasePathBucketAssigner<WildAnimalNews> basePathBucketAssigner = new BasePathBucketAssigner();
        StreamingFileSink<WildAnimalNews> streamingFileSink = StreamingFileSink
                .forBulkFormat(new Path(Config.getCommonConfig().getHdfsUrl() + "/dwd/news/dwd_wildanimal_baidu_news_parquet"), ParquetAvroWriters.forReflectRecord(WildAnimalNews.class))
//                .withBucketAssigner(basePathBucketAssigner)
//                .withBucketAssigner(new CustomBucket<>())
                .withBucketCheckInterval(60)
                .build();
 
        dataStream.map(jsonObject -> {
            WildAnimalNews wildAnimalNews = new WildAnimalNews();
            wildAnimalNews.setOdsId(jsonObject.getString("odsId"));
            wildAnimalNews.setDwdId(jsonObject.getString("dwdId"));
            wildAnimalNews.setCreateDate(jsonObject.getLong("createDate"));
            JSONObject data = jsonObject.getJSONObject("data");
            wildAnimalNews.setSourceUrl(data.getString("sourceUrl"));
            wildAnimalNews.setCaseName(data.getString("caseName"));
            wildAnimalNews.setGatherTime(data.getLong("gatherTime"));
            wildAnimalNews.setPubDate(data.getLong("pubDate"));
            wildAnimalNews.setSearchWord(data.getString("searchWord"));
            wildAnimalNews.setAuthor(data.getString("author"));
            wildAnimalNews.setContent(data.getString("content"));
 
            wildAnimalNews.setWildAnimalName(data.getJSONArray("wildAnimalName").toJavaList(String.class));
            wildAnimalNews.setMotivation(data.getJSONArray("motivation").toJavaList(String.class));
            wildAnimalNews.setMode(data.getJSONArray("mode").toJavaList(String.class));
            wildAnimalNews.setWildAnimalType(data.getJSONArray("wildAnimalType").toJavaList(String.class));
            wildAnimalNews.setHuntingSite(data.getJSONArray("huntingSite").toJavaList(String.class));
            wildAnimalNews.setWildlifeTradingPlaces(data.getJSONArray("wildlifeTradingPlaces").toJavaList(String.class));
            wildAnimalNews.setWildAnimalProtectionClass(data.getJSONArray("wildAnimalProtectionClass").toJavaList(String.class));
            wildAnimalNews.setAnimalProducts(data.getJSONArray("animalProducts").toJavaList(String.class));
            wildAnimalNews.setAnimalRelatedQualification(data.getJSONArray("animalRelatedQualification").toJavaList(String.class));
 
            wildAnimalNews.setProvince(data.getString("province"));
            wildAnimalNews.setProvinceCode(data.getString("provinceCode"));
            wildAnimalNews.setCity(data.getString("city"));
            wildAnimalNews.setCityCode(data.getString("cityCode"));
            wildAnimalNews.setArea(data.getString("area"));
            wildAnimalNews.setAreaCode(data.getString("areaCode"));
            wildAnimalNews.setEmotionalScore(data.getInteger("emotionalScore"));
            wildAnimalNews.setEmotionalType(data.getString("emotionalType"));
//            System.out.println(jsonObject.getString("odsId"));
            return wildAnimalNews;
        }).addSink(streamingFileSink);
 
        env.execute("野生动物Kafka->hdfs");
    }
 
    public static void main(String[] args) throws Exception {
        Kafka2HdfsParquet kafka2Hdfs = new Kafka2HdfsParquet();
        kafka2Hdfs.startup(args);
    }
}

行模式写入

package com.jc.main;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jc.bucket.CustomBucket;
import com.jc.config.Config;
import com.jc.config.ConfigBuilder;
import com.jc.domain.WildAnimalNews;
import com.jc.utils.ParameterUtil;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Properties;
import java.util.concurrent.TimeUnit;
 
/**
 * 读取Kafka数据,Sink到Hdfs
 *
 * @author libin
 * @date 2020-06-23
 */
public class Kafka2Hdfs {
    private static Logger logger = LoggerFactory.getLogger(Kafka2Hdfs.class);
 
    /**
     * 构建数据流
     *
     * @param env Flink运行环境
     * @return DataStream<JSONObject>
     */
    public DataStream<JSONObject> buildDataStream(StreamExecutionEnvironment env) {
        String topic = Config.getKafkaConfig().getKafkaTopic();
        Properties properties = Config.getKafkaConfig().getConsumerProperties();
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                topic,
                new SimpleStringSchema(),
                properties);
 
        DataStream<String> input = env.addSource(consumer.setStartFromEarliest()).setParallelism(1);
        return input.map(JSON::parseObject);
    }
 
    /**
     * 开始计算
     *
     * @throws Exception 异常抛出
     */
    public void startup(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C:\\tools\\hadoop");
        /*
         加载Flink运行环境
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        添加检查点
         */
        env.enableCheckpointing(6000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        ParameterUtil parameterUtil = ConfigBuilder.builder(args).setGlobalEvn(env).build();
        Config.init(parameterUtil);
 
        DataStream<JSONObject> dataStream = buildDataStream(env);
 
        // 自定义滚动策略
        DefaultRollingPolicy<String, String> rollPolicy = DefaultRollingPolicy.create()
                // 每隔多长时间生成一个文件
                .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
                // 默认60秒,未写入数据处于不活跃状态超时会滚动新文件
                .withInactivityInterval(TimeUnit.HOURS.toMillis(1))
                // 设置每个文件的最大大小 ,默认是128M
                .withMaxPartSize(12800 * 1024 * 1024)
                .build();
        // 设置Hadoop用户
        System.setProperty("HADOOP_USER_NAME", Config.getCommonConfig().getHdfsUser());
        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                /*
                    forRowFormat指定文件的跟目录与文件写入编码方式,
                    这里使用SimpleStringEncoder 以UTF-8字符串编码方式写入文件
                 */
                .forRowFormat(new Path(Config.getCommonConfig().getHdfsUrl() + "/dwd/news/dwd_wildanimal_baidu_news"), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new CustomBucket<>())
                // 设置上面指定的滚动策略
                .withRollingPolicy(rollPolicy)
                // 桶检查间隔,这里设置为1s
                .withBucketCheckInterval(1)
                .build();
 
        dataStream.map(jsonObject -> {
//            JSONObject jsonObject = JSONObject.parseObject(line.toString());
            WildAnimalNews wildAnimalNews = new WildAnimalNews();
            wildAnimalNews.setOdsId(jsonObject.getString("odsId"));
            wildAnimalNews.setDwdId(jsonObject.getString("dwdId"));
            wildAnimalNews.setCreateDate(jsonObject.getLong("createDate"));
            JSONObject data = jsonObject.getJSONObject("data");
            wildAnimalNews.setSourceUrl(data.getString("sourceUrl"));
            wildAnimalNews.setCaseName(data.getString("caseName"));
            wildAnimalNews.setGatherTime(data.getLong("gatherTime"));
            wildAnimalNews.setPubDate(data.getLong("pubDate"));
            wildAnimalNews.setSearchWord(data.getString("searchWord"));
            wildAnimalNews.setAuthor(data.getString("author"));
            wildAnimalNews.setContent(data.getString("content"));
 
            wildAnimalNews.setWildAnimalName(data.getJSONArray("wildAnimalName").toJavaList(String.class));
            wildAnimalNews.setMotivation(data.getJSONArray("motivation").toJavaList(String.class));
            wildAnimalNews.setMode(data.getJSONArray("mode").toJavaList(String.class));
            wildAnimalNews.setWildAnimalType(data.getJSONArray("wildAnimalType").toJavaList(String.class));
            wildAnimalNews.setHuntingSite(data.getJSONArray("huntingSite").toJavaList(String.class));
            wildAnimalNews.setWildlifeTradingPlaces(data.getJSONArray("wildlifeTradingPlaces").toJavaList(String.class));
            wildAnimalNews.setWildAnimalProtectionClass(data.getJSONArray("wildAnimalProtectionClass").toJavaList(String.class));
            wildAnimalNews.setAnimalProducts(data.getJSONArray("animalProducts").toJavaList(String.class));
            wildAnimalNews.setAnimalRelatedQualification(data.getJSONArray("animalRelatedQualification").toJavaList(String.class));
 
            wildAnimalNews.setProvince(data.getString("province"));
            wildAnimalNews.setProvinceCode(data.getString("provinceCode"));
            wildAnimalNews.setCity(data.getString("city"));
            wildAnimalNews.setCityCode(data.getString("cityCode"));
            wildAnimalNews.setArea(data.getString("area"));
            wildAnimalNews.setAreaCode(data.getString("areaCode"));
            wildAnimalNews.setEmotionalScore(data.getInteger("emotionalScore"));
            wildAnimalNews.setEmotionalType(data.getString("emotionalType"));
            return wildAnimalNews.toString();
        }).addSink(streamingFileSink);
 
        env.execute("野生动物Kafka->hdfs");
    }
 
    public static void main(String[] args) throws Exception {
        Kafka2Hdfs kafka2Hdfs = new Kafka2Hdfs();
        kafka2Hdfs.startup(args);
    }
}

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/connectors/streamfile_sink.html