Guava EventBus的具体使用以及源码解析

发布时间 2023-06-14 00:37:56作者: knqiufan

使用Guava EventBus对系统进行异步解耦改造

一、背景

最近在写的项目里,在使用定时器进行自动任务下派时,发现之前写的程序中将包括启动流程、地图更新、发送短信、效能计算等操作全部集成在同一个方法中同步执行,当任务量大时执行效率很低,且”一荣俱荣一损俱损“,即只要有一个操作执行失败,整条任务流程下派失败,可用性低。

为此使用GuavaEventBus组件对该模块代码进行重构,借此机会也对EventBus进行了学习,并进行一个分享。分为以下几个部分:

  • 简述:Guava EventBus的简单介绍
  • 项目重构过程:存在问题以及解决方案代码分享
  • 原理:Guava EventBus原理详解与源码解析
  • 问题解答
  • 总结

二、Guava EventBus简述

EventBusGuava中的一个处理组件间通信的事件总线,是观察者模式的一种实现。相比于MQ更加简洁,轻量,使事件生产者和事件消费者实现解耦分离。

1. 为什么使用事件总线?

当一个事件的发生(事件生产者),需要触发很多其他的事件(事件消费者)时,一般做法是在事件生产者中分别去调用其他事件,这样往往是很浪费资源的。事件生产者和事件消费者产生了极大的耦合。如果需要改动一个事件消费者,很可能还需要改动到事件生产者。

所以这个时候就可以用到事件总线了,利用EventBus对事件进行统一管理,并使用异步的方式来发送事件。

2. 使用场景

在分布式场景下,组件与组件之间实现通信,使用异步的方式来发送事件或触发另一个动作,经常用到的框架是MQ消息队列。如果是在同一个JVM中进行事件通知的话,就可以使用到EventBus,优点是简单、轻量、便捷。

3. 三个关键点

在这里放一张很经典的EventBus结构图:

可以看到有三个关键组成部分:

  • Publisher 发布者

    事件发布者,就是将事件发送(post)到EventBus事件总线的一方。

    事件发布者可以在程序的任何地方使用post方法将事件发送给EventBus事件总线,EventBus再将事件发送给订阅者们。

  • Event 事件

    EventEventBus通信的基本单位,一个Event可以是任何类型。简单来说,Event就是Object,可以将任意一个Bean作为事件。

  • Subscriber 订阅者

    事件订阅者,就是接受事件的一方,这些订阅者需要在自己的方法上添加 @Subscribe 注解来声明自己是一个事件订阅者,并将自己所在的类注册到EventBus中,让EventBus可以扫描到。

4. EventBus与AsyncEventBus

EventBus是同步事件总线:

  • 同步执行,事件发送方在发出事件之后,会等待所有的事件消费方执行完毕后,才会回来继续执行自己后面的代码。
  • 事件发送方和事件消费方会在同一个线程中执行,消费方的执行线程取决于发送方。
  • 同一个事件的多个订阅者,在接收到事件的顺序上面有不同。谁先注册到EventBus的,谁先执行,如果是在同一个类中的两个订阅者一起被注册到EventBus的情况,收到事件的顺序跟方法名有关。

AsyncEventBus是异步事件总线:

  • 异步执行,事件发送方异步发出事件,不会等待事件消费方是否收到,直接执行自己后面的代码。
  • 在定义AsyncEventBus时,构造函数中会传入一个线程池。事件消费方收到异步事件时,消费方会从线程池中获取一个新的线程来执行自己的任务。
  • 同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。

5. SubscriberExceptionHandler

如果在发生异常时该怎么处理?

无论在EventBus还是AsyncEventBus都可以传入自定义的SubscriberExceptionHandler。该handler会在出现异常时被调用,可以从Throwable参数中获取到异常信息,从SubscriberExceptionContext参数中获取到消息信息进行特定的处理。

SubscriberExceptionHandler接口声明为:

public interface SubscriberExceptionHandler {
    void handleException(Throwable var1, SubscriberExceptionContext var2);
}

我们可以实现这个接口实现自定义异常处理,如:

public class EventBusUtil implements SubscriberExceptionHandler {
    @Override
    public void handleException(Throwable throwable, SubscriberExceptionContext subscriberExceptionContext) {
        log.error("eventBus handler exception", throwable);
    }
}

三、项目消息通信重构过程

1. 存在问题

任务下派包含以下几个执行:

  • 启动流程
  • 位置落图
  • 启动效能
  • 发送短信
  • 记录日志
  1. 以上所有执行都在一个方法内同步进行,任务量大时执行效率低;
  2. 所有执行强依赖,耦合度过高,只要有一个执行失败,整个方法失败,任务下派失败,可用性低。

2. 解决方案

采用观察者模式,对每个事件进行异步解耦,流程启动成功后发布消息,其他操作订阅消息。提高运行性能和可用性。

3. 具体重构过程

封装GuavaEventBus发布订阅事件总线处理组件实现。

  • 在pom中引入guava包
<!-- Google Guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.0-jre</version>
</dependency>
  • 创建EventBus抽象类AbstractSpringEventBus,实现ApplicationContextAware接口,通过ApplicationContextAware这个上下文环境得到Spring容器的Bean。这里是为了获取实现IEventConsumer订阅接口的Bean
public abstract class AbstractSpringEventBus implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        scanConsumer(applicationContext);
    }

    /**
     * 事件发布
     *
     * @param event 事件
     */
    public abstract void post(Object event);

    /**
     * 注册
     *
     * @param event 事件
     */
    public abstract void registerConsumer(Object event);

    /**
     * 取消注册
     *
     * @param event 事件
     */
    public abstract void unregisterConsumer(Object event);

    /**
     * 扫描事件
     *
     */
    public void scanConsumer(ApplicationContext applicationContext) {
        applicationContext.getBeansOfType(IEventConsumer.class).forEach((k, v) -> this.registerConsumer(v));
    }
}
  • 创建订阅者接口。之后所有需要进行消息订阅的消费者都需要实现此接口。
public interface IEventConsumer<T> {

    /**
     * 消费订阅事件
     *
     * @param t 实体
     */
    void post(T t);
}
  • 创建EventBus执行工具类。采用AsyncEventBus调度器,指定线程池异步分发事件,同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。
@Slf4j
@Component
public class EventBusUtil extends AbstractSpringEventBus implements SubscriberExceptionHandler {

    private final EventBus eventBus;

    /**
     * 创建线程池
     */
    public EventBusUtil() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                20,
                3000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
        eventBus = new AsyncEventBus(executor, this);
    }

    @Override
    public void post(Object event) {
        eventBus.post(event);
    }

    @Override
    public void registerConsumer(Object event) {
        eventBus.register(event);
    }

    @Override
    public void unregisterConsumer(Object event) {
        eventBus.unregister(event);
    }

    @Override
    public void handleException(Throwable throwable, SubscriberExceptionContext subscriberExceptionContext) {
        log.error("eventBus handler exception", throwable);
    }
}
  • 创建订阅者,实现IEventConsumer接口。在方法上加上@Subscribe注解,进行事件监听。
/**
 * 订阅者 - 消息提醒操作
 *
 * @author huangyuqiu
 * @version 1.0.0
 * @date 2023/5/30 16:51
 */
@Slf4j
@Component
public class AppPushConsumer implements IEventConsumer<HcajEntity> {

    @Resource
    private HcajService hcajService;

    @Subscribe
    @Override
    public void post(HcajEntity hcajEntity) {
        hcajService.push(hcajEntity, BizTypeEnum.getByCode(hcajEntity.getBizType()));
    }
}

/**
 * 订阅者 - 地图更新操作
 *
 * @author huangyuqiu
 * @version 1.0.0
 * @date 2023/5/30 16:50
 */
@Slf4j
@Component
public class MapUpdateConsumer implements IEventConsumer<HcajEntity> {

    @Resource
    HcajMapUpdateLogService hcajMapUpdateLogService;

    @Subscribe
    @Override
    public void post(HcajEntity hcajEntity) {
        // 更新地图,并保存更新日志
        hcajMapUpdateLogService.addToMap(hcajEntity);
    }
}

/**
 * 订阅者 - 效能计算
 *
 * @author huangyuqiu
 * @version 1.0.0
 * @date 2023/5/30 16:51
 */
@Slf4j
@Component
public class EffConsumer implements IEventConsumer<HcajEntity> {

    @Resource
    private EffUtil effUtil;

    @Subscribe
    @Override
    public void post(HcajEntity hcajEntity) {
        // 保存效能数据
        effUtil.saveEff(hcajEntity.getActBusiness(), hcajEntity.getBizStartTime());
    }
}
  • 业务中执行操作,并进行消息发布
@Resource
private EventBusUtil eventBusUtil;

/**
 * 业务执行操作
 */
protected void apply() {
    // ... 省略部分代码 ...
    
    // 自动发起流程
    processService.apply();
    // 异步消息发布:更新地图日志服务 / 保存效能数据服务 / 消息发送服务
    eventBusPost(hcajEntity);
    
    // ... 省略部分代码 ...
}

/**
 * 异步消息发布:更新地图日志服务 / 保存效能数据服务 / 消息发送服务
 */
private void eventBusPost(HcajEntity hcajEntity) {
    // 发布消息:更新地图日志服务 / 保存效能数据服务 / 消息发送服务
    eventBusUtil.post(hcajEntity);
}

运行程序,当流程发起成功之后,可以看到更新地图日志服务、保存效能数据服务、消息发送服务成功订阅并消费了消息,异步解耦改造成功。

四、Guava EventBus原理详解与源码解析

EventBus中最重要的就是这么几个步骤:初始化、注册、发布、注销。接下来逐一解析EventBus的源码。

先放一张UML图。

可以看出其实关联挺少的。

1. 初始化

1.1. EventBus

EventBus最主要最核心的初始化方法如下:

// EventBus的唯一标识,默认为default
private final String identifier;
// 线程处理器 默认是 MoreExecutors.directExecutor()
private final Executor executor;
// 异常处理机制
private final SubscriberExceptionHandler exceptionHandler;
// 消息分发队列 默认是 Dispatcher.perThreadDispatchQueue()
private final Dispatcher dispatcher;

EventBus(
  String identifier,
  Executor executor,
  Dispatcher dispatcher,
  SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}

1.2. AsyncEventBus

查看AsyncEventBus的源码可知,AsyncEventBus继承自EventBus,其构造函数最终还是调用的EventBus的构造函数。如下:

@Beta
public class AsyncEventBus extends EventBus {

  /**
   * 创建一个新的AsyncEventBus,用于分配事件。identifier用于日志记录的总线名称
   *
   * @param identifier 总线的名称,用于日志记录。
   * @param executor 用于分配事件的执行线程。在最后一个事件被发布到此事件总线之后,调用方有责任关闭执行器。
   */
  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }

  /**
   * 创建一个新的AsyncEventBus,用于分配事件
   *
   * @param executor 用于分配事件的执行线程。在最后一个事件被发布到此事件总线之后,调用方有责任关闭执行器。
   * @param subscriberExceptionHandler 用于处理从订阅者那边抛出的异常处理
   * @since 16.0
   */
  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
    super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
  }

  /**
   * 创建一个新的AsyncEventBus,用于分配事件
   *
   * @param executor 用于分配事件的执行线程。在最后一个事件被发布到此事件总线之后,调用方有责任关闭执行器。
   */
  public AsyncEventBus(Executor executor) {
    super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
}

2. 注册(register)

  1. EventBus中使用register方法对订阅者进行注册。源码如下:
/**
* 注册对象上的所有订阅者方法,以便接收事件
* 
* @param object 该被注册的订阅方法所在对象
*/
public void register(Object object) {
	subscribers.register(object);
}

可以看到是调用了 SubscriberRegistry 注册器的register方法。

  1. 查看 register 方法:
void register(Object listener) {
    // 按照所订阅的事件类型分组返回所有订阅者的订阅方法
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      // 事件类型
      Class<?> eventType = entry.getKey();
      // 订阅者中的事件方法 
      Collection<Subscriber> eventMethodsInListener = entry.getValue();
      
      // 获取当前事件类型的所有订阅者
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
}

可以看到在这个方法中,首先会去获取当前订阅者里的所有订阅方法,并根据事件类型(eventType)分组后返回。之后遍历分组,根据事件类型将订阅的方法逐一加入 subscribers 注册器。

通过代码上下文可知,subscribers 注册器的本质其实就是一个 ConcurrentMap

/**
* 所有注册的订阅者,索引是事件类型
*
* <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to an event without any locking.
*/
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();

注册本质上就是将订阅的方法加入到ConcurrentMap 中去。

另外,还可以看到 findAllSubscribers 方法,是将事件按照类型返回。方法源码如下:

  /**
   * 按照所订阅的事件类型分组返回所有订阅者的订阅方法
   */
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

代码很好看懂,获取需要注册的类的带着订阅注解的方法,之后遍历分组。主要是 getAnnotatedMethods 方法。源码如下:

  private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    try {
      return subscriberMethodsCache.getUnchecked(clazz);
    } catch (UncheckedExecutionException e) {
      throwIfUnchecked(e.getCause());
      throw e;
    }
  }

可以看到其实是通过subscriberMethodsCache这个缓存对象实例来获取。继续通过读上下文的代码,查看subscriberMethodsCache的代码:


  private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
      CacheBuilder.newBuilder()
          .weakKeys()
          .build(
              new CacheLoader<Class<?>, ImmutableList<Method>>() {
                @Override
                public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                  return getAnnotatedMethodsNotCached(concreteClass);
                }
              });

重点关注getAnnotatedMethodsNotCached方法。继续阅读getAnnotatedMethodsNotCached的源码,如下:

  private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
      for (Method method : supertype.getDeclaredMethods()) {
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          // TODO(cgdecker): Should check for a generic parameter type and error out
          Class<?>[] parameterTypes = method.getParameterTypes();
          checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters. "
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);

          checkArgument(
              !parameterTypes[0].isPrimitive(),
              "@Subscribe method %s's parameter is %s. "
                  + "Subscriber methods cannot accept primitives. "
                  + "Consider changing the parameter to %s.",
              method,
              parameterTypes[0].getName(),
              Primitives.wrap(parameterTypes[0]).getSimpleName());

          MethodIdentifier ident = new MethodIdentifier(method);
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }

到这里就可以很清楚的看到,这个方法中获取了带Subscribe注解的方法,并将它封装进ImmutableList这个集合中并返回,加入到缓存中。之后在方法中获取缓存里已经存在的订阅方法,注册进ConcurrentMap

至此,注册方法的源码解析完毕。

3. 注销(unregister)

解析完注册的源码,那么注销的源码就很简单了,就是将订阅方法从ConcurrentMap中删除。源码如下:

  void unregister(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> listenerMethodsForType = entry.getValue();

      CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
      if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
        // if removeAll returns true, all we really know is that at least one subscriber was
        // removed... however, barring something very strange we can assume that if at least one
        // subscriber was removed, all subscribers on listener for that event type were... after
        // all, the definition of subscribers on a particular class is totally static
        throw new IllegalArgumentException(
            "missing event subscriber for an annotated method. Is " + listener + " registered?");
      }

      // don't try to remove the set if it's empty; that can't be done safely without a lock
      // anyway, if the set is empty it'll just be wrapping an array of length 0
    }
  }

如果删除之后返回true,表示至少有一个订阅者被注销了。

注销的代码很简单,就不多做探究。

4. 发布(post)

注册完订阅者之后,就可以在想要发布的地方使用post来发布消息了。post的源码如下:

  /**
   * 给所有注册过的订阅者发布事件。这个方法会在事件发布给所有订阅者之后成功返回,
   * 不管订阅者们抛出了什么异常。
   *
   * 如果没有订阅者订阅当前event事件,且该event事件不是DeadEvent,它将会被封装为DeadEvent之后重新发布。
   *
   * @param event 需要发布的事件
   */
  public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

从上述代码中可以看到,先使用SubscriberRegistry的实例对象获取当前事件类型的所有订阅者,并返回一个迭代器。如果存在订阅者则调用dispatcher.dispatch方法,否则作为DeadEvent类型重新进行发布。

DeadEvent是比较特殊的事件类型,注释源码是这么定义的:

Wraps an event that was posted, but which had no subscribers and thus could not be delivered.
Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system's event distribution.

解释一下说就是,DeadEvent用于封装因为没有订阅者而无法送达的事件。注册DeadEvent时间对于调试和日志记录是很有用的,因为它可以检测一些系统事件分派的错误配置问题。

Dispatcher是将事件分派给订阅者的处理程序,是一个抽象类,不同的EventBus会调用不同类的dispatch实现。

EventBus使用的是PerThreadQueuedDispatcher类的实现,AsyncEventBus使用的是LegacyAsyncDispatcher类的实现。我们一一来看。

4.1.1. PerThreadQueuedDispatcher

EventBus默认情况下Dispatcher实现的都是PerThreadQueuedDispatcher


  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    // 这是 EventBus 默认的转发器,可以翻译作:"每个线程单独设置一个队列"转发器。

    /** 事件调度的线程队列。定义一个 ThreadLocal 线程私有对象,每次获取的时候都能够获得一个队列 */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    /**  线程私有对象,用于保存每个线程的转发状态,防止事件被重复转发 */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      // 获取线程私有的队列
      Queue<Event> queueForThread = queue.get();
      // 往队列写入需要被转发的 Event(事件本身+监听者们)
      queueForThread.offer(new Event(event, subscribers));

      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          
          while ((nextEvent = queueForThread.poll()) != null) {
            // 使用迭代器 遍历执行队列中的事件
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }

上面的代码很好理解,先检查事件和订阅者迭代器是否为空,之后将事件打包放入队列中,判断当前线程是否被调度如,如果没有,则从队列中获取事件,执行订阅者类subscribersdispatchEvent方法。

dispatchEvent就是真正执行事件的方法,源码如下:

  /** Dispatches {@code event} to this subscriber using the proper executor. */
  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

  /**
   * Invokes the subscriber method. This method can be overridden to make the invocation
   * synchronized.
   */
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

可以看到,dispatchEvent方法其实就是通过反射来执行了订阅方法。

4.1.2. LegacyAsyncDispatcher

如果使用的是AsyncEventBus,那么就会走LegacyAsyncDispatcher类的方法实现。

  /** Implementation of a {@link #legacyAsync()} dispatcher. */
  private static final class LegacyAsyncDispatcher extends Dispatcher {

    // 在多线程环境下无法保证事件执行的顺序性。

    /** 全局事件队列 */
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }

LegacyAsyncDispatcher的代码与PerThreadQueuedDispatcher的区别不是很大,最终也是调用的dispatchEvent方法反射执行订阅方法,区别在于LegacyAsyncDispatcher使用了线程安全队列,以及可以自定义多线程环境。

五、总结

在项目中存在一些需要进行异步解耦处理事件的场景时,GuavaEventBus就能很好的派上用场,且其源码十分优雅漂亮,用到了很多设计模式,API简单,使用也很简单,是一个很好的可供学习的工具。