seata

发布时间 2023-05-29 09:39:50作者: 当幸福来敲门-wufuqin

seata

1. 官网、GitHub

  1. 官网:https://seata.io/zh-cn/docs/overview/what-is-seata.html

  2. github:https://github.com/seata/seata

2. 下载seata

  1. 在官网中版本说明: https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明 ,选择 Spring Cloud 版本的 选择匹配的 seata 版本,版本间如果不兼容服务会启动报错
  2. 下载 seata1.3.0 版本的binary类型的文件,zip压缩包 https://seata.io/zh-cn/blog/download.html

3. 安装、启动 seata

1. windows

  • 解压zip压缩包;进入seata 服务端的bin目录下,执行 seata-server.bat 脚本

    ./seata-server.bat
    

2. linux

  • 解压zip压缩包;进入seata 服务端的bin目录下,执行 seata-server.sh 脚本

    ./seata-server.sh
    

3. 支持的启动参数

参数 全写 作用 备注
-h --host 指定在注册中心注册的 IP 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定
-p --port 指定 server 启动的端口 默认为 8091
-m --storeMode 事务日志存储方式 支持file,db,redis,默认为 file 注:redis需seata-server 1.3版本及以上
-n --serverNode 用于指定seata-server节点ID 1,2,3..., 默认为 1
-e --seataEnv 指定 seata-server 运行环境 dev, test 等, 服务启动时会使用 registry-dev.conf 这样的配置

例如: ./seata-server.sh -p 8091 -h 127.0.0.1 -m file

4. nacos 集成 seata

1. maven坐标

<!--spring boot 2.3.8-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.3.8.RELEASE</version>
</dependency>

<!--spring cloud Hoxton.SR11-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>Hoxton.SR11</version>
</dependency>

<!--seata 2.2.7-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

2. 数据库配置

1. 新建seata数据库

2. mysql

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

3. oracle

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE global_table
(
    xid                       VARCHAR2(128) NOT NULL,
    transaction_id            NUMBER(19),
    status                    NUMBER(3)     NOT NULL,
    application_id            VARCHAR2(32),
    transaction_service_group VARCHAR2(32),
    transaction_name          VARCHAR2(128),
    timeout                   NUMBER(10),
    begin_time                NUMBER(19),
    application_data          VARCHAR2(2000),
    gmt_create                TIMESTAMP(0),
    gmt_modified              TIMESTAMP(0),
    PRIMARY KEY (xid)
);

CREATE INDEX idx_gmt_modified_status ON global_table (gmt_modified, status);
CREATE INDEX idx_transaction_id ON global_table (transaction_id);

-- the table to store BranchSession data
CREATE TABLE branch_table
(
    branch_id         NUMBER(19)    NOT NULL,
    xid               VARCHAR2(128) NOT NULL,
    transaction_id    NUMBER(19),
    resource_group_id VARCHAR2(32),
    resource_id       VARCHAR2(256),
    branch_type       VARCHAR2(8),
    status            NUMBER(3),
    client_id         VARCHAR2(64),
    application_data  VARCHAR2(2000),
    gmt_create        TIMESTAMP(6),
    gmt_modified      TIMESTAMP(6),
    PRIMARY KEY (branch_id)
);

CREATE INDEX idx_xid ON branch_table (xid);

-- the table to store lock data
CREATE TABLE lock_table
(
    row_key        VARCHAR2(128) NOT NULL,
    xid            VARCHAR2(96),
    transaction_id NUMBER(19),
    branch_id      NUMBER(19)    NOT NULL,
    resource_id    VARCHAR2(256),
    table_name     VARCHAR2(32),
    pk             VARCHAR2(36),
    gmt_create     TIMESTAMP(0),
    gmt_modified   TIMESTAMP(0),
    PRIMARY KEY (row_key)
);

CREATE INDEX idx_branch_id ON lock_table (branch_id);

4. postgresql

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS public.global_table
(
    xid                       VARCHAR(128) NOT NULL,
    transaction_id            BIGINT,
    status                    SMALLINT     NOT NULL,
    application_id            VARCHAR(32),
    transaction_service_group VARCHAR(32),
    transaction_name          VARCHAR(128),
    timeout                   INT,
    begin_time                BIGINT,
    application_data          VARCHAR(2000),
    gmt_create                TIMESTAMP(0),
    gmt_modified              TIMESTAMP(0),
    CONSTRAINT pk_global_table PRIMARY KEY (xid)
);

CREATE INDEX idx_gmt_modified_status ON public.global_table (gmt_modified, status);
CREATE INDEX idx_transaction_id ON public.global_table (transaction_id);

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS public.branch_table
(
    branch_id         BIGINT       NOT NULL,
    xid               VARCHAR(128) NOT NULL,
    transaction_id    BIGINT,
    resource_group_id VARCHAR(32),
    resource_id       VARCHAR(256),
    branch_type       VARCHAR(8),
    status            SMALLINT,
    client_id         VARCHAR(64),
    application_data  VARCHAR(2000),
    gmt_create        TIMESTAMP(6),
    gmt_modified      TIMESTAMP(6),
    CONSTRAINT pk_branch_table PRIMARY KEY (branch_id)
);

CREATE INDEX idx_xid ON public.branch_table (xid);

-- the table to store lock data
CREATE TABLE IF NOT EXISTS public.lock_table
(
    row_key        VARCHAR(128) NOT NULL,
    xid            VARCHAR(96),
    transaction_id BIGINT,
    branch_id      BIGINT       NOT NULL,
    resource_id    VARCHAR(256),
    table_name     VARCHAR(32),
    pk             VARCHAR(36),
    gmt_create     TIMESTAMP(0),
    gmt_modified   TIMESTAMP(0),
    CONSTRAINT pk_lock_table PRIMARY KEY (row_key)
);

CREATE INDEX idx_branch_id ON public.lock_table (branch_id);

5. 需要进行分布式事务控制的数据库需要添加 undo_log 表

  • mysql
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
  • oracle
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE undo_log
(
    id            NUMBER(19)    NOT NULL,
    branch_id     NUMBER(19)    NOT NULL,
    xid           VARCHAR2(100) NOT NULL,
    context       VARCHAR2(128) NOT NULL,
    rollback_info BLOB          NOT NULL,
    log_status    NUMBER(10)    NOT NULL,
    log_created   TIMESTAMP(0)  NOT NULL,
    log_modified  TIMESTAMP(0)  NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);

COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';

-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
  • postgresql
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS public.undo_log
(
    id            SERIAL       NOT NULL,
    branch_id     BIGINT       NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info BYTEA        NOT NULL,
    log_status    INT          NOT NULL,
    log_created   TIMESTAMP(0) NOT NULL,
    log_modified  TIMESTAMP(0) NOT NULL,
    CONSTRAINT pk_undo_log PRIMARY KEY (id),
    CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);

CREATE SEQUENCE IF NOT EXISTS undo_log_id_seq INCREMENT BY 1 MINVALUE 1 ;

3. seata配置

1. 配置 file.conf

  • 修改 store 块,设置之前新建的数据库

    store {
      mode = "db"    # db数据库模式
      db {
        datasource = "druid"	# 数据源
        dbType = "mysql"		# 数据库类型
        driverClassName = "com.mysql.jdbc.Driver"	# 数据库驱动
        url = "jdbc:mysql://192.168.2.240:3306/cloud_seata"	# 数据库连接
        user = "root"			# 数据库用户名
        password = "inhe@2011"	# 密码
        minConn = 5		# 最小连接数
        maxConn = 30	# 最大连接数
        globalTable = "global_table"	# 全局表
        branchTable = "branch_table"	# 分支表
        lockTable = "lock_table"		# 锁表
        queryLimit = 100				# 查询条数
      }
    }
    

2. 配置 registry.conf

  • 修改 registry 块和 config 块

    registry {
      type = "nacos"    				# 注册中心nacos
      nacos {
        application = "seata-server"	 # 注册到nacos的应用名称	
        serverAddr = "127.0.0.1:8848"	 # 注册中心的ip和端口号
        group = "SEATA_GROUP"			# 所属分组
        namespace = ""				    # 命名空间 
        cluster = "default"				# 集群
        username = ""				    # naocs用户名 
        password = ""				    # nacos密码
      }
      
    }
    
    config {
      type = "nacos"					# 配置中心naocs
      nacos {
        serverAddr = "127.0.0.1:8848"	 # naocsip和端口号	
        namespace = ""					# 命名空间
        group = "SEATA_GROUP"	         # 所属分组
        username = ""				    # naocs用户名 
        password = ""				    # nacos密码
      }
     
    }
    

5. Spring Boot 集成seata

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

1. application.yml

  • 添加以下配置,使用nacos做配置中心可以将配置放在nacos中
seata:
  enabled: true
  # 事务协调器组(下文中resources下`file.conf`中vgroupMapping后面配置的保持一致)
  tx-service-group: test
  application-id: consume  # 应用名称
  # 默认开启数据源代理
  enable-auto-data-source-proxy: true
  
  service:
    grouplist:
      default: 192.168.2.240:8091  # seata所在服务器的ip和seata占用的端口

2. file.conf

  • 新建file.conf文件放在resource目录下,需要使用seata进行分布式事务控制springboot应用都需要
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
#这里注意,等号前后都是配置,前面是yml里配置的事务组,后面是register.conf里定义的seata-server
  vgroupMapping.test = "default"
  #only support when registry.type=file, please don't set multiple addresses
  seata_tc_server.grouplist = "192.168.10.142:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

注意修改:

vgroupMapping.test = "default"

test:事务组,这个的test与 springboot的配置文件中的 tx-service-group 对应的值需要保持一致

default:register.conf里定义的seata-server

3. register.conf

  • 新建register.conf文件放在resource目录下,需要使用seata进行分布式事务控制springboot应用都需要
registry {
  type = "nacos" # 注册中心类型
  nacos {
    application = "seata-server"	# 应用名称
    serverAddr = "192.168.10.142:8848"	# 注册中心ip和端口
    group = "DEV_GROUP"					# 组
    namespace = "53692cf3-f16e-4015-8844-4186555271d0"	# 名称空间
    cluster = "default"				# 集群
    username = "nacos"				# nacos登录名
    password = "inhe@naocs"			# nacos登录密码
  }
  
}

config {
  type = "nacos"  # 配置中心类型
  nacos {
    serverAddr = "192.168.10.142:8848"	# 配置中心ip和端口
    namespace = "53692cf3-f16e-4015-8844-4186555271d0"	# 名称空间
    group = "DEV_GROUP"				# 组
    username = "nacos"				# nacos登录名
    password = "inhe@naocs"			# nacos登录密码
  }
 
}

6. 使用示例

  • 在需要控制分布式事务的方法上添加注解:@GlobalTransactional

  • 需要注意:必须是通过远程调用的方式才可以控制,通过mq发送消息的方式无法控制。

    /**
     * 处理分布式事务
     */
    @GlobalTransactional
    public int insert(Consume consume) {
        
        // 添加数据
        String comsumeId = CodeUtil.getComsumeId();
        consume.setContent(consume.getContent());
        consume.setCreateDate(new Date());
        consume.setUpdateDate(new Date());
        int count = consumeDao.insert(consume);
    
        // 远程调用添加数据
        JSONObject json = new JSONObject();
        json.put("content", "远程调用添加数据");
        produceClient.insert(json);
    
        return count;
    }
    

7. seata高可用

  • 启动多个节点,注册到nacos注册中心集群中就实现了seata的高可用,不需额外配置