Apache Doris 使用指南

发布时间 2023-09-27 09:17:14作者: LittleDonkey

欢迎参观我的博客,一个Vue 与 SpringBoot结合的产物:https://poetize.cn

原文链接:https://poetize.cn/article?id=30

Apache Doris 中文官网

注意

  • 修改和删除只支持在 Unique Key 模型上。
  • 目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。
  • 目前Doris Source是有界流,不支持CDC方式读取。

Maven

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.14_2.12</artifactId>
    <version>1.1.1</version>
</dependency>

SQL

CREATE TABLE sink (
    id STRING,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'ip:8030',
    'table.identifier' = 'db.table',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.enable-delete' = 'true',
    'sink.properties.read_json_by_line' = 'true',
    'sink.label-prefix' = 'doris-label'
)

DataStream

/**
 * Source
 */
DorisOptions.Builder builder = DorisOptions.builder()
    .setFenodes("ip:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
    .setDorisOptions(builder.build())
    .setDorisReadOptions(DorisReadOptions.builder().build())
    .setDeserializer(new SimpleListDeserializationSchema())
    .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "Doris Source")
    .print();


/**
 * Sink
 */
DataStreamSource<String> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

DorisOptions dorisOptions = DorisOptions.builder()
    .setFenodes("ip:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("")
    .build();

Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris")
    .setStreamLoadProp(props)
    .setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
    .setDorisExecutionOptions(executionBuilder.build())
    .setDorisOptions(dorisOptions)
    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

mySQLSource.sinkTo(builder.build());


/**
 * RowData序列化
 */
builder.setSerializer(RowDataSerializer.builder()
    .setFieldNames(new String[]{"id", "name"})
    .setType("json")
    .setFieldType(new DataType[]{DataTypes.STRING(), DataTypes.STRING()}).build());

Sink 参数配置

  • SQL 使用WITH参数sink.properties.配置。
  • DataStream 使用方法DorisExecutionOptions.builder().setStreamLoadProp(Properties)配置。

应用场景

  • 使用Flink Doris Connector最适合的场景就是实时/批次同步源数据(Mysql,Oracle,PostgreSQL等)到Doris。
  • 使用Flink对Doris中的数据和其他数据源进行联合分析,也可以使用Flink Doris Connector。

Flink写入Uniq模型时,保证一批数据的有序性

可以添加sequence列配置来保证,较大值可以替换较小值(sequence列目前只支持Uniq模型)。

使用语法:Sequence列建表时有两种方式,一种是建表时设置sequence_col属性,一种是建表时设置sequence_type属性。

设置sequence_col(推荐)

创建Uniq表时,指定sequence列到表中其他column的映射,可以为整型和时间类型(DATE、DATETIME),创建后不能更改该列的类型

PROPERTIES (
    "function_column.sequence_col" = 'column_name'
);

设置sequence_type

创建Uniq表时,指定sequence列类型,可以为整型和时间类型(DATE、DATETIME)

PROPERTIES (
    "function_column.sequence_type" = 'Date'
);

导入时需要指定sequence列到其他列的映射。