需求场景:
老大让我利用爬虫爬取的数据写到或更新到mysql数据库中,百度了两种方法
1 是使用pymysql连接mysql,通过操作原生的sql语句进行增删改查数据;
2 是使用sqlalchemy连接mysql,通过ORM模型建表并操作数据库,不需要写原生的sql语句,相对简单些;
以下就是本次使用sqlalchemy的经验之谈。
实现流程:连接数据库》通过模型类创建表》建立会话》执行创建表语句》通过会话进行增删改查
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
from sqlalchemy import exists, Column, Integer, String, ForeignKey, exists
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 创建的数据库引擎
engine = create_engine( "mysql+pymysql://user:pwd@ip/数据库名?charset=utf8" )
#创建session类型
DBSession = sessionmaker(bind = engine)
# 实例化官宣模型 - Base 就是 ORM 模型
Base = declarative_base()
# 创建服务单表
class ServiceOrder(Base):
__tablename__ = 'serviceOrderTable'
id = Column(Integer, primary_key = True , autoincrement = True )
serviceOrderId = Column(String( 32 ), nullable = False , index = True , comment = '服务单ID' )
serviceDesc = Column(String( 268 ), comment = '服务说明' )
oneLevelName = Column(String( 32 ), comment = 'C类别' )
twoLevelName = Column(String( 32 ), comment = 'T子类' )
threeLevelName = Column(String( 32 ), comment = 'I项目' )
fourLevelName = Column(String( 32 ), comment = 'S子项' )
transferTimes = Column(String( 32 ), comment = '转派次数' )
overDueStatus = Column(String( 32 ), comment = '过期状态' )
serviceTimeLimit = Column(String( 32 ), comment = '服务时限' )
serTimeLimitTypeName = Column(String( 16 ), comment = '时限类型' )
# 一对多:
# serviceWorkOrder = relationship("ServiceWorkOrder", backref="serviceorder")
# 多对一:多个服务工单可以属于服务单
class ServiceWorkOrder(Base):
__tablename__ = 'serviceWorkOrderTable'
id = Column(Integer, primary_key = True , autoincrement = True )
serviceWorkOrderId = Column(String( 32 ), nullable = False , index = True , comment = '服务工单ID' )
workOrderName = Column(String( 268 ), comment = '工单名称' )
fromId = Column(String( 32 ), comment = '服务单ID' )
createUserSectionName = Column(String( 32 ), comment = '创建人室' )
createUserName = Column(String( 32 ), comment = '创建人' )
handlerName = Column(String( 32 ), comment = '处理人' )
statusName = Column(String( 32 ), comment = '工单状态' )
createTime = Column(String( 32 ), comment = '创建时间' )
# “多”的一方的book表是通过外键关联到user表的:
# serviceOrder_id = Column(Integer, ForeignKey('serviceOrderTable.id'))
# 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库
def init_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.drop_all(engine)
def insert_update():
# all_needed_data_lists 是需要插入数据库的数据 格式[{key: value, ... }, { }, { }...]
for item in all_needed_data_lists:
ServiceOrderRow = ServiceOrder(serviceOrderId = item[ 'serviceOrderId' ],
serviceDesc = item[ 'serviceDesc' ],
oneLevelName = item[ 'oneLevelName' ],
twoLevelName = item[ 'twoLevelName' ],
threeLevelName = item[ 'threeLevelName' ],
fourLevelName = item[ 'fourLevelName' ],
transferTimes = item[ 'transferTimes' ],
overDueStatus = item[ 'overDueStatus' ],
serviceTimeLimit = item[ 'serviceTimeLimit' ],
serTimeLimitTypeName = item[ 'serTimeLimitTypeName' ],
)
try :
# 利用exists判断目标对象是否存在,返回True或Faults
it_exists = session.query(
exists().where(ServiceOrder.serviceOrderId = = item[ 'serviceOrderId' ] )
).scalar()
except Exception as e:
self .log.error(e)
break
try :
# 如果不存在,进行新增;存在的话就更新现存的数据
if not it_exists:
session.add(ServiceOrderRow)
else :
session.query(ServiceOrder). filter (ServiceOrder.serviceOrderId = = item[ 'serviceOrderId' ])\
.update(item)
except Exception as e:
self .log.error(e)
break
try :
session.commit()
self .log.info( '数据更新成功!' )
except :
session.rollback()
self .log.info( '数据更新失败!' )
if __name__ = = "__main__" :
# 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库
init_db()
# 创建session对象,进行增删改查:
session = DBSession()
# 利用session 增 改数据 记得提交
insert_update()
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/We612/p/12105135.html