美文网首页Just programing
Redis 发布订阅模式

Redis 发布订阅模式

作者: chengchaos | 来源:发表于2020-02-14 18:30 被阅读0次

    https://chengchaos.github.io/2019/04/25/redis-pub-sub-notes.html

    通常来说,发布订阅(又叫 pub/sub)的特点是订阅者(listener)负责订阅频道(channel),发布者(publisher)负责向频道发送二进制字符串消息(binary string message)。每当有消息被发送给特定的频道,频道的所有订阅者都会收到消息。

    命令

    
    # 订阅一个或者多个频道
    SUBSCRIBE
    SUBSCRIBE channel [channel ...] 
    
    # 退订给定的一个或者多个频道
    # 如果不指定频道名称,则退订所有频道
    UNSUBSCRIBE
    UNSUBSCRIBE [channel [channel ...]]
    
    # 订阅给定模式的频道
    PSUBSCRIBE
    PSUBSCRIBE pattern [pattern ...]
    
    # 退订给定模式的频道
    # 如果不指定模式,则退订所有频道
    PUNSUBSCRIBE 
    PUNSUBSCRIBE [pattern [pattern ...]
    
    # 向给定的频道发送消息
    PUBLISH
    PUBLISH <channel> <message>
    

    Java

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>io.lettuce</groupId>
                        <artifactId>lettuce-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
    

    Jedis

    Jedis中的JedisPubSub抽象类提供了订阅和取消的功能。想处理订阅和取消订阅某些channel的相关事件,我们得扩展JedisPubSub类并实现相关的方法:

    package com.demo.redis;
    
    import org.apache.log4j.Logger;
    import redis.clients.jedis.JedisPubSub;
    
    public class Subscriber extends JedisPubSub {//注意这里继承了抽象类JedisPubSub
    
        private static final Logger LOGGER = Logger.getLogger(Subscriber.class);
    
        @Override
        public void onMessage(String channel, String message) {
            LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));
        }
    
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", 
                pattern, channel, message));
        }
    
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            LOGGER.info("onSubscribe");
        }
    
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            LOGGER.info("onUnsubscribe");
        }
    
        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            LOGGER.info("onPUnsubscribe");
        }
    
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
            LOGGER.info("onPSubscribe");
        }
    }
    

    有了订阅者,我们还需要一个发布者:

    
    package com.demo.redis;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.log4j.Logger;
    import redis.clients.jedis.Jedis;
    
    public class Publisher {
    
        private static final Logger LOGGER = Logger.getLogger(Publisher.class);
        private final Jedis publisherJedis;
        private final String channel;
    
        public Publisher(Jedis publisherJedis, String channel) {
            this.publisherJedis = publisherJedis;
            this.channel = channel;
        }
    
        /**
         * 不停的读取输入,然后发布到channel上面,遇到quit则停止发布。
         */
        public void startPublish() {
            LOGGER.info("Type your message (quit for terminate)");
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    String line = reader.readLine();
                    if (!"quit".equals(line)) {
                        publisherJedis.publish(channel, line);
                    } else {
                        break;
                    }
                }
            } catch (IOException e) {
                LOGGER.error("IO failure while reading input", e);
            }
        }
    }
    

    为简单起见,这个发布者接收控制台的输入,然后将输入的消息发布到指定的channel上面,如果输入quit,则停止发布消息。

    接下来是主函数:

    package com.demo.redis;
    
    import org.apache.log4j.Logger;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class Program {
        
        public static final String CHANNEL_NAME = "MyChannel";
        //我这里的Redis是一个集群,192.168.56.101和192.168.56.102都可以使用
        public static final String REDIS_HOST = "192.168.56.101";
        public static final int REDIS_PORT = 7000;
        
        private final static Logger LOGGER = Logger.getLogger(Program.class);
        private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
        private final static JedisPool JEDIS_POOL = 
                new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);
        
        public static void main(String[] args) throws Exception {
            final Jedis subscriberJedis = JEDIS_POOL.getResource();
            final Jedis publisherJedis = JEDIS_POOL.getResource();
            final Subscriber subscriber = new Subscriber();
            //订阅线程:接收消息
            new Thread(new Runnable() {
                public void run() {
                    try {
                        LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked.");
                        //使用subscriber订阅CHANNEL_NAME上的消息,这一句之后,线程进入订阅模式,阻塞。
                        subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                        
                        //当unsubscribe()方法被调用时,才执行以下代码
                        LOGGER.info("Subscription ended.");
                    } catch (Exception e) {
                        LOGGER.error("Subscribing failed.", e);
                    }
                }
            }).start();
            
            //主线程:发布消息到CHANNEL_NAME频道上
            new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
            publisherJedis.close();
            
            //Unsubscribe
            subscriber.unsubscribe();
            subscriberJedis.close();
        }
    }
    

    参考: https://my.oschina.net/itblog/blog/601284

    Spring

    config

    
    /**
     * @author zhangsi
     * @date created in 2018/3/1 15:30
     */
    @Configuration
    public class RedisConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisConfig.class);
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            RedisSerializer<String> stringSerializer = new StringRedisSerializer();
            RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
            template.setConnectionFactory(redisConnectionFactory);
            template.setKeySerializer(stringSerializer);
            template.setValueSerializer(stringSerializer);
            template.setHashKeySerializer(stringSerializer);
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public Jedis jedis(RedisProperties redisProperties) {
    
    //        Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis");
    //
    //        ReflectionUtils.makeAccessible(jedisField);
    //
    //        Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, redisConnectionFactory.getConnection());
    
            Jedis jedis = new Jedis(redisProperties.getHost(), redisProperties.getPort());
    
            if (redisProperties.getPassword() != null) {
                jedis.auth(redisProperties.getPassword());
            }
    
            return jedis;
        }
    
    
        /**
         * 本模块中,创建 jedis 客户端的目的是为了使用原生的 api 来完成上锁和解锁的操作
         * 见 {@code CtrlCommandHelper#tryGetDistributedLock() } 和
         * {@code CtrlCommandHelper.releaseDistributedLock() }方法
         *
         * 因为 RedisTemplate 提供的 api 不足以完成原子操作的同时并返回执行结果
         * spring boot2 后 redis 默认的客户端由 jedis 变为 Lettuce
         * 所以理论上可以不创建 jedis 客户端,所以建议通过从
         * RedisTemplate 中获取 nativeConnection 来获取 lettuce 原生 api 来完成加锁和解锁的操作。
         *
         * @see  cn.futuremove.tsp.vehicle.control.service.CommandHelper#tryGetDistributedLock
         * @see  cn.futuremove.tsp.vehicle.control.service.CommandHelper#releaseDistributedLock
         */
        @Bean
        public JedisPool jedisPool(RedisProperties redisProperties) {
    
    
            JedisPool jedisPool = new JedisPool(new GenericObjectPoolConfig(),
                    redisProperties.getHost(),
                    redisProperties.getPort() == 0 ? Protocol.DEFAULT_PORT : redisProperties.getPort(),
                    Protocol.DEFAULT_TIMEOUT,
                    redisProperties.getPassword());
    
            LOGGER.debug("JedisPool : {}", jedisPool);
    
            return jedisPool;
        }
    
        @Bean
        @ConditionalOnProperty(value = {"spring.redis.cluster"})
        public JedisCluster jedisCluster(RedisProperties redisProperties) {
    
            List<String> nodes = redisProperties.getCluster().getNodes();
    
            Set<HostAndPort> nodeSet = new HashSet<>(nodes.size());
    
            for (String node : nodes) {
    
                String[] nodeArr = node.split(":");
    
                nodeSet.add(new HostAndPort(nodeArr[0], Integer.parseInt(nodeArr[1])));
            }
    
            JedisCluster jedisCluster = new JedisCluster(nodeSet);
    
            LOGGER.debug("JedisCluster : {}", jedisCluster);
    
            return jedisCluster;
        }
    
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
    
        @Autowired
        private OfflineNotifyRedisMessageListener offlineNotifyRedisMessageListener;
    
        @Autowired
        private CommandResultMessageListener commandResultMessageListener;
    
        /**
         *
         * @return
         */
        @Bean
        public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
            ThreadPoolTaskScheduler scheduler =
                    new ThreadPoolTaskScheduler();
            scheduler.setPoolSize(20);
            //scheduler.setPoolSize(50);
            scheduler.setThreadGroupName("chaos");
            scheduler.setThreadNamePrefix("chaos-");
            return scheduler;
        }
    
        /**
         * 定义 Redis 的监听容器
         * @return offlineNotifyRedisMessageListener
         */
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer() {
    
            RedisMessageListenerContainer container =
                    new RedisMessageListenerContainer();
    
            container.setConnectionFactory(redisConnectionFactory);
            container.setTaskExecutor(threadPoolTaskScheduler());
    
            Topic appOfflineTopic = new ChannelTopic("app_offline");
            container.addMessageListener(offlineNotifyRedisMessageListener, appOfflineTopic);
    
            Topic ctrlNotify = new ChannelTopic(RedisKeyType.VEHICLE_CTRL_NOTIFY_CHANNEL_TOPIC);
            container.addMessageListener(commandResultMessageListener, ctrlNotify);
    
            return container;
        }
    }
    

    OfflineNotifyRedisMessageListener

    
    /**
     * <p>
     * <strong>
     *     一个 Redis 消息监听器
     * </strong><br /><br />
     * 用于监听 Redis 中的消息, 当有用户异地登录, 通知原来登录的用户,
     * 并提示下线.
     *
     *
     * </p>
     *
     * @author chengchao - 2018-12-15 16:43 <br />
     * @see [相关类方法]
     * @since [产品模块版本]
     */
    @Component
    public class OfflineNotifyRedisMessageListener implements MessageListener {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(OfflineNotifyRedisMessageListener.class);
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
    
            String topic = new String(message.getChannel(), StandardCharsets.UTF_8);
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
    
            if (StringUtils.isAnyBlank(body, topic)) {
                LOGGER.warn("body or topic is null or empty !");
                return;
            }
            LOGGER.debug("topic: {}, message body: {}", topic, body);
    
    
            VehicleControlContext.sendOfflineNotify(body);
        }
    }
    
    
    

    public

    
    private void sendOfflineNotifyMessage(BoundValueOperations<String, String> operations) {
            String userLoginInfos = operations.get();
    
            if (StringUtils.isBlank(userLoginInfos)) {
               return;
            }
            RedisUserDto redisUserDto;
    
            try {
                redisUserDto = JSON.parseObject(userLoginInfos, RedisUserDto.class);
                if (Objects.isNull(redisUserDto)) {
                    return;
                }
            } catch (Exception e) {
                // 解析失败, 无所谓
                LOGGER.warn("exception : {} -> {}", userLoginInfos, e.getMessage());
                return;
            }
    
            String token = redisUserDto.getToken();
            if (StringUtils.isBlank(token)) {
                return;
            }
    
            redisTemplate.convertAndSend("app_offline", redisUserDto.getUser().getId() + ":"+ token);
    
        }
    

    相关文章

      网友评论

        本文标题:Redis 发布订阅模式

        本文链接:https://www.haomeiwen.com/subject/gbyjfhtx.html