python多线程爬虫设计及实现示例

时间:2022-01-31 00:01:13

爬虫的基本步骤分为:获取,解析,存储。假设这里获取和存储为io密集型(访问网络和数据存储),解析为cpu密集型。那么在设计多线程爬虫时主要有两种方案:第一种方案是一个线程完成三个步骤,然后运行多个线程;第二种方案是每个步骤运行一个多线程,比如N个线程进行获取,1个线程进行解析(多个线程之间切换会降低效率),N个线程进行存储。

下面我们尝试抓取http://www.chembridge.com/ 库存药品信息。

首先确定url为http://www.chembridge.com/search/search.phpsearchType=MFCD&query='+line+'&type=phrase&results=10&search=1,其中line为要搜索的药品信息(要搜索的药品信息保存在本地文件txt中),这里使用requests库进行http请求,获取页面的代码如下:

url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
response = requests.get(url,headers=self.headers[0],timeout=20)
html_doc=response.text

页面解析使用beautifulsoup库,部分代码如下:

soup = BeautifulSoup(html_doc, 'lxml')
div=soup.find(id='BBResults')
if div:
links=div.select('a.chemical')
for link in links:
try:
self.get_page_link(link,line)
except Exception as e:
print('%s入库失败:'%line,e)
time.sleep(self.relay*2)
print('%s重新入库'%line)
self.get_page_link(link,line)
continue
print('%s搜索完成'%line)
def get_page_link(self,link,line):
res=[]
href=link.get('href')
print(href)
time.sleep(self.relay*2*random.randint(5,15)/10)
r=requests.get(href,headers=self.headers[1],timeout=20)
if r.status_code==200:
parse_html=r.text
soup1=BeautifulSoup(parse_html, 'lxml')
catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]#获取catalog
# print(catalogs)
table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
if 'AmountPriceQty.' in table_headers:
index=table_headers.index('AmountPriceQty.')
catalog=catalogs[0]
trs=soup1.select('.form tbody tr')
if len(catalogs)>1:
catalog=catalogs[index]
for tr in trs:
if len(tr.select('td'))>1:
row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
res.append(row)

最后将res保存到mysql数据库:

conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
cursor.executemany(sql,res)
print('入库')
conn.commit()
cursor.close()
conn.close()

一、单线程爬虫封装的完整代码如下:

# -*- coding:utf-8 -*-
import requests,random,time
from bs4 import BeautifulSoup
import mysql.connector class Spider:
def __init__(self):
self.headers=[{
'Host':'www.chembridge.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate',
'Referer':'http://www.chembridge.com/search/search.php?search=1',
'Connection':'keep-alive',
'Upgrade-Insecure-Requests':''
},
{
'Host':'www.hit2lead.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate, br'
}]
self.filename='MDL.txt' def get_page_link(self,link):
res=[]
href=link.get('href')
print(href)
parse_html=requests.get(href,headers=self.headers[1]).text
soup1=BeautifulSoup(parse_html, 'lxml')
catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]#获取catalog
print(catalogs)
table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
print(table_headers)
index=table_headers.index('AmountPriceQty.')
catalog=catalogs[0]
trs=soup1.select('.form tbody tr')
# print(trs)
if len(catalogs)>1:
catalog=catalogs[index]
for tr in trs:
if len(tr.select('td'))>1:
row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
res.append(row)
print(res)
conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
cursor.executemany(sql,res)
conn.commit()
cursor.close()
conn.close() def get_page(self,line):
url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
try:
response = requests.get(url,headers=self.headers[0],timeout=20)
print(response.status_code)
html_doc=response.text
# print(html_doc)
soup = BeautifulSoup(html_doc, 'lxml')
div=soup.find(id='BBResults')
if div:
links=div.select('a.chemical')
for link in links:
self.get_page_link(link)
relay=random.randint(2,5)/10
print(relay)
time.sleep(relay)
except Exception as e:
print('except:', e) def get_file(self,filename):
i=0
f=open(filename,'r')
for line in f.readlines():
line=line.strip()
print(line)
self.get_page(line)
i=i+1
print('第%s个'%(i))
f.close() def run(self):
self.get_file(self.filename) spider=Spider()
starttime=time.time()
spider.run()
print('耗时:%f s'%(time.time()-starttime))

二、多线程爬虫设计代码

1.第一种设计方案的实现示例:

# -*- coding:utf-8 -*-
from threading import Thread
import threading
from queue import Queue
import os,time,random
import requests,mysql.connector
from bs4 import BeautifulSoup
from openpyxl.workbook import Workbook
from openpyxl.styles import Font class ThreadCrawl(Thread):
def __init__(self,tname,relay):
Thread.__init__(self)
#super(MyThread2, self).__init__()
# self.queue=queue
# self.lock=lock
# self.conn=conn
self.relay=relay*random.randint(5,15)/10
self.tname=tname
self.num_retries=3 #设置尝试重新搜索次数
self.headers=[{
'Host':'www.chembridge.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate',
'Referer':'http://www.chembridge.com/search/search.php?search=1',
'Connection':'keep-alive',
'Upgrade-Insecure-Requests':''
},
{
'Host':'www.hit2lead.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate, br'
}] def run(self):
print('%s 开始爬取'%self.tname)
# line = my_queue.get()
# print(line)
# while not self.queue.empty():
while len(words)>0:
lock.acquire()
line = words[0]
words.pop(0)
lock.release()
self.get_page(line,self.num_retries)
time.sleep(self.relay*random.randint(5,15)/10) while not my_queue.empty():
line=my_queue.get()
print('重新爬取%s...'%line)
self.get_page(line,num_retries=1)
print('%s 结束'%self.tname) #获取页面内容
def get_page(self,line,num_retries=2):
print('%s正在搜索%s...'%(self.tname,line))
# write this thread task
url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
try:
response = requests.get(url,headers=self.headers[0],timeout=20)
status=response.status_code
if status==200:
html_doc=response.text
# print(html_doc)
soup = BeautifulSoup(html_doc, 'lxml')
div=soup.find(id='BBResults')
if div:
links=div.select('a.chemical')
for link in links:
try:
self.get_page_link(link,line)
except Exception as e:
print('%s入库失败:'%line,e)
time.sleep(self.relay*2)
print('%s重新入库'%line)
self.get_page_link(link,line)
continue
print('%s搜索完成'%line)
lock.acquire()
global count
count=count+1
print('已完成%s个'%count)
lock.release()
# time.sleep(self.relay*random.randint(5,15)/10)
else:
print('%s搜索%s网络异常,错误代码:%s'%(self.tname,line,status))
# time.sleep(self.relay*random.randint(5,15)/10)
if num_retries>0:
print('%s尝试重新搜索%s'%(self.tname,line))
time.sleep(self.relay*random.randint(5,15)/10)
self.get_page(line,num_retries-1)
else:
print('%s四次搜索失败!!!'%line)
my_queue.put(line)
# error_list.append(line) except Exception as e:
print('%s搜索%s异常,error:'%(self.tname,line), e)
# time.sleep(self.relay*random.randint(5,15)/10)
if num_retries>0:
print('%s尝试重新搜索%s'%(self.tname,line))
time.sleep(self.relay*random.randint(5,15)/10)
self.get_page(line,num_retries-1)
else:
print('%s四次搜索失败!!!'%line)
my_queue.put(line)
# error_list.append(line)
# self.queue.task_done() #获取下一页链接并解析入库
def get_page_link(self,link,line):
res=[]
href=link.get('href')
print(href)
time.sleep(self.relay*2*random.randint(5,15)/10)
r=requests.get(href,headers=self.headers[1],timeout=20)
if r.status_code==200:
parse_html=r.text
soup1=BeautifulSoup(parse_html, 'lxml')
catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]#获取catalog
# print(catalogs)
table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
if 'AmountPriceQty.' in table_headers:
index=table_headers.index('AmountPriceQty.')
catalog=catalogs[0]
trs=soup1.select('.form tbody tr')
if len(catalogs)>1:
catalog=catalogs[index]
for tr in trs:
if len(tr.select('td'))>1:
row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
res.append(row)
# print(res)
lock.acquire()
conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
try:
print('%s: %s正在入库...'%(line,catalog))
sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
cursor.executemany(sql,res)
conn.commit()
except Exception as e:
print(e)
finally:
cursor.close()
conn.close()
lock.release() def writeToExcel(datas,filename):
# 在内存创建一个工作簿obj
result_wb = Workbook()
#第一个sheet是ws
ws1 = result_wb.worksheets[0]
# ws1=wb1.create_sheet('result',0)
#设置ws的名称
ws1.title = "爬取结果"
row0 = ['catalog', 'amount', 'price', 'qty']
ft = Font(name='Arial', size=11, bold=True)
for k in range(len(row0)):
ws1.cell(row=1,column=k+1).value=row0[k]
ws1.cell(row=1,column=k+1).font=ft
for i in range(1,len(datas)+1):
for j in range(1,len(row0)+1):
ws1.cell(row=i+1,column=j).value=datas[i-1][j-1]
# 工作簿保存到磁盘
result_wb.save(filename = filename) if __name__ == '__main__':
starttime=time.time()
lock = threading.Lock() words=[] # 存放搜索字段的数据
basedir=os.path.abspath(os.path.dirname(__file__))
filename='MDL.txt'
file=os.path.join(basedir,filename) #文件路径
f=open(file,'r')
for line in f.readlines():
line=line.strip()
words.append(line)
f.close() count=0 # 爬取进度计数
# global my_queue
my_queue = Queue() #FIFO队列,存放第一次搜索失败的字段,保证线程同步
error_list=[] #存放最终搜索失败的字段数组
threads=[] # 程序开始前清空数据库chembridge表数据
conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
print('清空表...')
cursor.execute('delete from chembridge')
conn.commit()
cursor.close()
conn.close() num_threads=10 #设置爬虫数量
relay=10 # 设置爬取时延,时延=relay*(0.5~1.5之间的随机数)
threadList = []
for i in range(1,num_threads+1):
threadList.append('爬虫-%s'%i)
# 开启多线程
for tName in threadList:
thread = ThreadCrawl(tName,relay)
thread.setDaemon(True)
thread.start()
threads.append(thread)
time.sleep(1)
# 主线程阻塞,等待所有子线程运行结束
for t in threads:
t.join() #将数据保存到excel
conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
cursor.execute('select * from chembridge')
datas=cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
writeToExcel(datas,'result.xlsx') #统计结果
while not my_queue.empty():
error_line=my_queue.get()
error_list.append(error_line)
print('爬取完成!\n')
if len(error_list)==0:
print('爬取失败列表:0个')
else:
print('总共爬取失败%s个:'%len(error_list),','.join(error_list))
# print('爬取完成!')
print('耗时:%f s'%(time.time()-starttime))

words为存放搜索记录的数组,当搜索记录失败时,会立即尝试重新搜索,num_retries为每条记录的最大搜索次数。如果某条记录在搜索num_retries次后仍失败,会把访问失败的word加入my_queue队列中。

当所有words搜索完时,会重新搜索my_queue中的所有word,循环直到my_queue为空(即所有word搜索成功)。

注意:这里要注意python多线程的GIL,修改同一个全局变量要加锁。

运行截图:

python多线程爬虫设计及实现示例

2.第二种设计方案的实现示例

urls_queue、html_queue和item_queue3分别存放要访问的url、要解析的页面和爬取到的结果。分别设计三个类,Fetcher类根据url进行简单的抓取,Parser类根据抓取内容进行解析,生成待保存的ItemSaver类进行Item的保存。当urls_queue、html_queue和item_queue3个队列同时为空时,所有子线程终止,任务结束。

# coding=utf-8
import threading
import queue,requests
import time,random
import mysql.connector
from bs4 import BeautifulSoup class Fetcher(threading.Thread):
def __init__(self,urls_queue,html_queue):
threading.Thread.__init__(self)
self.__running=threading.Event()
self.__running.set()
self.urls_queue = urls_queue
self.html_queue = html_queue
self.num_retries=3 #设置尝试重新搜索次数
self.headers={
'Host':'www.chembridge.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate',
'Referer':'http://www.chembridge.com/search/search.php?search=1',
'Connection':'keep-alive',
'Upgrade-Insecure-Requests':''
} def run(self):
while not self.urls_queue.empty():
# while self.__running.isSet():
line=self.urls_queue.get()
print(line)
time.sleep(2*random.randint(5,15)/10)
# self.urls_queue.task_done()
self.get_page(line,self.num_retries)
def get_page(self,line,num_retries=2):
url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
try:
response = requests.get(url,headers=self.headers,timeout=20)
status=response.status_code
if status==200:
html_doc=response.text
print(html_doc)
self.html_queue.put(html_doc)
# self.urls_queue.task_done()
print('%s搜索完成'%line)
else:
print('搜索%s网络异常,错误代码:%s'%(line,status))
if num_retries>0:
print('尝试重新搜索%s'%(line))
time.sleep(2*random.randint(5,15)/10)
self.get_page(line,num_retries-1)
else:
print('%s四次搜索失败!!!'%line)
self.urls_queue.put(line) except Exception as e:
print('%s搜索异常,error:'%line,e)
if num_retries>0:
print('尝试重新搜索%s'%(line))
time.sleep(2*random.randint(5,15)/10)
self.get_page(line,num_retries-1)
else:
print('%s四次搜索失败!!!'%line)
self.urls_queue.put(line) def stop(self):
self.__running.clear() class Parser(threading.Thread):
def __init__(self, html_queue,item_queue):
threading.Thread.__init__(self)
self.__running=threading.Event()
self.__running.set()
self.html_queue = html_queue
self.item_queue = item_queue
self.num_retries=3 #设置尝试重新搜索次数
self.headers={
'Host':'www.hit2lead.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate, br'
}
def run(self):
while self.__running.isSet():
print('html_queue长度: ',self.html_queue.qsize())
# if self.html_queue.empty():
# break
html_doc=self.html_queue.get()
try:
soup = BeautifulSoup(html_doc, 'lxml')
div=soup.find(id='BBResults')
if div:
links=div.select('a.chemical')
for link in links:
self.get_page_link(link,self.num_retries)
relay=random.randint(20,50)/10
# print(relay)
time.sleep(relay)
except Exception as e:
self.html_queue.put(html_doc)
# self.html_queue.task_done() def get_page_link(self,link,num_retries=2):
print('haha')
time.sleep(2*random.randint(5,15)/10)
res=[]
href=link.get('href')
print(href)
response=requests.get(href,headers=self.headers,timeout=20)
status=response.status_code
if status==200:
parse_html=response.text
soup1=BeautifulSoup(parse_html, 'lxml')
catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]#获取catalog
# print(catalogs)
table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
# print(table_headers)
if 'AmountPriceQty.' in table_headers:
index=table_headers.index('AmountPriceQty.')
catalog=catalogs[0]
trs=soup1.select('.form tbody tr')
# print(trs)
if len(catalogs)>1:
catalog=catalogs[index]
for tr in trs:
if len(tr.select('td'))>1:
row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
res.append(row)
# print(res)
self.item_queue.put(res)
else:
print('搜索%s网络异常,错误代码:%s'%(link,status))
# time.sleep(self.relay*random.randint(5,15)/10)
if num_retries>0:
print('尝试重新搜索%s'%(link))
time.sleep(random.randint(5,15)/10)
self.get_page_link(link,num_retries-1)
else:
print('%s四次搜索失败!!!'%line)
def stop(self):
self.__running.clear() class Saver(threading.Thread):
def __init__(self, item_queue):
threading.Thread.__init__(self)
self.__running=threading.Event()
self.__running.set()
self.item_queue = item_queue def run(self):
# while not self.item_queue.empty():
while self.__running.isSet():
print('item_queue长度: ',self.item_queue.qsize())
res=self.item_queue.get()
print(res)
conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
cursor.executemany(sql,res)
print('入库')
conn.commit()
cursor.close()
conn.close()
def stop(self):
self.__running.clear() if __name__ == '__main__':
starttime=time.time()
lock = threading.Lock()
urls_queue = queue.Queue()
html_queue = queue.Queue()
item_queue = queue.Queue() conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
print('清空表...')
cursor.execute('delete from chembridge_test2')
conn.commit()
cursor.close()
conn.close() print('start...') f=open('MDL1.txt','r')
for line in f.readlines():
line=line.strip()
urls_queue.put(line)
f.close() threads=[]
for j in range(8):
thread1 = Fetcher(urls_queue,html_queue)
thread1.setDaemon(True)
thread1.start()
threads.append(thread1)
for j in range(1):
thread1 = Parser(html_queue,item_queue)
thread1.setDaemon(True)
thread1.start()
threads.append(thread1)
for j in range(2):
thread1 = Saver(item_queue)
thread1.setDaemon(True)
thread1.start()
threads.append(thread1) # while not urls_queue.empty():
# while not html_queue.empty():
# while not item_queue.empty():
# pass
while True:
time.sleep(0.5)
if urls_queue.empty() and html_queue.empty() and item_queue.empty():
break print('完成!')
for t in threads:
t.stop()
for t in threads:
t.join()
print('end')
print('耗时:%f s'%(time.time()-starttime))

根据网络情况,设置线程数量,避免requests访问网络时阻塞。

另外附上用scrapy实现的代码

items.py

import scrapy

class ChemItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
catalog=scrapy.Field()
amount=scrapy.Field()
price=scrapy.Field()
qty=scrapy.Field()

quotes_spider.py

# -*- coding: utf-8 -*-
import scrapy
from scrapy.selector import Selector
from tutorial.items import ChemItem class QuotesSpider(scrapy.Spider):
name = "quotes"
# allowed_domains = ["chembridge.com"]
headers=[{
'Host':'www.chembridge.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate',
'Referer':'http://www.chembridge.com/search/search.php?search=1',
'Connection':'keep-alive',
'Upgrade-Insecure-Requests':''
},
{
'Host':'www.hit2lead.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding':'gzip, deflate, br'
}]
def start_requests(self):
start_urls = []
f=open('MDL.txt','r')
for line in f.readlines():
line=line.strip()
print(line)
start_urls.append('http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1')
for url in start_urls:
yield scrapy.Request(url=url, callback=self.parse,headers=self.headers[0]) def parse(self, response):
links=response.css('#BBResults a.chemical::attr(href)').extract()
for link in links:
yield scrapy.Request(url=link,callback=self.parse_dir_contents,headers=self.headers[1]) def parse_dir_contents(self, response):
items=[]
catalogs=response.css('form div.matter h2::text').extract()
table_headers=[''.join(res.re(r'>(.*)</td>')) for res in response.css('form div.matter thead tr')]
print(table_headers)
index=table_headers.index('AmountPriceQty.')
catalog=catalogs[0]
trs=response.css('.form tbody tr')
if len(catalogs)>1:
catalog=catalogs[index]
for tr in trs:
if len(tr.css('td'))>1:
item=ChemItem()
# print(tr.css('td::text').extract())
# row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.css('td'))
item['catalog']=catalog
item['amount']=tr.css('td')[0].css('::text').extract()[0]
item['price']='|'.join(tr.css('td')[1].css('::text').extract())
print(len(tr.css('td::text')))
item['qty']=tr.css('td')[2].css('::text').extract()[0] if len(tr.css('td')[2].css('::text').extract())==1 else tr.css('td')[2].css('::attr(value)').extract()[0]
# self.log('Saved result %s' % item)
# print(tr.css('td::text')[0].extract())
yield item
# items.append(item)
# return items

pipelines.py

#将数据存储到mysql数据库
from twisted.enterprise import adbapi
import MySQLdb
import MySQLdb.cursors
from scrapy import log class MySQLStorePipeline(object):
def __init__(self, dbpool):
self.dbpool = dbpool #数据库参数
@classmethod
def from_settings(cls, settings):
dbargs = dict(
host=settings['MYSQL_HOST'],
db=settings['MYSQL_DBNAME'],
user=settings['MYSQL_USER'],
passwd=settings['MYSQL_PASSWD'],
charset='utf8',
cursorclass = MySQLdb.cursors.DictCursor,
use_unicode= True,
)
dbpool = adbapi.ConnectionPool('MySQLdb', **dbargs)
return cls(dbpool) # #数据库参数
# def __init__(self):
# dbargs = dict(
# host = 'localhost',
# db = 'test',
# user = 'root',
# passwd = 'password',
# cursorclass = MySQLdb.cursors.DictCursor,
# charset = 'utf8',
# use_unicode = True
# )
# self.dbpool = adbapi.ConnectionPool('MySQLdb',**dbargs) '''
The default pipeline invoke function
'''
def process_item(self, item,spider):
res = self.dbpool.runInteraction(self.insert_into_table,item)
res.addErrback(self.handle_error)
return item
#插入的表,此表需要事先建好
def insert_into_table(self,conn,item):
conn.execute('insert into chembridge(catalog, amount, price,qty) values(%s,%s,%s,%s)', (
item['catalog'],
item['amount'],
# item['star'][0],
item['price'],
item['qty']
))
def handle_error(self,e):
log.err(e)

settings.py

FEED_EXPORTERS = {
'csv': 'tutorial.spiders.csv_item_exporter.MyProjectCsvItemExporter',
} #tutorial为工程名 FIELDS_TO_EXPORT = [
'catalog',
'amount',
'price',
'qty'
] LINETERMINATOR='\n' ITEM_PIPELINES = {
'tutorial.pipelines.MySQLStorePipeline': 300,
} # start MySQL database configure setting
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'test'
MYSQL_USER = 'root'
MYSQL_PASSWD = 'password'
# end of MySQL database configure setting

main.py

# -*- coding: utf-8 -*-
from scrapy import cmdline
cmdline.execute("scrapy crawl quotes -o items.csv -t csv".split())

最后运行main.py,将结果同时保存到csv文件和mysql数据库中。

python多线程爬虫设计及实现示例的更多相关文章

  1. python多线程爬虫&plus;批量下载斗图啦图片项目(关注、持续更新)

    python多线程爬虫项目() 爬取目标:斗图啦(起始url:http://www.doutula.com/photo/list/?page=1) 爬取内容:斗图啦全网图片 使用工具:requests ...

  2. Python多线程爬虫与多种数据存储方式实现&lpar;Python爬虫实战2&rpar;

    1. 多进程爬虫 对于数据量较大的爬虫,对数据的处理要求较高时,可以采用python多进程或多线程的机制完成,多进程是指分配多个CPU处理程序,同一时刻只有一个CPU在工作,多线程是指进程内部有多个类 ...

  3. Python多线程爬虫爬取电影天堂资源

    最近花些时间学习了一下Python,并写了一个多线程的爬虫程序来获取电影天堂上资源的迅雷下载地址,代码已经上传到GitHub上了,需要的同学可以自行下载.刚开始学习python希望可以获得宝贵的意见. ...

  4. python 多线程爬虫

    最近,一直在做网络爬虫相关的东西. 看了一下开源C++写的larbin爬虫,仔细阅读了里面的设计思想和一些关键技术的实现. 1.larbin的URL去重用的很高效的bloom filter算法: 2. ...

  5. Python多线程爬虫爬取网页图片

    临近期末考试,但是根本不想复习!啊啊啊啊啊啊啊!!!! 于是做了一个爬虫,网址为 https://yande.re,网页图片为动漫美图(图片带点颜色........宅男福利 github项目地址为:h ...

  6. Python多线程爬虫详解

    一.程序进程和线程之间的关系 程序:一个应用就是一个程序,比如:qq,爬虫 进程:程序运行的资源分配最小单位, 很多人学习python,不知道从何学起.很多人学习python,掌握了基本语法过后,不知 ...

  7. Python多线程爬虫

    前言 用上多线程,感觉爬虫跑起来带着风 运行情况 爬取了9万多条文本记录,耗时比较短,一会儿就是几千条 关键点 多个线程对同一全局变量进行修改要加锁 # 获取锁,用于线程同步 threadLock.a ...

  8. python 多线程爬虫 实例

    多进程 Multiprocessing 模块 Process 类用来描述一个进程对象.创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建. star() 方法启动 ...

  9. python多线程爬虫:亚马逊价格

    import re import requests import threading import time from time import ctime,sleep from queue impor ...

随机推荐

  1. 安装软件时出现error 1337 【添加权限】

    Error 1317 An error occurred while attempting to create the directory Drive Name:\Folder Name https: ...

  2. Android HTTP实例 使用GET方法和POST方法发送请求

    Android HTTP实例 使用GET方法和POST方法发送请求 Web程序:使用GET和POST方法发送请求 首先利用MyEclispe+Tomcat写好一个Web程序,实现的功能就是提交用户信息 ...

  3. ichat在线客服jQuery插件&lpar;可能是历史上最灵活的&rpar;

    ichat是一款开源免费在线客服jQuery插件,通过该插件,您可以*的定制属于自己的在线客服代码. ichat充分吸收传统在线客服插件的优点,并加上自身的独特设计,使得ichat可定制性异常强大. ...

  4. AS 学习笔记 加载数据

    AS2 加载本地(外部)数据.swf .png .jpg 等资源使用loadMovie() 加载库里面的mc 用 attachMovie AS3 加载本地(外部)数据 用 Loader 类来完成这个操 ...

  5. asp&period;net2&period;0安全性&lpar;1&rpar;--用户角色篇&lpar;类&rpar;--转载来自车老师

    Membership.MembershipUser和Roles类 用户与角色管理在asp.net2.0中是通过Membership和Roles两个类来实现的. Membership:用户成员账号管理, ...

  6. 大量Javascript&sol;JQuery学习教程电子书合集

    [推荐分享]大量Javascript/JQuery学习教程电子书合集,送给有需要的人   不收藏是你的错^_^. 经证实,均可免费下载. 资源名称 资源大小   15天学会jQuery(完整版).pd ...

  7. 末学者笔记--apache编译安装及LAMP架构上线

    apache介绍 一.Apache的三种工作模式 Apache一共有3种稳定的MPM模式(多进程处理模块),它们分别是prefork.worker.event.http-2.2版本的httpd默认的m ...

  8. 执行makemigrations后错误集锦

    在项目配置xadmin后,执行python manage.py makemigrations后出现了很多问题: 1.ModuleNotFoundError: No module named 'futu ...

  9. yield from

    一.yield 关于yield详细可参考我这篇文章 下面是一个带yield的生成器: def gen_yield(): while True: recv = yield do something wi ...

  10. IDEA使用笔记(十)——设置Java方法注释

    如果你看到了,这篇博文,那么你是幸运的!你问什么?你百度百度同类型的网文就明白了! 一:先看效果 二:我的实验过程(肯定还有别的方式) 1:新建   Template Group,详细操作步骤见下图 ...