美文网首页数据库
Python操作redis的封装

Python操作redis的封装

作者: 陆_志东 | 来源:发表于2018-07-10 13:12 被阅读23次

下面是对 redis 操作的封装。如果在多线程环境下使用并需要保证线程安全,请取消下面代码中线程锁(mutex)相关的注释。

import redis
import threading
import logging


class RedisClient(object):
    """
    Convenience wrapper around a ``redis.Redis`` client and its connection pool.

    Works on py2.7 and py3.4.

    Thread safety: every Redis call goes through ``_call``, which holds
    ``self.mutex`` for the duration of the call when that attribute exists.
    Uncomment the ``mutex`` line below to switch this locking on.
    """

    # mutex = threading.Lock()  # a thread lock may cause problems under gevent
    config = None
    connection_pool = None
    connection_client = None

    def __init__(self, config):
        """
        Build the connection pool and the client from a settings dict.

        :param config: {"host": "",                 # default "localhost"
                        "port": 0,                  # default 6379
                        "index": 0,                 # database number, default 0
                        "auth": "",                 # password, default None
                        "encoding": "",             # empty/missing -> "utf-8"
                        "decode_responses": False,
                        "max_connections": 1,       # values <= 0 become 1
                        "target_max_memory": 1024   # MB, see exceed_memory_limits
                        }
        """
        self.config = config
        max_conn = config.get("max_connections", 1)
        if max_conn <= 0:
            max_conn = 1
        self.connection_pool = redis.ConnectionPool(
            host=config.get("host", "localhost"),
            port=config.get("port", 6379),
            db=config.get("index", 0),
            password=config.get("auth"),
            # An empty encoding name is not a valid codec; fall back to UTF-8.
            encoding=config.get("encoding") or "utf-8",
            max_connections=max_conn,
            decode_responses=config.get("decode_responses", False),
        )
        self.connection_client = redis.Redis(connection_pool=self.connection_pool)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)``, holding ``self.mutex`` if it is defined."""
        lock = getattr(self, "mutex", None)
        if lock is None:
            return func(*args, **kwargs)
        # ``with`` releases the lock even when the Redis call raises.
        with lock:
            return func(*args, **kwargs)

    def _write_with_expiry(self, command, key, args, expired_in_seconds):
        """Pipeline ``command(key, *args)`` plus an optional EXPIRE on ``key``."""
        pipe = self.connection_client.pipeline()
        getattr(pipe, command)(key, *args)
        if expired_in_seconds > 0:
            pipe.expire(key, expired_in_seconds)
        pipe.execute()

    def _pop_many(self, command, key, length):
        """Pop up to ``length`` items from list ``key`` using one pipeline."""
        count = min(length, self.connection_client.llen(key))
        if count <= 0:
            return []
        pipe = self.connection_client.pipeline()
        pop = getattr(pipe, command)
        for _ in range(count):
            pop(key)
        # The list may have shrunk since LLEN; such pops yield None, not data.
        return [item for item in pipe.execute() if item is not None]

    def _pop_one(self, command, key):
        """Pop a single item from list ``key``; return it wrapped in a list."""
        item = getattr(self.connection_client, command)(key)
        # Compare against None so that an empty-string payload is not lost.
        return [] if item is None else [item]

    # ------------------------------------------------------------------
    # lists
    # ------------------------------------------------------------------
    def rpush(self, key, json_text, expired_in_seconds=0):
        """Append ``json_text`` to list ``key``; set a TTL when ``expired_in_seconds`` > 0."""
        self._call(self._write_with_expiry, "rpush", key, (json_text,), expired_in_seconds)

    def lpush(self, key, json_text, expired_in_seconds=0):
        """Prepend ``json_text`` to list ``key``; set a TTL when ``expired_in_seconds`` > 0."""
        self._call(self._write_with_expiry, "lpush", key, (json_text,), expired_in_seconds)

    def lpop_pipline(self, key, length):
        """
        Pop up to ``length`` items from the head of list ``key``.

        :return: list of popped items (possibly empty, never containing None)
        """
        return self._call(self._pop_many, "lpop", key, length)

    def lpop(self, key):
        """Pop one item from the head of list ``key``; return ``[item]`` or ``[]``."""
        return self._call(self._pop_one, "lpop", key)

    def rpop_pipline(self, key, length):
        """
        Pop up to ``length`` items from the tail of list ``key``.

        :return: list of popped items (possibly empty, never containing None)
        """
        return self._call(self._pop_many, "rpop", key, length)

    def rpop(self, key):
        """Pop one item from the tail of list ``key``; return ``[item]`` or ``[]``."""
        return self._call(self._pop_one, "rpop", key)

    # Correctly spelled aliases; the misspelled names stay for existing callers.
    lpop_pipeline = lpop_pipline
    rpop_pipeline = rpop_pipline

    def llen(self, key):
        """Return the length of list ``key``."""
        return self._call(self.connection_client.llen, key)

    # ------------------------------------------------------------------
    # hashes
    # ------------------------------------------------------------------
    def hincrby(self, hash_key, field, amount=1):
        """Increment ``field`` of hash ``hash_key`` by ``amount``; return the client's result."""
        return self._call(self.connection_client.hincrby, hash_key, field, amount)

    def hdel(self, key, field):
        """Delete ``field`` from hash ``key``; return the client's result."""
        return self._call(self.connection_client.hdel, key, field)

    def hset(self, key, field, value, expired_in_seconds=0):
        """Set ``field`` of hash ``key``; set a TTL on ``key`` when ``expired_in_seconds`` > 0."""
        self._call(self._write_with_expiry, "hset", key, (field, value), expired_in_seconds)

    def hmget(self, hash_key, fields_list):
        """Return the values of ``fields_list`` from hash ``hash_key``."""
        return self._call(self.connection_client.hmget, hash_key, fields_list)

    # ------------------------------------------------------------------
    # server info / memory guard
    # ------------------------------------------------------------------
    def info(self, section=None):
        """Return the result of the client's ``info(section)`` call."""
        return self._call(self.connection_client.info, section)

    def exceed_memory_limits(self):
        """
        Tell whether used memory has reached ``config["target_max_memory"]`` (MB).

        :return: True when used memory >= the target; False otherwise, including
                 when no target is configured or ``used_memory`` is unavailable.
        """
        if "target_max_memory" not in self.config:
            return False
        distance = self.__max_memory_distance(
            self.info("memory"), self.config["target_max_memory"])
        # Explicit None check: a distance of exactly 0 means the limit is reached.
        return distance is not None and distance <= 0

    def __max_memory_distance(self, redis_info_dict, target_max):
        """
        Return ``target_max`` (MB) minus the used memory (MB), or None.

        ``redis_info_dict`` is the memory section of INFO, e.g.:
            used_memory:472978192
            used_memory_human:451.07M
            used_memory_rss:510640128
            used_memory_peak:493548568
            used_memory_peak_human:470.68M
            used_memory_lua:35840
            mem_fragmentation_ratio:1.08
            mem_allocator:jemalloc - 3.6.0
        """
        if "used_memory" not in redis_info_dict:
            logging.warning(u"used_memory is not found!")
            return None
        # Float divisor keeps the result identical on py2 and py3.
        used_mb = int(redis_info_dict["used_memory"]) / (1024.0 * 1024.0)
        return target_max - used_mb

    # ------------------------------------------------------------------
    # sets
    # ------------------------------------------------------------------
    def sadd(self, key, value):
        """Add ``value`` to set ``key``; return the client's result."""
        return self._call(self.connection_client.sadd, key, value)

    def sismember(self, key, value):
        """Return the client's answer on whether ``value`` is in set ``key``."""
        return self._call(self.connection_client.sismember, key, value)

    # ------------------------------------------------------------------
    # keys
    # ------------------------------------------------------------------
    def exists(self, key):
        """Return the client's answer on whether ``key`` exists."""
        return self._call(self.connection_client.exists, key)

    def keys(self, pattern):
        """Return the keys matching ``pattern``."""
        return self._call(self.connection_client.keys, pattern=pattern)

    def delete(self, key):
        """Delete ``key``; return the client's result."""
        return self._call(self.connection_client.delete, key)

    def delele(self, key):
        """Misspelled legacy name of ``delete``; kept for existing callers."""
        self.delete(key)

    def scan(self, cursor, match=None, count=50):
        """
        Run one SCAN step.

        :param cursor: cursor returned by the previous step
        :param match: optional key pattern
        :param count: hint for the number of keys examined per step
        :return:
         (new_cursor,
            [key1, key2, key3 ...])
        """
        return self._call(
            self.connection_client.scan, cursor=cursor, match=match, count=count)

    def set(self, key, value, ex=None):
        """Set ``key`` to ``value``, with an optional expiry ``ex`` in seconds."""
        return self._call(self.connection_client.set, key, value, ex=ex)

    def close(self):
        """Disconnect the connection pool, if one was created."""
        if self.connection_pool is not None:
            self.connection_pool.disconnect()

# Sample settings for the module-level client ("host" and "auth" are blank here).
config = dict(
    host="",
    port=6379,
    auth="",
    index=11,
    encoding="utf-8",
    # Lets Chinese text come back already decoded as Python unicode.
    decode_responses=True,
    target_max_memory=3896,
    max_connections=1,
)

# Client instance created at import time from the settings above.
redis_client = RedisClient(config)

相关文章

  • Python操作redis的封装

    下面是redis的封装,使用多线程的话,保证线程安全就把下面的线程锁打开

  • Python操作redis

    1.python操作redis 1.安装 输入命令: 2.使用 封装 2.实例 3.Redis缓存Mysql模拟用...

  • php操作redis大全

    php连接redis测试 php操作redis函数封装 php操作redis大全,基本上php操作redis常用的...

  • Redis 简要使用

    Redis 设置认证 Python操作 Redis Node.js 操作Redis

  • Python操作Redis

    Python操作Redis 连接 Redis **import **redis r = redis.StrictR...

  • Python操作redis

    Python操作redis python连接方式:点击 下面介绍详细使用 1、String 操作 redis中的S...

  • python操作redis——walrus

    python对redis的操作,网上最多的是直接用python的redis库,其实与python操作其他数据库类似...

  • python操作redis

    #### python操作redis ##### 1、安装 ``` $ pip install redis 或者 ...

  • python操作redis

    python操作redis 安装包 查询redis库 删除redis key

  • go-redis源码分析(一):redis协议

    redis.v5是一款基于golang的redis操作库,封装了对redis的各种操作 源码地址是https://...

网友评论

    本文标题:Python操作redis的封装

    本文链接:https://www.haomeiwen.com/subject/mpsxpftx.html