美文网首页
jwt管理器封装

jwt管理器封装

作者: 安森老叔叔 | 来源:发表于2020-03-14 17:53 被阅读0次

from datetime import datetime, timedelta
from flask import current_app, jsonify
import jwt
from jwt.api_jwt import timegm
from flask import request
import logging

logger = logging.getLogger(__name__)

class AuthorizationJwt(object):
    """
    认证
    """

    def generate_tokens(self, user_id, with_refresh_token=True):
        """
        生成token 和refresh_token
        :param user_id: 用户id
        :return: token, refresh_token
        """
        # 颁发JWT
        now = datetime.utcnow()
        expiry = timegm((now + timedelta(seconds=current_app.config['JWT_EXPIRY_HOURS'])).utctimetuple())
        token = jwt.encode(payload={"user_id": user_id, "refresh": False, "exp": expiry}, key="123")
        refresh_token = None
        if with_refresh_token:
            refresh_expiry = timegm((now + timedelta(days=current_app.config['JWT_REFRESH_DAYS'])).utctimetuple())
            refresh_token = jwt.encode({"user_id": user_id, "refresh": True, "exp": refresh_expiry}, key="123")
        return token, refresh_token

    def register_jwt(self, appbuilder):
        """
        注册或刷新token
        """

        token = request.form.get("token")
        refresh_token = request.form.get("refresh_token")
        username = request.form.get("username")
        password = request.form.get("password")

        logger.info(username)
        if refresh_token:
            payload = self.verify_jwt(refresh_token)
            if payload:
                user_id = payload['user_id']
                token = self.refresh(user_id)
                msg, status = "register jwt successfully", "success"
            else:
                msg, status = "wrong token", "fail"
        else:
            # 使用flask_appbuilder.security.Manager进行鉴权
            user = appbuilder.sm.auth_user_db(username, password)
            if not user:
                msg, status = "wrong user or password", "fail"
            else:
                # 生成token
                token, refresh_token = self.generate_tokens(user.id)
                msg, status = "register jwt successfully", "success"

        return jsonify({"msg": msg, "status": status, "token": token, "refresh_token": refresh_token})

    def verify_jwt(self, token):
        '''
        验证token时候有效
        '''
        try:
            pay_load = jwt.decode(token, algorithms=["HS256"], key="123", options={"verify_exp": True})
        except:
            return
        return pay_load

    def refresh(self, user_id):
        '''
        根据refresh_token进行token的刷新
        :return: token
        '''
        token, _ = self.generate_tokens(user_id, with_refresh_token=False)
        return token

相关文章

网友评论

      本文标题:jwt管理器封装

      本文链接:https://www.haomeiwen.com/subject/ucbgshtx.html