RxJava 1.0版本入门篇之--1

时间:2022-01-23 17:45:06

RxJava 1.0版本入门

1.首先在app目录下的build.gradle中加入

compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'

然后点击同步工程(sync Project with gradle files)不报错的话就可以来使用了.

Demo1 观察者是Observer

       //1.创建observable被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello RXAndroid");
}
});

//2.创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.v(TAG,"onCompleted");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.v(TAG,"onNext"+s);
}
};

//3.订阅
observable.subscribe(observer);

Demo2 观察者是Single

    //Single对象只能调用onSuccess()/onError()  并且只能调用一次
Single<String> observable = Single.create(new Single.OnSubscribe<String>() {
@Override
public void call(SingleSubscriber<? super String> singleSubscriber) {
//系统内部就会调用观察者对象的onNext+onComplete()
//singleSubscriber.onSuccess("好好学习!");
//系统内部就会调用观察者对象的onError()
singleSubscriber.onError(new NullPointerException());
}
});

//2.家庭-----》观察者Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}

//接受处理事件的方法
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};


//3.关联----》订阅subscribe
observable.subscribe(observer);
}

Demo3 观察者是Subscriber即Observer的实现类

        final Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//相当于发报纸
subscriber.onNext("Hello Android !");
//subscriber.onCompleted();
subscriber.onError(new NullPointerException("mock exception !"));
}
});
//2.观察者Subscriber是Observer的子类 他还提供了取消订阅和判断是否订阅的方法
Subscriber<String> observer = new Subscriber<String>() {

@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}

//接受处理事件的方法
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};

//3.关联----》订阅subscribe
observable.subscribe(observer);

}

Demo4 只关心事件Action

//如果只关心onNext事件 而不关心onComplete()/onError
//如果只关心onNext事件 那么被观察者发送了异常而没人处理 就会抛给系统
Action1<String> onNextAction=new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};

//关心Error事件
<Throwable> onErrorAction=new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "call: "+throwable.getLocalizedMessage());
}
};

//关心onComplete事件
Action0 onCompleteAction=new Action0() {
@Override
public void call() {
Log.i(TAG, "onComplete: ");
}
};

Demo5 观察者是Subject的子类AsyncSubject

        //Subject<T, R> extends Observable<R> implements Observer<T>
//AsyncSubject他在创建之后就可以发送数据(不用订阅之后再发送数据)它只接收最后一个onNext()事件(在onComplete调用之前)
//只要没有onComplete被发送 那么观察者就接收不到任何信息
AsyncSubject<String> observable=AsyncSubject.create();
observable.onNext("Hello Android !");
observable.onNext("Hello Java !");
observable.onNext("Hello CPP !");
//observable.onCompleted();
observable.onError(new NullPointerException());

//2.创建一个观察者
Action1<String> onNextAction=new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};

//3.实现订阅
observable.subscribe(onNextAction);

Demo6 观察者是Subject的子类BehaviorSubject

        //1.创建一个被观察者BehaviorSubject是以订阅方法作为分界线
//只发送订阅前最后一个onNext事件和订阅后的所有onNext事件
//如果订阅前没有发送数据 那么就会接收构造器里面默认的事件和订阅后的事件。
BehaviorSubject<String> observable= BehaviorSubject.create("DEFAULT");
// observable.onNext("A !");
// observable.onNext("B !");
// observable.onNext("C !");

//2.创建一个观察者
Action1<String> onNextAction=new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};

//3.实现订阅
observable.subscribe(onNextAction);

observable.onNext("D !");
observable.onNext("E !");
observable.onNext("F !");

Demo7 观察者是Subject的子类PublishSubject

    //1.创建一个被观察者PublishSubject:
// 它是在创建之后就可以发送事件
// 作为观察者 只能接收到订阅后的所有事件
PublishSubject<String> observable= PublishSubject.create();
observable.onNext("A !");
observable.onNext("B !");
observable.onNext("C !");

//2.创建一个观察者
Action1<String> onNextAction=new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};

//3.实现订阅
observable.subscribe(onNextAction);

observable.onNext("D !");
observable.onNext("E !");
observable.onNext("F !");

Demo8 观察者是Subject的子类ReplaySubject

//1.创建一个被观察者ReplaySubject:
//ReplaySubject刚创建完毕的时候就开始发送数据了
//不管观察者是什么时候订阅 它都会接收ReplaySubject对象发出的任何事件。
ReplaySubject<String> observable= ReplaySubject.create();
observable.onNext("A !");
observable.onNext("B !");
observable.onNext("C !");

//2.创建一个观察者
Action1<String> onNextAction=new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};

//3.实现订阅
observable.subscribe(onNextAction);

observable.onNext("D !");
observable.onNext("E !");
observable.onNext("F !");

Demo8 观察者是可连接的被观察者ConnectableObservable

 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//相当于发报纸
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Android !");
}
});

//publish--->将普通的被观察者 变成可连接的观察者
ConnectableObservable<String> connectableObservable = observable.publish();

//refCount--->将可连接的观察者转换成普通的观察者
//Observable<String> stringObservable = connectableObservable.refCount();

Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}

//接受处理事件的方法
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};

connectableObservable.subscribe(observer);

//connect-->让可连接的被观察者调用内部的call方法(相当于发送了事件)
connectableObservable.connect();

Demo9 观察者是可连接的被观察者ConnectableObservable

        //创建一个普通的被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//相当于发报纸
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Android !");
}
});

//2.将普通的被观察者变成可连接的被观察者
//publish()创建的被观察者只有在connect()之前订阅的观察者才能接收事件 如果在connect()之后订阅的观察者 是无法获取被观察者发送的事件
//有没办法可以让 只要是观察者订阅了可连接的被观察者 都能打印出被观察者发送出来的数据 而不管订阅在connect()的前后顺序。-->replay()
ConnectableObservable<String> connectableObservable = observable.replay();

//3.实现订阅
connectableObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call==1===: "+s);
}
});

//4.让被观察者主动发送事件
connectableObservable.connect();

//5.再次订阅一个新的观察者
connectableObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call==2===: "+s);
}
});

有的被观察者 在创建之后就马上发送了数据—–》“热”Observable—-》Subject的子类

有的被观察者 在订阅的时候才发送的数据——->”冷”Observable—》普通的Observable

还有一种特殊的被观察者 他可以在我们指定的时间点发送数据—–>”冷”Observable—->可连接的Observable