本爬虫开启了进程池,执行多个进程爬取网站图片。爬取过程中,将操作信息写入日志文件 “Photo_Galleries.log” 。
在main()中:
- 以multiprocessing.Process()启动一个进程process_logger,执行任务task_logger()。该进程的接口为multiprocessing.Queue,接收queue中的信息并写入日志文件中。
- 搜素网页页面中的元素,获取所需要的元素信息date_data。
- 以以上元素信息的长度date_data为循环上限,以异步方式启动进程池pool,执行任务task_gallery(),搜素图片url并下载图片。相关信息发送到queue中。
- pool结束后,关闭process_logger。
# -*- coding: utf-8 -*- import logging import multiprocessing import os import time import re import requests import sys from requests_html import HTMLSession COOKIE = 'Hm_lvt_dbc355aef238b6c32b43eacbbf161c3c=1547555739,1547555783; Hm_lpvt_dbc355aef238b6c32b43eacbbf161c3c=1547735534' USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36' def get_directory(file_dir): '''获取文件完整路径中的目录''' file_dir_split = file_dir.split('\\', -1) dir = file_dir.rstrip(file_dir_split[-1]) return dir def logger( log_file="", log_console=False, log_format="%(asctime)s - %(levelname)s - %(message)s", log_setlevel=logging.DEBUG, ): '''定义日志处理函数''' # 如果未设置日志文件和控制台,返回None,结束运行 if log_file == "" and log_console is False: return None else: # 日志处理基本设置 logger = logging.getLogger(__name__) # 新建一个logging对象 logger.setLevel(level=log_setlevel) # 设置日志记录等级 logging_formatter = logging.Formatter(log_format) # 如果定义了日志文件,则设置日志文件 if log_file != "": # 设置日志文件 logging_file = logging.FileHandler(log_file) logging_file.setLevel(level=log_setlevel) logging_file.setFormatter(logging_formatter) logger.addHandler(logging_file) # 如果定义了控制台,则设置控制台 if log_console is True: # 设置控制台 logging_console = logging.StreamHandler(stream=sys.stdout) logging_console.setLevel(level=log_setlevel) logging_console.setFormatter(logging_formatter) logger.addHandler(logging_console) return logger main_dir = get_directory(__file__) logger = logger(log_file=main_dir + 'Photo_Galleries.log', log_console=True) def task_logger(queue): '''日志记录进程''' while True: logger_level, logger_message = queue.get(block=True, timeout=60) if logger_level == 'DEBUG': logger.debug(logger_message) if logger_level == 'INFO': logger.info(logger_message) if logger_level == 'WARNING': logger.warning(logger_message) if logger_level == 'ERROR': logger.error(logger_message) if logger_level == 'CRITICAL': logger.critical(logger_message) def create_directory(queue, main_dir, sub_dir): '''在main_dir下创建sub_dir''' # 去除目录中的特殊字符: \/:*?"<>| sub_dir_name = re.sub(r'[\/:*?"<>|]', '', sub_dir) dir = main_dir + sub_dir_name + '\\' # 建立目录,用于保存图片 if os.path.exists(dir) is False: try: os.makedirs(dir, 0o777) queue.put(('INFO', '创建目录 ' + dir + ' 成功。'), block=True, timeout=60) return dir except Exception: queue.put(('DEBUG', '创建目录 ' + dir + ' 失败。'), block=True, timeout=60) return None else: queue.put(('INFO', '目录 ' + dir + ' 已存在。'), block=True, timeout=60) return dir def get_web_page_elements(queue, url, headers, xpath_expression, timeout): '''获取网页中的元素''' session = HTMLSession() err_status = 0 task_finished = False while task_finished is False and err_status <= 10: try: response = session.get(url, headers=headers, timeout=timeout) elements_list = response.html.xpath(xpath_expression) task_finished = True queue.put(('INFO', '在网址 ' + url + ' 中搜索 ' + xpath_expression + ' 成功。'), block=True, timeout=60) return elements_list except Exception: err_status += 1 queue.put(('DEBUG', '在网址 ' + url + ' 中搜索 ' + xpath_expression + ' 失败。睡眠15秒钟后重新执行搜素任务。'), block=True, timeout=60) time.sleep(15) finally: session.close() if err_status == 10: queue.put(('DEBUG', '在网址 ' + url + ' 中搜索 ' + xpath_expression + ' 失败了10次。不再执行该搜素任务。'), block=True, timeout=60) return None def download_img_file(queue, img_dir, img_url, img_headers, timeout): '''下载图片文件''' img_file_downloaded = False err_status = 0 while img_file_downloaded is False and err_status <= 10: try: img_response = requests.get(img_url, headers=img_headers, timeout=timeout) img_url_split = img_url.split('/', -1) img_name = img_url_split[-1] img_file = open(img_dir + img_name, 'wb') img_file.write(img_response.content) img_file_downloaded = True except Exception: err_status += 1 queue.put(('DEBUG', '在网址 ' + img_url + ' 中下载图片失败。睡眠15秒钟后重启该下载任务。'), block=True, timeout=60) time.sleep(15) if img_file_downloaded: queue.put(('INFO', '在网址 ' + img_url + ' 中下载图片成功。'), block=True, timeout=60) else: queue.put(('DEBUG', '在网址 ' + img_url + ' 中下载图片失败了10次。不再执行该下载任务。'), block=True, timeout=60) def task_gallery(queue, main_dir, web_page_url, date): '''数据爬取进程''' # 创建日期目录 gallery_date_dir = create_directory(queue, main_dir, date) if gallery_date_dir is not None: gallery_page_url = web_page_url + '?s=' + date gallery_page_referer = gallery_page_url gallery_page_headers = { 'Referer': gallery_page_referer, 'Cookie': COOKIE, 'User-Agent': USER_AGENT } star_page_xpath_expression = "//div[@class='gal_list']/a" star_page_data = get_web_page_elements(queue, gallery_page_url, gallery_page_headers, star_page_xpath_expression, 30) for each_star_page in star_page_data: star_name = each_star_page.text # 明星名称 star_page_url = 'https://www.kindgirls.com' + each_star_page.attrs['href'] # 明星完整url star_photo_dir = create_directory(queue, gallery_date_dir, star_name) # 创建明星目录 if star_photo_dir is not None: star_page_referer = star_page_url star_page_headers = { 'Referer': star_page_referer, 'Cookie': COOKIE, 'User-Agent': USER_AGENT } star_photos_xpath_expression = "//div[@class='gal_list']/a[2]" star_photos_data = get_web_page_elements(queue, star_page_url, star_page_headers, star_photos_xpath_expression, 30) for each in star_photos_data: star_photo_url = each.attrs['href'] star_photo_referer = star_page_url star_photo_headers = { 'Referer': star_photo_referer, 'Cookie': COOKIE, 'User-Agent': USER_AGENT } download_img_file(queue, star_photo_dir, star_photo_url, star_photo_headers, 30) else: pass time.sleep(1) else: pass def main(main_dir): '''main函数''' # 设置 queue queue = multiprocessing.Manager().Queue(8) # 设置进程池大小为8 pool = multiprocessing.Pool() # 启动日志进程 process_logger = multiprocessing.Process(target=task_logger, args=(queue,)) process_logger.start() # pool.apply_async(task_logger, args=(queue,)) # 网页数据 web_page_url = '网站地址' web_page_referer = web_page_url web_page_headers = { 'Referer': web_page_referer, 'Cookie': COOKIE, 'User-Agent': USER_AGENT } date_xpath_expression = "//select[@name='s']/option" date_data = get_web_page_elements(queue, web_page_url, web_page_headers, date_xpath_expression, 30) date_data_length = len(date_data) for date_index in range(0, date_data_length): date = date_data[date_index].attrs['value'] pool.apply_async(task_gallery, args=(queue, main_dir, web_page_url, date)) pool.close() pool.join() queue.put(('INFO', '任务完成。关闭日志进程。'), block=True, timeout=60) time.sleep(5) process_logger.terminate() process_logger.join() if __name__ == '__main__': '''本程序入口''' main(main_dir)