观察者模式重启线程

发布时间 2023-04-11 10:24:27作者: 風栖祈鸢

观察者模式重启线程

看代码的过程中发现了观察者模式用于重启线程的实例,就顺便研究了一下。

观察者模式

先引用介绍一下观察者模式:

意图: 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。

主要解决: 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。

何时使用: 一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。

如何解决: 使用面向对象技术,可以将这种依赖关系弱化。

关键代码: 在抽象类里有一个 ArrayList 存放观察者们。

观察者模式算是一种比较简单的设计模式了,主要就是一个思路:当被观察者发生改变时,通知它所有的观察者,从而观察者们可以做出对应的操作。在 Java 中,已经有观察者模式的支持类了,接下来也是基于这个类实现的。

观察者线程

现在的场景是这样:监控系统将获取到的监控数据保存到队列中,系统中的一个线程从队列中获取并处理监控信息,它不能也不可以停下来(007),否则会积压大量数据。但没法保证这个线程运行一直是正常的,当线程运行中遇到异常,它可能会中断、停止或是产生一些奇奇怪怪的行为。而为了保证这个信息处理线程的不间断运行,我们可以采用观察者模式实现(监工模式)。

先用简单的例子实现一下观察者模式的用法。

首先创建一个线程,它的工作就是从0到10打印数字:

public class BeObservered implements Runnable {
  
    private int count = 0;

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(500);
                System.out.println("当前线程名:" + Thread.currentThread().getName());
                System.out.println("当前线程ID:" + Thread.currentThread().getId());
                System.out.println("当前线程优先级:" + Thread.currentThread().getPriority());
                System.out.println("当前线程Count:" + count.get());

		count++;
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

现在的要求是,当这个线程打印到10的时候,需要把它砍掉,再开一个新的线程来打印。这里就可以通过观察者模式来实现:这个打印数字的线程就是被观察者,再创建一个观察者类,当这个线程打印到10的时候,通知观察者,观察者将这个线程停掉并重开一个即可。

说起来简单,做起来也简单,先把这个线程改造一下,继承被观察者类:

public class BeObservered extends Observable implements Runnable {

    // 坏,多个线程共用一个count,疯狂增加
    //private int count = 0;

    // 好,定义count为线程局部变量
    private ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);

    private volatile ThreadLocal<Boolean> flag = ThreadLocal.withInitial(() -> true);

    @Override
    public void run() {
        try {
            while (flag.get()) {
                Thread.sleep(500);
                System.out.println("当前线程名:" + Thread.currentThread().getName());
                System.out.println("当前线程ID:" + Thread.currentThread().getId());
                System.out.println("当前线程优先级:" + Thread.currentThread().getPriority());
                System.out.println("当前线程Count:" + count.get());

                if (count.get() >= 10) {
                    doBusiness();
                }
                count.set(count.get() + 1);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void doBusiness() {
        // 通知观察者的两步操作 不可缺少
        super.setChanged();
        notifyObservers();
    }

    public void setFlag(boolean flag){
        this.flag.set(flag);
    }
}

其中有几个改动,说明一下:

  1. BeObservered 继承了 Observable 类,它就是一个被观察者了;
  2. count 的类型从 int 变成了 ThreadLocal <Integer>,做出这个变化时还没有加入观察者停止之前线程的功能,而是另开一个线程继续打印。在这个场景下,原本的 count 不是线程私有变量,而是类变量,会被多个线程共享。导致当第一个线程打印到10时,通知观察者启动第二个线程,而第二个线程的count也是10,又会通知观察者...如此往复,被观察者阵营指数扩大,马上就要起义了。因此需要将 count 设置为线程私有变量,即 ThreadLocal,每个线程的 count 都是独立的,就不会出现新生的线程的 count 就是10的情况了。
  3. 增加 doBusiness 方法,其中先是将自己设置为已改变,并通知所有观察者。这两步操作缺一不可,因为如果不设置自身为已改变,是不会对观察者发出通知的。
  4. 增加 flag 变量,线程通过检测 flag 变量来执行打印任务。这个变量的设置是为了观察者能通过改变 flag 而终止线程的打印。

接着是线程的观察者的实现,也非常简单:

public class MeObserver implements Observer {

    private BeObservered beObservered;

    @Override
    public void update(Observable o, Object arg) {
        System.out.println("观察者:" + Thread.currentThread().getName() + "通知,开始处理...");
        beObservered.setFlag(false);

        BeObservered run = new BeObservered();
        run.addObserver(this);
        new Thread(run).start();

        setBeObservered(run);
    }

    public void setBeObservered(BeObservered beObservered){
        this.beObservered = beObservered;
    }
}

也简单说一下其中的几个地方:

  1. 观察者需要实现观察接口 Observer,实现了这个接口,它就能观察别人了;
  2. 维护一个被观察者变量 beObservered,表示当前被监工的线程;
  3. 观察者需要重写 update 方法,当观察者收到被观察者发生改变的通知后,会执行 update 方法,在这里,我们将维护的线程,也就是执行到10的线程停掉,并创建一个新线程,将自己加入到新线程的观察者列表中,并维护这个新的被观察者。

如此一来,观察者就能一直监督一个线程从0工作到10,这个线程工作完成后再换下一个线程,卸磨杀驴,疯狂产出,无穷无尽...感觉这个例子有点奇怪的含义,当时写的时候没想到,现在越想越不对劲勒。

解决问题

回到监控系统中,同理,只需要为处理监控信息的线程创建一个观察者,当处理线程抛出异常时,通知观察者,观察者将之前的线程停掉,另开一个线程继续处理即可。这样就实现了线程的自动重启,保证了线程处理信息不中断。

参考代码,被观察者,信息处理线程:

public class AlarmRecordThread extends Observable implements  Runnable {

    protected IServicePreAlarm servicePreAlarm;
    private Logger log = LoggerFactory.getLogger(AlarmRecordThread.class);

    public AlarmRecordThread(IServicePreAlarm servicePreAlarm){
        this.servicePreAlarm = servicePreAlarm;
    }

    public void doBusiness(){
        super.setChanged();
        notifyObservers();
    }

    @Override
    public void run() {
        while(true){
            try{
                AlarmHandlerDto dto = AlarmDtoQueue.queue.poll();
                //log.info("queueSize:" + AlarmDtoQueue.queue.size());
                if(dto!=null){
                    servicePreAlarm.singleHandler(dto);
                }else{
                    Thread.sleep(500);//200
                }
            }catch (Exception e) {
                doBusiness();
                log.error("---------警报处理线程异常--------:",e);
                break;
            }

        }

    }

}

参考代码,观察者,负责重启信息处理线程:

@Component
public class ARThreadListener implements Observer {

    @Autowired
    IServicePreAlarm serviceMysql;
    private Logger log = LoggerFactory.getLogger(ARThreadListener.class);

    @Override
    public void update(Observable o, Object arg) {
        AlarmRecordThread run = new AlarmRecordThread(serviceMysql);
        run.addObserver(this);
        new Thread(run).start();
        log.info("--------警报处理线程重启成功--------");

    }
}

这里的代码没把之前的线程停掉,感觉会有问题。。。