美文网首页redis
OpenResty上使用的redis工具类

OpenResty上使用的redis工具类

作者: 肥兔子爱豆畜子 | 来源:发表于2021-10-17 16:48 被阅读0次

    最近在看360团队的openresty最佳实践,一边学lua这门语言,一边熟悉openresty技术栈。里边提供了一个封装的redis客户端工具类,这里稍微改了一下。

    -- file name: resty/redis_iresty.lua
    local redis_c = require "resty.redis"
    
    --当前版本是否支持table.new,做下兼容处理
    local ok, new_tab = pcall(require, "table.new")
    if not ok or type(new_tab) ~= "function" then
        new_tab = function (narr, nrec) return {} end
    end
    
    --[[
    table.new(narray,nhash)
    预分配new一个table,指定array长度或者hash长度。
    一般一个table推荐同时只用一种类型,所以一般是table.new(0,n)或table.new(n,0)
    ]]
    local _M = new_tab(0, 155) -- 新建一个155长度的hash table
    _M._VERSION = '0.01'
    
    --redis命令
    local commands = {
        "append",            "auth",              "bgrewriteaof",
        "bgsave",            "bitcount",          "bitop",
        "blpop",             "brpop",
        "brpoplpush",        "client",            "config",
        "dbsize",
        "debug",             "decr",              "decrby",
        "del",               "discard",           "dump",
        "echo",
        "eval",              "exec",              "exists",
        "expire",            "expireat",          "flushall",
        "flushdb",           "get",               "getbit",
        "getrange",          "getset",            "hdel",
        "hexists",           "hget",              "hgetall",
        "hincrby",           "hincrbyfloat",      "hkeys",
        "hlen",
        "hmget",              "hmset",      "hscan",
        "hset",
        "hsetnx",            "hvals",             "incr",
        "incrby",            "incrbyfloat",       "info",
        "keys",
        "lastsave",          "lindex",            "linsert",
        "llen",              "lpop",              "lpush",
        "lpushx",            "lrange",            "lrem",
        "lset",              "ltrim",             "mget",
        "migrate",
        "monitor",           "move",              "mset",
        "msetnx",            "multi",             "object",
        "persist",           "pexpire",           "pexpireat",
        "ping",              "psetex",            "psubscribe",
        "pttl",
        "publish",      --[[ "punsubscribe", ]]   "pubsub",
        "quit",
        "randomkey",         "rename",            "renamenx",
        "restore",
        "rpop",              "rpoplpush",         "rpush",
        "rpushx",            "sadd",              "save",
        "scan",              "scard",             "script",
        "sdiff",             "sdiffstore",
        "select",            "set",               "setbit",
        "setex",             "setnx",             "setrange",
        "shutdown",          "sinter",            "sinterstore",
        "sismember",         "slaveof",           "slowlog",
        "smembers",          "smove",             "sort",
        "spop",              "srandmember",       "srem",
        "sscan",
        "strlen",       --[[ "subscribe",  ]]     "sunion",
        "sunionstore",       "sync",              "time",
        "ttl",
        "type",         --[[ "unsubscribe", ]]    "unwatch",
        "watch",             "zadd",              "zcard",
        "zcount",            "zincrby",           "zinterstore",
        "zrange",            "zrangebyscore",     "zrank",
        "zrem",              "zremrangebyrank",   "zremrangebyscore",
        "zrevrange",         "zrevrangebyscore",  "zrevrank",
        "zscan",
        "zscore",            "zunionstore",       "evalsha"
    }
    
    --元表,用在返回的模块对象上
    local mt = { __index = _M }
    
    
    local function is_redis_null( res )
        if type(res) == "table" then
            for k,v in pairs(res) do
                if v ~= ngx.null then
                    return false
                end
            end
            return true
        elseif res == ngx.null then
            return true
        elseif res == nil then
            return true
        end
    
        return false
    end
    
    
    -- change connect address as you need
    function _M.connect_mod( self, redis )
        redis:set_timeout(self.timeout)
        --return redis:connect("127.0.0.1", 6379)
        --return redis:connect(self.ip, self.port)
        local ok ,err = redis:connect(self.ip, self.port)
        if not ok then
            return {}, err
        end
    
        local count
        count, err = redis:get_reused_times()
        if 0 == count then
            --local ok, err = redis:auth("123456")
            ok, err = redis:auth(self.password)
            if not ok then
                --ngx.say("failed to auth: ", err)
                return {},  err
            end
        elseif err then
            --ngx.say("failed to get reused times: ", err)
            return {}, err
        end
    
        return ok, err
    end
    
    
    function _M.set_keepalive_mod( self, redis )
        -- put it into the connection pool of size 100, with 60 seconds max idle time
        --return redis:set_keepalive(60000, 100)
        return redis:set_keepalive(self.max_idle_ms, self.pool_size)
    end
    
    
    function _M.init_pipeline( self )
        self._reqs = {}
    end
    
    
    function _M.commit_pipeline( self )
        local reqs = self._reqs
    
        if nil == reqs or 0 == #reqs then
            return {}, "no pipeline"
        else
            self._reqs = nil
        end
    
        local redis, err = redis_c:new()
        if not redis then
            return nil, err
        end
    
        local ok, err = self:connect_mod(redis)
        if not ok then
            return {}, err
        end
    
        redis:init_pipeline()
        for _, vals in ipairs(reqs) do
            local fun = redis[vals[1]]
            table.remove(vals , 1)
    
            fun(redis, unpack(vals))
        end
    
        local results, err = redis:commit_pipeline()
        if not results or err then
            return {}, err
        end
    
        if is_redis_null(results) then
            results = {}
            ngx.log(ngx.WARN, "is null")
        end
        -- table.remove (results , 1)
    
        self.set_keepalive_mod(redis)
    
        for i,value in ipairs(results) do
            if is_redis_null(value) then
                results[i] = nil
            end
        end
    
        return results, err
    end
    
    
    function _M.subscribe( self, channel )
        local redis, err = redis_c:new()
        if not redis then
            return nil, err
        end
    
        local ok, err = self:connect_mod(redis)
        if not ok or err then
            return nil, err
        end
    
        local res, err = redis:subscribe(channel)
        if not res then
            return nil, err
        end
    
        res, err = redis:read_reply()
        if not res then
            return nil, err
        end
    
        redis:unsubscribe(channel)
        self.set_keepalive_mod(redis)
    
        return res, err
    end
    
    
    local function do_command(self, cmd, ... )
        if self._reqs then
            table.insert(self._reqs, {cmd, ...})
            return
        end
    
        local redis, err = redis_c:new()
        if not redis then
            return nil, err
        end
    
        local ok, err = self:connect_mod(redis)
        if not ok or err then
            return nil, err
        end
    
        local fun = redis[cmd]
        local result, err = fun(redis, ...)
        if not result or err then
            -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
            return nil, err
        end
    
        if is_redis_null(result) then
            result = nil
        end
    
        self:set_keepalive_mod(redis)  -- self.set_keepalive_mod(self, redis)
    
        return result, err
    end
    
    
    for i = 1, #commands do
        local cmd = commands[i]
        _M[cmd] =
                function (self, ...)
                    return do_command(self, cmd, ...)
                end
    end
    
    
    function _M.new(self, opts)
        opts = opts or {}
        --local timeout = (opts.timeout and opts.timeout * 1000) or 1000
        local timeout = opts.timeout or 1000
        local db_index= opts.db_index or 0
    
        --补齐其他可配置项
        local ip = opts.ip or "127.0.0.1"   --默认去连本地redis,目前不支持哨兵和集群模式
        local port = opts.port or 6379
        local max_idle_ms = opts.max_idle_ms or 60000
        local pool_size = opts.pool_size or 64
        local password = opts.password or "password"
        
        --ngx.log(ngx.INFO, "redis timeout: ", timeout, " , db_index: ", db_index)
        --ngx.log(ngx.INFO, "redis ip: ", ip, " , port: ", port)
        --ngx.log(ngx.INFO, "pool_size: ", pool_size, " , max_idle_ms: ", max_idle_ms)
    
        return setmetatable({
                timeout = timeout,
                db_index = db_index,
                ip = ip,
                port = port,
                max_idle_ms = max_idle_ms,
                pool_size = pool_size,
                password = password,
                _reqs = nil }, mt)
    end
    
    
    return _M
    

    代码使用:

    local redis_client = require "wangan.common.redis_iresty"
    local red = redis_client:new({
        ip = "127.0.0.1",
        port = 6379,
        password = "123456",
        timeout = 2000,
        db_index = 0,
        max_idle_ms = 60000,
        pool_size = 32
    })
    
    local ok, err = red:get("my-account")
    
    if not ok then
        ngx.say("failed to get my-account: ", err)
        return
    end
    
    ngx.say("my-account: ", ok)
    

    然后如果不使用封装的工具类,代码是这样的:

    --[[
        https://github.com/openresty/lua-resty-redis
        openresty连接redis的例子
        官方还不支持redis哨兵和集群模式,第三方库倒是有一个https://github.com/steve0511/resty-redis-cluster
    --]]
    
    local redis = require "resty.redis"
    local red = redis:new()
    --red:set_timeouts(connect_timeout, send_timeout, read_timeout)
    red:set_timeouts(1000, 1000, 1000)
    
    --red:connect是新建连接、还有可能是去内建连接池里取一个连接
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.say("failed to connect: ", err)
        return
    end
    
    
    --每个连接只需要auth一次,这里get_reused_times判定是否是新连接
    
    local count
    count, err = red:get_reused_times()
    if 0 == count then
        ok, err = red:auth("123456")
        if not ok then
            ngx.say("failed to auth: ", err)
            return
        end
    elseif err then
        ngx.say("failed to get reused times: ", err)
        return
    end
    
    
    local res, err = red:get("my-account")
    
    if not res then
        ngx.say("failed to get my-account: ", err)
        return
    end
    
    if res == ngx.null then
        ngx.say("my-account not found.")
        return
    end
    
    ngx.say("my-account: ", res)
    
    --连接正常使用之后,将连接放入连接池
    --允许空闲10s,最大连接数100
    local ok , err = red:set_keepalive(10000, 100)
    if not ok then
        ngx.say("failed to set_keepalive: ", err)
        return
    end
    
    --[[
        关于将连接放入连接池,一定要确保连接使用过程中未发生异常、即顺利的使用完以后在放入。
        并且要注意连接的状态,因为放入连接池的连接将会带着这个状态,会被其他的业务模块使用到,可能会引发预期之外的问题。
        例如:
        red:connect(ip, 6379)
        red:select(1)
        red:set_keepalive(10000, 100)
        高并发下上面代码将连接放到池里后,后面其他模块如果拿到这个连接还是会取db1操作的,如果默认是想去db0操作那么就会引发bug
        所以正确的代码应该是在set_keepalvie之前,复位默认,red:select(0)
    --]]
    
    

    不使用封装工具类也没啥问题,但是会有很多重复冗余的代码。

    关于哨兵和集群模式

    参看OpenResty连接redis的例子 https://github.com/openresty/lua-resty-redis
    目前官方还不支持redis哨兵和集群模式,第三方库和实现思路可以参考:
    https://github.com/steve0511/resty-redis-cluster

    https://www.sohu.com/a/300661862_355142

    相关文章

      网友评论

        本文标题:OpenResty上使用的redis工具类

        本文链接:https://www.haomeiwen.com/subject/ufelpltx.html