Sync Tools: Using SeaTunnel

Published 2024-01-02 09:56:41 · Author: 黑水滴

I. Introduction

SeaTunnel is an easy-to-use, high-performance distributed data integration platform built on top of Apache Spark and Apache Flink, providing real-time synchronization and transformation of massive data. It can stably and efficiently synchronize tens of billions of records per day and is used in production by close to a hundred companies.

Dependencies: Spark 3.2.1, Flink

Host: test01 (user bdg_app)

Working directory: /home/bdg_app/wang/seatunnel

Official docs: https://seatunnel.apache.org/docs/2.3.3/about

 

II. SeaTunnel Environment Setup

1. Service setup

(1) Download link

https://seatunnel.apache.org/docs/2.3.3/start-v2/locally/deployment
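A minimal sketch of fetching and unpacking the release from the Apache archive (mirror URL and directory layout are assumptions, not from the original post):

wget https://archive.apache.org/dist/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
tar -zxvf apache-seatunnel-2.3.3-bin.tar.gz
cd apache-seatunnel-2.3.3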

(2) Edit config/plugin_config to select the connectors you need:

connector-fake
connector-console
connector-cdc-mysql
connector-clickhouse
connector-hive
connector-iceberg
connector-jdbc
connector-kafka

(3) Install the connectors

sh bin/install-plugin.sh 2.3.3

(4) Download the MySQL driver and place it in the SeaTunnel directory

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Mysql

Place it under $SEATUNNEL_HOME/plugins/jdbc/lib/
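For example (the jar name is an assumption; use whichever MySQL Connector/J version you downloaded):

cp mysql-connector-j-8.0.33.jar $SEATUNNEL_HOME/plugins/jdbc/lib/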

(5) Switching Spark versions

SeaTunnel can submit jobs on both Spark 2 and Spark 3; you switch versions by swapping out the Spark package. A Spark package copied over from another server needs its configuration adjusted before it will work.

Environment variables: /etc/profile

Spark location: /apps/srv/service/spark

Step 1: rename the current Spark directory: mv spark spark-bak2.4.1-20231108

Step 2: rename the new Spark build into place: mv spark-3.2.1 spark
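Put together, one switch might look like the sketch below (paths are from above; the /etc/profile check assumes SPARK_HOME is pinned there):

cd /apps/srv/service
mv spark spark-bak2.4.1-20231108   # park the old Spark 2.4.1
mv spark-3.2.1 spark               # promote Spark 3.2.1 into place
# verify SPARK_HOME in /etc/profile still points at /apps/srv/service/spark
source /etc/profile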

(6) Legacy Waterdrop sync commands

cd /home/bdg_app/wangshida/waterdrop/2281/test_usql_log

As the dw user: sh /apps/scripts/waterdrop-2.0.1/bin/start-waterdrop-spark.sh --master yarn --deploy-mode cluster --config execute.job --queue defalt

As a non-dw user: sudo su - bdg_app -c 'cd /apps/srv/instance/test-nezha-executor.baijiahulian.com/bin/executions/11652781/2281/test_usql_log && export HADOOP_USER_NAME=bdg_app && source ./stage.env && sh /apps/scripts/waterdrop-2.0.1/bin/start-waterdrop-spark.sh --master yarn --deploy-mode cluster --config execute.job --queue defalt'

 

III. Configuration Examples

1. Generate random data and print it to the console; submit to YARN to run

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --deploy-mode cluster --config ./rand_console.job --queue defalt

Config:

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
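    # field_mapper maps input field -> output field: age is kept, name becomes new_name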
    field_mapper = {
      age = age
      name = new_name
    }
  }
}
sink {
  Console {
    source_table_name = "fake1"
  }
}
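Note that in yarn cluster mode the Console sink writes to executor stdout rather than the submitting shell; the generated rows can be checked afterwards via YARN log aggregation:

yarn logs -applicationId <application_id> | less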
 

2. Generate random data and print it to the console; run locally. Config is the same as above.

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-3-connector-v2.sh --master local --deploy-mode client --config ./rand_console.job
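--master local runs Spark with a single thread; a bracketed master URL (standard Spark syntax) uses more local cores:

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-3-connector-v2.sh --master 'local[4]' --deploy-mode client --config ./rand_console.job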

3. MySQL to console; submit to YARN to run

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --deploy-mode cluster --config ./mysql_console.job --queue sailing

Config:

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
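    # needs the MySQL driver jar from step (4) in $SEATUNNEL_HOME/plugins/jdbc/lib/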
    driver = "com.mysql.cj.jdbc.Driver" 
    connection_check_timeout_sec = 100
    url = "jdbc:mysql://127:3306/usql?zeroDateTimeBehavior=convertToNull&useServerPrepStmts=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&serverTimezone=Asia/Shanghai"
    user = ""
    password = ""
    query="select * from test_wang1"
  }
}
transform {
}
sink {
  Console {}
}
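For larger tables, the Jdbc source can split the read across parallel tasks on a numeric column; a sketch using the connector's partition options (the id column is an assumption):

source {
  Jdbc {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://127:3306/usql"
    user = ""
    password = ""
    query = "select * from test_wang1"
    partition_column = "id"   # numeric column to split on (assumed)
    partition_num = 4         # number of parallel splits
  }
}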
 

4. Hive to console (not yet supported because of GooseFS)

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --deploy-mode cluster --config ./hive_console.job --queue sailing

env {
  parallelism = 1
  job.name="test_hive_source_to_hive"
  spark.kerberos.principal="bdg_app@EMR-"
  spark.kerberos.keytab="./bdg_app.keytab"
}
source {
  Hive {
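    # reads via the Hive metastore below; Kerberos auth uses the keytab from env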
    table_name = "bdg_app.ods_other_test_usql_usql_log_df"
    metastore_uri = "thrift://127:7004"
  }
}
sink {
    Console {
    }
}
 

5. MySQL to Hive (not yet supported because of GooseFS)

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --deploy-mode cluster --config ./mysql_hive.job --queue sailing

env {
  parallelism = 1
  job.name="test_mysql_source_to_hive"
  spark.kerberos.principal="bdg_app@EMR-QX9XE1IH"
  spark.kerberos.keytab="./bdg_app.keytab"
}
source {
  Jdbc {
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    url = "jdbc:mysql://127:3306/usql?zeroDateTimeBehavior=convertToNull&useServerPrepStmts=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&serverTimezone=Asia/Shanghai"
    user = ""
    password = ""
    query="select * from test_wang1"
  }
}
sink {
  Hive {
    table_name = "bdg_app.ods_other_test_usql_usql_log_df"
    metastore_uri = "thrift://127:7004"
  }
}
 

6. Iceberg to console

sh /apps/scripts/seatunnel-2.3.3/bin/start-seatunnel-spark-2-connector-v2.sh --master yarn --deploy-mode cluster --config ./iceberg_console.job --queue sailing

env {
  parallelism = 1
  job.name="test_iceberg_source_to_hive"
  spark.kerberos.principal="bdg_app@EMR-"
  spark.kerberos.keytab="./bdg_app.keytab"
}
source {
  Iceberg {
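    # Hive-backed Iceberg catalog: uri is the Hive metastore, warehouse the table root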
    catalog_name = "spark_catalog"
    catalog_type = "hive"
    uri = "thrift://127:7004"
    warehouse = "hdfs://usr/hive/warehouse"
    namespace = "bdg_app"
    table = "test2"
  }
}
sink {
    Console {
    }
}
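If the Iceberg tables live in a Hadoop (filesystem) catalog rather than the Hive metastore, the connector docs allow pointing straight at the warehouse path with no uri (paths here are assumptions):

source {
  Iceberg {
    catalog_name = "hadoop"
    catalog_type = "hadoop"
    warehouse = "hdfs://usr/hive/warehouse"
    namespace = "bdg_app"
    table = "test2"
  }
}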
 

7. The older release supports writing to Iceberg, so test MySQL to Iceberg with SeaTunnel 2.1.3

sh /home/bdg_app/wangshida/upload/apache-seatunnel-incubating-2.1.3/bin/start-seatunnel-spark.sh --master yarn --deploy-mode cluster --config ./mysql_iceberg.job