Rxjs expand的用法分析

时间:2023-05-12 17:43:01

Rxjs的expand()函数声明:

public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable

expand()会递归调用project函数,project函数把源值映射为一个Observable,每次递归调用都是把前一次调用输出的Observable的源作为输入。最后把所有的Observable合并为一个Observable作为结果输出。

它接收三个参数:

  • project:映射函数,以前一次调用输出额Observable作为输入,返回一个新的Observable。
  • concurrent:最大并发次数,默认值为Number.POSITIVE_INFINITY,即无限次递归调用
  • scheduler:默认为null,表示立即执行

单个源值

示例:

const source = Rx.Observable.of(1);
const powerTow = source
 // 递归调用提供的函数
 .expand(val => {
  console.log(`输入: ${val}`);
  return Rx.Observable.of(2*val);
 })
 .take(5);
const subscribe = powerTow.subscribe(val => console.log(`输出: ${val}`));

示例输出源值1,每次对源值乘以2。take(5)限定了只输出前5个Observable,包括初始的源值1。

结果为:

"输出: 1"
"输入: 1"
"输出: 2"
"输入: 2"
"输出: 4"
"输入: 4"
"输出: 8"
"输入: 8"
"输出: 16"
"输入: 16"

多源值

单个源值是比较容易理解,递归调用project函数是顺序的。当源值产生是异步且是多个的,每一个源值都会单独递归调用project映射函数。最后把所有产生的Observable合并为一个Observable作为结果。

示例:

var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
 .map(e => 1)
 .expand(x => {
    console.log(`输入:${x}`);
    return Rx.Observable.of(2 * x).delay(1000)})
 .take(5);
powersOfTwo.subscribe(x => console.log(`输出:${x}`));

示例监听页面的点击事件,每次点击发送源值1,并对源值执行expand,expand的映射函数也是将源值乘以2,并延时1000毫秒。

点击一次,输出结果是和上面的例子一样。连续点击两次,输出结果:

输出:1
输入:1
输出:1
输入:1
输出:2
输入:2
输出:2
输入:2
输出:4
输入:4

由于例子里做了延时,多次点击的时间不同,输出的结果也是不同的。

递归调用的终止

默认情况下,expand递归调用时不限次数的,那如何终止呢。可以有以下一些可以终止递归调用:

1、使用take()操作符

2、映射函数project,返回一个empty Observable

var source = new Rx.Observable.of(3);

source.expand(function(x) {
  console.log('count: ' + x);
  x--;

return (x >= 0) ? Rx.Observable.just(x) : Rx.Observable.empty()
})
.subscribe(
    function (x) {
        console.log('Next: ' , x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });