rest-framework之认证组件

时间:2024-06-17 21:05:07

认证组件

认证简介

作用:校验是否登录

  • 首先定义一个类,集成BaseAuthentication,写一个方法:authenticate,在方法内部,实证过程,认证通过,返回None或者两个对象(user,auth),这两个对象,在视图类的request中可以取出来.如果返回的是None,就走下一个认证组件 [xx,xxx]
from rest_framework.authentication import BaseAuthentication
class myAuthen(BaseAuthentication):
def authenticate(self, request):
token = request.query_params.get('token')
ret = models.UserToken.objects.filter(token=token).first()
if ret:
# return ret.user, ret
# 要写多个认证类,这个地返回None
# 最后一个认证类,返回这俩值
return ret.user, ret
else:
raise AuthenticationFailed('您没有登陆')
  • 局部使用:只需要在视图类中加入(可以写多个)
authentication_classes = [myAuthen, ]
  • 全局使用 setting中设置
REST_FRAMEWORK={
"DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",]
}

认证顺序

认证类使用顺序:先用视图类中的验证类,再用settings里配置的验证类,最后用默认的验证类

局部使用例子

  1. models 层
class User(models.Model):
username=models.CharField(max_length=32)
password=models.CharField(max_length=32)
user_type=models.IntegerField(choices=((1,'超级用户'),(2,'普通用户'),(3,'二笔用户'))) class UserToken(models.Model):
user=models.OneToOneField(to='User')
token=models.CharField(max_length=64)
  1. 新建认证类(验证通过return两个参数)
from rest_framework.authentication import BaseAuthentication
class TokenAuth():
def authenticate(self, request):
token = request.GET.get('token')
token_obj = models.UserToken.objects.filter(token=token).first()
if token_obj:
return
else:
raise AuthenticationFailed('认证失败')
def authenticate_header(self,request):
pass
  1. view层 登录创建token,并存入数据库,下次登录认证token.
(1) 登录.....
def get_token(username):
import hashlib
import time
md = hashlib.md5()
# update内必须传bytes格式
md.update(username.encode('utf-8'))
md.update(str(time.time()).encode('utf-8'))
return md.hexdigest() #认证
class Login(APIView):
authentication_classes = [ ]
def post(self, request):
response = MyResponse()
name = request.data.get('name')
pwd = request.data.get('pwd')
user = models.User.objects.filter(username=name, password=pwd).first()
if user:
response.msg = '登陆成功'
# 登陆成功,返回一个随机字符串,以后在发请求,都携带这个字符串
token = get_token(name)
response.token = token
# 把随机字符串保存到数据库,有就更新,没有就创建
# ret=models.UserToken.objects.update_or_create(user_id=user.id,kwargs={'token':token})
ret = models.UserToken.objects.update_or_create(user=user, defaults={'token': token}) else:
response.msg = '用户名或密码错误'
response.status = 101
return JsonResponse(response.get_dic,safe=False) (2) 登录后 携带token 认证查询数据....
from app01.myauth import myAuthen class Stus(APIView):
authentication_classes = [myAuthen,] def get(self,request):
response = MyResponse()
#认证成功之后,可以去到用户名称,及token
print(request.user.username)
print(request.auth.token)
stus = models.Student.objects.all()
ret = myserial.StrSer(instance=stus,many=True)
response.msg = '查询成功'
response.data = ret.data
return JsonResponse(response.get_dic,safe=False)
  1. auth.py
from rest_framework.authentication import BaseAuthentication
#抛出异常,捕捉
from rest_framework.exceptions import AuthenticationFailed
from app01 import models class myAuthen(BaseAuthentication):
def authenticate(self, request):
token = request.query_params.get('token')
ret = models.UserToken.objects.filter(token=token).first()
if ret:
# return ret.user, ret
# 要写多个认证类,这个地返回None # 最后一个认证类,返回这俩值
return ret.user, ret
else:
raise AuthenticationFailed('您没有登陆')

附:不存数据库的token验证 就是通过某种的加密校验规则来验证

def get_token(id,salt='123'):
import hashlib
md=hashlib.md5()
md.update(bytes(str(id),encoding='utf-8'))
md.update(bytes(salt,encoding='utf-8')) return md.hexdigest()+'|'+str(id) def check_token(token,salt='123'):
ll=token.split('|')
import hashlib
md=hashlib.md5()
md.update(bytes(ll[-1],encoding='utf-8'))
md.update(bytes(salt,encoding='utf-8'))
if ll[0]==md.hexdigest():
return True
else:
return False class TokenAuth():
def authenticate(self, request):
token = request.GET.get('token')
success=check_token(token)
if success:
return
else:
raise AuthenticationFailed('认证失败')
def authenticate_header(self,request):
pass
class Login(APIView):
def post(self,reuquest):
back_msg={'status':1001,'msg':None}
try:
name=reuquest.data.get('name')
pwd=reuquest.data.get('pwd')
user=models.User.objects.filter(username=name,password=pwd).first()
if user:
token=get_token(user.pk)
# models.UserToken.objects.update_or_create(user=user,defaults={'token':token})
back_msg['status']='1000'
back_msg['msg']='登录成功'
back_msg['token']=token
else:
back_msg['msg'] = '用户名或密码错误'
except Exception as e:
back_msg['msg']=str(e)
return Response(back_msg)
from rest_framework.authentication import BaseAuthentication
class TokenAuth():
def authenticate(self, request):
token = request.GET.get('token')
token_obj = models.UserToken.objects.filter(token=token).first()
if token_obj:
return
else:
raise AuthenticationFailed('认证失败')
def authenticate_header(self,request):
pass class Course(APIView):
authentication_classes = [TokenAuth, ] def get(self, request):
return HttpResponse('get') def post(self, request):
return HttpResponse('post')

全局使用 在setting中添加

REST_FRAMEWORK={
"DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",]
}

源码分析

#Request对象的user方法
@property
def user(self):
the authentication classes provided to the request.
if not hasattr(self, '_user'):
with wrap_attributeerrors():
self._authenticate()
return self._user def _authenticate(self):
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
#认证成功,可以返回一个元组,但必须是最后一个验证类才能返回
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return self._not_authenticated()

self.authenticators

def get_authenticators(self):
return [auth() for auth in self.authentication_classes]

链接