上一节我们实现了map和filter函数,我们将这些函数都挂载在MyObservable对象上,这里存在一个问题,类似map和filter这样的操作型函数很多,所以不可能将他们都挂载在MyObservable对象上,因此,这里出现了第二种实现。
这些操作函数能串联起来的本质就是能够形成嵌套调用,因此我想到了使用pipe,pipe的本质是接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable
实例。
代码实例
map实现
export function map(fn) {
return (observable)=>{
return new MyObservable(observer => {
observable.subscribe({
next: val=> observer.next(fn(val)),
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
}
filter实现
export function filter(fn) {
return (observable)=>{
return new MyObservable(observer => {
observable.subscribe({
next: val=> fn(val)? observer.next(val): ()=>{},
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
}
从这里我们可以看出 RxJS 操作符的运行结果就是map或者filter执行后的返回函数,返回值就是内部的一个MyObservable对象实例。
pipe单参数实现
pipe(operation) {
return operation(this);
}
注意这里的this实际上就是对应函数中的(observable)这个参数。然后调用operation(this)后返回的是一个新的Observable,同时这个参数observable会执行subscribe方法,这个方法会将这些Observable串起来调用。
pipe多参数实现
pipe(...operations) {
return operations.reduce((prev, fn) => fn(prev), this);
}
以上这个函数实现的具体功能就是形成一个函数嵌套调用,并且方向是从左向右的。
这个函数的实现最经典的算是在redux中的一段源码啦,有兴趣的可以看看这个框架,本身代码不多,但是阅读起来不容易理解,感兴趣的可以去看看。
下面我用测试代码验证下这个函数:
const letObservable = of(1,2,3);
const a = interval(500).pipe(map((v) => 'a' + v), take(3));
const b = interval(500).pipe(map((v) => 'b' + v), take(3));
letObservable.pipe(merge(a, b)).subscribe((value) => console.log(value));
日志信息如下:
这里我实现了另外的take和merge方法,调用情况可以知道和RxJs的效果一致。这里我也贴出他们的实现。
take实现
export function take(num) {
return (observable) => (
new MyObservable(observer => {
let times = 0;
let subscription = observable.subscribe({
next: val => {
times++;
if (num >= times) {
observer.next(val)
} else {
observer.complete()
//if (subscription)subscription.unsubscribe()
}
},
error: err => observer.error(err),
complete: () => observer.complete(),
});
})
)
}
tap实现
export function tap(fn) {
return (observable) => {
return new MyObservable(observer => {
observable.subscribe({
next: val => {
fn(val);
observer.next(val);
},
error: err => observer.error(err),
complete: () => observer.complete(),
});
});
};
}
merge实现
export function merge(...observables) {
return (observable) => {
let completeNum = 0;
if (observable) {
observables = [observable,...observables];
}
return new MyObservable(observer => {
observables.forEach(observable => {
observable.subscribe({
next: val => observer.next(val),
error: err => {
observables.forEach(observable.unsubscribe);
observer.error(err)
} ,
complete: () => {
completeNum++;
if (completeNum === observables.length) {
observer.complete();
}
},
});
});
});
};
}
好了,欢迎各位参与讨论。