guava eventbus代码分析(二)

时间:2022-02-25 20:46:48

---恢复内容开始---

我们分析下EventBus的核心方法 post方法,直接贴代码

1 public void post(Object event) {
2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
3 if (eventSubscribers.hasNext()) {
4 dispatcher.dispatch(event, eventSubscribers);
5 } else if (!(event instanceof DeadEvent)) {
6 // the event had no subscribers and was not itself a DeadEvent
7 post(new DeadEvent(this, event));
8 }
9 }

第2行,从subscribers中得到一个事件的所有监听者subscriber列表,第4行,调用dispatcher的dispatch方法通知所有的监听者,

第7行,如果当前事件没有任何监听者,则发送一个DeadEvent用于记录日志。

我们跟进第4行的dispatch方法:

guava eventbus代码分析(二)

dispatcher方法有3个实现类,主要用到的是后面两个,LegacyAsyncDispatcher,这是一个多线程分发器,是配合 AsyncEventBus 使用的;PerThreadQueuedDispather,是一个单线程分发器。

其中,这个单线程分发器实际上和当前eventBus 的post方法用的是同一个线程(并没有启用新的线程),而且单线程分发器能保证事件的执行顺序和post的顺序完全一致(即广度优先遍历)

那么按照顺序来看一下这两个分发器的dispatch 方法,

LegacyAsyncDispatcher的dispatch方法:

 1 @Override
2 void dispatch(Object event, Iterator<Subscriber> subscribers) {
3 checkNotNull(event);
4 while (subscribers.hasNext()) {
5 queue.add(new EventWithSubscriber(event, subscribers.next()));
6 }
7
8 EventWithSubscriber e;
9 while ((e = queue.poll()) != null) {
10 e.subscriber.dispatchEvent(e.event);
11 }
12 }

第3行,checkNotNull方法 import static 方式引入的,这种校验参数的写法被很多开发者使用

接下来是两个串行的while循环

第4-6行,一个ConcurrentLinkedQueue线程安全队列,把 EventWithSubscriber 往这个队列里面扔

第9-11行,从这个队列里面取数据,然后调用subscriber的dispatchEvent方法

这个地方为什么搞1个队列,而不是直接dispatchEvent呢?

 我看了一下这里,这个队列的作用能够使不同线程 “比较均衡的 从队列里面取数据进行分发”,我举个例子,有一个线程A post一个时间对象,

这个对象只有两个监听者 subcriber,另一线程B post另外一个对象,有100个subscriber, 这两个线程如果并发进行,总计往队列里面放了102个 EventWithSubscriber

虽然A线程里面的subcriber少,但是由于俩线程同时从队列里面取数据,着俩线程会在相近的时刻执行完整个dispatch方法,所以施一种均衡策略,

可能有深层次的原因用这个队列,我暂时看不出来。

然后我们跟进第10行, 看下subscriber的 dispatchEvent方法:

 1 /**
2 * Dispatches {@code event} to this subscriber using the proper executor.
3 */
4 final void dispatchEvent(final Object event) {
5 executor.execute(new Runnable() {
6 @Override
7 public void run() {
8 try {
9 invokeSubscriberMethod(event);
10 } catch (InvocationTargetException e) {
11 bus.handleSubscriberException(e.getCause(), context(event));
12 }
13 }
14 });
15 }

就是使用线程池多线程来执行订阅者的方法,AsyncEventBus 使用的线程池是用户自定义的线程池,一般使用 jdk Executors工具类里面创建的ExecutorService作为线程池

再看一下上面代码第9行,执行订阅者的方法,一直跟进到 Subscriber里面的代码:

 1 @VisibleForTesting
2 void invokeSubscriberMethod(Object event) throws InvocationTargetException {
3 try {
4 method.invoke(target, checkNotNull(event));
5 } catch (IllegalArgumentException e) {
6 throw new Error("Method rejected target/argument: " + event, e);
7 } catch (IllegalAccessException e) {
8 throw new Error("Method became inaccessible: " + event, e);
9 } catch (InvocationTargetException e) {
10 if (e.getCause() instanceof Error) {
11 throw (Error) e.getCause();
12 }
13 throw e;
14 }
15 }

第4行,就是反射调用,method 和 target都是 在执行EventBus的post的方法的时候,new Subscriber的时候作为构造参数传入的

结束,下一篇我们再来看单线程的PerThreadQueuedDispather