DataX介绍及应用实例

发布时间 2023-06-20 09:46:19作者: 柴高八斗

一、DataX简介

  • DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
  • DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
  • 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

1.1 框架设计

 

  • DataX 本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1.2 核心架构

https://github.com/alibaba/DataX/blob/master/introduction.md

1.3 开源地址

https://github.com/alibaba/DataX

二、安装DataX

2.1 下载地址

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

2.2 环境安装

DataX依赖于Java和Python环境,所以需要先安装。

1、安装Java环境(查看已安装的Java版本):

具体安装请参考 https://www.cnblogs.com/fengguozhong/p/11843156.html

[root@localhost bin]# java -version
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode)

2、Python安装(查看已安装的python版本,安装过程在此不做详述):

[root@localhost opt]# python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>>

3、安装DataX环境

直接解压压缩包(绿色解压,无需编译安装):

cd /opt  && mkdir datax
tar -zxvf datax.tar.gz

检查DataX是否安装成功,若出现如下结果表示安装成功。

执行命令python datax.py -r streamreader -w streamwriter

控制台信息内容:

[root@localhost bin]# python datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

注意事项

*注意:DataX安装完成后,需要删除文件plugin目录下的隐藏文件,这个非常重要,不删除将会影响任务的执行。(此步骤不做,将在后面做也可以)

rm -rf /opt/datax/plugin/*/._*  #删除隐藏文件

三、案例

3.1 实现从stream到stream数据读取

1、开发datax配置文件

  • 方法一:执行此命令获得帮助 python datax.py -r streamreader -w streamwriter
  • 方法二:进入/opt/datax/job目录下, 拷贝一份job.json,命名为stream2stream.json
python datax.py -r streamreader -w streamwriter #方法一
#或者
cp job.json  stream2stream.json  #方法二

stream2stream.json文件内容:

{
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [{
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [{
            "value": "DataX",
            "type": "string"
          },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "1989-06-04 00:00:00",
              "type": "date"
            },
            {
              "value": true,
              "type": "bool"
            },
            {
              "value": "test",
              "type": "bytes"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      }
    }]
  }
}

2、启动datax打印数据,执行任务。

cd /opt/datax/bin/
./datax.py /opt/datax/job/stream2stream.json

执行过程中如遇到配置文件问题,请检查并删除plugin目录下reader 和 writer目录中的隐藏文件,详细说明见: 常见问题

3、成功执行,任务结果:

 

3.2 实现从mysql到stream数据读取

需求:从mysql数据库的表中读取数据并打印在控制台。

建库、建表、插入数据:

#创建数据库master_test_db
CREATE DATABASE /*!32312 IF NOT EXISTS*/`master_test_db` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_unicode_ci */;
USE `master_test_db`;

#创建表tb_user
CREATE TABLE `tb_user` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(20) COLLATE utf8_unicode_ci DEFAULT NULL,
  `age` INT(11) DEFAULT NULL,
  `address` VARCHAR(100) COLLATE utf8_unicode_ci DEFAULT NULL,
  `status` INT(2) NOT NULL DEFAULT '0',
  `city` VARCHAR(6) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci CHECKSUM=1 DELAY_KEY_WRITE=1 ROW_FORMAT=DYNAMIC;

#插入记录
INSERT  INTO `tb_user`(`id`,`name`,`age`,`address`,`status`,`city`) VALUES (1,'guozhong',20,'北京市石景山区鲁谷大街',0,'我的老是人是'),(2,'zhang',23,'石家庄市红旗大街42号',1,NULL),(8,'Sam',30,'山西省西安市XXXX1111',1,NULL),(9,'liuli',30,'山西省西安市',0,NULL),(10,'xxxx',30,'xxxx111',0,NULL),(11,'xxxx',30,'山西省西安市',1,NULL),(12,'Sam',30,'山西省西安市XXXX1111',0,NULL),(13,'xxxx',30,'山西省西安市',1,NULL),(14,'Sam11',30,'xxxx111',0,NULL),(15,'Sam11',30,'山西省西安市XXXX1111',0,NULL),(16,'Sam11',30,'xxxx111',0,NULL),(19,'lulu',20,'北京',0,NULL),(20,'tonton',20,'北京',0,NULL);

表记录:

1、开发datax配置文件

执行此命令获得帮助:python datax.py -r mysqlreader -w streamwriter

python datax.py -r mysqlreader -w streamwriter

控制台打印结果为如下,接下来配置该文件即可:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

将配置修改完成的文件命名为 :mysql2stream.json,其文件内容为:

{
    "job": {
        "setting": {
            "speed": {
                "channel": "3"
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "testuser",
                    "password": "******",
                    "column": [
                        "id", "name"
                    ],
                    "where": "",
                    "connection": [{
                        "jdbcUrl": [
                            "jdbc:mysql://192.168.100.1:3306/master_test_db"
                        ],
                        "table": [
                            "tb_user"
                        ]
                    }]
                }
            },
            "writer": {
                "name": "streamwriter",
                "parameter": {
                    "encoding": "UTF-8",
                    "print": true
                }
            }
        }]
    }
}

2、启动datax打印数据,执行任务。

cd /opt/datax/bin/
./datax.py /opt/datax/job/mysql2stream.json

3、成功执行,任务结果:

3.3 实现从mysql到mysql数据同步

1、开发datax配置文件

执行此命令获得帮助:python datax.py -r mysqlreader -w mysqlwriter

也可以参考 datax/plugin/writer/mysqlwriter目录下的plugin_job_template.json,reader同理。

注意writer中的jdbcUrl是用"",表示只能连接一个库,而reader中的jdbcUrl是[]中括号,表示可以连接多个库。

python datax.py -r mysqlreader -w mysqlwriter

控制台打印结果为如下,接下来配置该文件即可:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", #读取端
                    "parameter": {
                        "column": [],  #需要同步的列(*表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [], #数据库连接信息,可以多个库
                                "table": [] #表
                            }
                        ],
                        "password": "", 
                        "username": "", 
                        "where": "" #筛选条件
                    }
                },
                "writer": {
                    "name": "mysqlwriter", #写入端
                    "parameter": {
                        "column": [], #需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "", #只能有一个库,不使用[]中括号括起来
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": "" # replace,update 或 insert
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "" #并发数
            }
        }
    }
}

2、将配置修改完成的文件命名为 :mysql2mysql.json,其文件内容为:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["name","age","address","status","city"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://47.92.113.122:3306/master_test_db?useUnicode=true&characterEncoding=utf8"],
                                "table": ["tb_user"]
                            }
                        ],
                        "username": "testuser",
                        "password": "******",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["name","age","address","status","city"],
                        "connection": [
                            {

                                "jdbcUrl": "jdbc:mysql://47.92.113.122:3306/master_test_db?useUnicode=true&characterEncoding=utf8",
                                "table": ["tb_user2"]

                            }
                        ],
                        "password": "******",
                        "preSql": [],
                        "session": [],
                        "username": "testuser",
                        "writeMode": "insert",
                        "encoding": "UTF-8",
                        "print":true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
  1. 启动datax打印数据,执行任务。
cd /opt/datax/bin/
./datax.py /opt/datax/job/mysql2mysql.json

3、成功执行,任务结果:

 

四、⚠️常见问题

4.1 执行任务-报配置文件错误

一般部署完DataX之后,首次执行任务会报如下错误:

控制台详细错误内容:

[root@localhost bin]# ./datax.py /opt/datax/job/stream2stream.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2023-06-14 16:29:29.709 [main] WARN  ConfigParser - 插件[streamreader,streamwriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件.
2023-06-14 16:29:30.713 [main] ERROR Engine -

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件.
        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
        at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
        at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
        at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
        at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
        at com.alibaba.datax.core.Engine.entry(Engine.java:137)
        at com.alibaba.datax.core.Engine.main(Engine.java:204)

解决方法:

分别进入 datax/plugin/reader 和datax/plugin/writer目录下,删除这些隐藏文件。

rm -rf ./._*

4.2 mysql同步数据出现乱码问题

在mysql同步完成的结果数据中,中文内容出现乱码的问题,如下:

可以通过设置job.json中 的jdbcUrl 中,数据库连接字符串中设置utf8编码:

jdbc:mysql://192.168.100.1:3306/master_test_d?useUnicode=true&characterEncoding=utf8