ConcurrentModificationException日志关键字报警引发的思考

发布时间 2023-12-11 08:10:58作者: James_Shangguan

本文将记录和分析日志中的ConcurrentModificationException关键字报警,还有一些我的思考,希望对大家有帮助。

一、背景

近期,在日常的日志关键字报警分析时,发现我负责的一个电商核心系统在某时段存在较多ConcurrentModificationException异常日志,遂进行分析和改进,下面是我的一些思考。

1.1 系统架构

一直以来,无状态的服务都被当作分布式服务设计的最佳实践。因为无状态的服务对于扩展性和运维方面有着得天独厚的优势,可以随意地增加和减少节点。本系统的整体架构可以认为是由一个MQ应用、一个RPC应用底层存储组成。

RPC应用是无状态服务,对外提供常用的查询和操作接口;采用双机房部署,每个机房10*8C16G;

MQ应用是无状态服务,负责消费MQ消息,在消费过程中会调用该RPC应用提供方法;采用双机房部署,每个机房5*8C16G;

底层存储用的是数据库集群和缓存集群,大概如图所示:

1.2 关键代码

MyRpcService 对外提供RPC服务,getList 方法可以根据入参中的状态进行查询,由于业务需要,需要对入参的状态进行排序,实现部分关键代码如下:

public class MyRpcServiceImpl implements MyRpcService{

    @Override
    public BaseResult getList(ListParam listParam) {

        BaseResult baseResult = new BaseResult();

        List<Integer> states = listParam.getStateList();

        // 省略大段代码
        KeyUtil.getKeyString(states);
        // 省略大段代码

        baseResult.setSuccess(true);

        return baseResult;
    }

}

KeyUtil 是一个工具类,getKeyString 方法对入参的itemList进行排序使用的是Java集合框架内置的sort 方法,代码如下:

public class KeyUtil {

    public static String getKeyString(List<Integer> itemList) {
        String result = "";
        //省略代码
        Collections.sort(itemList);
        //省略代码
        return result;
    }

}

MyMqConsumer是MQ消费者,负责监听消息进行消费。在消费逻辑中,会调用MyRpcServicegetList() 方法进行状态查询,因为查询的状态是固定的,所以在Consumer类中定义了static final 类型的stateList ,关键代码如下:

public class MyMqConsumer implements MessageListener{

    public static final List<Integer> stateList = Stream.of(1).collect(Collectors.toList());

    @Resource
    private MyRpcService myRpcService;

    @Override
    public void onMessage(List<Message> messageList) {

        // 省略代码

        for (Message message : messageList) {

            // 省略其他代码
            ListParam listParam = new ListParam();
            listParam.setStateList(stateList);
            BaseResult result = myRpcService.getList(listParam);
            // 省略其他代码

        }

    }

}

二、  原因分析

看了上面的系统架构和关键代码,不知道你有没有发现问题?可以先抛开设计和代码实现方面的问题不谈,只看这样的代码能不能正常执行,得到正确的业务结果。

既然这么问了,当然会有问题:在高并发环境下,MQ应用在消费消息时,调用RPC服务查询时可能会抛出异常,从而触发MQ异常重试,至于对业务有没有影响,得具体问题具体分析了。

ERROR 执行流程时出错
java.util.ConcurrentModificationException:null
at java.util.ArrayList.forEach(ArrayList.java:1260)~[:?1.8.0_192]
at com.shangguan.test.util.KeyUtil.getKeyString(KeyUtil.java:10)
...

2.1 分析1-ArrayList源码

从日志中可以看到,ConcurrentModificationExceptionjava.util.ArrayList类里面的forEach方法抛出来的,源码如下:

    @Override
    public void forEach(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        final int expectedModCount = modCount;
        @SuppressWarnings("unchecked")
        final E[] elementData = (E[]) this.elementData;
        final int size = this.size;
        for (int i=0; modCount == expectedModCount && i < size; i++) {
            action.accept(elementData[i]);
        }
        if (modCount != expectedModCount) {
            throw new ConcurrentModificationException();
        }
    }

在该方法中,内部会维护一个expectedModCount变量,赋值为modCount,在每次迭代过程中,迭代器会检查expectedModCount是否等于当前的modCount。如果不等,说明在迭代过程中ArrayList的结构发生了修改,迭代器会抛出ConcurrentModificationException异常。这种设计可以确保在多线程环境下,当一个线程修改ArrayList时,其他线程在迭代过程中可以立即发现这种修改,从而避免潜在的数据不一致问题。

再可以看下源码中modCount的注释,大意是:

modCount表示ArrayList自从创建以来结构上发生的修改次数。结构修改是指改变列表大小的修改,或者以其他方式扰乱列表,使正在进行的迭代可能产生不正确的结果。

modCount字段用于iteratorlistIterator方法返回的迭代器(或列表迭代器)。如果这个字段的值在迭代过程中发生意外的变化,迭代器(或列表迭代器)将在next、remove、previous、set或add操作时抛出ConcurrentModificationException异常。这提供了fail-fast(快速失败)行为,而不是在迭代过程中遇到并发修改时具有不确定性。

子类可以选择使用这个字段。如果子类希望提供fail-fast迭代器(和列表迭代器),那么它只需在其add(int, E)remove(int)方法(以及覆盖的任何其他导致列表结构修改的方法)中递增此字段。单次调用add(int, E)remove(int)应该在此字段上增加不超过1次,否则迭代器(和列表迭代器)将抛出虚假的ConcurrentModificationException。如果实现不希望提供fail-fast迭代器,可以忽略此字段。

2.2 分析2-线程安全问题

有个有趣的现象是,这个异常日志仅存在MQ应用中,这是为什么呢?

这其实是一个多线程问题。我们知道,static对象是在类加载时创建的全局对象,它们的生命周期与类的生命周期相同。static对象在程序启动时创建,在程序结束时销毁。这意味着static对象在多个线程之间共享的,可能存在线程安全问题。

翻回去仔细看下代码,可以看到MyMqConsumer定义的stateList是static类型的,是否是否存在线程安全问题呢?

在流量较低的情况下,多个消息不在同一时刻到达,每个线程处理消息将不会争夺static对象,所以不会有问题;

当流量较大情况下,有多个消息可能在同一时刻到达,每个线程处理过程中都会对stateList进行赋值,调用远程RPC接口,它们之间将会争夺static对象,可能存在问题。例如上图中右半部分,线程1还没有处理完消息1时,线程2就开始争抢,那么就可能使ArrayList中modCount != expectedModCount条件满足,从而抛出异常。

三、改进思考

3.1 本问题的优化

经过上述分析,已经清楚问题的产生原因了。对于本问题的优化,其实也比较简单。有如下两种方式可供选择:

1.  在MyMqConsumer调用RPC查询的入参,使用new List来替代原来的类中定义好的static对象;

2.  修改KeyUtil代码,浅拷贝传入的itemList,再进行排序

3.2 类似问题的发现和改进

本问题已经修复,那类似的问题是否可以避免或者减少,将是接下来值得思考的一个问题。为了减少这类问题发生,我结合平时工作过程中的几个阶段,认为可以从以下几个方面进行改进:

  • 开发

开发过程中,开发人员需要提升认知和水平,注意代码中可能存在的线程问题;注意编写单元测试,可以通过模拟多线程环境来检测潜在的问题。

  • 代码评审

开发完成的代码一定需要进行代码评审,评审过程中架构师需要发挥自己丰富的开发经验和较强的代码直觉,“火眼金睛”,发现代码中的漏洞;当然这对评审人员的要求很高,因为仅通过改动的几行代码发现问题确实是一件很有挑战的事情。如果要有一些自动化工具或者插件,则可以起到事半功倍的效果。这里其实我还没有调研相关的工具,如果有大佬有相关经验欢迎评论交流。

  • 测试

测试阶段除了验证正常的业务功能,还需要进行集成测试和性能测试。在集成测试中,将多个模块组合在一起,测试整个系统在多线程环境中的行为,有助于发现模块之间的交互问题。除了继承测试,有时还需要性能测试,性能测试可以发现潜在的竞争条件、死锁、资源争用等多线程问题。

四、小结

最后,我简单总结一下本文内容。本文主要记录和分析日志中的ConcurrentModificationException关键字报警,首先介绍了系统整体架构和关键代码;然后从ArrayList源码和线程安全两个方面分析问题产生原因,最后我提出了修复该问题的方案和类似问题的思考,希望对大家有帮助。