美文网首页IT@程序员猿媛SpringBoot精选
SpringBoot2.1 Redis 发布/订阅模式

SpringBoot2.1 Redis 发布/订阅模式

作者: 程就人生 | 来源:发表于2019-12-19 22:01 被阅读0次

    Redis支持发布/订阅模式,在发布订阅模式中,主要涉及到三个角色:

    • 发布者 (publisher)
    • 订阅者 (subscriber)
    • 通道(channel)

    在SpringBoot2.1.4版本,先来一个简单的发布订阅demo,这里有一个发布者,发布了两个通道;有三个订阅者,第一个订阅者订阅第一个通道,第二个订阅者订阅第二个通道,第三个订阅者同时订阅两个通道。

    首先,需要在pom文件中引入必须的架包;

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.1.0</version><!-- 加版本号,是预防在使用@Test注解测试时,找不到JedisPubSub类 -->
            </dependency>       
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    

    第二步,上测试demo;

    import org.junit.Test;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    /**
     * Jedis发布/订阅者测试
     * @author 程就人生
     * @date 2019年12月19日
     */
    public class JedisTest {
    
        //发布者
        @Test
        public void pub(){      
            Jedis jedis = new Jedis("127.0.0.1", 6379);
            //发布者1
            jedis.publish("publish", "我是发布者,我发布了一些内容,aass");
            jedis.publish("publish2", "我是发布者2,我发布了一些内容,bbbbb");
        }
        
        //第一个订阅者    
        //@Test
        public void sub1(){
            Jedis jedis = new Jedis("127.0.0.1", 6379); 
            jedis.subscribe(new JedisPubSub(){
                @Override
                public void onMessage(String channel, String message) {
                    System.out.println("第一个订阅者");
                    System.out.println("订阅的通道为:" + channel);
                    System.out.println("接收到的内容为:" + message);
                }
            }, "publish");
        }
        
        //第二个订阅者    
        //@Test
        public void sub2(){
            Jedis jedis = new Jedis("127.0.0.1", 6379); 
            jedis.subscribe(new JedisPubSub(){
                @Override
                public void onMessage(String channel, String message) {
                    System.out.println("第二个订阅者");
                    System.out.println("订阅的通道为:" + channel);
                    System.out.println("接收到的内容为:" + message);
                }
            }, "publish2");
        }
            
        //第三个订阅者    
        //@Test
        public void sub3(){
            Jedis jedis = new Jedis("127.0.0.1", 6379); 
            jedis.subscribe(new JedisPubSub(){
                @Override
                public void onMessage(String channel, String message) {
                    System.out.println("第三个订阅者");
                    System.out.println("订阅的通道为:" + channel);
                    System.out.println("接收到的内容为:" + message);
                }
            }, "publish", "publish2");
        }
    }
    

    最后,测试;先把订阅者一个一个启动起来,最后再启动发布者,这时可以看到每个订阅者都收到了自己该收到的消息;

    第一个订阅者
    订阅的通道为:publish
    接收到的内容为:我是发布者,我发布了一些内容,aass
    
    第二个订阅者
    订阅的通道为:publish2
    接收到的内容为:我是发布者2,我发布了一些内容,bbbbb
    
    第三个订阅者
    订阅的通道为:publish
    接收到的内容为:我是发布者,我发布了一些内容,aass
    第三个订阅者
    订阅的通道为:publish2
    接收到的内容为:我是发布者2,我发布了一些内容,bbbbb
    

    在这个demo中,使用的是Jedis框架,使用极其简单;但是,在实际的开发中,肯定没有这么简单,如何才能和SpringBoot更好的地结合呢,这就需要单独地写Redis的配置文件;

    首先,还是pom文件里引入必须的架包;

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    

    第二步,redis配置文件,这里使用了Lettuce框架,也可以使用Jedis框架,Jedis框架配置参数设置比较麻烦;

    package com.example.demo.config;
    
    import java.io.Serializable;
    
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    /**
     * redis配置
     * @author 程就人生
     * @date 2019年12月19日
     */
    @Configuration
    @EnableCaching//开启注解式缓存
    public class SRedisConfig extends CachingConfigurerSupport {
    
        /**
         * 配置lettuce连接池(多数据源的公共参数)
         * GenericObjectPoolConfig不是线程安全的
         * @return
         */
        @SuppressWarnings("rawtypes")
        @Bean
        @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
        public GenericObjectPoolConfig redisPool() {
            return new GenericObjectPoolConfig<>();
        }
    
        /**
         * 配置数据源
         *
         * @return
         */
        @Bean
        @ConfigurationProperties(prefix = "spring.redis")
        public RedisStandaloneConfiguration redisConfig() {
            return new RedisStandaloneConfiguration();
        }
    
        /**
         *
         * @param config
         * @param redisConfig
         * @return
         */
        @SuppressWarnings("rawtypes")
        @Bean("factory")
        @Primary
        public LettuceConnectionFactory factory(GenericObjectPoolConfig config, RedisStandaloneConfiguration redisConfig) {
          LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build();
          return new LettuceConnectionFactory(redisConfig, clientConfiguration);
        }
        
        /**
         *
         * @param factory
         * @return
         */
        @Bean("redisTemplate")
        public RedisTemplate<Serializable, Serializable> redisTemplate(@Qualifier("factory") RedisConnectionFactory factory) {
            
            RedisTemplate<Serializable, Serializable> template = new RedisTemplate<>();
          
            template.setConnectionFactory(factory);
        
            //使用StringRedisSerializer来序列化和反序列化redis的key值
            template.setKeySerializer(new StringRedisSerializer());
              
            template.setValueSerializer(new JdkSerializationRedisSerializer());
              
            template.setHashKeySerializer(new StringRedisSerializer());
              
            template.setHashValueSerializer(new JdkSerializationRedisSerializer());
            
            //开启事务
            template.setEnableTransactionSupport(true);
              
            template.afterPropertiesSet();
            
            return template;
        }
    }
    

    第三步,测试代码

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * 发布订阅测试
     * @author 程就人生
     * @date 2019年12月19日
     */
    @RestController
    public class Index1Controller {
    
        @Autowired
        LettuceConnectionFactory factory;
        
        //发布次数
        public static int i=0;
        
        //订阅
        @GetMapping("/sub")
        public void sub(){
            factory.getConnection().subscribe(new MessageListener(){
                @Override
                public void onMessage(Message message, byte[] pattern) {
                    System.out.println(message);
                }
                
            }, "Lettuce通道".getBytes());
        }
        
        //发布
        @GetMapping("/pub")
        public void pub(){
            factory.getConnection().publish("Lettuce通道".getBytes(), ("我是Lettuce"+(i++)).getBytes());
        }
        
    }
    

    最后,测试;启动入口程序,先订阅再发布,这样订阅者才能收到发布者发布的消息,先发布再订阅,那么订阅事件之前发布的信息将不能收到。

    测试结果图

    相关文章

      网友评论

        本文标题:SpringBoot2.1 Redis 发布/订阅模式

        本文链接:https://www.haomeiwen.com/subject/isgmnctx.html