一:twisted中的adbapi
数据库pymysql的commit()和execute()在提交数据时,都是同步提交至数据库,由于scrapy框架数据的解析和异步多线程的,所以scrapy的数据解析速度,要远高于数据的写入数据库的速度。如果数据写入过慢,会造成数据库写入的阻塞,影响数据库写入的效率。
使用twisted异步IO框架,实现数据的异步写入,通过多线程异步的形式对数据进行写入,可以提高数据的写入速度。
1.1 两个主要方法
adbapi.ConnectionPool:
创建一个数据库连接池对象,其中包括多个连接对象,每个连接对象在独立的线程中工作。adbapi只是提供了异步访问数据库的编程框架,再其内部依然使MySQLdb这样的库访问数据库。
dbpool.runInteraction(do_insert,item):
异步调用do_insert函数,dbpool会选择连接池中的一个连接对象在独立线程中调用insert_db,其中参数item会被传给do_insert的第二个参数,传给do_insert的第一个参数是一个Transaction对象,其接口与Cursor对象类似,可以调用execute方法执行SQL语句,do_insert执行后,连接对象会自动调用commit方法
1.2 使用实例
1
|
from twisted.enterprise import adbapi
|
1
2
3
4
|
# 初始化数据库连接池(线程池)
# 参数一:mysql的驱动
# 参数二:连接mysql的配置信息
dbpool = adbapi.ConnectionPool( 'pymysql' , * * params)
|
1
2
3
|
# 参数1:在异步任务中要执行的函数insert_db;
# 参数2:给该函数insert_db传递的参数
query = self .dbpool.runInteraction( self .do_insert, item)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# 在execute()之后,不需要再进行commit(),连接池内部会进行提交的操作。
def do_insert( self , cursor, item):
insert_sql = """
insert into qa_sample(
need_id,
need_question_uptime,
need_title,
need_title_describe,
need_answer_uptime,
need_answer)
values (%s, %s, %s, %s, %s, %s)
"""
params = (item[ 'need_id' ],
item[ 'need_question_uptime' ],
item[ 'need_title' ],
item[ 'need_title_describe' ],
item[ 'need_answer_uptime' ],
item[ 'need_answer' ])
cursor.execute(insert_sql, params)
|
二:结合scrapy中的pipelines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# -*- coding: utf-8 -*-
from twisted.enterprise import adbapi
import pymysql
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
class QaSpiderPipeline( object ):
def process_item( self , item, spider):
return item
class MysqlTwistedPipeline( object ):
def __init__( self , dbpool):
self .dbpool = dbpool
@classmethod
def from_settings( cls , settings):
dbparams = dict (
host = settings[ 'MYSQL_HOST' ],
db = settings[ 'MYSQL_DBNAME' ],
user = settings[ 'MYSQL_USER' ],
passwd = settings[ 'MYSQL_PASSWORD' ],
charset = 'utf8' ,
cursorclass = pymysql.cursors.DictCursor,
use_unicode = True
)
dbpool = adbapi.ConnectionPool( 'pymysql' , * * dbparams)
return cls (dbpool)
def process_item( self , item, spider):
query = self .dbpool.runInteraction( self .do_insert, item)
def do_insert( self , cursor, item):
insert_sql = """
insert into qa_sample(
need_id,
need_question_uptime,
need_title,
need_title_describe,
need_answer_uptime,
need_answer)
values (%s, %s, %s, %s, %s, %s)
"""
params = (item[ 'need_id' ],
item[ 'need_question_uptime' ],
item[ 'need_title' ],
item[ 'need_title_describe' ],
item[ 'need_answer_uptime' ],
item[ 'need_answer' ])
cursor.execute(insert_sql, params)
|
到此这篇关于Python中Scrapy+adbapi提高数据库写入效率实现的文章就介绍到这了,更多相关Scrapy+adbapi数据库写入内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/7021024993500725256