SpringBoot的多数据源以及事务解决方案(下)

发布时间 2024-01-06 23:26:52作者: 我想去海边

SpringBoot的多数据源以及事务解决方案(下)

下面1-3实现方法来之看过的文章SpringBoot 多数据源及事务解决方案,4后为自己项目中用到的方法

  1. 多数据源事务处理

    1.1 关于事务的理解

    首先我们先理解下事务的本质

    1. 提到Spring事务,就离不开事务的四大特性和隔离级别、七大传播特性。

    事务特性和离级别是属于数据库范畴。Spring事务的七大传播特性是什么呢?它是Spring在当前线程内,处理多个事务操作时的事务应用策略,数据库事务本身并不存在传播特性

    • REQUIRED:默认的事务传播行为;需要事务:存在事务则使用已存在事务,否则创建新的事务;

    • SUPPORTS:支持已存在事务:存在事务则使用已存在事务,否则以非事务方式运行;

    • MANDATORY:强制使用事务:存在事务则使用已存在事务,否则抛出异常;

    • REQUIRES_NEW:需要新事务:存在事务则挂起已存在事务,否则创建新事务;

    • NOT_SUPPORTED:不支持事务:存在事务则挂起已存在事务,否则以非事务方式运行;

    • NEVER:从不使用事务:存在事务则抛出异常,否则以非事务方式运行;

    • NESTED:嵌套事务:存在事务则使创建保存点使用嵌套的事务,否则创建新的事务。

    1. Spring事务的定义包括:begin、commit、rollback、close、suspend、resume等动作。

    • begin(事务开始): 可以认为存在于数据库的命令中,比如Mysql的start transaction命令,但是在JDBC编程方式中不存在。

    • close(事务关闭): Spring事务的close()方法,是把Connection对象归还给数据库连接池,与事务无关。

    • suspend(事务挂起): Spring中事务挂起的语义是:需要新事务时,将现有的Connection保存起来(还有尚未提交的事务),然后创建新的Connection2Connection2提交、回滚、关闭完毕后,再把Connection1取出来继续执行。

    • resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。

    实际上,只有commit、rollback、close是在JDBC真实存在的,而其他动作都是应用的语意,而非JDBC事务的真实命令。因此,事务真实存在的方法是:setAutoCommit()commit()rollback()

    close()语义为:

    • 关闭一个数据库连接,这已经不再是事务的方法了。

    使用DataSource并不会执行物理关闭,只是归还给连接池。

  2. 为了保证在多个数据源中事务的一致性,我们可以手动管理Connetion的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。

    这可以通过使用装饰器设计模式,对Connection进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生Connection和ORM框架的默认事务行为。其整体思路如下图所示:

    图片

    这里并没有使用前面提到的@SwitchDataSource,这是因为我们在TransactionAop中已经执行了lookupKey的切换。

    2.1 定义多事务注解

    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface MultiTransaction {
       String transactionManager() default "multiTransactionManager";
       // 默认数据隔离级别,随数据库本身默认值
       IsolationLevel isolationLevel() default IsolationLevel.DEFAULT;
       // 默认为主库数据源
       String datasourceId() default "default";
       // 只读事务,若有更新操作会抛出异常
       boolean readOnly() default false;

    业务方法只需使用该注解即可开启事务,datasourceId指定事务用到的数据源,不指定默认为主库。

    2.3 包装Connection

    自定义事务我们使用包装过的Connection,屏蔽其中的commit&rollback方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。

    public class ConnectionProxy implements Connection {

       private final Connection connection;

       public ConnectionProxy(Connection connection) {
           this.connection = connection;
      }

       @Override
       public void commit() throws SQLException {
           // connection.commit();
      }

       public void realCommit() throws SQLException {
           connection.commit();
      }

       @Override
       public void close() throws SQLException {
           //connection.close();
      }

       public void realClose() throws SQLException {
           if (!connection.getAutoCommit()) {
               connection.setAutoCommit(true);
          }
           connection.close();
      }

       @Override
       public void rollback() throws SQLException {
           if(!connection.isClosed())
               connection.rollback();
      }
      ...
    }

    这里commit&close方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架,其自身事务管理都将失效。事务的控制就交由MultiTransaction控制了。

    2.4 事务上下文管理

    public class TransactionHolder {
       // 是否开启了一个MultiTransaction
       private boolean isOpen;
       // 是否只读事务
       private boolean readOnly;
       // 事务隔离级别
       private IsolationLevel isolationLevel;
       // 维护当前线程事务ID和连接关系
       private ConcurrentHashMap<String, ConnectionProxy> connectionMap;
       // 事务执行栈
       private Stack<String> executeStack;
       // 数据源切换栈
       private Stack<String> datasourceKeyStack;
       // 主事务ID
       private String mainTransactionId;
       // 执行次数
       private AtomicInteger transCount;

       // 事务和数据源key关系
       private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap;

    }

    每开启一个事物,生成一个事务ID并绑定一个ConnectionProxy。事务嵌套调用,保存事务ID和lookupKey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务ID和lookupKey。

    2.5 数据源兼容处理

    为了不影响原生事务的使用,需要重写getConnection方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。

    @Override
       public Connection getConnection() throws SQLException {
           TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
           if (Objects.isNull(transactionHolder)) {
               return determineTargetDataSource().getConnection();
          }
           ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
                  .get(transactionHolder.getExecuteStack().peek());
           if (ConnectionProxy == null) {
               // 没开跨库事务,直接返回
               return determineTargetDataSource().getConnection();
          } else {
               transactionHolder.addCount();
               // 开了跨库事务,从当前线程中拿包装过的Connection
               return ConnectionProxy;
          }
      }

    2.6 切面处理

    切面处理的核心逻辑是:维护一个嵌套事务栈,当业务方法执行结束,或者发生异常时,判断当前栈顶事务ID是否为主事务ID。如果是的话这时候已经到了最外层事务,这时才执行提交和回滚。详细流程如下图所示:

    图片

    package com.github.mtxn.transaction.aop;

    import com.github.mtxn.application.Application;
    import com.github.mtxn.transaction.MultiTransactionManager;
    import com.github.mtxn.transaction.annotation.MultiTransaction;
    import com.github.mtxn.transaction.context.DataSourceContextHolder;
    import com.github.mtxn.transaction.support.IsolationLevel;
    import com.github.mtxn.transaction.support.TransactionHolder;
    import com.github.mtxn.utils.ExceptionUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    import java.lang.reflect.Method;


    @Aspect
    @Component
    @Slf4j
    @Order(99999)
    public class MultiTransactionAop {

       @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
       public void pointcut() {
           if (log.isDebugEnabled()) {
               log.debug("start in transaction pointcut...");
          }
      }


       @Around("pointcut()")
       public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable {
           MethodSignature signature = (MethodSignature) point.getSignature();
           // 从切面中获取当前方法
           Method method = signature.getMethod();
           MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class);
           if (multiTransaction == null) {
               return point.proceed();
          }
           IsolationLevel isolationLevel = multiTransaction.isolationLevel();
           boolean readOnly = multiTransaction.readOnly();
           String prevKey = DataSourceContextHolder.getKey();
           MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
           // 切数据源,如果失败使用默认库
           if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
           // 开启事务栈
           TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
           Object proceed;

           try {
               proceed = point.proceed();
               multiTransactionManager.commit();
          } catch (Throwable ex) {
               log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex);
               multiTransactionManager.rollback();
               throw ExceptionUtils.api(ex, "系统异常:%s", ex.getMessage());
          } finally {
               // 当前事务结束出栈
               String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
               transactionHolder.getDatasourceKeyStack().pop();
               // 恢复上一层事务
               DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
               // 最后回到主事务,关闭此次事务
               multiTransactionManager.close(transId);
          }
           return proceed;

      }


    }

    结论

    本文主要介绍了多数据源管理的解决方案(应用层事务,而非XA二段提交保证),以及对多个库同时操作的事务管理。

    需要注意的是,这种方式只适用于单体架构的应用。因为多个库的事务参与者都是运行在同一个JVM进行。如果是在微服务架构的应用中,则需要使用分布式事务管理(譬如:Seata)。

     

3.分布式事务的实现

图片

数据库事务

数据库事务(简称:事务 ),是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成,这些操作要么全部执行,要么全部不执 行,是一个不可分割 的工作单位。

数据库事务的几个典型特性:原子性(Atomicity )、一致性( Consistency )、隔离性( Isolation)和持久性(Durabilily),简称就是ACID。

图片

  • 原子性: 事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。

  • 一致性: 指在事务开始之前和事务结束以后,数据不会被破坏,假如A账户给B账户转10块钱,不管成功与否,A和B的总金额是不变的。

  • 隔离性: 多个事务并发访问时,事务之间是相互隔离的,即一个事务不影响其它事务运行效果。简言之,就是事务之间是进水不犯河水的。

  • 持久性: 表示事务完成以后,该事务对数据库所作的操作更改,将持久地保存在数据库之中。

事务的实现原理

本地事务

传统的单服务器,单关系型数据库下的事务,就是本地事务。本地事务由资源管理器管理,JDBC事务就是一个非常典型的本地事务。图片

事务日志

innodb事务日志包括redo log和undo log。

redo log(重做日志)

redo log通常是物理日志,记录的是数据页的物理修改,而不是某一行或某几行修改成怎样,它用来恢复提交后的物理数据页。

undo log(回滚日志)

undo log是逻辑日志,和redo log记录物理日志的不一样。可以这样认为,当delete一条记录时,undo log中会记录一条对应的insert记录,当update一条记录时,它记录一条对应相反的update记录。

事务ACID特性的实现思想

  • 原子性:是使用 undo log来实现的,如果事务执行过程中出错或者用户执行了rollback,系统通过undo log日志返回事务开始的状态。

  • 持久性:使用 redo log来实现,只要redo log日志持久化了,当系统崩溃,即可通过redo log把数据恢复。

  • 隔离性:通过锁以及MVCC,使事务相互隔离开。

  • 一致性:通过回滚、恢复,以及并发情况下的隔离性,从而实现一致性。

分布式事务

分布式事务: 就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单来说,分布式事务指的就是分布式系统中的事务,它的存在就是为了保证不同数据库节点的数据一致性。

为什么需要分布式事务?接下来分两方面阐述:

微服务架构下的分布式事务

随着互联网的快速发展,轻盈且功能划分明确的微服务,登上了历史舞台。比如,一个用户下订单,购买直播礼物的服务,被拆分成三个service,分别是金币服务(coinService),下订单服务(orderService)、礼物服务(giftService)。这些服务都部署在不同的机器上(节点),对应的数据库(金币数据库、订单数据库、礼物数据库)也在不同节点上。

图片

用户下单购买礼物,礼物数据库、金币数据库、订单数据库在不同节点上,用本地事务是不可以的,那么如何保证不同数据库(节点)上的数据一致性呢?这就需要分布式事务啦~

分库分表下的分布式事务

随着业务的发展,数据库的数据日益庞大,超过千万级别的数据,我们就需要对它分库分表(以前公司是用mycat分库分表,后来用sharding-jdbc)。一分库,数据又分布在不同节点上啦,比如有的在深圳机房,有的在北京机房~你再想用本地事务去保证,已经无动于衷啦~还是需要分布式事务啦。

比如A转10块给B,A的账户数据是在北京机房,B的账户数据是在深圳机房。流程如下:

图片

CAP 理论&BASE 理论

学习分布式事务,当然需要了解 CAP 理论和BASE 理论。

CAP理论

CAP理论作为分布式系统的基础理论,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),这三个要素最多只能同时实现两点。图片

一致性(C:Consistency):

一致性是指数据在多个副本之间能否保持一致的特性。例如一个数据在某个分区节点更新之后,在其他分区节点读出来的数据也是更新之后的数据。

可用性(A:Availability):

可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。这里的重点是"有限时间内"和"返回结果"。

分区容错性(P:Partition tolerance):

分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务。

选择说明
CA 放弃分区容错性,加强一致性和可用性,其实就是传统的单机数据库的选择
AP 放弃一致性,分区容错性和可用性,这是很多分布式系统设计时的选择
CP 放弃可用性,追求一致性和分区容错性,网络问题会直接让整个系统不可用

BASE 理论

BASE 理论, 是对CAP中AP的一个扩展,对于我们的业务系统,我们考虑牺牲一致性来换取系统的可用性和分区容错性。BASE是Basically Available(基本可用),Soft state(软状态),和 Eventually consistent(最终一致性)三个短语的缩写。

Basically Available

基本可用:通过支持局部故障而不是系统全局故障来实现的。如将用户分区在 5 个数据库服务器上,一个用户数据库的故障只影响这台特定主机那 20% 的用户,其他用户不受影响。

Soft State

软状态,状态可以有一段时间不同步

Eventually Consistent

最终一致,最终数据是一致的就可以了,而不是时时保持强一致。

 

分布式事务解决方案主要有以下这几种:

  • 2PC(二阶段提交)方案

  • TCC(Try、Confirm、Cancel)

  • 本地消息表

  • 最大努力通知

  • Saga事务

二阶段提交方案

二阶段提交方案是常用的分布式事务解决方案。事务的提交分为两个阶段:准备阶段和提交执行方案。

二阶段提交成功的情况

准备阶段 ,事务管理器向每个资源管理器发送准备消息,如果资源管理器的本地事务操作执行成功,则返回成功。

提交执行阶段 ,如果事务管理器收到了所有资源管理器回复的成功消息,则向每个资源管理器发送提交消息,RM 根据 TM 的指令执行提交。如图:

图片

二阶段提交失败的情况

准备阶段 ,事务管理器向每个资源管理器发送准备消息,如果资源管理器的本地事务操作执行成功,则返回成功,如果执行失败,则返回失败。

提交执行阶段 ,如果事务管理器收到了任何一个资源管理器失败的消息,则向每个资源管理器发送回滚消息。资源管理器根据事务管理器的指令回滚本地事务操作,释放所有事务处理过程中使用的锁资源。

图片

二阶段提交优缺点

2PC方案实现起来简单,成本较低,但是主要有以下缺点

  • 单点问题:如果事务管理器出现故障,资源管理器将一直处于锁定状态。

  • 性能问题:所有资源管理器在事务提交阶段处于同步阻塞状态,占用系统资源,一直到提交完成,才释放资源,容易导致性能瓶颈。

  • 数据一致性问题:如果有的资源管理器收到提交的消息,有的没收到,那么会导致数据不一致问题。

TCC(补偿机制)

TCC 采用了补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。

TCC(Try-Confirm-Cancel)模型

TCC(Try-Confirm-Cancel)是通过对业务逻辑的分解来实现分布式事务。针对一个具体的业务服务,TCC 分布式事务模型需要业务系统都实现一下三段逻辑:

try阶段 :尝试去执行,完成所有业务的一致性检查,预留必须的业务资源。

Confirm阶段 :该阶段对业务进行确认提交,不做任何检查,因为try阶段已经检查过了,默认Confirm阶段是不会出错的。

Cancel 阶段 :若业务执行失败,则进入该阶段,它会释放try阶段占用的所有业务资源,并回滚Confirm阶段执行的所有操作。

图片

TCC 分布式事务模型包括三部分:主业务服务、从业务服务、业务活动管理器

  • 主业务服务:主业务服务负责发起并完成整个业务活动。

  • 从业务服务:从业务服务是整个业务活动的参与方,实现Try、Confirm、Cancel操作,供主业务服务调用。

  • 业务活动管理器:业务活动管理器管理控制整个业务活动,包括记录事务状态,调用从业务服务的 Confirm 操作,调用从业务服务的 Cancel 操作等。

下面再拿用户下单购买礼物作为例子来模拟TCC实现分布式事务的过程:

假设用户A余额为100金币,拥有的礼物为5朵。A花了10个金币,下订单,购买10朵玫瑰。余额、订单、礼物都在不同数据库。

TCC的Try阶段:

  • 生成一条订单记录,订单状态为待确认。

  • 将用户A的账户金币中余额更新为90,冻结金币为10(预留业务资源)

  • 将用户的礼物数量为5,预增加数量为10。

  • Try成功之后,便进入Confirm阶段

  • Try过程发生任何异常,均进入Cancel阶段

图片

TCC的Confirm阶段:

  • 订单状态更新为已支付

  • 更新用户余额为90,可冻结为0

  • 用户礼物数量更新为15,预增加为0

  • Confirm过程发生任何异常,均进入Cancel阶段

  • Confirm过程执行成功,则该事务结束

图片TCC的Cancel阶段:

  • 修改订单状态为已取消

  • 更新用户余额回100

  • 更新用户礼物数量为5

图片

TCC优缺点

TCC方案让应用可以自定义数据库操作的粒度,降低了锁冲突,可以提升性能,但是也有以下缺点:

  • 应用侵入性强,try、confirm、cancel三个阶段都需要业务逻辑实现。

  • 需要根据网络、系统故障等不同失败原因实现不同的回滚策略,实现难度大,一般借助TCC开源框架,ByteTCC,TCC-transaction,Himly。

本地消息表

ebay最初提出本地消息表这个方案,来解决分布式事务问题。业界目前使用这种方案是比较多的,它的核心思想就是将分布式事务拆分成本地事务进行处理。可以看一下基本的实现流程图:

图片

基本实现思路

发送消息方:

  • 需要有一个消息表,记录着消息状态相关信息。

  • 业务数据和消息表在同一个数据库,即要保证它俩在同一个本地事务。

  • 在本地事务中处理完业务数据和写消息表操作后,通过写消息到MQ消息队列。

  • 消息会发到消息消费方,如果发送失败,即进行重试。

消息消费方:

  • 处理消息队列中的消息,完成自己的业务逻辑。

  • 此时如果本地事务处理成功,则表明已经处理成功了。

  • 如果本地事务处理失败,那么就会重试执行。

  • 如果是业务上面的失败,给消息生产方发送一个业务补偿消息,通知进行回滚等操作。

生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

优点&缺点:

该方案的优点是很好地解决了分布式事务问题,实现了最终一致性。缺点是消息表会耦合到业务系统中。

最大努力通知

什么是最大通知

最大努力通知也是一种分布式事务解决方案。下面是企业网银转账一个例子

图片

  • 企业网银系统调用前置接口,跳转到转账页

  • 企业网银调用转账系统接口

  • 转账系统完成转账处理,向企业网银系统发起转账结果通知,若通知失败,则转账系统按策略进行重复通知。

  • 企业网银系统未接收到通知,会主动调用转账系统的接口查询转账结果。

  • 转账系统会遇到退汇等情况,会定时回来对账。

最大努力通知方案的目标,就是发起通知方通过一定的机制,最大努力将业务处理结果通知到接收方 。最大努力通知实现机制如下:

图片

最大努力通知解决方案

要实现最大努力通知,可以采用MQ的ack机制。

方案

图片

  • 1.发起方将通知发给MQ。

  • 2.接收通知方监听MQ消息。

  • 3.接收通知方收到消息后,处理完业务,回应ack。

  • 4.接收通知方若没有回应ack,则MQ会间隔1min、5min、10min等重复通知。

  • 5.接受通知方可用消息校对接口,保证消息的一致性。

转账业务实现流程图:

图片

交互流程如下:

  • 1、用户请求转账系统进行转账。

  • 2、转账系统完成转账,将转账结果发给MQ。

  • 3、企业网银系统监听MQ,接收转账结果通知,如果接收不到消息,MQ会重复发送通知。接收到转账结果,更新转账状态。

  • 4、企业网银系统也可以主动查询转账系统的转账结果查询接口,更新转账状态。

4.我在项目中用到的是自己写的tcc事务 模式和阿里的seta分布式事务

下面重点介绍一下自己扩展的TCC(Try-Confirm-Cancel)模型模式事务

1.代码实现

定义下面实现类 DynamicDataSourceTransactionManager ,DynamicDataSourceTransactionObject ,DataSourceTransactionObject

DynamicDataSourceTransactionObject(动态数据源事务管理器的事务对象)

/**
* 事务流水号,用于跟踪问题
*/
private String serialNumber;
/**
* 线程中的事务配置,就是txAdvice相关信息,如,read-only="true" isolation="READ_COMMITTED"等
*/
private TransactionDefinition definition;
/**
* Map<数据源别名,DataSourceTransactionObject>
*/
private Map<String, DataSourceTransactionObject> dsTxObjMap = new LinkedHashMap<>();

DataSourceTransactionObject(定义数据源事务管理器对象)


public class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

private boolean newConnectionHolder;
private boolean mustRestoreAutoCommit;

public void setConnectionHolder(ConnectionHolder connectionHolder, boolean newConnectionHolder) {
super.setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
}

public boolean isNewConnectionHolder() {
return this.newConnectionHolder;
}

public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
this.mustRestoreAutoCommit = mustRestoreAutoCommit;
}

public boolean isMustRestoreAutoCommit() {
return this.mustRestoreAutoCommit;
}

public void setRollbackOnly() {
getConnectionHolder().setRollbackOnly();
}

@Override
public boolean isRollbackOnly() {
return getConnectionHolder().isRollbackOnly();
}

@Override
public void flush() {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationUtils.triggerFlush();
}
}

 

定义好上述对象后就可以干正事了实现 DynamicDataSourceTransactionManager(多数据源事务管理器)

编程式事务过程,我们简化了一下,如下:

1、定义事务属性信息:TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
2、定义事务管理器:PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);
3、获取事务:TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
4、执行sql操作:比如上面通过JdbcTemplate的各种方法执行各种sql操作
5、提交事务(platformTransactionManager.commit)或者回滚事务(platformTransactionManager.rollback)

定义对象

/**
*数据源对象
*/
private static ThreadLocal<DynamicDataSourceTransactionObject> threadLocalTopDynamicTxObject = new TransmittableThreadLocal<>();
/**
* 线程中是否有活跃的事务(进行过dobegin的事务)
*/
private static ThreadLocal<Boolean> transactionActive = new TransmittableThreadLocal<>();

/**
* <pre>
* 事务是否支持数据库层级别的只读
* 则会改变Statement的行为
* </pre>
*/
private boolean enforceReadOnly = false;

private static DynamicDataSource dynamicDataSource;

/**
* 设置为嵌套事务
*/
private static DynamicDataSourceTransactionManager dynamicDataSourceTransactionManager;
public DynamicDataSourceTransactionManager() {
setNestedTransactionAllowed(true);
}

public DynamicDataSourceTransactionManager(DynamicDataSource dynamicDataSource) {
this();
setDynamicDataSource(dynamicDataSource);
}

public void setDynamicDataSource(DynamicDataSource dynamicDataSource) {
DynamicDataSourceTransactionManager.dynamicDataSource = dynamicDataSource;
DynamicDataSourceTransactionManager.dynamicDataSourceTransactionManager = this;
}

public void setEnforceReadOnly(boolean enforceReadOnly) {
this.enforceReadOnly = enforceReadOnly;
}

publi

DynamicDataSourceTransactionManager 类继承AbstractPlatformTransactionManager 类和 ResourceTransactionManager 接口

1.首先获取事务对象 重写 getResourceFactory返回数据源

    public Object getResourceFactory() {
return dynamicDataSource;
}

2.生成一个在整个事务处理都用到的资源,重写doGetTransaction

        DynamicDataSourceTransactionObject abTxObject = new DynamicDataSourceTransactionObject();
// 如果线程没有顶层,则说明这次是顶层事务
if (threadLocalTopDynamicTxObject.get() == null) {
threadLocalTopDynamicTxObject.set(abTxObject);
} else {
}
return abTxObject;

3.重写doBegin 开启事务

    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
DynamicDataSourceTransactionObject abTxObject = (DynamicDataSourceTransactionObject) transaction;
if (log.isDebugEnabled()) {
log.debug("多数据源事务编号[" + abTxObject.getSerialNumber() + "]开始");
}
abTxObject.setDefinition(definition);
// 先把本地数据源加入管理中
addGlobalDataSource(DataSourceUtil.GLOBAL_DATASOURCE, dynamicDataSource, abTxObject);
transactionActive.set(true);// 标记线程已开启了事务
}

先把本地数据源加入管理中

private static void addGlobalDataSource(String dsKey, DataSource dataSource, DynamicDataSourceTransactionObject abTxObject) {
try {
if (dataSource == null) {
return;
}

// 拿出dsKey的资源 txObject
DataSourceTransactionObject txObject = abTxObject.getDsTxObj(dsKey);
if (txObject == null) {
txObject = new DataSourceTransactionObject();
abTxObject.putDsTxObj(dsKey, txObject);
}

// 判断资源是否为空,或者资源是否在事务中要同步(则不允许其他事务重复使用)
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 尝试在资源中拿
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null) {
// 标记为不是这次管理这次事务中生成的资源,后面回收资源时不回收
txObject.setConnectionHolder(conHolder, false);
} else {
Connection newCon = dataSource.getConnection();
// 标记为这次事务中生成的资源,需要回收资源
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
Connection con = txObject.getConnectionHolder().getConnection();

TransactionDefinition definition = abTxObject.getDefinition();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);

// 设置这个链接必须要被恢复为自动提交,有些数据源的池是有这方面的需要的
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}

prepareTransactionalConnection(con, definition);

// 设置超时
int timeout = staticDetermineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// 判断是否本次事务生成的新资源,需要绑定到资源中
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(dataSource, txObject.getConnectionHolder());
}
} catch (Throwable ex) {
ex.printStackTrace();
DataSourceTransactionObject txObject = abTxObject.getDsTxObj(dsKey);
// 释放和关闭这次事务的相关资源
if (txObject != null && txObject.isNewConnectionHolder()) {
DataSource ds = DataSourceUtil.getDataSourceByAlias(dsKey);
Connection con = txObject.getConnectionHolder().getConnection();
DataSourceUtils.releaseConnection(con, ds);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("在多数据源事务编号[" + abTxObject.getSerialNumber() + "]中," + "数据源别名[" + dsKey + "]打开连接错误", ex);
}

}