
DRF 认证、权限、限制
认证:
定义一个用户表和一个保存用户的Token表

# ======================day96======================= class UserInfo(models.Model):
username = models.CharField(max_length=16,unique=True)
password = models.CharField(max_length=32) type = models.SmallIntegerField(
choices=((1,"普通用户"),(2,"VIP用户")),
default=1
) class Token(models.Model):
token = models.CharField(max_length=128)
user = models.OneToOneField(to="UserInfo",on_delete=models.CASCADE)

定义一个登陆视图:

# 生成Token的函数
def get_token_code(username):
"""
根据用户名和时间戳来生成永不相同的token随机字符串
:param username: 字符串格式的用户名
:return: 字符串格式的Token
""" import time
import hashlib timestamp = str(time.time())
m = hashlib.md5(username.encode("utf-8"))
# md5 要传入字节类型的数据
m.update(timestamp.encode("utf-8"))
return m.hexdigest() # 将生成的随机字符串返回 # 登陆视图 class LoginView(APIView):
'''
登陆检测试图。
1,接收用户发过来的用户名和密码数据
2,校验用户密码是否正确
- 成功就返回登陆成功,然后发Token
- 失败就返回错误提示
''' def post(self,request):
res = {"code":0}
# 从post 里面取数据
print(request.data)
username = request.data.get("username")
password = request.data.get("password")
# 去数据库查询
user_obj = models.UserInfo.objects.filter(
username = username,
password = password
).first()
if user_obj:
# 登陆成功
# 生成Token
token = get_token_code(username)
# 将token保存
# 用user = user_obj 这个条件去Token表里查询。
# 如果又记录就更新default里传的参数,没有记录就用default里传的参数创建一条数据。
models.Token.objects.update_or_create(defaults={"token":token},user = user_obj)
# 将token返回给用户
res["token"] = token
else:
# 登陆失败
res["code"] = 1
res["error"] = "用户名或密码错误"
return Response(res)

新建一个utils文件夹 下面放一些组件:
定义一个MyAuth认证类:

"""
这里放自定义的认证类
"""
from rest_framework.authentication import BaseAuthentication
from app01 import models
from rest_framework.exceptions import AuthenticationFailed class MyAuth(BaseAuthentication): def authenticate(self, request):
# print(request.method)
if request.method in ["POST","PUT","DELETE"]:
# 如果请求是post,put,delete三种类型时
# 获取随用户请求发来的token随机码
token = request.data.get("token")
# 然后去数据库查询有没有这个token
token_obj = models.Token.objects.filter(token=token).first()
if token_obj:
# 如果存在,则说明验证通过,以元组形式返回用户对象和token
return token_obj.user,token
else:
# 不存在就直接抛错
raise AuthenticationFailed("无效的token")
else:
# 这一步的else 是为了当用户是get请求时也可获取数据,并不需要验证token.
return None,None

视图级别认证:
class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth,]
permission_classes = [MyPermission,]
全局级别认证:需要在settings.py文件设置:

# REST FRAMEWORK 相关的配置 REST_FRAMEWORK = {
# 关于认证的全局配置
# "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",],
# "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"],
# "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",],
"DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",],
"DEFAULT_THROTTLE_RATES":{
"XXX":"5/m",
}
}

权限:
只有VIP用户才能看的内容:
自定义权限类:

'''
自定义的权限类
''' from rest_framework.permissions import BasePermission class MyPermission(BasePermission):
message = "sorry,您没有权限"
def has_permission(self, request, view):
# 内置封装的方法
'''
判断该用户有没有权限
'''
# 判断用户是不是VIP用户
# 如果是VIP用户就返回True
# 如果是普通用户就返会Flase if request.method in ["POST","PUT","DELETE"]:
# print(111)
print(request.user.username)
print(request.user.type)
print(type(request.user.type))
if request.user.type == 2: # 是VIP用户
print(2222)
return True
else:
return False
else:
return True def has_object_permission(self, request, view, obj):
# 用来判断针对的obj权限:
# 例如:是不是某一个人的评论
'''
只有评论人是自己才能删除选定的评论
'''
if request.method in ["PUT","DELETE"]:
print(obj.user.username)
print(request.user.username)
if obj.user == request.user:
# 表示当前评论对象的用户就是登陆用户
return True
else:
return False
else:
return True

视图级别配置:
class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth,]
permission_classes = [MyPermission,]
全局级别配置:

# REST FRAMEWORK 相关的配置 REST_FRAMEWORK = {
# 关于认证的全局配置
# "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",],
# "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"],
# "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",],
"DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",],
"DEFAULT_THROTTLE_RATES":{
"XXX":"5/m",
}
}

限制:
自定义限制类:

'''
自定义的访问限制类
''' from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
import time # =============================
# DIC = {}
#
# class MyThrottle(BaseThrottle):
# def allow_request(self, request, view):
# '''
# 返回True就放行,返回False表示被限制了
# '''
#
# # 获取当前访问的ip地址
# ip = request.META.get("REMOTE_ADDR")
#
# # 获取当前时间
# now = time.time()
#
# # 判断当前ip是否有访问记录
# if ip not in DIC:
# DIC[ip] = [] # 如果没有访问记录初始化一个空的访问历史列表
#
# # 高端操作
# history = DIC[ip]
# # 当当前ip存在访问记录,且现在的访问时间比最初的一次访问时间大于10秒
# while history and now - history[-1] > 10:
# history.pop() # 删掉历史列表中的最后一个记录
# # 判断最近一分钟的访问次数是否超过了阈值(3次)
# if len(history)>=3:
# return False
# else:
# # 把这一次的访问时间加到访问历史列表的第一位
# DIC[ip].insert(0,now)
# return True # ==============================
# 以上代码等同于一下代码
class VisitThrottle(SimpleRateThrottle):
scope = 'XXX' def get_cache_key(self, request, view):
return self.get_ident(request) # 求当前访问的IP

视图级别:

from app01.utils.auth import MyAuth
from app01.utils.permission import MyPermission
from app01.utils.throttle import SimpleRateThrottle
# from app01.utils.throttle import
class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth,]
permission_classes = [MyPermission,]

全局级别:

# REST FRAMEWORK 相关的配置 REST_FRAMEWORK = {
# 关于认证的全局配置
# "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",],
# "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"],
# "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",],
"DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",],
"DEFAULT_THROTTLE_RATES":{
"XXX":"5/m",
}
}