RxJava之过滤操作符

时间:2022-10-15 17:50:28

RxJava之过滤操作符


      涉及到列表的数据时,总是会想到一个过滤这个词语。比如,在1-100的整数中,筛选出偶数或者奇数相加,或者将前49个数相加,又或者后36个数相加,等等。在这样的场景中,不由想到将需要的数据筛选出来。在发射的Observable中,可不可以做筛选呢?

过滤序列 - filter

    filter操作符是对源Observable产生的结果按照指定条件进行过滤,只有满足条件的结果才会提交给订阅者。

    其流程图如下:

RxJava之过滤操作符

    
    Observable.from(mLists)
.filter(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getName().startsWith("A");
}
})
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
RxJava之过滤操作符

       查看上述代码,传一个新的Func1对象给filter()函数,即只有一个参数的函数。Func1有一个AppInfo对象来作为它的参数类型并且返回Boolean对象。只要条件符合filter()函数就会返回true。此时,值会发射出去并且所有的观察者都会接收到。
    
    因为该方法只有一个回调方法,我们可以使用java8的的lumbda表达式,形式如下:
    Observable.from(mLists)
.filter(student -> student.getName().startsWith("A"))
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});


    
    filter()函数最常用的用法之一时过滤null对象,它帮我们免去了在onNext()函数调用中再去检测null值,让我们把注意力集中在应用业务逻辑上.。
    .filter(new Func1<AppInfo,Boolean>(){
@Override
public Boolean call(AppInfo appInfo){
return appInfo != null;
}
})

截取前N个 - take

    take操作符是用整数N来作为一个参数,把源Observable产生的结果,提取前面的N个提交给订阅者,而忽略后面的结果。

    其流程图如下

    RxJava之过滤操作符
    
    示例代码:
    Observable.from(mLists)
.take(5)
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
    RxJava之过滤操作符
       从结果图例中可以看到,对这个可观测序列应用take(3)函数,然后我们创建一个只发射可观测源的第一个到第五个数据的列,将序列发射出去并且观察者都会接收到。

截取后N个 -  takeLast

    takeLast操作符是把源Observable产生的结果的后n项提交给订阅者,提交时机是Observable发布onCompleted通知之时。

    其流程图如下:

    RxJava之过滤操作符
    示例代码:
    Observable.from(mLists)
.takeLast(5)
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
RxJava之过滤操作符
       从结果图例中可以看到,对这个可观测序列应用take(3)函数,然后我们创建一个只发射可观测源的最后5个数据的新序列, 将序列发射出去并且观察者都会接收到。

截取某条件后第一个 - takeFirst

       takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext发送Observable。
    takeFirst操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:
    Observable.from(mLists)
.takeFirst(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getName().startsWith("B");
}
})
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
Log.i("123", "doTakeFirst - onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i("123", "doTakeFirst - onError");
}

@Override
public void onNext(Student student) {
Log.i("123", "doTakeFirst - onNext");
mAdaStu.addData(student);
}
});

RxJava之过滤操作符 

过滤重复源 - distinct

    distinct操作符对源Observable产生的结果进行过滤,把重复的结果过滤掉,只输出不重复的结果给订阅者。
    distinct操作符的流程图如下:
    RxJava之过滤操作符
    用之前学过的take()和repeat()创建一个重复发射的列表。
    Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
.take(3)
.repeat(3);
    从上述代码可知,fullOfDuplicates变量里把List的前三个重复了3次,有9个并且许多重复的,然后使用distinct进行过滤。
    fullOfDuplicates.distinct()
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});

RxJava之过滤操作符
    
    观看上面的效果图,很明显,观察者只是收到了三个不重复的Observable,重复的Observable已被distinct函数过滤

 与前一个Observable某一条件重复过滤 - distinctUntilChanged

       distinctUntilChanged操作符对源Observable产生的结果进行条件过滤,把与前一个Observable某条件重复的结果过滤掉,只输出不重复的结果给订阅者。

      distinctUntilChanged操作符的流程图如下:

RxJava之过滤操作符

    示例代码:
    Observable.from(mLists)
.distinctUntilChanged(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getAge();
}
})
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});

RxJava之过滤操作符
      依据上述代码及效果图,明显的看出来,观察者收到的都是与前一个Observable的age字段不重复的Observable,与前一个Observable的age字段
      重复的Observable已被distinctUntilChanged过滤掉。

只发射一个元素 - first

       first操作符是从Observable列表中只发射第一个,将其余的都过滤掉,同时观察者也只收到第一个Observable。如果不做条件限制,默认列表中的第一个,否则第一个Observable为条件限制的第一个。
       first操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:
 
  Observable.from(mLists)
.first()
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});

       当然,first与其他操作符一样,可以传func()参数,对Observable列表中的哪一个是第一个进行限制。例如,学生列表中,将名字第一个字母为"B"作为
       第一个Observable,我们可以这样写:
    .first(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getName().startsWith("B");
}
})
//Lamada写法
.first(student -> student.getName().startsWith("B"))

    效果图

RxJava之过滤操作符RxJava之过滤操作符

只发射一个元素 - last

      last操作符,与first操作符恰恰相反,是从Observable列表中只发射最后一个,将其余的都过滤掉,同时观察者也只收到最后一个Observable。
       last操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:
    Observable.from(mLists)
.last()
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
RxJava之过滤操作符

不发射前N个 - skip

      skip操作符与take相对应,是以N作为参数,表示从Observable列表中忽略前N个,将其余的全部发射,同时观察者也只收到发射的Observable。
      skip操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:    
    Observable.from(mLists)
.skip(2)
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
RxJava之过滤操作符

不发射后N个 - skipLast

       skipLast操作符与takeLast相对应,是以N作为参数,表示从Observable列表中忽略后N个,将其余的全部发射,同时观察者也只收到发射的Observable。
       skipLast操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:
    Observable.from(mLists)
.skipLast(6)
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
RxJava之过滤操作符
 

发射某一位置 - elementAt

       elementAt操作符以N作为参数,从Observable列表中发射位置在N处的元素,将其余的忽略,同时观察者也只收到发射的Observable。
       elementAt操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:    
    Observable.from(mLists)
.elementAt(2)
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});

RxJava之过滤操作符
       注:亲测N不在序列表范围内,如果N为负数,会报异常IndexOutOfBoundsException;如果N大于列表长度,不会报异常,但观察者不会收到Observable。

间隔一段时间发射 - sample

      sample操作符是Observable将会在间隔一段时间内发送最后一个。sample()支持全部的时间单位:秒,毫秒,天,分等等。sample主要用于动态变化的Observable列表,
      比如温度传感器,它每秒都会发射当前室内的温度。在发射时,可以设置时间间隔。在Observable后面加一个sample(),我们将创建一个新的可观测序列,它将在一个
      指定的时间间隔里由Observable发射最近一次的数值,如果我们想让它定时发射第一个元    素而不是最近的一个元素,我们可以使用throttleFirst()。
    sample操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:    
    Observable<Integer> sensor = [...]

sensor.sample(30,TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer currentTemperature) {
updateDisplay(currentTemperature)
}
});
   

超时 - timeout

       timeout操作符主要检测间隔一段事件发射一个Observable,使用timeout()函数来监听源可观测序列,就是在我们设定的时间间隔内如果没有得到一个值则发射一个错误,即onError()方法。
    timeout操作符的流程图如下:
    RxJava之过滤操作符
    示例代码:
    Subscription subscription = getCurrentTemperature()
.timeout(2,TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
Log.d("RXJAVA","You should go check the sensor, dude");
}

@Override
public void onNext(Integer currentTemperature) {
updateDisplay(currentTemperature)
}
});

发射频率过快 - debounce

       debounce操作符过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
      debounce操作符的流程图如下:

   RxJava之过滤操作符