一、分布式锁
-
过期
我们知道redis的setnx指令可以设置作为锁来使用,但还要设置过期时间。redis 2.8之后加入了set key value ex n nx指令使setnx和expire可以原子执行。 -
重入
若要实现可重入,可以用threadLocal来实现。threadLocal里存一个以锁的key为key,重入次数为value的map,每次拿锁的时候可以先看看这个map里是否已经重入过了。 -
集群环境下
如果是集群环境,需要考虑节点挂了的问题。比如哨兵模式下设置锁的节点挂了,从节点变成主节点,此节点没有锁,就出现了锁的丢失问题。可以采用RedLock来解决。加锁时,它会向过半节点发送 set(key, value, nx=True, ex=xxx) 指令,只要过半节点 set 成功,那就认为加锁成功。释放锁时,需要向所有节点发送 del 指令。
二、队列
-
消息队列
我们知道list有lpop、rpop、lpush和rpush指令,可以用来实现一个消息队列。 -
阻塞队列
如果队列为空的话,我们如果用while一直轮询pop肯定会飙高CPU,如果sleep一段时间再pop,又可能增大消息的延迟。我们可以用blpop和brpop来实现阻塞读取。这样做需要注意的是,如果阻塞一段时间之后会成为空闲连接,redis会自动断开,我们要注意异常的捕获。 -
延迟队列
延时队列可以通过 Redis 的 zset(有序列表) 来实现。我们将消息序列化成一个字符串作为 zset 的 value,这个消息的到期处理时间作为 score,然后用多个线程轮询 zset 获取到期的任务进行处理。代码实现:
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem task = new TaskItem();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
}
public void loop() {
while (!Thread.interrupted()) {
// 只取一条
Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇会继续
}
catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 抢到了
TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
}
catch (InterruptedException e) {
}
}
}
三、HyperLogLog
HyperLogLog可以用来做一个非精确的计数。例如要统计一个页面的访问用户数,一个用户算一次。如果直接用set来存用户id的话未免太占内存。我们可以用HyperLogLog来实现,命令pfadd添加元素,命令pfcount获取计数。
四、布隆过滤器
-
使用场景
redis4.0提供了布隆过滤器,它可以用来判断元素在不在集合内。比如我们在使用新闻客户端看新闻时,它会给我们不停地推荐新的内容,它每次推荐时要去重,去掉那些已经看过的内容。又或者爬虫系统中判断一个网页是否已经爬过,小部分误判的话再爬一次问题也不大。它判断不存在是不会误判的,判断存在时会存在小几率的误判。原理就是新增时将给定的元素hash映射到bit表里,把这几个位置置为1。如果给定元素hash值的位置有一个为0那就不存在,反之可能存在。
-
使用方式
bf.add添加,添加多个用bf.madd,bf.mexists判断是否存在,查询多个用bf.mexists。bf.reserve设置 key,error_rate(默认0.01)和initial_size(默认100)。如果元素值超过初始值,误判率会上升。在redis4.0之前可以用一些第三方的库来基于redis的位图来实现布隆过滤器。
五、GeoHash
GeoHash可以用来保存地理信息,计算距离,或者查询附近的单位。指令有geoadd、geodist(计算距离)、geopos(获取元素经纬度)、geohash (获取元素hash值),georadiusbymember(获取某元素附近的元素)、georadius(获取某点附近的元素)。
六、漏斗限流
可以用来限制用户评论等。
CL.THROTTLE user123 15 30 60 1
▲ ▲ ▲ ▲ ▲
| | | | └───── apply 1 token (default if omitted)
| | └──┴─────── 30 tokens / 60 seconds
| └───────────── 15 max_burst
└─────────────────── key "user123"
# 执行命令CL.THROTTLE user123 15 30 60 1
127.0.0.1:6379> CL.THROTTLE user123 15 30 60 1
1) (integer) 0 # 0 表示允许,1表示拒绝
2) (integer) 16 # 漏斗总容量 就是 15+1得到的。
3) (integer) 15 # 漏斗剩余空间,取得了一个所以剩下15
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 表示多久后令牌桶中的令牌会存满(单位秒)
参考资料来源:《Redis 深度历险: 核心原理和应用实践》
网友评论