美文网首页
综合中间件Redisson实战

综合中间件Redisson实战

作者: 一生逍遥一生 | 来源:发表于2020-12-28 20:48 被阅读0次

Redisson的功能

  • 多种连接方式:支持同步、异步、异步流、管道流方式连接
  • 数据序列化:多种数据序列化方式
  • 集合数据分片:
    • 在集群模式下,Redisson为单个集合类型提供了自动分片功能。
    • Redisson通过自身的分片算法,将数据均匀的分布在集群的各个节点上。
    • 目前支持的数据结构类型为:Set和Map
  • 分布式对象:
    • 通用对象桶、二进制流、地理空间对象桶
    • BitSet、基数估计算法、原子整长型
    • 订阅分发、布隆过滤器
  • 分布式集合:
    • Map、MultiMap
    • Set、SortedSet、List、ScoredSortedSet、LexSortedSet
    • Queue以及双端、阻塞、延迟、优先队列等。
  • 分布式锁:
    • 信号量
    • 可过期信号量
    • 可重入锁、公平锁、读写锁
  • 分布式服务:
    • 分布式远程服务
    • 分布式调度服务、分布式映射归纳服务、分布式执行服务等

布隆过滤器

@Data
@ToString
@EqualsAndHashCode
@AllArgsConstructor
public class BloomDto implements Serializable {
    private Integer id;
    private String msg;
}
@Test
public void testBloomFilter() {
    final String key = "myBloomFilterDataV4";
    RBloomFilter<BloomDto> bloomDtoRBloomFilter = redissonClient.getBloomFilter(key);
    bloomDtoRBloomFilter.tryInit(1000, 0.01);
    BloomDto dto1 = new BloomDto(1, "1");
    BloomDto dto2 = new BloomDto(10, "10");
    BloomDto dto3 = new BloomDto(100, "100");
    BloomDto dto4 = new BloomDto(1000, "1000");
    BloomDto dto5 = new BloomDto(10000, "10000");
    bloomDtoRBloomFilter.add(dto1);
    bloomDtoRBloomFilter.add(dto2);
    bloomDtoRBloomFilter.add(dto3);
    bloomDtoRBloomFilter.add(dto4);
    bloomDtoRBloomFilter.add(dto5);
    log.info("bloomfilter contains dto1 data:{}", bloomDtoRBloomFilter.contains(dto1));
    log.info("bloomfilter contains dto2 data:{}", bloomDtoRBloomFilter.contains(dto2));
}

布隆过滤器也可以用于解决Redis缓存击穿。

发布订阅主题

@Component
@Slf4j
public class UserLoginPublisher {
    private static final String topicKey = "redissonUserLoginTopicKey";
    @Autowired
    private RedissonClient redissonClient;

    public void sendMsg(UserLoginDto dto) {
        try {
            RTopic rTopic = redissonClient.getTopic(topicKey);
            rTopic.publishAsync(dto);
        } catch (Exception e) {
            log.error("");
        }
    }
}
@Component
@Slf4j
public class UserLoginSubscriber implements ApplicationRunner, Ordered {

    private static final String topicKey = "redissonUserLoginTopicKey";

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private SysLogService sysLogService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        try {
            RTopic rTopic = redissonClient.getTopic(topicKey);
            rTopic.addListener(UserLoginDto.class, new MessageListener<UserLoginDto>() {
                @Override
                public void onMessage(CharSequence channel, UserLoginDto msg) {
                    if (null != msg) {
                        sysLogService.recordLog(msg);
                    }
                }
            });
        } catch (Exception e) {
            log.error("");
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
@Data
@ToString
public class UserLoginDto implements Serializable {
    @NotBlank
    private String userName;
    @NotBlank
    private String password;
    private Integer userId;
}
@Autowired
private UserLoginPublisher userLoginPublisher;

@Test
public void testTopic() {
    UserLoginDto dto = new UserLoginDto();
    dto.setUserId(90001);
    dto.setUserName("xiaoyao");
    dto.setPassword("123456");
    userLoginPublisher.sendMsg(dto);
}

映射Map

RMap的特性:

  • Eviction元素淘汰:允许针对一个映射中每个元素单独设定有效时间和最长闲置时间
  • LocalCache本地缓存
    • 可以将部分数据保存在本地内存里,将数据读取速度提高最多45倍
    • 所有同名的本地缓存公用一个订阅-发布话题,所有更新和过期的消息都将通过该话题共享
  • Sharding数据分片
    • 仅使用于Redis的集群环境,该映射结构也叫集群分布式映射
    • 该数据结构利用了分库的原理,将单一的映射结构切分为若干个小的映射,并均匀地分布在集群的各个节点
    • 可以使得单一的映射结构突破Redis自身的容量限制,让其容量随集群的扩大而增长,在扩容的同时,还能够使读写性能和元素淘汰处理能力呈线性增长
@Test
public void testRMap() throws InterruptedException {
    final String key = "myRedissonMapCache";
    RMapCache<Integer, RMapDto> rMap = redissonClient.getMapCache(key);
    RMapDto dto1 = new RMapDto(1,"map1");
    RMapDto dto2 = new RMapDto(2,"map2");
    RMapDto dto3 = new RMapDto(3,"map3");
    RMapDto dto4 = new RMapDto(4,"map4");
    rMap.putIfAbsent(dto1.getId(),dto1);
    rMap.putIfAbsent(dto2.getId(),dto2,10, TimeUnit.SECONDS);
    rMap.putIfAbsent(dto3.getId(),dto3);
    rMap.putIfAbsent(dto4.getId(),dto4,5, TimeUnit.SECONDS);

    Set<Integer> set = rMap.keySet();
    Map<Integer,RMapDto> resMap = rMap.getAll(set);
    System.out.println(resMap);
    Thread.sleep(5000);
    resMap = rMap.getAll(rMap.keySet());
    System.out.println(resMap);
    Thread.sleep(10000);
    resMap = rMap.getAll(rMap.keySet());
    System.out.println(resMap);
}

Set

Redisson中包含有序集合功能组件SortedSet,积分排序集合功能组件ScoredSortedSet,字典排序集合功能组件LexSortedSet等。

@Data
@ToString
@EqualsAndHashCode
@AllArgsConstructor
@NoArgsConstructor
public class RSetDto implements Serializable {
    private Integer id;
    private String name;
    private Integer age;
    private Double score;

    public RSetDto(Integer id, String name, Double score) {
        this.id = id;
        this.name = name;
        this.score = score;
    }

    public RSetDto(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
}
public class RSetComparator implements Comparator<RSetDto> {
    @Override
    public int compare(RSetDto o1, RSetDto o2) {
        return o1.getAge().compareTo(o2.getAge());
    }
}
@Test
public void testSortedSet() {
    //定义存储于缓存中间件Redis的Key
    //保证了元素的有序性
    final String key = "myRedissonSortedSetV2";
    //创建对象实例
    RSetDto dto1 = new RSetDto(1, "N1", 20);
    RSetDto dto2 = new RSetDto(2, "N2", 18);
    RSetDto dto3 = new RSetDto(3, "N3", 21);
    RSetDto dto4 = new RSetDto(4, "N4", 19);
    RSetDto dto5 = new RSetDto(5, "N5", 22);

    //定义有序集合SortedSet实例
    RSortedSet<RSetDto> rSortedSet = redissonClient.getSortedSet(key);
    //设置有序集合SortedSet的元素比较器
    rSortedSet.trySetComparator(new RSetComparator());
    //将对象元素往集合中添加
    rSortedSet.add(dto1);
    rSortedSet.add(dto2);
    rSortedSet.add(dto3);
    rSortedSet.add(dto4);
    rSortedSet.add(dto5);

    //查看此时有序集合Set的元素列表
    Collection<RSetDto> result = rSortedSet.readAll();
    System.out.println("此时有序集合Set的元素列表:" + result);
}

@Test
public void testScoredSortedSet() {
    //定义存储于缓存中间件Redis的Key
    final String key = "myRedissonScoredSortedSet";
    //创建对象实例
    RSetDto dto1 = new RSetDto(1, "N1", 10.0D);
    RSetDto dto2 = new RSetDto(2, "N2", 2.0D);
    RSetDto dto3 = new RSetDto(3, "N3", 8.0D);
    RSetDto dto4 = new RSetDto(4, "N4", 6.0D);

    //定义得分排序集合ScoredSortedSet实例
    RScoredSortedSet<RSetDto> rScoredSortedSet = redissonClient.getScoredSortedSet(key);

    //往得分排序集合ScoredSortedSet添加对象元素
    rScoredSortedSet.add(dto1.getScore(), dto1);
    rScoredSortedSet.add(dto2.getScore(), dto2);
    rScoredSortedSet.add(dto3.getScore(), dto3);
    rScoredSortedSet.add(dto4.getScore(), dto4);

    //查看此时得分排序集合ScoredSortedSet的元素列表
    //可以通过SortOrder指定读取出的元素是正序还是倒序
    Collection<RSetDto> result = rScoredSortedSet.readSortAlpha(SortOrder.DESC);
    log.info("此时得分排序集合ScoredSortedSet的元素列表-从大到小:{} ", result);

    //获取对象元素在集合中的位置-相当于获取排名
    //rank()方法默认采用的是“正序”的方式
    //而且得到的排序值是从0开始算的,可以 加1
    log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto1, rScoredSortedSet.revRank(dto1) + 1);
    log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto2, rScoredSortedSet.revRank(dto2) + 1);
    log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto3, rScoredSortedSet.revRank(dto3) + 1);
    log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto4, rScoredSortedSet.revRank(dto4) + 1);

    log.info("\n");
    //获取对象元素在排名集合中的得分
    log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto1, rScoredSortedSet.getScore(dto1));
    log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto2, rScoredSortedSet.getScore(dto2));
    log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto3, rScoredSortedSet.getScore(dto3));
    log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto4, rScoredSortedSet.getScore(dto4));
}

队列

队列底层核心的执行逻辑是记录"基于发布-订阅"的主题来实现。
在使用延迟队列的时候,不要将TTL不同的时间放入到同一个队列,Redisson可以按照TTL从小到大的顺序先后被真正的队列监听、消费。

@Component
@Slf4j
public class QueueConsumer implements ApplicationRunner, Ordered {
    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        final String queueName = "redissonQueue";
        RQueue<String> rQueue = redissonClient.getQueue(queueName);
        while (true) {
            String msg = rQueue.poll();
            if (!Strings.isNullOrEmpty(msg)) {
                log.info("消费者,监听信息:{}", msg);
            }
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
@Component
@Slf4j
public class QueuePublisher {
    @Autowired
    private RedissonClient redissonClient;

    public void sendBasicMsg(String msg) {
        try {
            final String queueName = "redissonQueue";
            RQueue<String> rQueue = redissonClient.getQueue(queueName);
            rQueue.add(msg);
            log.error("队列生产者信息,发送成功:{}", msg);
        } catch (Exception e) {
            log.error("队列发生异常信息:{}", msg, e.fillInStackTrace());
        }
    }
}

延迟队列

//创建死信队列-由死信交换机+死信路由组成
@Bean
public Queue redissonBasicDeadQueue() {
    Map<String, Object> argsMap = new HashMap<>();
    argsMap.put("x-dead-letter-exchange", env.getProperty("mq.redisson.dead.exchange.name"));
    argsMap.put("x-dead-letter-routing-key", env.getProperty("mq.redisson.dead.routing.key.name"));
    return new Queue(env.getProperty("mq.redisson.dead.queue.name"), true, false, false, argsMap);
}

//创建基本交换机
@Bean
public TopicExchange redissonBasicExchange() {
    return new TopicExchange(env.getProperty("mq.redisson.dead.basic.exchange.name"), true, false);
}

//创建基本路由及其绑定-绑定到死信队列
@Bean
public Binding redissonBasicBinding() {
    return BindingBuilder.bind(redissonBasicDeadQueue())
            .to(redissonBasicExchange()).with(env.getProperty("mq.redisson.dead.basic.routing.key.name"));
}

//创建死信交换机
@Bean
public TopicExchange redissonBasicDeadExchange() {
    return new TopicExchange(env.getProperty("mq.redisson.dead.exchange.name"), true, false);
}

//创建真正队列 - 面向消费者
@Bean
public Queue redissonBasicDeadRealQueue() {
    return new Queue(env.getProperty("mq.redisson.real.queue.name"), true);
}

//创建死信路由及其绑定-绑定到真正的队列
@Bean
public Binding redissonBasicDeadRealBinding() {
    return BindingBuilder.bind(redissonBasicDeadRealQueue())
            .to(redissonBasicDeadExchange()).with(env.getProperty("mq.redisson.dead.routing.key.name"));
}
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class DeadDto implements Serializable {
    private Integer id;
    private String name;
}
@Component
@Slf4j
public class RedissonDelayQueuePublisher {
    @Autowired
    private RedissonClient redissonClient;

    public void sendDelayMsg(final DeadDto dto, final long ttl) {
        try {
            final String delayQueueName = "redissonDelayQueueV3";
            RBlockingQueue<DeadDto> rBlockingQueue = redissonClient.getBlockingQueue(delayQueueName);
            RDelayedQueue<DeadDto> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            rDelayedQueue.offer(dto, ttl, TimeUnit.SECONDS);
            log.info("延迟队列,消息:{}", dto);
        } catch (Exception e) {
            log.info("延迟队列,消息,异常:{}", dto, e.fillInStackTrace());
        }
    }
}
@Component
@Slf4j
public class RedissonDelayQueueConsumer {
    @Autowired
    private RedissonClient redissonClient;

    @Scheduled(cron = "*/1 * * * * ?")
    public void consumerMsg() throws Exception {
        final String delayQueueName = "redissonDelayQueueV3";
        RBlockingQueue<DeadDto> rBlockingQueue = redissonClient.getBlockingQueue(delayQueueName);
        DeadDto msg = rBlockingQueue.take();
        if (null != msg) {
            log.info("消费数据:{}", msg);
        }
    }
}

分布式锁

一次性锁

一次性锁:在高并发的时候,如果当前线程可以获取到分布式锁,则成功获取,如果获取不到,当前线程将永远的被抛弃。

用户重复提交场景:

 /**
 * 处理用户提交注册的请求-加Redisson分布式锁
 *
 * @param dto
 * @throws Exception
 */
@Override
public void userRegRedisson(UserRegDto dto) throws Exception {
    //定义锁的名称
    final String lockName = "redissonOneLock-" + dto.getUserName();
    //获取分布式锁实例
    RLock lock = redissonClient.getLock(lockName);
    try {
        //操作共享资源之前上锁
        //在这里可以通过lock.lock()方法也可以通过调用如下的方法,
        //即上锁之后,不管何种状况,10s后会自动释放
        lock.lock(10, TimeUnit.SECONDS);

        //TODO:真正的核心处理逻辑

        //根据用户名查询用户实体信息
        UserReg reg = userRegMapper.selectByUserName(dto.getUserName());
        //如果当前用户名还未被注册,则将当前用户信息注册入数据库中
        if (reg == null) {
            log.info("---加了Redisson分布式锁之一次性锁---,当前用户名为:{} ", dto.getUserName());
            //创建用户注册实体信息
            UserReg entity = new UserReg();
            //将提交的用户注册请求实体信息中对应的字段取值
            //复制到新创建的用户注册实体的相应字段中
            BeanUtils.copyProperties(dto, entity);
            //设置注册时间
            entity.setCreateTime(new Date());
            //插入用户注册信息
            userRegMapper.insertSelective(entity);
        } else {
            //如果用户名已被注册,则抛出异常
            throw new Exception("用户信息已经存在!");
        }
    } catch (Exception e) {
        log.error("---获取Redisson的分布式锁失败!---");
        throw e;
    } finally {
        //TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁
        if (lock != null) {
            lock.unlock();

            //在某些严格的业务场景下,也可以调用强制释放分布式锁的方法
            //lock.forceUnlock();
        }
    }
}

可重入锁

分布式锁的可重入,当高并发产生多线程时,如果当前线程不能获取到分布式锁,它并不会立即被抛弃,而是等待一定的时间,重新尝试去获取分布式锁。
如果在规定的时间内获取到锁,执行后面的操作,如果不能获取到锁,就会被抛弃。

 @Override
    @Transactional(rollbackFor = Exception.class)
    public void robWithRedisson(BookRobDto dto) throws Exception {
        final String lockName = "redissonTryLock-" + dto.getBookNo() + "-" + dto.getUserId();
        /**
         * 获取锁对象
         */
        RLock lock = redissonClient.getLock(lockName);
        try {
            Boolean result = lock.tryLock(100, 10, TimeUnit.SECONDS);
            if (result) {
                //TODO:真正的核心处理逻辑

                //根据书籍编号查询记录
                BookStock stock = bookStockMapper.selectByBookNo(dto.getBookNo());
                //统计每个用户每本书的抢购数量
                int total = bookRobMapper.countByBookNoUserId(dto.getUserId(), dto.getBookNo());

                //商品记录存在、库存充足,而且用户还没抢购过本书,则代表当前用户可以抢购
                if (stock != null && stock.getStock() > 0 && total <= 0) {
                    //当前用户抢购到书籍,库存减一
                    int res = bookStockMapper.updateStockWithLock(dto.getBookNo());
                    //如果允许商品超卖-达成饥饿营销的目的,则可以调用下面的方法
                    //int res=bookStockMapper.updateStock(dto.getBookNo());

                    //更新库存成功后,需要添加抢购记录
                    if (res > 0) {
                        //创建书籍抢购记录实体信息
                        BookRob entity = new BookRob();
                        //将提交的用户抢购请求实体信息中对应的字段取值
                        //复制到新创建的书籍抢购记录实体的相应字段中
                        entity.setBookNo(dto.getBookNo());
                        entity.setUserId(dto.getUserId());
                        //设置抢购时间
                        entity.setRobTime(new Date());
                        //插入用户注册信息
                        bookRobMapper.insertSelective(entity);

                        log.info("---处理书籍抢购逻辑-加Redisson分布式锁---,当前线程成功抢到书籍:{} ", dto);
                    }
                } else {
                    //如果不满足上述的任意一个if条件,则抛出异常
                    throw new Exception("该书籍库存不足!");
                }
            } else {
                throw new Exception("----获取Redisson分布式锁失败!----");
            }
        } catch (Exception e) {
            throw e;
        } finally {
            //TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁
            if (lock != null) {
                lock.unlock();
                //在某些严格的业务场景下,也可以调用强制释放分布式锁的方法
                //lock.forceUnlock();
            }
        }
    }

参考文献

(分布式中间件技术实战(Java版).钟林森.机械工业出版社)

相关文章

网友评论

      本文标题:综合中间件Redisson实战

      本文链接:https://www.haomeiwen.com/subject/pfotoktx.html