用flask整了个商品服务

时间:2021-02-03 00:39:49


用flask整了个商品服务

今天使用flask练习下简单的商品服务,主要使用mysql数据库和redis,并小小压测了

准备数据

此处就一个表,用来存储商品name,price,description

mysql> show create table products\G
*************************** 1. row ***************************
Table: products
Create Table: CREATE TABLE `products` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
`price` decimal(10,2) NOT NULL,
`description` text,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4
1 row in set (0.00 sec)

准备点数据,数据可以自己造,也可以使用faker去造,也可以自行采集等,此处提供简单样例

INSERT INTO products (name, price, description) VALUES
('小米11', 3999.00, '智能手机'),
('Redmi Note 10', 999.00, '智能手机'),
('小米电视', 1999.00, '智能电视'),
('小米笔记本', 4999.00, '笔记本电脑'),
('小米手环6', 229.00, '智能手环'),
('小米路由器 AX9000', 799.00, '智能路由器'),
('小爱音箱', 199.00, '智能音箱'),
('小米智能家庭套装', 399.00, '智能家居控制'),
('小米米家空气净化器', 1299.00, '智能家居净化器'),
('MIUI 13', 0.00, '移动操作系统'),
('小爱开放平台', 0.00, 'AI开放平台'),
('米家商城', 0.00, '小米电商平台'),
('小米有品', 0.00, '小米生态品牌电商');

如果使用faker创建随机数据可以直接再flask中提供接口

# 略
from faker import Faker
fake = Faker()

@app.route('/generate_products')
def generate_products():
for _ in range(100):
product = Products(
name=fake.word(),
price=fake.pydecimal(left_digits=2, right_digits=2, positive=True),
descriptinotallow=fake.sentence()
)
db.session.add(product)
db.session.commit()
return jsonify({'message': 'Products generated successfully.'})
# 略

使用SQLAlchemy来查询mysql

相关包有两个:

  • SQLAlchemy:
  • flask-sqlalchemy:

此处博主使用flask-sqlalchemy,因为后面分页要用到paginate()方法,这个方法SQLAlchemy包中不存在

在config配置文件中配置数据库信息

期间博主遇到数据库密码存在特殊字符​​@​​​,导致连接数据库出错,后面使用​​urlquote​​ 方式给它增加引号避免连接串format时解析错误

# app/config/myconfig.py

# 解决密码中有特殊字符
from urllib.parse import quote_plus as urlquote
class Config:
mysql_username = 'products'
mysql_passwd = '1qaz@WSX'
mysql_ip_addr = '192.168.44.170'
mysql_port = 3306
mysql_database = 'products'
SQLALCHEMY_DATABASE_URI= f'mysql+pymysql://{mysql_username}:{urlquote(mysql_passwd)}@{mysql_ip_addr}:{mysql_port}/{mysql_database}?charset=utf8'.format(
mysql_username, mysql_passwd, mysql_ip_addr, mysql_port, mysql_database)
SQLALCHEMY_TRACK_MODIFICATIONS = False

在models中配置products模型

# app/models.py

from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)

class Products(db.Model):
__tablename__ = 'products'
id = db.Column(db.INTEGER, autoincrement=True, primary_key=True)
name = db.Column(db.String(20), nullable=False)
price = db.Column(db.Numeric(10, 2), nullable=False)
description = db.Column(db.Text, nullable=True)

在app.py中初始数据库模型

# app/app.py
from flask import Flask, jsonify, request

from config.myconfig import Config
from models import db,Products

# 增加打印日志
import logging
LOG_FORMAT = '%(asctime)s -%(name)s- %(threadName)s-%(thread)d - %(levelname)s - %(message)s'
DATE_FORMAT = "%m/%d/%Y %H:%M:%S %p"
# 日志配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)

app = Flask(__name__)
app.config.from_object(Config)
db.init_app(app)

@app.route('/')
def hello_world():
return 'Hello World!'

if __name__ == '__main__':
app.run()

创建视图接口

根据商品id查询商品

# app/app.py
# 从mysql中获取数据
@app.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
product = Products.query.get_or_404(product_id)
return jsonify({'product': product})

运行起来报了个错误

raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")
TypeError: Object of type Products is not JSON serializable
03/02/2023 23:00:38 PM -werkzeug- Thread-1-16752 - INFO - 127.0.0.1 - - [02/Mar/2023 23:00:38] "GET /products_from_mysql/2 HTTP/1.1" 500 -

是因为object对象不支持json序列化,所以只好在models中提供一个转化方法to\_dict()

# app/models.py
class Products(db.Model):

# 略

# 提供返回dict类型数据
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'price': float(self.price),
'description': self.description
}

这时我们的视图方法如下

# app/app.py
# 从mysql中获取数据
# 从mysql中获取数据
@app.route('/products_from_mysql/<int:product_id>', methods=['GET'])
def get_product_from_mysql(product_id):
product = Products.query.get_or_404(product_id).to_dict()
app.logger.info(f'Product {product_id} is retrieved from MySQL database: {product}')
return jsonify({'product': product})

用flask整了个商品服务

部署到服务器上

使用docker部署到服务器:2c4g,数据库服务器另外一台2c4g

项目目录结构

[root@docker-node app]# tree
.
├── app.py
├── config
│ └── myconfig.py
├── Dockerfile
├── gunicorn.py
├── README.md
└── requirements.txt

dockerfile配置文件

[root@docker-node app]# cat Dockerfile
# FROM ubuntu:18.04
FROM python:3.9.5-slim
#RUN apt-get update -y && \
# apt-get install -y python3-pip python3-dev

WORKDIR /app


copy requirements.txt /app/requirements.txt
RUN pip3 install \
-i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com \
-r requirements.txt

COPY . /app

EXPOSE 5000
ENTRYPOINT [ "gunicorn", "app:app", "-c", "gunicorn.py" ]

gunicorn启动文件

[root@docker-node app]# cat gunicorn.py
# 并行进程数量
workers = 2
# 每个进程的线程数
thread = 1
# #gunicorn监控的接口
bind = '0.0.0.0:5000'
# 进程pid文件
pidfile = 'gunicorn.pid' # gunicorn进程id,kill掉该文件的id,gunicorn就停止

logfile = './debug.log' # debug日志
errorlog = './error.log' # 错误信息日志

loglevel = 'debug'
logfile = './debug.log' # debug日志
errorlog = './error.log' # 错误信息日志
timeout = 90

#https://github.com/benoitc/gunicorn/issues/1194
keepalive = 75 # needs to be longer than the ELB idle timeout
worker_class = 'gevent'

使用locust浅压一下

在本地使用locust创建个压测任务

from locust import HttpUser, TaskSet, task
import random
class MyTaskSet(TaskSet):
@task
def get_product(self):
id = random.randint(1,100)
self.client.get("/{id}".format(id=id))

class MyUser(HttpUser):
host = "http://192.168.44.136:8006/products_from_mysql"
tasks = [MyTaskSet]
min_wait = 100
max_wait = 200

用flask整了个商品服务

压测的结果

用flask整了个商品服务

服务器监控

170为数据库服务器,136为项目服务器,看样子服务器cpu干到70%上

用flask整了个商品服务

用flask整了个商品服务

增加redis缓存

使用redis来缓存商品信息,过期时间300s,首先查询缓存,未查找到数据后再去查mysql,然后更新数据到缓存中

配置redis缓存

# app/config/myconfig.py

# 解决密码中有特殊字符
from urllib.parse import quote_plus as urlquote
class Config:

# 略
CACHE_TYPE = 'redis'
CACHE_REDIS_HOST = '192.168.44.170'
CACHE_REDIS_PORT = 6379
CACHE_REDIS_PASSWORD = '1qaz@WSX'
CACHE_DEFAULT_TIMEOUT = 300

视图中使用缓存

# app/app.py
# 略
from flask_caching import Cache
cache = Cache(app)

@app.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
cache_key = f'product:{product_id}'
product = cache.get(cache_key)
if not product:
product = Products.query.get_or_404(product_id).to_dict()
app.logger.info(f'Product {product_id} is retrieved from MySQL database: {product}')
cache.set(cache_key, product)
else:
app.logger.info(f'Product {product_id} is retrieved from Redis database: {product}')
return jsonify({'product': product})

# 略

用flask整了个商品服务

对应日志

03/03/2023 00:56:01 AM -app- Thread-3-7584 - INFO - Product 20 is retrieved from MySQL database: {'id': 20, 'name': 'Surface Laptop 4', 'price': 1299.99, 'description': '笔记本电脑'}
03/03/2023 00:56:01 AM -werkzeug- Thread-3-7584 - INFO - 127.0.0.1 - - [03/Mar/2023 00:56:01] "GET /products/20 HTTP/1.1" 200 -
03/03/2023 00:56:11 AM -app- Thread-4-15520 - INFO - Product 20 is retrieved from Redis database: {'id': 20, 'name': 'Surface Laptop 4', 'price': 1299.99, 'description': '笔记本电脑'}
03/03/2023 00:56:11 AM -werkzeug- Thread-4-15520 - INFO - 127.0.0.1 - - [03/Mar/2023 00:56:11] "GET /products/20 HTTP/1.1" 200 -

locust压测测试

from locust import HttpUser, TaskSet, task
import random
class MyTaskSet(TaskSet):
@task
def get_product(self):
id = random.randint(1,100)
self.client.get("/{id}".format(id=id))

class MyUser(HttpUser):
host = "http://192.168.44.136:8006/products"
tasks = [MyTaskSet]
min_wait = 100
max_wait = 200

看样子时延降低个20%-30%,rps有所增加

用flask整了个商品服务

项目服务器和数据库服务器的cpu比之前降低了40%-50%

用flask整了个商品服务

用flask整了个商品服务