1 开发环境搭建
1.1 Windows环境
- 下载Python。
- 下载PyCharm。
- 下载virtualenv。
- 下载MySQL。可以安转一个数据库GUI。
1.2 Linux环境
- 下载VMware Workstation Pro。
- 下载ubuntu和xshell。
用xshell需要是虚拟机桥接。
- 下载Python。
pip install 名字 -i https://mirrors.aliyun.com/pypi/simple
- 下载PyCharm。
整个ubuntu还好说,但要是个最小版的Centos,那真就有点小离谱了,所以整个共享目录。
- 下载virtualenv。
为了方便直接使用。
ln -s /usr/local/lib/python3.8/dist-packages/virtualenv /usr/bin/virtualenv
virtualenv -p python3 vir_pyhon3
source vir_python3/bin/activate
- 下载MySQL。
sudo apt install mysql-server
sudo service mysql start
https://blog.csdn.net/qq_57309855/article/details/127602061?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167319614116800213055420%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=167319614116800213055420&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduend~default-1-127602061-null-null.142v70control,201v4add_ask&utm_term=ERROR%201064%20%2842000%29%3A%20You%20have%20an%20error%20in%20your%20SQL%20syntax%3B%20check%20the%20manual%20that%20corresponds%20to%20your%20MySQL%20server%20version%20for%20the%20right%20syntax%20to%20use%20near%20%E2%80%98root%E2%80%99%20at%20line%201&spm=1018.2226.3001.4187
远程连接:
https://blog.csdn.net/a648119398/article/details/122420906?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167319661216800215020531%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=167319661216800215020531&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_positive~default-1-122420906-null-null.142v70control,201v4add_ask&utm_term=navicat%E8%BF%9C%E7%A8%8B%E8%BF%9E%E6%8E%A5mysql&spm=1018.2226.3001.4187
最后一个问题是发现我的my.cnf中并没有需要注释的代码,反而在/etc/mysql/mysql.conf.d/mysqld.cnf文件中,注释掉就好了。
本人所写的博客:https://blog.csdn.net/weixin_61823031/article/details/128608050
2 Flask框架简介
2.1 MVC框架对比
- Flask:微框架。易学习、Jinja2模板、内置服务器、扩展丰富,易扩展。
- Django:全能框架、学习成本大
- Tornado:高性能、异步处理、扩展不多
- Bottle:小巧
2.2 下载
pip install flask
2.3 运行
Windows下直接使用PyCharm进行。
Linux下运行:
export FLASK_APP=app.py
flask run host 0.0.0.0
然后就可以使用Linux的主机IP地址在浏览器上进行访问即可。
2.4 Flask相关
2.5 Flask配置
网络配置
监听网卡:本地网卡(127.0.0.1),整块网卡(0.0.0.0)
这里导入配置的方法,本人选择使用导入py文件配置。
如下:
from flask import Flask
import configs
# 初始化实例
app = Flask(__name__)
# 加载配置文件
app.config.from_object(configs)
@app.route('/')
def hello_world(): # put application's code here
return 'Hello World!'
if __name__ == '__main__':
app.run(host="0.0.0.0")
DEBUG = True
一直有个警告气死我了,调的时候还导致多了第一句废话,更气了!
3 Flask路由和请求对象
3.1 路由方式
- route():内部调用add_url_rule()
- add_url_rule()
加入参数的方式。
@app.route("/<name>"):
def hello(name)
return "hello, %s" % name
- 蓝图
一般使用两个文件来实现,类似如下所示。
from flask import Flask
from controller import start
import configs
# 初始化实例
app = Flask(__name__)
# 加载配置文件
app.config.from_object(configs)
app.register_blueprint(start, url_prefix="/start")
if __name__ == '__main__':
app.run(host="0.0.0.0")
from flask import Flask, Blueprint
start = Blueprint("start", __name__)
@start.route("/")
def hello():
return "hello, world!"
@start.route("/my")
def hello2():
return "hello, two!"
3.2 HTTP请求
- DNS解析:根据域名获取IP
- HTTP请求
- HTTP响应
获取请求对象的GET和POST参数
请求的示例代码:
@start.route("/get")
def get():
var_a = request.args.get("a", "hello")
return "request:%s, params:%s,var_a:%s"%(request.method, request.args, var_a)
@start.route("/post", methods=["POST"])
def post():
var_a = request.form['a'] if "a" in request.form else ''
return "request:%s, params:%s,var_a:%s"%(request.method, request.form, var_a)
post请求直接刷新浏览器是不行的,这里使用了hackbar这样的一款插件来发送post请求。
还有一种不需要区分的方式:
@start.route("/get")
def get():
req = request.values
var_a = req['a'] if 'a' in req else "none"
# var_a = request.args.get("a", "hello")
return "request:%s, params:%s,var_a:%s"%(request.method, request.args, var_a)
@start.route("/post", methods=["POST"])
def post():
req = request.values
var_a = req['a'] if 'a' in req else "none"
# var_a = request.form['a'] if "a" in request.form else ''
return "request:%s, params:%s,var_a:%s"%(request.method, request.form, var_a)
上传文件
@start.route("/upload", methods=["POST"])
def upload():
f = request.files['file'] if "file" in request.files else None
return "request:%s, params:%s,file:%s"%(request.method, request.files, f)
使用Postman进行测试。
如果对于同一个参数a,同时使用get和post的传参,request.values会接受get方式的请求。
4 Flask响应对象和模板
4.1 Flask响应
- text/html
@start.route("/text")
def text():
return "text/html"
@start.route("/text_same")
def text_same():
response = make_response("text/html", 200)
return response
- Json
@start.route("/json")
def json():
import json
data = {"a": "b"}
# 默认text,所以要改成json
response = make_response(json.dumps(data))
response.headers["Content-Type"] = "application/json"
return response
@start.route("/json_same")
def json_same():
data = {"a": "b"}
response = make_response(jsonify(data))
return response
- 模板响应
@start.route("/template")
def template():
return render_template("index.html")
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
hello , template!
</body>
</html>
官方文档中的说明。
4.2 模板引擎Jinja2
- 传递变量
@start.route("/template")
def template():
# 传值
# name = "daye"
# return render_template("index.html", name=name)
name = "daye"
context = {"name":name}
return render_template("index.html", **context)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
hello , template!
<p>hello ,{{ name }}</p>
</body>
</html>
- Jinja2基本语法
@start.route("/template")
def template():
# 传值
# name = "daye"
# return render_template("index.html", name=name)
name = "daye"
context = {"name": name, 'user': {"nickname": "day3", "qq": 12324, "home": "Bejing"}, 'num_list': [1, 2, 3, 4, 5]}
return render_template("index.html", **context)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
hello , template!
<p>hello ,{{ name }}</p>
<p>
{% if user %}
{{ user.nickname }} QQ:{{ user.qq }} 家庭住址:{{ user.home }}
{% endif %}
</p>
<p>
{% for tmp in num_list %}
{{ tmp }}
{% endfor %}
</p>
</body>
</html>
- 模板继承
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>模板</title>
</head>
<body>
{% block content %}
{% endblock %}
</body>
</html>
{% extends "common/base.html" %}
{% block content %}
<p>this is extend</p>
{% endblock %}
@start.route("/extend")
def extend():
return render_template("extend.html")
5 Flask数据库
5.1 数据库配置
下载必要的配置:
pip install flask_sqlalchemy
apt-get install libmysqlclient-dev python3-dev
pip install mysqlclient
增添代码如下所示:
from flask import Blueprint, request, make_response, jsonify, render_template
from sqlalchemy import text
from exts import db
@start.route("/template")
def template():
# 传值
# name = "daye"
# return render_template("index.html", name=name)
name = "daye"
context = {"name": name, 'user': {"nickname": "day3", "qq": 12324, "home": "Bejing"}, 'num_list': [1, 2, 3, 4, 5]}
sql = text("select * from `user`")
result = db.engine.execute(sql)
context['result'] = result
return render_template("index.html", **context)
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
from flask import Flask
from controller import start
import configs
from exts import db
# 增加session会话保护(任意字符串,用来对session进行加密)
app.secret_key = "day3"
# db绑定app
db.init_app(app)
DEBUG = True
# 数据库配置
HOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'mysql'
USERNAME = 'root'
PASSWORD = 'root'
DB_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'. \
format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
SQLALCHEMY_DATABASE_URI = DB_URI
SQLALCHEMY_TRACK_MODIFICATIONS = True
SQLALCHEMY_ECHO = True
![chrome_SkWekptSYs.png](https://img-blog.csdnimg.cn/img_convert/e9d7db316872d088b2da12bad0c7ef6a.png#averageHue=#fcfcfb&clientId=uc5b1810e-a7aa-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=151&id=u0421cd6a&margin=[object Object]&name=chrome_SkWekptSYs.png&originHeight=424&originWidth=883&originalType=binary&ratio=1&rotation=0&showTitle=false&size=10845&status=done&style=none&taskId=u130bd777-80b5-4ba3-b0b4-4383027c0f1&title=&width=315.3333435058594)
5.2 Flask通过Model访问数据库
@start.route("/template")
def template():
# 传值
# name = "daye"
# return render_template("index.html", name=name)
name = "daye"
context = {"name": name, 'user': {"nickname": "day3", "qq": 12324, "home": "Bejing"}, 'num_list': [1, 2, 3, 4, 5]}
# 数据库查询
# sql = text("select * from `user`")
# result = db.engine.execute(sql)
result = User.query.all()
context['result'] = result
return render_template("index.html", **context)
from exts import db
class User(db.Model):
Host = db.Column(db.String(80), primary_key=True)
User = db.Column(db.String(120))
5.3 自动生成model
下载自动生成model的库。
pip install flask-sqlacodegen
生成model的命令。
flask-sqlacodegen ‘mysql+mysqldb://root:root@127.0.0.1:3306/mysql?charset=utf8’ --table user --outfile “user.py” --flask
生成的表如下所示:
# coding: utf-8
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class User(db.Model):
__tablename__ = 'user'
Host = db.Column(db.String(255, 'ascii_general_ci'), primary_key=True, nullable=False, server_default=db.FetchedValue())
User = db.Column(db.String(32, 'utf8mb3_bin'), primary_key=True, nullable=False, server_default=db.FetchedValue())
Select_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Insert_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Update_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Delete_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Drop_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Reload_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Shutdown_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Process_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
File_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Grant_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
References_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Index_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Alter_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Show_db_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Super_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_tmp_table_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Lock_tables_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Execute_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Repl_slave_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Repl_client_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_view_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Show_view_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_routine_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Alter_routine_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_user_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Event_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Trigger_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_tablespace_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
ssl_type = db.Column(db.Enum('', 'ANY', 'X509', 'SPECIFIED'), nullable=False, server_default=db.FetchedValue())
ssl_cipher = db.Column(db.LargeBinary, nullable=False)
x509_issuer = db.Column(db.LargeBinary, nullable=False)
x509_subject = db.Column(db.LargeBinary, nullable=False)
max_questions = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue())
max_updates = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue())
max_connections = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue())
max_user_connections = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue())
plugin = db.Column(db.String(64, 'utf8mb3_bin'), nullable=False, server_default=db.FetchedValue())
authentication_string = db.Column(db.Text(collation='utf8mb3_bin'))
password_expired = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
password_last_changed = db.Column(db.DateTime)
password_lifetime = db.Column(db.SmallInteger)
account_locked = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Create_role_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Drop_role_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue())
Password_reuse_history = db.Column(db.SmallInteger)
Password_reuse_time = db.Column(db.SmallInteger)
Password_require_current = db.Column(db.Enum('N', 'Y'))
User_attributes = db.Column(db.JSON)
写好的model读入数据库。
from app import app, db
if __name__ == '__main__':
from models import User
with app.app_context():
db.create_all()
# app.run(host="0.0.0.0")
6 MVC优化
- flask_script自定义启动命令
- 多环境配置文件
- flask_debugtoolbar
- 错误拦截器
- 请求拦截器
7 登录和注册功能实现
7.1 猫影前台功能
用户数据表设计
导出建表SQL语句
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`nickname` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '昵称',
`login_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '登录用户名',
`login_pwd` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '登录密码',
`login_salt` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '登录密码随机码',
`status` tinyint(0) NOT NULL DEFAULT 1 COMMENT '状态:0:无效 1:有效',
`updated_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '最后更新时间',
`created_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '插入时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `UNIQUE`(`login_name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
7.2 搭建页面
新建member.py文件
from flask import Blueprint, render_template
member_page = Blueprint("member_page", __name__)
@member_page.route("/reg")
def reg():
return render_template("/member/reg.html")
@member_page.route("/login")
def login():
return render_template("/member/login.html")
使用bootstrap,此时模板文件为
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>模板</title>
<link rel="stylesheet" href="/static/bootstrap/css/bootstrap.min.css">
</head>
<body>
<!-- partial:partials/_sidebar.html -->
<nav class="navbar navbar-default navbar-inverse" style="border-radius: 0">
<div class="container-fluid">
<!-- Brand and toggle get grouped for better mobile display -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand" href="/">猫影</a>
</div>
<!-- Collect the nav links, forms, and other content for toggling -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li class="active"><a href="#">影视 <span class="sr-only">(current)</span></a></li>
</ul>
<ul class="nav navbar-nav navbar-right">
<li><a href="{{ url_for('member_page.login') }}">登录</a></li>
<li><a href="{{ url_for('member_page.reg') }}">注册</a></li>
</ul>
</div><!-- /.navbar-collapse -->
</div><!-- /.container-fluid -->
</nav>
<div class="container" style="min-height: 600px">
{% block content %}
{% endblock %}
</div>
<footer class="text-center">
day3 @2022
</footer>
<script src="/static/bootstrap/js/jquery-2.0.3.js"></script>
<script src="/static/bootstrap/js/bootstrap.min.js"></script>
</body>
</html>
注册界面
{% extends "common/base.html" %}
{% block content %}
<div class="row" style="margin-top: 50px">
<div class="col-lg-6 col-lg-offset-3">
<div class="panel panel-default">
<div class="panel-heading">用户注册</div>
<div class="panel-body">
<form class="form-horizontal">
<div class="form-group">
<label for="login_name" class="col-sm-2 control-label">用户名</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="login_name" placeholder="请输入用户名">
</div>
</div>
<div class="form-group">
<label for="login_email" class="col-sm-2 control-label">邮箱</label>
<div class="col-sm-10">
<input type="text" class="form-control" id="login_email" placeholder="请输入邮箱">
</div>
</div>
<div class="form-group">
<label for="login_pw1" class="col-sm-2 control-label">密码</label>
<div class="col-sm-10">
<input type="password" class="form-control" id="login_pw1" placeholder="请输入密码">
</div>
</div>
<div class="form-group">
<label for="login_pw2" class="col-sm-2 control-label">确认密码</label>
<div class="col-sm-10">
<input type="password" class="form-control" id="login_pw2" placeholder="请再次输入密码">
</div>
</div>
<div class="form-group">
<label for="yanzheng" class="col-sm-2 control-label">验证码</label>
<div class="col-sm-10">
<div class="input-group">
<input type="text" class="form-control" placeholder="请输入验证码">
<span class="input-group-btn">
<button class="btn btn-default" type="button" id="yanzheng">获取验证码</button>
</span>
</div><!-- /input-group -->
</div>
</div>
<div class="form-group">
<div class="col-sm-offset-3 col-sm-6">
<button type="submit" class="btn btn-success btn-block">确认</button>
</div>
</div>
</form>
</div>
</div>
</div>
</div>
{% endblock %}
登录界面
{% extends "common/base.html" %}
{% block content %}
<div class="row" style="margin-top: 50px">
<div class="col-lg-6 col-lg-offset-3">
<div class="panel panel-default">
<div class="panel-heading">用户登录</div>
<div class="panel-body">
<form class="form-horizontal">
<div class="form-group">
<label for="login_name" class="col-sm-2 control-label">用户名</label>
<div class="col-sm-10">
<input type="email" class="form-control" id="login_name" placeholder="请输入用户名">
</div>
</div>
<div class="form-group">
<label for="login_pwd" class="col-sm-2 control-label">密码</label>
<div class="col-sm-10">
<input type="password" class="form-control" id="login_pwd" placeholder="请输入密码">
</div>
</div>
<div class="form-group">
<div class="col-sm-offset-3 col-sm-6">
<button type="submit" class="btn btn-success btn-block">登录</button>
</div>
</div>
</form>
</div>
</div>
</div>
</div>
{% endblock %}
7.3 注册功能实现
模板函数
from app import app
class UrlManager(object):
@staticmethod
def buildUrl(path):
config_domain = app.config['DOMAIN']
return "%s%s"%(config_domain['www'], path)
@staticmethod
def buildStaticUrl(path):
path = "/static" + path
return UrlManager.buildUrl(path)
发送邮箱验证码
register.js
function bindEmailCaptchaClick(){
$("#yanzheng").click(function (event){
// $this:代表的是当前按钮的jquery对象
var $this = $(this);
// 阻止默认的事件
event.preventDefault();
var email = $("#login_email").val();
$.ajax({
// http://127.0.0.1:500
// /auth/captcha/email?email=xx@qq.com
url: "/member/captcha/email?email="+email,
method: "GET",
success: function (result){
var code = result['code'];
if(code == 200){
var countdown = 5;
// 开始倒计时之前,就取消按钮的点击事件
$this.off("click");
var timer = setInterval(function (){
$this.text(countdown);
countdown -= 1;
// 倒计时结束的时候执行
if(countdown <= 0){
// 清掉定时器
clearInterval(timer);
// 将按钮的文字重新修改回来
$this.text("获取验证码");
// 重新绑定点击事件
bindEmailCaptchaClick();
}
}, 1000);
// alert("邮箱验证码发送成功!");
}else{
alert(result['message']);
}
},
fail: function (){
console.log("获取验证码失败");
}
})
});
}
// 整个网页都加载完毕后再执行的
$(function (){
bindEmailCaptchaClick();
});
路由
@member_page.route("/captcha/email")
def get_email_captcha():
# /captcha/email/<email>
# /captcha/email?email=xxx@qq.com
email = request.args.get("email")
# 4/6:随机数组、字母、数组和字母的组合
source = string.digits*6
captcha = random.sample(source, 6)
captcha = "".join(captcha)
# I/O:Input/Output
message = Message(subject="注册验证码", recipients=[email], body=f"您的验证码是:{captcha}")
mail.send(message)
# memcached/redis
# 用数据库表的方式存储
if len(EmailCaptcha.query.filter(EmailCaptcha.email == email).all()) != 0:
EmailCaptcha.query.filter(EmailCaptcha.email == email).delete()
db.session.commit()
email_captcha = EmailCaptcha(email=email, captcha=captcha)
db.session.add(email_captcha)
db.session.commit()
# RESTful API
# {code: 200/400/500, message: "", data: {}}
return jsonify({"code": 200, "message": "出现错误,请重试!", "data": None})
@member_page.route("/login", methods=['GET', 'POST'])
def login():
if request.method == 'GET':
return render_template("/member/login.html")
else:
form = LoginForm(request.form)
if form.validate():
email = form.email.data
password = form.password.data
user = User.query.filter_by(login_name=email).first()
if not user:
print("邮箱在数据库中不存在!")
return render_template("/member/login.html")
else:
if check_password_hash(user.login_pwd, password):
# cookie:
# cookie中不适合存储太多的数据,只适合存储少量的数据
# cookie一般用来存放登录授权的东西
# flask中的session,是经过加密后存储在cookie中的
session['user_id'] = user.id
return render_template("/index.html")
else:
print("密码错误!")
return render_template("/member/login.html")
else:
print(form.errors)
return render_template("/member/login.html")
表单验证。
import wtforms
from wtforms.validators import Email, Length, EqualTo, InputRequired
from user import EmailCaptcha, User
from exts import db
# Form:主要就是用来验证前端提交的数据是否符合要求
class RegisterForm(wtforms.Form):
email = wtforms.StringField(validators=[Email(message="邮箱格式错误!")])
captcha = wtforms.StringField(validators=[Length(min=6, max=6, message="验证码格式错误!")])
username = wtforms.StringField(validators=[Length(min=3, max=20, message="用户名格式错误!")])
password = wtforms.StringField(validators=[Length(min=6, max=20, message="密码格式错误!")])
password_confirm = wtforms.StringField(validators=[EqualTo("password", message="两次密码不一致!")])
# 自定义验证:
# 1. 邮箱是否已经被注册
def validate_email(self, field):
email = field.data
user = User.query.filter_by(login_name=email).first()
if user:
raise wtforms.ValidationError(message="该邮箱已经被注册!")
# 2. 验证码是否正确
def validate_captcha(self, field):
captcha = field.data
email = self.email.data
captcha_model = EmailCaptcha.query.filter_by(email=email, captcha=captcha).first()
if not captcha_model:
raise wtforms.ValidationError(message="邮箱或验证码错误!")
else:
# todo:可以删掉captcha_model
db.session.delete(captcha_model)
db.session.commit()
class LoginForm(wtforms.Form):
email = wtforms.StringField(validators=[Email(message="邮箱格式错误!")])
password = wtforms.StringField(validators=[Length(min=6, max=20, message="密码格式错误!")])
#
#
# class QuestionForm(wtforms.Form):
# title = wtforms.StringField(validators=[Length(min=3, max=100, message="标题格式错误!")])
# content = wtforms.StringField(validators=[Length(min=3,message="内容格式错误!")])
#
#
# class AnswerForm(wtforms.Form):
# content = wtforms.StringField(validators=[Length(min=3, message="内容格式错误!")])
# question_id = wtforms.IntegerField(validators=[InputRequired(message="必须要传入问题id!")])
8 获取影视资源
apscheduler
测试apscheduler功能。
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test():
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,trigger="cron",second="*/5")
scheduler.start()
Flask-APScheduler
可以查看一下此时的app.py和manger.py。
from app import app, db, scheduler
def main():
app.run(host="0.0.0.0", use_reloader=True) # 改为False可以只打印一遍
def create_all():
from user import User
with app.app_context():
db.create_all()
def aps_test():
import datetime
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
app.apscheduler.add_job(func=aps_test, trigger="cron", second="*/5", id="aps_test")
scheduler.start()
# create_all()
try:
import sys
sys.exit(main())
except Exception as e:
import traceback
traceback.print_exc()
from flask import Flask, session, g
from controller.index import index_page
from controller.member import member_page
from flask_apscheduler import APScheduler
import configs
from user import User
from exts import db, mail
# 初始化实例
app = Flask(__name__)
# 加载配置文件
app.config.from_object(configs)
# 增加session会话保护(任意字符串,用来对session进行加密)
app.secret_key = "day3"
# 调度
scheduler = APScheduler()
scheduler.init_app(app)
# db绑定app
db.init_app(app)
mail.init_app(app)
# 注册蓝图
app.register_blueprint(index_page, url_prefix="/")
app.register_blueprint(member_page, url_prefix="/member")
'''
模板函数
'''
from libs import UrlManager
app.add_template_global(UrlManager.buildUrl, 'buildUrl')
app.add_template_global(UrlManager.buildStaticUrl, 'buildStaticUrl')
@app.before_request
def my_before_request():
user_id = session.get("user_id")
if user_id:
user = User.query.get(user_id)
setattr(g, "user", user)
else:
setattr(g, "user", None)
@app.context_processor
def my_context_processor():
return {"user": g.user}
@app.errorhandler(404)
def error_404(e):
return "404 not found"
8.1 影视表设计
下载爬虫相关库。
pip install --upgrade beautifulsoup4
pip install requests
pip install xlwt
需要注意
使用Navicat进行数据库的导入。
flask-sqlacodegen ‘mysql+mysqldb://root:root@127.0.0.1:3306/moviecat?charset=utf8’ --table movie --outfile “user1.py” --flask
# -*- coding: utf-8 -*-
import sys
import re # 正则表达式,进行文字匹配
from bs4 import BeautifulSoup # (网页解析,获取数据)
import urllib.request, urllib.error # 制定URL,获取网页数据,urllib.request urllib.error
import xlwt # 进行Excel操作
import sqlite3 # 进行sqlite数据库操作
def main():
baseurl = "https://movie.douban.com/top250?start="
datalist = getdata(baseurl)
# savepath = "豆瓣电影top250.xls"
dbpath = "movie.db"
saveData2DB(datalist, dbpath)
findlink = re.compile(r'<a href="(.*?)">') # 找链接
findImgSrc = re.compile(r'<img.*src="(.*?)"', re.S) # 图片
findTitle = re.compile(r'<span class="title">(.*)</span>') # 片名
findRating = re.compile(r'<span class="rating_num" property="v:average">(.*)</span>') # 评分
findJudge = re.compile(r'<span>(\d*)人评价</span>') # 评价人数
findInq = re.compile(r'<span class="inq">(.*)</span>') # 概况
findBd = re.compile(r'<p class="">(.*?)</p>', re.S) # 简介
# 爬取网页
def getdata(baseurl):
datalist = []
for i in range(0, 10):
url = baseurl + str(i * 25)
html = askURL(url)
# 逐一解析数据
soup = BeautifulSoup(html, "html.parser")
for item in soup.find_all('div', class_="item"):
# print(item) 测试:查看电影item所有信息
data = [] # 保存信息
item = str(item)
link = re.findall(findlink, item)[0]
# print(link)#影片链接
data.append(link) # 添加影片链接
imgSrc = re.findall(findImgSrc, item)[0]
data.append(imgSrc) # 添加图片
titles = re.findall(findTitle, item)
if (len(titles) == 2):
ctitle = titles[0] # 添加中国名
data.append(ctitle)
otitle = titles[1].replace("/", "") # 去掉无关符号
data.append(otitle) # 添加外国名
else:
data.append(titles[0])
data.append(' ') # 留空
rating = re.findall(findRating, item)[0]
data.append(rating) # 分数
judgenum = re.findall(findJudge, item)[0]
data.append(judgenum) # 评价人数
inq = re.findall(findInq, item) # 概述
if len(inq) != 0:
inq = inq[0].replace("。", "") # 去掉句号
data.append(inq)
else:
data.append(" ")
bd = re.findall(findBd, item)[0]
bd = re.sub('<br(\s+)?/>(\s+)?', " ", bd)
bd = re.sub('/', " ", bd)
data.append(bd.strip()) # 去掉空格
datalist.append(data) # 将所有信息放入datalist
return datalist
def askURL(url):
head = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36 Edg/92.0.902.78"}
request = urllib.request.Request(url, headers=head) # 封装
html = ""
try:
response = urllib.request.urlopen(request)
html = response.read().decode('utf-8', errors='ignore')
# print(html)
except urllib.error.URLError as e:
if hasattr(e, "code"):
print(e, code)
if hasattr(e, "reason"):
print(e, reason)
return html
# 保存数据
def savedata(datalist, savepath):
print("save....")
book = xlwt.Workbook(encoding="utf-8", style_compression=0) # 创建workbook对象
sheet = book.add_sheet("豆瓣电影top250", cell_overwrite_ok=True) # 创建工作表
col = ("电影链接", "图片链接", "中文名", "外国名", "评分", "评价人数", "概况", "相关信息")
for i in range(0, 8):
sheet.write(0, i, col[i])
for i in range(0, 250):
print("第%d条" % (i + 1))
data = datalist[i]
for j in range(0, 8):
sheet.write(i + 1, j, data[j])
book.save(savepath)
def saveData2DB(datalist, dbpath):
print("loading.................")
init_db(dbpath)
conn = sqlite3.connect(dbpath)
cur = conn.cursor()
for data in datalist:
for index in range(len(data)):
if index == 4 or index == 5:
continue
data[index] = '"' + data[index] + '"'
sql = '''
insert into movie250_2(
info_link,pic_link,cname,ename,score,rated,instroduction,info)
values(%s)''' % ",".join(data)
print(sql)
cur.execute(sql)
conn.commit()
cur.close()
conn.close()
print("已完成数据库操作")
def init_db(dbpath):
sql = '''
create table movie250_2
(
id integer primary key autoincrement,
info_link text,
pic_link text,
cname varchar,
ename varchar,
score numeric,
rated numeric,
instroduction text,
info,text
)
'''
conn = sqlite3.connect(dbpath)
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
conn.close()
if __name__ == "__main__":
main()
print("爬取完毕!")
8.2 前端展示
index.html
{% extends "common/base.html" %}
{% block head2 %}
<ul class="nav navbar-nav navbar-right">
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true"
aria-expanded="false">用户<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="#">{{ user.nickname }}</a></li>
<li><a href="{{ url_for('member_page.logout') }}">退出登录</a></li>
</ul>
</li>
</ul>
{% endblock %}
{% block content %}
<div class="row">
{% if data %}
{% for item in data %}
<div class="col-md-2">
<div class="thumbnail">
<img style="width: 100%;height: 250px" src="{{ item.pic_link | safe}}">
<div class="caption">
<a href="{{ item.info_link }}" style="overflow: hidden;text-overflow: ellipsis;display: -webkit-box;-webkit-box-orient: vertical;-webkit-line-clamp: 1">{{ item.cname | safe}}</a>
<p>豆瓣评分:{{ item.score | safe }}</p>
</div>
</div>
</div>
{% endfor %}
{% endif %}
</div>
<div class="row">
<div class="col-md-12">
<span class="pagination_page" style="line-height: 84px">共有{{ pages.total_pages }}页 | 每页{{ pages.page_size }}条</span>
<ul class="pagination pull-right">
{% if pages.is_prev == 1 %}
<li>
<a href="/?p={{ pages.current - 1 }}" aria-label="Previous">
<span aria-hidden="true">上一页;</span>
</a>
</li>
{% endif %}
{% for idx in pages.range%}
<li {% if pages.current == idx %}class="active"{% endif %}><a href="/?p={{ idx }}">{{ idx }}</a></li>
{% endfor %}
{% if pages.is_next == 1 %}
<li>
<a href="/?p={{ pages.current + 1 }}" aria-label="Previous">
<span aria-hidden="true">下一页</span>
</a>
</li>
{% endif %}
</ul>
</div>
</div>
{% endblock %}
index.py
from flask import Blueprint, render_template, g, redirect, url_for, request
from user import Movie
import math
index_page = Blueprint("index_page", __name__)
@index_page.route("/")
def index():
if g.user:
req = request.values
page = 1
if 'p' in req and req['p']:
page = int(req['p'])
query = Movie.query
page_size = 30
total_count = query.count()
total_pages = math.ceil(total_count / page_size)
total_pages = total_pages if total_pages > 0 else 1
is_prev = 0 if page <= 1 else 1
is_next = 0 if page >= total_pages else 1
pages = {
'page_size':page_size,
'total_count':total_count,
'total_pages':total_pages,
'range':range(1,total_pages+1),
'is_prev':is_prev,
'is_next':is_next,
'current':page
}
offset = (page - 1) * pages['page_size']
limit = page * pages['page_size']
list_movie = query[offset:limit]
return render_template("/index.html", data=list_movie, pages=pages)
else:
return redirect(url_for("member_page.login"))
@index_page.route("info")
def info():
req = request.values
return render_template("info.html")
9 部署
nginx+uwsgi+flask
使用如下命令进行启动。
uwsgi --http-socket :5000 --wsgi-file manger.py --callable app
还可以加进程和线程数提高并发能力
uwsgi --http-socket :5000 --wsgi-file manger.py --callable app --processes 4 --threads 2
写配置文件
[uwsgi]
chdir=/home/WF/ubuntuFlask/moviecat
http=0.0.0.0:5000
socket=/tmp/logs/movie.sock
module=manger
callable=app
master=true
processes=4
pidfile=/tmp/logs/movie.pid
daemonize=/tmp/logs/movie.log
启动
uwsgi --ini uwsgi.ini
关闭
uwsgi --stop uwsgi.ini
重启
uwsgi --reload uwsgi.ini
下载ngnix
https://blog.csdn.net/weixin_42973884/article/details/126251718
sudo apt-get update
sudo apt-get install nginx
ngnix # 启动
service nginx stop # 停止
service nginx reload # 重启
https://blog.csdn.net/weixin_39759781/article/details/118303445
cd /etc/nginx
半夜三点了还没配好,我真的是服了,留个坑后面补。