User-defined Sources & Sinks


Dynamic tables are the core concept of Flink’s Table & SQL API for processing both bounded and unbounded data in a unified fashion.

Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files.

Dynamic sources and dynamic sinks can be used to read and write data from and to an external system. In the documentation, sources and sinks are often summarized under the term connector.

Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the connector section for more information about built-in table sources and sinks.

This page focuses on how to develop a custom, user-defined connector.

Attention: New table source and table sink interfaces were introduced in Flink 1.11 as part of FLIP-95. The factory interfaces have also been reworked. FLIP-95 is not fully implemented yet; many ability interfaces are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look at the old table sources and sinks page. Those interfaces are still supported for backwards compatibility.

Overview

In many cases, implementers don’t need to create a new connector from scratch but would like to slightly modify existing connectors or hook into the existing stack. In other cases, implementers would like to create specialized connectors.

This section helps with both kinds of use cases. It explains the general architecture of table connectors, from the pure declaration in the API to the runtime code that will be executed on the cluster.

The filled arrows show how objects are transformed to other objects from one stage to the next stage during the translation process.

Metadata

Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing a CREATE TABLE statement results in updated metadata in the target catalog.

For most catalog implementations, physical data in the external system is not modified for such an operation. Connector-specific dependencies don’t have to be present in the classpath yet. The options declared in the WITH clause are neither validated nor otherwise interpreted.

The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances of CatalogTable. A table name will be resolved into a CatalogTable internally when necessary.
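
As a minimal sketch of this step, the following Java snippet registers a table purely as catalog metadata; the table name is illustrative, and the 'custom' connector and 'port' option are the placeholder values used in the text above. No connector code runs at this point.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class RegisterTableMetadata {

  public static void main(String[] args) {
    final TableEnvironment env = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // executing the DDL only updates metadata in the target catalog;
    // connector dependencies are not needed yet and the WITH options
    // are neither validated nor otherwise interpreted at this point
    env.executeSql(
        "CREATE TABLE UserScores (name STRING, score INT) " +
        "WITH ('connector' = 'custom', 'port' = '5022')");
  }
}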

Planning

When it comes to planning and optimization of the table program, a CatalogTable needs to be resolved into a DynamicTableSource (for reading in a SELECT query) and DynamicTableSink (for writing in an INSERT INTO statement).

DynamicTableSourceFactory and DynamicTableSinkFactory provide connector-specific logic for translating the metadata of a CatalogTable into instances of DynamicTableSource and DynamicTableSink. In most of the cases, a factory’s purpose is to validate options (such as 'port' = '5022' in the example), configure encoding/decoding formats (if required), and create a parameterized instance of the table connector.

By default, instances of DynamicTableSourceFactory and DynamicTableSinkFactory are discovered using Java’s Service Provider Interfaces (SPI). The connector option (such as 'connector' = 'custom' in the example) must correspond to a valid factory identifier.

Although it might not be apparent in the class naming, DynamicTableSource and DynamicTableSink can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing the actual data.

The planner uses the source and sink instances to perform connector-specific bidirectional communication until an optimal logical plan is found. Depending on the optionally declared ability interfaces (e.g. SupportsProjectionPushDown or SupportsOverwrite), the planner might apply changes to an instance and thus mutate the produced runtime implementation.
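
The following is a minimal sketch of how such a mutation can look, assuming the Flink 1.11 interfaces described on this page (later versions pass additional arguments to the ability methods). The class name is illustrative and the runtime provider is intentionally omitted.

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;

public class ProjectableSource implements ScanTableSource, SupportsProjectionPushDown {

  // indices of the physical fields requested by the planner; null means "all fields"
  private int[][] projectedFields;

  @Override
  public boolean supportsNestedProjection() {
    return false; // this sketch only handles top-level columns
  }

  @Override
  public void applyProjection(int[][] projectedFields) {
    // the planner mutates this instance during optimization
    this.projectedFields = projectedFields;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    return ChangelogMode.insertOnly();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    // a real source would build a reader that only emits the projected fields
    throw new UnsupportedOperationException("runtime provider omitted in this sketch");
  }

  @Override
  public DynamicTableSource copy() {
    final ProjectableSource copy = new ProjectableSource();
    copy.projectedFields = projectedFields;
    return copy;
  }

  @Override
  public String asSummaryString() {
    return "Projectable Source (sketch)";
  }
}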

Runtime

Once the logical planning is complete, the planner will obtain the runtime implementation from the table connector. Runtime logic is implemented in Flink’s core connector interfaces such as InputFormat or SourceFunction.

Those interfaces are grouped by another level of abstraction as subclasses of ScanRuntimeProvider, LookupRuntimeProvider, and SinkRuntimeProvider.

For example, both OutputFormatProvider (providing org.apache.flink.api.common.io.OutputFormat) and SinkFunctionProvider (providing org.apache.flink.streaming.api.functions.sink.SinkFunction) are concrete instances of SinkRuntimeProvider that the planner can handle.

Extension Points

This section explains the available interfaces for extending Flink’s table connectors.

Dynamic Table Factories

Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog and session information.

org.apache.flink.table.factories.DynamicTableSourceFactory can be implemented to construct a DynamicTableSource.

org.apache.flink.table.factories.DynamicTableSinkFactory can be implemented to construct a DynamicTableSink.

By default, the factory is discovered using the value of the connector option as the factory identifier and Java’s Service Provider Interface.

In JAR files, references to new implementations can be added to the service file:

META-INF/services/org.apache.flink.table.factories.Factory

The framework will check for a single matching factory that is uniquely identified by factory identifier and requested base class (e.g. DynamicTableSourceFactory).

The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a catalog needs to return an instance that implements the requested base class in org.apache.flink.table.catalog.Catalog#getFactory.
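
Below is a minimal sketch of such a bypass, assuming a catalog that simply extends the built-in GenericInMemoryCatalog and always hands out the SocketDynamicTableFactory defined in the full stack example further down. The class name is illustrative.

import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.Factory;

import java.util.Optional;

public class SocketCatalog extends GenericInMemoryCatalog {

  public SocketCatalog(String name) {
    super(name);
  }

  @Override
  public Optional<Factory> getFactory() {
    // tables of this catalog skip SPI discovery and are always
    // resolved with the returned factory
    return Optional.of(new SocketDynamicTableFactory());
  }
}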

Dynamic Table Source

By definition, a dynamic table can change over time.

When reading a dynamic table, the content can either be considered as:

  • A changelog (finite or infinite) for which all changes are consumed continuously until the changelog is exhausted. This is represented by the ScanTableSource interface.
  • A continuously changing or very large external table whose content is usually never read entirely but queried for individual values when necessary. This is represented by the LookupTableSource interface.

A class can implement both of these interfaces at the same time. The planner decides about their usage depending on the specified query.

Scan Table Source

ScanTableSource scans all rows from an external storage system during runtime.

The scanned rows don’t have to contain only insertions but can also contain updates and deletions. Thus, the table source can be used to read a (finite or infinite) changelog. The returned changelog mode indicates the set of changes that the planner can expect during runtime.

For regular batch scenarios, the source can emit a bounded stream of insert-only rows.

For regular streaming scenarios, the source can emit an unbounded stream of insert-only rows.

For change data capture (CDC) scenarios, the source can emit bounded or unbounded streams with insert, update, and delete rows.
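
These scenarios map directly to the changelog mode a source returns from getChangelogMode(). As a small sketch, the two extremes (insert-only and full CDC) could be declared like this; the class name is illustrative.

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public final class ExampleChangelogModes {

  // bounded or unbounded scans that only ever emit insertions
  public static final ChangelogMode INSERT_ONLY = ChangelogMode.insertOnly();

  // full CDC stream containing insertions, updates, and deletions
  public static final ChangelogMode FULL_CDC = ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.UPDATE_BEFORE)
      .addContainedKind(RowKind.UPDATE_AFTER)
      .addContainedKind(RowKind.DELETE)
      .build();

  private ExampleChangelogModes() {
  }
}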

A table source can implement further ability interfaces such as SupportsProjectionPushDown that might mutate an instance during planning. All abilities are listed in the org.apache.flink.table.connector.source.abilities package and in the documentation of org.apache.flink.table.connector.source.ScanTableSource.

The runtime implementation of a ScanTableSource must produce internal data structures. Thus, records must be emitted as org.apache.flink.table.data.RowData. The framework provides runtime converters such that a source can still work on common data structures and perform a conversion at the end.

Lookup Table Source

LookupTableSource looks up rows of an external storage system by one or more keys during runtime.

Compared to ScanTableSource, the source does not have to read the entire table and can lazily fetch individual values from a (possibly continuously changing) external table when necessary.

Compared to ScanTableSource, a LookupTableSource currently only supports emitting insert-only changes.

Further abilities are not supported. See the documentation of org.apache.flink.table.connector.source.LookupTableSource for more information.

The runtime implementation of a LookupTableSource is a TableFunction or AsyncTableFunction. The function will be called with values for the given lookup keys during runtime.
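
The following is a minimal sketch of a synchronous lookup source. The backing store, the (STRING, INT) row layout, and the class names are assumptions of this sketch; a real implementation would query the external system inside eval().

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.TableFunction;

public class ExampleLookupTableSource implements LookupTableSource {

  @Override
  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // context.getKeys() describes which columns of the declared schema are used as lookup keys
    return TableFunctionProvider.of(new KeyLookupFunction());
  }

  @Override
  public DynamicTableSource copy() {
    return new ExampleLookupTableSource();
  }

  @Override
  public String asSummaryString() {
    return "Example Lookup Source (sketch)";
  }

  // synchronous lookup; eval(...) is invoked with the values of the lookup keys during runtime
  public static class KeyLookupFunction extends TableFunction<RowData> {

    public void eval(Object key) {
      // a real implementation would query the external system here;
      // this sketch emits one hard-coded (STRING, INT) row per key
      collect(GenericRowData.of(StringData.fromString(String.valueOf(key)), 1));
    }
  }
}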

Dynamic Table Sink

By definition, a dynamic table can change over time.

When writing a dynamic table, the content can always be considered as a changelog (finite or infinite) for which all changes are written out continuously until the changelog is exhausted. The returned changelog mode indicates the set of changes that the sink accepts during runtime.

For regular batch scenarios, the sink can solely accept insert-only rows and write out bounded streams.

For regular streaming scenarios, the sink can solely accept insert-only rows and can write out unbounded streams.

For change data capture (CDC) scenarios, the sink can write out bounded or unbounded streams with insert, update, and delete rows.

A table sink can implement further ability interfaces such as SupportsOverwrite that might mutate an instance during planning. All abilities are listed in the org.apache.flink.table.connector.sink.abilities package and in the documentation of org.apache.flink.table.connector.sink.DynamicTableSink.

The runtime implementation of a DynamicTableSink must consume internal data structures. Thus, records must be accepted as org.apache.flink.table.data.RowData. The framework provides runtime converters such that a sink can still work on common data structures and perform a conversion at the beginning.
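
Since the full stack example below only covers the source side, here is a minimal sketch of an insert-only sink whose runtime implementation is a regular DataStream SinkFunction (Flink's PrintSinkFunction in this case). The class name is illustrative; factory, validation, and formats are omitted.

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

public class PrintDynamicTableSink implements DynamicTableSink {

  @Override
  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // accept only insertions; a CDC-capable sink could simply return requestedMode
    return ChangelogMode.insertOnly();
  }

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // SinkFunctionProvider wraps a regular DataStream API SinkFunction
    return SinkFunctionProvider.of(new PrintSinkFunction<>());
  }

  @Override
  public DynamicTableSink copy() {
    return new PrintDynamicTableSink();
  }

  @Override
  public String asSummaryString() {
    return "Print Sink (sketch)";
  }
}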

Encoding / Decoding Formats

Some table connectors accept different formats that encode and decode keys and/or values.

Formats work similar to the pattern DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider, where the factory is responsible for translating options and the source is responsible for creating runtime logic.

Because formats might be located in different modules, they are discovered using Java’s Service Provider Interface similar to table factories. In order to discover a format factory, the dynamic table factory searches for a factory that corresponds to a factory identifier and connector-specific base class.

For example, the Kafka table source requires a DeserializationSchema as runtime interface for a decoding format. Therefore, the Kafka table source factory uses the value of the value.format option to discover a DeserializationFormatFactory.

The following format factories are currently supported:

org.apache.flink.table.factories.DeserializationFormatFactory
org.apache.flink.table.factories.SerializationFormatFactory

The format factory translates the options into an EncodingFormat or a DecodingFormat. Those interfaces are another kind of factory that produces specialized format runtime logic for the given data type.

For example, for a Kafka table source factory, the DeserializationFormatFactory would return a DecodingFormat<DeserializationSchema<RowData>> that can be passed into the Kafka table source.

Full Stack Example

This section sketches how to implement a scan table source with a decoding format that supports changelog semantics. The example illustrates how all of the mentioned components play together. It can serve as a reference implementation.

In particular, it shows how to

  • create factories that parse and validate options,
  • implement table connectors,
  • implement and discover custom formats,
  • and use provided utilities such as data structure converters and the FactoryUtil.

The table source uses a simple single-threaded SourceFunction to open a socket that listens for incoming bytes. The raw bytes are decoded into rows by a pluggable format. The format expects a changelog flag as the first column.

We will use most of the interfaces mentioned above to enable the following DDL:

CREATE TABLE UserScores (name STRING, score INT)
WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'changelog-csv',
  'changelog-csv.column-delimiter' = '|'
);

Because the format supports changelog semantics, we are able to ingest updates during runtime and create an updating view that can continuously evaluate changing data:

SELECT name, SUM(score) FROM UserScores GROUP BY name;

Use the following command to ingest data in a terminal:

> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18

Factories

This section illustrates how to translate metadata coming from the catalog to concrete connector instances.

Both factories have been added to the META-INF/services directory.
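
Concretely, the service file could look as follows. The com.example.socket package is an assumption of this sketch (the example classes below are shown without package declarations); only the file name and the fully qualified class names matter.

META-INF/services/org.apache.flink.table.factories.Factory:

com.example.socket.SocketDynamicTableFactory
com.example.socket.ChangelogCsvFormatFactory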

SocketDynamicTableFactory

The SocketDynamicTableFactory translates the catalog table to a table source. Because the table source requires a decoding format, we are discovering the format using the provided FactoryUtil for convenience.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;

public class SocketDynamicTableFactory implements DynamicTableSourceFactory {

  // define all options statically
  public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
    .stringType()
    .noDefaultValue();

  public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
    .intType()
    .noDefaultValue();

  public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
    .intType()
    .defaultValue(10); // corresponds to '\n'

  @Override
  public String factoryIdentifier() {
    return "socket"; // used for matching to `connector = '...'`
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // use pre-defined option for format
    return options;
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BYTE_DELIMITER);
    return options;
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    // discover a suitable decoding format
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
      DeserializationFormatFactory.class,
      FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

    // create and return dynamic table source
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }
}

ChangelogCsvFormatFactory

The ChangelogCsvFormatFactory translates format-specific options to a format. The FactoryUtil in SocketDynamicTableFactory takes care of adapting the option keys accordingly and handles the prefixing like changelog-csv.column-delimiter.

Because this factory implements DeserializationFormatFactory, it could also be used for other connectors that support deserialization formats such as the Kafka connector.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {

  // define all options statically
  public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
    .stringType()
    .defaultValue("|");

  @Override
  public String factoryIdentifier() {
    return "changelog-csv";
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(COLUMN_DELIMITER);
    return options;
  }

  @Override
  public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
      DynamicTableFactory.Context context,
      ReadableConfig formatOptions) {
    // either implement your custom validation logic here ...
    // or use the provided helper method
    FactoryUtil.validateFactoryOptions(this, formatOptions);

    // get the validated options
    final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);

    // create and return the format
    return new ChangelogCsvFormat(columnDelimiter);
  }
}

Table Source and Decoding Format

This section illustrates how to translate from instances of the planning layer to runtime instances that are shipped to the cluster.

SocketDynamicTableSource

The SocketDynamicTableSource is used during planning. In our example, we don’t implement any of the available ability interfaces. Therefore, the main logic can be found in getScanRuntimeProvider(...) where we instantiate the required SourceFunction and its DeserializationSchema for runtime. Both instances are parameterized to return internal data structures (i.e. RowData).

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class SocketDynamicTableSource implements ScanTableSource {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
  private final DataType producedDataType;

  public SocketDynamicTableSource(
      String hostname,
      int port,
      byte byteDelimiter,
      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
      DataType producedDataType) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.decodingFormat = decodingFormat;
    this.producedDataType = producedDataType;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // in our example the format decides about the changelog mode
    // but it could also be the source itself
    return decodingFormat.getChangelogMode();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

    // create runtime classes that are shipped to the cluster

    final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
      runtimeProviderContext,
      producedDataType);

    final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
      hostname,
      port,
      byteDelimiter,
      deserializer);

    return SourceFunctionProvider.of(sourceFunction, false);
  }

  @Override
  public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }

  @Override
  public String asSummaryString() {
    return "Socket Table Source";
  }
}

 

ChangelogCsvFormat

The ChangelogCsvFormat is a decoding format that uses a DeserializationSchema during runtime. It supports emitting INSERT and DELETE changes.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

import java.util.List;

public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {

  private final String columnDelimiter;

  public ChangelogCsvFormat(String columnDelimiter) {
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  @SuppressWarnings("unchecked")
  public DeserializationSchema<RowData> createRuntimeDecoder(
      DynamicTableSource.Context context,
      DataType producedDataType) {
    // create type information for the DeserializationSchema
    final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
      producedDataType);

    // most of the code in DeserializationSchema will not work on internal data structures
    // create a converter for conversion at the end
    final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);

    // use logical types during runtime for parsing
    final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();

    // create runtime class
    return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // define that this format can produce INSERT and DELETE rows
    return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.DELETE)
      .build();
  }
}

 

Runtime

For completeness, this section illustrates the runtime logic for both SourceFunction and DeserializationSchema.

ChangelogCsvDeserializer

The ChangelogCsvDeserializer contains a simple parsing logic for converting bytes into Row of Integer and String with a row kind. The final conversion step converts those into internal data structures.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.util.List;
import java.util.regex.Pattern;

public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {

  private final List<LogicalType> parsingTypes;
  private final DataStructureConverter converter;
  private final TypeInformation<RowData> producedTypeInfo;
  private final String columnDelimiter;

  public ChangelogCsvDeserializer(
      List<LogicalType> parsingTypes,
      DataStructureConverter converter,
      TypeInformation<RowData> producedTypeInfo,
      String columnDelimiter) {
    this.parsingTypes = parsingTypes;
    this.converter = converter;
    this.producedTypeInfo = producedTypeInfo;
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    // return the type information required by Flink's core interfaces
    return producedTypeInfo;
  }

  @Override
  public void open(InitializationContext context) {
    // converters must be open
    converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
  }

  @Override
  public RowData deserialize(byte[] message) {
    // parse the columns including a changelog flag
    final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
    final RowKind kind = RowKind.valueOf(columns[0]);
    final Row row = new Row(kind, parsingTypes.size());
    for (int i = 0; i < parsingTypes.size(); i++) {
      row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
    }
    // convert to internal data structure
    return (RowData) converter.toInternal(row);
  }

  private static Object parse(LogicalTypeRoot root, String value) {
    switch (root) {
      case INTEGER:
        return Integer.parseInt(value);
      case VARCHAR:
        return value;
      default:
        throw new IllegalArgumentException();
    }
  }

  @Override
  public boolean isEndOfStream(RowData nextElement) {
    return false;
  }
}

 

SocketSourceFunction

The SocketSourceFunction opens a socket and consumes bytes. It splits records by the given byte delimiter (\n by default) and delegates the decoding to a pluggable DeserializationSchema. The source function can only work with a parallelism of 1.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DeserializationSchema<RowData> deserializer;

  private volatile boolean isRunning = true;
  private Socket currentSocket;

  public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.deserializer = deserializer;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    return deserializer.getProducedType();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    deserializer.open(() -> getRuntimeContext().getMetricGroup());
  }

  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // open and consume from socket
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            // buffer until delimiter
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode and emit record
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); // print and continue
      }
      Thread.sleep(1000);
    }
  }

  @Override
  public void cancel() {
    isRunning = false;
    try {
      currentSocket.close();
    } catch (Throwable t) {
      // ignore
    }
  }
}

 
