Seata解决分布式事务

发布时间 2023-08-31 12:44:22作者: strongmore

简介

Seata 是阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

初始化数据库

创建 seata 库,初始化脚本如下

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

使用docker安装

docker pull seataio/seata-server:1.5.0
docker run -d --name seata-server -p 8091:8091 -p 7091:7091 seataio/seata-server:1.5.0

修改配置文件

docker exec -it seata-server sh #不能使用 /bin/bash
docker cp seata-server:/seata-server/resources/application.yml .
docker cp seata-server:/seata-server/resources/application.example.yml .

默认的配置文件 application.yml 内容如下

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata

seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: file
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: file
  store:
    # support: file 、 db 、 redis
    mode: file
#  server:
#    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login# 

参考 application.example.yml,修改之后为

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata

seata:
  config:
    type: nacos
    nacos:
      server-addr: ip:8848
      namespace: d8b0df04-aa58-4a5b-b582-7d133b9e8b2c
      group: SEATA_GROUP
      username: nacos
      password: nacos
      data-id: seataServer.properties
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: ip:8848
      group: SEATA_GROUP
      namespace: d8b0df04-aa58-4a5b-b582-7d133b9e8b2c
      cluster: default
      username: nacos
      password: nacos
  store:
    mode: db
    db:
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://ip:port/seata?rewriteBatchedStatements=true&serverTimezone=UTC
      user: root
      password: xxx
      min-conn: 5
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 100
      max-wait: 5000
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login# 

使用nacos当做配置中心和注册中心,使用数据库存储。配置MySQL8.0所需要的jar

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar
docker cp application.yml seata-server:/seata-server/resources/
docker cp mysql-connector-java-8.0.19.jar seata-server:/seata-server/libs/
docker restart seata-server

查看nacos服务列表,可以看到seata-server服务,说明启动成功。

SpringBoot多项目使用seata(HttpClient调用)

参与分布式的数据库要创建 undo_log 表

CREATE TABLE undo_log
(
    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20)   NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11)      NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;

业务使用表如下

create table if not exists tb_order
(
    id         bigint auto_increment primary key comment '主键',
    product_id bigint comment '商品id',
    quantity   int comment '购买数量',
    price      decimal(10, 2) comment '总金额'
);

create table if not exists tb_product
(
    id    bigint auto_increment primary key comment '主键',
    name  varchar(20) comment '商品名称',
    stock int comment '库存量'
);

insert into tb_product(name, stock)
values ('小米手机', 100);

一个订单表,一个商品表,订单服务创建订单的过程中会调用商品服务来减库存,调用过程中会将全局事务id传递过去。

  1. 商品服务成功,订单成功,事务成功
  2. 商品服务失败,订单也失败,事务失败
  3. 商品服务成功,订单失败,事务回滚

项目代码

order 服务

配置 seata 地址

seata:
  application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名
  # 服务配置项,对应 ServiceProperties 类
  service:
    # 虚拟组和分组的映射
    vgroup-mapping:
      spring-seata-order-group: default
    # 分组和 Seata 服务的映射
    grouplist:
      default: ip:8091
  enabled: false
spring:
  application:
    name: spring-seata-order
  datasource:
    url: jdbc:mysql://ip:port/testdb?characterEncoding=utf8&useSSL=true&serverTimezone=Asia/Shanghai
    username: root
    password: xxx
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      max-lifetime: 30000
  cloud:
    nacos:
      discovery:
        namespace: d8b0df04-aa58-4a5b-b582-7d133b9e8b2c
        username: nacos
        password: nacos
        server-addr: ip:8848

服务调用,底层通过 nacos 做负载均衡

String resp = restTemplate.postForObject("http://spring-seata-product/api/v1/product/reduceProductStock", null, String.class);

通过拦截器,将 seata 的分布式事物 id 传递到其他服务,seata 内的 TransactionPropagationInterceptor 拦截器获取到此 id 并保存到 RootContext 中。

@Component
public class RestTemplateBeanPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof RestTemplate) {
            ((RestTemplate) bean).getInterceptors()
                    .add((request, body, execution) -> {
                        String xid = RootContext.getXID();
                        if (Objects.nonNull(xid)) {
                            request.getHeaders().add(RootContext.KEY_XID, xid);
                        }
                        return execution.execute(request, body);
                    });

        }
        return bean;
    }
}

product 服务

配置 seata 地址

seata:
  application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名
  # 服务配置项,对应 ServiceProperties 类
  service:
    # 虚拟组和分组的映射
    vgroup-mapping:
      spring-seata-order-group: default
    # 分组和 Seata 服务的映射
    grouplist:
      default: ip:8091
  enabled: false
spring:
  application:
    name: spring-seata-product
  datasource:
    url: jdbc:mysql://ip:port/testdb?characterEncoding=utf8&useSSL=true&serverTimezone=Asia/Shanghai
    username: root
    password: xxx
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      max-lifetime: 30000
  cloud:
    nacos:
      discovery:
        namespace: d8b0df04-aa58-4a5b-b582-7d133b9e8b2c
        username: nacos
        password: nacos
        server-addr: ip:8848

参考

Seata快速入门-官方文档
使用 Docker 部署 Seata Server (1.5.0)
芋道 Spring Boot 分布式事务 Seata 入门