python使用MySQL主要有两个模块,pymysql(MySQLdb)和SQLAchemy。
pymysql(MySQLdb)为原生模块,直接执行sql语句,其中pymysql模块支持python 2和python3,MySQLdb只支持python2,两者使用起来几乎一样。
SQLAchemy为一个ORM框架,将数据对象转换成SQL,然后使用数据API执行SQL并获取执行结果
另外DBUtils模块提供了一个数据库连接池,方便多线程场景中python操作数据库。
1.pymysql模块
安装:pip install pymysql
创建表格操作: (注意中文格式设置)
#coding:utf-8 import pymysql #关于中文问题 #1. mysql命令行创建数据库,设置编码为gbk:create databse demo2 character set utf8; #2. python代码中连接时设置charset="gbk" #3. 创建表格时设置default charset=utf8 #连接数据库 conn = pymysql.connect(host="localhost", user="root", passwd="", db=\'learningsql\', charset=\'utf8\', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312) #创建游标 cursor = conn.cursor() #执行sql语句 cursor.execute("""create table if not exists t_sales( id int primary key auto_increment not null, nickName varchar(128) not null, color varchar(128) not null, size varchar(128) not null, comment text not null, saledate varchar(128) not null)engine=InnoDB default charset=utf8;""") # cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate) # values(\'%s\',\'%s\',\'%s\',\'%s\',\'%s\');""" % ("zack", "黑色", "L", "大小合适", "2019-04-20")) cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);""" , ("zack", "黑色", "L", "大小合适", "2019-04-20")) #提交 conn.commit() #关闭游标 cursor.close() #关闭连接 conn.close() 创建表格操作
增删改查:
注意execute执行sql语句参数的两种情况:
execute( "insert into t_sales(nickName, size) values(\'%s\',\'%s\');" % ("zack","L") ) #此时的%s为字符窜拼接占位符,需要引号加\'%s\' (有sql注入风险)
execute( "insert into t_sales(nickName, size) values(%s,%s);" , ("zack","L") ) #此时的%s为sql语句占位符,不需要引号%s
#***************************增删改查****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db=\'learningsql\', charset=\'utf8\', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312) #创建游标 cursor = conn.cursor() insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);" #返回受影响的行数 row1 = cursor.execute(insert_sql,("Bob", "黑色", "XL", "便宜实惠", "2019-04-20")) update_sql = "update t_sales set color=\'白色\' where id=%s;" #返回受影响的行数 row2 = cursor.execute(update_sql,(1,)) select_sql = "select * from t_sales where id>%s;" #返回受影响的行数 row3 = cursor.execute(select_sql,(1,)) delete_sql = "delete from t_sales where id=%s;" #返回受影响的行数 row4 = cursor.execute(delete_sql,(4,)) #提交,不然无法保存新建或者修改的数据(增删改得提交) conn.commit() cursor.close() conn.close() 增删改查语句
批量插入和自增id:
#***************************批量插入****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db=\'learningsql\', charset=\'utf8\', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312) #创建游标 cursor = conn.cursor() insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);" data = [("Bob", "黑色", "XL", "便宜实惠", "2019-04-20"),("Ted", "黄色", "M", "便宜实惠", "2019-04-20"),("Gary", "黑色", "M", "穿着舒服", "2019-04-20")] row1 = cursor.executemany(insert_sql, data) conn.commit() #为插入的第一条数据的id,即插入的为5,6,7,new_id=5 new_id = cursor.lastrowid print(new_id) cursor.close() conn.close() 批量插入
获取查询数据:
#***************************获取查找sql的查询数据****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db=\'learningsql\', charset=\'utf8\', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312) #创建游标 cursor = conn.cursor() select_sql = "select id,nickname,size from t_sales where id>%s;" cursor.execute(select_sql, (3,)) row1 = cursor.fetchone() #获取第一条数据,获取后游标会向下移动一行 row_n = cursor.fetchmany(3) #获取前n条数据,获取后游标会向下移动n行 row_all = cursor.fetchall() #获取所有数据,获取后游标会向下移动到末尾 print(row1) print(row_n) print(row_all) #conn.commit() cursor.close() conn.close() 查询数据获取
注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:
- cursor.scroll(1,mode=\'relative\') # 相对当前位置移动
- cursor.scroll(2,mode=\'absolute\') # 相对绝对位置移动
fetch获取数据类型
fetch获取的数据默认为元组格式,还可以获取字典类型的,如下:
#***************************获取字典格式数据****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db=\'learningsql\', charset=\'utf8\', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312) #创建游标 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) select_sql = "select id,nickname,size from t_sales where id>%s;" cursor.execute(select_sql, (3,)) row1 = cursor.fetchall() print(row1) conn.commit() cursor.close() conn.close()
2.SQLAlchmy框架
SQLAlchemy的整体架构如下,建立在第三方的DB API上,将类和对象操作转换为数据库sql,然后利用DB API执sql语句得到结果。其适用于多种数据库。另外其内部实现了数据库连接池,方便进行多线程操作。
- Engine,框架的引擎
- Connection Pooling ,数据库连接池
- Dialect,选择连接数据库的DB API种类,(pymysql,mysqldb等)
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
- DB API: Python Database API Specification
2.1 执行原生sql
安装:pip install sqlalchemy
SQLAlchmy也可以不利用ORM,使用数据库连接池,类似pymysql模块执行原生sql
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, String, Integer import threading engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8", max_overflow = 0, #超过连接池大小外最多创建的连接,为0表示超过5个连接后,其他连接请求会阻塞 (默认为10) pool_size = 5, #连接池大小(默认为5) pool_timeout = 30, #连接线程池中,没有连接时最多等待的时间,不设置无连接时直接报错 (默认为30) pool_recycle = -1) #多久之后对线程池中的线程进行一次连接的回收(重置) (默认为-1) # def task(): # conn= engine.raw_connection() #建立原生连接,和pymysql的连接一样 # cur = conn.cursor() # cur.execute("select * from t_sales where id>%s",(2,)) # result = cur.fetchone() # cur.close() # conn.close() # print(result) # def task(): # conn = engine.contextual_connect() #建立上下文管理器连接,自动打开和关闭 # with conn: # cur = conn.execute("select * from t_sales where id>%s",(2,)) # result = cur.fetchone() # print(result) def task(): cur =engine.execute("select * from t_sales where id>%s",(2,)) #engine直接执行 result = cur.fetchone() cur.close() print(result) if __name__=="__main__": for i in range(10): t = threading.Thread(target=task) t.start() 三种方式执行原生sql
create_engine()方法的所有可选参数见下面源码:
def create_engine(*args, **kwargs): """Create a new :class:`.Engine` instance. The standard calling form is to send the URL as the first positional argument, usually a string that indicates database dialect and connection arguments:: engine = create_engine("postgresql://scott:tiger@localhost/test") Additional keyword arguments may then follow it which establish various options on the resulting :class:`.Engine` and its underlying :class:`.Dialect` and :class:`.Pool` constructs:: engine = create_engine("mysql://scott:tiger@hostname/dbname", encoding=\'latin1\', echo=True) The string form of the URL is ``dialect[+driver]://user:password@host/dbname[?key=value..]``, where ``dialect`` is a database name such as ``mysql``, ``oracle``, ``postgresql``, etc., and ``driver`` the name of a DBAPI, such as ``psycopg2``, ``pyodbc``, ``cx_oracle``, etc. Alternatively, the URL can be an instance of :class:`~sqlalchemy.engine.url.URL`. ``**kwargs`` takes a wide variety of options which are routed towards their appropriate components. Arguments may be specific to the :class:`.Engine`, the underlying :class:`.Dialect`, as well as the :class:`.Pool`. Specific dialects also accept keyword arguments that are unique to that dialect. Here, we describe the parameters that are common to most :func:`.create_engine()` usage. Once established, the newly resulting :class:`.Engine` will request a connection from the underlying :class:`.Pool` once :meth:`.Engine.connect` is called, or a method which depends on it such as :meth:`.Engine.execute` is invoked. The :class:`.Pool` in turn will establish the first actual DBAPI connection when this request is received. The :func:`.create_engine` call itself does **not** establish any actual DBAPI connections directly. .. seealso:: :doc:`/core/engines` :doc:`/dialects/index` :ref:`connections_toplevel` :param case_sensitive=True: if False, result column names will match in a case-insensitive fashion, that is, ``row[\'SomeColumn\']``. .. versionchanged:: 0.8 By default, result row names match case-sensitively. In version 0.7 and prior, all matches were case-insensitive. :param connect_args: a dictionary of options which will be passed directly to the DBAPI\'s ``connect()`` method as additional keyword arguments. See the example at :ref:`custom_dbapi_args`. :param convert_unicode=False: if set to True, sets the default behavior of ``convert_unicode`` on the :class:`.String` type to ``True``, regardless of a setting of ``False`` on an individual :class:`.String` type, thus causing all :class:`.String` -based columns to accommodate Python ``unicode`` objects. This flag is useful as an engine-wide setting when using a DBAPI that does not natively support Python ``unicode`` objects and raises an error when one is received (such as pyodbc with FreeTDS). See :class:`.String` for further details on what this flag indicates. :param creator: a callable which returns a DBAPI connection. This creation function will be passed to the underlying connection pool and will be used to create all new database connections. Usage of this function causes connection parameters specified in the URL argument to be bypassed. :param echo=False: if True, the Engine will log all statements as well as a repr() of their parameter lists to the engines logger, which defaults to sys.stdout. The ``echo`` attribute of ``Engine`` can be modified at any time to turn logging on and off. If set to the string ``"debug"``, result rows will be printed to the standard output as well. This flag ultimately controls a Python logger; see :ref:`dbengine_logging` for information on how to configure logging directly. :param echo_pool=False: if True, the connection pool will log all checkouts/checkins to the logging stream, which defaults to sys.stdout. This flag ultimately controls a Python logger; see :ref:`dbengine_logging` for information on how to configure logging directly. :param empty_in_strategy: The SQL compilation strategy to use when rendering an IN or NOT IN expression for :meth:`.ColumnOperators.in_` where the right-hand side is an empty set. This is a string value that may be one of ``static``, ``dynamic``, or ``dynamic_warn``. The ``static`` strategy is the default, and an IN comparison to an empty set will generate a simple false expression "1 != 1". The ``dynamic`` strategy behaves like that of SQLAlchemy 1.1 and earlier, emitting a false expression of the form "expr != expr", which has the effect of evaluting to NULL in the case of a null expression. ``dynamic_warn`` is the same as ``dynamic``, however also emits a warning when an empty set is encountered; this because the "dynamic" comparison is typically poorly performing on most databases. .. versionadded:: 1.2 Added the ``empty_in_strategy`` setting and additionally defaulted the behavior for empty-set IN comparisons to a static boolean expression. :param encoding: Defaults to ``utf-8``. This is the string encoding used by SQLAlchemy for string encode/decode operations which occur within SQLAlchemy, **outside of the DBAPI.** Most modern DBAPIs feature some degree of direct support for Python ``unicode`` objects, what you see in Python 2 as a string of the form ``u\'some string\'``. For those scenarios where the DBAPI is detected as not supporting a Python ``unicode`` object, this encoding is used to determine the source/destination encoding. It is **not used** for those cases where the DBAPI handles unicode directly. To properly configure a system to accommodate Python ``unicode`` objects, the DBAPI should be configured to handle unicode to the greatest degree as is appropriate - see the notes on unicode pertaining to the specific target database in use at :ref:`dialect_toplevel`. Areas where string encoding may need to be accommodated outside of the DBAPI include zero or more of: * the values passed to bound parameters, corresponding to the :class:`.Unicode` type or the :class:`.String` type when ``convert_unicode`` is ``True``; * the values returned in result set columns corresponding to the :class:`.Unicode` type or the :class:`.String` type when ``convert_unicode`` is ``True``; * the string SQL statement passed to the DBAPI\'s ``cursor.execute()`` method; * the string names of the keys in the bound parameter dictionary passed to the DBAPI\'s ``cursor.execute()`` as well as ``cursor.setinputsizes()`` methods; * the string column names retrieved from the DBAPI\'s ``cursor.description`` attribute. When using Python 3, the DBAPI is required to support *all* of the above values as Python ``unicode`` objects, which in Python 3 are just known as ``str``. In Python 2, the DBAPI does not specify unicode behavior at all, so SQLAlchemy must make decisions for each of the above values on a per-DBAPI basis - implementations are completely inconsistent in their behavior. :param execution_options: Dictionary execution options which will be applied to all connections. See :meth:`~sqlalchemy.engine.Connection.execution_options` :param implicit_returning=True: When ``True``, a RETURNING- compatible construct, if available, will be used to fetch newly generated primary key values when a single row INSERT statement is emitted with no existing returning() clause. This applies to those backends which support RETURNING or a compatible construct, including PostgreSQL, Firebird, Oracle, Microsoft SQL Server. Set this to ``False`` to disable the automatic usage of RETURNING. :param isolation_level: this string parameter is interpreted by various dialects in order to affect the transaction isolation level of the database connection. The parameter essentially accepts some subset of these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``, ``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``. Behavior here varies per backend, and individual dialects should be consulted directly. Note that the isolation level can also be set on a per-:class:`.Connection` basis as well, using the :paramref:`.Connection.execution_options.isolation_level` feature. .. seealso:: :attr:`.Connection.default_isolation_level` - view default level :paramref:`.Connection.execution_options.isolation_level` - set per :class:`.Connection` isolation level :ref:`SQLite Transaction Isolation <sqlite_isolation_level>` :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>` :ref:`MySQL Transaction Isolation <mysql_isolation_level>` :ref:`session_transaction_isolation` - for the ORM :param label_length=None: optional integer value which limits the size of dynamically generated column labels to that many characters. If less than 6, labels are generated as "_(counter)". If ``None``, the value of ``dialect.max_identifier_length`` is used instead. :param listeners: A list of one or more :class:`~sqlalchemy.interfaces.PoolListener` objects which will receive connection pool events. :param logging_name: String identifier which will be used within the "name" field of logging records generated within the "sqlalchemy.engine" logger. Defaults to a hexstring of the object\'s id. :param max_overflow=10: the number of connections to allow in connection pool "overflow", that is connections that can be opened above and beyond the pool_size setting, which defaults to five. this is only used with :class:`~sqlalchemy.pool.QueuePool`. :param module=None: reference to a Python module object (the module itself, not its string name). Specifies an alternate DBAPI module to be used by the engine\'s dialect. Each sub-dialect references a specific DBAPI which will be imported before first connect. This parameter causes the import to be bypassed, and the given module to be used instead. Can be used for testing of DBAPIs as well as to inject "mock" DBAPI implementations into the :class:`.Engine`. :param paramstyle=None: The `paramstyle <http://legacy.python.org/dev/peps/pep-0249/#paramstyle>`_ to use when rendering bound parameters. This style defaults to the one recommended by the DBAPI itself, which is retrieved from the ``.paramstyle`` attribute of the DBAPI. However, most DBAPIs accept more than one paramstyle, and in particular it may be desirable to change a "named" paramstyle into a "positional" one, or vice versa. When this attribute is passed, it should be one of the values ``"qmark"``, ``"numeric"``, ``"named"``, ``"format"`` or ``"pyformat"``, and should correspond to a parameter style known to be supported by the DBAPI in use. :param pool=None: an already-constructed instance of :class:`~sqlalchemy.pool.Pool`, such as a :class:`~sqlalchemy.pool.QueuePool` instance. If non-None, this pool will be used directly as the underlying connection pool for the engine, bypassing whatever connection parameters are present in the URL argument. For information on constructing connection pools manually, see :ref:`pooling_toplevel`. :param poolclass=None: a :class:`~sqlalchemy.pool.Pool` subclass, which will be used to create a connection pool instance using the connection parameters given in the URL. Note this differs from ``pool`` in that you don\'t actually instantiate the pool in this case, you just indicate what type of pool to be used. :param pool_logging_name: String identifier which will be used within the "name" field of logging records generated within the "sqlalchemy.pool" logger. Defaults to a hexstring of the object\'s id. :param pool_pre_ping: boolean, if True will enable the connection pool "pre-ping" feature that tests connections for liveness upon each checkout. .. versionadded:: 1.2 .. seealso:: :ref:`pool_disconnects_pessimistic` :param pool_size=5: the number of connections to keep open inside the connection pool. This used with :class:`~sqlalchemy.pool.QueuePool` as well as :class:`~sqlalchemy.pool.SingletonThreadPool`. With :class:`~sqlalchemy.pool.QueuePool`, a ``pool_size`` setting of 0 indicates no limit; to disable pooling, set ``poolclass`` to :class:`~sqlalchemy.pool.NullPool` instead. :param pool_recycle=-1: this setting causes the pool to recycle connections after the given number of seconds has passed. It defaults to -1, or no timeout. For example, setting to 3600 means connections will be recycled after one hour. Note that MySQL in particular will disconnect automatically if no activity is detected on a connection for eight hours (although this is configurable with the MySQLDB connection itself and the server configuration as well). .. seealso:: :ref:`pool_setting_recycle` :param pool_reset_on_return=\'rollback\': set the :paramref:`.Pool.reset_on_return` parameter of the underlying :class:`.Pool` object, which can be set to the values ``"rollback"``, ``"commit"``, or ``None``. .. seealso:: :paramref:`.Pool.reset_on_return` :param pool_timeout=30: number of seconds to wait before giving up on getting a connection from the pool. This is only used with :class:`~sqlalchemy.pool.QueuePool`. :param plugins: string list of plugin names to load. See :class:`.CreateEnginePlugin` for background. .. versionadded:: 1.2.3 :param strategy=\'plain\': selects alternate engine implementations. Currently available are: * the ``threadlocal`` strategy, which is described in :ref:`threadlocal_strategy`; * the ``mock`` strategy, which dispatches all statement execution to a function passed as the argument ``executor``. See `example in the FAQ <http://docs.sqlalchemy.org/en/latest/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string>`_. :param executor=None: a function taking arguments ``(sql, *multiparams, **params)``, to which the ``mock`` strategy will dispatch all statement execution. Used only by ``strategy=\'mock\'``. """ strategy = kwargs.pop(\'strategy\', default_strategy) strategy = strategies.strategies[strategy] return strategy.create(*args, **kwargs) create_engine源码
2.2 执行ORM语句
A. 创建和删除表
#coding:utf-8 import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, String, Integer, DateTime, Text Base = declarative_base() class User(Base): __tablename__="users" id = Column(Integer,primary_key=True) name = Column(String(32),index=True, nullable=False) #创建索引,不为空 email = Column(String(32),unique=True) ctime = Column(DateTime, default = datetime.datetime.now) #传入方法名datetime.datetime.now extra = Column(Text,nullable=True) __table_args__ = { # UniqueConstraint(\'id\', \'name\', name=\'uix_id_name\'), #设置联合唯一约束 # Index(\'ix_id_name\', \'name\', \'email\'), # 创建索引 } def create_tbs(): engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5) Base.metadata.create_all(engine) #创建所有定义的表 def drop_dbs(): engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5) Base.metadata.drop_all(engine) #删除所有创建的表 if __name__=="__main__": create_tbs() #创建表 #drop_dbs() #删除表 创建和删除表格
B.表中定义外键关系(一对多,多对多)
(思考:下面代码中的一对多关系,relationship定义在了customer表中,应该定义在PurchaseOrder更合理?)
(注意:mysql数据库中避免使用order做为表的名字,order为一个mysql关键字,做为表名字时必须用反引号`order` (键盘数字1旁边的符号))
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float from sqlalchemy.orm import relationship import datetime engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") #数据库有密码时,//root:12345678@127.0.0.1:3306/ Base = declarative_base() class Customer(Base): __tablename__="customer" #数据库中保存的表名字 id = Column(Integer,primary_key=True) name = Column(String(64),index=True,nullable=False) phone = Column(String(16),nullable=False) address = Column(String(256),nullable=False) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) #关键关系,关联表的__tablename__="purchase_order" # 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer purchase_order = relationship("PurchaseOrder",backref="customer") class PurchaseOrder(Base): __tablename__ = "purchase_order" #mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号) id=Column(Integer,primary_key=True) cost = Column(Float,nullable=True) ctime = Column(DateTime,default =datetime.datetime.now) desc = Column(String(528)) #多对多关系时,secondary为中间表 product = relationship("Product",secondary="order_to_product",backref="purchase_order") class Product(Base): __tablename__ = "product" id = Column(Integer,primary_key=True) name = Column(String(256)) price = Column(Float,nullable=False) class OrdertoProduct(Base): __tablename__ = "order_to_product" id = Column(Integer,primary_key=True) product_id = Column(Integer,ForeignKey("product.id")) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) if __name__ == "__main__": Base.metadata.create_all(engine) #Base.metadata.drop_all(engine) 外键关系
C.增删改查操作
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float from sqlalchemy.orm import relationship,sessionmaker from sqlalchemy.sql import text import datetime engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") #数据库有密码时,//root:12345678@127.0.0.1:3306/, 设置utf8防止中文乱码 Base = declarative_base() class Customer(Base): __tablename__="customer" #数据库中保存的表名字 id = Column(Integer,primary_key=True) name = Column(String(64),index=True,nullable=False) phone = Column(String(16),nullable=False) address = Column(String(256),nullable=False) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) #关键关系,关联表的__tablename__="purchase_order" # 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer purchase_order = relationship("PurchaseOrder",backref="customer") class PurchaseOrder(Base): __tablename__ = "purchase_order" #mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号) id=Column(Integer,primary_key=True) cost = Column(Float,nullable=True) ctime = Column(DateTime,default =datetime.datetime.now) desc = Column(String(528)) #多对多关系时,secondary为中间表 product = relationship("Product",secondary="order_to_product",backref="purchase_order") class Product(Base): __tablename__ = "product" id = Column(Integer,primary_key=True) name = Column(String(256)) price = Column(Float,nullable=False) class OrdertoProduct(Base): __tablename__ = "order_to_product" id = Column(Integer,primary_key=True) product_id = Column(Integer,ForeignKey("product.id")) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) if __name__ == "__main__": #Base.metadata.create_all(engine) #Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) #每次进行数据库操作时都要创建session session = Session() #*****************增加数据******************** # pur_order = PurchaseOrder(cost=19.7,desc="python编程之路") # session.add(pur_order) # session.add_all( # [PurchaseOrder(cost=39.7,desc="linux操作系统"), # PurchaseOrder(cost=59.6,desc="python cookbook")]) # session.commit() #*****************修改数据******************** #session.query(PurchaseOrder).filter(PurchaseOrder.id>2).update({"cost":29.7}) #session.query(PurchaseOrder).filter(PurchaseOrder.id==2).update({"cost":PurchaseOrder.cost+40.1},synchronize_session=False) #synchronize_session用于query在进行delete or update操作时,对session的同步策略。 #session.commit() #*****************删除数据******************** #session.query(PurchaseOrder).filter(PurchaseOrder.id==1).delete() #session.commit() #*****************查询数据******************** #ret = session.query(PurchaseOrder).all() # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).all() #包含对象的列表 # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).first() #单个对象 # ret = session.query(PurchaseOrder).filter_by(id=2).all() #通过列名字的表达式 # ret = session.query(PurchaseOrder).filter_by(id=2).first() #ret = session.query(PurchaseOrder).filter(text("id<:value and cost>:price")).params(value=6,price=15).order_by(PurchaseOrder.cost).all() #ret = session.query(PurchaseOrder).from_statement(text("SELECT * FROM purchase_order WHERE cost>:price")).params(price=40).all() # print ret # for i in ret: # print i.id, i.cost, i.ctime,i.desc #ret2 = session.query(PurchaseOrder.id,PurchaseOrder.cost.label(\'totalcost\')).all() #只查询两列,ret2为列表 #print ret2 #关闭session session.close() 增删改查
查询语句
# 条件 ret = session.query(Users).filter_by(name=\'alex\').all() ret = session.query(Users).filter(Users.id > 1, Users.name == \'eric\').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == \'eric\').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name=\'eric\'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == \'eric\')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == \'eric\')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == \'eric\', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like(\'e%\')).all() ret = session.query(Users).filter(~Users.name.like(\'e%\')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() 查询语句
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text, func from sqlalchemy_orm2 import PurchaseOrder #导入定义的PurchaseOrder表格类 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") Session = sessionmaker(bind=engine) session = Session() #查询 ret = session.execute("select * from purchase_order where id=:value",params={"value":3}) print ret for i in ret: print i.id, i.cost, i.ctime,i.desc #插入 purchase_order = PurchaseOrder.__table__ #拿到PurchaseOrder表格对象 ret=session.execute(purchase_order.insert(), [{"cost":46.3,"desc":\'python2\'}, {"cost":43.3,"desc":\'python3\'}]) session.commit() print(ret.lastrowid) session.close() # 关联子查询 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ 补充
D.多线程操作
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy_orm2 import Product from threading import Thread engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=0,pool_size=5) Session = sessionmaker(bind=engine) def task(name,price): session = Session() pro = Product(name=name,price=price) session.add(pro) session.commit() session.close() if __name__=="__main__": for i in range(6): t = Thread(target=task,args=("pro"+str(i),i*5)) t.start() 多线程
E. 通过relationship操纵一对多和多对多关系
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text, func from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer #导入定义的表格类 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") Session = sessionmaker(bind=engine) session = Session() # #通过定义的关键关系添加(id值) # cus1 = Customer(name="zack",phone="13567682333",address="Nanjing",purchase_order_id=3) # session.add(cus1) # #通过relationship正向添加 # cus2 = Customer(name="zack2",phone="13567682333",address="Nanjing",purchase_order=PurchaseOrder(cost=53,desc="java")) # session.add(cus2) # session.commit() #通过relationship反向添加 # purchase_order=PurchaseOrder(cost=53,desc="php") # cus3 = Customer(name="zack3",phone="13567682333",address="Nanjing") # cus4 = Customer(name="zack4",phone="13567682333",address="Nanjing") # purchase_order.customer=[cus3,cus4] #cus3,cus4的purchase_order_id都是purchase_order.id值,即同时添加了两组外键关系 # session.add(purchase_order) # session.commit() ##通过relationship正向查询 cus = session.query(Customer).first() print(cus.purchase_order_id) print(cus.purchase_order.desc) #通过relationship反向查询 pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==3).first() print(pur.desc) print(pur.customer) #返回一个list 一对多
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text, func from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer #导入定义的表格类 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") Session = sessionmaker(bind=engine) session = Session() # session.add_all([Product(name="java",price=24), # Product(name="python",price=34), # Product(name="php",price=27)]) # session.commit() # #通过定义的关键关系添加(id值) # op = OrdertoProduct(product_id=1,purchase_order_id=16) # session.add(op) # session.commit() # #通过relationship添加 # pur = PurchaseOrder(cost=27,desc="xxxx") # pur.product = [Product(name="C++",price=60),] #正向 # session.add(pur) # pro = Product(name="C",price=40) # pro.purchase_order=[PurchaseOrder(cost=27,desc="xxxx"),] #反向 # session.add(pro) # session.commit() #通过relationship正向查询 pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==19).first() print(pur.desc) print(pur.product) #结果为列表 #通过relationship反向查询 pro = session.query(Product).filter(Product.id==5).first() print(pro.name) print(pro.purchase_order) #结果为列表 session.close() 多对多
3.数据库连接池
对于ORM框架,其内部维护了链接池,可以直接通过多线程操控数据库。对于pymysql模块,通过多线程操控数据库容易出错,得加锁串行执行。进行并发时,可以利用DBUtils模块来维护数据库连接池。
3.1 多线程操控pymysql
不采用DBUtils连接池时, pymysql多线程代码如下:
import pymysql import threadind #**************************无连接池******************************* #每个线程都要创立一次连接,线程并发操作间可能有问题? def func(): conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8") cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() conn.close() if __name__=="__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start() 每个线程创建连接
#**************************无连接池******************************* #创建一个连接,加锁串行执行 from threading import Lock import pymysql import threading conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8") lock = Lock() def func(): with lock: cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() #conn.close()不能在线程中关闭连接,否则其他线程不可用了 if __name__=="__main__": threads = [] for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) threads.append(t) t.start() for t in threads: t.join() conn.close() 一个连接串行执行
3.2 DBUtils连接池
DBUtils连接池有两种连接模式:PersistentDB和PooledDB (官网文档:https://cito.github.io/DBUtils/UsersGuide.html)
模式一(DBUtils.PersistentDB):为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。
PersistentDB使用代码如下:
#coding:utf-8 from DBUtils.PersistentDB import PersistentDB import pymysql import threading pool = PersistentDB( creator = pymysql, # 使用链接数据库的模块 maxusage = None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping = 0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable = False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接) threadlocal = None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置 host="127.0.0.1", port = 3306, user = "root", password="", database="learningsql", charset = "utf8" ) def func(): conn = pool.connection() cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() conn.close() if __name__ == "__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start() PersistentDB 使用
模式二(DBUtils.PooledDB):创建一批连接到连接池,供所有线程共享使用。
(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。)
PooledDB使用代码如下:
from DBUtils.PooledDB import PooledDB import pymysql import threading import time pool = PooledDB( creator = pymysql, # 使用链接数据库的模块 maxconnections = 6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached = 2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached = 5, # 链接池中最多闲置的链接,0和None不限制 maxshared = 3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking = True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage = None, # 一个链接最多被重复使用的次数,None表示无限制 setsession = [], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping = 0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host="127.0.0.1", port = 3306, user="root", password="", database = "learningsql", charset = "utf8" ) def func(): conn = pool.connection() cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) time.sleep(5) #为了查看mysql端的线程数量 cursor.close() conn.close() if __name__=="__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start() PooledDB使用
上述代码中加入了sleep(5)使线程连接数据库时间延长,方便查看mysql数据库连接线程情况,下图分别为代码执行中和执行后的线程连接情况,可以发现,代码执行时,同时有6个线程连接上了数据库(有一个为mysql命令客户端),代码执行后,只有一个线程连接数据库,但仍有5个线程等待连接。
(show status like "Threads%" 查看线程连接情况)