Redisson的功能
- 多种连接方式:支持同步、异步、异步流、管道流方式连接
- 数据序列化:多种数据序列化方式
- 集合数据分片:
- 在集群模式下,Redisson为单个集合类型提供了自动分片功能。
- Redisson通过自身的分片算法,将数据均匀的分布在集群的各个节点上。
- 目前支持的数据结构类型为:Set和Map
- 分布式对象:
- 通用对象桶、二进制流、地理空间对象桶
- BitSet、基数估计算法、原子整长型
- 订阅分发、布隆过滤器
- 分布式集合:
- Map、MultiMap
- Set、SortedSet、List、ScoredSortedSet、LexSortedSet
- Queue以及双端、阻塞、延迟、优先队列等。
- 分布式锁:
- 信号量
- 可过期信号量
- 可重入锁、公平锁、读写锁
- 分布式服务:
- 分布式远程服务
- 分布式调度服务、分布式映射归纳服务、分布式执行服务等
布隆过滤器
@Data
@ToString
@EqualsAndHashCode
@AllArgsConstructor
public class BloomDto implements Serializable {
private Integer id;
private String msg;
}
@Test
public void testBloomFilter() {
final String key = "myBloomFilterDataV4";
RBloomFilter<BloomDto> bloomDtoRBloomFilter = redissonClient.getBloomFilter(key);
bloomDtoRBloomFilter.tryInit(1000, 0.01);
BloomDto dto1 = new BloomDto(1, "1");
BloomDto dto2 = new BloomDto(10, "10");
BloomDto dto3 = new BloomDto(100, "100");
BloomDto dto4 = new BloomDto(1000, "1000");
BloomDto dto5 = new BloomDto(10000, "10000");
bloomDtoRBloomFilter.add(dto1);
bloomDtoRBloomFilter.add(dto2);
bloomDtoRBloomFilter.add(dto3);
bloomDtoRBloomFilter.add(dto4);
bloomDtoRBloomFilter.add(dto5);
log.info("bloomfilter contains dto1 data:{}", bloomDtoRBloomFilter.contains(dto1));
log.info("bloomfilter contains dto2 data:{}", bloomDtoRBloomFilter.contains(dto2));
}
布隆过滤器也可以用于解决Redis缓存击穿。
发布订阅主题
@Component
@Slf4j
public class UserLoginPublisher {
private static final String topicKey = "redissonUserLoginTopicKey";
@Autowired
private RedissonClient redissonClient;
public void sendMsg(UserLoginDto dto) {
try {
RTopic rTopic = redissonClient.getTopic(topicKey);
rTopic.publishAsync(dto);
} catch (Exception e) {
log.error("");
}
}
}
@Component
@Slf4j
public class UserLoginSubscriber implements ApplicationRunner, Ordered {
private static final String topicKey = "redissonUserLoginTopicKey";
@Autowired
private RedissonClient redissonClient;
@Autowired
private SysLogService sysLogService;
@Override
public void run(ApplicationArguments args) throws Exception {
try {
RTopic rTopic = redissonClient.getTopic(topicKey);
rTopic.addListener(UserLoginDto.class, new MessageListener<UserLoginDto>() {
@Override
public void onMessage(CharSequence channel, UserLoginDto msg) {
if (null != msg) {
sysLogService.recordLog(msg);
}
}
});
} catch (Exception e) {
log.error("");
}
}
@Override
public int getOrder() {
return 0;
}
}
@Data
@ToString
public class UserLoginDto implements Serializable {
@NotBlank
private String userName;
@NotBlank
private String password;
private Integer userId;
}
@Autowired
private UserLoginPublisher userLoginPublisher;
@Test
public void testTopic() {
UserLoginDto dto = new UserLoginDto();
dto.setUserId(90001);
dto.setUserName("xiaoyao");
dto.setPassword("123456");
userLoginPublisher.sendMsg(dto);
}
映射Map
RMap的特性:
- Eviction元素淘汰:允许针对一个映射中每个元素单独设定有效时间和最长闲置时间
- LocalCache本地缓存
- 可以将部分数据保存在本地内存里,将数据读取速度提高最多45倍
- 所有同名的本地缓存公用一个订阅-发布话题,所有更新和过期的消息都将通过该话题共享
- Sharding数据分片
- 仅使用于Redis的集群环境,该映射结构也叫集群分布式映射
- 该数据结构利用了分库的原理,将单一的映射结构切分为若干个小的映射,并均匀地分布在集群的各个节点
- 可以使得单一的映射结构突破Redis自身的容量限制,让其容量随集群的扩大而增长,在扩容的同时,还能够使读写性能和元素淘汰处理能力呈线性增长
@Test
public void testRMap() throws InterruptedException {
final String key = "myRedissonMapCache";
RMapCache<Integer, RMapDto> rMap = redissonClient.getMapCache(key);
RMapDto dto1 = new RMapDto(1,"map1");
RMapDto dto2 = new RMapDto(2,"map2");
RMapDto dto3 = new RMapDto(3,"map3");
RMapDto dto4 = new RMapDto(4,"map4");
rMap.putIfAbsent(dto1.getId(),dto1);
rMap.putIfAbsent(dto2.getId(),dto2,10, TimeUnit.SECONDS);
rMap.putIfAbsent(dto3.getId(),dto3);
rMap.putIfAbsent(dto4.getId(),dto4,5, TimeUnit.SECONDS);
Set<Integer> set = rMap.keySet();
Map<Integer,RMapDto> resMap = rMap.getAll(set);
System.out.println(resMap);
Thread.sleep(5000);
resMap = rMap.getAll(rMap.keySet());
System.out.println(resMap);
Thread.sleep(10000);
resMap = rMap.getAll(rMap.keySet());
System.out.println(resMap);
}
Set
Redisson中包含有序集合功能组件SortedSet,积分排序集合功能组件ScoredSortedSet,字典排序集合功能组件LexSortedSet等。
@Data
@ToString
@EqualsAndHashCode
@AllArgsConstructor
@NoArgsConstructor
public class RSetDto implements Serializable {
private Integer id;
private String name;
private Integer age;
private Double score;
public RSetDto(Integer id, String name, Double score) {
this.id = id;
this.name = name;
this.score = score;
}
public RSetDto(Integer id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
}
public class RSetComparator implements Comparator<RSetDto> {
@Override
public int compare(RSetDto o1, RSetDto o2) {
return o1.getAge().compareTo(o2.getAge());
}
}
@Test
public void testSortedSet() {
//定义存储于缓存中间件Redis的Key
//保证了元素的有序性
final String key = "myRedissonSortedSetV2";
//创建对象实例
RSetDto dto1 = new RSetDto(1, "N1", 20);
RSetDto dto2 = new RSetDto(2, "N2", 18);
RSetDto dto3 = new RSetDto(3, "N3", 21);
RSetDto dto4 = new RSetDto(4, "N4", 19);
RSetDto dto5 = new RSetDto(5, "N5", 22);
//定义有序集合SortedSet实例
RSortedSet<RSetDto> rSortedSet = redissonClient.getSortedSet(key);
//设置有序集合SortedSet的元素比较器
rSortedSet.trySetComparator(new RSetComparator());
//将对象元素往集合中添加
rSortedSet.add(dto1);
rSortedSet.add(dto2);
rSortedSet.add(dto3);
rSortedSet.add(dto4);
rSortedSet.add(dto5);
//查看此时有序集合Set的元素列表
Collection<RSetDto> result = rSortedSet.readAll();
System.out.println("此时有序集合Set的元素列表:" + result);
}
@Test
public void testScoredSortedSet() {
//定义存储于缓存中间件Redis的Key
final String key = "myRedissonScoredSortedSet";
//创建对象实例
RSetDto dto1 = new RSetDto(1, "N1", 10.0D);
RSetDto dto2 = new RSetDto(2, "N2", 2.0D);
RSetDto dto3 = new RSetDto(3, "N3", 8.0D);
RSetDto dto4 = new RSetDto(4, "N4", 6.0D);
//定义得分排序集合ScoredSortedSet实例
RScoredSortedSet<RSetDto> rScoredSortedSet = redissonClient.getScoredSortedSet(key);
//往得分排序集合ScoredSortedSet添加对象元素
rScoredSortedSet.add(dto1.getScore(), dto1);
rScoredSortedSet.add(dto2.getScore(), dto2);
rScoredSortedSet.add(dto3.getScore(), dto3);
rScoredSortedSet.add(dto4.getScore(), dto4);
//查看此时得分排序集合ScoredSortedSet的元素列表
//可以通过SortOrder指定读取出的元素是正序还是倒序
Collection<RSetDto> result = rScoredSortedSet.readSortAlpha(SortOrder.DESC);
log.info("此时得分排序集合ScoredSortedSet的元素列表-从大到小:{} ", result);
//获取对象元素在集合中的位置-相当于获取排名
//rank()方法默认采用的是“正序”的方式
//而且得到的排序值是从0开始算的,可以 加1
log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto1, rScoredSortedSet.revRank(dto1) + 1);
log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto2, rScoredSortedSet.revRank(dto2) + 1);
log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto3, rScoredSortedSet.revRank(dto3) + 1);
log.info("获取对象元素的排名:对象元素={},从大到小排名={} ", dto4, rScoredSortedSet.revRank(dto4) + 1);
log.info("\n");
//获取对象元素在排名集合中的得分
log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto1, rScoredSortedSet.getScore(dto1));
log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto2, rScoredSortedSet.getScore(dto2));
log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto3, rScoredSortedSet.getScore(dto3));
log.info("获取对象元素在排名集合中的得分:对象元素={},得分={} ", dto4, rScoredSortedSet.getScore(dto4));
}
队列
队列底层核心的执行逻辑是记录"基于发布-订阅"的主题来实现。
在使用延迟队列的时候,不要将TTL不同的时间放入到同一个队列,Redisson可以按照TTL从小到大的顺序先后被真正的队列监听、消费。
@Component
@Slf4j
public class QueueConsumer implements ApplicationRunner, Ordered {
@Autowired
private RedissonClient redissonClient;
@Override
public void run(ApplicationArguments args) throws Exception {
final String queueName = "redissonQueue";
RQueue<String> rQueue = redissonClient.getQueue(queueName);
while (true) {
String msg = rQueue.poll();
if (!Strings.isNullOrEmpty(msg)) {
log.info("消费者,监听信息:{}", msg);
}
}
}
@Override
public int getOrder() {
return 0;
}
}
@Component
@Slf4j
public class QueuePublisher {
@Autowired
private RedissonClient redissonClient;
public void sendBasicMsg(String msg) {
try {
final String queueName = "redissonQueue";
RQueue<String> rQueue = redissonClient.getQueue(queueName);
rQueue.add(msg);
log.error("队列生产者信息,发送成功:{}", msg);
} catch (Exception e) {
log.error("队列发生异常信息:{}", msg, e.fillInStackTrace());
}
}
}
延迟队列
//创建死信队列-由死信交换机+死信路由组成
@Bean
public Queue redissonBasicDeadQueue() {
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-dead-letter-exchange", env.getProperty("mq.redisson.dead.exchange.name"));
argsMap.put("x-dead-letter-routing-key", env.getProperty("mq.redisson.dead.routing.key.name"));
return new Queue(env.getProperty("mq.redisson.dead.queue.name"), true, false, false, argsMap);
}
//创建基本交换机
@Bean
public TopicExchange redissonBasicExchange() {
return new TopicExchange(env.getProperty("mq.redisson.dead.basic.exchange.name"), true, false);
}
//创建基本路由及其绑定-绑定到死信队列
@Bean
public Binding redissonBasicBinding() {
return BindingBuilder.bind(redissonBasicDeadQueue())
.to(redissonBasicExchange()).with(env.getProperty("mq.redisson.dead.basic.routing.key.name"));
}
//创建死信交换机
@Bean
public TopicExchange redissonBasicDeadExchange() {
return new TopicExchange(env.getProperty("mq.redisson.dead.exchange.name"), true, false);
}
//创建真正队列 - 面向消费者
@Bean
public Queue redissonBasicDeadRealQueue() {
return new Queue(env.getProperty("mq.redisson.real.queue.name"), true);
}
//创建死信路由及其绑定-绑定到真正的队列
@Bean
public Binding redissonBasicDeadRealBinding() {
return BindingBuilder.bind(redissonBasicDeadRealQueue())
.to(redissonBasicDeadExchange()).with(env.getProperty("mq.redisson.dead.routing.key.name"));
}
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class DeadDto implements Serializable {
private Integer id;
private String name;
}
@Component
@Slf4j
public class RedissonDelayQueuePublisher {
@Autowired
private RedissonClient redissonClient;
public void sendDelayMsg(final DeadDto dto, final long ttl) {
try {
final String delayQueueName = "redissonDelayQueueV3";
RBlockingQueue<DeadDto> rBlockingQueue = redissonClient.getBlockingQueue(delayQueueName);
RDelayedQueue<DeadDto> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
rDelayedQueue.offer(dto, ttl, TimeUnit.SECONDS);
log.info("延迟队列,消息:{}", dto);
} catch (Exception e) {
log.info("延迟队列,消息,异常:{}", dto, e.fillInStackTrace());
}
}
}
@Component
@Slf4j
public class RedissonDelayQueueConsumer {
@Autowired
private RedissonClient redissonClient;
@Scheduled(cron = "*/1 * * * * ?")
public void consumerMsg() throws Exception {
final String delayQueueName = "redissonDelayQueueV3";
RBlockingQueue<DeadDto> rBlockingQueue = redissonClient.getBlockingQueue(delayQueueName);
DeadDto msg = rBlockingQueue.take();
if (null != msg) {
log.info("消费数据:{}", msg);
}
}
}
分布式锁
一次性锁
一次性锁:在高并发的时候,如果当前线程可以获取到分布式锁,则成功获取,如果获取不到,当前线程将永远的被抛弃。
用户重复提交场景:
/**
* 处理用户提交注册的请求-加Redisson分布式锁
*
* @param dto
* @throws Exception
*/
@Override
public void userRegRedisson(UserRegDto dto) throws Exception {
//定义锁的名称
final String lockName = "redissonOneLock-" + dto.getUserName();
//获取分布式锁实例
RLock lock = redissonClient.getLock(lockName);
try {
//操作共享资源之前上锁
//在这里可以通过lock.lock()方法也可以通过调用如下的方法,
//即上锁之后,不管何种状况,10s后会自动释放
lock.lock(10, TimeUnit.SECONDS);
//TODO:真正的核心处理逻辑
//根据用户名查询用户实体信息
UserReg reg = userRegMapper.selectByUserName(dto.getUserName());
//如果当前用户名还未被注册,则将当前用户信息注册入数据库中
if (reg == null) {
log.info("---加了Redisson分布式锁之一次性锁---,当前用户名为:{} ", dto.getUserName());
//创建用户注册实体信息
UserReg entity = new UserReg();
//将提交的用户注册请求实体信息中对应的字段取值
//复制到新创建的用户注册实体的相应字段中
BeanUtils.copyProperties(dto, entity);
//设置注册时间
entity.setCreateTime(new Date());
//插入用户注册信息
userRegMapper.insertSelective(entity);
} else {
//如果用户名已被注册,则抛出异常
throw new Exception("用户信息已经存在!");
}
} catch (Exception e) {
log.error("---获取Redisson的分布式锁失败!---");
throw e;
} finally {
//TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁
if (lock != null) {
lock.unlock();
//在某些严格的业务场景下,也可以调用强制释放分布式锁的方法
//lock.forceUnlock();
}
}
}
可重入锁
分布式锁的可重入,当高并发产生多线程时,如果当前线程不能获取到分布式锁,它并不会立即被抛弃,而是等待一定的时间,重新尝试去获取分布式锁。
如果在规定的时间内获取到锁,执行后面的操作,如果不能获取到锁,就会被抛弃。
@Override
@Transactional(rollbackFor = Exception.class)
public void robWithRedisson(BookRobDto dto) throws Exception {
final String lockName = "redissonTryLock-" + dto.getBookNo() + "-" + dto.getUserId();
/**
* 获取锁对象
*/
RLock lock = redissonClient.getLock(lockName);
try {
Boolean result = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (result) {
//TODO:真正的核心处理逻辑
//根据书籍编号查询记录
BookStock stock = bookStockMapper.selectByBookNo(dto.getBookNo());
//统计每个用户每本书的抢购数量
int total = bookRobMapper.countByBookNoUserId(dto.getUserId(), dto.getBookNo());
//商品记录存在、库存充足,而且用户还没抢购过本书,则代表当前用户可以抢购
if (stock != null && stock.getStock() > 0 && total <= 0) {
//当前用户抢购到书籍,库存减一
int res = bookStockMapper.updateStockWithLock(dto.getBookNo());
//如果允许商品超卖-达成饥饿营销的目的,则可以调用下面的方法
//int res=bookStockMapper.updateStock(dto.getBookNo());
//更新库存成功后,需要添加抢购记录
if (res > 0) {
//创建书籍抢购记录实体信息
BookRob entity = new BookRob();
//将提交的用户抢购请求实体信息中对应的字段取值
//复制到新创建的书籍抢购记录实体的相应字段中
entity.setBookNo(dto.getBookNo());
entity.setUserId(dto.getUserId());
//设置抢购时间
entity.setRobTime(new Date());
//插入用户注册信息
bookRobMapper.insertSelective(entity);
log.info("---处理书籍抢购逻辑-加Redisson分布式锁---,当前线程成功抢到书籍:{} ", dto);
}
} else {
//如果不满足上述的任意一个if条件,则抛出异常
throw new Exception("该书籍库存不足!");
}
} else {
throw new Exception("----获取Redisson分布式锁失败!----");
}
} catch (Exception e) {
throw e;
} finally {
//TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁
if (lock != null) {
lock.unlock();
//在某些严格的业务场景下,也可以调用强制释放分布式锁的方法
//lock.forceUnlock();
}
}
}
参考文献
(分布式中间件技术实战(Java版).钟林森.机械工业出版社)
网友评论