Paimon's Write Flow

Published: 2023-06-23 21:21:58  Author: Aitozi

Based on Paimon 0.5.

The write pipeline is assembled in org.apache.paimon.flink.sink.FlinkSinkBuilder#build.
Operator topology:
BucketingStreamPartitioner (partitioning) -> RowDataStoreWriteOperator (writing) -> CommitterOperator (committing)
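
To see this topology end to end, here is a minimal sketch of driving it from the Flink Table API (assuming Flink 1.15+ and the Paimon Flink catalog; the catalog name, warehouse path, table schema, and values are illustrative placeholders, not from the post). An INSERT into a Paimon table is planned through the FlinkSinkBuilder#build path described above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PaimonWriteExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a Paimon catalog; the warehouse path is a placeholder.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");

        // A primary-key table with a fixed bucket number.
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS orders ("
                        + " order_id BIGINT,"
                        + " amount DOUBLE,"
                        + " PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ('bucket' = '4')");

        // This INSERT is translated into the topology above:
        // partition by bucket -> write -> commit.
        tEnv.executeSql("INSERT INTO orders VALUES (1, 10.5), (2, 20.0)").await();
    }
}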

Writing to a primary-key table

BucketingStreamPartitioner computes the target channel from each record's partition and bucket
RowDataStoreWriteOperator#processElement
StoreSinkWriteImpl(StoreSinkWrite)#write
org.apache.paimon.operation.AbstractFileStoreWrite#write
org.apache.paimon.operation.KeyValueFileStoreWrite#createWriter creates the writer, a MergeTreeWriter
org.apache.paimon.mergetree.MergeTreeWriter#write
org.apache.paimon.mergetree.SortBufferWriteBuffer#put adds the record to the write buffer
BinaryInMemorySortBuffer/BinaryExternalSortBuffer#write serializes the record and writes it into the buffer
org.apache.paimon.mergetree.MergeTreeWriter#flushWriteBuffer flushes the write buffer once memory is full: it iterates over the buffer, applies the merge function, creates a level-0 file writer, and writes the merged records into a data file. If changelog-producer is set to input, the original input records are also written out to a changelog file
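
The buffer-then-flush behavior can be pictured with a simplified, self-contained model. This is not Paimon's API; unlike Paimon, the merge here happens eagerly at put() rather than during the flush, which is enough to show the idea of "buffer by key, merge duplicates, flush a sorted level-0 run once the buffer is full".

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergeBufferSketch {
    private final TreeMap<String, String> buffer = new TreeMap<>(); // key -> latest value
    private final int bufferLimit;
    private final List<Map<String, String>> level0Files = new ArrayList<>();

    public MergeBufferSketch(int bufferLimit) {
        this.bufferLimit = bufferLimit;
    }

    public void write(String key, String value) {
        // put() already applies a "deduplicate" style merge: the latest value per key wins.
        buffer.put(key, value);
        if (buffer.size() >= bufferLimit) {
            flushWriteBuffer();
        }
    }

    private void flushWriteBuffer() {
        // TreeMap keeps keys sorted, mimicking the sort buffer; the snapshot
        // becomes a new sorted run at level 0.
        level0Files.add(new TreeMap<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        MergeBufferSketch writer = new MergeBufferSketch(2);
        writer.write("k1", "v1");
        writer.write("k1", "v2"); // merged with the earlier k1 before any file is written
        writer.write("k2", "v1");
        System.out.println(writer.level0Files); // [{k1=v2, k2=v1}]
    }
}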

Writing to an append-only table

An append-only table is a table without a primary key. The table type is already decided at creation time from the primary key and the write-mode option: in general, a table without a primary key is an append-only table, and an append-only table does not process changelog (update/delete) records. The relevant excerpt from the table creation code:

WriteMode writeMode = coreOptions.get(CoreOptions.WRITE_MODE);
if (writeMode == WriteMode.AUTO) {
    writeMode =
            tableSchema.primaryKeys().isEmpty()
                    ? WriteMode.APPEND_ONLY
                    : WriteMode.CHANGE_LOG;
    coreOptions.set(CoreOptions.WRITE_MODE, writeMode);
}
if (writeMode == WriteMode.APPEND_ONLY) {
    table = new AppendOnlyFileStoreTable(fileIO, tablePath, tableSchema);
} else {
    if (tableSchema.primaryKeys().isEmpty()) {
        // No PK, but the write mode is set to change-log: all columns are used as the key
        // and an occurrence count is recorded so retractions can be produced
        // (see the sketch after this snippet)
        table = new ChangelogValueCountFileStoreTable(fileIO, tablePath, tableSchema);
    } else {
        table = new ChangelogWithKeyFileStoreTable(fileIO, tablePath, tableSchema);
    }
}
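
To make the ChangelogValueCountFileStoreTable branch concrete, here is a simplified illustration of the value-count idea (not Paimon's implementation): the whole row serves as the key and an occurrence count is maintained, so a delete is represented by decrementing the count.

import java.util.HashMap;
import java.util.Map;

public class ValueCountSketch {
    private final Map<String, Long> counts = new HashMap<>();

    public void insert(String row) {
        counts.merge(row, 1L, Long::sum);
    }

    public void delete(String row) {
        counts.merge(row, -1L, Long::sum);
        counts.remove(row, 0L); // drop rows whose count returns to zero
    }

    public static void main(String[] args) {
        ValueCountSketch table = new ValueCountSketch();
        table.insert("a|1");
        table.insert("a|1");
        table.delete("a|1");
        System.out.println(table.counts); // {a|1=1}: one occurrence of the row remains
    }
}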

org.apache.paimon.operation.AbstractFileStoreWrite#write
org.apache.paimon.operation.AppendOnlyFileStoreWrite#createWriter creates an AppendOnlyWriter
org.apache.paimon.io.RollingFileWriter#write an append-only table writes records straight to data files; there is no write buffer as in the primary-key table
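
The rolling behavior can be sketched with a self-contained model (not Paimon's RollingFileWriter): records are appended directly to the current data file, and a new file is started once the current one reaches a target size.

import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class RollingAppendSketch implements AutoCloseable {
    private final Path dir;
    private final long targetFileBytes;
    private final List<Path> finishedFiles = new ArrayList<>();
    private Writer current;
    private Path currentPath;
    private long currentBytes;
    private int fileIndex;

    public RollingAppendSketch(Path dir, long targetFileBytes) {
        this.dir = dir;
        this.targetFileBytes = targetFileBytes;
    }

    public void write(String record) throws IOException {
        if (current == null) {
            // Start a new data file on demand.
            currentPath = dir.resolve("data-" + fileIndex++ + ".txt");
            current = Files.newBufferedWriter(currentPath);
            currentBytes = 0;
        }
        current.write(record);
        current.write('\n');
        currentBytes += record.length() + 1;
        if (currentBytes >= targetFileBytes) {
            rollFile(); // target size reached: close this file and roll to a new one
        }
    }

    private void rollFile() throws IOException {
        current.close();
        finishedFiles.add(currentPath);
        current = null;
    }

    @Override
    public void close() throws IOException {
        if (current != null) {
            rollFile();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("append-only");
        RollingAppendSketch writer = new RollingAppendSketch(dir, 16);
        writer.write("order-1,10.5");
        writer.write("order-2,20.0");
        writer.write("order-3,30.0");
        writer.close();
        System.out.println(writer.finishedFiles); // two rolled files under the temp dir
    }
}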