美文网首页
Flask token配置与 用户分组(三)

Flask token配置与 用户分组(三)

作者: andpy | 来源:发表于2018-06-21 15:30 被阅读455次

    Token的配置

    生成Token: TimedJSONWebSignatureSerializer进行序列化

    from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
    
    from flask import current_app, jsonify
    from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
    
    from app.libs.enums import ClientTypeEnum
    from app.libs.redprint import RedPrint
    from app.models.user import User
    from app.validators.forms import ClientForm
    
    api = RedPrint('token')
    
    
    @api.route('', methods=['POST'])
    def get_token():
        form = ClientForm().validate_for_api()
        promise = {
            ClientTypeEnum.USER_EMAIL: User.verify
        }
        identify = promise[ClientTypeEnum(form.type.data)](
            form.account.data,
            form.secret.data
        )
        # 生成令牌
        expiration = current_app.config['TOKEN_EXPIRATION']
        token = generate_auth_token(identify['uid'],
                                    form.type.data,
                                    identify['scope'],
                                    expiration)
    
        # 注意需要进行 ascii 编码
        t = {
            'token': token.decode('ascii')
        }
        return jsonify(t, 201)
    
    
    def generate_auth_token(uid, ac_type, scope=None, expiration=7200):
        """
        生成token,将用户的id,作用域,用户类型,过期时间写入token
        :param uid: 用户id
        :param ac_type: 用户类型
        :param scope: 权限域
        :param expiration: 过期时间 秒
        :return:
        """
        s = Serializer(current_app.config['SECRET_KEY'],
                       expires_in=expiration,
                       )
        return s.dumps({'uid': uid,
                        'type': ac_type.value,
                        'scope': scope
                        })
    

    Token的验证,在用户调取接口等相关的操作的时候,对token进行验证的操作,传递token是在http的请求头中 Authorization 字段中

    #使用示例,注意必须放到放到方法名上
    @api.route('', methods=['GET'])
    @auth.login_required
    def get_user():
        uid = g.user.uid
        user = User.query.filter_by(id=uid).first_or_404()
        return jsonify(user)
    
    
    #验证相关代码
    from collections import namedtuple
    from flask import current_app, g, request
    from flask_httpauth import HTTPBasicAuth
    from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
    from app.libs.error_code import AuthFailed, Forbidden
    from app.libs.scope import is_in_scope
    
    auth = HTTPBasicAuth()
    #使用nametuple生成类
    User = namedtuple('User', ['uid', 'ac_type', 'scope'])
    
    @auth.verify_password
    def verify_password(token, password):
        # http的协议规范
        # key = Authorization
        # value = basic base64(rjl:111111)
        user_info = verify_auth_token(token)
        if not user_info:
            return False
        else:
            g.user = user_info
            return True
    
    # 验证不通过,直接返回相关的信息,从token中获取用户相关的信息
    def verify_auth_token(token):
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except BadSignature:
            raise AuthFailed(msg='token is invalid', error_code=1002)
        except SignatureExpired:
            raise AuthFailed(msg='token is expired', error_code=1003)
        uid = data['uid']
        ac_type = data['type']
        scope = data['scope']
        # request 视图函数
        allow = is_in_scope(scope, request.endpoint)
        if not allow:
            raise Forbidden()
        return User(uid, ac_type, scope)
    

    用户对接口操作权限的判断

    在用户登录操作的时候把用户的等级标志写入token中,根据token的信息,与当前用户的等级进行判断!

    将相应用户可以访问的视图函数,存储起来,当用户请求的时候,查看是> 否在该作用域中。

    定义作于域:

    class Scope:
        # 视图函数级别的区分
        allow_api = []
    
        # 模块级别的定义区分
        allow_module = []
    
        # 排除
        forbidden_api = []
    
        def __add__(self, other):
            self.allow_api = self.allow_api + other.allow_api
            # set去重,再转为list
            self.allow_api = list(set(self.allow_api))
    
            self.allow_module = self.allow_module + other.allow_module
            self.allow_module = list(set(self.allow_module))
    
            return self
    
    #定义管理权限组
    class AdminScope(Scope):
        allow_module = [
            'v1.user'
        ]
        # def __init__(self):
        #     self + UserScope()
    
    #普通用户组
    class UserScope(Scope):
        allow_api = [
            'v1.user+get_user'
        ]
    
        def __init__(self):
            pass
    
    #超级管理员组
    class SuperScope(Scope):
        """
        超级管理员,权限相加
        """
        allow_api = [
    
        ]
        allow_module = [
            'v1.user'
        ]
        def __init__(self):
            self + AdminScope() + UserScope()
    
    
    #验证权限
    def is_in_scope(scope, endpoint):
        """
        根据视图函数名判断,该视图函数是否在 权限组中
        :param scope:
        :param endpoint:
        :return:
        """
        # 反射 ,注意endpoint的路径是带有 blueprint的路径的
        scope = globals()[scope]()
        # 查看定义 redprint的 位置
        splits = endpoint.split('+')
        red_name = splits[0]
        if endpoint in scope.forbidden_api:
            return False
        if endpoint in scope.allow_api:
            return True
        if red_name in scope.allow_module:
            return True
        else:
            return False
    

    与token的数据进行校验

    . . .
    uid = data['uid']
    ac_type = data['type']
    scope = data['scope']
    # request 视图函数
    allow = is_in_scope(scope, request.endpoint)
    if not allow:
        raise Forbidden()
    . . .
    

    tips:模块级别的校验,需要更改 redprint的 endpoint

    class RedPrint:
    def __init__(self, name):
        self.name = name
        self.mound = []
    
    def route(self, rule, **options):
        def decorator(f):
            self.mound.append((f, rule, options))
            return f
    
        return decorator
    
    def register(self, bp, url_prefix=None):
        """
        将 redprint注册到 blueprint,实际调用 blueprint代码
        """
        if url_prefix is None:
            url_prefix = '/' + self.name
        for f, rule, options in self.mound:
            # 自定义 endpoint
            endpoint = self.name + '+' + options.pop("endpoint", f.__name__)
            bp.add_url_rule(url_prefix + rule, endpoint, f, **options)
    

    相关文章

      网友评论

          本文标题:Flask token配置与 用户分组(三)

          本文链接:https://www.haomeiwen.com/subject/jgzqyftx.html