A Brief Look at RxJava Source Code: the Observer Pattern, Creation (create, from, just), Transformation (map, flatMap), and Thread Scheduling

Date: 2021-06-03 15:40:55

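I. Creation operators:

1. The observer pattern:
RxJava has four roles:

    Observable<T> (the observed source), Observer (the observer)
    Subscriber (the subscriber), Subject
    Observable and Subject are the two "producing" entities; Observer and Subscriber are the two "consuming" entities. An Observable and an Observer are linked through the subscribe() method, so that the Observable can emit events to notify the Observer whenever it needs to.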

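2. Callback methods:
The subscribe method connects an observer to an Observable. Your observer must implement the following methods:

    • onNext(T item)
      The Observable calls this method to emit an item; the argument is the emitted item. It may be called many times, depending on how the Observable is implemented.

    • onError(Throwable e)
      The Observable calls this method when it hits an error or cannot produce the expected data. The call terminates the Observable: neither onNext nor onCompleted is called afterwards. The argument is the Throwable that caused the failure.

    • onCompleted()
      Normal termination. If no error occurred, the Observable calls this method after the last call to onNext.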
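
The contract above can be sketched in plain Java without the library. `MiniObserver` and `emit` are hypothetical names used only for illustration, not RxJava types, and treating a null item as the failure case is an assumption made to trigger onError. The sketch only shows that exactly one terminal event ends the sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for rx.Observer, with the same three callbacks.
interface MiniObserver<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onCompleted();
}

final class ObserverContractSketch {

    // Emits every item, then exactly one terminal event:
    // onError if something fails, otherwise onCompleted.
    static <T> void emit(Iterable<T> items, MiniObserver<? super T> observer) {
        try {
            for (T item : items) {
                if (item == null) {
                    // Assumption for this sketch: a null item counts as a failure.
                    throw new NullPointerException("null item");
                }
                observer.onNext(item);
            }
        } catch (Throwable e) {
            observer.onError(e);   // terminal: nothing is delivered after this
            return;
        }
        observer.onCompleted();    // terminal: called once, after the last onNext
    }

    // Records the callbacks as strings so the order of events is visible.
    static List<String> record(List<String> items) {
        final List<String> log = new ArrayList<String>();
        emit(items, new MiniObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext=" + item); }
            @Override public void onError(Throwable e) { log.add("onError=" + e.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("Hello", "RxJava")));
        System.out.println(record(Arrays.asList("Hello", null, "RxJava")));
    }
}
```

In the second call the item after the null is never delivered, which is the "no onNext after onError" rule.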

3. Adding the dependencies:

  compile 'io.reactivex:rxandroid:1.1.0'
  compile 'io.reactivex:rxjava:1.1.0'

4. create():
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("RxJava");
        subscriber.onCompleted();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext=" + s);
    }
});
Subscriber extends the Observer interface, and the two are used in essentially the same way. The main addition is the onStart() method, which subscribe() calls before any item is emitted, so it can be used for preparation work:
.subscribe(new Subscriber<String>() {
    @Override
    public void onStart() {
        Log.i(TAG, "onStart");
    }

    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext=" + s);
    }
});

*****************************************************Source analysis: subscribe()**************************************
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }

    // new Subscriber so onStart it
    subscriber.onStart();

    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would
    // add a significant depth to already huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e)); // catch the exception and deliver it through onError()
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}
*****************************************************Source analysis: subscribe()**************************************
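
The order of steps in subscribe() (validate, onStart, wrap, call, catch) can be reproduced in a small plain-Java sketch. All type names here (`MiniSubscriber`, `MiniOnSubscribe`, `SafeWrapper`) are hypothetical stand-ins, and the hook calls and Exceptions.throwIfFatal from the listing are left out on purpose. `SafeWrapper` only imitates one aspect of SafeSubscriber, dropping events after a terminal event.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeFlowSketch {

    // Hypothetical stand-ins for rx.Subscriber and Observable.OnSubscribe.
    static abstract class MiniSubscriber<T> {
        void onStart() { }
        abstract void onNext(T item);
        abstract void onError(Throwable e);
        abstract void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    // Much reduced imitation of SafeSubscriber:
    // after the first terminal event, all further events are dropped.
    static final class SafeWrapper<T> extends MiniSubscriber<T> {
        private final MiniSubscriber<? super T> actual;
        private boolean done;

        SafeWrapper(MiniSubscriber<? super T> actual) { this.actual = actual; }

        @Override void onNext(T item) { if (!done) actual.onNext(item); }
        @Override void onError(Throwable e) { if (!done) { done = true; actual.onError(e); } }
        @Override void onCompleted() { if (!done) { done = true; actual.onCompleted(); } }
    }

    // Same order of steps as the listing: validate, onStart, wrap, call, catch.
    static <T> void subscribe(MiniOnSubscribe<T> onSubscribe, MiniSubscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        subscriber.onStart();
        MiniSubscriber<T> safe = new SafeWrapper<T>(subscriber);
        try {
            onSubscribe.call(safe);
        } catch (Throwable e) {
            safe.onError(e); // an exception thrown by call() becomes an onError event
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        subscribe(new MiniOnSubscribe<String>() {
            @Override public void call(MiniSubscriber<? super String> s) {
                s.onNext("Hello");
                throw new IllegalStateException("boom"); // simulated failure inside call()
            }
        }, new MiniSubscriber<String>() {
            @Override void onStart() { log.add("onStart"); }
            @Override void onNext(String item) { log.add("onNext=" + item); }
            @Override void onError(Throwable e) { log.add("onError=" + e.getMessage()); }
            @Override void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log shows onStart first, then the item, then the exception from call() arriving as onError rather than escaping to the caller.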

5. from():

String[] arrays = {"Hello", "RxJava"};
Observable.from(arrays)
    .subscribe(new Subscriber<String>() {
        @Override
        public void onStart() {
            Log.i(TAG, "onStart");
        }

        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
            Log.i(TAG, "onNext=" + s);
        }
    });
*****************************************************Source analysis: from()**************************************
public class Observable<T> {
    ...
    // create() is not explained again here; what matters is the call() of the OnSubscribe passed to it
    public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
        return create(new OnSubscribeFromIterable<T>(iterable));
    }
    ...
}
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
    final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        if (iterable == null) {
            throw new NullPointerException("iterable must not be null");
        }
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        final Iterator<? extends T> it = is.iterator();
        if (!it.hasNext() && !o.isUnsubscribed())
            o.onCompleted();
        else
            o.setProducer(new IterableProducer<T>(o, it)); // items remain: a Producer continues the iteration on request
    }

    private static final class IterableProducer<T> extends AtomicLong implements Producer {
        /** */
        private static final long serialVersionUID = -8730475647105475802L;
        private final Subscriber<? super T> o;
        private final Iterator<? extends T> it;

        private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
            this.o = o;
            this.it = it;
        }

        @Override
        public void request(long n) {
            if (get() == Long.MAX_VALUE) {
                // already started with fast-path
                return;
            }
            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
                fastpath();
            } else if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
                slowpath(n);
            }
        }

        void slowpath(long n) {
            // backpressure is requested
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;

            long r = n;
            while (true) {
                /*
                 * This complicated logic is done to avoid touching the
                 * volatile `requested` value during the loop itself. If
                 * it is touched during the loop the performance is
                 * impacted significantly.
                 */
                long numToEmit = r;
                while (true) {
                    if (o.isUnsubscribed()) {
                        return;
                    } else if (it.hasNext()) {
                        if (--numToEmit >= 0) {
                            o.onNext(it.next());
                        } else
                            break;
                    } else if (!o.isUnsubscribed()) {
                        o.onCompleted();
                        return;
                    } else {
                        // is unsubscribed
                        return;
                    }
                }
                r = addAndGet(-r);
                if (r == 0L) {
                    // we're done emitting the number requested so
                    // return
                    return;
                }
            }
        }

        void fastpath() {
            // fast-path without backpressure
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;

            while (true) {
                if (o.isUnsubscribed()) {
                    return;
                } else if (it.hasNext()) {
                    o.onNext(it.next());
                } else if (!o.isUnsubscribed()) {
                    o.onCompleted();
                    return;
                } else {
                    // is unsubscribed
                    return;
                }
            }
        }
    }
}
*****************************************************Source analysis: from()**************************************
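
The request(n) accounting in IterableProducer is the least obvious part of this listing, so here is a reduced plain-Java sketch of the same idea. `Sink` and `IteratorProducer` are hypothetical names, unsubscription checks are omitted, and a plain getAndAdd replaces BackpressureUtils.getAndAddRequest (which, unlike this sketch, also caps the sum at Long.MAX_VALUE).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class IterableProducerSketch {

    // Hypothetical minimal consumer; the real code talks to an rx.Subscriber.
    interface Sink<T> {
        void onNext(T item);
        void onCompleted();
    }

    // The AtomicLong value is the number of requested-but-not-yet-accounted items.
    static final class IteratorProducer<T> extends AtomicLong {
        private static final long serialVersionUID = 1L;
        private final Iterator<? extends T> it;
        private final Sink<? super T> sink;

        IteratorProducer(Iterator<? extends T> it, Sink<? super T> sink) {
            this.it = it;
            this.sink = sink;
        }

        void request(long n) {
            if (get() == Long.MAX_VALUE) {
                return; // fast path already taken: nothing left to account for
            }
            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
                // Fast path: no backpressure, emit everything.
                while (it.hasNext()) {
                    sink.onNext(it.next());
                }
                sink.onCompleted();
            } else if (n > 0 && getAndAdd(n) == 0L) {
                // Slow path: only the caller that moves the counter away from 0 emits.
                slowPath(n);
            }
        }

        private void slowPath(long n) {
            long r = n;
            while (true) {
                long toEmit = r;
                while (toEmit > 0 && it.hasNext()) {
                    sink.onNext(it.next());
                    toEmit--;
                }
                if (!it.hasNext()) {
                    sink.onCompleted();
                    return;
                }
                // Subtract what was emitted; a non-zero result means more was requested meanwhile.
                r = addAndGet(-r);
                if (r == 0L) {
                    return;
                }
            }
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        IteratorProducer<String> p = new IteratorProducer<String>(
                Arrays.asList("a", "b", "c").iterator(),
                new Sink<String>() {
                    @Override public void onNext(String item) { log.add(item); }
                    @Override public void onCompleted() { log.add("done"); }
                });
        p.request(2);   // emits a, b
        log.add("|");
        p.request(2);   // emits c, then completes because the iterator is exhausted
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design point carried over from the listing is that the counter is only touched once per batch, not once per emitted item.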

6. just():

Observable.just("Hello", "RxJava")
    .subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            Log.i("wxl", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
            Log.i("wxl", "onNext=" + s);
        }
    });
*****************************************************Source analysis: just()**************************************
public final static <T> Observable<T> just(T t1, T t2) {
    // as the source shows, this simply delegates to from(), covered above
    return from(Arrays.asList(t1, t2));
}
*****************************************************Source analysis: just()**************************************

II. Transformation operators: (key point: you still have to create the Observable first)
1. map(): (each input item is transformed into exactly one output item. It helps to think of how map() executes in terms of the proxy pattern: put simply, the original Observable and OnSubscribe are replaced by new ones that wrap them)

Observable.just("Hello", "RxJava")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s.toUpperCase();
        }
    })
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("wxl", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
            Log.i("wxl", "onNext=" + s);
        }
    });
*****************************************************Source analysis: map() and lift()**************************************
public class Observable<T> {
...
    // subscribe(): identical to the listing under "4. create()" above, so it is not repeated here.
    // The key line is:
    //     hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    // It invokes call() on the OnSubscribe. Note that by this point the OnSubscribe has been
    // replaced by the new one created in lift() below; think of it as a proxy.

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() { // a new Observable and a new OnSubscribe are created here
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o); // the operator wraps the downstream subscriber; this is where the replacement logic is called back
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st); // the original OnSubscribe now emits into the new Subscriber
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }
...
}
*****************************************************Source analysis: map() and lift()**************************************
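
The "proxy" idea behind lift() is easier to see in a stripped-down model. The sketch below is plain Java with hypothetical types (`Obs`, `OnSub`, `Sub`, `Op`, `Fn`); it leaves out onError, onStart and the hooks, and it assumes map() is built on lift() in the same way filter() is in the filter listing of this article.

```java
import java.util.ArrayList;
import java.util.List;

final class LiftSketch {

    interface Sub<T> {            // hypothetical stand-in for rx.Subscriber
        void onNext(T item);
        void onCompleted();
    }

    interface OnSub<T> {          // stand-in for Observable.OnSubscribe
        void call(Sub<? super T> subscriber);
    }

    interface Fn<A, B> {          // stand-in for Func1
        B call(A a);
    }

    // An operator turns the downstream subscriber (of R) into an upstream subscriber (of T).
    interface Op<R, T> {
        Sub<? super T> call(Sub<? super R> downstream);
    }

    static final class Obs<T> {
        final OnSub<T> onSubscribe;

        Obs(OnSub<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        void subscribe(Sub<? super T> subscriber) { onSubscribe.call(subscriber); }

        // Returns a NEW Obs whose OnSub wraps the subscriber, then calls the ORIGINAL OnSub.
        <R> Obs<R> lift(final Op<R, T> operator) {
            return new Obs<R>(new OnSub<R>() {
                @Override public void call(Sub<? super R> downstream) {
                    Sub<? super T> wrapped = operator.call(downstream);
                    onSubscribe.call(wrapped); // the outer (original) onSubscribe
                }
            });
        }

        // map() expressed through lift(): the wrapper transforms each item on its way down.
        <R> Obs<R> map(final Fn<? super T, ? extends R> fn) {
            return lift(new Op<R, T>() {
                @Override public Sub<? super T> call(final Sub<? super R> downstream) {
                    return new Sub<T>() {
                        @Override public void onNext(T item) { downstream.onNext(fn.call(item)); }
                        @Override public void onCompleted() { downstream.onCompleted(); }
                    };
                }
            });
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        Obs<String> source = new Obs<String>(new OnSub<String>() {
            @Override public void call(Sub<? super String> s) {
                s.onNext("Hello");
                s.onNext("RxJava");
                s.onCompleted();
            }
        });
        source.map(new Fn<String, String>() {
            @Override public String call(String s) { return s.toUpperCase(); }
        }).subscribe(new Sub<String>() {
            @Override public void onNext(String item) { log.add("onNext=" + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The subscriber passed to subscribe() never meets the source directly: it is handed to the new OnSub, which wraps it and only then calls the original one.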

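2. flatMap():

Much the same as above; it can be seen as a flattening map (a second level of transformation): each item is mapped to an Observable, and the items of those Observables end up in a single stream.

 Comparing flatMap with map:

Suppose we want to print the names of all the courses each student has to take. (What makes this different from printing student names is that each student has only one name but several courses.) A first attempt could look like this: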
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);
With flatMap:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
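
To make the flattening step concrete, here is a plain-Java sketch with a hypothetical synchronous mini-Observable (`Obs`). The `Student` class and its sample data are made up for illustration, with course names stored as plain strings. Because everything here is synchronous, inner items arrive strictly in order; the real flatMap merges the inner Observables, so their items may interleave when they emit asynchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FlatMapSketch {

    interface Sub<T> {
        void onNext(T item);
        void onCompleted();
    }

    interface Fn<A, B> {
        B call(A a);
    }

    // Hypothetical synchronous mini-Observable; not an RxJava type.
    static abstract class Obs<T> {
        abstract void subscribe(Sub<? super T> subscriber);

        static <E> Obs<E> from(final List<E> items) {
            return new Obs<E>() {
                @Override void subscribe(Sub<? super E> subscriber) {
                    for (E item : items) {
                        subscriber.onNext(item);
                    }
                    subscriber.onCompleted();
                }
            };
        }

        <R> Obs<R> flatMap(final Fn<? super T, Obs<R>> fn) {
            final Obs<T> source = this;
            return new Obs<R>() {
                @Override void subscribe(final Sub<? super R> downstream) {
                    source.subscribe(new Sub<T>() {
                        @Override public void onNext(T item) {
                            // Map the item to an inner Obs and forward its items downstream.
                            fn.call(item).subscribe(new Sub<R>() {
                                @Override public void onNext(R inner) { downstream.onNext(inner); }
                                @Override public void onCompleted() { /* an inner end is not terminal */ }
                            });
                        }
                        @Override public void onCompleted() { downstream.onCompleted(); }
                    });
                }
            };
        }
    }

    // Made-up sample type for this sketch.
    static final class Student {
        final String name;
        final List<String> courses;
        Student(String name, List<String> courses) { this.name = name; this.courses = courses; }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        List<Student> students = Arrays.asList(
                new Student("Ann", Arrays.asList("Math", "Physics")),
                new Student("Bob", Arrays.asList("History")));
        Obs.from(students).flatMap(new Fn<Student, Obs<String>>() {
            @Override public Obs<String> call(Student s) { return Obs.from(s.courses); }
        }).subscribe(new Sub<String>() {
            @Override public void onNext(String course) { log.add(course); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The subscriber sees a flat stream of course names and a single onCompleted, with no loop over courses in its own code.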


3. filter():
Observable.just(4, 2, 1, 7, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 3;
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            Log.i("wxl", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.i("wxl", "onNext=" + integer);
        }
    });
***************************************************Source analysis: filter()****************************************************************
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
    return lift(new OperatorFilter<T>(predicate));
}

public final class OperatorFilter<T> implements Operator<T, T> {

    private final Func1<? super T, Boolean> predicate;

    public OperatorFilter(Func1<? super T, Boolean> predicate) {
        this.predicate = predicate;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        return new Subscriber<T>(child) {

            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    if (predicate.call(t)) { // test the predicate
                        child.onNext(t);
                    } else {
                        // TODO consider a more complicated version that batches these
                        request(1);
                    }
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, child, t);
                }
            }

        };
    }

}
***************************************************Source analysis: filter()****************************************************************
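
Stripped of backpressure, OperatorFilter is just a subscriber that wraps another subscriber. The following plain-Java sketch shows that wrapper on its own; `Sub`, `Pred` and `filter` are hypothetical names, and the request(1) call from the listing has no counterpart because the sketch has no Producer.

```java
import java.util.ArrayList;
import java.util.List;

final class FilterSketch {

    interface Sub<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    interface Pred<T> {
        boolean test(T item);
    }

    // Wraps `child`: items that fail the predicate are dropped, all other events pass through.
    static <T> Sub<T> filter(final Pred<? super T> predicate, final Sub<? super T> child) {
        return new Sub<T>() {
            @Override public void onNext(T item) {
                try {
                    if (predicate.test(item)) {
                        child.onNext(item);
                    }
                    // In the listing the else branch calls request(1) to replace the dropped
                    // item; this sketch has no backpressure, so there is nothing to request.
                } catch (Throwable e) {
                    child.onError(e); // failures are reported downstream
                }
            }
            @Override public void onError(Throwable e) { child.onError(e); }
            @Override public void onCompleted() { child.onCompleted(); }
        };
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        Sub<Integer> filtered = filter(new Pred<Integer>() {
            @Override public boolean test(Integer i) { return i > 3; }
        }, new Sub<Integer>() {
            @Override public void onNext(Integer item) { log.add("onNext=" + item); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        // Feed the same values as the example above directly into the wrapper.
        for (int i : new int[] {4, 2, 1, 7, 5}) {
            filtered.onNext(i);
        }
        filtered.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```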
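4. Thread scheduling:
.subscribeOn(Schedulers.io()): the subscription, and with it the source's emission, runs on the given Scheduler (here the I/O scheduler):
***************************************************Source analysis: subscribeOn()****************************************************************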
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

public final class Schedulers {
    //
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;

    private static final Schedulers INSTANCE = new Schedulers();
    ....
}

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() { // do the work on the selected thread

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}
***************************************************Source analysis: subscribeOn()****************************************************************
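
The essential move in OperatorSubscribeOn is "run the upstream subscription inside inner.schedule(...)". A plain-Java sketch of just that move, with an ExecutorService standing in for the Scheduler's Worker, might look as follows. `Sub`, `OnSub` and the thread name "io-thread" are hypothetical, and the Producer re-scheduling from the listing is not modelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class SubscribeOnSketch {

    interface Sub<T> {
        void onNext(T item);
        void onCompleted();
    }

    interface OnSub<T> {
        void call(Sub<? super T> subscriber);
    }

    // Returns an OnSub that runs the source's call() on the executor, so the
    // emission itself happens on that thread.
    static <T> OnSub<T> subscribeOn(final OnSub<T> source, final ExecutorService executor) {
        return new OnSub<T>() {
            @Override public void call(final Sub<? super T> subscriber) {
                executor.execute(new Runnable() {
                    @Override public void run() { source.call(subscriber); }
                });
            }
        };
    }

    static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch finished = new CountDownLatch(1);
        ExecutorService io = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override public Thread newThread(Runnable r) { return new Thread(r, "io-thread"); }
        });
        OnSub<String> source = new OnSub<String>() {
            @Override public void call(Sub<? super String> s) {
                s.onNext("Hello");
                s.onCompleted();
            }
        };
        subscribeOn(source, io).call(new Sub<String>() {
            @Override public void onNext(String item) {
                log.add(item + "@" + Thread.currentThread().getName());
            }
            @Override public void onCompleted() {
                log.add("onCompleted@" + Thread.currentThread().getName());
                finished.countDown();
            }
        });
        try {
            finished.await(5, TimeUnit.SECONDS); // wait for the background emission
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since nothing moves the events to another thread afterwards, the subscriber's callbacks also run on "io-thread".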
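.observeOn(AndroidSchedulers.mainThread()):
See the source of OperatorObserveOn; it delivers the downstream callbacks (onNext, onError, onCompleted) on the given Scheduler, here the Android main thread.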
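
As a rough mental model only (the real OperatorObserveOn is considerably more involved), observeOn can be pictured as a subscriber wrapper that re-posts every callback to another thread. In the plain-Java sketch below a single-thread executor named "ui-thread" stands in for the Android main thread; all names are hypothetical and error handling is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class ObserveOnSketch {

    interface Sub<T> {
        void onNext(T item);
        void onCompleted();
    }

    // Wraps `child` so that every callback is re-posted to the executor.
    // A single-thread executor keeps the events in their original order.
    static <T> Sub<T> observeOn(final Sub<? super T> child, final ExecutorService executor) {
        return new Sub<T>() {
            @Override public void onNext(final T item) {
                executor.execute(new Runnable() {
                    @Override public void run() { child.onNext(item); }
                });
            }
            @Override public void onCompleted() {
                executor.execute(new Runnable() {
                    @Override public void run() { child.onCompleted(); }
                });
            }
        };
    }

    static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService ui = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override public Thread newThread(Runnable r) { return new Thread(r, "ui-thread"); }
        });
        Sub<String> onUi = observeOn(new Sub<String>() {
            @Override public void onNext(String item) {
                log.add(item + "@" + Thread.currentThread().getName());
            }
            @Override public void onCompleted() {
                log.add("onCompleted@" + Thread.currentThread().getName());
            }
        }, ui);
        // The source emits on the calling thread; only the delivery is moved.
        onUi.onNext("Hello");
        onUi.onNext("RxJava");
        onUi.onCompleted();
        ui.shutdown(); // tasks that are already queued still run
        try {
            ui.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Compared with the subscribeOn sketch, the difference is which side gets moved: subscribeOn moves where the source runs, observeOn moves where the subscriber is called.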

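Basic idea: it is best understood in terms of proxies

(Observed source) Observable<T>: holds an OnSubscribe<T> (the action run on subscription), where OnSubscribe<T> extends Action1<Subscriber<? super T>> extends Action extends Function

--> subscribe(Observer<T>) establishes the subscription, and the callbacks handle the events

(Observer) Observer<T>: Subscriber<T> implements Observer<T>

 Real-world use cases are not covered yet; discussion and corrections are welcome!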