2018年5月24日笔记

时间:2022-12-22 21:37:52
  • 消息队列 Message Queue

MQ是在消息传输过程中保存消息的容器。

MQ最经典的用法就是在producer和customer之间通过消息管道来传递消息,producer往管道中写入消息,customer从管道中读取消息。

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块就提供了Queue()和Pipe()两种方法来实现。

 

 

  • multiprocessing 模块中的方法:Queue()
 1 from multiprocessing import Queue
 2 
 3 q = Queue(3)        # 消息队列最大个数为3
 4 
 5 q.put(1)            # put()向队列中放数据
 6 q.put("A")
 7 q.put(["abc", "345"])
 8 # q.put("test")         # 队列满了就无法再放入数据了
 9 # q.put_nowait()        # 不会等待队列有空闲位置再放入数据,如果数据放入不成功就直接崩溃(不建议使用put_nowait)
10 
11 print(q.full())           # 判断队列此时是否已满
12 print(q.get())
13 print(q.get())
14 
15 print(q.full())
16 print(q.get())
17 print(q.empty())        # 判断此时队列是否为空
18 # print(q.get())          # 队列空了就无法再读取数据了
19 # q.get_nowait()        # 队列为空,取值的时候不等待,但是取不到值那么直接崩溃了
True
1
A
False
['abc', '345']
True

 

 

  • multiprocessing模块中的方法:Pipe()

multiprocessing.Pipe([duplex]) 
1)返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.不同于os.open之处在于os.pipe()返回2个文件描述符(r, w),表示可读的和可写的。

2)send()和recv()方法分别是发送和接收消息。close()表示关闭关闭管道

 1 from multiprocessing import Process, Pipe
 2 
 3 
 4 def fun(pipe, x):
 5     pipe.send("Hello {0}".format(x))
 6 
 7 reciver, sender = Pipe()
 8 p = Process(target=fun, args=(sender, 'Karl',))  # 传递参数为sender,其实传递的是reciver也是可以的
 9 p.start()
10 print(reciver.recv())
11 p.join()
12 print(reciver.recv())           #在等待接收
Hello Karl

 

 

  • Queue模块

python提供了Queue模块来专门实现消息队列。

Queue对象实现一个FIFO队列,Queue()只有一个构造参数maxsize,用来指定队列长度,maxsize小于1就表示队列长度无限。

 

Queue对象主要有以下成员函数:

qsize():  放回MQ的当前空间

empty():  判断MQ是否为空

full():  判断MQ是否满

put(item, block=True, timeout=None):  往MQ中存放消息。block可以控制是否阻塞,timeout指定阻塞等待时间。

put_nowait(item):  相当于put(item, False)

get(block=True, timeout=None)

 

以下两个函数用来判断消息对应的任务是否完成。

task_done():  接收消息的线程通过调用此函数来说明消息对应的任务已完成。

join():  实际上意味着等到队列为空,再执行别的操作。

 

 

  • Celery异步分布式

Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。

Celery 专注于实时任务处理,支持任务调度。

说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。

Celery 本身不是任务队列,它是管理分布式任务队列的工具,或者换一种说法,它封装好了操作常见任务队列的各种操作,我们用它可以快速进行任务队列的使用与管理。

 

  • 使用Celery时的几个常用概念

1) Brokers

  brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)

常见的 brokers 有 rabbitmq、redis、Zookeeper 等。

 

2) Result Stores / Backend

  顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了

常见的 backend 有 redis、Memcached 甚至常用的数据都可以。

 

3) Workers

  就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行。

 

4) Tasks

  就是我们想在队列中进行的任务咯,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。

 

5) Celery 其内建任务状态有如下几种:

参数 说明
PENDING 任务等待中
STARTED 任务已开始
SUCCESS 任务执行成功
FAILURE 任务执行失败
RETRY 任务将被重试
REVOKED 任务取消

 

 

  • 习题

这里我们用 redis 当做 celery 的 broker 和 backend。

首先,写一个task:

1 #tasks.py
2 from celery import Celery
3  
4 app = Celery('tasks',  backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #配置好celery的backend和broker
5  
6 @app.task  #普通函数装饰为 celery task
7 def add(x, y):
8     return x + y

OK,到这里,broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:

1 celery -A tasks worker --loglevel=info

意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态)

最后一步,就是触发任务啦,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数:

1 #trigger.py
2 from tasks import add
3 result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
4 while not result.ready():
5     time.sleep(1)
6 print 'task done: {0}'.format(result.get())

运行此脚本

delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。

到此,一个简单的 celery 应用就完成啦。