一、需求场景和业务逻辑
场景:客户下了一个订单,商品1,重量2斤,当分拣员打包订单商品时,商品重量超过了实际下单量,然而部分商品不能拆分,分拣人员就需要修改实际的出库量。假如这时客户订单有误,也需要修改订单商品数据。这时,客户和分拣人员同时要操作数据库数据,会出现哪些情况?如何来保证数据的一致性(实际的业务场景是一个订单会有几十或上百种蔬菜或其它商品)
-
分拣人员和客户同时打开终端和网页:都在界面上显示了数据,这里有可能客户先操作,有可能分拣人员先操作,也可能同时操作。
首先,从代码上来思考
我们需要一把锁,把这个订单锁住,谁先获取到订单锁,谁先执行,也就是把我们的并行任务并行转为了串行执行。
然后,从业务上来思考
1.1 如果分拣员先拿到了订单锁提交了商品出库量,数据库会先是分拣员提交的数据,然后客户拿到了订单锁又提交了,数据库会是客户提交的数据。这时,需要添加一条未分拣数据,并作废之前的数据。
1.2 如果客户先拿到了订单锁提交了商品出库量,数据库会先是客户的提交的数据,然后分拣人员拿到了订单锁要提交了,这时,我们需要判断原始下单量是否和数据库的下单量是否一致,如果不一致,说明客户改了下单量,这时需要提示分拣员,客户修改了数据,有一个新数据需要分拣,页面上显示需要分拣的新数据。
从业务来说,客户为主,分拣人员为次。客户修改了下单量,就多一条未分拣记录。 -
客户使用多个网页同时修改商品下单量
-
多个分拣人员使用终端同时修改分拣量
这两种情况可以让订单锁住,谁先获取到订单锁,谁先执行。
数据库:订单表:商品下单量orderWeight,商品出库量storeWeight,订单生成时,商品下单量和出库量是一致的。
分拣队列表:商品下单量orderWeight,商品分拣量pickWeight
业务逻辑:分拣人员修改分拣量时,会修改订单表的出库量;
接下来写一下,使用的锁。使用什么锁,与实际部署的服务相关联,如果单服务,我们只需要单机锁,如果服务集群或者分布式服务,就需要用到分布式锁了。
基本概念:分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证,这个时候,便需要使用到分布式锁。
二、使用 Spring Redis Lock 实现分布式锁
- 配置Maven
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
- 配置 redis
#redis服务器
spring.redis.cluster.nodes=172.16.1.14:7379,172.16.1.15:7379,172.16.1.16:7379,172.16.1.17:7379,172.16.1.18:7379,172.16.1.19:7379
spring.redis.cluster.max-redirects=3
spring.redis.database=0
spring.redis.timeout=5000ms
spring.redis.lettuce.pool.max-active=100
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.max-wait=-1ms
spring.redis.lettuce.pool.min-idle=2
spring.redis.lettuce.shutdown-timeout=100ms
#配置业务缓存类型为:redis,如果不用缓存可配置为:none
spring.cache.type=redis
# 是否允许缓存null值
spring.cache.redis.cache-null-values=true
# 缓存key前缀
spring.cache.redis.key-prefix=scms-wh
# 是否使用缓存key前缀
spring.cache.redis.use-key-prefix=true
# Redis缓存ttl
spring.cache.redis.time-to-live=100s
- 配置 RedisLockConfiguration
package com.yuanben.scms.base.redis.configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
/**
* Redis 分布式锁配置类
* @author CaoJian
*
*/
@Configuration
public class RedisLockConfiguration {
private static final Logger log = LoggerFactory.getLogger(RedisLockConfiguration.class);
private static final String redisLockKeyPrefix = "scms-redis-lock";
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
log.info("注册Redis分布式锁,key前缀:"+redisLockKeyPrefix);
return new RedisLockRegistry(redisConnectionFactory, redisLockKeyPrefix, 300000L);
}
}
- 示例
@Autowired
private RedisLockRegistry redisLockRegistry;
private void testRedisLock() {
tmp1=0; tmp2 = 0;
/**
* 不使用分布式锁
*/
new Thread(() -> {
for (int i = 0; i<10; i++) {
tmp1++;
System.out.println("Thread:"+Thread.currentThread().getId()+"tmp1="+tmp1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i<10; i++) {
tmp1++;
System.out.println("Thread:"+Thread.currentThread().getId()+"tmp1="+tmp1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(10000);
/**
* 使用分布式锁示例
*/
new Thread(() -> {
for (int i = 0; i<10; i++) {
if (lock == null) lock = redisLockRegistry.obtain("redis-lock-key");
try {
if (lock.tryLock(30, TimeUnit.SECONDS)) {
tmp2++;
System.out.println("Thread:"+Thread.currentThread().getId()+"tmp2="+tmp2);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i<10; i++) {
if (lock == null) lock = redisLockRegistry.obtain("redis-lock-key");
try {
if (lock.tryLock(30, TimeUnit.SECONDS)) {
tmp2++;
System.out.println("Thread:"+Thread.currentThread().getId()+"tmp2="+tmp2);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}).start();
}
- 业务中的使用,注意一定要释放锁
private Lock getRedisLock(String prefix, String keyGuid) {
if (StringUtils.isBlank(keyGuid)) {
log.error("分布式锁key不能为空, 获得分布式锁失败");
throw new MsgException("分布式锁key不能为空,获得分布式锁失败");
}
return redisLockRegistry.obtain(prefix + keyGuid);
}
Lock orderLock = null;
String lockKey = RedisLockEnum.ORDER_LOCK.getKey(), lockName = RedisLockEnum.ORDER_LOCK.getValue();
try {
TodOrderPO orderPO = orderDAO.findByPrimaryKey(orderGuid);
if (null == orderPO) {
throw new MsgException("未查询到订单。");
}
orderLock = getRedisLock(lockKey, orderPO.getOrderCode());
if (!orderLock.tryLock(20, TimeUnit.SECONDS)) {
throw new MsgException(lockName + "加锁失败,订单编码=" + orderPO.getOrderCode());
}
....................
} catch (InterruptedException e) {
throw new Exception("获取" + lockName + "超时" + e.getMessage(), e);
} catch (MsgException e) {
throw new Exception(e.getMessage(), e);
} catch (Exception e) {
throw new Exception(e.getMessage(), e);
} finally {
//必须释放锁
if (null != orderLock) {
orderLock.unlock();
}
}
操作修改订单对订单号加锁,在上传分拣数据也需要对订单号加锁,来保证任务按顺序执行。
三、相关资料
什么是分布式锁?
分布式锁的几种实现
分布式锁方案和区别
分布式锁(Redisson)-从零开始,深入理解与不断优化
快速上手Spring Integration提供的可重入防死锁的分布式锁
redis分布式锁(基于springboot实现),分布式锁对比
网友评论