27-DataX

发布时间 2023-08-07 23:50:08作者: tree6x7

1. DataX 概述

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(Mysql、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

DataX 侧重于同步数据库中的数据。DataX 没有所谓版本号,只有这一个开源的版本(开源的有些功能受限),商业版的组件名叫 DataWorks。

DataX 目前已经有了比较全面的插件体系,主流的 RDBMS 数据库、NoSQL、大数据计算系统都已经接入,支持如下:

DataX 和 Sqoop 对比:

功能 DataX Sqoop
运行模式 单进程多线程 MR
分布式 不支持,可以通过调度系统规避 支持
流控 有流控功能 需要定制
统计信息 已有一些统计,上报需定制 没有,分布式的数据收集不方便
数据校验 在core部分有校验功能 没有,分布式的数据收集不方便
监控 需要定制 需要定制

2. 架构原理

2.1 设计理念

异构数据源同步问题,就是不同框架之间同步数据时,相同的数据在不同框架中具有不同的数据结构。

DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接数据各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。

2.2 框架设计

DataX 本身作为离线数据同步框架,采用 Framework + Plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • 【Reader】数据采集模块,负责采集数据源的数据,将数据发送给 Framework;
  • 【Writer】数据写入模块,负责不断从 Framework 取数据,并将数据写出到目的端;
  • 【Framework】主题框架,用于连接 Reader 和 Writer,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。

2.3 运行流程

DataX 支持单机多线程模式完成同步作业,下面用一个 DataX 作业生命周期的时序图,用以说明 DataX 的运行流程、核心概念以及每个概念的关系。

核心模块介绍:

模块 说明
Job DataX 完成单个数据同步的作业称之为 Job,Job 模块是单个作业的中枢管理节点,承担着数据清理、子任务切分、TaskGroup 管理等功能。
Task 由 Job 切分出来的子任务。Task 是 DataX 作业中的最小单元,每一个 Task 都会负责一部分的同步工作。
Scheduler 根据并发量重组 Task
TaskGroup 重组后的多个 Task 的集合,即 TaskGroup 任务组。负责以一定的并发量运行分配好的所有 Task,默认并发量 5。

流程介绍:

  1. DataX 接收到一个 Job 作业后,将启动一个进程来完成整个作业同步过程;
  2. Job 启动后,会根据不同数据源端的切分策略,将 Job 切分成多个小的 Task(子任务),以便并发执行;
  3. 切分多个 Task 之后,Job 会调用 Scheduler 模块根据 {配置的并发数量},将 Task 重新组合,成为 TaskGroup(任务组);
  4. 每个 Task 都是由 TaskGroup 负责启动,Task 启动后,会固定启动 Reder => Channel => Writer 的线程来完成任务的同步工作;
  5. DataX 作业运行起来之后,Job 监控并等待多个 TaskGroup 模块任务完成,等待所有的 TaskGroup 任务完成后 Job 成功退出。否则,异常退出。

2.4 调度策略

案例需求:

如果用户提交了一个 DataX 作业,并配置了总的并发度为 20,目的是对有 100 张表的 Mysql 数据源进行同步。

DataX 的调度策略思路:

  1. Job 根据数据源的切分策略,将作业切分成 100 个 Task;
  2. 根据配置的并发度是 20、每个 TaskGroup 的并发度是5,计算出需要 TaskGroup 的个数是 4 个;
  3. 由 4 个 TaskGroup 平分 100 个 Task,每个 TaskGroup 负责运行 25 个 Task。

3. DataX 部署

解压即用。先上传 DataX 的安装包到 /opt/software,然后解压到 /opt/module/ 下。

执行如下命令进行自检,出现如下内容表明安装成功。

[liujiaqi@hadoop102 datax]$ python bin/datax.py job/job.json

...
2023-08-07 05:16:55.904 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2023-08-07 05:16:45
任务结束时刻                    : 2023-08-07 05:16:55
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

4. DataX 使用

4.1 使用概述

a. 任务提交命令

DataX 的使用非常简单,用户仅需要根据自己同步数据的数据源和目的地的类型来选择相应的 Reader 和 Writer 插件即可,并将 Reader 和 Writer 插件的信息配置在一个 JSON 文件中,然后在执行命令时,指定配置文件提交数据同步任务即可。

[liujiaqi@hadoop102 datax]$ python bin/datax.py path/to/your/job.json

b. 配置文件格式

可以执行如下命令,查看 DataX 配置文件模板:

[liujiaqi@hadoop102 datax]$ python bin/datax.py -r mysqlreader -w hdfswriter

{
    "job": {                                                   # 最外层是一个job, job包含setting和content两部分
        "content": [                                           # content 用于配置用户的数据源和目的地
            {
                "reader": {                                    # Reader相关配置
                    "name": "mysqlreader",                     # Reader名称(不可随意命名)
                    "parameter": {                             # Reader配置参数
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    }
                }, 
                "writer": {                                    # Writer相关配置
                    "name": "hdfswriter",                      # Writer 名称(不可随意命名)
                    "parameter": {                             # Writer 配置参数
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {                                           # setting用于对整个job进行配置(限速等)
            "speed": {
                "channel": ""
            }
        }
    }
}

4.2 同步Mysql到HDFS

需求:同步 gmall 库中的 base_province 表数据中 id>3 的数据到 HDFS

分析:

  • 插件选择
    • MysqlReader
    • HdfsWriter
  • 模式分类
    • TableMode:使用 table、column、where 等属性声明需要同步的数据
    • QuerySQLMode:后者使用一条 SQL 查询语句声明需要同步的数据

插件介绍:

MysqlReader HdfsWriter
说明 实现了从 Mysql 读取数据。在底层实现上,MysqlReader 通过 JDBC 连接远程 Mysql,并执行相应的 SQL 将数据从 Mysql 中 select 出来。 提供向 HDFS 文件系统指定路径中写入 TextFile 和 OrcFile 类型的文件,文件内容可与 Hive 表相关联。
原理 MysqlReader 通过 JDBC 连接器连接到远程的 Mysql,并根据用户配置的信息生成查询语句发送到远程 Mysql,并将该 SQL 执行返回结果使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。TableMode 模式中根据用户配置的 table、column、where 等信息生成 SQL 发送到 Mysql;QuerysqlMode 模式则是直接将用户配置的 querySQL 发送到 Mysql 中。 首先根据用户指定的 path,创建一个 HDFS 文件系统上的不存在的临时目录,创建规则是:path_随机。然后将读取的文件写入到这个临时目录中,待到全部写入后,再将这个临时目录下的文件移动到用户所指定的目录下,(在创建文件时保证文件名不重复),最后删除临时目录。如果在中间过程中发生网络中断等情况,造成无法与 HDFS 建立连接,需要用户手动删除已经写入的文件和临时目录。

a. TableMode

(1)在 job 目录下创建配置文件 base_province_tableMode.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hadoop103:3306/gmall"],
                "table": ["base_province"]
              }
            ],
            "password": "root",
            "username": "root",
            "where": "id>=3",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"},
              {"name": "region_id","type": "string"},
              {"name": "area_code","type": "string"},
              {"name": "iso_code","type": "string"},
              {"name": "iso_3166_2","type": "string"}
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hadoop102:8020",
            "fieldDelimiter": "\t",
            "fileName": "base_province",
            "fileType": "text",
            "path": "/base_province",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

(2)在 Hive 中创建表

【null 存储】HDFS Writer 并未提供 nullFormat 参数,也就是用户并不能自定义 null 值写到 HDFS 文件中的存储格式。默认情况下,HDFS Writer 会将 null 值存储为空字符串(''),而 Hive 默认的 null 值存储格式为 \N。所以后期将 DataX 同步的文件导入 Hive 表就会出现问题。

【解决方案】

  1. 修改 DataX HDFS Writer 的源码,增加自定义 null 值存储格式的逻辑,可参考 https://blog.csdn.net/u010834071/article/details/105506580
  2. 在 Hive 中建表时指定 null 值存储格式为空字符串('')

【建表语句】

CREATE EXTERNAL TABLE base_province
(
    `id`         STRING COMMENT '编号',
    `name`       STRING COMMENT '省份名称',
    `region_id`  STRING COMMENT '地区ID',
    `area_code`  STRING COMMENT '地区编码',
    `iso_code`   STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
    `iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/base_province/';

(3)提交任务

# a. 在 HDFS 创建 /base_province 目录
[liujiaqi@hadoop102 job]$ hadoop fs -mkdir /base_province
# b. 执行 DataX 同步命令
[liujiaqi@hadoop102 datax]$ python bin/datax.py job/base_province_tableMode.json

执行可能会报错:

2023-08-06 11:01:21.033 [job-0] WARN  DBUtil - test connection of [jdbc:mysql://hadoop103:3306/gmall] failed, for Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).].  -  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server..
2023-08-06 11:01:21.037 [job-0] ERROR RetryUtil - Exception when calling callable, 异常Msg:DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确。
java.lang.Exception: DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确。
    at com.alibaba.datax.plugin.rdbms.util.DBUtil$2.call(DBUtil.java:71) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
    ...

排查了提示信息列举的配置信息,都没有问题。最后发现问题出在 DataX 提供的 Java 连接 Mysql 的驱动包太老了(我装的 Mysql 版本是 8.0,DataX 提供的驱动是 mysql-connector-java-5.1.34.jar),解决办法就是用数据库相应版本的驱动包替换这个老版本的就行。

此时再重新跑 Job,就能顺利执行了。

【补充】在 Mysql 的 base_province 表中新增一条记录(除了 id 字段外其余都 null),然后再重新同步,可以发现 HDFS Writer 会将 null 值存储为空字符串。

b. QuerySQLMode

(1)在 job 目录下创建配置文件 base_province_querysqlmode.json

{
  "job": {
    "setting": {
      "speed": {
        "channel":1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "querySql": [
                  "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id >=3;"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop103:3306/gmall"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"},
              {"name": "region_id","type": "string"},
              {"name": "area_code","type": "string"},
              {"name": "iso_code","type": "string"},
              {"name": "iso_3166_2","type": "string"}
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hadoop102:8020",
            "fieldDelimiter": "\t",
            "fileName": "base_province",
            "fileType": "text",
            "path": "/base_province",
            "writeMode": "append"
          }
        }
      }
    ]
  }
}

querySql:在某些业务场景下,where 这个配置项不足以描述所筛选的条件,用户可以通过该配置来自定义筛选 SQL。当用户配置该项后,DataX 系统就会忽略 Table、column 这些配置项,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表 JOIN 后同步数据的使用场景。

(2)提交任务

# a. 在 HDFS 创建 /base_province 目录
[liujiaqi@hadoop102 job]$ hadoop fs -mkdir /base_province
# b. 执行 DataX 同步命令
[liujiaqi@hadoop102 datax]$ python bin/datax.py job/base_province_tableMode.json

查看执行情况:

4.3 同步HDFS到Mysql

需求:同步 DHFS 上的 base_province 目录下的数据到 Mysql 中 gmall 库的 test_province 表

插件选择:

(1)HdfsReader

  • 提供了读取 HDFS 数据存储的能力。
  • 在底层实现上,HdfsReader 获取分布式文件系统上文件的数据,并转换为 DataX 传输协议传递给 Writer,支持 textfile、orcfile、rcfile、sequencefile。

(2)MysqlWriter

  • 实现了写入数据到 Mysql 目的表的功能。
  • 在底层实现上,MysqlWriter 通过 JDBC 连接远程的 Mysql,并执行相应的 inster into replace into 语句,将数据写入到 Mysql,内部会分批次提交入库,需要数据库本身采用 InnoDB 引擎。
  • MysqlWriter 通过 DataX 获取 Reader 生成的协议数据,根据你配置的 writeMode 生成相应的插入模式。
    • insert:当主键/唯一性索引冲突时,不会写入冲突的行;
    • replace :当主键/唯一性索引不冲突时,与 insert 行为一致,当遇见冲突时,会更新替换原有的行(字段)。
    • update:当主键/唯一性索引不冲突时,与insert行为一致,当遇见冲突时,会更新更新原有的行(字段)。

案例说明:

(1)在 datax/job 目录下编写配置文件 base_province_hdfs2mysql.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/base_province",
            "defaultFS": "hdfs://hadoop102:8020",
            "column": [
              "*"
            ],
            "fileType": "text",
            "compress": "gzip",
            "encoding": "UTF-8",
            "nullFormat": "",
            "fieldDelimiter": "\t"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "replace",
            "username": "root",
            "password": "root",
            "column": [
              "id",
              "name",
              "region_id",
              "area_code",
              "iso_code",
              "iso_3166_2"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://hadoop103:3306/gmall?useUnicode=true&characterEncoding=utf-8",
                "table": [
                  "test_province"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

reader.nullFormat 值是 Hive 表用什么来表示 null 值的,默认这里填 \\N,但是刚刚做同步 Mysql 到 HDFS 案例时候说过:HDFS Writer 会将 null 值存储为空字符串。所以这里的reader.nullFormat 值给个空串。

(2)在 Mysql 中创建 gmall.test_province 表

create table test_province like base_province;

(3)提交执行同步任务

[liujiaqi@hadoop102 datax]$ python bin/datax.py job/base_province_hdfs2mysql.json

这里同样有 mysql 驱动版本过低问题:

4.4 DataX 传参

在生产环境中,离线数据同步任务需要每日定时重复执行,故 HDFS 上的目标路径通常会包含一层日期,用来对每日同步的数据加以分区,也就是说每日同步数据的目标路径不是固定的,因此 DataX 配置文件中的 DHFSWriter 插件中参数 path 的值应该是动态变化的。为实现这个业务需求,我们需要使用 DataX 的传参功能。

【用法】在任务的 JSON 配置文件中使用 ${param} 引用参数,然后在提交任务时使用 -p "-Dparam=value" 传入参数值。

案例说明:

(1)修改配置文件

...
"writer": {
  "name": "hdfswriter",
  "parameter": {
    "column": [
      {"name": "id","type": "bigint"},
      {"name": "name","type": "string"},
      {"name": "region_id","type": "string"},
      {"name": "area_code","type": "string"},
      {"name": "iso_code","type": "string"},
      {"name": "iso_3166_2","type": "string"}
    ],
    "compress": "gzip",
    "defaultFS": "hdfs://hadoop102:8020",
    "fieldDelimiter": "\t",
    "fileName": "base_province",
    "fileType": "text",
    "path": "/base_province/${date}",
    "writeMode": "append"
  }
}
...

(2)在 HDFS 创建目录

[liujiaqi@hadoop102 datax]$ hadoop fs -mkdir /base_province/2022_03_18

(3)进到 DataX 目录下,执行命令启动同步任务

[liujiaqi@hadoop102 datax]$ python bin/datax.py -p "-Ddate=2022_03_18" job/base_province_querysqlmode.json

5. DataX 优化

5.1 速度控制

DataX 中提供了包括通道(并发)、记录流、字节流三种流控模式,可以根据需要控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。

参数 说明 注意事项
job.setting.speed.channel 并发数
job.setting.speed.record 总 record 限速 配置此参数,则必须配置单个 channel 的 record 限速参数
job.setting.speed.byte 总 byte 限速 配置此参数,则必须配置单个 channel 的 byte 限速参数
core.transport.channel.speed.record 单个 channel 的 record 限速,默认1w条/s
core.transport.channel.speed.byte 单个 channel 的 byte 限速,默认1M/s

(1)提升每个 channel 的速度

在 DataX 内部对每个 channel 会有严格的速度控制,分两种:一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s,可以根据具体硬件情况设置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 channel 的速度上限配置为 5MB。

(2)提升 Job 内 channel 并发数

并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数(默认为5)

提升 Job 内 channel 并发有三种配置方式:

  1. 配置全局 Byte 限速以及单 channel Byte 限速
    # Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速
    
    - core.transport.channel.speed.byte=1048576
    - job.setting.speed.byte=5242880
    
    # Channel 个数 = 5242880 / 1048576 = 5
    
  2. 配置全局 Record 限速以及单 channel Record 限速
    # Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速
    
    - core.transport.channel.speed.record=100
    - job.setting.speed.record=500
    
    # Channel 个数 = 500 / 100 = 5
    
  3. 只有上面两种均未设置,直接配置 channel 并发数才生效
    - job.setting.speed.channel=5
    
  4. 若同时配置 1、2 两种,则实际 channel 并发数是通过计算得到的。

5.2 内存调整

当提升 Job 内的 Channel 并发数时,内存的占用会明显增加,因为 DataX 作为数据交换通道,在内存中会缓存较多的数据,例如:Channel 中会有一个 Buffer 作为临时的数据交换缓冲区,而在 Reader 和 Writer 中也会有一些 Buffer。所以为了防止 OOM 等错误,需要适当调大 JVM 堆内存。

修改方式:

  1. 修改 datax.py 脚本
  2. 启动时使用参数:python bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json