python 性能提升之 并行map 线程池过亿数据量

时间:2024-10-29 15:32:52

前段时间进行单一目录下10万张图片发送,效果很差,数据积压原来越多。

性能问题提上议程。

采用多线程 多进程 感觉比较繁琐,网上有介绍 map的并行处理的,使用后性能提高明细。

网上介绍map如下

 

介绍:Map

Map是一个很棒的小功能,同时它也是Python并行代码快速运行的关键。给不熟悉的人讲解一下吧,map是从函数语言Lisp来的。map函数能够按序映射出另一个函数。例如

  1. urls = ['', '']
  2. results = map(urllib2.urlopen, urls)

这里调用urlopen方法来把调用结果全部按序返回并存储到一个列表里。就像:

  1. results = []
  2. for url in urls:
  3. (urllib2.urlopen(url))

Map按序处理这些迭代。调用这个函数,它就会返回给我们一个按序存储着结果的简易列表。

为什么它这么厉害呢?因为只要有了合适的库,map能使并行运行得十分流畅!
这里写图片描述
有两个能够支持通过map函数来完成并行的库:一个是multiprocessing,另一个是鲜为人知但功能强大的子文件:。

题外话:这个是什么?你从来没听说过dummy多进程库?我也是最近才知道的。它在多进程的说明文档里面仅仅只被提到了一句。而且那一句就是大概让你知道有这么个东西的存在。我敢说,这样几近抛售的做法造成的后果是不堪设想的!

Dummy就是多进程模块的克隆文件。唯一不同的是,多进程模块使用的是进程,而dummy则使用线程(当然,它有所有Python常见的限制)。也就是说,数据由一个传递给另一个。这能够使得数据轻松的在这两个之间进行前进和回跃,特别是对于探索性程序来说十分有用,因为你不用确定框架调用到底是IO 还是CPU模式。

准备开始

要做到通过map函数来完成并行,你应该先导入装有它们的模块:

  1. from multiprocessing import Pool
  2. from multiprocessing.dummy import Pool as ThreadPool

再初始化:

pool = ThreadPool()

这简单的一句就能代替我们的build_worker_pool 函数在中的所有工作。换句话说,它创建了许多有效的worker,启动它们来为接下来的工作做准备,以及把它们存储在不同的位置,方便使用。

Pool对象需要一些参数,但最重要的是:进程。它决定pool中的worker数量。如果你不填的话,它就会默认为你电脑的内核数值。

如果你在CPU模式下使用多进程pool,通常内核数越大速度就越快(还有很多其它因素)。但是,当进行线程或者处理网络绑定之类的工作时,情况会比较复杂所以应该使用pool的准确大小。

pool = ThreadPool(4) # Sets the pool size to 4

如果你运行过多线程,多线程间的切换将会浪费许多时间,所以你最好耐心调试出最适合的任务数。

使用代码效果:

没有优化前代码,性能很差,数据处理不完,积压越来越多

 

  1. import linecache
  2. import os
  3. import
  4. import requests
  5. import time
  6. import datetime
  7. import sys
  8. reload(sys)
  9. ('utf8')
  10. imagedir='/opt/tomcat_api/video_sendto_api/image_bak/'
  11. def send_image(imagedir):
  12. #扫描图片路径
  13. for img_parent, img_dir_names, img_names in (imagedir):
  14. for img_name in img_names:
  15. image = (img_parent, img_name) #拼接图片完整路径
  16. print ("%Y-%m-%d %X"),image
  17. #准备发送图片
  18. file = dict(file=open(image, 'rb'))
  19. post_data = {'mark': 'room-201', 'timestamp': 1846123456, 'random': 123}
  20. headers = {'app_key': app_key, 'access_token': access_token}
  21. result = (url, files=file, data=post_data, headers=headers, verify=False)
  22. print
  23. #删除发送的图片
  24. str_img = "rm -f " + " " + image
  25. del_img = (str_img).readline()
  26. print del_img
  27. if __name__ == "__main__":
  28. send_image(imagedir)


采用map 代码,效果处理速度明细。

 

拆分send_image函数。

 

  1. import linecache
  2. import os
  3. import
  4. import requests
  5. import time
  6. import datetime
  7. import sys
  8. reload(sys)
  9. ('utf8')
  10. from multiprocessing import Pool
  11. from import Pool as ThreadPool
  12. imagedir='/opt/tomcat_api/video_sendto_api/image_bak/'
  13. #获取扫描目录,生成列表
  14. def get_img_path(imagedir):
  15. list=[]
  16. for img_parent, img_dir_names, img_names in (imagedir):
  17. for img_name in img_names:
  18. image = (img_parent, img_name) #拼接图片完整路径
  19. list.append(image)
  20. return list
  21. def send_images(image):
  22. file = dict(file=open(image, 'rb'))
  23. post_data = {'mark': 'room-201', 'timestamp': 1846123456, 'random': 123}
  24. headers = {'app_key': app_key, 'access_token': access_token}
  25. result = (url, files=file, data=post_data, headers=headers, verify=False)
  26. print
  27. str_img = "rm -f " + " " + image
  28. del_img = (str_img).readline()
  29. print del_img
  30. if __name__ == "__main__":
  31. image=get_img_path(imagedir)
  32. pool=Pool()
  33. pool.map(send_images,image)
  34. ()
  35. ()


任务不需要长时间运行,如果程序处理完毕,添加定时任务再吊起来。

 

crontab  11分钟检测一次。

 

  1. #!/bin/bash
  2. counter=$(ps -C video_send2api_new|wc -l)
  3. if [ "${counter}" -le 1 ]; then
  4. python /opt/tomcat_api/video_sendto_api/video_send2api_new.py >>/opt/tomcat_api/video_sendto_api/logs/&
  5. fi

 

线程池

接到任务要发送一亿个图片,来进行系统性能和稳定性验证。本次任务采用 线程池 ThreadPoolExecutor 来解决。

from  import ThreadPoolExecutor

实现思路:准备30万图片,图片提前生成任务列表,循环读取任务列表。

代码如下

  1. # -*- coding:utf-8 -*-
  2. import time
  3. import os
  4. from import ThreadPoolExecutor
  5. #具体实现方法
  6. def read_url(num, url):
  7. pass
  8. #主要方法
  9. def query_data():
  10. file_path = "md5_list.txt"
  11. num = 0
  12. count= 100000000
  13. begin_time = ()
  14. print("开始时间:", ("%Y-%m-%d %H:%M:%S", ()))
  15. while True:
  16. with open(file_path, 'r') as f:
  17. urls = ()
  18. with ThreadPoolExecutor(10) as executor:
  19. for url in urls:
  20. num = num +1
  21. count -= 1
  22. (read_url, num, url)
  23. if count <= 0:
  24. break
  25. if count <=0:
  26. break
  27. end_time = ()
  28. print("结束:", ("%Y-%m-%d %H:%M:%S", ()))
  29. print("总时间:%f s"% (end_time - begin_time))

根据当前cpu核数,可以设置线程数量,本代码设置10