美文网首页自然语言处理知识图谱
python-Redis使用封装代码

python-Redis使用封装代码

作者: 晓柒NLP与药物设计 | 来源:发表于2022-07-14 16:57 被阅读0次
import time
import redis
from rediscluster import RedisCluster
import traceback
from utils.cyberark import get_cyberark_password

from utils.logger import create_logger
logger = create_logger('redis')

redis_config = {
    'dev': [
        {'host': '30.23.76.68', 'port': 6391}, 
        {'host': '30.23.76.68', 'port': 6392}, 
        {'host': '30.23.10.31', 'port': 6393}, 
        {'host': '30.23.10.31', 'port': 6394}, 
        {'host': '30.23.10.32', 'port': 6395}, 
        {'host': '30.23.10.32', 'port': 6396}
        ],
    'stg': [
        {'host': '30.79.123.87', 'port': 6491}, 
        {'host': '30.79.123.87', 'port': 6492}, 
        {'host': '30.79.123.88', 'port': 6493}, 
        {'host': '30.79.123.88', 'port': 6494}, 
        {'host': '30.79.85.171', 'port': 6495}, 
        {'host': '30.79.85.171', 'port': 6496}
        ],
    'prd': [
        {'host': '30.64.0.14', 'port': 6463}, 
        {'host': '30.64.0.15', 'port': 6464}, 
        {'host': '30.64.0.13', 'port': 6465}, 
        {'host': '30.64.0.14', 'port': 6466}, 
        {'host': '30.64.0.15', 'port': 6467}
        ]
}



class RedisService():
    """redis服务
    """    
    def __init__(self, env):
        self.env = env
        self.REDIS = self.connect()
        

    def get_passwd(self):
        """调用cyberark接口,获取redis密码(生产环境才需要)

        Returns:
            [string]: REDIS密码
        """        
        try:
            passwd = get_cyberark_password(self.env, 'robot_redis')
            logger.info('>> [REDIS] 成功获取cyberark密码')
            return passwd
        except:
            err = traceback.format_exc()
            logger.error(f'>> [REDIS] 获取cyberark密码失败, error: {err}')
            return None

    def connect(self, verbose=1):
        """连接REDIS

        Args:
            verbose (int): 打印日志. 可选1——打印日志, 0——不打印日志

        Returns:
            REDIS类
        """
        try:
            redis_nodes = redis_config[self.env]
            if self.env == 'prd':
                redis_passwd = self.get_passwd()
                REDIS = RedisCluster(startup_nodes=redis_nodes, decode_responses=True, password=redis_passwd)
            elif self.env == 'stg':
                REDIS = RedisCluster(startup_nodes=redis_nodes, decode_responses=True)
            else:
                REDIS = RedisCluster(startup_nodes=redis_nodes, decode_responses=True, password='ub.gi8AH~1')
            if verbose == 1:
                logger.info(f'>> [REDIS] 连接{self.env}环境REDIS集群成功')

            return REDIS
        except:
            err = traceback.format_exc()
            logger.error(f'>> [REDIS] 连接{self.env}环境REDIS集群失败, error: {err}')
            return None

    def exists_key(self, key, verbose=1):
        """判断key是否存在

        Args:
            key (str): 需判断存在的key
            verbose (int): 打印日志. 可选1——打印日志, 0——不打印日志

        Returns:
            [bool]: 是否存在
        """        
        is_exists = self.REDIS.exists(key)
        if is_exists:
            if verbose == 1:
                logger.info(f'>> [REDIS] 存在key: {key}')
            return True
        else:
            if verbose == 1:
                logger.info(f'>> [REDIS] 不存在key: {key}')
            return False
        

    def get_value(self, key, verbose=2):
        """获取redis对应key的值

        Args:
            key (str): 需查询的key
            verbose (int): 打印日志详情. 可选2——打印成功日志和value, 1——打印成功日志, 0——不打印日志

        Returns:
            str/list/json: key对应的值, 可为字符、列表、字典
        """
        start_time = time.time()
        try:
            value = self.REDIS.get(key)
            if isinstance(value, str):
                try:
                    # set命令只会存string类型,所以get的结果是string类型,需要用eval尝试转成列表或字典类型
                    value = eval(value)
                except:
                    logger.info(f'>> [REDIS] 获取的value无法经过eval函数处理, 耗时: {time.time() - start_time:.2}s')

            if verbose == 2:
                logger.info(f'>> [REDIS] 获取key的value成功, key: {key}, value: {value}, 耗时: {time.time() - start_time:.2}s')
            elif verbose == 1:
                logger.info(f'>> [REDIS] 获取key的value成功, key: {key}, 耗时: {time.time() - start_time:.2}s')
            return value
        except:
            err = traceback.format_exc()
            logger.error(f'>> [REDIS] 获取value异常, key: {key}, error: {err}, 耗时: {time.time() - start_time:.2}s')
            return None
    
    def set_value(self, key, value, verbose=2):
        """设置redis对应key的值

        Args:
            key (str): 需设置的key
            value ([str/list/json]): key对应的值, 可为字符、列表、字典
            verbose (int): 打印日志详情. 可选2——打印成功日志和value, 1——打印成功日志, 0——不打印日志

        Returns:
            [bool]: 是否成功
        """ 
        start_time = time.time()
        try:
            try:
                # set命令只会存string类型,虽然value是列表或字典,但是存进去后会转成string型
                set_success = self.REDIS.set(key, value)
            except redis.exceptions.DataError:
                value = str(value)
                set_success = self.REDIS.set(key, value)

            if set_success:
                if verbose == 2:
                    logger.info(f'>> [REDIS] 设置value成功, key: {key}, value: {value}, 耗时: {time.time() - start_time:.2}s')
                elif verbose == 1:
                    logger.info(f'>> [REDIS] 设置value成功, key: {key}, 耗时: {time.time() - start_time:.2}s')
                return True
            else:
                logger.warning(f'>> [REDIS] 设置value失败, key: {key}, value: {value}, 耗时: {time.time() - start_time:.2}s')
                return False
        except:
            err = traceback.format_exc()
            logger.error(f'>> [REDIS] 设置value异常, key: {key}, value: {value}, error: {err}, 耗时: {time.time() - start_time:.2}s')
            return False

    def del_key(self, key, verbose=1):
        """删除redis的key

        Args:
            key (str): 需删除的key
            verbose (int): 打印日志. 可选1——打印日志, 0——不打印日志

        Returns:
            [bool]: 是否成功
        """ 
        start_time = time.time()
        try:
            del_success = self.REDIS.delete(key)
            if del_success == 1:
                if verbose == 1:
                    logger.info(f'>> [REDIS] 删除key成功, key: {key}, 耗时: {time.time() - start_time:.2}s')
                return True
            else:
                logger.warning(f'>> [REDIS] 删除key失败, key: {key}, 耗时: {time.time() - start_time:.2}s')
                return False
        except:
            err = traceback.format_exc()
            logger.error(f'>> [REDIS] 删除key异常, key: {key}, error: {err}, 耗时: {time.time() - start_time:.2}s')
            return False
    
    def rename_key(self, key1, key2, verbose=1):
        """重命名key

        Args:
            key1 ([string]): 需更名的key
            key2 ([string]): 更名后的key
            verbose (int): 打印日志. 可选1——打印日志, 0——不打印日志

        Returns:
            [bool]: 是否成功
        """        
        start_time = time.time()
        # rename会报错编码问题, 使用先增后删
        # self.REDIS.rename(key1, key2)
        try:
            value = self.get_value(key1, verbose=0)
            set_status = self.set_value(key2, value, verbose=0)
            if set_status:
                self.del_key(key1, verbose=0)
                if verbose:
                    logger.info(f'>> REDIS重命名key成功, 重命名前: {key1}, 重命名后: {key2}, 耗时: {time.time() - start_time:.2}s')
            else:
                logger.info(f'>> REDIS重命名key失败, 无法新增key: {key2}, 耗时: {time.time() - start_time:.2}s')
            return True
        except:
            err = traceback.format_exc()
            logger.error(f'>> REDIS重命名key失败, key: {key1}, error: {err}, 耗时: {time.time() - start_time:.2}s')
            return False    


    # TODO 取消获取key的操作,线上Redis禁止使用Keys正则匹配操作, 当key的数量很多时, keys命令会占用唯一的一个线程的大量处理时间, 导致所有的请求都被拖慢
    # TODO 虽然可以用scan命令获取key, 但有工程师反应使用scan获取key也会被警告, 且使用REDIS时原则上应该知道要用哪个key, 而不是查有哪些key
    # def get_all_key(self, str, verbose=2):
    #     """获取所有的key

    #     Args:
    #         str ([string]): key的名称或部分名称
    #         verbose (int): 打印日志详情. 可选2——打印成功日志和value, 1——打印成功日志, 0——不打印日志

    #     Returns:
    #         [list]: 查询结果
    #     """        
        

    #     start_time = time.time()
    #     try:
    #         keys = self.REDIS.keys(str)

    #         if verbose == 2:
    #             logger.info(f'>> [REDIS] 获取所有相关的key成功, str: {str}, keys: {keys}, 耗时: {time.time() - start_time:.2}s')
    #         elif verbose == 1:
    #             logger.info(f'>> [REDIS] 获取所有相关的key成功, str: {str}, 耗时: {time.time() - start_time:.2}s')
    #         return keys
    #     except:
    #         err = traceback.format_exc()
    #         logger.error(f'>> [REDIS] 获取所有相关的key异常, key: {str}, error: {err}, 耗时: {time.time() - start_time:.2}s')
    #         return None



    def get_value_type(self, key, verbose):
        """获取key的数据类型

        Args:
            key ([string]): 查询数据类型的key
            verbose (int): 打印日志. 可选1——打印日志, 0——不打印日志

        Returns:
            [string]: 数据类型
        """
        start_time = time.time()
        try:
            value_type =self.REDIS.type(key)
            if verbose == 1:
                logger.info(f'>> [REDIS] 获取key存储数据的数据类型成功, key: {key}, type: {value_type}, 耗时: {time.time() - start_time:.2}s')
            return value_type
        except:
            err = traceback.format_exc()
            logger.error(f'>> [REDIS] 获取key存储数据的数据类型异常, key: {str}, error: {err}, 耗时: {time.time() - start_time:.2}s')
            return None

相关文章

网友评论

    本文标题:python-Redis使用封装代码

    本文链接:https://www.haomeiwen.com/subject/cqkfirtx.html