一、利用ResultHandler对大数据进行流式处理
背景
有如下场景,需要将十万级别的用户数据从数据库中取出,导入到redis中,如直接mybatis把整个数据查询到java代码list中。
一般做法:
List<CustVO> list = baseMapper.selectList();
for (CustVo vo: list) {
redisUtil.set(RedisPrefix.custKey, vo);
}
运行此段代码,会发现系统内存占用有跃升且长时间维持高位:
就会产生一个问题,如果数据量大的话,会导致list数据爆炸,内存溢出(OOM)。
这个时候我们就可以利用Mybatis中的流式查询来处理。
流式查询指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。其原理是,系统与数据库都会申请一部分内存作为数据缓冲区,应用从缓冲区中读取数据,数据库则会把查询出来的数据不断的放到缓冲区传输给应用层,简单说就是系统处理一部分再获取一部分,但整个过程中与数据库建立的链接是不会断开的,直到数据处理完毕。
ResultHandler来处理。
baseMapper.selectList(actId, new ResultHandler<CustVO>() {
@Override
public void handleResult(ResultContext<? extends CustVO> resultContext) {
CustVO vo = resultContext.getResultObject();
//vo数据保存到redis中
redisUtil.set(RedisPrefix.custKey, vo);
}
}
});
运行代码,查询并处理10万条数据,此时可以观察到,操作系统的内存占用趋于平稳
但此时代码仍未完美,我们发现处理将10万条数据从数据库获取然后加载到redis,需要3~4分钟的时间,效率很慢。什么原因呢?原来代码这里对每条数据调用一次redis的set操作,频繁的调用系统IO,发送网络请求,等待响应,严重影响效率。为解决次问题,就要引入第二个概念。
二、利用redis pipeline实现大数据加载到redis
原理
大多数同学一直以来对 Redis 管道有一个误解,
他们以为这是 Redis 服务器提供的一种特别的技术,
有了这种技术就可以加速 Redis 的存取效率。
但是实际上 Redis 管道 (Pipeline) 本身并不是 Redis 服务器直接提供的技术,
这个技术本质上是由客户端提供的,
跟服务器没有什么直接的关系。
下面我们对这块做一个深入探究。
Redis 的消息交互
当我们使用客户端对 Redis 进行一次操作时,
如下图所示,客户端将请求传送给服务器,
服务器处理完毕后,再将响应回复给客户端。
这要花费一个网络数据包来回的时间。
如果连续执行多条指令,
那就会花费多个网络数据包来回的时间。
如下图所示。
回到客户端代码层面,
客户端是经历了读-写-读-写四个操作才完整地执行了两条指令。
现在如果我们调整读写顺序,改成写—写-读-读,
这两个指令同样可以正常完成。
两个连续的写操作和两个连续的读操作总共只会花费一次网络来回,
就好比连续的 write 操作合并了,
连续的 read 操作也合并了一样。
这便是管道操作的本质,
服务器根本没有任何区别对待,
还是收到一条消息,
执行一条消息,
回复一条消息的正常的流程。
客户端通过对管道中的指令列表改变读写顺序就可以大幅节省 IO 时间。
管道中指令越多,效果越好。
代码改造如下:
baseMapper.selectList(actId, new ResultHandler<CustVO>() {
@Override
public void handleResult(ResultContext<? extends CustVO> resultContext) {
CustVO vo = resultContext.getResultObject();
//vo先保存到map中,当map中的数据量达到一定数量再调用pipeline进行加载
if (custMap.size() >= 8000) {
redisUtil.pipelinedSet(custMap);
custMap.clear();
}
}
});
//将最后剩余数据利用pipeline加载到redis
if (!custMap.isEmpty()) {
redisUtil.pipelinedSet(custMap);
custMap.clear();
}
pipeline方法:
public void pipelinedSet(Map<String, String> map) {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
map.forEach((key, value) -> {
connection.set(serializer.serialize(key), serializer.serialize(value));
});
return null;
}
},serializer);
}
运行代码,加载从数据库查询并加载10万条数据到redis,总用时10s,比之前速度提升50倍!
三、功能业务逻辑中的核心:利用Redis的原子性实现商品库存的扣减
背景:
优惠购项目是提供极其优惠的价格,给星级用户推送消息进来商城进行商品抢购,有点类似于秒抢的场景,所以系统瞬时并发量会比较高,对于接口的QPS要求也很高,所以系统将库存以及用户资格数据放到了性能较高的Redis中。然而一般情况下,为了防止接口并发时库存数据的准确性,保证不出现超卖,超买的情况,需要在客户下单的业务逻辑加锁判断及扣减库存,代码实现如下:
public SeckillActivityRequestVO seckillHandle(SeckillActivityRequestVO request) {
SeckillActivityRequestVO response;
String key = "key:" + request.getSeckillId;
try {
Boolean lockFlag = redisTemplate.opsForValue().setIfAbsent(key, "val", 10, TimeUnit.SECONDS);
if (lockFlag) {
// HTTP请求用户服务进行用户相关的校验
// 用户活动校验
// 库存校验
Object stock = redisTemplate.opsForHash().get(key+":info", "stock");
Object custAct= redisTemplate.opsForHash().get(key+":info", "custAct");
assert stock != null;
if(custAct == null || custAct.getActTimes <= 0) {
// 业务异常,客户没有资格
}
if (Integer.parseInt(stock.toString()) <= 0) {
// 业务异常,商品库存已用完
} else {
redisTemplate.opsForHash().increment(key+":info", "stock", -1);
// 生成订单
// 发布订单创建成功事件
// 构建响应VO
}
}
} finally {
// 释放锁
stringRedisTemplate.delete("key");
// 构建响应VO
}
return response;
}
但实际应用发现,对业务逻辑加锁操作,虽然能保证数据的一致性,但是对系统的效率影响很大,对于秒杀的场景来说,会影响到用户的使用体验,那能不能把锁去掉呢,这里就要说一下Redis的原子性概念了。
Redis原子性:
1、Redis是单进程单线程的网络模型,用的是epoll网络模型,网络模型都是单线程异步非阻塞处理网络请求
2、Redis的单线程处理所有的客户端连接请求,命令读写请求。(有些任务比如rdb和aof等操作是fork子进程处理的,不会影响Redis主线程处理客户端的命令)
3、Redis提供的所有API操作,相对于服务端方面都是one by one执行的,命令是一个接着一个执行的,不存在并行执行的情况。
4、Redis客户端就可能会出现高并发出现错误的读写数据,下面我们举个电商秒抢的例子来讲解一下。
既然知道了Redis执行命令是oen by one 单线程执行的,那么我们是否可以利用这一点,把业务逻辑中,需要在Redis判断的用户购买资格判断和商品库存判断及扣减聚合起来发给Redis一次执行,从而取消java中的锁操作呢?
答案是可以的,java可以利用lua脚本,将需要跟redis交互的命令写到脚本中,一次性传给redis执行,Redis收到脚本后,会单线程执行脚本里面的指令,其他指令则会等待脚本中的命令执行完成返回结果后才执行。这样,java代码可以不用加业务逻辑的锁,大大提高了运行效率。
业务代码如下:
public ActOrderVO saveOrderRedis(ActOrderDTO actOrderDTO) throws InterruptedException {
//.......这里隐藏部分业务逻辑代码
//获取lua脚本并构建redisScript
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setResultType(Long.class);
redisScript.setScriptText(redisScriptConfig.getSaveOrder());
//redis执行脚本
Long result = (Long) redisTemplate.execute(redisScript, keys, 1, amount, order,RedisPrefix.generateCustOrderKey(actOrderDTO.getActId(),actOrderDTO.getOpenid()));
//返回脚本执行结果到上层
ActOrderVO vo = new ActOrderVO();
vo.setStatus(result);
return vo;
}
lua脚本:
local keys,values = KEYS,ARGV
local len = table.getn(keys)
--资格判断
local canBuy = redis.call('get', keys[len-2])
if canBuy
then
if tonumber(canBuy) < 1
then
return -2 --资格已使用
end
else
return -1 --key不存在也视为没有资格
end
--库存判断
for i, v in ipairs(keys) do
local num = i%2
if (i <= len-3 and num ==1)
then
local stock = redis.call('get', keys[i])
if stock
then
if tonumber(stock) < tonumber(keys[i+1])
then
return -3
end
else
return -3
end
end
end
--扣减资格
redis.call('incrby',keys[len-2], -1)
--扣减库存
for i, v in ipairs(keys) do
local num = i%2
if (i <= len-3 and num ==1)
then
redis.call('incrby',keys[i], 0 - tonumber(keys[i+1]))
end
end
redis.call('set', keys[len-1], ARGV[3]) --保存订单
redis.call('rpush', keys[len], values[4])--保存待支付订单状态
return 1
运行得知,资格判断、库存扣减、订单创建整个脚本执行效率极高,毫秒级别即返回结果,整个接口单机压力测试QPS可达到500,符合业务要求。
网友评论