【源码阅读】5. Broker Load 导入任务的执行流程

发布时间: 2023-06-25 18:58:20 作者: xutao_ustc
// load_stmt: grammar rule for the LOAD LABEL statement.
// Both alternatives build a LoadStmt from the job label, the parenthesized
// data description list and the properties; they differ only in the third
// constructor argument (a broker description vs. a resource description).
load_stmt ::=
    // Alternative 1: label, data descriptions, then opt_broker.
    KW_LOAD KW_LABEL job_label:label
    LPAREN data_desc_list:dataDescList RPAREN
    opt_broker:broker
    opt_properties:properties
    {:
        RESULT = new LoadStmt(label, dataDescList, broker, properties);
    :}
    // Alternative 2: same shape, but with resource_desc in place of opt_broker.
    | KW_LOAD KW_LABEL job_label:label
    LPAREN data_desc_list:dataDescList RPAREN
    resource_desc:resource
    opt_properties:properties
    {:
        RESULT = new LoadStmt(label, dataDescList, resource, properties);
  :};
    
  
// data_desc_list: one or more data_desc entries separated by COMMA.
// The rule is left-recursive: the first entry creates the list, and each
// further entry is appended to the list produced so far.
data_desc_list ::=
    // Base case: a single data_desc starts a new list.
    data_desc:desc
    {:
        RESULT = Lists.newArrayList(desc);
    :}
    // Recursive case: append the next data_desc to the existing list and reuse it.
    | data_desc_list:list COMMA data_desc:desc
    {:
        list.add(desc);
        RESULT = list;
    :}
    ;      
// data_desc: a single data description inside LOAD LABEL ( ... ).
// Two alternatives are defined:
//   1. DATA INFILE (files) ... INTO TABLE t ...   - load from a list of files
//   2. DATA FROM TABLE src ... INTO TABLE t ...   - load from another table
// Each alternative passes its parsed pieces to a DataDescription constructor.
data_desc ::=
    // Alternative 1: load from files.
    opt_merge_type:mergeType
    KW_DATA KW_INFILE LPAREN string_list:files RPAREN
    opt_negative:isNeg
    KW_INTO KW_TABLE ident:tableName
    opt_partition_names:partitionNames
    opt_field_term:colSep                          // COLUMNS TERMINATED BY
    opt_file_format:fileFormat                     // FORMAT AS
    opt_col_list:colList                           // column order
    opt_columns_from_path:columnsFromPath          // COLUMNS FROM PATH AS
    opt_col_mapping_list:colMappingList            // SET (column_mapping)
    pre_filter_clause:preFilterExpr                // PRECEDING FILTER
    where_clause:whereExpr                         // WHERE predicate
    delete_on_clause:deleteExpr                    // DELETE ON
    sequence_col_clause:sequenceColName            // ORDER BY
    opt_properties:properties
    {:
        // Note: the constructor argument order differs from the clause order above.
        RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
        columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
    :}
    // Alternative 2: load from a source table; there are no file-format related clauses here.
    | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
    opt_negative:isNeg
    KW_INTO KW_TABLE ident:tableName
    opt_partition_names:partitionNames
    opt_col_mapping_list:colMappingList
    where_clause:whereExpr
    delete_on_clause:deleteExpr
    opt_properties:properties
    {:
        RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr,
        mergeType, deleteExpr, properties);
    :}
    ;    
    
// opt_broker and resource_desc correspond to the Broker Load family and the Spark Load family respectively.
        // opt_broker: optional WITH clause; every non-empty alternative yields a BrokerDesc.
        opt_broker ::=
            // Empty alternative: no WITH clause was given, so the result is null.
            {:
                RESULT = null;
            :}
            // WITH S3 (key/value properties)
            | KW_WITH KW_S3 LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc("S3", StorageBackend.StorageType.S3, properties);
            :}
            // WITH HDFS (key/value properties)
            | KW_WITH KW_HDFS LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, properties);
            :}
            // WITH LOCAL (key/value properties)
            | KW_WITH KW_LOCAL LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc("LOCAL", StorageBackend.StorageType.LOCAL, properties);
            :}
            // WITH BROKER <name> without a property list: properties are passed as null.
            | KW_WITH KW_BROKER ident_or_text:name
            {:
                RESULT = new BrokerDesc(name, null);
            :}
            // WITH BROKER <name> (key/value properties)
            | KW_WITH KW_BROKER ident_or_text:name LPAREN key_value_map:properties RPAREN
            {:
                RESULT = new BrokerDesc(name, properties);
            :}
        ;
            
    // resource_desc: WITH RESOURCE <name>, optionally followed by a parenthesized
    // key/value property list; both alternatives yield a ResourceDesc.
    resource_desc ::=
    // Without a property list: properties are passed as null.
    KW_WITH KW_RESOURCE ident_or_text:resourceName
    {:
        RESULT = new ResourceDesc(resourceName, null);
    :}
    // With a property list.
    | KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN
    {:
        RESULT = new ResourceDesc(resourceName, properties);
    :}
            ; 

 

 

LOAD语法分为四段:

● KW_LOAD KW_LABEL job_label:label

● data_desc_list - 语法核心映射,可定义多个,以逗号分隔

● opt_broker / resource_desc - 同时定义了BrokerLoad和SparkLoad的设置

○ opt_broker - WITH xxx 可以是 Broker,也可以是 HDFS、S3、LOCAL,最终都转成 BrokerDesc(BrokerLoad 专用)

○ resource_desc - 通过 WITH RESOURCE 指定 Resource,也可额外加一些属性(SparkLoad 专用)

● opt_properties - PROPERTIES (xxx)

 

Load系通用流程

BrokerLoadJob定义与提交

Load作业的执行是异步的,首先会生成LoadJob(BrokerLoadJob或SparkLoadJob),然后提交到LoadJobScheduler中,具体:

● BulkLoadJob.fromLoadStmt生成BrokerLoadJob

○ 生成BrokerLoadJob,并设置Broker信息

○ 设置opt_properties信息

○ 设置data_desc_list信息

● 加入到内存映射中

● 提交到LoadJobScheduler中

 

BrokerLoadJob执行,生成BrokerLoadPendingTask

新建BrokerLoadPendingTask,放到pendingLoadTaskScheduler中。

Env中有两个相关属性MasterTaskExecutor pendingLoadTaskScheduler和MasterTaskExecutor loadingLoadTaskScheduler,都是线程池。

 

BrokerLoadPendingTask执行,生成LoadLoadingTask

分两步:

● BrokerLoadPendingTask执行 - 查询要导入的文件列表

● 执行完成,回调给BrokerLoadJob

○ 创建LoadLoadingTask

○ 初始化LoadLoadingTask

■ 创建LoadingTaskPlanner并执行plan

○ 发送LoadLoadingTask给执行器

LoadingTaskPlanner.plan

 

 

 

LoadLoadingTask执行

分两步:

● LoadLoadingTask执行 - 真正导入文件

● 执行完成后回调给BrokerLoadJob,更新状态

 

 

 

Load管理

几个关键对象:Load、LoadJob、LoadManager

 

 

异步任务

从宏观上看,Doris 的异步任务称为作业,结构如下: