Rxjava2实战--第三章 创建操作符

时间:2023-07-10 11:21:01

Rxjava2实战--第三章 创建操作符

Rxjava的创建操作符

操作符 用途
just() 将一个或多个对象转换成发射这个或者这些对象的一个Observable
from() 将一个Iterable、一个Future或者一个数组转换成一个Observable
create() 使用一个函数从头创建一个Obervable
defer() 只有当订阅者订阅才创建Observable,为每个订阅者创建一个新的Observab
range() 创建一个发射指定范围的整数序列的Observable
interval() 创建一个按照给定的时间间隔发射整数序列的Observavble
timer() 创建一个在给定的延时之后发射单个数据的Observable
empty() 创建一个什么都不做直接通知完成的Observable
error() 创建一个什么都不做直接通知错误的Observable
nerver() 创建一个不发射任何数据的Observable

1.create、just和from

1.1 create

使用一个函数从头开始创建一个Obervable

        Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i <10 ; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
throwable.printStackTrace();
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});

执行结果:

0
1
2
3
4
5
6
7
8
9
onComplete

1.2 just

创建一个发射指定值的Observable。

发射单个数据:

   Observable.just("Hello World")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});

执行结果:

Hello World
onComplete

just类似于from,但是from会将数组或Iterable的数据取出然后逐个发射,而just只是简单地原样发射,将数组或Iterable当单个参数。

它可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的Observable。

    Observable.just(1,2,3,4,5,6,7,8,9,10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});

执行结果:

1
2
3
4
5
6
7
8
9
10
onComplete

在Rxjava2.0中,如果在just()中传入null,则会抛出一个空指针异常。

 java.lang.NullPointerException: item is null

1.3 from

from可以将其他种类的对象和数据类型转换为Observable

在Rxjava中,from操作符可以将Future、Iterable和数组转换成Observable。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。

1.数组:fromArray(T... items)

    Observable.fromArray("hello","world")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
执行结果:
hello
world

2.Iterable:fromIterable(Iterable<? extends T> source)

    Observable.fromIterable(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
}); 执行结果:
1
2
3
4
5
6
7
8
9
10

3.fromFuture(Future<? extends T> future)

对于Future,他会发射Future.get()方法返回的数据

        ExecutorService executorService= Executors.newCachedThreadPool();
Future<String> future=executorService.submit(new MyCallable()); Observable.fromFuture(future)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
}); static class MyCallable implements Callable<String>{ @Override
public String call() throws Exception {
System.out.println("模拟一些耗时的任务...");
Thread.sleep(5000);
return "OK";
}
}
} 执行结果:
模拟一些耗时的任务...
OK

4. fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)

指定超时时长和时间单位。如果过了指定的时长,Future还没有返回一个值,那么这个Observable就会发射错误通知并终止。

        ExecutorService executorService=Executors.newCachedThreadPool();
Future<String> future=executorService.submit(new MyCallable()); Observable.fromFuture(future,4,TimeUnit.SECONDS)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
System.out.println(throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
}); 执行结果:
模拟一些耗时的任务...
java.util.concurrent.TimeoutException...
null

2.repeat

repeat会重复发射数据。 repeat不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者是通过repeat(n)指定的重复次数。

1.repeat(long times)

        Observable.just("hello repeat")
.repeat(3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
执行结果:
hello repeat
hello repeat
hello repeat

2.repeatWhen

repeatWhen不是缓存和重放原始Observable的数据序列,而是有条件地重复订阅和发射原来的Observable.

将原始Observable的终止通知(完成或错误)当作一个void数据传递给一个通知处理器,以此来决定是否要重新订阅和发射原来的observable.这个通知处理器就像一个Observable操作符,接受一个发射void通知的Observable(意思是,重新订阅和发射原始Observable)作为输入,返回一个发射void数据或者直接终止(既使用repeatwhen终止发射数据)的Observable。

当apply方法返回的是Observable.empty(),Observable.error()。则不发送事件。

        Observable.range(0,9)
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.timer(10,TimeUnit.SECONDS);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception { }
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
}); try {
Thread.sleep(12000);
} catch (InterruptedException e) {
e.printStackTrace();
} 执行结果: 0
1
2
3
4
5
6
7
8
0
1
2
3
4
5
6
7
8
onComplete

3. repeatUntil

repeatUntil表示直到某个条件就不再重复发射数据。当BooleanSupplier的getAsBoolean()返回false时,表示重复发射上游的Observable;当getAsBoolean()为true时,表示中止重复发射上游的Observab。

        long startTimeMillis=System.currentTimeMillis();
Observable.interval(500, TimeUnit.MILLISECONDS)
.take(5)
.repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
Log.e(TAG, "getAsBoolean: time:"+(System.currentTimeMillis()-startTimeMillis ));
return (System.currentTimeMillis()-startTimeMillis)>5000;
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
}); 执行结果:
2019-11-13 13:42:36.672 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 0
2019-11-13 13:42:37.172 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 1
2019-11-13 13:42:37.673 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 2
2019-11-13 13:42:38.172 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 3
2019-11-13 13:42:38.673 15936-15976/com.loan.rxjavademo E/MainActivity: accept: 4
2019-11-13 13:42:38.673 15936-15976/com.loan.rxjavademo E/MainActivity: getAsBoolean: time:2513
2019-11-13 13:42:39.175 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 0
2019-11-13 13:42:39.676 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 1
2019-11-13 13:42:40.175 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 2
2019-11-13 13:42:40.675 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 3
2019-11-13 13:42:41.175 15936-16023/com.loan.rxjavademo E/MainActivity: accept: 4
2019-11-13 13:42:41.175 15936-16023/com.loan.rxjavademo E/MainActivity: getAsBoolean: time:5015

3. defer、interval和timer

3.1 defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个全新的Observable。

defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,但事实上每个订阅者获取的是他们自己单独的数据序列。

           static int i=10;
public static void main(String[] args) {
Observable observable=Observable.defer(new Supplier<ObservableSource<?>>() {
@Override
public ObservableSource<?> get() throws Throwable {
return Observable.just(i);
}
});
i=20;
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println(integer);
}
});
} 执行结果:
20

3.2 interval

创建一个按固定时间间隔发射整数序列的Observable。

interval操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。

interval操作符默认在computation调度器上执行。

       Observable.interval(1,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
}); try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} 执行结果:
0
1
2
3
4
5
6
7
8

3.3 timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值。

timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。

timer返回一个Observable,它在延迟一段给定的时间后发射一个简单地数字0.

time操作符默认在computation调度器上执行。

      Observable.timer(2,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
System.out.println("hello world");
}
}); try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} 执行结果:
0
hello world