使用dataX进行大数据推送

发布时间 2023-12-12 11:28:51作者: 凌晨与风

  针对大数据量推送,常规的推送工具推送效率很慢,比如kettle ,上千万的数据用时太长,因此,我使用了datax进行推送,1600万用时10分钟,2800万用时20分钟。用datax推送的效率很高

  在datax安装好了之后,推送的配置文件主要是配置 json 文件,全量推送可以放置在 /datax/job  中

  但是如果是增量推送,可以放置在 /datax/bin 主要用三个文件:json文件、sh文件、最大时间记录文件

       json 文件

{
    "job": {
        "content": [
            {
                "reader": {     #数据库读取信息
                  "name": "mysqlreader",
                  "parameter": {
                    "username": "账户",
                    "password": "密码",
                    "column": [],
                    "splitPk": "",
                    "connection": [
                      {
                        "querySql": [
                          "select id, name from table_name"
                        ],
                        "jdbcUrl": [
                          "jdbc:mysql://IP:端口号/数据库名?useUnicode=true&characterEncoding=UTF8"
                        ]
                      }
                    ]
                  }
                },
                "writer": {
                    "name": "mysqlwriter",  #数据库写入信息
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://IP:端口号/数据库名?useUnicode=true&characterEncoding=UTF8",
                                "table": [
                                    "table_name"
                                ]
                            }
                        ],
                        "username": "账户",
                        "password": "密码",
                        "session": [],
                        "writeMode": "insert" #写入方式,可以插入、更新等等方式
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

文件配置好了之后,执行以下命令进行单次执行任务

python /root/datax/bin/datax.py /root/datax/bin/XXX.json   

为了实现增量方式,那么在tale_name 可以使用 变量来代替 :{source_table}  ,同时可以用 {source_db_where}进行条件判断

 增量 sh文件 内容如下

#!/bin/sh
function exitWhenError(){
  if [ $1 -eq 1 ]
  then
      echo 'datax执行出错'$2
      exit 1
  fi
}

n=0
#circle_count:循环次数,1为执行1次循环,利用循环的意思是,每次执行一部分数据,循环之前多少次跑完,其目的是防止一次datax执行失败之后,能够继续按照现在的进度推送数据,相对来说,这种所花的时间更长
circle_count=31
#begintime='2023-10-29 18:00:00'
#endtime='2023-10-29 20:00:00'
source_table='table_name'
target_table='table_name'
column_name='extime'
echo '同步的表:'$source_table

#以下是如果是将一张表的数据按照月份分成多张表推送,可以使用一下命令进行数据表的切换,这样就可以不用每个月都需要重新修改表名,如果不需要分表就可以不用这个命令 # this_month_long
=$(date -d "$day" "+%Y%m") # this_month=${this_month_long:0:6} # last_month_long=$(date -d "$day - 1 month" "+%Y%m") # last_month=${last_month_long:0:6} # next_month_long=$(date -d "$day + 1 month" "+%Y%m") # next_month=${next_month_long:0:6} # source_table_this_month=$source_table$this_month # target_table_this_month=$target_table"_"$this_month # source_table_last_month=$source_table$last_month # target_table_last_month=$target_table"_"$last_month # source_table_next_month=$source_table$next_month # target_table_next_month=$target_table"_"$next_month # echo "this_month:"$this_month",last_month:"$last_month,"next_month:"$next_month files_path=$(cd $(dirname $0);pwd) echo 'files_path:'$files_path #取上次同步时间 RESULT_FILE=`ls $files_path/${target_table}_data_max_time` begintime=`cat $RESULT_FILE` endtime=$(date -d "$day" "+%Y-%m-%d %H:%M:%S") # 计算时间差,如果业务中时间存在时间差,比如业务在入库之后,会有一个小时或者是一段时间的时间差,可以使用以下命令进行推送 delta=$(($(date +%s -d "${endtime}")-$(date +%s -d "${begintime}"))); # 计算打印结果 diffSeconds=$delta #秒 diffMinutes=$(expr $delta / 60) #分钟 diffHours=$(expr $delta / 3600) #小时 echo “Time difference between $begintime and $endtime is $diffHours hours $diffMinutes minutes $diffSeconds seconds delta $delta” let time_interval=$delta/$circle_count+1 echo 'time_interval:'$time_interval #如果时间差大于1小时,则停止抽取 # if [ ${diffSeconds} -gt 36000 ]; then # echo echo '本次执行时间差为diffSeconds:'$diffSeconds',停止抽取数据' # exit 1 # fi #如果每次循环时间间隔小于60秒,则不进行拆分 if [ ${time_interval} -lt 60 ]; then echo '循环时间间隔小于60秒' time_interval=$delta circle_count=1 echo echo 'time_interval:'$time_interval',circle_count:'$circle_count fi while (( $n <= $circle_count-1 )) do echo ''$n'循环' let m=$n+1 echo 'm:'$m #当前循环起始秒数、终止秒数 let circle_seconds=$n*$time_interval circle_timestamp=$(date +%s -d "${begintime}") let new_begin_time=$circle_timestamp+$circle_seconds let new_end_time=$circle_timestamp+$circle_seconds+$time_interval echo ''$n'循环,''new_begin_time:'$new_begin_time',new_end_time:'$new_end_time sql_begin_time=$(date "+%Y-%m-%d %H:%M:%S" -d "@${new_begin_time}") sql_end_time=$(date "+%Y-%m-%d %H:%M:%S" -d "@${new_end_time}") echo 'sql_begin_time:'$sql_begin_time',sql_end_time:'$sql_end_time source_db_where=" where $column_name > '$sql_begin_time' and $column_name <= '$sql_end_time' and $column_name <= '$endtime' " target_db_where=" where $column_name > '$sql_begin_time' and $column_name <= '$sql_end_time' and $column_name <= '$endtime' " echo 'source_db_where:'$source_db_where echo 'target_db_where:'$target_db_where cur_time=$(date -d "$day - 0 minute" "+%Y-%m-%d %H:%M:%S") echo "cur_time:" $cur_time sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_this_month/g" \ -e "s/{target_table}/$target_table_this_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 1 echo "==============保存当次循环时间记录==============" echo $sql_end_time>$files_path/${target_table}_data_max_time sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_last_month/g" \ -e "s/{target_table}/$target_table_last_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 2 sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_next_month/g" \ -e "s/{target_table}/$target_table_next_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 3 (( n++ )) done echo "==============保存当前时间记录==============" echo $endtime>$files_path/${target_table}_data_max_time

最大时间记录文件

记录数据表中的最大时间

增量文件配置好了之后,使用以下命令执行

bash /root/datax/bin/XXXX.sh

 而定时调度可以通过Linux服务器的crontab定时器 进行配置

crontab -e #编辑定时调度
crontab -l #查看定时调度
*/
30 * * * * su - root -c /root/datax/bin/hive/XXX.sh #定时执行一条任务 */30 * * * * su - root -c /root/datax/bin/hive/XXX.sh > /root/datax/bin/hive/XXX.log #定时执行一条任务 ,并打印日志
*/30 * * * *  bash /root/datax/bin/XXXX.sh #多种方式都可以尝试一下
*/30 * * * * python /root/datax/bin/datax.py /root/datax/bin/XXX.json#多种方式都可以尝试一下