Python Flask 数据库开发
- 引言
- 环境配置
- 创建 Flask 应用,连接数据库
- 定义路由
- 定义模型
- 创建表
- 创建 API
- 数据库直接操作
- 启动 Flask 应用
- app.py 示例
- 运行 Flask
- 访问应用
- 展望
引言
在现代 web 开发中,Python 的 Flask 框架因其轻量和灵活性受到广泛欢迎。结合数据库技术,Flask 可以高效地管理和处理数据,使开发者能够快速构建功能强大的应用程序。无论是选择 MySQL 还是 PostgreSQL,掌握数据库与 Flask 的集成至关重要。本文将探讨如何在 Flask 中设置和使用数据库,涵盖从环境配置到基本操作的各个方面,便于我们去实现数据驱动的应用。
环境配置
在开始之前,需要确保 Python 环境已安装 Flask 模块。可以通过 pip 安装 Flask 和 SQLAlchemy(Flask 的 ORM 工具):
pip install Flask Flask-SQLAlchemy
接下来,根据所选数据库的类型(如 MySQL 或 PostgreSQL),部署相应的数据库服务。
详情可参考:MySQL && PostgreSQL 数据库部署
创建 Flask 应用,连接数据库
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 创建了一个 Flask 应用实例。__name__ 是 Python 的一个特殊变量,它指向当前模块的名字。
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/dbname'
# mysql://user:password@localhost/dbname 的格式是:
# mysql:数据库类型。
# user:数据库用户名。
# password:数据库密码。
# localhost:数据库服务器地址(这里是本地)。
# dbname:要连接的数据库名称。
db = SQLAlchemy(app)
# 创建了一个 SQLAlchemy 对象,并将 Flask 应用实例传递给它。这样,SQLAlchemy 就可以使用 Flask 应用的配置来管理数据库操作
# @app.route 是 Flask 中的装饰器,用于定义路由。它将一个 URL 路径与一个视图函数关联起来。简单来说,当用户访问指定的 URL 时,Flask 会调用对应的视图函数并返回结果
@app.route('/')
def home():
return 'Hello, Flask!'
if __name__ == '__main__':
app.run(debug=True)
定义路由
@app.route 是 Flask 中的装饰器,用于定义路由。它将一个 URL 路径与一个视图函数关联起来。简单来说,当用户访问指定的 URL 时,Flask 会调用对应的视图函数并返回结果。例如:
@app.route('/')
def home():
return 'Hello, Flask!'
在这个例子中,访问根 URL (/) 时,用户会看到 “Hello, Flask!” 的消息。通过使用不同的路径和方法(如 GET 或 POST),我们可以创建丰富的 Web 应用。
定义模型
使用 SQLAlchemy 的 ORM(对象关系映射)定义数据库模型是与数据库交互的第一步。数据库模型是用于定义和组织数据库中数据结构的抽象框架。它描述了数据的类型、关系、约束以及如何在数据库中存储和检索数据。
一个清晰的数据库模型会大大提高开发效率和数据管理的质量,比如说:
- 创建表:通过模型定义表的结构(字段类型、约束等)。
- 执行 CRUD 操作:模型使得创建、读取、更新和删除数据变得简单。
- 维护数据完整性:模型可以设置约束(如唯一性、外键),确保数据的正确性。
- 简化查询:通过模型,可以使用 ORM(对象关系映射)库,轻松编写查询。
# 创建一个名为 DiagnosisCategory 的类,继承自 db.Model,这是 SQLAlchemy 中所有模型类的基类
class DiagnosisCategory(db.Model):
__tablename__ = 'diagnosis_category' # 设置表名
# 定义一个名为 id 的列,类型为 Integer。primary_key=True 表示这个列是主键,用于唯一标识每个用户。
id = db.Column(db.Integer, primary_key=True)
# 定义一个名为 name 的列,类型为 String,最大长度为 80。unique=True 表示这个列的值在数据库中必须是唯一的,nullable=False 表示该字段不能为空。
name = db.Column(db.String(80), unique=True, nullable=False)
def __repr__(self):
return f'<User {self.username}>'
创建表
要创建 diagnosis_category
这张表,只需在应用上下文中调用 db.create_all(),就会自动生成 User 表。这样,数据库中就会有一个新的 DiagnosisCategory 表 (diagnosis_category) 用于存储用户信息。
class DiagnosisCategory(db.Model):
__tablename__ = 'diagnosis_category'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
def __repr__(self):
return f'<User {self.username}>'
# 创建表
with app.app_context():
db.create_all()
创建 API
使用 Flask-Restless 创建的 API 可以通过 HTTP 请求进行 CRUD 操作。其中,collection_name 定义了访问特定资源的 URL 路径。
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_restless import APIManager
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///yourdatabase.db'
db = SQLAlchemy(app)
# 定义模型
class DiagnosisCategory(db.Model):
__tablename__ = 'diagnosis_category'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
# 创建数据库表
with app.app_context():
db.create_all()
# 创建 API 管理器
manager = APIManager(app, session=db.session)
# 创建 API 端点
manager.create_api(DiagnosisCategory, collection_name='category',
methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
访问这个 API 的路径将是 /api/category,前端代码(ts、js等)可以通过这个路径对后端服务器进行 CRUD 操作。
// 获取所有分类(GET 请求):
GET /api/category
// 创建新分类(POST 请求):
POST /api/category
// 获取特定分类(GET 请求,假设 ID 为 1):
GET /api/category/1
// 更新特定分类(PUT 请求,假设 ID 为 1):
PUT /api/category/1
// 删除特定分类(DELETE 请求,假设 ID 为 1):
DELETE /api/category/1
数据库直接操作
在 Flask 中,也可以使用 SQLAlchemy 提供的 API 直接操作数据库,使用 Python 对象进行 CRUD。
# 创建
new_user = User(username='example') # 创建新用户实例
db.session.add(new_user) # 将新用户添加到会话
db.session.commit() # 提交事务以保存更改
# 查询
users = User.query.all() # # 查询所有用户
# 更新
user = User.query.first() # # 查询第一个用户
user.username = 'new_username'
db.session.commit()
# 删除
db.session.delete(user) # 删除用户
db.session.commit()
启动 Flask 应用
app.py 示例
我们以一个实际应用为例:连接远程主机(10.2.0.92)上的 MySQL数据库,创建 t_diag_category、t_diag_model、t_diag_test_suite、t_diag_test_target、t_diag_test_result 五张表,定义相应的 CRUD 接口,实现基于表的SN字段查重、关系表获取等额外操作。
# -*- coding: UTF-8 -*-
import os
import shutil
import sys
import argparse
import json
from flask import Flask, make_response, request, jsonify, send_from_directory
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
from flask_restless import APIManager
from datetime import datetime
CURRENT_PATH = os.path.dirname(os.path.realpath(sys.argv[0]))
DB_SERVER = '10.2.0.92'
DB_PORT = '3306'
DB_USERNAME = 'root'
DB_PASSWORD = '123456'
DB_NAME = 'diagnosisdev'
app = Flask(__name__)
db = None
manager = None
def initialize():
global app
global db
global manager
# Database
app.config[
'SQLALCHEMY_DATABASE_URI'] = f'mysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
manager = APIManager(app, session=db.session)
class DiagnosisCategory(db.Model):
__tablename__ = 't_diag_category'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
parent_id = db.Column(db.Integer, nullable=False, default=-1)
name = db.Column(db.Text)
create_user = db.Column(db.Integer)
create_time = db.Column(db.DateTime, default=datetime.utcnow)
modify_user = db.Column(db.Integer)
modify_time = db.Column(db.DateTime, default=datetime.utcnow)
remark = db.Column(db.Text)
enabled = db.Column(db.Integer, default=1)
class DiagnosisModel(db.Model):
__tablename__ = 't_diag_model'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
parent_id = db.Column(db.Integer, nullable=False, default=-1)
category_id = db.Column(
db.Integer, db.ForeignKey('t_diag_category.id'))
category = db.relationship('DiagnosisCategory', backref='t_diag_model')
name = db.Column(db.Text)
create_user = db.Column(db.Integer)
create_time = db.Column(db.DateTime, default=datetime.utcnow)
modify_user = db.Column(db.Integer)
modify_time = db.Column(db.DateTime, default=datetime.utcnow)
remark = db.Column(db.Text)
enabled = db.Column(db.Integer, default=1)
image_url = db.Column(db.Text)
class TestSuite(db.Model):
'''
for test:
curl http://127.0.0.1:50000/api/test_suite -H "Accept: application/vnd.api+json"
curl http://127.0.0.1:50000/api/test_suite/1 -H "Accept: application/vnd.api+json"
curl -X POST http://127.0.0.1:50000/api/test_suite -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d '{"data":{"type": "test_suite", "attributes": {"name": "xxx", "data": "123"}}}'
curl -X PATCH http://127.0.0.1:50000/api/test_suite/1 -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d '{"data":{"type": "test_suite", "id": "1", "attributes": {"name": "xxx", "data": "123"}}}'
'''
__tablename__ = 't_diag_test_suite'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.Text)
type = db.Column(db.Text)
model_id = db.Column(db.Integer, db.ForeignKey('t_diag_model.id'))
model = db.relationship('DiagnosisModel', backref='t_diag_test_suite')
report_name = db.Column(db.Text)
data = db.Column(db.Text)
create_user = db.Column(db.Integer)
create_time = db.Column(db.DateTime, default=datetime.utcnow)
modify_user = db.Column(db.Integer)
modify_time = db.Column(db.DateTime, default=datetime.utcnow)
remark = db.Column(db.Text)
enabled = db.Column(db.Integer, default=1)
sop_url = db.Column(db.Text)
case_type = db.Column(db.INT)
# ALTER TABLE t_diag_test_suite ADD COLUMN sop_url TEXT;
# ALTER TABLE t_diag_test_suite DROP COLUMN sop_url;
class TestTarget(db.Model):
__tablename__ = 't_diag_test_target'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
parent_id = db.Column(db.Integer, nullable=False, default=-1)
model_id = db.Column(db.Integer, db.ForeignKey('t_diag_model.id'))
model = db.relationship('DiagnosisModel', backref='t_diag_test_target')
name = db.Column(db.Text)
sn = db.Column(db.Text)
is_hardware_changed = db.Column(db.Integer, default=0)
create_user = db.Column(db.Integer)
create_time = db.Column(db.DateTime, default=datetime.utcnow)
modify_user = db.Column(db.Integer)
modify_time = db.Column(db.DateTime, default=datetime.utcnow)
remark = db.Column(db.Text)
enabled = db.Column(db.Integer, default=1)
class TestResult(db.Model):
__tablename__ = 't_diag_test_result'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
target_id = db.Column(
db.Integer, db.ForeignKey('t_diag_test_target.id'))
target = db.relationship('TestTarget', backref='t_diag_test_result')
test_suite_id = db.Column(
db.Integer, db.ForeignKey('t_diag_test_suite.id'))
test_suite = db.relationship('TestSuite', backref='t_diag_test_result')
name = db.Column(db.Text)
data = db.Column(db.Text)
create_user = db.Column(db.Integer)
create_time = db.Column(db.DateTime, default=datetime.now)
modify_user = db.Column(db.Integer)
modify_time = db.Column(db.DateTime, default=datetime.now)
remark = db.Column(db.Text)
enabled = db.Column(db.Integer, default=1)
with app.app_context():
db.create_all()
manager.create_api(DiagnosisCategory, collection_name='category',
methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
manager.create_api(DiagnosisModel, collection_name='model',
methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
manager.create_api(TestSuite, collection_name='test_suite',
methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
manager.create_api(TestTarget, collection_name='test_target',
methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
manager.create_api(TestResult, collection_name='test_result',
methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
@app.route('/')
def hello():
return 'Hello, world!'
@app.route('/api/is_scrapped_sn/<sn>', methods=['GET'])
def is_scrapped_sn(sn):
try:
result = db.session.execute(
text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND scrap = 1'), {'sn': sn})
return '1' if list(result)[0][0] > 0 else '0'
except Exception as e:
print(e)
return '0'
@app.route('/api/is_duplicate_primary_sn/<sn>', methods=['GET'])
def is_duplicate_primary_sn(sn):
try:
result = db.session.execute(
text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND enabled = 1 AND parent_id = -1'), {'sn': sn})
return '1' if list(result)[0][0] > 0 else '0'
except Exception as e:
print(e)
return '0'
@app.route('/api/is_duplicate_relationship_sn/<sn>', methods=['GET'])
def is_duplicate_relationship_sn(sn):
try:
result = db.session.execute(
text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND enabled = 1 AND parent_id != -1'), {'sn': sn})
return '1' if list(result)[0][0] > 0 else '0'
except Exception as e:
print(e)
return '0'
@app.route('/api/disable_sn/<sn>', methods=['GET'])
def disable_sn(sn):
with db.session.begin():
try:
db.session.execute(
text('UPDATE t_diag_test_target SET enabled = 0 WHERE parent_id IN (SELECT id FROM t_diag_test_target WHERE sn = :sn)'), {'sn': sn})
db.session.execute(
text('UPDATE t_diag_test_target SET enabled = 0 WHERE sn = :sn'), {'sn': sn})
db.session.commit()
return '0'
except Exception as e:
print(e)
db.session.rollback()
return '-1'
@app.route('/api/get_relationship_sn/<sn>', methods=['GET'])
def get_relationship_sn(sn):
try:
result = db.session.execute(
text('WITH RECURSIVE cte(id, parent_id, sn, psn) AS (SELECT id, parent_id, sn, psn FROM v_diag_valid_test_target WHERE sn = :sn UNION ALL SELECT t1.id, t1.parent_id, t1.sn, t1.psn FROM v_diag_valid_test_target t1, cte WHERE t1.psn = cte.sn) SELECT * FROM cte'), {'sn': sn})
data = [row[2