Redis支持发布/订阅模式,在发布订阅模式中,主要涉及到三个角色:
- 发布者 (publisher)
- 订阅者 (subscriber)
- 通道(channel)
在SpringBoot2.1.4版本,先来一个简单的发布订阅demo,这里有一个发布者,发布了两个通道;有三个订阅者,第一个订阅者订阅第一个通道,第二个订阅者订阅第二个通道,第三个订阅者同时订阅两个通道。
首先,需要在pom文件中引入必须的架包;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version><!-- 加版本号,是预防在使用@Test注解测试时,找不到JedisPubSub类 -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
第二步,上测试demo;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
* Jedis发布/订阅者测试
* @author 程就人生
* @date 2019年12月19日
*/
public class JedisTest {
//发布者
@Test
public void pub(){
Jedis jedis = new Jedis("127.0.0.1", 6379);
//发布者1
jedis.publish("publish", "我是发布者,我发布了一些内容,aass");
jedis.publish("publish2", "我是发布者2,我发布了一些内容,bbbbb");
}
//第一个订阅者
//@Test
public void sub1(){
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.subscribe(new JedisPubSub(){
@Override
public void onMessage(String channel, String message) {
System.out.println("第一个订阅者");
System.out.println("订阅的通道为:" + channel);
System.out.println("接收到的内容为:" + message);
}
}, "publish");
}
//第二个订阅者
//@Test
public void sub2(){
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.subscribe(new JedisPubSub(){
@Override
public void onMessage(String channel, String message) {
System.out.println("第二个订阅者");
System.out.println("订阅的通道为:" + channel);
System.out.println("接收到的内容为:" + message);
}
}, "publish2");
}
//第三个订阅者
//@Test
public void sub3(){
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.subscribe(new JedisPubSub(){
@Override
public void onMessage(String channel, String message) {
System.out.println("第三个订阅者");
System.out.println("订阅的通道为:" + channel);
System.out.println("接收到的内容为:" + message);
}
}, "publish", "publish2");
}
}
最后,测试;先把订阅者一个一个启动起来,最后再启动发布者,这时可以看到每个订阅者都收到了自己该收到的消息;
第一个订阅者
订阅的通道为:publish
接收到的内容为:我是发布者,我发布了一些内容,aass
第二个订阅者
订阅的通道为:publish2
接收到的内容为:我是发布者2,我发布了一些内容,bbbbb
第三个订阅者
订阅的通道为:publish
接收到的内容为:我是发布者,我发布了一些内容,aass
第三个订阅者
订阅的通道为:publish2
接收到的内容为:我是发布者2,我发布了一些内容,bbbbb
在这个demo中,使用的是Jedis框架,使用极其简单;但是,在实际的开发中,肯定没有这么简单,如何才能和SpringBoot更好的地结合呢,这就需要单独地写Redis的配置文件;
首先,还是pom文件里引入必须的架包;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
第二步,redis配置文件,这里使用了Lettuce框架,也可以使用Jedis框架,Jedis框架配置参数设置比较麻烦;
package com.example.demo.config;
import java.io.Serializable;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redis配置
* @author 程就人生
* @date 2019年12月19日
*/
@Configuration
@EnableCaching//开启注解式缓存
public class SRedisConfig extends CachingConfigurerSupport {
/**
* 配置lettuce连接池(多数据源的公共参数)
* GenericObjectPoolConfig不是线程安全的
* @return
*/
@SuppressWarnings("rawtypes")
@Bean
@ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
public GenericObjectPoolConfig redisPool() {
return new GenericObjectPoolConfig<>();
}
/**
* 配置数据源
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.redis")
public RedisStandaloneConfiguration redisConfig() {
return new RedisStandaloneConfiguration();
}
/**
*
* @param config
* @param redisConfig
* @return
*/
@SuppressWarnings("rawtypes")
@Bean("factory")
@Primary
public LettuceConnectionFactory factory(GenericObjectPoolConfig config, RedisStandaloneConfiguration redisConfig) {
LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build();
return new LettuceConnectionFactory(redisConfig, clientConfiguration);
}
/**
*
* @param factory
* @return
*/
@Bean("redisTemplate")
public RedisTemplate<Serializable, Serializable> redisTemplate(@Qualifier("factory") RedisConnectionFactory factory) {
RedisTemplate<Serializable, Serializable> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new JdkSerializationRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new JdkSerializationRedisSerializer());
//开启事务
template.setEnableTransactionSupport(true);
template.afterPropertiesSet();
return template;
}
}
第三步,测试代码;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布订阅测试
* @author 程就人生
* @date 2019年12月19日
*/
@RestController
public class Index1Controller {
@Autowired
LettuceConnectionFactory factory;
//发布次数
public static int i=0;
//订阅
@GetMapping("/sub")
public void sub(){
factory.getConnection().subscribe(new MessageListener(){
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println(message);
}
}, "Lettuce通道".getBytes());
}
//发布
@GetMapping("/pub")
public void pub(){
factory.getConnection().publish("Lettuce通道".getBytes(), ("我是Lettuce"+(i++)).getBytes());
}
}
最后,测试;启动入口程序,先订阅再发布,这样订阅者才能收到发布者发布的消息,先发布再订阅,那么订阅事件之前发布的信息将不能收到。
网友评论