线程异步编排
参考
https://blog.csdn.net/weixin_44378311/article/details/106793616
https://www.cnblogs.com/lwh1019/p/12896990.html
@Test
void contextLoads() {
ExecutorService future= Executors.newFixedThreadPool(10);
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程------------------" + Thread.currentThread().getName());
int i = 10 / 2;
return i;
}, future);
//获取异步执行的结果在线程执任务行完之后返回
Integer integer = null;
try {
integer = integerCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("结果为"+integer);//结果为5
}
@Test
void test2() throws ExecutionException, InterruptedException {
ExecutorService future= Executors.newFixedThreadPool(10);
future.submit(new Runnable() {
@Override
public void run() {
System.out.println(11111);
}
});
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程------------------" + Thread.currentThread().getName());
int i = 10 / 0;//使用int i = 10 / 0;模拟有异常
return i;
}, future).whenComplete((ems,exception)->{
//在出现异常时虽然可以感知异常但不能修改数据
System.out.println("计算结果为:"+ems+"----异常为"+exception);
}).exceptionally((throwable)->{
//可以感知异常,并可以返回结果
return 10;
});
Integer integer = integerCompletableFuture.get();
System.out.println(integer);
}
@Test
void test3() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
System.out.println(Thread.currentThread().getName() + "\t completableFuture");
return 1024;
}
}).thenApply(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer o) {
System.out.println("thenApply方法,上次返回结果:" + o);
return o * 2;
}
}).whenComplete(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer o, Throwable throwable) {
System.out.println("-------o=" + o);
System.out.println("-------throwable=" + throwable);
int i = 10 / 0;
}
}).exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
});
System.out.println(future.get());
}
@Test
void test4() throws ExecutionException, InterruptedException {
ExecutorService future= Executors.newFixedThreadPool(10);
//先查询用户
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() ->{
System.out.println("查询到用户信息{userName:abc}");
return "{userName:abc}";
});
//可改变外部对象属性的值
SecurityProperties.User user = new SecurityProperties.User();
//根据用户获取订单
CompletableFuture<Void> futureB = futureA.thenAcceptAsync((s) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
user.setName("fdsa");
System.out.println(s);
System.out.println("根据用户获取订单数据");
}, future);
CompletableFuture<Void> futureC = futureA.thenAcceptAsync((s) -> {
user.setPassword("123");
try {
Thread.sleep(3*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s);
System.out.println("根据用户获取余额数据");
}, future);
CompletableFuture.allOf(futureB,futureC).get();
System.out.println(user.getName());
System.out.println(user.getPassword());
}
缓存相关
缓存穿透:
指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不 存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
风险:利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃。
解决:null结果缓存,并加入短暂过期时间。
缓存雪崩:
缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这 样每一个缓存的过期时间的重复率就会降低,就很难引发集体 失效的事件。
缓存击穿:
对于一些设置了过期时间的key,如果这些key可能会在某些 时间点被超高并发地访问,是一种非常“热点”的数据。
如果这个key在大量请求同时进来前正好失效,那么所有对 这个key的数据查询都落到db,我们称为缓存击穿。
解决:加锁,大量并发只让一个去查,其他人等待,查到以后释放锁,其他 人获取到锁,先查缓存,就会有数据,不用去db
redis分布式锁原理
利用redis set nx实现
127.0.0.1:6379> set lock 1 nx
OK
127.0.0.1:6379> set lock 1 nx
(nil)
分布式锁一阶段
分布式锁1阶段.png public String lock() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", 1);
if (lock) {
//加锁成功..执行业务
String db = "db查询到的数据";
redisTemplate.delete("lock");
return db;
} else {
//加锁失败,重试
return lock();//自旋
}
}
问题:setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁
解决:设置锁的自动过期,即使没有删除,会自动删除
分布式锁二阶段
分布式锁2阶段.png public String lock() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", 1);
if (lock) {
//设置过期时间
redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
//加锁成功..执行业务
String db = "db查询到的数据";
redisTemplate.delete("lock");
return db;
} else {
//加锁失败,重试
return lock();//自旋
}
}
问题:setnx设置好,正要去设置过期时间,宕机。又死锁了。
解决:设置过期时间和占位必须是原子的。redis支持使用setnx ex命令
分布式锁三阶段
分布式锁3阶段.png public String lock() {
//设置值的时候同时设置过期时间,保证原子性
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", 1,30,TimeUnit.SECONDS);
if (lock) {
//加锁成功..执行业务
String db = "db查询到的数据";
redisTemplate.delete("lock");
return db;
} else {
//加锁失败,重试
return lock();//自旋
}
}
分布式锁4阶段.png问题:删除锁直接删除?
如果由于业务时间很长,锁自己过期了,我们 直接删除,有可能把别人正在持有的锁删除了。
解决:占锁的时候,值指定为uuid,每个人匹配是自己 的锁才删除。
public String lock() {
//设置值的时候同时设置过期时间
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS);
if (lock) {
//加锁成功..执行业务
String db = "db查询到的数据";
//获取值对比,删除
String lockValue = Objects.requireNonNull(redisTemplate.opsForValue().get("lock")).toString();
if (uuid.equals(lockValue)) {
redisTemplate.delete("lock");
}
return db;
} else {
//加锁失败,重试
return lock();//自旋
}
}
分布式锁5阶段.png问题:如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁。
解决:删除锁必须保证原子性。使用redis+Lua脚本完成。
public String lock() {
//设置值的时候同时设置过期时间
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS);
if (lock!=null && lock) {
//加锁成功..执行业务
String db = "db查询到的数据";
//获取值对比,删除
/* String lockValue = Objects.requireNonNull(redisTemplate.opsForValue().get("lock")).toString();
if (uuid.equals(lockValue)) {
redisTemplate.delete("lock");
}*/
String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(lua, Long.class), Collections.singletonList("lock"), uuid);
return db;
} else {
//加锁失败,重试
return lock();//自旋
}
}
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 更难的事情,锁的自动续期
Redisson
https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
@Configuration
public class MyRedissonConfig {
/**
* 对Redisson的使用都是通过RedissonClient
* @return
* @throws IOException
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
// 单Redis节点模式
// 1、创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
// 2、根据config创建RedissonClient示例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
@ResponseBody
@GetMapping("/hello")
public String hello() {
RLock mylock = redisson.getLock("mylock");
mylock.lock(); // 加锁 阻塞式等待
try {
System.out.println(Thread.currentThread().getId() + "---> 加锁成功");
Thread.sleep(10000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
mylock.unlock(); // 解锁
System.out.println(Thread.currentThread().getId() + " ---> 释放锁");
}
return "hello";
}
问题:假设解锁代码没有运行,会不会出现死锁
1、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉。
2、redisson实例只要关闭,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
自动过期
@ResponseBody
@GetMapping("/hello")
public String hello() {
RLock mylock = redisson.getLock("mylock");
mylock.lock(10, TimeUnit.SECONDS); // 加锁 阻塞式等待 10s后自动解锁
try {
System.out.println(Thread.currentThread().getId() + "---> 加锁成功");
Thread.sleep(30000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
mylock.unlock(); // 解锁
System.out.println(Thread.currentThread().getId() + " ---> 释放锁");
}
return "hello";
}
读写锁
@GetMapping("/write")
public String writeValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.writeLock();
String s = "";
try {
rLock.lock();
s = UUID.randomUUID().toString();
Thread.sleep(30000);
stringRedisTemplate.opsForValue().set("writeValue", s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
@GetMapping("/read")
public String readValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = readWriteLock.readLock();
rLock.lock();
try {
s = stringRedisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
}finally {
rLock.unlock();
}
return s;
}
信号量
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
//park.tryAcquire();
park.acquire();//获取一个信号,车位
return "ok";
}
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release();//释放一个车位
return "ok";
}
缓存和数据库保持一致
双写模式
双写模式.png由于卡顿等原因,导致写缓存2在最前,写缓存1在后面就出现了不一致
脏数据问题:
这是暂时性的脏数据问题,但是在数据稳定,缓存过期以后,又能得到最新的正确数据
读到的最新数据有延迟:最终一致性
失效模式
失效模式.png我们系统的一致性解决方案:
1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新
2、读写数据的时候,加上分布式的读写锁。 经常写,经常读
无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加 上过期时间,每隔一段时间触发读的主动更新即可
2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
3、缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
4、通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心 脏数据,允许临时脏数据可忽略);
总结:
我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保 证每天拿到当前最新数据即可。
我们不应该过度设计,增加系统的复杂性
遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
Spring Cache
spring.cache.type=redis
spring.cache.redis.time-to-live=360000
#防止缓存穿透
spring.cache.redis.cache-null-values=true
spring.cache.redis.key-prefix=CACHE_
spring.cache.redis.use-key-prefix=true
网友评论