Seata客户端集成

发布时间 2024-01-02 14:21:20作者: 周仙僧

架构环境

编辑工具:Gradle 8.4
运行环境:OpenJdk 1.8.0_391
开发框架:Spring-boot 2.7.17
微服务架构:Spring Cloud 2021.0.8

客户端依赖

官方提供了如下依赖方式,根据自身项目选择一个即可:

  • 依赖seata-all,传统依赖方式,提供了Seata客户端的核心功能
  • 依赖seata-spring-boot-starter,支持yml、properties配置(.conf可删除),内部已依赖seata-all
  • 依赖spring-cloud-alibaba-seata,内部集成了seata,并实现了xid传递

介于本文项目使用了Spring-boot开发框架,且没有引入spring-cloud-alibaba架构,最终使用了seata-spring-boot-starter,依赖配置如下:
implementation group: 'io.seata', name: 'seata-spring-boot-starter', version: '1.8.0'

配置

项目的resources目录下添加file.conf文件,配置参数可以参考Seata官网参数配置表
【file.conf】文件

transport {
  # tcp, unix-domain-socket
  type = "TCP"
  #NIO, NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the tm client batch send request enable
  enableTmClientBatchSendRequest = false
  # the rm client batch send request enable
  enableRmClientBatchSendRequest = true
   # the rm client rpc request timeout
  rpcRmRequestTimeout = 2000
  # the tm client rpc request timeout
  rpcTmRequestTimeout = 30000
  # the rm client rpc request timeout
  rpcRmRequestTimeout = 15000
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.default_tx_group = "fund_trade"
  #only support when registry.type=file, please don't set multiple addresses
  fund_trade.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    tableMetaCheckerInterval = 60000
    reportSuccessEnable = false
    sagaBranchRegisterEnable = false
    sagaJsonParser = "fastjson"
    sagaRetryPersistModeUpdate = false
    sagaCompensatePersistModeUpdate = false
    tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
    sqlParserType = "druid"
    branchExecutionTimeoutXA = 60000
    connectionTwoPhaseHoldTimeoutXA = 10000
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
    defaultGlobalTransactionTimeout = 60000
    degradeCheck = false
    degradeCheckPeriod = 2000
    degradeCheckAllowTimes = 10
    interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
  }
  undo {
    dataValidation = true
    onlyCareUpdateColumns = true
    logSerialization = "jackson"
    logTable = "undo_log"
    compress {
      enable = true
      # allow zip, gzip, deflater, lz4, bzip2, zstd default is zip
      type = zip
      # if rollback info size > threshold, then will be compress
      # allow k m g t
      threshold = 64k
    }
  }
  loadBalance {
      type = "XID"
      virtualNodes = 10
  }
}
log {
  exceptionRate = 100
}
tcc {
  fence {
    # tcc fence log table name
    logTableName = tcc_fence_log
    # tcc fence log clean period
    cleanPeriod = 1h
  }
}

补充说明:service.vgroupMapping.default_tx_group配置默认集群分组名称,service.分组名称.grouplist配置分组对应的seata-server服务的ip地址和端口

数据库脚本

Seata客户端默认使用AT模式,启动客户端前需要执行数据库脚本创建undo_log,用于存储全局事务执行时业务数据的备份,脚本内容可从seata源码中获取。

开启事务

在需要开启全局事务的业务代码快,添加@GlobalTransactional注解即可,如下图:
image

全局事务id提供

由于seata-spring-boot-starter没有实现服务之间xid的传递功能,为了保证服务之间Http或者feign调用过程中,调用方与被调用方能够处于同一事务中,提供了如下xid传递方式:

@Configuration
public class SeataGlobalXidConfig implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate template) {
        //解决不传递请求头中的token
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (attributes != null){
            HttpServletRequest request = attributes.getRequest();
            Enumeration<String> headerNames = request.getHeaderNames();
//            //可以在这里将自定义请求头传递进去, key 请求, value 值
//            //处理上游请求头信息,传递时继续携带
            String token = request.getHeader("token");
            template.header("token", StringUtils.isNotEmpty(token) ? token : "");
        }
//
//        // 解决seata的xid未传递
        String xid = RootContext.getXID();
        template.header(RootContext.KEY_XID, xid);
    }
}

遇到的问题

  1. 在提供通过feign拦截器添加事务传递的方式时,拦截器获取当前全局事务xid时为空。
    原因是项目中启用了hystrix服务降级机制,且hystrix.command.default.execution.strategy为多线程策略,会导致feign拦截器中获取不到当前执行环境中的事务;将hystrix.command.default.execution.strategy改为信号量传递即可。
  2. 被调用服务抛出异常后,调用方的事务没有回滚。
    原因是项目中提供了服务异常时的降级处理机制,导致异常没有正常抛出,没有触发全局事务的回滚;可以通过对异常进行封装,对特定的异常进行抛出,以触发全局事务回滚。