Python爬虫,多进程 + 日志记录

时间:2022-03-30 03:39:13

 

本爬虫开启了进程池,执行多个进程爬取网站图片。爬取过程中,将操作信息写入日志文件 “Photo_Galleries.log” 。

在main()中:

  • 以multiprocessing.Process()启动一个进程process_logger,执行任务task_logger()。该进程的接口为multiprocessing.Queue,接收queue中的信息并写入日志文件中。
  • 搜素网页页面中的元素,获取所需要的元素信息date_data。
  • 以以上元素信息的长度date_data为循环上限,以异步方式启动进程池pool,执行任务task_gallery(),搜素图片url并下载图片。相关信息发送到queue中。
  • pool结束后,关闭process_logger。
# -*- coding: utf-8 -*-


import logging
import multiprocessing
import os
import time
import re
import requests
import sys
from requests_html import HTMLSession

COOKIE = 'Hm_lvt_dbc355aef238b6c32b43eacbbf161c3c=1547555739,1547555783; Hm_lpvt_dbc355aef238b6c32b43eacbbf161c3c=1547735534'
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'


def get_directory(file_dir):
    '''获取文件完整路径中的目录'''
    file_dir_split = file_dir.split('\\', -1)
    dir = file_dir.rstrip(file_dir_split[-1])
    return dir


def logger(
    log_file="",
    log_console=False,
    log_format="%(asctime)s - %(levelname)s - %(message)s",
    log_setlevel=logging.DEBUG,
):
    '''定义日志处理函数'''
    # 如果未设置日志文件和控制台,返回None,结束运行
    if log_file == "" and log_console is False:
        return None
    else:
        # 日志处理基本设置
        logger = logging.getLogger(__name__)  # 新建一个logging对象
        logger.setLevel(level=log_setlevel)  # 设置日志记录等级
        logging_formatter = logging.Formatter(log_format)
        # 如果定义了日志文件,则设置日志文件
        if log_file != "":
            # 设置日志文件
            logging_file = logging.FileHandler(log_file)
            logging_file.setLevel(level=log_setlevel)
            logging_file.setFormatter(logging_formatter)
            logger.addHandler(logging_file)
        # 如果定义了控制台,则设置控制台
        if log_console is True:
            # 设置控制台
            logging_console = logging.StreamHandler(stream=sys.stdout)
            logging_console.setLevel(level=log_setlevel)
            logging_console.setFormatter(logging_formatter)
            logger.addHandler(logging_console)
        return logger


main_dir = get_directory(__file__)
logger = logger(log_file=main_dir + 'Photo_Galleries.log', log_console=True)


def task_logger(queue):
    '''日志记录进程'''
    while True:
        logger_level, logger_message = queue.get(block=True, timeout=60)
        if logger_level == 'DEBUG':
            logger.debug(logger_message)
        if logger_level == 'INFO':
            logger.info(logger_message)
        if logger_level == 'WARNING':
            logger.warning(logger_message)
        if logger_level == 'ERROR':
            logger.error(logger_message)
        if logger_level == 'CRITICAL':
            logger.critical(logger_message)


def create_directory(queue, main_dir, sub_dir):
    '''在main_dir下创建sub_dir'''
    # 去除目录中的特殊字符: \/:*?"<>|
    sub_dir_name = re.sub(r'[\/:*?"<>|]', '', sub_dir)
    dir = main_dir + sub_dir_name + '\\'
    # 建立目录,用于保存图片
    if os.path.exists(dir) is False:
        try:
            os.makedirs(dir, 0o777)
            queue.put(('INFO', '创建目录 ' + dir + ' 成功。'), block=True, timeout=60)
            return dir
        except Exception:
            queue.put(('DEBUG', '创建目录 ' + dir + ' 失败。'), block=True, timeout=60)
            return None
    else:
        queue.put(('INFO', '目录 ' + dir + ' 已存在。'), block=True, timeout=60)
        return dir


def get_web_page_elements(queue, url, headers, xpath_expression, timeout):
    '''获取网页中的元素'''
    session = HTMLSession()
    err_status = 0
    task_finished = False
    while task_finished is False and err_status <= 10:
        try:
            response = session.get(url, headers=headers, timeout=timeout)
            elements_list = response.html.xpath(xpath_expression)
            task_finished = True
            queue.put(('INFO', '在网址 ' + url + ' 中搜索 ' + xpath_expression + ' 成功。'), block=True, timeout=60)
            return elements_list
        except Exception:
            err_status += 1
            queue.put(('DEBUG', '在网址 ' + url + ' 中搜索 ' + xpath_expression + ' 失败。睡眠15秒钟后重新执行搜素任务。'), block=True, timeout=60)
            time.sleep(15)
        finally:
            session.close()
    if err_status == 10:
        queue.put(('DEBUG', '在网址 ' + url + ' 中搜索 ' + xpath_expression + ' 失败了10次。不再执行该搜素任务。'), block=True, timeout=60)
    return None


def download_img_file(queue, img_dir, img_url, img_headers, timeout):
    '''下载图片文件'''
    img_file_downloaded = False
    err_status = 0
    while img_file_downloaded is False and err_status <= 10:
        try:
            img_response = requests.get(img_url, headers=img_headers, timeout=timeout)
            img_url_split = img_url.split('/', -1)
            img_name = img_url_split[-1]
            img_file = open(img_dir + img_name, 'wb')
            img_file.write(img_response.content)
            img_file_downloaded = True
        except Exception:
            err_status += 1
            queue.put(('DEBUG', '在网址 ' + img_url + ' 中下载图片失败。睡眠15秒钟后重启该下载任务。'), block=True, timeout=60)
            time.sleep(15)
    if img_file_downloaded:
        queue.put(('INFO', '在网址 ' + img_url + ' 中下载图片成功。'), block=True, timeout=60)
    else:
        queue.put(('DEBUG', '在网址 ' + img_url + ' 中下载图片失败了10次。不再执行该下载任务。'), block=True, timeout=60)


def task_gallery(queue, main_dir, web_page_url, date):
    '''数据爬取进程'''
    # 创建日期目录
    gallery_date_dir = create_directory(queue, main_dir, date)
    if gallery_date_dir is not None:
        gallery_page_url = web_page_url + '?s=' + date
        gallery_page_referer = gallery_page_url
        gallery_page_headers = {
            'Referer': gallery_page_referer,
            'Cookie': COOKIE,
            'User-Agent': USER_AGENT
        }
        star_page_xpath_expression = "//div[@class='gal_list']/a"
        star_page_data = get_web_page_elements(queue, gallery_page_url, gallery_page_headers, star_page_xpath_expression, 30)
        for each_star_page in star_page_data:
            star_name = each_star_page.text     # 明星名称
            star_page_url = 'https://www.kindgirls.com' + each_star_page.attrs['href']   # 明星完整url
            star_photo_dir = create_directory(queue, gallery_date_dir, star_name)  # 创建明星目录
            if star_photo_dir is not None:
                star_page_referer = star_page_url
                star_page_headers = {
                    'Referer': star_page_referer,
                    'Cookie': COOKIE,
                    'User-Agent': USER_AGENT
                }
                star_photos_xpath_expression = "//div[@class='gal_list']/a[2]"
                star_photos_data = get_web_page_elements(queue, star_page_url, star_page_headers, star_photos_xpath_expression, 30)
                for each in star_photos_data:
                    star_photo_url = each.attrs['href']
                    star_photo_referer = star_page_url
                    star_photo_headers = {
                        'Referer': star_photo_referer,
                        'Cookie': COOKIE,
                        'User-Agent': USER_AGENT
                    }
                    download_img_file(queue, star_photo_dir, star_photo_url, star_photo_headers, 30)
            else:
                pass
            time.sleep(1)
    else:
        pass


def main(main_dir):
    '''main函数'''
    # 设置 queue
    queue = multiprocessing.Manager().Queue(8)
    # 设置进程池大小为8
    pool = multiprocessing.Pool()
    # 启动日志进程
    process_logger = multiprocessing.Process(target=task_logger, args=(queue,))
    process_logger.start()
    # pool.apply_async(task_logger, args=(queue,))
    # 网页数据
    web_page_url = '网站地址'
    web_page_referer = web_page_url
    web_page_headers = {
        'Referer': web_page_referer,
        'Cookie': COOKIE,
        'User-Agent': USER_AGENT
    }
    date_xpath_expression = "//select[@name='s']/option"
    date_data = get_web_page_elements(queue, web_page_url, web_page_headers, date_xpath_expression, 30)
    date_data_length = len(date_data)
    for date_index in range(0, date_data_length):
        date = date_data[date_index].attrs['value']
        pool.apply_async(task_gallery, args=(queue, main_dir, web_page_url, date))
    pool.close()
    pool.join()
    queue.put(('INFO', '任务完成。关闭日志进程。'), block=True, timeout=60)
    time.sleep(5)
    process_logger.terminate()
    process_logger.join()


if __name__ == '__main__':
    '''本程序入口'''
    main(main_dir)