简介
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者,Channel。发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。
Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
Maven 引用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Redis配置
spring.redis.host=172.21.12.32
spring.redis.password=JH7j3d
spring.redis.port=6379
spring.redis.database=4
spring.redis.pool.max-active=64
spring.redis.pool.max-idle=16
订阅者监听
package com.kingxunlian.tax.client;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* @author leijie.gao
* @version 1.0.0
* @ClassName MessagePubListener.java
* @Description TODO
* @createTime 2021年06月02日 13:43:00
*/
@Component
public class MessagePubListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("消费者消费" + message.toString());
}
}
配置监听适配器
package com.kingxunlian.tax.client;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
@EnableCaching
public class RedisCacheConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener,配置不同的交换机
container.addMessageListener(listenerAdapter, new PatternTopic("channel:redis-test"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(MessagePubListener receiver) {
System.out.println("消息适配器1");
return new MessageListenerAdapter(receiver, "onMessage");
}
}
发布者发布
package com.kingxunlian.tax.biz;
import com.kingxunlian.tax.TaxApplication;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
/**
* @author leijie.gao
* @version 1.0.0
* @ClassName Test.java
* @Description TODO
* @createTime 2021年06月03日 11:19:00
*/
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = TaxApplication.class)
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@org.junit.Test
public void test(){
redisTemplate.convertAndSend("channel:redis-test","我开始发布消息了");
}
}
结果
2021-06-03 15:32:13.398 TID: N/A [main] DEBUG c.b.m.core.MybatisConfiguration :87 addMappedStatement: com.kingxunlian.tax.biz.pub.dao.mapper.IndustryTypeMapper.selectPage
2021-06-03 15:32:17.193 TID: N/A [main] INFO c.k.tax.client.RedisCacheConfig :32 加载消息适配器
2021-06-03 15:32:17.206 TID: N/A [main] INFO c.k.tax.client.RedisCacheConfig :25 加载消息监听器
2021-06-03 15:32:17.303 TID: N/A [main] INFO c.k.config.XLSpringCloudConfig :102 restTemplate ini bean success!
2021-06-03 15:32:17.334 TID: N/A [main] INFO c.k.config.XLSpringCloudConfig :135 setLogTraceIdListener ini bean success!
2021-06-03 15:32:17.344 TID: N/A [main] INFO c.k.config.XLSpringCloudConfig :142 GlobalControllerExcptionHandler ini bean success!
2021-06-03 15:32:17.350 TID: N/A [main] INFO c.k.config.XLSpringCloudConfig :149 logConfig init bean success
2021-06-03 15:32:17.680 TID: N/A [main] WARN c.n.c.sources.URLConfigurationSource :121 No URLs will be polled as dynamic configuration sources.
2021-06-03 15:32:17.680 TID: N/A [main] INFO c.n.c.sources.URLConfigurationSource :122 To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-06-03 15:32:17.692 TID: N/A [main] WARN c.n.c.sources.URLConfigurationSource :121 No URLs will be polled as dynamic configuration sources.
2021-06-03 15:32:17.692 TID: N/A [main] INFO c.n.c.sources.URLConfigurationSource :122 To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-06-03 15:32:19.083 TID: N/A [main] WARN o.s.c.s.e.EurekaStarterDeprecationWarningAutoConfiguration :19 spring-cloud-starter-eureka is deprecated as of Spring Cloud Netflix 1.4.0, please migrate to spring-cloud-starter-netflix-eureka
2021-06-03 15:32:21.025 TID: N/A [main] INFO com.kingxunlian.tax.biz.RedisTest :57 Started RedisTest in 22.982 seconds (JVM running for 23.882)
2021-06-03 15:32:21.111 TID: N/A [main] INFO com.kingxunlian.tax.biz.RedisTest :30 发布者【老师】开始拿着大喇叭广播拉!
2021-06-03 15:32:21.298 TID: N/A [container-2] INFO c.k.tax.client.MessagePubListener :20 同学们听到消息:同学们全体放假!
结束语
虽然redis实现了发布订阅(publish/subscribe)的功能,但是在通常的情况下是不推荐使用的,如果想使用消息队列这种功能,最好还是使用专业的各种MQ中间件,例如rabbitMQ,rockedMQ,activitedMQ等
PUBLISH和SUBSCRIBE的缺陷在于客户端必须一直在线才能接收到消息,断线可能会导致客户端丢失消息。
所以大家结合情况决定要使用MQ的发布订阅还是用Redis的发布订阅。
网友评论