我们在使用RxJs中,知道RxJs的操作分为两类,一类是创建型,比如of(),fromEvent(),from()等,还有一类是操作型,比如map(),filter()。如果对Observable不甚理解,需要快速理解的可以查看这一篇
今天我们来学习第一类,尝试着自己实现一次。
of()
这个操作接收一些参数,返回一个Observable,一旦订阅之后,将这些值将发出执行。
代码实现
export function of(...args) {
return new MyObservable(observer => {
args.forEach(arg => {
observer.next(arg);
});
observer.complete();
return {
unsubscribe: ()=>{}
};
});
}
//代码使用
const observer = {
next: value => console.log('value:',value),
error: err => console.error('error:',err),
complete: () => console.log('done')
}
const newObservable = of(1,2,3,4,5);
newObservable.subscribe(observer );
执行日志
它的本质上就是将of的参数值交给observer迭代执行。简单吧。
同样的道理,我可以实现一下一些创建型的Observable。
fromEvent()
这个属性绑定一些事件,事件源就是一个Observable。
代码实现
export function fromEvent(element,event) {
return new MyObservable(observer => {
const handler = e =>observer.next(e);
element.addEventListener(event, handler);
return {
unsubscribe: ()=>element.removeEventListener(event, handler)
};
});
}
代码执行
const element = document.body;
const eleObservable = fromEvent(element,'click');
const observer = {
next: value => console.log('value:',value),
error: err => console.error('error:',err),
complete: () => console.log('done')
}
eleObservable.subscribe(observer);
执行日志
interval()
这个用于定时发送一些值。
代码实现
export function interval(delay) {
return new MyObservable(observer => {
let index = 0;
const time = setInterval(()=> {
observer.next(index++);
}, delay);
return {
unsubscribe: () => clearInterval(time)
};
});
}
修改一行代码
const newObservable = interval(3000);
执行日志
timer()
延迟一定时间后发送一个值
代码实现
export function timer(delay) {
return new MyObservable(observer => {
const time = setTimeout(()=> {
observer.next(0);
}, delay);
return {
unsubscribe: () => clearTimeout(time)
};
});
}
修改一行代码
const newObservable = timer(3000);
执行日志
from()
和of()有些类似,可以接收数组和Promise
代码实现
export function from(param) {
if (Array.isArray(param)) {
return new MyObservable(observer => {
param.forEach( v=> observer.next(v));
observer.complete();
return {
unsubscribe: () => { }
}
});
}
return new MyObservable(observer => {
let canceld = false;
Promise.resolve(param)
.then(val => {
if (!canceld) {
observer.next(val);
observer.complete();
}
}).catch(err => {
observer.error(err);
}
);
return {
unsubscribe: () => { canceld = true }
}
});
}
修改一行代码
const newObservable = from([6,7,8,9,10]);
数组执行日志
promise代码执行
const newPromise = new Promise((resolve, reject)=>{
console.log('promise resolve');
resolve('resolve done')
});
const newObservable = from(newPromise);
newObservable.subscribe(observer );
Promise执行日志
今天学习到这里,记录一下,方便以后查找,明天学习map和filter的实现。