作者:京东科技 刘子洋
背景
最近项目出现同一消息发送多次的现象,对下游业务方造成困扰,经过排查发现使用EventBus方式不正确。也借此机会学习了下EventBus并进行分享。以下为分享内容,本文主要分为五个部分,篇幅较长,望大家耐心阅读。
- 1、简述:简单介绍EventBus及其组成部分。
- 2、原理解析:主要对listener注册流程及Event发布流程进行解析。
- 3、使用指导:EventBus简单的使用指导。
- 4、注意事项:在使用EventBus中需要注意的一些隐藏逻辑。
- 5、分享时提问的问题
- 6、项目中遇到的问题:上述问题进行详细描述并复现场景。
1、简述
1.1、概念
下文摘自EventBus源码注释,从注释中可以直观了解到他的功能、特性、注意事项。
【源码注释】
Dispatches events to listeners, and provides ways for listeners to register themselves.
The EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.
Receiving Events
To receive events, an object should:
- Expose a public method, known as the event subscriber, which accepts a single argument of the type of event desired;
- Mark it with a Subscribe annotation;
- Pass itself to an EventBus instance's register(Object) method.
Posting Events
To post an event, simply provide the event object to the post(Object) method. The EventBus instance will determine the type of event and route it to all registered listeners.
Events are routed based on their type — an event will be delivered to any subscriber for any type to which the event is assignable. This includes implemented interfaces, all superclasses, and all interfaces implemented by superclasses.
When post is called, all registered subscribers for an event are run in sequence, so subscribers should be reasonably quick. If an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (For a convenient way to do this, use an AsyncEventBus.)
Subscriber Methods
Event subscriber methods must accept only one argument: the event.
Subscribers should not, in general, throw. If they do, the EventBus will catch and log the exception. This is rarely the right solution for error handling and should not be relied upon; it is intended solely to help find problems during development.
The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the AllowConcurrentEvents annotation. If this annotation is not present, subscriber methods need not worry about being reentrant, unless also called from outside the EventBus.
Dead Events
If an event is posted, but no registered subscribers can accept it, it is considered "dead." To give the system a second chance to handle dead events, they are wrapped in an instance of DeadEvent and reposted.
If a subscriber for a supertype of all events (such as Object) is registered, no event will ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent extends Object, a subscriber registered to receive any Object will never receive a DeadEvent.
This class is safe for concurrent use.
See the Guava User Guide article on EventBus.
Since:
10.0
Author:
Cliff Biffle
1.2、系统流程
1.3、组成部分
1.3.1、调度器
EventBus、AsyncEventBus都是一个调度的角色,区别是一个同步一个异步。
- EventBus
- AsyncEventBus
1.3.2、事件承载器
- Event
- DeadEvent
1.3.3、事件注册中心
SubscriberRegistry
1.3.4、事件分发器
Dispatcher
Dispatcher有三个子类,用以满足不同的分发情况
1.PerThreadQueuedDispatcher
2.LegacyAsyncDispatcher
3.ImmediateDispatcher
1.3.4、订阅者
- Subscriber
- SynchronizedSubscriber
2、原理解析
2.1、主体流程
- listener 通过EventBus进行注册。
- SubscriberRegister 会根据listener、listener中含有【@Subscribe】注解的方法及各方法参数创建Subscriber 对象,并将其维护在Subscribers(ConcurrentMap类型,key为event类对象,value为subscriber集合)中。
- publisher发布事件Event。
- 发布Event后,EventBus会从SubscriberRegister中查找出所有订阅此事件的Subscriber,然后让Dispatcher分发Event到每一个Subscriber。
流程如下:
2.2、listener注册原理
2.2.1、listener注册流程
- 缓存所有含有@Subscribe注解方法到subscriberMethodsCache(LoadingCache<Class<?>, ImmutableList>, key为listener,value为method集合)。
- listener注册。
2.2.2、原理分析
- 获取含有@Subscribe注释的方法进行缓存
找到所有被【@Subscribe】修饰的方法,并进行缓存
注意!!!这两个方法被static修饰,类加载的时候就进行寻找
订阅者唯一标识是【方法名+入参】
- 注册订阅者
1.注册方法
创建Subscriber时,如果method含有【@AllowConcurrentEvents】注释,则创建SynchronizedSubscriber,否则创建Subscriber
2、获取所有订阅者
3、从缓存中获取所有订阅方法
2.3、Event发布原理
2.3.1、发布主体流程
- publisher 发布事件Event。
- EventBus 根据Event 类对象从SubscriberRegistry中获取所有订阅者。
- 将Event 和 eventSubscribers 交由Dispatcher去分发。
- Dispatcher 将Event 分发给每个Subscribers。
- Subscriber 利用反射执行订阅者方法。
图中画出了三个Dispatcher的分发原理。
2.3.2、原理分析
- 创建缓存
- 缓存EventMsg所有超类
注意!!!此处是静态方法,因此在代码加载的时候就会缓存Event所有超类。 - 发布Event事件
- 此方法是发布事件时调用的方法。
- 获取所有订阅者
- 1、从缓存中获取所有订阅者
2、获取Event超类
- 事件分发
- 1、分发入口
- 2、分发器分发
- 2.1、ImmediateDispatcher
- 来了一个事件则通知对这个事件感兴趣的订阅者。
- 2.2、PerThreadQueuedDispatcher(EventBus默认选项)
- 在同一个线程post的Event执行顺序是有序的。用ThreadLocal queue来实现每个线程的Event有序性,在把事件添加到queue后会有一个ThreadLocal dispatching来判断当前线程是否正在分发,如果正在分发,则这次添加的event不会马上进行分发而是等到dispatching的值为false(分发完成)才进行。
- 源码如下:
- 2.3、LegacyAsyncDispatcher(AsyncEventBus默认选项)
- 会有一个全局的队列ConcurrentLinkedQueue queue保存EventWithSubscriber(事件和subscriber),如果被不同的线程poll,不能保证在queue队列中的event是有序发布的。源码如下:
- 执行订阅者方法
- 方法入口是dispatchEvent,源码如下:
- 由于Subscriber有两种,因此执行方法也有两种:
- 1.Subscriber(非线程安全)
- 2.SynchronizedSubscriber(线程安全)
- 注意!!!执行方法会加同步锁
3、使用指导
3.1、主要流程
3.2、流程详解
- 1、创建EventBus、AsyncEventBus Bean
- 在项目中统一配置全局单例Bean(如特殊需求,可配置多例)
- 2、定义EventMsg
- 设置消息载体。
- 3、注册Listener
- 注册Listener,处理事件
- 注意! 在使用 PostConstruct注释进行注册时,需要注意子类会执行父类含有PostConstruct 注释的方法。
- 3、事件发布
- 封装统一发布事件的Bean,然后通过Bean注入到需要发布的Bean里面进行事件发布。
此处对EventBus进行了统一封装收口操作,主要考虑的是如果做一些操作,直接改这一处就可以。如果不需要封装,可以在使用的地方直接注入EventBus即可。
4、注意事项
4.1、循环分发事件
如果业务流程较长,切记梳理好业务流程,不要让事件循环分发。
目前EventBus没有对循环事件进行处理。
4.2、使用 @PostConstrucrt 注册listener
子类在执行实例化时,会执行父类@PostConstrucrt 注释。 如果listenerSon继承listenerFather,当两者都使用@PostConstrucrt注册订阅方法时,子类也会调用父类的注册方法进行注册订阅方法。由于EventBus机制,子类注册订阅方法时,也会注册父类的监听方法
Subscriber唯一标志是(listener+method),因此在对同一方法注册时,由于不是同一个listener,所以对于EventBus是两个订阅方法。
因此,如果存在listenerSon、listenerFather两个listener,且listenerSon继承listenerFather。当都使用@PostConstrucrt注册时,会导致listenerFather里面的订阅方法注册两次。
4.3、含有继承关系的listener
当注册listener含有继承关系时,listener处理Event消息时,listener的父类也会处理该消息。
4.3.1、继承关系的订阅者
4.3.2、原理
子类listener注册,父类listener也会注册
4.4、含有继承关系的Event
如果作为参数的Event有继承关系,使用EventBus发布Event时,Event父类的监听者也会对Event进行处理。
4.4.1、执行结果
4.4.2、原理
在分发消息的时候,会获取所有订阅者数据(Event订阅者和Event超类的订阅者),然后进行分发数据。
获取订阅者数据如下图:
缓存Event及其超类的类对象,key为Event类对象。
5、分享提问问题
问题1:PerThreadQueuedDispatcherd 里面的队列,是否是有界队列?
有界队列,最大值为 int 的最大值 (2147483647),源码如下图:
问题2:dispatcher 分发给订阅者是否有序?
EventBus:同步事件总线:
同一个事件的多个订阅者,在接收到事件的顺序上面有不同。谁先注册到EventBus的,谁先执行(由于base使用的是PostConstruct进行注册,因此跟不同Bean之间的初始化顺序有关系)。如果是在同一个类中的两个订阅者一起被注册到EventBus的情况,收到事件的顺序跟方法名有关。
AsyncEventBus:异步事件总线:同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。
问题3:EventBus与SpringEvent的对比?
- 使用方式比较
项目 |
事件 |
发布者 |
发布方法 |
是否异步 |
监听者 |
注册方式 |
EventBus |
任意对象 |
EventBus |
EventBus#post |
支持同步异步 |
注解Subscribe方法 |
手动注册EventBus#register |
SpringEvent |
任意对象 |
ApplicationEventPublisher |
ApplicationEventPublisher#publishEvent |
支持同步异步 |
注解EventListener方法 |
系统注册 |
- 使用场景比较
项目 |
事件区分 |
是否支持事件簇 |
是否支持自定义event |
是否支持过滤 |
是否支持事件隔离 |
是否支持事务 |
是否支持设置订阅者消费顺序 |
复杂程度 |
EventBus |
Class |
是 |
是 |
否 |
是 |
否 |
否 |
简单 |
Spring Event |
Class |
是 |
是 |
是 |
否 |
是 |
是 |
复杂 |
参考链接https://www.cnblogs.com/shoren/p/eventBus_springEvent.html
问题4:EventBus的使用场景,结合现有使用场景考虑是否合适?
EventBus暂时不适用,主要有一下几个点:
- EventBus不支持事务,项目在更新、创建商品时,最好等事务提交成功后,再发送MQ消息(主要问题点)
- EventBus不支持设置同一消息的订阅者消费顺序。
- EventBus不支持消息过滤。SpringEvent支持消息过滤
6.项目中遇到的问题
6.1、问题描述
商品上架时会触发渠道分发功能,会有两步操作
- 1、创建一条分发记录,并对外发送一条未分发状态的商品变更消息(通过eventBus 事件发送消息)。
- 2、将分发记录改为审核中(需要审核)或审核通过(不需要审核),并对外发送一条已分发状态的商品变更消息(通过eventBus 事件发送消息)。
所以分发会触发两条分发状态不同的商品变更消息,一条是未分发,另一条是已分发。实际发送了两条分发状态相同的商品变更消息,状态都是已分发。
6.2、原因
我们先来回顾下EventBus 监听者处理事件时有三种策略,这是根本原因:
- ImmediateDispatcher:来一个事件马上进行处理。
- PerThreadQueuedDispatcher(eventBus默认选项,项目中使用此策略):在同一个线程post的Event,执行的顺序是有序的。用ThreadLocal queue来实现每个线程post的Event是有序的,在把事件添加到queue后会有一个ThreadLocal dispatching来判断当前线程是否正在分发,如果正在分发,则这次添加的event不会马上进行分发而是等到dispatching的值为false才进行。
- LegacyAsyncDispatcher(AsyncEventBus默认选项):会有一个全局的队列ConcurrentLinkedQueue queue保存EventWithSubscriber(事件和subscriber),如果被不同的线程poll 不能保证在queue队列中的event是有序发布的。
详情可见上文中的【2.3.4、事件分发】
再看下项目中的逻辑:
6.3、场景复现
在handler中对静态变量进行两次+1 操作,每操作一步发送一条事件,此处假设静态变量为分发状态。
6.4、解决办法
目前 Dispatcher 包用default 修饰,使用者无法指定Dispatcher 策略。并且 ImmediateDispatcher 使用private修饰。
因此目前暂无解决非同步问题,只能在业务逻辑上进行规避。
其实可以修改源码并发布一个包自己使用,但是公司安全规定不允许这样做,只能通过业务逻辑上进行规避,下图是github上对此问题的讨论。
7、总结
如果项目中需要使用异步解耦处理一些事项,使用EventBus还是比较方便的。