Python 3.7 crawler: bulk-scraping novels from a novel site into MySQL (work in progress...). Unsolved problem: after a long run of more than a million inserts, the MySQL table locks up or even crashes.

Posted: 2021-05-01 23:30:54

A practice project; the code still has plenty of problems and is being improved over time.

Running on a bargain-bin ¥99 Alibaba Cloud T5 server, so only 6 worker processes are started.
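On the unsolved problem in the title: the script below opens a fresh connection per novel and commits once per chapter row, so a long run becomes millions of tiny transactions. I haven't verified that this fixes the lock-ups, but one common mitigation is to buffer rows and commit them in batches with executemany. A minimal sketch, assuming the text_del schema used in the script (the ChapterBuffer name and BATCH_SIZE are my own, not part of the original):

# Sketch only: buffer chapter rows and commit per batch instead of per row.
# Table/column names match the script below; BATCH_SIZE is an arbitrary guess.
import MySQLdb

BATCH_SIZE = 500

class ChapterBuffer:
    def __init__(self, db):
        self.db = db
        self.rows = []

    def add(self, info):
        # queue one chapter; flush when the batch is full
        self.rows.append((info['l_id'], info['title'], info['content'],
                          info['add_time'], info['num'], info['froms']))
        if len(self.rows) >= BATCH_SIZE:
            self.flush()

    def flush(self):
        if not self.rows:
            return
        cur = self.db.cursor()
        cur.executemany(
            "insert into text_del (l_id,title,content,add_time,`order`,froms) "
            "values (%s,%s,%s,%s,%s,%s)",
            self.rows)
        self.db.commit()  # one commit per batch: far fewer transactions
        cur.close()
        self.rows = []

Remember to call flush() once more after the loop so the final partial batch is written.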

#encoding:utf-8
import requests  # HTTP requests
from lxml import html  # HTML parsing
import random
import time
import os
import string
from fake_useragent import UserAgent
import multiprocessing
import base64
import MySQLdb

basepath = os.path.abspath('text')  # chapter .txt files go here
imgpath = os.path.abspath('timg')  # cover images go here
baseUrl = 'http://www.quanshuwang.com/list/1_1.html'
baseFrom = '全书网'
book_type = 2  # category id in the listing URL (renamed from `type`, which shadowed the builtin)

def getList(page):  # fetch the novel detail-page links from one listing page
    r = requests.get('http://www.quanshuwang.com/all/allvisit_{}_0_0_0_0_0_{}.html'.format(book_type, page), headers=getHeaders()).text
    doc = html.fromstring(r)
    urls = doc.xpath('//div[@class="yd-book-item yd-book-item-pull-left"]/a/@href')
    return urls


def getHeaders():  # build request headers with a random User-Agent
    headers = {
        'Referer': baseUrl,
        'Connection': 'close',
        'User-Agent': UserAgent().random
    }
    return headers


def upload_img(jpgLink, filename):  # despite the name, this downloads the cover image to disk
    with open(filename, "wb") as jpg:
        jpg.write(requests.get(jpgLink).content)
        print('Cover image downloaded')


def getInfo(url):  # scrape one novel's detail page: metadata, cover image, then chapters
    try:
        info = {}
        pro = 1
        r = requests.get(url, headers=getHeaders(), timeout=3)
        doc = html.fromstring(r.content)
        des = doc.xpath('//div[@id="waa"]/text()')[0]
        info['des'] = "".join(des.split())
        info['des'] = info['des'][3:]  # strip the label prefix from the description
        info['name'] = doc.xpath('//div[@class="b-info"]/h1/text()')[0]
        links = doc.xpath('//div[@class="b-oper"]/a/@href')[0]
        imgurl = doc.xpath('//a[@class="l mr11"]/img/@src')[0]
        img = base64.b64encode(info['name'].encode('utf-8')) + b'.jpg'  # base64 of the title as a filesystem-safe name
        img = (img.decode()).replace('/', '')
        info['thumb'] = 'timg/' + img
        filename = imgpath + '/' + img
        info['from'] = links
        upload_img(imgurl, filename)  # download the cover image
        getBook(links, pro, info)  # download the chapters
    except requests.exceptions.Timeout:
        print('Connection timed out, retrying...')
        getInfo(url)  # note: unbounded recursive retry; see the sketch at the end
    except Exception as e:
        print('Error:', e)
        getInfo(url)


def insertList(info):  # insert the novel record; returns the row id (existing or new)
    db = MySQLdb.connect(host='localhost', user='root', passwd='LuoYang%684985', db='python', port=3306,
                         charset='utf8')
    you = db.cursor()  # get a cursor (you) for this connection
    you.execute('select id from text_list where name=%s', (info['name'],))  # parameterized so quotes in titles cannot break the SQL
    is_repeat = you.fetchone()
    if is_repeat:
        print('Novel {} already exists, skipping'.format(info['name']))
        db.close()
        return is_repeat[0]
    else:
        you.execute("insert into text_list (type,thumb,description,name,author,froms,add_time) values (%s,%s,%s,%s,%s,%s,%s)",
                    (info['type'], info['thumb'], info['des'], info['name'], info['author'], info['from'], int(time.time())))
        you.execute("select last_insert_id();")
        data = you.fetchone()
        db.commit()
        db.close()  # release the connection
        print('Downloading novel {}'.format(info['name']))
        return data[0]

def is_repeat(info, db):  # return -1 if this chapter is already in text_del, else 1
    you = db.cursor()  # get a cursor for this connection
    you.execute('select id from text_del where l_id=%s and title=%s', (info['l_id'], info['title']))
    row = you.fetchone()
    if row:
        time.sleep(0.1)
        return -1
    else:
        return 1
def insertContent(info, db):  # insert one chapter record
    you = db.cursor()  # get a cursor for this connection
    you.execute("insert into text_del (l_id,title,content,add_time,`order`,froms) values (%s,%s,%s,%s,%s,%s)",
                (info['l_id'], info['title'], info['content'], info['add_time'], info['num'], info['froms']))
    db.commit()
    you.close()  # close the cursor


def random_string(size=5, chars=string.ascii_uppercase + string.digits):  # timestamp plus a short random suffix
    return str(int(time.time())) + ''.join(random.choice(chars) for _ in range(size))


def getBook(link, pro, info):  # insert the novel record, then walk its chapter list
    try:
        r = requests.get(link, headers=getHeaders(), timeout=3)
        doc = html.fromstring(r.content)
        info['author'] = doc.xpath('//div[@class="chapName"]/span/text()')[0]
        info['author'] = info['author'][3:]  # strip the label prefix from the author field
        info['type'] = book_type
        res = {}
        res['l_id'] = insertList(info)
        links = doc.xpath('//div[@class="clearfix dirconone"]/li/a')
        book_dir = random_string()  # renamed from `str`, which shadowed the builtin
        bookpath = os.path.join(basepath, book_dir)  # os.path.join instead of hard-coded Windows separators
        os.makedirs(bookpath, exist_ok=True)
        num = 0
        db = MySQLdb.connect(host='localhost', user='root', passwd='LuoYang%684985', db='python', port=3306, charset='utf8')
        for i in links:
            num = num + 1
            res['num'] = num
            name = i.xpath('./text()')[0]
            res['title'] = name
            if is_repeat(res, db) == -1:
                pass  # chapter already stored, skip it
            else:
                downTxt(i, book_dir, pro, res, db)
        db.close()
    except requests.exceptions.Timeout:
        print('Connection timed out, retrying...')
        getBook(link, 0, info)
    except Exception as e:
        # print('Error:', e)
        getBook(link, 0, info)


def downTxt(page, path, pro, res, db):  # download one chapter's text and record it
    res['add_time'] = int(time.time())
    url = page.xpath('./@href')[0]
    try:
        r = requests.get(url, headers=getHeaders(), timeout=3)  # timeout added so the Timeout handler can actually fire
        doc = html.fromstring(r.content)
        arc = doc.xpath('//div[@id="content"]/text()')
        arc = "".join(arc)
        chapter_name = random_string()  # renamed from `str`, which shadowed the builtin
        relname = 'text/{}/{}.txt'.format(path, chapter_name)
        res['froms'] = url
        res['content'] = relname  # the DB stores the relative file path, not the text itself
        fileName = os.path.join(os.path.abspath('.'), 'text', path, '{}.txt'.format(chapter_name))
        time.sleep(1)  # crude rate limiting
        insertContent(res, db)
        with open(fileName, 'w', encoding='utf-8') as txt:
            txt.write(arc)
    except requests.exceptions.Timeout:
        # print('Connection timed out, retrying...')
        downTxt(page, path, 0, res, db)
    except Exception as e:
        # print('Error:', e, 'retrying...')
        downTxt(page, path, 0, res, db)


def work(i):  # one worker: process every novel on listing page i
    lists = getList(i)
    for s in lists:
        getInfo(s)
        time.sleep(10)  # be polite to the server between novels


if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=6)  # 6 processes: all the cheap server can handle
    for i in range(1, 51):  # listing pages 1-50
        pool.apply_async(work, (i,))
    pool.close()
    pool.join()
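One more weak spot worth flagging: every except block above retries by calling the same function again with no limit, so a page that never parses will recurse until Python's recursion limit is hit. A minimal sketch of a bounded alternative (the helper name, attempt count, and delay are my own choices, not part of the original script):

import time

def with_retries(func, *args, attempts=3, delay=2, **kwargs):
    # Hypothetical helper: retry up to `attempts` times, then give up.
    for n in range(1, attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print('Attempt {} failed: {}'.format(n, e))
            if n == attempts:
                raise  # surface the error instead of recursing forever
            time.sleep(delay)

# e.g. with_retries(getInfo, s) inside work() instead of getInfo retrying itself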