多线程事务的提交解决办法

发布时间 2023-04-11 09:59:20作者: 夏之
多线程处理的时候,如果发生了错误,不会因为加了@Transcational注解而生效,这里需要额外使用
SqlSessionTemplate

{
//插入主表
electronicTaxBillMapper.insertBatch(masterList);
//更新出库单状态
outOrderDetailMapper.updateByOrderCodeList(orderCodeList);
  //切割模板 可以直接使用list.subList();直接使用这个方法,也可以统一包装一个方法出来使用。
List<List<ElectronicTaxBillDetailDO>> insertSplit = CommonUtils.split(detailList,
5000);
// 插入任务列表
List<CompletableFuture<Boolean>> tasks = new ArrayList<>();
// 根据sqlSessionTemplate获取SqlSession工厂
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
  //获取会话
SqlSession sqlSession = sqlSessionFactory.openSession();
// 获取Connection来手动控制事务
Connection connection = sqlSession.getConnection();

try {
//关闭事务自动提交
connection.setAutoCommit(false);
//获取mapper对象 定义一个映射关系
ElectronicTaxBillDetailMapper sqlSessioneleTaxBillDetailMapper = sqlSession
.getMapper(ElectronicTaxBillDetailMapper.class);
    //循环去获取数据
for (int i = 0; i < insertSplit.size(); i++) {
    List<ElectronicTaxBillDetailDO> electronicTaxBillDetailDOS = insertSplit.get(i);
    //使用的supplyAsync()方法会有返回值,如果使用runAsync()方法调用的是Runnale接口没有返回值。
CompletableFuture<Boolean> task = CompletableFuture.supplyAsync(() -> {
try {
//一个线程发生异常之后,直接抛出异常,后续不再执行插入操作
sqlSessioneleTaxBillDetailMapper.insertBatch(electronicTaxBillDetailDOS);
} catch (Exception e) {
return Boolean.FALSE;
}
return Boolean.TRUE;
}, THREAD_POOL_EXECUTOR);
tasks.add(task);
}

// 等待所有任务完成
     // join()和get()方法都是用来获取CompletableFuture异步之后的返回值 join()用来接收一些未检查的异常,get()用来接收一些检查的异常,需要抛错,或者try catch
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
     //上面的线程处理的时候,如果插入数据库有问题,就会返回一个false,所以下面校验的时候如果有false,则回滚
// 如果有任意一个任务抛出异常,则回滚所有数据
for (CompletableFuture<Boolean> task : tasks) {
if (Boolean.FALSE.equals(task.get())) {
log.error("插入异常,事务回滚");
connection.rollback();
return CommonResult.error(INSERT_FAILED);
}
}
connection.commit();
long endTime = System.currentTimeMillis();
log.info("电子税单导入数据成功,主数据数量{},明细数据数量{}", masterList.size(),
detailList.size());
log.info("运输单导入耗时:{}毫秒" + (endTime - startTime));
} catch (Exception e) {
connection.rollback();
e.printStackTrace();
}
}