快速实现用户认证:使用Python和Flask配合PyJWT生成与解密Token的教程及示例代码

时间:2024-02-26 07:19:31

生成token 与解密 token 和 拦截器

#学习交流 访问
# https://v.iiar.cn

import jwt
import datetime
from models import XUser
from flask import request, jsonify
from functools import wraps

SECRET_KEY = 'XPay'


# 创建token
def generate_token(user_id):
    try:
        payload = {
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7),
            'iat': datetime.datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            SECRET_KEY,  # 替换为你的密钥
            algorithm='HS256'
        )
    except Exception as e:
        return e


# 解析token
def decode_token(token):
    """ 解码Token并处理异常 """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        return payload['sub']  # 返回成功标志和用户ID
    except jwt.ExpiredSignatureError as e:
        return False
    except jwt.InvalidTokenError as e:
        return False


# 查询用户状态
def check_token_and_user_status(token):
    user_id = decode_token(token)
    print(user_id)
    token_in_user = XUser.query.filter_by(token=token).first()

    # 根据是否开启多端登陆判断token是否有效
    multi_device_login = True
    if not multi_device_login and not token_in_user:
        return False, 'token过期'

    if user_id:
        user = XUser.query.get(user_id)
        if user:
            if user.state == '正常':
                return True, user
            else:
                return False, '该用户状态异常,请联系客服'
        else:
            return False, '用户不存在'
    else:
        return False, 'token无效'


def user_token_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('token')
        if not token:
            return jsonify({
                'code': 401,
                'data': '',
                'msg': 'token不存在',
                'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })

        user_state, user_info = check_token_and_user_status(token)
        if not user_state:
            return jsonify({
                'code': 401,
                'data': '',
                'msg': user_info,
                'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
        return f(*args, **kwargs, user_info=user_info)

    return decorated_function

详细解释

这段代码提供了一个使用 Python 和 Flask 结合 JWT (JSON Web Tokens) 进行用户认证的简单框架。它包括了生成 token、解码 token、检查用户状态和一个装饰器函数,用于保护需要认证的路由。下面是对代码的逐部分解释:

1. generate_token(user_id) 函数

  • 这个函数用于为指定的用户 ID 生成一个 JWT token。Token 包含三个重要的信息(称为payload):exp(过期时间),iat(发行时间),和sub(主题,这里用作用户 ID)。
  • Token 使用 HS256 算法进行签名,SECRET_KEY作为签名密钥。
  • Token 默认有效期为 7 天。

2. decode_token(token) 函数

  • 用于解码和验证接收到的 JWT token。如果 token 有效且未过期,这个函数返回保存在 token 的sub字段中的用户 ID。
  • 如果 token 过期(ExpiredSignatureError)或无效(InvalidTokenError),函数将返回 False。

3. check_token_and_user_status(token) 函数

  • 首先解码 token 来获取用户 ID,然后查询数据库中是否存在对应的用户和 token。
  • 如果设置了不允许多端登录(multi_device_login = False),它还会检查数据库中的 token 是否与提供的 token 匹配。
  • 根据用户的状态(如是否正常)和 token 的有效性返回相应的状态和信息。

4. user_token_required(f) 装饰器

  • 一个 Flask 路由装饰器,用于在执行实际路由函数之前进行用户认证。
  • 从请求头中获取 token,如果不存在 token 或 token 无效,则返回错误信息。
  • 如果用户认证成功,路由函数将正常执行。装饰器会将user_info(从check_token_and_user_status返回的用户信息)作为关键字参数传递给路由函数。

使用场景

  • 在需要对用户身份进行验证的 Flask 路由上,可以使用@user_token_required装饰器来确保只有携带有效 token 的请求才能访问。
  • 这套系统使得管理用户会话和访问控制变得简单,特别是在构建 RESTful API 时非常有用。

注意事项

  • 实际部署时,SECRET_KEY应当是一个安全的值,且不应该硬编码在代码中。
  • 在处理用户状态和多端登录逻辑时,需要根据实际业务需求进行调整。
  • XUser模型和数据库查询逻辑需要根据实际的数据库设计来实现。这里假设XUser是一个模型类,代表用户数据,且有一个state属性和一个token属性。

模型部分

class XUser(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    mid = db.Column(db.Integer, comment='推荐人id')
    phone = db.Column(db.String(80), comment='手机号')
    user_name = db.Column(db.String(80), comment='用户昵称')
    openid = db.Column(db.String(120), comment='openid')
    password = db.Column(db.String(120), comment='密码')
    registration_time = db.Column(db.DateTime, default=datetime.now, comment='注册时间')
    token = db.Column(db.String(255), comment='Token')
    account_balance = db.Column(db.Numeric(20, 4), default=0.00, comment="账户余额")
    state = db.Column(db.String(120), default='正常', comment='用户状态')
    fee_percentage = db.Column(db.Integer, default=4, comment='手续费比例')

    def to_dict(self):
        return {
            'id': self.id,
            'mid': self.mid,
            'phone': self.phone,
            'user_name': self.user_name,
            'openid': self.openid,
            'registration_time': self.registration_time.strftime(
                '%Y-%m-%d %H:%M:%S') if self.registration_time else None,
            'account_balance': float(self.account_balance) if self.account_balance is not None else None,
            'state': self.state,
            'fee_percentage': self.fee_percentage
        }

模型解释

这个XUser类是一个模型定义,使用 SQLAlchemy ORM (一个 Python SQL 工具包和对象关系映射器)用于定义和操作数据库中的用户表。每个属性(使用db.Column声明)对应用户表中的一个字段。以下是各个字段的解释:

字段解释

  • id: 用户的唯一标识符,整数类型,设为主键。
  • mid: 推荐人的ID,整数类型,用来表示这个用户是被哪个已存在的用户推荐的。
  • phone: 用户的手机号,字符串类型,最长80字符。
  • user_name: 用户昵称,字符串类型,最长80字符。
  • openid: 用户的微信OpenID,字符串类型,最长120字符。这是微信平台特有的标识用户的ID,用于识别微信用户。
  • password: 用户密码,字符串类型,最长120字符。在实际应用中,密码应该经过加密存储。
  • registration_time: 用户注册时间,DateTime类型,默认值为当前时间。这里使用了datetime.now来自动设置这个字段的值。
  • token: 用于认证的Token,字符串类型,最长255字符。这是在用户登录时生成的,用于后续请求的身份验证。
  • account_balance: 用户账户余额,数值类型,最多20位数字,小数点后最多4位。默认值为0.00。
  • state: 用户状态,字符串类型,最长120字符,默认值为’正常’。这个字段可以用来表示用户是否被禁用或其他状态。
  • fee_percentage: 手续费比例,整数类型。表示在进行某些交易时需要收取的手续费百分比,默认值为4%。

to_dict 方法

to_dict方法是一个实例方法,用于将XUser对象的属性转换成一个字典,这在进行JSON序列化时非常有用,例如在 RESTful API 响应中返回用户信息。这个方法特别处理了registration_timeaccount_balance字段,确保它们以适当的格式输出(registration_time格式化为字符串,account_balance确保为浮点数或None)。

总结

XUser类不仅定义了数据库表结构,还提供了方便的方法来操作和转换用户数据。通过使用 SQLAlchemy ORM,可以简化数据库操作,提高代码的可读性和可维护性。

用户注册/登陆/获取用户基本信息

from datetime import datetime
from models import db, XUser
from flask import request, jsonify, Blueprint

from tools.common_method import check_password, set_password
from tools.user_token import generate_token
from tools.user_token import user_token_required

user_api = Blueprint('user_api', __name__)


# 登陆
@user_api.route('/api/user/login', methods=['POST'])
def user_login():
    data = request.json
    phone = data.get('phone')
    password = data.get('password')
    user = XUser.query.filter_by(phone=phone).first()
    print(phone)
    if not user:
        return jsonify({
            'code': 403,
            'data': '',
            'msg': '用户账号或密码错误',
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    else:
        if check_password(user.password, password) and user.state == '正常':
            token = generate_token(user.id)
            user.token = token
            db.session.commit()
            return jsonify({
                'code': 200,
                'token': token,
                'msg': '登陆成功',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
        else:
            return jsonify({
                'code': 403,
                'data': '',
                'msg': '该用户状态不允许登陆',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })


# 注册
@user_api.route('/api/user/register', methods=['POST'])
def user_register():
    data = request.json
    phone = data.get('phone')
    password = data.get('password')
    user_name = data.get('user_name')
    password_len = len(password)
    phone_len = len(phone)
    if phone_len != 11:
        return jsonify({
            'code': 403,
            'data': '',
            'msg': '注册失败,请输入11位手机号',
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    if password_len < 6 or password_len > 32:
        return jsonify({
            'code': 403,
            'data': '',
            'msg': '注册失败,密码长度请大于6位,小于32位',
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    user = XUser.query.filter_by(phone=phone).first()
    if user:
        return jsonify({
            'code': 403,
            'data': '',
            'msg': '注册失败,该手机已经注册,请直接登陆或更换手机后注册',
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    else:
        new_password = set_password(password)
        new_user = XUser(
            phone=phone,
            password=new_password,
            user_name=user_name
        )
        db.session.add(new_user)
        db.session.commit()
        token = generate_token(new_user.id)
        new_user.token = token
        db.session.commit()
        return jsonify({
            'code': 200,
            'token': token,
            'msg': '注册成功',
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })


@user_api.route('/api/user/user_info', methods=['GET'])
@user_token_required
def get_user_info(user_info):
    return reg_func(200, user_info.to_dict(), '获取信息成功')


def reg_func(code, data, msg):
    return jsonify({
        'code': code,
        'data': data,
        'msg': msg,
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }), code

注册/登陆/获取用户信息代码详细解释

这段代码是一个使用 Flask 框架构建的简易用户管理系统的一部分,包括用户的登录、注册以及获取用户信息的功能。它利用了 Flask 的 Blueprint 功能来组织和注册相关的路由,以及 SQLAlchemy ORM 来处理数据库操作。下面是对代码主要部分的解释:

登录 (user_login)

  • 接收客户端发送的 JSON 数据,包含用户的phonepassword
  • 通过手机号查询数据库中的用户。
  • 如果用户不存在,返回403错误和消息"用户账号或密码错误"
  • 如果用户存在,调用check_password函数验证密码是否正确,并检查用户状态是否为"正常"
  • 如果验证通过,为用户生成一个新的 token(使用generate_token函数),更新用户的 token 字段,并提交到数据库。
  • 返回包含新 token 和成功消息的 JSON 响应。

注册 (user_register)

  • 同样接收包含phonepassworduser_name的 JSON 数据。
  • 验证手机号是否为11位,密码长度是否在6到32位之间。
  • 如果手机号或密码格式不符合要求,返回403错误和相应的失败消息。
  • 如果手机号已经被注册,返回403错误和消息"注册失败,该手机已经注册,请直接登陆或更换手机后注册"
  • 如果验证通过,使用set_password函数加密密码,然后创建一个新的XUser实例,并将其添加到数据库。
  • 为新用户生成 token,更新用户的 token 字段,并提交到数据库。
  • 返回包含新 token 和成功消息的 JSON 响应。

获取用户信息 (get_user_info)

  • 这个路由使用@user_token_required装饰器,要求请求必须包含有效的 token。
  • 如果认证通过,装饰器会传递user_info参数(用户信息)给get_user_info函数。
  • 函数调用reg_func生成标准的 JSON 响应,包含用户信息和成功消息。

辅助函数 (reg_func)

  • reg_func是一个帮助函数,用于生成标准化的 JSON 响应。它接收状态码、数据和消息作为参数,并返回一个 Flask jsonify 响应。

关键工具和方法

  • Blueprint:用于创建一组相关的路由和视图函数。
  • request.json:用于获取 JSON 格式的请求体数据。
  • jsonify:将数据转换为 JSON 响应。
  • db.session:用于数据库操作,如添加新记录和提交更改。
  • generate_tokenuser_token_required:自定义函数和装饰器,用于处理 JWT token 的生成和验证。

整个代码展示了如何在 Flask 应用中实现用户认证和管理的基本流程,使用 JWT tokens 提供安全的用户状态管理和接口保护。