使用 Java8 实现观察者模式的方法(下)

时间:2022-03-20 10:18:22

在上篇文章给大家介绍了使用Java8 实现观察者模式的方法(上),本文继续给大家介绍java8观察者模式相关知识,具体内容如下所述:

线程安全的实现

前面章节介绍了在现代Java环境下的实现观察者模式,虽然简单但很完整,但这一实现忽略了一个关键性问题:线程安全。大多数开放的Java应用都是多线程的,而且观察者模式也多用于多线程或异步系统。例如,如果外部服务更新其数据库,那么应用也会异步地收到消息,然后用观察者模式通知内部组件更新,而不是内部组件直接注册监听外部服务。

观察者模式的线程安全主要集中在模式的主体上,因为修改注册监听器集合时很可能发生线程冲突,比如,一个线程试图添加一个新的监听器,而另一线程又试图添加一个新的animal对象,这将触发对所有注册监听器的通知。鉴于先后顺序,在已注册的监听器收到新增动物的通知前,第一个线程可能已经完成也可能尚未完成新监听器的注册。这是一个经典的线程资源竞争案例,正是这一现象告诉开发者们需要一个机制来保证线程安全。

这一问题的最简单的解决方案是:所有访问或修改注册监听器list的操作都须遵循Java的同步机制,比如:

?
1
2
3
public synchronized AnimalAddedListener registerAnimalAddedListener (AnimalAddedListener listener) { /*...*/ }
public synchronized void unregisterAnimalAddedListener (AnimalAddedListener listener) { /*...*/ }
public synchronized void notifyAnimalAddedListeners (Animal animal) { /*...*/ }

这样一来,同一时刻只有一个线程可以修改或访问已注册的监听器列表,可以成功地避免资源竞争问题,但是新问题又出现了,这样的约束太过严格(synchronized关键字和Java并发模型的更多信息,请参阅官方网页)。通过方法同步,可以时刻观测对监听器list的并发访问,注册和撤销监听器对监听器list而言是写操作,而通知监听器访问监听器list是只读操作。由于通过通知访问是读操作,因此是可以多个通知操作同时进行的。

因此,只要没有监听器注册或撤销注册,任意多的并发通知都可以同时执行,而不会引发对注册的监听器列表的资源争夺。当然,其他情况下的资源争夺现象存在已久,为了解决这一问题,设计了ReadWriteLock用以分开管理读写操作的资源锁定。Zoo类的线程安全ThreadSafeZoo实现代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ThreadSafeZoo {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
protected final Lock readLock = readWriteLock.readLock();
protected final Lock writeLock = readWriteLock.writeLock();
private List<Animal> animals = new ArrayList<>();
private List<AnimalAddedListener> listeners = new ArrayList<>();
public void addAnimal (Animal animal) {
// Add the animal to the list of animals
this.animals.add(animal);
// Notify the list of registered listeners
this.notifyAnimalAddedListeners(animal);
}
public AnimalAddedListener registerAnimalAddedListener (AnimalAddedListener listener) {
// Lock the list of listeners for writing
this.writeLock.lock();
try {
// Add the listener to the list of registered listeners
this.listeners.add(listener);
}
finally {
// Unlock the writer lock
this.writeLock.unlock();
}
return listener;
}
public void unregisterAnimalAddedListener (AnimalAddedListener listener) {
// Lock the list of listeners for writing
this.writeLock.lock();
try {
// Remove the listener from the list of the registered listeners
this.listeners.remove(listener);
}
finally {
// Unlock the writer lock
this.writeLock.unlock();
}
}
public void notifyAnimalAddedListeners (Animal animal) {
// Lock the list of listeners for reading
this.readLock.lock();
try {
// Notify each of the listeners in the list of registered listeners
this.listeners.forEach(listener -> listener.updateAnimalAdded(animal));
}
finally {
// Unlock the reader lock
this.readLock.unlock();
}
}
}

通过这样部署,Subject的实现能确保线程安全并且多个线程可以同时发布通知。但尽管如此,依旧存在两个不容忽略的资源竞争问题:

对每个监听器的并发访问。多个线程可以同时通知监听器要新增动物了,这意味着一个监听器可能会同时被多个线程同时调用。

对animal list的并发访问。多个线程可能会同时向animal list添加对象,如果通知的先后顺序存在影响,那就可能导致资源竞争,这就需要一个并发操作处理机制来避免这一问题。如果注册的监听器列表在收到通知添加animal2后,又收到通知添加animal1,此时就会产生资源竞争。但是如果animal1和animal2的添加由不同的线程执行,也是有可能在animal2前完成对animal1添加操作,具体来说就是线程1在通知监听器前添加animal1并锁定模块,线程2添加animal2并通知监听器,然后线程1通知监听器animal1已经添加。虽然在不考虑先后顺序时,可以忽略资源竞争,但问题是真实存在的。

对监听器的并发访问

并发访问监听器可以通过保证监听器的线程安全来实现。秉承着类的“责任自负”精神,监听器有“义务”确保自身的线程安全。例如,对于前面计数的监听器,多线程的递增或递减动物数量可能导致线程安全问题,要避免这一问题,动物数的计算必须是原子操作(原子变量或方法同步),具体解决代码如下:

?
1
2
3
4
5
6
7
8
9
10
public class ThreadSafeCountingAnimalAddedListener implements AnimalAddedListener {
private static AtomicLong animalsAddedCount = new AtomicLong(0);
@Override
public void updateAnimalAdded (Animal animal) {
// Increment the number of animals
animalsAddedCount.incrementAndGet();
// Print the number of animals
System.out.println("Total animals added: " + animalsAddedCount);
}
}

方法同步解决方案代码如下:

?
1
2
3
4
5
6
7
8
9
10
public class CountingAnimalAddedListener implements AnimalAddedListener {
private static int animalsAddedCount = 0;
@Override
public synchronized void updateAnimalAdded (Animal animal) {
// Increment the number of animals
animalsAddedCount++;
// Print the number of animals
System.out.println("Total animals added: " + animalsAddedCount);
}
}

要强调的是监听器应该保证自身的线程安全,subject需要理解监听器的内部逻辑,而不是简单确保对监听器的访问和修改的线程安全。否则,如果多个subject共用同一个监听器,那每个subject类都要重写一遍线程安全的代码,显然这样的代码不够简洁,因此需要在监听器类内实现线程安全。

监听器的有序通知

当要求监听器有序执行时,读写锁就不能满足需求了,而需要引入一个新的机制,可以保证notify函数的调用顺序和animal添加到zoo的顺序一致。有人尝试过用方法同步来实现,然而根据Oracle文档中的方法同步介绍,可知方法同步并不提供操作执行的顺序管理。它只是保证原子操作,也就是说操作不会被打断,并不能保证先来先执行(FIFO)的线程顺序。ReentrantReadWriteLock可以实现这样的执行顺序,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class OrderedThreadSafeZoo {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
protected final Lock readLock = readWriteLock.readLock();
protected final Lock writeLock = readWriteLock.writeLock();
private List<Animal> animals = new ArrayList<>();
private List<AnimalAddedListener> listeners = new ArrayList<>();
public void addAnimal (Animal animal) {
// Add the animal to the list of animals
this.animals.add(animal);
// Notify the list of registered listeners
this.notifyAnimalAddedListeners(animal);
}
public AnimalAddedListener registerAnimalAddedListener (AnimalAddedListener listener) {
// Lock the list of listeners for writing
this.writeLock.lock();
try {
// Add the listener to the list of registered listeners
this.listeners.add(listener);
}
finally {
// Unlock the writer lock
this.writeLock.unlock();
}
return listener;
}
public void unregisterAnimalAddedListener (AnimalAddedListener listener) {
// Lock the list of listeners for writing
this.writeLock.lock();
try {
// Remove the listener from the list of the registered listeners
this.listeners.remove(listener);
}
finally {
// Unlock the writer lock
this.writeLock.unlock();
}
}
public void notifyAnimalAddedListeners (Animal animal) {
// Lock the list of listeners for reading
this.readLock.lock();
try {
// Notify each of the listeners in the list of registered listeners
this.listeners.forEach(listener -> listener.updateAnimalAdded(animal));
}
finally {
// Unlock the reader lock
this.readLock.unlock();
}
}
}

这样的实现方式,register, unregister和notify函数将按照先进先出(FIFO)的顺序获得读写锁权限。例如,线程1注册一个监听器,线程2在开始执行注册操作后试图通知已注册的监听器,线程3在线程2等待只读锁的时候也试图通知已注册的监听器,采用fair-ordering方式,线程1先完成注册操作,然后线程2可以通知监听器,最后线程3通知监听器。这样保证了action的执行顺序和开始顺序一致。

如果采用方法同步,虽然线程2先排队等待占用资源,线程3仍可能比线程2先获得资源锁,而且不能保证线程2比线程3先通知监听器。问题的关键所在:fair-ordering方式可以保证线程按照申请资源的顺序执行。读写锁的顺序机制很复杂,应参照ReentrantReadWriteLock的官方文档以确保锁的逻辑足够解决问题。

截止目前实现了线程安全,在接下来的章节中将介绍提取主题的逻辑并将其mixin类封装为可重复代码单元的方式优缺点。

主题逻辑封装到Mixin类

把上述的观察者模式设计实现封装到目标的mixin类中很具吸引力。通常来说,观察者模式中的观察者包含已注册的监听器的集合;负责注册新的监听器的register函数;负责撤销注册的unregister函数和负责通知监听器的notify函数。对于上述的动物园的例子,zoo类除动物列表是问题所需外,其他所有操作都是为了实现主题的逻辑。

Mixin类的案例如下所示,需要说明的是为使代码更为简洁,此处去掉关于线程安全的代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class ObservableSubjectMixin<ListenerType> {
private List<ListenerType> listeners = new ArrayList<>();
public ListenerType registerListener (ListenerType listener) {
// Add the listener to the list of registered listeners
this.listeners.add(listener);
return listener;
}
public void unregisterAnimalAddedListener (ListenerType listener) {
// Remove the listener from the list of the registered listeners
this.listeners.remove(listener);
}
public void notifyListeners (Consumer<? super ListenerType> algorithm) {
// Execute some function on each of the listeners
this.listeners.forEach(algorithm);
}
}

正因为没有提供正在注册的监听器类型的接口信息,不能直接通知某个特定的监听器,所以正需要保证通知功能的通用性,允许客户端添加一些功能,如接受泛型参数类型的参数匹配,以适用于每个监听器,具体实现代码如下:

?
1
2
3
4
5
6
7
8
9
public class ZooUsingMixin extends ObservableSubjectMixin<AnimalAddedListener> {
private List<Animal> animals = new ArrayList<>();
public void addAnimal (Animal animal) {
// Add the animal to the list of animals
this.animals.add(animal);
// Notify the list of registered listeners
this.notifyListeners((listener) -> listener.updateAnimalAdded(animal));
}
}

Mixin类技术的最大优势是把观察者模式的Subject封装到一个可重复调用的类中,而不是在每个subject类中都重复写这些逻辑。此外,这一方法使得zoo类的实现更为简洁,只需要存储动物信息,而不用再考虑如何存储和通知监听器。

然而,使用mixin类并非只有优点。比如,如果要存储多个类型的监听器怎么办?例如,还需要存储监听器类型AnimalRemovedListener。mixin类是抽象类,Java中不能同时继承多个抽象类,而且mixin类不能改用接口实现,这是因为接口不包含state,而观察者模式中state需要用来保存已经注册的监听器列表。

其中的一个解决方案是创建一个动物增加和减少时都会通知的监听器类型ZooListener,代码如下所示:

?
1
2
3
4
public interface ZooListener {
public void onAnimalAdded (Animal animal);
public void onAnimalRemoved (Animal animal);
}

这样就可以使用该接口实现利用一个监听器类型对zoo状态各种变化的监听了:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ZooUsingMixin extends ObservableSubjectMixin<ZooListener> {
private List<Animal> animals = new ArrayList<>();
public void addAnimal (Animal animal) {
// Add the animal to the list of animals
this.animals.add(animal);
// Notify the list of registered listeners
this.notifyListeners((listener) -> listener.onAnimalAdded(animal));
}
public void removeAnimal (Animal animal) {
// Remove the animal from the list of animals
this.animals.remove(animal);
// Notify the list of registered listeners
this.notifyListeners((listener) -> listener.onAnimalRemoved(animal));
}
}

将多个监听器类型合并到一个监听器接口中确实解决了上面提到的问题,但仍旧存在不足之处,接下来的章节会详细讨论。

Multi-Method监听器和适配器

在上述方法,监听器的接口中实现的包含太多函数,接口就过于冗长,例如,Swing MouseListener就包含5个必要的函数。尽管可能只会用到其中一个,但是只要用到鼠标点击事件就必须要添加这5个函数,更多可能是用空函数体来实现剩下的函数,这无疑会给代码带来不必要的混乱。

其中一种解决方案是创建适配器(概念来自GoF提出的适配器模式),适配器中以抽象函数的形式实现监听器接口的操作,供具体监听器类继承。这样一来,具体监听器类就可以选择其需要的函数,对adapter不需要的函数采用默认操作即可。例如上面例子中的ZooListener类,创建ZooAdapter(Adapter的命名规则与监听器一致,只需要把类名中的Listener改为Adapter即可),代码如下:

?
1
2
3
4
5
6
public class ZooAdapter implements ZooListener {
@Override
public void onAnimalAdded (Animal animal) {}
@Override
public void onAnimalRemoved (Animal animal) {}
}

乍一看,这个适配器类微不足道,然而它所带来的便利却是不可小觑的。比如对于下面的具体类,只需选择对其实现有用的函数即可:

?
1
2
3
4
5
6
7
public class NamePrinterZooAdapter extends ZooAdapter {
@Override
public void onAnimalAdded (Animal animal) {
// Print the name of the animal that was added
System.out.println("Added animal named " + animal.getName());
}
}

有两种替代方案同样可以实现适配器类的功能:一是使用默认函数;二是把监听器接口和适配器类合并到一个具体类中。默认函数是Java8新提出的,在接口中允许开发者提供默认(防御)的实现方法。

Java库的这一更新主要是方便开发者在不改变老版本代码的情况下,实现程序扩展,因此应该慎用这个方法。部分开发者多次使用后,会感觉这样写的代码不够专业,而又有开发者认为这是Java8的特色,不管怎样,需要明白这个技术提出的初衷是什么,再结合具体问题决定是否要用。使用默认函数实现的ZooListener接口代码如下示:

?
1
2
3
4
public interface ZooListener {
default public void onAnimalAdded (Animal animal) {}
default public void onAnimalRemoved (Animal animal) {}
}

通过使用默认函数,实现该接口的具体类,无需在接口中实现全部函数,而是选择性实现所需函数。虽然这是接口膨胀问题一个较为简洁的解决方案,开发者在使用时还应多加注意。

第二种方案是简化观察者模式,省略了监听器接口,而是用具体类实现监听器的功能。比如ZooListener接口就变成了下面这样:

?
1
2
3
4
public class ZooListener {
public void onAnimalAdded (Animal animal) {}
public void onAnimalRemoved (Animal animal) {}
}

这一方案简化了观察者模式的层次结构,但它并非适用于所有情况,因为如果把监听器接口合并到具体类中,具体监听器就不可以实现多个监听接口了。例如,如果AnimalAddedListener和AnimalRemovedListener接口写在同一个具体类中,那么单独一个具体监听器就不可以同时实现这两个接口了。此外,监听器接口的意图比具体类更显而易见,很显然前者就是为其他类提供接口,但后者就并非那么明显了。

如果没有合适的文档说明,开发者并不会知道已经有一个类扮演着接口的角色,实现了其对应的所有函数。此外,类名不包含adapter,因为类并不适配于某一个接口,因此类名并没有特别暗示此意图。综上所述,特定问题需要选择特定的方法,并没有哪个方法是万能的。

在开始下一章前,需要特别提一下,适配器在观察模式中很常见,尤其是在老版本的Java代码中。Swing API正是以适配器为基础实现的,正如很多老应用在Java5和Java6中的观察者模式中所使用的那样。zoo案例中的监听器或许并不需要适配器,但需要了解适配器提出的目的以及其应用,因为我们可以在现有的代码中对其进行使用。下面的章节,将会介绍时间复杂的监听器,该类监听器可能会执行耗时的运算或进行异步调用,不能立即给出返回值。

Complex & Blocking监听器

关于观察者模式的一个假设是:执行一个函数时,一系列监听器会被调用,但假定这一过程对调用者而言是完全透明的。例如,客户端代码在Zoo中添加animal时,在返回添加成功之前,并不知道会调用一系列监听器。如果监听器的执行需要时间较长(其时间受监听器的数量、每个监听器执行时间影响),那么客户端代码将会感知这一简单增加动物操作的时间副作用。

本文不能面面俱到的讨论这个话题,下面几条是开发者调用复杂的监听器时应该注意的事项:

监听器启动新线程。新线程启动后,在新线程中执行监听器逻辑的同时,返回监听器函数的处理结果,并运行其他监听器执行。

Subject启动新线程。与传统的线性迭代已注册的监听器列表不同,Subject的notify函数重启一个新的线程,然后在新线程中迭代监听器列表。这样使得notify函数在执行其他监听器操作的同时可以输出其返回值。需要注意的是需要一个线程安全机制来确保监听器列表不会进行并发修改。

队列化监听器调用并采用一组线程执行监听功能。将监听器操作封装在一些函数中并队列化这些函数,而非简单的迭代调用监听器列表。这些监听器存储到队列中后,线程就可以从队列中弹出单个元素并执行其监听逻辑。这类似于生产者-消费者问题,notify过程产生可执行函数队列,然后线程依次从队列中取出并执行这些函数,函数需要存储被创建的时间而非执行的时间供监听器函数调用。例如,监听器被调用时创建的函数,那么该函数就需要存储该时间点,这一功能类似于Java中的如下操作:

?
1
2
3
4
5
6
7
8
9
10
11
12
public class AnimalAddedFunctor {
private final AnimalAddedListener listener;
private final Animal parameter;
public AnimalAddedFunctor (AnimalAddedListener listener, Animal parameter) {
this.listener = listener;
this.parameter = parameter;
}
public void execute () {
// Execute the listener with the parameter provided during creation
this.listener.updateAnimalAdded(this.parameter);
}
}

函数创建并保存在队列中,可以随时调用,这样一来就无需在遍历监听器列表时立即执行其对应操作了。一旦每个激活监听器的函数都压入队列中,“消费者线程”就会给客户端代码返回操作权。之后某个时间点“消费者线程”将会执行这些函数,就像在监听器被notify函数激活时执行一样。这项技术在其他语言中被叫作参数绑定,刚好适合上面的例子,技术的实质是保存监听器的参数,execute()函数再直接调用。如果监听器接收多个参数,处理方法也类似。

需要注意的是如果要保存监听器的执行顺序,则需要引入综合排序机制。方案一中,监听器按照正常的顺序激活新线程,这样可以确保监听器按照注册的顺序执行。方案二中,队列支持排序,其中的函数会按照进入队列的顺序执行。简单来说就是,开发者需要重视监听器多线程执行的复杂程度,加以小心处理以确保实现所需的功能。

结束语

观察者模式在1994年被写进书中以前,就已经是主流的软件设计模式了,为软件设计中经常出现的问题提供了很多令人满意的解决方案。Java一直是使用该模式的引领者,在其标准库中封装了这一模式,但是鉴于Java更新到了版本8,十分有必要重新考查经典模式在其中的使用。随着lambda表达式和其他新结构的出现,这一“古老的”模式又有了新的生机。无论是处理旧程序还是使用这一历史悠久的方法解决新问题,尤其对经验丰富的Java开发者来说,观察者模式都是开发者的主要工具。

OneAPM 为您提供端到端的Java 应用性能解决方案,我们支持所有常见的 Java 框架及应用服务器,助您快速发现系统瓶颈,定位异常根本原因。分钟级部署,即刻体验,Java 监控从来没有如此简单。想阅读更多技术文章,请访问OneAPM 官方技术博客。

以上内容给大家介绍了使用 Java8 实现观察者模式的方法(下),希望对大家有所帮助!