一、基本概念
reactive x中有几个核心的概念,先来简单介绍一下。
1.1、observable和observer(可观察对象和观察者)
首先是observable和observer,它们分别是可观察对象和观察者。observable可以理解为一个异步的数据源,会发送一系列的值。observer则类似于消费者,需要先订阅observable,然后才可以接收到其发射的值。可以说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。
1.2、operator(操作符)
另外一个非常重要的概念就是操作符了。操作符作用于observable的数据流上,可以对其施加各种各样的操作。更重要的是,操作符还可以链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。
1.3、single(单例)
在rxjava和其变体中,还有一个比较特殊的概念叫做single,它是一种只会发射同一个值的observable,说白了就是单例。当然如果你对java等语言比较熟悉,那么单例想必也很熟悉。
1.4、subject(主体)
主体这个概念非常特殊,它既是observable又是observer。正是因为这个特点,所以subject可以订阅其他observable,也可以将发射对象给其他observer。在某些场景中,subject会有很大的作用。
1.5、scheduler(调度器)
默认情况下reactive x只运行在当前线程下,但是如果有需要的话,也可以用调度器来让reactive x运行在多线程环境下。有很多调度器和对应的操作符,可以处理多线程场景下的各种要求。
1.6、observer和observable
先来看看一个最简单的例子,运行的结果会依次打印这些数字。这里的of是一个操作符,可以根据给定的参数创建一个新的observable。创建之后,就可以订阅observable,三个回调方法在对应的时机执行。一旦observer订阅了observable,就会接收到后续observable发射的各项值。
1
2
3
4
5
6
7
8
9
|
from rx import of
ob = of( 1 , 2 , 34 , 5 , 6 , 7 , 7 )
ob.subscribe(
on_next = lambda i: print (f 'received: {i}' ),
on_error = lambda e: print (f 'error: {e}' ),
on_completed = lambda : print ( 'completed' )
)
|
这个例子看起来好像很简单,并且看起来没什么用。但是当你了解了rx的一些核心概念,就会理解到这是一个多么强大的工具。更重要的是,observable生成数据和订阅的过程是异步的,如果你熟悉的话,就可以利用这个特性做很多事情。
1.7、操作符
在rxpy中另一个非常重要的概念就是操作符了,甚至可以说操作符就是最重要的一个概念了。几乎所有的功能都可以通过组合各个操作符来实现。熟练掌握操作符就是学好rxpy的关键了。操作符之间也可以用pipe函数连接起来,构成复杂的操作链。
1
2
3
4
5
6
7
8
|
from rx import of, operators as op
import rx
ob = of( 1 , 2 , 34 , 5 , 6 , 7 , 7 )
ob.pipe(
op. map ( lambda i: i * * 2 ),
op. filter ( lambda i: i > = 10 )
).subscribe( lambda i: print (f 'received: {i}' ))
|
在rxpy中有大量操作符,可以完成各种各样的功能。我们来简单看看其中一些常用的操作符。如果你熟悉java8的流类库或者其他函数式编程类库的话,应该对这些操作符感到非常亲切。
1.8、创建型操作符
首先是创建observable的操作符,列举了一些比较常用的创建型操作符。
1.9、过滤型操作符
过滤型操作符的主要作用是对observable进行筛选和过滤。
1.10、转换型操作符
1.11、算术操作符
1.12、subject
subject是一种特殊的对象,它既是observer又是observable。不过这个对象一般不太常用,但是假如某些用途还是很有用的。所以还是要介绍一下。下面的代码,因为订阅的时候第一个值已经发射出去了,所以只会打印订阅之后才发射的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from rx.subject import subject, asyncsubject, behaviorsubject, replaysubject
# subject同时是observer和observable
print ( '--------subject---------' )
subject = subject()
subject.on_next( 1 )
subject.subscribe( lambda i: print (i))
subject.on_next( 2 )
subject.on_next( 3 )
subject.on_next( 4 )
subject.on_completed()
# 2 3 4
|
另外还有几个特殊的subject,下面来介绍一下。
1.13、replaysubject
replaysubject是一个特殊的subject,它会记录所有发射过的值,不论什么时候订阅的。所以它可以用来当做缓存来使用。replaysubject还可以接受一个buffersize参数,指定可以缓存的最近数据数,默认情况下是全部。
下面的代码和上面的代码几乎完全一样,但是因为使用了replaysubject,所以所有的值都会被打印。当然大家也可以试试把订阅语句放到其他位置,看看输出是否会产生变化。
1
2
3
4
5
6
7
8
9
10
|
# replaysubject会缓存所有值,如果指定参数的话只会缓存最近的几个值
print ( '--------replaysubject---------' )
subject = replaysubject()
subject.on_next( 1 )
subject.subscribe( lambda i: print (i))
subject.on_next( 2 )
subject.on_next( 3 )
subject.on_next( 4 )
subject.on_completed()
# 1 2 3 4
|
1.14、behaviorsubject
behaviorsubject是一个特殊的subject,它只会记录最近一次发射的值。而且在创建它的时候,必须指定一个初始值,所有订阅它的对象都可以接收到这个初始值。当然如果订阅的晚了,这个初始值同样会被后面发射的值覆盖,这一点要注意。
1
2
3
4
5
6
7
8
9
10
|
# behaviorsubject会缓存上次发射的值,除非observable已经关闭
print ( '--------behaviorsubject---------' )
subject = behaviorsubject( 0 )
subject.on_next( 1 )
subject.on_next( 2 )
subject.subscribe( lambda i: print (i))
subject.on_next( 3 )
subject.on_next( 4 )
subject.on_completed()
# 2 3 4
|
1.15、asyncsubject
asyncsubject是一个特殊的subject,顾名思义它是一个异步的subject,它只会在observer完成的时候发射数据,而且只会发射最后一个数据。因此下面的代码仅仅会输出4.假如注释掉最后一行co_completed调用,那么什么也不会输出。
1
2
3
4
5
6
7
8
9
10
|
# asyncsubject会缓存上次发射的值,而且仅会在observable关闭后开始发射
print ( '--------asyncsubject---------' )
subject = asyncsubject()
subject.on_next( 1 )
subject.on_next( 2 )
subject.subscribe( lambda i: print (i))
subject.on_next( 3 )
subject.on_next( 4 )
subject.on_completed()
# 4
|
1.16、scheduler
虽然rxpy算是异步的框架,但是其实它默认还是运行在单个线程之上的,因此如果使用了某些会阻碍线程运行的操作,那么程序就会卡死。当然针对这些情况,我们就可以使用其他的scheduler来调度任务,保证程序能够高效运行。
下面的例子创建了一个threadpoolscheduler,它是基于线程池的调度器。两个observable用subscribe_on方法指定了调度器,因此它们会使用不同的线程来工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import rx
from rx.scheduler import threadpoolscheduler
from rx import operators as op
import multiprocessing
import time
import threading
import random
def long_work(value):
time.sleep(random.randint( 5 , 20 ) / 10 )
return value
pool_schedular = threadpoolscheduler(multiprocessing.cpu_count())
rx. range ( 5 ).pipe(
op. map ( lambda i: long_work(i + 1 )),
op.subscribe_on(pool_schedular)
).subscribe( lambda i: print (f 'work 1: {threading.current_thread().name}, {i}' ))
rx.of( 1 , 2 , 3 , 4 , 5 ).pipe(
op. map ( lambda i: i * 2 ),
op.subscribe_on(pool_schedular)
).subscribe( lambda i: print (f 'work 2: {threading.current_thread().name}, {i}' ))
|
如果你观察过各个操作符的api的话,可以发现大部分操作符都支持可选的scheduler参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先使用这个调度器;其次的话,会使用subscribe方法上指定的调度器;如果以上都没有指定的话,就会使用默认的调度器。
二、应用场景
好了,介绍了一些reactive x的知识之后,下面来看看如何来使用reactive x。在很多应用场景下,都可以利用reactive x来抽象数据处理,把概念简单化。
2.1、防止重复发送
很多情况下我们都需要控制事件的发生间隔,比如有一个按钮不小心按了好几次,只希望第一次按钮生效。这种情况下可以使用debounce操作符,它会过滤observable,小于指定时间间隔的数据会被过滤掉。debounce操作符会等待一段时间,直到过了间隔时间,才会发射最后一次的数据。如果想要过滤后面的数据,发送第一次的数据,则要使用throttle_first操作符。
下面的代码可以比较好的演示这个操作符,快速按回车键发送数据,注意观察按键和数据显示之间的关系,还可以把throttle_first操作符换成debounce操作符,然后再看看输出会发生什么变化,还可以完全注释掉pipe中的操作符,再看看输出会有什么变化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import rx
from rx import operators as op
from rx.subject import subject
import datetime
# debounce操作符,仅在时间间隔之外的可以发射
ob = subject()
ob.pipe(
op.throttle_first( 3 )
# op.debounce(3)
).subscribe(
on_next = lambda i: print (i),
on_completed = lambda : print ( 'completed' )
)
print ( 'press enter to print, press other key to exit' )
while true:
s = input ()
if s = = '':
ob.on_next(datetime.datetime.now().time())
else :
ob.on_completed()
break
|
2.2、操作数据流
如果需要对一些数据进行操作,那么同样有一大堆操作符可以满足需求。当然这部分功能并不是reactive x独有的,如果你对java 8的流类库有所了解,会发现这两者这方面的功能几乎是完全一样的。
下面是个简单的例子,将两个数据源结合起来,然后找出来其中所有的偶数。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import rx
from rx import operators as op
from rx.subject import subject
import datetime
# 操作数据流
some_data = rx.of( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 )
some_data2 = rx.from_iterable( range ( 10 , 20 ))
some_data.pipe(
op.merge(some_data2),
op. filter ( lambda i: i % 2 = = 0 ),
# op.map(lambda i: i * 2)
).subscribe( lambda i: print (i))
|
再或者一个利用reduce的简单例子,求1-100的整数和。
1
2
3
4
5
6
7
8
|
import rx
from rx import operators as op
from rx.subject import subject
import datetime
rx. range ( 1 , 101 ).pipe(
op. reduce ( lambda acc, i: acc + i, 0 )
).subscribe( lambda i: print (i))
|
以上就是浅谈python响应式类库rxpy的详细内容,更多关于python响应式类库rxpy的资料请关注服务器之家其它相关文章!
原文链接:https://blog.csdn.net/u011054333/article/details/107240111