观察者模式在RxJava中的应用

时间:2022-01-23 17:44:54

一、观察者模式(observer pattern)和发布-订阅模式(publish-subscribe pattern)

  在分析观察者模式在项目中的应用之前,我想要先谈谈观察者模式和发布订阅模式的区别。最初是课堂上的练习中发现同时有两者的名词解释,一时犯了难,只好把两者的答案写成一样,没有去深究。到写这篇博客需要查阅资料时,很多blog又将两者不加区分,迷惑不已,又看了几篇博客,才大概搞懂两者间的区别。

  在观察者模式中,目标和观察者是基类,目标提供维护观察者的一系列方法,观察者提供更新接口。具体观察者和具体目标继承各自的基类,然后具体观察者把自己注册到具体目标里,在具体目标发生变化时候,调度观察者的更新方法。

 

观察者模式在RxJava中的应用

 

  发布订阅模式是最常用的一种观察者模式的实现,并且从解耦和重用角度来看,更优于典型的观察者模式。

  在发布订阅模式中,发布者和订阅者之间多了一个发布通道;一方面从发布者接收事件,另一方面向订阅者发布事件;订阅者需要从事件通道订阅事件以此避免发布者和订阅者之间产生依赖关系。

 

观察者模式在RxJava中的应用

 

 

 二、观察者模式在RXjava中的应用

1. RxJava是什么

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

  RxJava在GitHub上的的介绍如上, 简单翻译一下其为一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库它扩展了观察者模式以支持数据和/或事件序列,并增加了运算符,使你可以声明性地组合序列,同时抽象出对低级线程,同步,线程安全性,并发数据结构和非线程等事物的关注阻塞I/O。到目前为止RXJava已经更新到了2.x版本。

2.RxJava中的观察者模式

  RxJava有四个重要的概念:目标(observable),观察者(observer),订阅(subscribe),事件(event)。和观察者模式的一般结构一样,观察者订阅被观察者,被观察者发生事件时通知观察者,观察者实现更新。下面一一分析

  1) 被观察者及发送事件的实现

  源代码请点击

1 Observable.create(new ObservableOnSubscribe<Integer>() {
2     @Override
3     public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
4         emitter.onNext(1);
5         emitter.onNext(2);
6         emitter.onNext(3);
7         emitter.onComplete();
8     }
9 })

  被观察对象的创建

1   public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
2       ...
3 
4       return new ObservableCreate<T>(source)
5 }

  此处创建ObservableCreate类对象,并传入source对象,即我们手动创建的ObservableOnSubscribe对象

1     public final class ObservableCreate<T> extends Observable<T> {
2       ...
3       // 仅贴出关键源码
4 
5         final ObservableOnSubscribe<T> source;
6 
7         public ObservableCreate(ObservableOnSubscribe<T> source) {
8             this.source = source;
9     }

  可以看出来,ObservableCreate类 = Observable的子类,传入的source对象 = 手动创建的ObservableOnSubscribe对象。

 1         @Override
 2         protected void subscribeActual(Observer<? super T> observer) {
 3 
 4             CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 5 
 6             observer.onSubscribe(parent);
 7 
 8             try {
 9                 source.subscribe(parent);
10 
11             } catch (Throwable ex) {
12                 Exceptions.throwIfFatal(ex);
13                 parent.onError(ex);
14             }
15     }

  这一段代码复写了subscribeActual(),其作用为订阅时,通过接口回调调用被观察者(Observerable)与观察者(Observer)的方法。可以分为三个步骤来看:

  (1)创建1个CreateEmitter对象(封装成1个Disposable对象),作用是发布事件

  (2)调用观察者(Observer)的onSubscribe(),onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()

  (3)调用source对象的subscribe(),source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象,subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()

  接下来则是事件发射subscribe()实现中的onNext()

 1 static final class CreateEmitter<T> extends AtomicReference<Disposable>
 2                                         implements ObservableEmitter<T>, Disposable {
 3 
 4         ...
 5         @Override
 6         public void onNext(T t) {
 7             if (t == null) {
 8                 onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
 9                 return;
10             }
11 
12             if (!isDisposed()) {
13                 observer.onNext(t);
14             }
15         }
16 
17         @Override
18         public void onError(Throwable t) {
19             if (t == null) {
20                 t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
21             }
22             if (!isDisposed()) {
23                 try {
24                     observer.onError(t);
25                 } finally {
26                     dispose();
27                 }
28             } else {
29                 RxJavaPlugins.onError(t);
30             }
31         }
32 
33         @Override
34         public void onComplete() {
35             if (!isDisposed()) {
36                 try {
37                     observer.onComplete();
38                 } finally {
39                     dispose();
40                 }
41             }
42         }

  2)创建观察者

https://github.com/ReactiveX/RxJava/blob/59454ea47965547e7e218039ed152b591a51e268/src/main/java/io/reactivex/Observer.java#L76

1   public interface Observer<T> {
2         void onSubscribe(@NonNull Disposable d); 
3         void onNext(@NonNull T t);
4         void onError(@NonNull Throwable e);
5         void onComplete();
6     }
Observer本质 = 1个接口, 接口内含4个方法,分别用于 响应对应于被观察者发送的不同事件。

  3)通过订阅连接被观察者和观察者

https://github.com/ReactiveX/RxJava/blob/be0353cc6334fbb60a285f2f4f4ba88150f099e6/src/main/java/io/reactivex/Observable.java#L12083

1   @Override
2   public final void subscribe(Observer<? super T> observer) {
3 
4     ...
5     // 仅贴出关键源码
6 
7     subscribeActual(observer);
8   }

9 protected abstract void subscribeActual(Observer<? super T> observer);
  Observable.subscribeActual(observer)属于抽象方法,由子类实现;此处的子类 = 创建被观察者(Observable)时创建的ObservableCreate类,即 在订阅时,实际上是调用了创建被观察者(Observable)时创建的ObservableCreate类里的subscribeActual()
 

3 总结

  很显然,观察者模式使得RxJava的概念结构更加简单,各种类之间的耦合更加松散,诸如onNext(),onError()等发生时,也不用关心发生了事件的具体内容,只需要通知到观察者,就能进行相应的更新。