美文网首页程序员Redis应用与原理
Redis入坟(二)高级特性,发布订阅、事务、Lua脚本

Redis入坟(二)高级特性,发布订阅、事务、Lua脚本

作者: 源码之路 | 来源:发表于2020-08-05 09:26 被阅读0次

    目标
    1、学习 Redis 的一些高级特性,包括发布订阅、事务、Lua 脚本

    1、发布订阅模式

    1.1列表的局限

    前面我们说通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。

    为了减少通信的消耗,可以 sleep()一段时间再消费,但是会有两个问题:

    1. 如果生产者生产消息的速度远大于消费者消费消息的速度,List 会占用大量的内存。
    2. 消息的实时性降低
      list 还提供了一个阻塞的命令:blpop,没有任何元素可以弹出的时候,连接会被阻塞。blpop queue 5,阻塞5秒。
      基于 list 实现的消息队列,不支持一对多的消息分发。

    1.2发布订阅模式

    除了通过 list 实现消息队列之外,Redis 还提供了一组命令实现发布/订阅模式。

    这种方式,发送者和接收者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。

    1.2.1 订阅频道

    可以订阅一个或者多个频道。消息的发布者(生产者)可以给指定的频道发布消息。只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。

    需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。

    下面我们来看一下发布订阅命令的使用方法。
    订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了 3 个频道。

    subscribe channel-1 channel-2 channel-3
    

    发布者可以向指定频道发布消息(并不支持一次向多个频道发送消息):

    publish channel-1 2673
    

    取消订阅(不能在订阅状态下使用):

    unsubscribe channel-1
    

    1.2.2 按规则(Pattern) 订阅频道

    支持?和占位符。?代表一个字符,代表 0 个或者多个字符。
    消费端 1,关注运动信息:

    psubscribe *sport
    

    消费端 2,关注所有新闻:

    psubscribe news*
    

    消费端 3,关注天气新闻:

    psubscribe news-weather
    

    生产者,发布 3 条信息

    publish news-sport yaoming
    publish news-music jaychou
    publish news-weather rain
    

    java 代码

    package pubsub;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    public class PublishSubscribe {
    
        public static void main(String[] args) {
            Jedis jedis = new Jedis("192.168.0.224", 6379);
            jedis.subscribe(new Subscriber(),"channel");
    
        }
    }
    class Subscriber extends JedisPubSub{
    
        @Override
        public void onMessage(String channel, String message) {
            System.out.println("接收到的消息:"+message);
        }
    
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
        }
    
        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
        }
    
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
            System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
        }
    
        @Override
        public void unsubscribe(String... channels) {
            super.unsubscribe(channels);
        }
    }
    
    
    package pubsub;
    
    import redis.clients.jedis.Jedis;
    
    public class PublishTest {
        public static void main(String[] args) {
            Jedis jedis = new Jedis("192.168.0.224", 6379);
            jedis.publish("channel","你好呀");
            jedis.close();
    
        }
    }
    

    2 、Redis 事务

    2.1 为什么要用事务

    我们知道 Redis 的单个命令是原子性的(比如 get set mget mset),如果涉及到多个命令的时候,需要把多个命令作为一个不可分割的处理序列,就需要用到事务。

    例如我们之前说的用 setnx 实现分布式锁,我们先 set,然后设置对 key 设置 expire,防止 del 发生异常的时候锁不会被释放,业务处理完了以后再 del,这三个动作我们就希望它们作为一组命令执行。

    Redis 的事务有两个特点:

    1. 按进入队列的顺序执行。
    2. 不会受到其他客户端的请求的影响。
      Redis 的事务涉及到四个命令:multi(开启事务),exec(执行事务),discard(取消事务),watch(监视)

    2.2 事务的用法

    案例场景:tom 和 mic 各有 1000 元,tom 需要向 mic 转账 100 元。
    tom 的账户余额减少 100 元,mic 的账户余额增加 100 元。

    127.0.0.1:6379> set tom 1000
    OK
    127.0.0.1:6379> set mic 1000
    OK
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> decrby tom 100
    QUEUED
    127.0.0.1:6379> incrby mic 100
    QUEUED
    127.0.0.1:6379> exec
    1) (integer) 900
    2) (integer) 1100
    127.0.0.1:6379> get tom
    "900"
    127.0.0.1:6379> get mic
    "1100"
    

    通过 multi 的命令开启事务。事务不能嵌套,多个 multi 命令效果一样。
    multi 执行后,客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 exec 命令被调用时, 所有队列中的命令才会被执行。

    通过 exec 的命令执行事务。如果没有执行 exec,所有的命令都不会被执行。如果中途不想执行事务了,怎么办?可以调用 discard 可以清空事务队列,放弃执行。

    multi
    set k1 1
    set k2 2
    set k3 3
    discard
    

    2.3 watch 命令

    在 Redis 中还提供了一个 watch 命令。
    它可以为 Redis 事务提供 CAS 乐观锁行为(Check and Set / Compare and Swap),也就是多个线程更新变量的时候,会跟原值做比较,只有它没有被其他线程修改的情况下,才更新成新的值。

    我们可以用 watch 监视一个或者多个 key,如果开启事务之后,至少有一个被监视key 键在 exec 执行之前被修改了, 那么整个事务都会被取消(key 提前过期除外)。可以用 unwatch 取消。

    2.4 事务可能遇到的问题

    我们把事务执行遇到的问题分成两种,一种是在执行 exec 之前发生错误,一种是在执行 exec 之后发生错误。

    2.4.1 在执行 exec 之前发生错误

    比如:入队的命令存在语法错误,包括参数数量,参数名等等(编译器错误)。

    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set gupao 666
    QUEUED
    127.0.0.1:6379> hset qingshan 2673
    (error) ERR wrong number of arguments for 'hset' command
    127.0.0.1:6379> exec
    (error) EXECABORT Transaction discarded because of previous errors.
    

    在这种情况下事务会被拒绝执行,也就是队列中所有的命令都不会得到执行。

    2.4.2 在执行 exec 之后发生错误

    比如,类型错误,比如对 String 使用了 Hash 的命令,这是一种运行时错误。

    127.0.0.1:6379> flushall
    OK
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set k1 1
    QUEUED
    127.0.0.1:6379> hset k1 a b
    QUEUED
    127.0.0.1:6379> exec
    1) OK
    2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
    127.0.0.1:6379> get k1
    "1"
    

    最后我们发现 set k1 1 的命令是成功的,也就是在这种发生了运行时异常的情况下,只有错误的命令没有被执行,但是其他命令没有受到影响。

    这个显然不符合我们对原子性的定义,也就是我们没办法用 Redis 的这种事务机制来实现原子性,保证数据的一致。

    思考(作业):
    为什么在一个事务中存在错误,Redis 不回滚?

    3 Lua 脚本

    Lua是一种轻量级脚本语言,它是用 C 语言编写的,跟数据的存储过程有点类似。 使用 Lua 脚本来执行 Redis 命令的好处:

    1. 一次发送多个命令,减少网络开销。
    2. Redis 会将整个脚本作为一个整体执行,不会被其他请求打断,保持原子性。
    3. 对于复杂的组合命令,我们可以放在文件中,可以实现程序之间的命令集复用。

    3.1 在 Redis 中调用 Lua 脚本

    使用 eval 方法,语法格式:

    redis> eval lua-script key-num [key1 key2 key3 ....] [value1 value2 value3 ....]
    

     eval 代表执行 Lua 语言的命令。
     lua-script 代表 Lua 语言脚本内容。
     key-num 表示参数中有多少个 key, 需要注意的是 Redis 中 key 是从 1 开始的, 如果没有 key 的参数, 那么写 0。
     [key1 key2 key3…]是 key 作为参数传递给 Lua 语言, 也可以不填, 但是需要和 key-num 的个数对应起来。
     [value1 value2 value3 ….]这些参数传递给 Lua 语言, 它们是可填可不填的。

    示例,返回一个字符串,0 个参数:

    redis> eval "return 'Hello World'" 0
    

    3.2 在 Lua 脚本中调用 Redis 命令

    使用 redis.call(command, key [param1, param2…])进行操作。语法格式:

    redis> eval "redis.call('set',KEYS[1],ARGV[1])" 1 lua-key lua-value
    

     command 是命令, 包括 set、 get、 del 等。
     key 是被操作的键。
     param1,param2…代表给 key 的参数。

    注意跟 Java 不一样,定义只有形参,调用只有实参。
    Lua 是在调用时用 key 表示形参,argv 表示参数值(实参)。

    3.2.1 设置键值对

    在 Redis 中调用 Lua 脚本执行 Redis 命令
    以上命令等价于 set gupao 2673
    在 redis-cli 中直接写 Lua 脚本不够方便,也不能实现编辑和复用,通常我们会把脚本放在文件里面,然后执行这个文件。

    3.2.2 在 Redis 中调用 Lua 脚本文件中的命令, 操作 Redis

    创建 Lua 脚本文件:

    cd /usr/local/soft/redis5.0.5/src
    vim gupao.lua
    

    Lua 脚本内容,先设置,再取值:

    redis.call('set','gupao','lua666')
    return redis.call('get','gupao')
    

    在 Redis 客户端中调用 Lua 脚本

    cd /usr/local/soft/redis5.0.5/src
    redis-cli --eval gupao.lua 0
    

    得到返回值:

    [root@localhost src]# redis-cli --eval gupao.lua 0
    "lua666"
    

    3.2.3 案例: 对 IP 进行限流

    需求:某个IP,在 X 秒内只能访问 Y 次。
    设计思路:用 key 记录 IP,用 value 记录访问次数。拿到 IP 以后,对 IP+1。如果是第一次访问,对 key 设置过期时间(参数 1)。否则判断次数,超过限定的次数(参数 2),返回 0。如果没有超过次数则返回 1。超过时间,key 过期之后,可以再次访问。KEY[1]是 IP, ARGV[1]是过期时间 X,ARGV[2]是限制访问的次数 Y。

    -- ip_limit.lua
    -- IP 限流, 对某个 IP 频率进行限制 , 6 秒钟访问 10 次
    local num=redis.call('incr',KEYS[1])
    if tonumber(num)==1 then
      redis.call('expire',KEYS[1],ARGV[1])
      return 1
    elseif tonumber(num)>tonumber(ARGV[2]) then
      return 0
    else
      return 1
    end
    

    6 秒钟内限制访问 10 次,调用测试(连续调用 10 次):

    ./redis-cli --eval "ip_limit.lua" app:ip:limit:192.168.8.111 , 6 10
    

     app:ip:limit:192.168.8.111 是 key 值 ,后面是参数值,中间要加上一个空格 和
    一个逗号,再加上一个 空格 。
    即:./redis-cli –eval [lua 脚本] [key…]空格,空格[args…]
     多个参数之间用一个 空格 分割

    3.2.4 缓存 Lua 脚本

    为什么要缓存
    在脚本比较长的情况下,如果每次调用脚本都需要把整个脚本传给 Redis 服务端,会产生比较大的网络开销。为了解决这个问题,Redis 提供了 EVALSHA 命令,允许开发者通过脚本内容的 SHA1 摘要来执行脚本。

    如何缓存
    Redis 在执行 script load 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中,执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了则执行脚本,否则会返回错误:"NOSCRIPT No matching script. Please use EVAL."

    127.0.0.1:6379> script load "return 'Hello World'"
    "470877a599ac74fbfda41caa908de682c5fc7d4b"
    127.0.0.1:6379> evalsha "470877a599ac74fbfda41caa908de682c5fc7d4b" 0
    "Hello World"
    

    自乘案例
    Redis 有 incrby 这样的自增命令,但是没有自乘,比如乘以 3,乘以 5。
    我们可以写一个自乘的运算,让它乘以后面的参数:

    local curVal = redis.call("get", KEYS[1])
    if curVal == false then
      curVal = 0
    else
      curVal = tonumber(curVal)
    end
    curVal = curVal * tonumber(ARGV[1])
    redis.call("set", KEYS[1], curVal)
    return curVal
    

    把这个脚本变成单行,语句之间使用分号隔开

    local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal = tonumber(curVal) end; curVal
    = curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal
    

    script load '命令'

    127.0.0.1:6379> script load 'local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal =
    tonumber(curVal) end; curVal = curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal'
    
    "be4f93d8a5379e5e5b768a74e77c8a4eb0434441"
    

    调用:

    127.0.0.1:6379> set num 2
    OK
    127.0.0.1:6379> evalsha be4f93d8a5379e5e5b768a74e77c8a4eb0434441 1 num 6
    (integer) 12
    

    3.2.5 脚本超时

    Redis 的指令执行本身是单线程的,这个线程还要执行客户端的 Lua 脚本,如果 Lua脚本执行超时或者陷入了死循环,是不是没有办法为客户端提供服务了呢?

    eval 'while(true) do end' 0
    

    为 了防 止 某个 脚本 执 行时 间 过长 导 致 Redis 无 法提 供 服务 , Redis 提 供 了lua-time-limit 参数限制脚本的最长运行时间,默认为 5 秒钟。
    lua-time-limit 5000(redis.conf 配置文件中)

    当脚本运行时间超过这一限制后,Redis 将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。

    Redis 提供了一个 script kill 的命令来中止脚本的执行。新开一个客户端:

    script kill
    

    如果当前执行的 Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么通过script kill 命令是不能终止脚本运行的。

    127.0.0.1:6379> eval "redis.call('set','gupao','666') while true do end" 0
    

    因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行。

    127.0.0.1:6379> script kill
    (error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
    

    遇到这种情况,只能通过 shutdown nosave 命令来强行终止 redis。
    shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。

    总结:如果我们有一些特殊的需求,可以用 Lua 来实现,但是要注意那些耗时的操作。

    相关文章

      网友评论

        本文标题:Redis入坟(二)高级特性,发布订阅、事务、Lua脚本

        本文链接:https://www.haomeiwen.com/subject/mgcbrktx.html