Flink SQL on Hive


TableEnvironment

Flink 1.11 offers four planner/mode combinations for creating a TableEnvironment: the old planner and the Blink planner, each in streaming or batch mode. The snippets below show how to build each one.

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
 
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
 
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
 
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
 
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

Table-to-Stream Conversion

  • Append-only stream: rows are only inserted, never changed; plain SELECT / INSERT queries, etc.
  • Retract stream: rows are both added and deleted; GROUP BY, JOIN, etc.
  • Upsert stream: to be investigated.

// A retract stream wraps each row in Tuple2<Boolean, T>: true = insert, false = delete/retract.
DataStream<Tuple2<Boolean, JoinTestModel>> dataStream = stEnv.toRetractStream(wordWithCount, JoinTestModel.class);
// An append stream carries the rows directly.
DataStream<JoinTestModel> appendStream = stEnv.toAppendStream(wordWithCount, JoinTestModel.class);
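
A minimal runnable sketch of both conversions, assuming the Flink 1.11 Blink planner; the table name words and the sample data are illustrative:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamConversionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        DataStream<String> words = env.fromElements("hello", "flink", "hello");
        tEnv.createTemporaryView("words", words, $("word"));

        // A plain projection is append-only, so toAppendStream works.
        Table projected = tEnv.sqlQuery("SELECT word FROM words");
        tEnv.toAppendStream(projected, Row.class).print("append");

        // An aggregation keeps updating earlier results, so toRetractStream is required;
        // the Boolean flag is true for inserts and false for retractions.
        Table counted = tEnv.sqlQuery("SELECT word, COUNT(1) AS cnt FROM words GROUP BY word");
        tEnv.toRetractStream(counted, Row.class).print("retract");

        env.execute();
    }
}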

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html

Implementing the Three TableSink Types

Flink provides one TableSink interface per stream type above; minimal implementations follow.

AppendStreamTableSink

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class MyAppendStreamTableSink implements AppendStreamTableSink<Row> {

    private final TableSchema tableSchema;

    public MyAppendStreamTableSink(String[] fieldNames, DataType[] dataTypes) {
        this.tableSchema = TableSchema.builder().fields(fieldNames, dataTypes).build();
    }

    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Return a configured copy; for this simple sink, returning this is sufficient.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Append-only: every incoming Row is an insert. Replace print() with a real sink.
        return dataStream.print();
    }
}

RetractStreamTableSink

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
 
public class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {
 
    private TableSchema tableSchema;
 
    public MyRetractStreamTableSink(String[] fieldNames, TypeInformation[] typeInformations) {
        this.tableSchema = new TableSchema(fieldNames, typeInformations);
    }
 
    public MyRetractStreamTableSink(String[] fieldNames, DataType[] dataTypes) {
        this.tableSchema = TableSchema.builder().fields(fieldNames, dataTypes).build();
    }
 
    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }
 
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
            @Override
            public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
                // Custom sink logic:
                // f0 == true : a new row was inserted
                // f0 == false: an old row was deleted (retracted)
                if (value.f0) {
                    // Write to MySQL/Kafka, send an HTTP POST, etc., depending on the use case
                    System.out.println(value.f1);
                }
            }
        });
    }
 
    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
    }
 
    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] typeInformations) {
        // Return a configured copy rather than null so the planner can apply the field info.
        return new MyRetractStreamTableSink(fieldNames, typeInformations);
    }
 
}
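
A hedged usage sketch: register the sink under a name, then write an updating query into it. registerTableSink is deprecated in Flink 1.11 but still available; the source table words is illustrative, and DataTypes comes from org.apache.flink.table.api.DataTypes:

String[] fieldNames = {"word", "cnt"};
DataType[] dataTypes = {DataTypes.STRING(), DataTypes.BIGINT()};
stEnv.registerTableSink("retract_sink", new MyRetractStreamTableSink(fieldNames, dataTypes));

// GROUP BY keeps updating its result, so the sink receives (true, row) inserts
// and (false, row) retractions of superseded rows.
stEnv.sqlQuery("SELECT word, COUNT(1) AS cnt FROM words GROUP BY word")
     .executeInsert("retract_sink");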

UpsertStreamTableSink

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {

    private final TableSchema tableSchema;
    private String[] keyFields;
    private boolean isAppendOnly;

    public MyUpsertStreamTableSink(String[] fieldNames, DataType[] dataTypes) {
        this.tableSchema = TableSchema.builder().fields(fieldNames, dataTypes).build();
    }

    @Override
    public void setKeyFields(String[] keys) {
        // Called by the planner with the unique key of the query result (may be null).
        this.keyFields = keys;
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
        // true if the query never updates or deletes previously emitted results.
        this.isAppendOnly = isAppendOnly != null && isAppendOnly;
    }

    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
            @Override
            public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
                // f0 == true : upsert (insert or update by key)
                // f0 == false: delete by key
                System.out.println((value.f0 ? "upsert: " : "delete: ") + value.f1);
            }
        });
    }
}
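
Usage mirrors the retract sink; the difference is that the planner calls setKeyFields with the query's unique key (here word), so deletions can be applied by key. Names are again illustrative:

stEnv.registerTableSink("upsert_sink",
        new MyUpsertStreamTableSink(new String[]{"word", "cnt"},
                new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}));
stEnv.sqlQuery("SELECT word, COUNT(1) AS cnt FROM words GROUP BY word")
     .executeInsert("upsert_sink");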

Hive Streaming Read

Two settings to note: without them, a query against a Hive table completes as soon as it has returned its result once; with them, the job keeps running and continuously reads newly added data.

// 1) Enable dynamic table options so SQL hints are allowed:
tEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
// 2) Turn on streaming reads per query via a hint:
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-14') */
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.hive.HiveCatalog;
 
/**
 * Hive streaming read
 *
 * @author libin
 * @date 2020/8/28 13:45
 */
public class HiveStreamingDynamicRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Build EnvironmentSettings with the Blink planner in streaming mode
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, bsSettings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "C:\\work\\projects\\hiveconf1"; // a local path
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        // Register the Hive catalog and make it the current catalog/database
        stEnv.registerCatalog("myhive", hive);
        stEnv.useCatalog("myhive");
        stEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        stEnv.useDatabase("default");
        // Allow the /*+ OPTIONS(...) */ hint used in the query below
        stEnv.getConfig().getConfiguration().setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);

        String streamingReadSql = "SELECT * FROM sql_test_02 /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */";
        TableResult result = stEnv.executeSql(streamingReadSql);

        // executeSql already submits the job, and print() blocks while streaming results;
        // a separate env.execute() is not needed (it would fail: no DataStream operators were defined).
        result.print();
    }
}

Hive Streaming Write
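
A minimal sketch of the write side, assuming the same HiveCatalog setup as in the read example; the table hive_sink_table, its columns, and source_table are illustrative. Streaming write goes through a partitioned Hive table whose partition-commit behavior is configured in TBLPROPERTIES:

// Create a partitioned Hive table whose partitions are committed as data arrives
stEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
stEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS hive_sink_table (" +
        "  user_id STRING," +
        "  cnt BIGINT" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
        "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
        "  'sink.partition-commit.trigger'='partition-time'," +
        "  'sink.partition-commit.delay'='1 h'," +
        "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
        ")");

// Continuously insert a streaming source into the Hive table
stEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
stEnv.executeSql(
        "INSERT INTO hive_sink_table " +
        "SELECT user_id, cnt, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') " +
        "FROM source_table");

With 'sink.partition-commit.trigger'='partition-time', a partition is committed (added to the metastore and marked with a _SUCCESS file) once the watermark passes the partition time plus the configured delay.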