RxJs——创建型操作

时间:2022-06-01 12:59:27

我们在使用RxJs中,知道RxJs的操作分为两类,一类是创建型,比如of(),fromEvent(),from()等,还有一类是操作型,比如map(),filter()。如果对Observable不甚理解,需要快速理解的可以查看这一篇

今天我们来学习第一类,尝试着自己实现一次。

of()

这个操作接收一些参数,返回一个Observable,一旦订阅之后,将这些值将发出执行。

代码实现

export function of(...args) {
    return new MyObservable(observer => {
        args.forEach(arg => {
            observer.next(arg);
        });
        observer.complete();
        return {
            unsubscribe: ()=>{}
        };
    });
}

//代码使用
const observer = {
	next: value => console.log('value:',value),
	error: err => console.error('error:',err),
	complete: () => console.log('done')
}

const newObservable = of(1,2,3,4,5);
newObservable.subscribe(observer );

RxJs——创建型操作

执行日志

RxJs——创建型操作

它的本质上就是将of的参数值交给observer迭代执行。简单吧。

同样的道理,我可以实现一下一些创建型的Observable。

fromEvent()

这个属性绑定一些事件,事件源就是一个Observable。

代码实现

export function fromEvent(element,event) {
    return new MyObservable(observer => {
        const handler = e =>observer.next(e);
        element.addEventListener(event, handler);
        return {
            unsubscribe: ()=>element.removeEventListener(event, handler)
        };
    });
}

代码执行

const element = document.body;
const eleObservable = fromEvent(element,'click');
const observer = {
    next: value => console.log('value:',value),
    error: err => console.error('error:',err),
    complete: () => console.log('done')
}
eleObservable.subscribe(observer);

执行日志

RxJs——创建型操作

interval()

这个用于定时发送一些值。

代码实现

export function interval(delay) {
    return new MyObservable(observer => {
        let index = 0;
        const time = setInterval(()=> {
            observer.next(index++);
        }, delay);
        return {
            unsubscribe: () => clearInterval(time)
        };
    });
}

修改一行代码

const newObservable = interval(3000);

执行日志

RxJs——创建型操作

timer()

延迟一定时间后发送一个值

代码实现

export function timer(delay) {
    return new MyObservable(observer => {
        const time = setTimeout(()=> {
            observer.next(0);
        }, delay);
        return {
            unsubscribe: () => clearTimeout(time)
        };
    });
}

修改一行代码

const newObservable = timer(3000);

执行日志

RxJs——创建型操作

from()

和of()有些类似,可以接收数组和Promise

代码实现

export function from(param) {
    if (Array.isArray(param)) {
        return new MyObservable(observer => {
            param.forEach( v=> observer.next(v)); 
            observer.complete();
            return {
                unsubscribe: () => { }
            }
        });
    }

    return new MyObservable(observer => {
        let canceld = false;
        Promise.resolve(param)
            .then(val => {
                if (!canceld) {
                    observer.next(val);
                    observer.complete();
                }
            }).catch(err => {
                observer.error(err); 
            }
        );
        return {
            unsubscribe: () => { canceld = true }
        }

    });
}

修改一行代码

const newObservable = from([6,7,8,9,10]);

数组执行日志

RxJs——创建型操作

promise代码执行

const newPromise = new Promise((resolve, reject)=>{
    console.log('promise resolve');
    resolve('resolve done')
});
const newObservable = from(newPromise);
newObservable.subscribe(observer );

Promise执行日志

RxJs——创建型操作

RxJs——创建型操作

今天学习到这里,记录一下,方便以后查找,明天学习map和filter的实现。