3.4Redis实现内部队列

作者: 孔垂云 | 来源:发表于2017-05-18 01:20 被阅读0次

    日常工作中会出现操作的异步处理,这时候就需要队列了,redis可以模拟实现队列。原理就是模拟一个topic,然后把消息放到redis的list对象里面,由专门的监听器来捕获队列的内容。

    来看看具体实现,模拟这样一个场景,用户注册时,需要发短信,把发短信的这个操作异步处理,放入redis队列。

    添加依赖

     <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <junit.version>4.12</junit.version>
            <spring.version>4.3.3.RELEASE</spring.version>
            <jedis.version>2.9.0</jedis.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aop</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aspects</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-beans</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-web</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-redis</artifactId>
                <version>1.7.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${jedis.version}</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
            </dependency>
        </dependencies>
    

    这里面主要包括spring依赖和redis依赖。

    新建一个web工程,并修改web.xml

     <context-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>
                classpath:applicationContext-redis.xml
            </param-value>
        </context-param>
        <listener>
            <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
        </listener>
    

    这里面的核心是启动时加载applicationContext-redis.xml,利用spring的监听ContextLoaderListener

    application.properties 配置

    redis.host=127.0.0.1
    redis.port=6379
    redis.pass=
    redis.maxIdle=300
    redis.testOnBorrow=true
    

    applicationContext-redis.xml配置

     <!--redis连接池-->
        <bean id="redisConnectionFactory"
             class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
            <property name="hostName" value="${redis.host}"></property>
            <property name="port" value="${redis.port}"></property>
            <property name="usePool" value="true"></property>
        </bean>
        <!-- 注入RedisTemplate -->
        <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
            <property name="connectionFactory" ref="redisConnectionFactory"></property>
        </bean>
    
        <!-- 序列化工具-->
        <bean id="jdkSerializer"
             class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
       <!-- 监听代理,用于接收redis消息-->
        <bean id="smsMessageDelegateListener"
              class="com.critc.queue.SmsMessageDelegateListener"/>
        <!-- 消息监听,spring内置的消息监听适配器 -->
        <bean id="smsMessageListener"
              class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
            <property name="delegate" ref="smsMessageDelegateListener"/>
            <property name="serializer" ref="jdkSerializer"/>
        </bean>
        <!--消息发送工具 -->
        <bean id="sendMessage" class="com.critc.queue.SendMessage">
            <property name="redisTemplate" ref="redisTemplate"/>
        </bean>
        <!-- 定义消息的topic-->
        <redis:listener-container>
            <redis:listener ref="smsMessageListener" method="handleMessage"
                            serializer="jdkSerializer" topic="sms_queue_shortmessage"/>
        </redis:listener-container>
    

    SmsMessageVo.java 模拟消息的一个VO

    public class SmsMessageVo implements Serializable {
        private static final long serialVersionUID = 1L;
        // 手机号
        private String mobile;
    
        // 短信内容
        private String content;
        //省略get set方法
    }
    

    里面包括一个手机号,一个短信内容即验证码

    SmsMessageDelegateListener消息队列监听

    public class SmsMessageDelegateListener {
    
        // 监听Redis消息
        public void handleMessage(Serializable message) {
            System.out.println("收到消息" + message);
        }
    }
    

    接收到消息后就可以进行后续的处理,比如调用发短信服务等等。
    这里面需要把message一般是一个json串,先转换成对应的vo,再进行后续的处理

    SendMessage.java 发送消息类

    public class SendMessage {
        private RedisTemplate<String, Object> redisTemplate;
    
        public RedisTemplate<String, Object> getRedisTemplate() {
            return redisTemplate;
        }
    
        public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
    
        public void sendMessage(String channel, Serializable message) {
            redisTemplate.convertAndSend(channel, message);
        }
    }
    

    这里面最核心的一句redisTemplate.convertAndSend(channel, message),发送到指定通道消息。

    模拟发送消息

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath*:applicationContext-redis.xml"})
    public class TestSendMessage {
    
        @Autowired
        private SendMessage sendMessage;
    
        @Test
        public void testSendMessage() {
            SmsMessageVo smsMessageVo = new SmsMessageVo();
            smsMessageVo.setMobile("13811111111");
            smsMessageVo.setContent("1234");
            //一般消息内容会是一个json串,消费时再把json转成对象
            sendMessage.sendMessage("sms_queue_shortmessage", smsMessageVo.toString());
        }
    }
    

    首先启动web工程,然后点击TestSendMessage进行消息的发送。

    消息接收结果

    利用redis实现的消息队列可以进行事务的异步处理,不过这个消息队列和其他的消息中间件有本质的区别,最大的区别就是redis不能保证同一个消息被一个应用接收,比如当前有多个应用监听redis实现的队列,如果队列里面加入新消息,就会被多个应用同时接收,不能实现消息被唯一应用接收。

    当然这只是一个简单的队列,如果有更多需求,需要采用专门的消息中间件来实现,比如rabbitmq、activemq等等

    源码下载

    [本工程详细源码]
    (https://github.com/chykong/java_component/tree/master/chapter3_4_redis_queue)

    相关文章

      网友评论

        本文标题:3.4Redis实现内部队列

        本文链接:https://www.haomeiwen.com/subject/hgjcxxtx.html