美文网首页
【Redis 笔记】(1)redis实现消息队列

【Redis 笔记】(1)redis实现消息队列

作者: 程序员Anthony | 来源:发表于2020-04-26 20:32 被阅读0次

    最近抽空在读redis的源码,分享一些最近反思,要去做一些改变,都要要从外界环境+习惯+兴趣去着手。 完全依靠自我内心的自驱力,虽然感觉自己经常精神饱满,也还是不少深觉力气使错方向,心态不稳定的时刻。当然仰仗别人就更不能啦。 有了这些才谈方法和后续。保持反思保持进步。

    1 目标导向:

    本篇文章和后续文章将从实例出发,从了解分析到实现角度,来讲解:如何利用Redis实现session共享?消息队列?分布式锁?....

    目标,了解redis的各种使用场景以及Redis的代码细节,从中学习优秀的一些规范和数据结构等。

    2 redis介绍

    Redis is an advanced key-value store. It is similar to memcached but the dataset is not volatile, and values can be strings, exactly like in memcached, but also lists, sets, and ordered sets. All this data types can be manipulated with atomic operations to push/pop elements, add/remove elements, perform server side union, intersection, difference between sets, and so forth. Redis supports different kind of sorting abilities.

    3 why redis?

    是一个完全开源免费的key-value内存数据库
    通常用于做缓存.

    key-value和Java中的Map的功能是很相似的,那么redis的优势在哪里?

    • Redis可以用几十G内存做缓存,Map不行,一般JVM分几个G就很大了
    • Redis的缓存可以持久化,Map是内存对象,程序一重启数据就没了
    • Redis可以实现分布式缓存,Map的缓存只是本地的,分布式下不具备缓存一致性
    • Redis可以处理每秒百万级的并发,是专业的缓存服务,Map只是普通的Java对象
    • Redis缓存有过期机制,Map本身无此功能
    • Redis有丰富的API,而Map相比下过于单调

    4 redis 优点

    异常快 - Redis 非常快,每秒可执行大约 110000 次的设置(SET)操作,每秒大约可执行 81000 次的读取/获取(GET)操作。可以通过 ' redis-benchmark -n 100000 -q ' 查看性能。
    支持丰富的数据类型 - Redis 支持开发人员常用的大多数数据类型,例如列表,集合,排序集和散列等等。这使得 Redis 很容易被用来解决各种问题,因为我们知道哪些问题可以更好使用地哪些数据类型来处理解决。
    操作具有原子性 - 所有 Redis 操作都是原子操作,这确保如果两个客户端并发访问,Redis 服务器能接收更新的值。
    多实用工具 - Redis 是一个多实用工具,可用于多种用例,如:缓存,消息队列(Redis 本地支持发布/订阅),应用程序中的任何短期数据,例如,web应用程序中的会话,网页命中计数等。

    5 redis 基础使用

    redis的基础使用可以跟着https://www.runoob.com/redis/redis-tutorial.html
    一步步操作。

    6 redis中的五种数据结构

    官网关于5中类型的介绍

    string(字符串)、list(列表)、hash(字典)、set(集合) 和 zset(有序集合)

    来自 菜鸟教程redis

    下面再对应分析一下Redis源代码。了解一下相关的类型的实现,学习一下Redis 是如何做到内存使用的极致的。

    Redis 数据结构的源码分析,参考:
    Redis 数据结构-字符串源码分析:https://my.oschina.net/mengyuankan/blog/1926320
    Redis 数据结构-字典源码分析: https://my.oschina.net/mengyuankan/blog/1929593

    5 redis的事务

    https://redis.io/topics/transactions

    单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
    事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

    It's important to note that even when a command fails, all the other commands in the queue are processed – Redis will not stop the processing of commands.

    7 redis 实现消息队列

    7.1 平常使用的消息队列

    目前常用的消息队列大概有三种类型,RabbitMQ等AMQP系列, Kafka, Redis等key-value系列,它们的使用场景分别是:

    1.RabbitMQ:相对重量级高并发的情况,比如数据的异步处理,任务的串行执行等.
    2.Kafka:基于Pull的模式来处理,具体很高的吞吐量,一般用来进行,日志的存储和收集.
    3.Redis:轻量级高并发,实时性要求高的情况,比如缓存,秒杀,及时的数据分析(ELK日志分析框架,使用的就是Redis).

    7.2 第一个版本的Redis实现的消息队列

    参考菜鸟教程里的Redis 发布订阅,


    我们本地安装后运行redis服务端


    通过SUBSCRIBE redisChat 订阅一个客户端A


    重启一个客户端B,通过‘ PUBLISH redisChat ...’发布消息


    客户端接受到了消息


    7.3 使用redis的lpush/rpush来入队,lpop和rpop出队

    我们发现这里会出现消息队列的内容为空的情况,如果我们一直等待在那里,也会出现空闲链接的问题,那么如何解决呢?

    通过查阅API和《Redis 深度历险》这本书,我们发现更好的方案是使用BLPOP,BRPOP,BRPOPLPUSH实现阻塞式的消息队列,对应的API可以点击链接查看。

    BLPOP
    BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the head of the first list that is non-empty, with the given keys being checked in the order that they are given.
    In Brief : See BLPOP

    BRPOP
    BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.
    In Brief : See BRPOP

    BRPOPLPUSH
    BRPOPLPUSH is the blocking variant of RPOPLPUSH. When source contains elements, this command behaves exactly like RPOPLPUSH. When used inside a MULTI/EXEC block, this command behaves exactly like RPOPLPUSH. When source is empty, Redis will block the connection until another client pushes to it or until timeout is reached. A timeout of zero can be used to block indefinitely.
    In Brief : See BRPOPLPUSH

    7.4 java中的延时队列的实现

    内容来自《Redis 深度历险》



    7.5 Spring data redis 实现消息发送

    再使用Spring data redis 实现消息发送功能,如下:

    消息生产者,注入redisTemplate,用convertAndSend发送消息

    package com.anthony.redis.mq;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * <p>Description: </p>
     *
     * @author Anthony
     * @date 2020-04-22 4:54 PM
     */
    @Component
    public class PublishService {
        @Autowired
        StringRedisTemplate redisTemplate;
    
        /**
         * Publishing method
         * @param channel Message publishing subscription topic
         * @param message Message information
         */
        public void publish(String channel, Object message) {
            // connection.publish(rawChannel, rawMessage) encapsulated by this method;
            redisTemplate.convertAndSend(channel, message);
        }
    }
    

    创建一个接收消息的类,可以继承MessageListener,也可以不继承

    package com.anthony.redis.mq;
    
    import com.anthony.redis.template.RedisService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    
    /**
     * <p>Description: </p>
     *
     * @author Anthony
     * @date 2020-04-22 4:53 PM
     */
    public class SubscribeListener implements MessageListener {
        private static final Logger log = LoggerFactory.getLogger(RedisService.class);
    
        /**
         * Subscriptions receive messages from publishers
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // The cached message is serialized and needs to be deserialized. However, new String() can be deserialized, but static method valueOf() cannot.
            log.info(new String(pattern) + " Receive release:" + new String(message.getBody()));
        }
    }
    

    消息订阅者配置类,主要通过下方的redisMessageListenerContainer 进行配置

    package com.anthony.redis;
    
    import com.anthony.redis.mq.SubscribeListener;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    /**
     * <p>Description: </p>
     *
     * @author Anthony
     * @date 2020-04-22 3:35 PM
     */
    @EnableCaching
    @Configuration
    public class RedisConfig extends CachingConfigurerSupport {
    
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
            template.setConnectionFactory(redisConnectionFactory);
    //        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); //custom
    //        RedisObjectSerializer redisObjectSerializer = new RedisObjectSerializer();
    //        template.setKeySerializer(stringRedisSerializer);
    //        template.setValueSerializer(redisObjectSerializer);
            template.setKeySerializer(new StringRedisSerializer());
            return template;
        }
    
    
        //Configure listening test topic topic topic in RedisConfig
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            // Add subscriber listening classes. The number is unlimited. PatternTopic defines listening topics. Here, listening to test topic topics
            container.addMessageListener(new SubscribeListener(), new PatternTopic("test-topic"));
            return container;
        }
    }
    

    在本地启动redis-server


    运行单元测试


    到这里完成了一个简单版本的消息发送,可以说是非常粗糙了。代码提交到github了:https://github.com/CameloeAnthony/RedisPractice01

    总结一下,Redis来做消息队列采用发布-订阅模式。这种模式是一对多的关系,即一条消息会被多个消费者消费。不能保证每个消费者都能接收到消息,没有ACK机制,无法要求消费者收到消息后进行ACK确认。如果消息丢失、Redis宕机部分数据没有持久化甚至网络抖动都可能带来数据的丢失。我们也可以利用其中的list set等存储来实现一些简单版本的消息队列的功能,我们有没有办法进行进一步的优化,实现ACK的功能呢?我们可以从Rabbitmq中学习到哪些消息队列的知识,从而可以应用到我们的redis的消息队列中呢? 后续的文章会给出答案。

    8 参考链接

    Redis 官网
    spring data redis官网
    https://github.com/spring-projects/spring-data-keyvalue-examples.
    Java guide redis
    《Redis 设计与实现》
    【官方文档】Redis 数据类型介绍
    《Redis 深度历险》
    阿里云 Redis 开发规范
    Redis 快速入门 - 易百教程
    Redis【入门】就这一篇!
    redis 社区文章列表
    Redis系列之基础篇

    未完待续......

    相关文章

      网友评论

          本文标题:【Redis 笔记】(1)redis实现消息队列

          本文链接:https://www.haomeiwen.com/subject/mqojwhtx.html