之前一直在写有关scrapy爬虫的事情,今天我们看看使用scrapy如何把爬到的数据放在mysql数据库中保存。
有关python操作mysql数据库的内容,网上已经有很多内容可以参考了,但都是在同步的操作mysql数据库。在数据量不大的情况下,这种方法固然可以,但是一旦数据量增长后,mysql就会出现崩溃的情况,因为网上爬虫的速度要远远高过往数据库中插入数据的速度。为了避免这种情况发生,我们就需要使用异步的方法来存储数据,爬虫与数据存储互不影响。
为了显示方便,我们把程序设计的简单一点,只是爬一页的数据。我们今天选择伯乐在线这个网站来爬取,只爬取第一页的数据。
首先我们还是要启动一个爬虫项目,然后自己建了一个爬虫的文件jobbole.py。我们先来看看这个文件中的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# -*- coding: utf-8 -*-
import io
import sys
import scrapy
import re
import datetime
from scrapy.http import request
from urllib import parse
from articlespider.items import jobbolearticleitem, articleitemloader
from scrapy.loader import itemloader
sys.stdout = io.textiowrapper(sys.stdout. buffer ,encoding = 'utf-8' )
class jobbolespider(scrapy.spider):
"""docstring for jobbolespider"""
name = "jobbole"
allowed_domain = [ "blog.jobbole.com" ]
start_urls = [ 'http://blog.jobbole.com/all-posts/' ]
def parse( self , response):
"""
1.获取列表页中的文章url
"""
# 解析列表汇中所有文章url并交给scrapy下载器并进行解析
post_nodes = response.css( "#archive .floated-thumb .post-thumb a" )
for post_node in post_nodes:
image_url = post_node.css( "img::attr(src)" ).extract_first("") # 这里取出每篇文章的封面图,并作为meta传入request
post_url = post_node.css( "::attr(href)" ).extract_first("")
yield request(url = parse.urljoin(response.url, post_url), meta = { "front_image_url" :image_url}, callback = self .parse_detail)
def parse_detail( self , response):
article_item = jobbolearticleitem()
# 通过itemloader加载item
# 通过add_css后的返回值都是list型,所有我们再items.py要进行处理
item_loader = articleitemloader(item = jobbolearticleitem(), response = response)
item_loader.add_css( "title" , ".entry-header h1::text" )
item_loader.add_value( "url" , response.url)
# item_loader.add_value("url_object_id", get_md5(response.url))
item_loader.add_value( "url_object_id" , response.url)
item_loader.add_css( "create_date" , "p.entry-meta-hide-on-mobile::text" )
item_loader.add_value( "front_image_url" , [front_image_url])
item_loader.add_css( "praise_nums" , ".vote-post-up h10::text" )
item_loader.add_css( "comment_nums" , "a[href='#article-comment'] span::text" )
item_loader.add_css( "fav_nums" , ".bookmark-btn::text" )
item_loader.add_css( "tags" , "p.entry-meta-hide-on-mobile a::text" )
item_loader.add_css( "content" , "div.entry" )
article_item = item_loader.load_item()
print (article_item[ "tags" ])
yield article_item
pass
|
这里我把代码进行了简化,首先对列表页发出请求,这里只爬取一页数据,然后分析每一页的url,并且交给scrapy对每一个url进行请求,得到每篇文章的详情页,把详情页的相关内容放在mysql数据库中。
这里使用itemloader来进行页面的解析,这样解析有个最大的好处就是可以把解析规则存放在数据库中,实现对解析规则的动态加载。但是要注意一点是使用itemloader中css方式和xpath方式得到的数据都是list型,因此还需要在items.py中再对相对应的数据进行处理。
接下来我们就来看看items.py是如何处理list数据的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# -*- coding: utf-8 -*-
# define here the models for your scraped items
#
# see documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html
import datetime
import re
import scrapy
from scrapy.loader import itemloader
from scrapy.loader.processors import mapcompose, takefirst,join
from articlespider.utils.common import get_md5
def convert_date(value):
try :
create_date = datetime.datetime.strptime(create_date, "%y/%m/%d" ).date()
except exception as e:
create_date = datetime.datetime.now().date()
return create_date
def get_nums(value):
match_re = re.match( ".*?(\d+).*" , value)
if match_re:
nums = int (match_re.group( 1 ))
else :
nums = 0
return nums
def remove_comment_tags(value):
# 去掉tags中的评论内容
if "评论" in value:
# 这里做了修改,如果返回"",则在list中仍然会占位,会变成类似于["程序员",,"解锁"]这样
# return ""
return none
else :
return value
def return_value(value):
return
class articleitemloader(itemloader):
"""docstring for ariticleitemloader"""
# 自定义itemloader
default_output_processor = takefirst()
class articlespideritem(scrapy.item):
# define the fields for your item here like:
# name = scrapy.field()
pass
class jobbolearticleitem(scrapy.item):
"""docstring for articlespideritem"""
title = scrapy.field()
create_date = scrapy.field(
input_processor = mapcompose(convert_date)
)
url = scrapy.field()
url_object_id = scrapy.field(
output_processor = mapcompose(get_md5)
)
# 这里注意front_image_url还是一个list,在进行sql语句时还需要处理
front_image_url = scrapy.field(
output_processor = mapcompose(return_value)
)
front_image_path = scrapy.field()
praise_nums = scrapy.field(
input_processor = mapcompose(get_nums)
)
comment_nums = scrapy.field(
input_processor = mapcompose(get_nums)
)
fav_nums = scrapy.field(
input_processor = mapcompose(get_nums)
)
# tags要做另行处理,因为tags我们需要的就是list
tags = scrapy.field(
input_processor = mapcompose(remove_comment_tags),
output_processor = join( "," )
)
content = scrapy.field()
|
首先我们看到定义了一个类articleitemloader,在这个类中只有一句话,就是对于每个items都默认采用list中的第一个元素,这样我们就可以把每个items中的第一个元素取出来。但是要注意,有些items我们是必须要用list型的,比如我们给imagepipeline的数据就要求必须是list型,这样我们就需要对front_image_url单独进行处理。这里我们做了一个小技巧,对front_image_url什么都不错,因为我们传过来的front_image_url就是list型
在items的field中有两个参数,一个是input_processor,另一个是output_processor,这两个参数可以帮助我们对items的list中的每个元素进行处理,比如有些需要用md5进行加密,有些需要用正则表达式进行筛选或者排序等等。
在进行mysql的pipeline之前,我们需要设计数据库,下面是我自己设计的数据库的字段,仅供参考
这里我把url_object_id作为该表的主键,由于它不会重复,所以适合做主键。
下面我们来看看数据库的pipeline。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# -*- coding: utf-8 -*-
# define your item pipelines here
#
# don't forget to add your pipeline to the item_pipelines setting
# see: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import codecs
import json
from twisted.enterprise import adbapi
import mysqldb
import mysqldb.cursors
class mysqltwistedpipeline( object ):
"""docstring for mysqltwistedpipeline"""
#采用异步的机制写入mysql
def __init__( self , dbpool):
self .dbpool = dbpool
@classmethod
def from_settings( cls , settings):
dbparms = dict (
host = settings[ "mysql_host" ],
db = settings[ "mysql_dbname" ],
user = settings[ "mysql_user" ],
passwd = settings[ "mysql_password" ],
charset = 'utf8' ,
cursorclass = mysqldb.cursors.dictcursor,
use_unicode = true,
)
dbpool = adbapi.connectionpool( "mysqldb" , * * dbparms)
return cls (dbpool)
def process_item( self , item, spider):
#使用twisted将mysql插入变成异步执行
query = self .dbpool.runinteraction( self .do_insert, item)
query.adderrback( self .handle_error, item, spider) #处理异常
return item
def handle_error( self , failure, item, spider):
# 处理异步插入的异常
print (failure)
def do_insert( self , cursor, item):
#执行具体的插入
#根据不同的item 构建不同的sql语句并插入到mysql中
# insert_sql, params = item.get_insert_sql()
# print (insert_sql, params)
# cursor.execute(insert_sql, params)
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums, url_object_id)
values (%s, %s, %s, %s, %s)
"""
# 可以只使用execute,而不需要再使用commit函数
cursor.execute(insert_sql, (item[ "title" ], item[ "url" ], item[ "create_date" ], item[ "fav_nums" ], item[ "url_object_id" ]))
|
在这里我们只是演示一下,我们只向数据库中插入5个字段的数据,分别是title,url,create_date,fav_nums,url_object_id。
当然你也可以再加入其它的字段。
首先我们看看from_settings这个函数,它可以从settings.py文件中取出我们想想要的数据,这里我们把数据库的host,dbname,username和password都放在settings.py中。实际的插入语句还是在process_item中进行,我们自己定义了一个函数do_insert,然后把它传给dbpool中用于插入真正的数据。
最后我们来看看settings.py中的代码,这里就很简单了。
1
2
3
4
|
mysql_host = "localhost"
mysql_dbname = "article_wilson"
mysql_user = "root"
mysql_password = "root"
|
其实这里是和pipeline中的代码是想对应的,别忘了把在settings.py中把pipeline打开。
1
2
3
4
5
6
7
8
9
10
|
item_pipelines = {
# 'articlespider.pipelines.articlespiderpipeline': 300,
# 'articlespider.pipelines.jsonwithencodingpipeline': 1
# # 'scrapy.pipelines.images.imagepipeline': 1,
# 'articlespider.pipelines.jsonexporterpipleline': 1
# 'articlespider.pipelines.articleimagepipeline': 2
# 'articlespider.pipelines.mysqlpipeline': 1
'articlespider.pipelines.mysqltwistedpipeline' : 1
}
|
好了,现在我们可以跑一程序吧。
scrapy crawl jobbole
下面是运行结果的截图
好了,以上就是今天的全部内容了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/Wilson_Iceman/article/details/79270235