TableEnvironment
// ********************** // FLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); // ****************** // FLINK BATCH QUERY // ****************** import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv); // ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings); // ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
表到流的转换
- Append-only 流: 数据只增加不修改,select insert 等。
- Retract 流:数据增加和删除,group by 和 join等。
- Upsert 流:数据按唯一键(key)做更新插入(upsert)或按 key 删除,需要结果表有唯一键,如带 key 的聚合结果写入外部存储。
// Retract stream: each element is (flag, row) — flag=true means add, false means retract.
DataStream<Tuple2<Boolean, JoinTestModel>> retractStream =
        stEnv.toRetractStream(wordWithCount, JoinTestModel.class);

// Append-only stream: rows are only ever inserted, never updated or deleted.
DataStream<JoinTestModel> appendStream =
        stEnv.toAppendStream(wordWithCount, JoinTestModel.class);
自定义三种类型TableSink
AppendStreamTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; public class MyAppendStreamTableSink implements AppendStreamTableSink<Row> { @Override public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) { return null; } @Override public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { return null; } }
RetractStreamTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; public class MyRetractStreamTableSink implements RetractStreamTableSink<Row> { private TableSchema tableSchema; public MyRetractStreamTableSink(String[] fieldNames, TypeInformation[] typeInformations) { this.tableSchema = new TableSchema(fieldNames, typeInformations); } public MyRetractStreamTableSink(String[] fieldNames, DataType[] dataTypes) { this.tableSchema = TableSchema.builder().fields(fieldNames, dataTypes).build(); } @Override public TableSchema getTableSchema() { return tableSchema; } @Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { return dataStream.addSink(new SinkFunction<Tuple2<Boolean, Row>>() { @Override public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception { //自定义Sink // f0==true :插入新数据 // f0==false:删除旧数据 if (value.f0) { //可以写入MySQL、Kafka或者发HttpPost...根据具体情况开发 System.out.println(value.f1); } } }); } @Override public TypeInformation<Row> getRecordType() { return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()); } @Override public TableSink<Tuple2<Boolean, Row>> configure(String[] strings, TypeInformation<?>[] typeInformations) { return null; } }
UpsertStreamTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sinks.UpsertStreamTableSink; import org.apache.flink.types.Row; public class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> { @Override public void setKeyFields(String[] strings) { } @Override public void setIsAppendOnly(Boolean aBoolean) { } @Override public TypeInformation<Row> getRecordType() { return null; } @Override public TableSink<Tuple2<Boolean, Row>> configure(String[] strings, TypeInformation<?>[] typeInformations) { return null; } @Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { return null; } }
Hive Streaming Read
注意2个点:
如果没有这 2 个设置,查询 Hive 表时每执行一次查询,任务就会结束;加上下面的参数之后,任务会持续运行,不断地增量读取新到达的数据。
// Enable SQL hints so a query can override table properties via /*+ OPTIONS(...) */.
tEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);

// Hint placed after the table name in the query:
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-14') */
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.hive.HiveCatalog; /** * Hive streaming read * * @author libin * @date 2020/8/28 13:45 */ public class HiveStreamingDynamicRead { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, bsSettings); String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "C:\\work\\projects\\hiveconf1"; // a local path HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); //todo 注册一个 stEnv.registerCatalog("myhive", hive); stEnv.useCatalog("myhive"); stEnv.getConfig().setSqlDialect(SqlDialect.HIVE); stEnv.useDatabase("default"); stEnv.getConfig().getConfiguration().setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); String createDbSql = "SELECT * FROM sql_test_02 /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */"; TableResult result = stEnv.executeSql(createDbSql); result.print(); env.execute(); } }
Hive Streaming Write
-- Partitioned Hive table for streaming writes. Partitions are committed to the
-- metastore (plus a _SUCCESS file) once partition time + 1h delay has passed.
CREATE TABLE hive_table (
    user_id      STRING,
    order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING)
  STORED AS parquet
  TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='1 h',
    'sink.partition-commit.policy.kind'='metastore,success-file'
  );
- SQL Server 检测是不是数字型数据的两种方法
- SQL Join的一些总结
- SQL Server 高性能写入的一些总结
- 网络攻击技术开篇——SQL Injection
- Hive常见问题
- SQL SERVER日期时间转字符串
- sql server 保留2位小数
- 2024-01-13 Can't perform a React state update on an unmounted component. This is a no-op, but it indicates a memory leak in your application. ==》引用了未使用的方法导致
- Spark On YARN架构
- mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION; ERROR 1410 (42000): You are not allowed to create a user with GRANT
本栏目推荐文章