一、问题复现
批量插入时,使用多线程对插入数据实现分批插入,在service层使用@Transactional注解,对应方法中线程池中开辟的子线程抛出异常时,没有回滚事务。
二、原因分析
-
事务管理范围不正确:
@Transactional
注解仅对当前方法有效,如果在方法内创建新的线程或使用线程池等异步操作,该方法之外的代码将无法受到事务的管理。因此,在使用多线程进行批量操作时,需要确保整个批量操作处于同一事务管理范围内。 -
Spring事务和Java线程池机制的互动问题:在使用
ThreadPoolExecutor
进行批量操作时,线程池中的线程和Spring管理的事务并不是同一个线程,这可能会导致事务管理器感知不到线程中的异常,从而导致事务未能回滚。
三、解决办法
弃用注解样事务,改为手动管理事务。
1 SqlSession sqlSession = SpringContextUtils.getBean(SqlSessionTemplate.class).getSqlSessionFactory() 2 .openSession(); 3 Connection connection = sqlSession.getConnection(); 4 OfflineExpressRecordExtMapper extMapper = sqlSession.getMapper(OfflineExpressRecordExtMapper.class); 5 6 // 批量插入 7 8 int taskCount = (int) Math.ceil((double) beanList.size() / THREAD_HANDLE); 9 ThreadPoolExecutor executor = SpringContextUtils 10 .getBean("offlineExpressRecordThreadPoolExecutor", ThreadPoolExecutor.class); 11 try { 12 connection.setAutoCommit(false); 13 ArrayList<Future<?>> futures = new ArrayList<>(); 14 for (int i = 0; i < taskCount; i++) { 15 int start = i * THREAD_HANDLE; 16 int end = (i + 1) * THREAD_HANDLE > beanList.size() ? beanList.size() : (i + 1) * THREAD_HANDLE; 17 List<OfflineExpressRecord> threadHandleList = beanList.subList(start, end); 18 Future<?> task = executor.submit(() -> extMapper.saveBatch(threadHandleList)); 19 futures.add(task); 20 } 21 // 等待插入完成,检验异常 22 for (Future<?> future : futures) { 23 future.get(); 24 } 25 connection.commit(); 26 } catch (Exception e) { 27 log.error("批量导入存储数据过程中出现异常", e); 28 connection.rollback(); 29 throw e; 30 } finally { 31 connection.close(); 32 }