RxJs——map,filter第二种实现

时间:2022-03-02 11:21:46

上一节我们实现了map和filter函数,我们将这些函数都挂载在MyObservable对象上,这里存在一个问题,类似map和filter这样的操作型函数很多,所以不可能将他们都挂载在MyObservable对象上,因此,这里出现了第二种实现。

 

这些操作函数能串联起来的本质就是能够形成嵌套调用,因此我想到了使用pipe,pipe的本质是接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable 实例。

代码实例

map实现

export function map(fn) {
    return (observable)=>{
        return new MyObservable(observer => {
            observable.subscribe({
                next: val=> observer.next(fn(val)),
                error: err => observer.error(err),
                complete: () => observer.complete()
            });
        });
    }
}

RxJs——map,filter第二种实现

filter实现

export function filter(fn) {
    return (observable)=>{
        return new MyObservable(observer => {
            observable.subscribe({
                next: val=> fn(val)? observer.next(val): ()=>{},
                error: err => observer.error(err),
                complete: () => observer.complete()
            });
        });
    }
}

从这里我们可以看出 RxJS 操作符的运行结果就是map或者filter执行后的返回函数,返回值就是内部的一个MyObservable对象实例。

RxJs——map,filter第二种实现

pipe单参数实现

pipe(operation) {
  return operation(this);
}

注意这里的this实际上就是对应函数中的(observable)这个参数。然后调用operation(this)后返回的是一个新的Observable,同时这个参数observable会执行subscribe方法,这个方法会将这些Observable串起来调用。

 

pipe多参数实现

pipe(...operations) {
    return operations.reduce((prev, fn) => fn(prev), this);
}

以上这个函数实现的具体功能就是形成一个函数嵌套调用,并且方向是从左向右的。

这个函数的实现最经典的算是在redux中的一段源码啦,有兴趣的可以看看这个框架,本身代码不多,但是阅读起来不容易理解,感兴趣的可以去看看。

下面我用测试代码验证下这个函数:

const letObservable = of(1,2,3);
const a = interval(500).pipe(map((v) => 'a' + v), take(3));
const b = interval(500).pipe(map((v) => 'b' + v), take(3));
letObservable.pipe(merge(a, b)).subscribe((value) => console.log(value));

日志信息如下:

RxJs——map,filter第二种实现

RxJs——map,filter第二种实现

这里我实现了另外的take和merge方法,调用情况可以知道和RxJs的效果一致。这里我也贴出他们的实现。

take实现

export function take(num) {
    return (observable) => (
      new MyObservable(observer => {
        let times = 0;
        let subscription = observable.subscribe({
          next: val => {
            times++;
            if (num >= times) {
              observer.next(val)
            } else {
              observer.complete()
              //if (subscription)subscription.unsubscribe()         
            }
          },
          error: err => observer.error(err),
          complete: () => observer.complete(),
        });
      })
    )
}

RxJs——map,filter第二种实现

tap实现

export function tap(fn) {
    return (observable) => {
        return new MyObservable(observer => {
            observable.subscribe({
                next: val => {
                    fn(val);
                    observer.next(val);
                },
                error: err => observer.error(err),
                complete: () => observer.complete(),
            });
        });
    };
}

RxJs——map,filter第二种实现

merge实现

export function merge(...observables) {
    return (observable) => {
        let completeNum = 0;
        if (observable) {
            observables = [observable,...observables];
        }
        return new MyObservable(observer => {
            observables.forEach(observable => {

                observable.subscribe({
                    next: val => observer.next(val),
                    error: err => {
                        observables.forEach(observable.unsubscribe);
                        observer.error(err)
                    } ,
                    complete: () => {
                        completeNum++;
                        if (completeNum === observables.length) {
                            observer.complete();
                        }
                        
                    },
                });
            });
        });
    };
}

RxJs——map,filter第二种实现

好了,欢迎各位参与讨论。