使用python做计算的时候,为了加快速率,可以启用多进程或者多线程。那几时使用多进程或者多线程呢
如果是io型,使用多线程,如果是cpu型,使用多进程。
理论上说,使用多进程是效率最大的,而且io型也可以使用,但是会使用更多资源,所以有时候在没必要使用多进程时,就用多线程。在io型的时候,由于主要用于等待,使用多进程就没必要了。
我用的是多进程multipleprocess。记住一个小点,使用多进程容易cpu飙升到90+,影响其他应用的使用,可适当的sleep。
Python Multithreading Tutorial: Concurrency and Parallelism
简要翻译上面文章。我当时是参考这篇的。
经常谈论说很难使用Python的多线程工作,指责所谓的全局解释器锁(GIL),他可以防止多个线程同时运行Python代码。因此,线程模块的使用不像我们使用C或java那样。必须明确的一点是,人们仍然可以用Python编写代码使其运行或并行,来大大提升效率,只要某些细节做好。
在Python并发的教程,我们要写一个python脚本来自Imgur下载最流行的图像。我们将开始一个版本,下载图片按照顺序,或只下载一次。作为先决条件,你必须登记在有一个应用程序
本教程中的脚本用Python 3.4.2(笔者也是用这个版本)
开始使用多线程Python
让我们开始创建一个Python模块,命名为“下载。py。该文件将包含所有必要的功能来获取图像列表和下载。我们将这些功能分成三个独立的功能:
- get_links
- download_link
- 下载安装程序目录
import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request
logger = logging.getLogger(__name__)
def get_links(client_id):
headers = {'Authorization': 'Client-ID {}'.format(client_id)}
req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')
with urlopen(req) as resp:
data = json.loads(resp.readall().decode('utf-8'))
return map(lambda item: item['link'], data['data'])
def download_link(directory, link):
logger.info('Downloading %s', link)
download_path = directory / os.path.basename(link)
with urlopen(link) as image, download_path.open('wb') as f:
f.write(image.readall())
def setup_download_dir():
download_dir = Path('images')
if not download_dir.exists():
download_dir.mkdir()
return download_dir
接下来,我们需要写一个模块,可以使用这些功能来下载图像,一个一个的。我们将这名“single.py”。这将包含我们的第一个主要功能,图像下载。该模块将通过环境变量“imgur_client_id”获取clientId。它将调用“setup_download_dir”创建目标目录的下载。最后,它会获取列表中使用get_links函数图像,过滤掉所有的GIF和专辑的网址,然后使用“download_link”下载并保存这些图像到磁盘。这里是“single.py”
import logging
import os
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
for link in links:
download_link(download_dir, link)
print('Took {}s'.format(time() - ts))
if __name__ == '__main__':
main()
下面通过并发,我们可以节省很多时间。
使用多线程
Threading是最通用的方法。线程更轻量。线程比进程,他共享相同的内存空间。
我们将写一个新的模块来代替“single.py”。该模块将创建一个线程池8,一共有9个线程,包括主线程。我选择了8个工作线程,因为我的电脑有8个CPU核心,每个核心一个工作者线程似乎是一个好的号码,立刻运行多线程。在实践中,这一数字更仔细的选择,要考虑其他因素,因为还有其他应用程序和服务运行在同一台机器。
from queue import Queue
from threading import Thread
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# Get the work from the queue and expand the tuple
directory, link = self.queue.get()
download_link(directory, link)
self.queue.task_done()
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
# Create a queue to communicate with the worker threads
queue = Queue()
# Create 8 worker threads
for x in range(8):
worker = DownloadWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking
worker.daemon = True
worker.start()
# Put the tasks into the queue as a tuple
for link in links:
logger.info('Queueing {}'.format(link))
queue.put((download_dir, link))
# Causes the main thread to wait for the queue to finish processing all the tasks
queue.join()
print('Took {}'.format(time() - ts))
运行这个脚本在同一台机器耗时4.1s,比上面的结果快4.7倍。虽然这是更快,更值得一提的是,只有一个线程在执行一次在整个过程中,由于gil。因此,本代码是concurrent but not parallel(“并行但不并发”,我不知道这样翻译是否合适)。原因是更快是因为这是一个IO绑定的任务。该处理器同时下载这些图像,实际上大部分的时间花在等待网络。这就是为什么线程可以提供一个大的速度增加。该处理器可以切换线程,其中一个是准备做一些工作。使用线程模块Python或任何其他解释语言,只要有GIL,实际上可以导致性能降低。如果你的代码执行的是CPU绑定的任务,如解压缩gzip文件,使用线程模块会导致较慢的执行时间。而对于CPU绑定的任务和真正的并行执行,我们可以使用多处理模块。
Multiple Processe
多进程更容易操作。
使用多个进程创建一个pool。随着map的方法提供,我们把urllist传进到pool中的,这将产生8个新进程,并行下载。这是真正的并行性,但这是有代价的。每个子进程都使用着该script所创建的资源。在这个简单的例子,它不是一个大问题,但它很容易成为严重的开销。(其实相当于你跑了多个script的实例,每个实例都拥有自己的内存,译者自己使用的时候,cpu经常飙升,所以要注意优化和sleep)
from functools import partial
from multiprocessing.pool import Pool
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
download = partial(download_link, download_dir)
with Pool(8) as p:
p.map(download, links)
print('Took {}s'.format(time() - ts))
Distributing to Multiple Workers
当线程和多处理模块是cool的,但是只能在同一台机器,如果是分布式呢?如果你有长时间运行的任务,你不想在同一台机器用太多线程或者进程,这会降低应用程序的性能。
这便是Python库RQ,一个非常简单但功能强大的lib。把参数传进去。添加到redis列表入队的工作是第一步,但不会做任何事情。我们还需要至少一个worker监听作业队列。
第一步是安装和运行在你的计算机一个redis服务器,或访问一个运行redis服务器。在那之后,只有一些小的变化,对现有的代码。我们首先创建一个RQ队列实例,并把它传给redis server。然后嗲用我们称之为“q.enqueue(download_link,download_dir,link)”。Enqueue方法接受一个函数作为第一个参数,然后任何其他参数或关键字参数是传递给该函数的。
最后一步我们需要做的就是启动一些工人。rq提供了一个方便的脚本在默认的队列运行。运行“rqworker”在一个终端窗口就将开始一个工人listen的默认队列。请确保您的当前工作目录是脚本存放的目录。如果你想要听一个不同的队列,您可以运行“rqworker queue_name”它会听那个队列。关于RQ还有很方便,只要你能够连接使用redis,您可以在多台机器运行,来进行监听处理。因此,它是扩大规模好工具。这里是RQ版本源码:
from redis import Redis
from rq import Queue
def main():
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
q = Queue(connection=Redis(host='localhost', port=6379))
for link in links:
q.enqueue(download_link, download_dir, link)
总结
如果你的代码是IO绑定,多线程和多线程Python都可以。多进程比较容易,但具有更高的内存开销。如果你的代码被CPU绑定的,多进程会是更好的选择,尤其是如果目标机有多核CPU。对于Web应用程序,当你需要工作在多台机器上,RQ更好。