1. 什么是发布订阅机制 (PUB/SUB)
除了普通的数据类型之外,Redis其实还支持一种类似消息推送的机制,也就是有一个客户端A向redis服务器发布了一条消息,而另外的客户端如果订阅了这个消息,那么就能接收到客户端A发送的消息。
用通俗的话说就是,针对一个主题 topic(一个类比),如果某一个群体对这样的一个主题感兴趣,那么他们就会订阅这样的一个主题里边的内容,如果某个个体有一个与该主题相关的消息希望分享出去,那么他就可以向订阅了这个主题的人进行推送,而redis服务器的作用其实是一个中间人的作用,但需要注意的一点就是,这里的消息并不会被持久化,也就是如果是以前的消息,那么新订阅的个体是无法接收到数据的。
2. 常用指令
-
subscribe
: 订阅指定channel的消息
根据下面的内容,我们可以得到,订阅一个channel(其实可以同时订阅多个channel)成功后,一般会返回三个数据,第一个是类型,表示这是一个订阅的相关的消息,第二个是channel名称,第三个其实是指我们当前订阅的channel数量。
而对于后续接收到的消息,同样也有三个数据,一个是表示这是一个消息也就是message,第二个则是channel的名称,第三个是具体的内容。
127.0.0.1:6379> subscribe sms_send
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "sms_send"
3) (integer) 1
1) "message"
2) "sms_send"
3) "hello"
-
publish
: 发布消息到指定的channel
返回:收到消息的客户端数量。请注意,在 Redis 集群中,只有与发布客户端连接到同一节点的客户端才会包含在计数中。
127.0.0.1:6379> publish sms_send hello
(integer) 1
-
unsubscribe
: 取消订阅,但是在命令行上的时候,其实是没办法很好的测试的,因为subscribe之后会一直处于读取消息的状态,所以大概是供代码调用的吧
127.0.0.1:6379> unsubscribe sms_send
1) "unsubscribe"
2) "sms_send"
3) (integer) 0
-
pubsub channels
: 查看当前的channel
127.0.0.1:6379> pubsub channels
1) "first"
2) "second"
-
pubsub numsub
: 返回指定频道的订阅者数量(不包括订阅模式的客户端)。
127.0.0.1:6379> pubsub numsub sms_send
1) "sms_send"
2) (integer) 1
-
monitor
: monitor是一个调试命令,它流回 Redis 服务器处理的每个命令。它可以帮助理解数据库发生了什么。该命令既可以通过 也可以通过redis-cli使用telnet。
3. 其他的内容
-
psubscribe
: 订阅一个或多个符合给定模式的频道。其中的p表示pattern -
punsubscribe
: 取消订阅一个或多个符合给定模式的频道。其中的P表示pattern -
ssubscribe
: 为客户端订阅指定的分片通道。其中的s表示Shared -
sunsubscribe
: 停止监听发布到给定分片通道的消息。其中的s表示Shared -
spublish
: 发布消息到给定分片通道。其中的s表示Shared
4. 如何在java中使用订阅发布机制 - 例子
- 修改 pom.xml
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>${lettuce.version}</version>
</dependency>
- 创建 AppConfig.java
package cn.lazyfennec.cache.redis;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
class AppConfig {
/**
* 用于测试的通道名称
*/
public final static String TEST_CHANNEL_NAME = "sms_send";
@Bean
public RedisConnectionFactory redisConnectionFactory() {
System.out.println("使用单机版本");
return new LettuceConnectionFactory(new RedisStandaloneConfiguration("192.168.1.7", 6379));
}
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 可以配置对象的转换规则,比如使用json格式对object进行存储。
// Object --> 序列化 --> 二进制流 --> redis-server存储
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
return redisTemplate;
}
}
- 订阅
- 3.1 redisTemplate.execute 的方式
package cn.lazyfennec.cache.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 接收短信通知,直接用客户端的方式
*/
@Component
public class SmsChannelListener {
@Autowired
RedisTemplate redisTemplate;
@PostConstruct
public void setup() {
redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.subscribe((message, pattern) -> {
System.out.println("收到消息,使用redisTemplate收到的:" + message);
}, PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes());
return null;
}
});
}
}
- 3.2 消息容器的方式
package com.study.cache.redis.a3_pubsub;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* 接收短信通知
*/
@Component
@Configuration
public class SmsChannelListenerBySpring {
// 定义监听器
@Bean
public RedisMessageListenerContainer smsMessageListener(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
SmsSendListener smsSendListener = new SmsSendListener();
container.addMessageListener(smsSendListener, Arrays.asList(new ChannelTopic(PubsubRedisAppConfig.TEST_CHANNEL_NAME)));
return container;
}
// 定义触发的方法
class SmsSendListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("借助spring容器收到消息:" + message);
}
}
}
- 发布 - 例子
@Test
public void test1() throws InterruptedException {
System.out.println("开始测试发布订阅机制,5秒后发布一条消息");
Thread.sleep(5000L);
redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
// 发送通知
Long received = connection.publish(PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes(), "{手机号码10086~短信内容~~}".getBytes());
return received;
}
});
}
5. Redis keyspace notifications
实时监控 Redis 键和值的变化
Keyspace 通知允许客户端订阅 Pub/Sub 频道,以便接收以某种方式影响 Redis 数据集的事件。
可能接收的事件示例如下:
- 所有影响给定键的命令。
- 所有接收LPUSH操作的键。
- 所有在数据库0中到期的键。
事件使用Redis的普通发布/订阅层传递,因此实现了发布/订阅的客户端无需修改即可使用此功能。
由于Redis的发布/订阅是fire and forget,因此如果你的应用要求可靠的事件通知,目前还不能使用这个功能,也就是说,如果你的发布/订阅客户端断开连接,并在稍后重连,那么所有在客户端断开期间发送的事件将会丢失。
事件类型
键空间通知的实现是为每一个影响Redis数据空间的操作发送两个不同类型的事件。例如,在数据库0中名为mykey的键上执行DEL操作,将触发两条消息的传递,完全等同于下面两个PUBLISH命令:
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
以上很容易看到,一个频道允许监听所有以键mykey为目标的所有事件,以及另一个频道允许获取有关所有DEL操作目标键的信息。
第一种事件,在频道中使用keyspace前缀的被叫做键空间通知,第二种,使用keyevent前缀的,被叫做键事件通知。
在以上例子中,为键mykey生成了一个del事件。 会发生什么:
- 键空间频道接收到的消息是事件的名称。
- 键事件频道接收到的消息是键的名称。
可以只启用其中一种通知,以便只传递我们感兴趣的事件子集。
配置
默认情况下,键空间事件通知是不启用的,因为虽然不太明智,但该功能会消耗一些CPU。可以使用redis.conf中的notify-keyspace-events或者使用CONFIG SET命令来开启通知。
将参数设置为空字符串会禁用通知。 为了开启通知功能,使用了一个非空字符串,由多个字符组成,每一个字符都有其特殊的含义,具体参见下表:
- 英文版
参数 | 说明 |
---|---|
K | Keyspace events, published with keyspace@<db> prefix. |
E | Keyevent events, published with keyevent@<db> prefix. |
g | Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... |
$ | String commands |
l | List commands |
s | Set commands |
h | Hash commands |
z | Sorted set commands |
t | Stream commands |
d | Module key type events |
x | Expired events (events generated every time a key expires) |
e | Evicted events (events generated when a key is evicted for maxmemory) |
m | Key miss events (events generated when a key that doesn't exist is accessed) |
n | New key events (Note: not included in the 'A' class) |
A | Alias for "g$lshztxed", so that the "AKE" string means all the events except "m". |
过期事件
设置了生存时间的键由Redis以两种方式过期:
- 当命令访问键时,发现键已过期。
- 通过后台系统在后台逐步查找过期的键,以便能够收集那些从未被访问的键。
当通过以上系统之一访问键且发现键已经过期时,将生成expired事件。因此无法保证Redis服务器在键过期的那一刻同时生成expired事件。
如果没有命令不断地访问键,并且有很多键都有关联的TTL,那么在键的生存时间降至零到生成expired事件之间,将会有明显的延迟。
基本上,expired事件是在Redis服务器删除键的时候生成的,而不是在理论上生存时间达到零值时生成的。
如果觉得有收获,欢迎点赞和评论,更多知识,请点击关注查看我的主页信息哦~
网友评论