cosmic_download-AsyncPool待修正

时间:2021-08-31 18:25:57
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/11/16 10:02 AM
# @Author : cxa
# @File : cosmic.py
# @Software: PyCharm
# encoding: utf-8
import os
import aiohttp
import hashlib
import base64
from cryptography.fernet import Fernet
import aiofiles
import multiprocessing
import async_timeout
from lxml import html
import asyncio
from aiologger import Logger
import asyncpool
# Worker-count heuristic: twice the number of CPUs, plus one.
workers = 2 * multiprocessing.cpu_count() + 1

# First comic index to crawl.
start_num = 227_002

# Last comic index to crawl (the original also kept 227006 as a
# commented-out alternative value).
end_num = 250_606

# Base64-wrapped Fernet key; decoded before use by aes_cbc_decrypt.
key = "X0JxSkg4NFVBQVBPODlUM0VzT1liNnloeWtLcndkSldRT2xURzQ4MEM5RT0="

# XPath selecting the `file` attribute of the last <option> in the
# page-number <select>.
page_num_xpath = (
    "//p[@class='selectpage']"
    "/select[@id='page_number']"
    "/option[last()]/@file"
)
page_id_xpath = "//img[@id='thumbnail']/@src"


def aes_cbc_decrypt(message):
    """Decrypt a Fernet token using the module-level ``key``.

    Despite the name, the decryption is delegated to
    ``cryptography.fernet.Fernet`` rather than a hand-rolled AES-CBC routine.

    Args:
        message: The Fernet token; it is converted with ``str()`` and
            encoded as UTF-8 before decryption.

    Returns:
        The decrypted payload decoded as a UTF-8 string.

    Raises:
        cryptography.fernet.InvalidToken: Propagated from ``Fernet.decrypt``
            when the token cannot be decrypted with ``key``.
    """
    # `key` stores the Fernet key wrapped in one extra layer of base64.
    fernet = Fernet(base64.b64decode(key))
    token = str(message).encode("utf8")
    return fernet.decrypt(token).decode("utf8")


# Comic title
cosmic_name = "//head//title/text()"
# Comic id
cosmic_id = "//img[@id='curPic']/@src"
# Encrypted URL template for a comic page; formatted with one comic index.
main_url = aes_cbc_decrypt(
    "gAAAAABbNdhqCnxkaJwZ2VL7HUXne_IOic-NsHtE30W-J68oecVmgm0dzO_lLXgTlI7a5_NbUWlkGm7FqLwY81XIBddNWbac4rCgBA9NFAECsNISkhTvdRl4uDSaS6bHY8sbcJJwO13Z")
# Lazily yields one comic-page URL per index from start_num to end_num,
# both inclusive.
cosmic_url_gen = (main_url.format(i) for i in range(start_num, end_num + 1))
# Encrypted URL template for a single image. Placeholders, in order:
# the comic's overall id, the page index, and the image format (jpg).
full_url = aes_cbc_decrypt(
    "gAAAAABbNdk5FLeX55hOiDAXxgCwwYmGrokYvU3Nd1AOYuOE7OdIEcBdAmSG_Q3kOltealBKMOgUBKDuPUJtzFFPwqoxL-FUip"
    "VNQU-JmBW_K5qxgzTQ3IOla_F61Rscy0fJOaN-mEXKPqrakctyDRN7OVm1LARTMhylQELLuBnJgIT4WXilchg=")
# Semaphore acquired by bound_fetch around each comic-page fetch. It was
# previously swallowed by the trailing comment above and so never defined.
sema = asyncio.Semaphore(5)
# Semaphore acquired by run_img around each image download.
sem_img = asyncio.Semaphore(50)

# Upper bound on attempts per comic page before fetch() gives up. The
# previous implementation recursed without any limit.
_MAX_FETCH_ATTEMPTS = 3


async def logging():
    """Create and return the aiologger ``Logger`` named ``cosmic_download``."""
    return Logger.with_default_handlers(name='cosmic_download')


async def get_buff(url, c_name, session, log):
    """Download one image and pass its bytes to ``get_img``.

    If the response body is empty, the download is attempted once more with
    ``.jpg`` replaced by ``.png`` in the URL.

    Args:
        url: Image URL to download.
        c_name: Comic title, forwarded to ``get_img`` to pick the directory.
        session: Object whose ``get(url)`` is used as an async context
            manager (an ``aiohttp.ClientSession`` in ``run``).
        log: Logger with awaitable ``info``.

    Raises:
        asyncio.TimeoutError: If the whole operation exceeds 60 seconds.
    """
    # async_timeout must be entered with ``async with``; the synchronous
    # form was deprecated and later removed.
    async with async_timeout.timeout(60):
        async with session.get(url) as resp:
            buff = await resp.read()
        if not buff:
            url = url.replace(".jpg", ".png")
            async with session.get(url) as resp:
                buff = await resp.read()
        await log.info(f"NOW_URL:, {url}")
        await get_img(url, buff, c_name, log)


async def run_img(url, c_name, session, log):
    """Run ``get_buff`` while holding the ``sem_img`` semaphore."""
    async with sem_img:
        await get_buff(url, c_name, session, log)


async def get_img(url, buff, c_name, log):
    """Write image bytes to disk unless the file already exists.

    Images go to ``<cwd>/comics_images/<c_name>/``. If that directory cannot
    be created (the original comment cites over-long titles), a directory
    named after the MD5 hex digest of the title is used instead, and the
    real title is written to ``title.txt`` inside it.

    Args:
        url: Image URL; its last path segment becomes the file name.
        buff: Image content as bytes.
        c_name: Comic title.
        log: Logger with awaitable ``info``.
    """
    base_dir = os.path.join(os.getcwd(), "comics_images")
    title_dir = os.path.join(base_dir, c_name)
    md5name = hashlib.md5(c_name.encode("utf-8")).hexdigest()
    md5_dir = os.path.join(base_dir, md5name)

    # Pick whichever directory is already in use for this comic, so every
    # page of one comic lands in the same place.
    if os.path.exists(title_dir):
        target_dir = title_dir
    elif os.path.exists(md5_dir):
        target_dir = md5_dir
    else:
        try:
            os.makedirs(title_dir)
            target_dir = title_dir
        except OSError:
            # The title is not usable as a directory name: fall back to the
            # digest and record the real title next to the images.
            os.makedirs(md5_dir, exist_ok=True)
            target_dir = md5_dir
            title_path = os.path.join(md5_dir, "title.txt")
            async with aiofiles.open(title_path, 'w', encoding="utf-8") as title_file:
                await title_file.write(c_name)

    file_name = url.split('/')[-1]
    image_path = os.path.join(target_dir, file_name)
    if os.path.exists(image_path):
        return

    await log.info(f"SAVE_PATH:{image_path}")
    async with aiofiles.open(image_path, 'wb') as image_file:
        await image_file.write(buff)


async def fetch(url, session, log, retry_index=0):
    """Fetch one comic page, then download every image it lists.

    The page is parsed for the title, the image-directory id and the page
    count, and one ``run_img`` task is started per page. A non-200 status or
    any exception counts as a failed attempt. Attempts stop once
    ``retry_index`` reaches ``_MAX_FETCH_ATTEMPTS``.

    Args:
        url: Comic page URL.
        session: ``aiohttp.ClientSession``-like object.
        log: Logger with awaitable ``info`` and ``error``.
        retry_index: Number of attempts already made.

    Returns:
        None.
    """
    while retry_index < _MAX_FETCH_ATTEMPTS:
        text = None
        try:
            # The timeout bounds only the page request. Image downloads
            # carry their own timeout in get_buff.
            async with async_timeout.timeout(30):
                # NOTE(review): TLS certificate verification is disabled,
                # as in the original (`verify_ssl=False`).
                async with session.get(url, ssl=False) as req:
                    if req.status == 200:
                        text = await req.text()
            if text:
                root = html.fromstring(text)
                name = root.xpath(cosmic_name)[0]
                jpg_id = root.xpath(page_id_xpath)[0].split('/')[-2]
                max_page = root.xpath(page_num_xpath)[0].split('.')[0]
                img_urls = (full_url.format(jpg_id, i, "jpg")
                            for i in range(1, int(max_page) + 1))
                tasks = [asyncio.ensure_future(run_img(img_url, name, session, log))
                         for img_url in img_urls]
                await asyncio.gather(*tasks)
        except Exception as exc:
            # Boundary handler: log the failure and treat it as a failed
            # attempt instead of swallowing it silently.
            text = None
            await log.error(f'Fetch failed for {url}: {exc!r}')
        if text:
            return
        retry_index += 1
        await log.error(f'Retry times: {retry_index}')
    await log.error(f'Giving up on {url} after {retry_index} attempts')


async def bound_fetch(url, session, log):
    """Run ``fetch`` while holding the module-level ``sema`` semaphore."""
    async with sema:
        await fetch(url, session, log)


async def run(data):
    """Crawl every URL in ``data`` through an ``asyncpool.AsyncPool``.

    Args:
        data: Iterable of comic page URLs.
    """
    log = await logging()
    await log.info("Start Spider")
    # The session owns the connector and closes it on exit.
    connector = aiohttp.TCPConnector(limit=100, force_close=True, enable_cleanup_closed=True)
    try:
        # The pool is nested inside the session so that the pool's exit,
        # which waits for its workers, finishes before the session closes.
        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncpool.AsyncPool(asyncio.get_event_loop(), num_workers=10, name="cosmic",
                                           logger=log, worker_co=bound_fetch) as pool:
                for url in data:
                    await pool.push(url, session, log)
    finally:
        # Flush and close the aiologger handlers.
        await log.shutdown()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run(cosmic_url_gen))
    finally:
        loop.close()

项目地址:https://github.com/muzico425/cosmic_download