Rest framework-认证,权限,频率组件
认证,权限,频率这三个都是一样的实现逻辑
下面着重介绍认证组件,会了认证组件,权限,频率自然一通百通
大家主要看我写有注视的地方,其他选择性观看
不管什么请求来了一定都会走dispatch方法进行分发
# APIView下的dispatch
def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
# 接收Django的request生成新的request
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
# 认证,权限,频率都在这句话里面
self.initial(request, *args, **kwargs)
# Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed
response = handler(request, *args, **kwargs)
except Exception as exc:
response = self.handle_exception(exc)
self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response
def initial(self, request, *args, **kwargs):
"""
Runs anything that needs to occur prior to calling the method handler.
"""
self.format_kwarg = self.get_format_suffix(**kwargs)
# Perform content negotiation and store the accepted info on the request
neg = self.perform_content_negotiation(request)
request.accepted_renderer, request.accepted_media_type = neg
# Determine the API version, if versioning is in use.
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme
# Ensure that the incoming request is permitted
# 认证组件
self.perform_authentication(request)
# 权限组件
self.check_permissions(request)
# 频率组件
self.check_throttles(request)
下面开分析这三个组件
# 认证组件
self.perform_authentication(request)
# 权限组件
self.check_permissions(request)
# 频率组件
self.check_throttles(request)
认证组件源码简单分析
def perform_authentication(self, request):
"""
Perform authentication on the incoming request.
Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
# 下面可以看出这是request下的一个user方法
# perform_authentication在APIView的dispatch下,所以是新的request
request.user
request = self.initialize_request(request, *args, **kwargs)
# 下一步 self.initialize_request
def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request)
return Request(
request,
parsers=self.get_parsers(),
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)
# 下一步 Request
# 必须有@property下面的user才是我们要找的
@property
def user(self):
"""
Returns the user associated with the current request, as authenticated
by the authentication classes provided to the request.
"""
if not hasattr(self, '_user'):
# 没有_user就执行_authenticate
with wrap_attributeerrors():
self._authenticate()
return self._user
# 下一步
# 此时的self是Request
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
# 此时的self是Request
# 所以我们去Request里边找authenticators
# self.authenticators = authenticators or () 这个变量是Request参数传进来的,看下图
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
return Request(
request,
parsers=self.get_parsers(),
# authenticators = 此时的self是我们自己写的视图类.get_authenticators()
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)
# 下一步 下面基本可以看出 解析,认证,权限,频率基本都是一样的实现逻辑
# 解析组件
def get_parsers(self):
return [parser() for parser in self.parser_classes]
# 认证组件
def get_authenticators(self):
return [auth() for auth in self.authentication_classes]
# 权限组件
def get_permissions(self):
return [permission() for permission in self.permission_classes]
# 频率组件
def get_throttles(self):
return [throttle() for throttle in self.throttle_classes]
# 下一步
# 自己写的视图类首先继承了APIView,在APIView找到了authentication_classes
class APIView(View):
# The following policies may be set at either globally, or per-view.
renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
parser_classes = api_settings.DEFAULT_PARSER_CLASSES
# authentication_classes
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS
metadata_class = api_settings.DEFAULT_METADATA_CLASS
versioning_class = api_settings.DEFAULT_VERSIONING_CLASS
结论:
按照之前解释器分析,从这里我们可以看出如果自己在自己的视图内部定义了authentication_classes,就优先执行局部的authentication_classes,局部没有就优先执行全局settings.py,全局没有就默认的
认证组件应用
我们自己写一个简单的csrf_token认证的登陆接口
urls.py
url(r'^login/$', views.LoginView.as_view()),
url(r'^courses/$', views.CourseView.as_view()),
models.py
class User(models.Model):
user=models.CharField(max_length=18)
pwd=models.CharField(max_length=32)
class UserToken(models.Model):
user=models.OneToOneField("user")
token=models.CharField(max_length=128)
# 自己手动添加了两个用户
user:moyan pwd:123
user:allen pwe:234
views.py
# 生成随机字符串
def get_random_str(user):
import hashlib,time
ctime=str(time.time())
md5=hashlib.md5(bytes(user,encoding="utf8"))
md5.update(bytes(ctime,encoding="utf8"))
return md5.hexdigest()
from app01.models import User,UserToken
from django.http import JsonResponse
class LoginView(APIView):
def post(self,request):
res = {"code": 1000, "msg": None}
try:
# 接收的JSON数据所以是data不是POST,
user = request.data.get("user")
pwd = request.data.get("pwd")
user_obj = User.objects.filter(user=user, pwd=pwd).first()
if not user_obj:
res["code"] = 1001
res["msg"] = "用户名或者密码错误"
else:
# 生成随机字符串
token = get_random_str(user)
# update_or_create 更新还是创建取决于UserToken表里有user=user_obj则更新,没有则创建
UserToken.objects.update_or_create(user=user_obj, defaults={"token": token})
res["token"] = token
except Exception as e:
res["code"] = 1002
res["msg"] = str(e)
return JsonResponse(res, json_dumps_params={"ensure_ascii": False})
测试:http://127.0.0.1:8000/login/
# 请求
{
"user":"moyan",
"pwd":"123"
}
# 账户密码正确返回 下次在以moyan更新token,即第一次创建,后边则为更新
{
"code": 1000,
"msg": null,
"token": "c6c7b5346bc34208e4d885750c95aff4"
}
# 失败返回
{
"code": 1001,
"msg": "用户名或者密码错误"
}
自己写的认证类
# rest_framework认证错误类
from rest_framework.exceptions import AuthenticationFailed
class TokenAuth(object):
# 必须叫 authenticate
def authenticate(self,request):
# 获取用户携带过来的token
token=request.GET.get("token",None)
# 过滤获取用户对象
token_obj=UserToken.objects.filter(token=token).first()
if token_obj:
# 如果非要返回元祖的话,一定要是最后一个认证类
# 因为分会元祖,源码就直接return了,不在执行之后的认证类了
return token_obj.user.user,token_obj
else:
raise AuthenticationFailed("认证失败! ")
# 自己写的认证类里边必须有authenticate_header,但是里边可以什么都不写
def authenticate_header(self,request):
pass
自己写的认证类升级版
from rest_framework.exceptions import AuthenticationFailed
from app01.models import UserToken
from rest_framework.authentication import BaseAuthentication
# 继承BaseAuthentication就不用里边有写的authenticate_header
# BaseAuthentication里有写authenticate,authenticate_header这两个方法,方法内什么都没写
# 所以自己写的话覆盖,没写继承
class TokenAuth(BaseAuthentication):
# 必须叫 authenticate,覆盖BaseAuthentication里边的
def authenticate(self,request):
# 获取用户携带过来的token
token=request.GET.get("token",None)
# 过滤获取用户对象
token_obj=UserToken.objects.filter(token=token).first()
if token_obj:
# 如果非要返回元祖的话,一定要是最后一个认证类
# 因为分会元祖,源码就直接return了,不在执行之后的认证类了
return token_obj.user.user,token_obj.token
else:
raise AuthenticationFailed("认证失败! ")
以下是BaseAuthentication源码,除了BaseAuthentication还有很多REST有的认证类(SessionAuthentication,TokenAuthentication等),跟BaseAuthentication在同一文件下,但是基本没用过,每个业务认证都是不同的,所以都得重写
class BaseAuthentication(object):
def authenticate(self, request):
raise NotImplementedError(".authenticate() must be overridden.")
def authenticate_header(self, request):
pass
局部认证
class CourseView(APIView):
# 列表就是放多个认证类的意思
authentication_classes=[TokenAuth,]
def get(self,request):
return HttpResponse("get....")
def post(self,request):
print(request.data)
return HttpResponse("post.....")
以后访问courses,必须携带usertoken表里存在的token值
测试:http://127.0.0.1:8000/courses/?token=fe8d8fe240142c54c00f12655418e88f
全局认证类
settings.py
参考这个默认值authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
REST_FRAMEWORK={
# 解析
'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser',
),
# 认证
'DEFAULT_AUTHENTICATION_CLASSES':(
"app01.utils.auth.TokenAuth"
)
}
权限类
models.py
class User(models.Model):
user=models.CharField(max_length=18)
pwd=models.CharField(max_length=32)
# 添加个权限字段
# user.type 只能点出来数字
type=models.IntegerField(choices=((1,"common"),(2,"VIP"),(3,"SVIP")),default=1)
class UserToken(models.Model):
user=models.OneToOneField("user")
token=models.CharField(max_length=128)
# 测试数据
# 1 moyan 123 1
# 2 allen 234 3
源码,还记得这条语句么 ? 还是先去自己的视图类里面找有没有permission_classes
# 下一步 下面基本可以看出 解析,认证,权限,频率基本都是一样的实现逻辑
# 解析组件
def get_parsers(self):
return [parser() for parser in self.parser_classes]
# 认证组件
def get_authenticators(self):
return [auth() for auth in self.authentication_classes]
# 权限组件
def get_permissions(self):
return [permission() for permission in self.permission_classes]
# 频率组件
def get_throttles(self):
return [throttle() for throttle in self.throttle_classes]
views.py
class CourseView(APIView):
# 列表就是放多个认证类的意思
authentication_classes=[TokenAuth,]
permission_classes = [SVIPPermission,]
def get(self,request):
return HttpResponse("get....")
def post(self,request):
print(request.data)
return HttpResponse("post.....")
自己的权限类怎么写,还得取决于调用的这三个方法里边怎么写的
源码
def initial(self, request, *args, **kwargs):
# ..........
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request)
def check_permissions(self, request):
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
for permission in self.get_permissions():
# 自己类的函数名字必须是has_permission了
# 只需要返回True 或者 Fales,让if判断
if not permission.has_permission(request, self):
self.permission_denied(
# 不通过返回错误信息,视图类下没有message用默认的
request, message=getattr(permission, 'message', None)
)
自己写的权限类
class SVIPPermission():
# 此处的view是执行此函数的视图类
def has_permission(self,request,view):
user_type=request.auth.user.type
if user_type==3:
return True
else:
return False
测试用的是moyan的token:http://127.0.0.1:8000/courses/?token=fe8d8fe240142c54c00f12655418e88f
{
"detail": "权限不够"
}
测试用的是allen的token:http://127.0.0.1:8000/courses/?token=428271f7e71624f052bc205867c659f
# 成功
get....
全局权限
settings.py
REST_FRAMEWORK={
# 解析
'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser',
'rest_framework.parsers.FormParser',
'rest_framework.parsers.MultiPartParser'
),
# 认证
'DEFAULT_AUTHENTICATION_CLASSES':(
# 认证类路径 我只是把他们都暂时放到一个文件里面去了
"app01.utils.auth.TokenAuth"
),
# 权限
'DEFAULT_PERMISSION_CLASSES':(
# 权限类路径
"app01.utils.auth.SVIPPermission"
),
}
频率类
自己的权限类怎么写,还得取决于调用的这三个方法里边怎么写的
源码
def initial(self, request, *args, **kwargs):
# ..........
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request)
def check_throttles(self, request):
for throttle in self.get_throttles():
# 自己类的函数名字必须是has_permission了
# 只需要返回True 或者 Fales,让if判断
if not throttle.allow_request(request, self):
self.throttled(request, throttle.wait())
REST频率类
# SimpleRateThrottle 简单的频率限制
from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
scope = "visit_rate"
def get_cache_key(self, request, view):
return self.get_ident(request)
全局
REST_FRAMEWORK={
# # 解析
# 'DEFAULT_PARSER_CLASSES': (
# 'rest_framework.parsers.JSONParser',
# 'rest_framework.parsers.FormParser',
# 'rest_framework.parsers.MultiPartParser'
# ),
# # 认证
# 'DEFAULT_AUTHENTICATION_CLASSES':(
# "app01.utils.auth.TokenAuth"
# ),
# # 权限
# 'DEFAULT_PERMISSION_CLASSES':(
# "app01.utils.auth.SVIPPermission"
#
# ),
# 频率
"DEFAULT_THROTTLE_CLASSES": (
"app01.utils.auth.VisitThrottle",
),
# 频率配置
"DEFAULT_THROTTLE_RATES": {
# 每分钟5次 可以 5/s秒 5/d 等
# visit_rate 对应 scope="visit_rate"
"visit_rate": "5/m",
}
}
局部
class CourseView(APIView):
# # 列表就是放多个认证类的意思
# authentication_classes=[TokenAuth,]
# permission_classes = [SVIPPermission,]
throttle_classes = [VisitThrottle]
def get(self,request):
return HttpResponse("get....")
def post(self,request):
print(request.data)
return HttpResponse("post.....")
排除个别视图类不参加认证
先走认证才能到权限之后是频率
全局每个视图类都需要认证,包括登陆在内.可是登陆成功才后生成token,现在登陆都得携带token,很不合理;需要在全局中排除掉单个视图类不参加全局认证,怎么写呢?
eg:不想让哪个视图类,参加全局认证,只需要在自己的视图类中写一个局部认证,你可以什么都不写
其他不参加权限,频率,全局等同理,写个空的局部函数,或者执行自己的局部认证
记住先走局部-全局-默认
class LoginView(APIView):
authentication_classes = []
def post(self,request):
# ...........
网友评论