在 Apollo 中有这样一个需求:同一个配置(namespace)在同一时间只能由一个人修改,这就需要用到锁。由于并发量不大,所以直接建了一张表,利用唯一索引保证同一个 namespace 只能有一条记录,以此充当锁。
// JPA entity mapped to the "Namespace" table.
// Deletion is soft: the @SQLDelete statement sets isDeleted = 1 instead of removing the row,
// and the @Where clause restricts reads to rows with isDeleted = 0.
// NOTE(review): this snippet is truncated in the article (no closing brace or accessors shown).
@Entity
@Table(name = "Namespace")
@SQLDelete(sql = "Update Namespace set isDeleted = 1 where id = ?")
@Where(clause = "isDeleted = 0")
public class Namespace extends BaseEntity {
// Stored in the non-nullable "appId" column.
@Column(name = "appId", nullable = false)
private String appId;
// Stored in the non-nullable "ClusterName" column.
@Column(name = "ClusterName", nullable = false)
private String clusterName;
// Stored in the non-nullable "NamespaceName" column.
@Column(name = "NamespaceName", nullable = false)
private String namespaceName;
/**
 * Thin service layer over {@link NamespaceLockRepository}: look up, create and delete the
 * lock record of a namespace.
 */
@Service
public class NamespaceLockService {

  @Autowired
  private NamespaceLockRepository namespaceLockRepository;

  /**
   * Looks up the lock record of a namespace.
   *
   * @param namespaceId id of the namespace
   * @return whatever the repository returns for that id (presumably {@code null} when the
   *     namespace is not locked - confirm against the repository)
   */
  public NamespaceLock findLock(Long namespaceId) {
    final NamespaceLock existingLock = namespaceLockRepository.findByNamespaceId(namespaceId);
    return existingLock;
  }

  /**
   * Persists the given lock record inside a transaction.
   *
   * @param lock the lock record to save
   * @return the entity returned by the repository's {@code save}
   */
  @Transactional
  public NamespaceLock tryLock(NamespaceLock lock) {
    final NamespaceLock persistedLock = namespaceLockRepository.save(lock);
    return persistedLock;
  }

  /**
   * Deletes the lock record of a namespace inside a transaction.
   *
   * @param namespaceId id of the namespace whose lock record is deleted
   */
  @Transactional
  public void unlock(Long namespaceId) {
    namespaceLockRepository.deleteByNamespaceId(namespaceId);
  }
}
这里提示一下:加锁时,就插入一条该 namespaceId 的记录;释放锁时,就删除这条记录(行)。
那么aop加锁如下
/**
 * Within one release cycle a namespace may be modified by only one user.
 * This is enforced through the database lock table.
 *
 * <p>Every advice runs before a method annotated with {@code PreAcquireNamespaceLock} whose
 * leading arguments match the advice's {@code args(...)} pattern.
 */
@Aspect
@Component
public class NamespaceAcquireLockAspect {

  private static final Logger logger = LoggerFactory.getLogger(NamespaceAcquireLockAspect.class);

  @Autowired
  private NamespaceLockService namespaceLockService;

  @Autowired
  private NamespaceService namespaceService;

  @Autowired
  private ItemService itemService;

  @Autowired
  private BizConfig bizConfig;

  /** Create item: lock on behalf of the user recorded as the item's last modifier. */
  @Before("@annotation(PreAcquireNamespaceLock) && args(appId, clusterName, namespaceName, item, ..)")
  public void requireLockAdvice(String appId, String clusterName, String namespaceName,
                                ItemDTO item) {
    acquireLock(appId, clusterName, namespaceName, item.getDataChangeLastModifiedBy());
  }

  /** Update item: lock on behalf of the user recorded as the item's last modifier. */
  @Before("@annotation(PreAcquireNamespaceLock) && args(appId, clusterName, namespaceName, itemId, item, ..)")
  public void requireLockAdvice(String appId, String clusterName, String namespaceName, long itemId,
                                ItemDTO item) {
    acquireLock(appId, clusterName, namespaceName, item.getDataChangeLastModifiedBy());
  }

  /** Update by change set: lock on behalf of the change set's last modifier. */
  @Before("@annotation(PreAcquireNamespaceLock) && args(appId, clusterName, namespaceName, changeSet, ..)")
  public void requireLockAdvice(String appId, String clusterName, String namespaceName,
                                ItemChangeSets changeSet) {
    acquireLock(appId, clusterName, namespaceName, changeSet.getDataChangeLastModifiedBy());
  }

  /**
   * Delete item: resolves the namespace through the item, then locks for {@code operator}.
   *
   * @throws BadRequestException if no item with {@code itemId} is found
   */
  @Before("@annotation(PreAcquireNamespaceLock) && args(itemId, operator, ..)")
  public void requireLockAdvice(long itemId, String operator) {
    Item item = itemService.findOne(itemId);
    if (item == null) {
      throw new BadRequestException("item not exist.");
    }
    acquireLock(item.getNamespaceId(), operator);
  }

  /**
   * Locks the namespace identified by app id, cluster name and namespace name.
   * Does nothing when the namespace lock switch is off.
   */
  void acquireLock(String appId, String clusterName, String namespaceName,
                   String currentUser) {
    if (!bizConfig.isNamespaceLockSwitchOff()) {
      acquireLock(namespaceService.findOne(appId, clusterName, namespaceName), currentUser);
    }
  }

  /**
   * Locks the namespace identified by its id.
   * Does nothing when the namespace lock switch is off.
   */
  void acquireLock(long namespaceId, String currentUser) {
    if (!bizConfig.isNamespaceLockSwitchOff()) {
      acquireLock(namespaceService.findOne(namespaceId), currentUser);
    }
  }

  /**
   * Core locking routine. If a lock record already exists, verifies that it belongs to
   * {@code currentUser}; otherwise tries to create one.
   *
   * @throws BadRequestException if {@code namespace} is null or the lock is owned by another user
   */
  private void acquireLock(Namespace namespace, String currentUser) {
    if (namespace == null) {
      throw new BadRequestException("namespace not exist.");
    }

    final long id = namespace.getId();
    final NamespaceLock existingLock = namespaceLockService.findLock(id);
    if (existingLock != null) {
      // A lock record already exists: the lock owner must be the current user.
      checkLock(namespace, existingLock, currentUser);
      return;
    }

    try {
      // No exception here means the lock was obtained.
      tryLock(id, currentUser);
    } catch (DataIntegrityViolationException e) {
      // Lock failed: re-read the lock record and check who owns it.
      checkLock(namespace, namespaceLockService.findLock(id), currentUser);
    } catch (Exception e) {
      logger.error("try lock error", e);
      throw e;
    }
  }

  /** Builds a lock record created and last modified by {@code user} and saves it. */
  private void tryLock(long namespaceId, String user) {
    NamespaceLock newLock = new NamespaceLock();
    newLock.setNamespaceId(namespaceId);
    newLock.setDataChangeCreatedBy(user);
    newLock.setDataChangeLastModifiedBy(user);
    namespaceLockService.tryLock(newLock);
  }

  /**
   * Verifies that {@code namespaceLock} exists and was created by {@code currentUser}.
   *
   * @throws ServiceException if {@code namespaceLock} is null
   * @throws BadRequestException if the lock was created by a different user
   */
  private void checkLock(Namespace namespace, NamespaceLock namespaceLock,
                         String currentUser) {
    if (namespaceLock == null) {
      throw new ServiceException(
          String.format("Check lock for %s failed, please retry.", namespace.getNamespaceName()));
    }

    final String owner = namespaceLock.getDataChangeCreatedBy();
    if (owner.equals(currentUser)) {
      return;
    }
    throw new BadRequestException(
        "namespace:" + namespace.getNamespaceName() + " is modified by " + owner);
  }
}
释放锁
@Aspect
@Component
public class NamespaceUnlockAspect {
private Gson gson = new Gson();
@Autowired
private NamespaceLockService namespaceLockService;
@Autowired
private NamespaceService namespaceService;
@Autowired
private ItemService itemService;
@Autowired
private ReleaseService releaseService;
@Autowired
private BizConfig bizConfig;
//create item
@After("@annotation(PreAcquireNamespaceLock) && args(appId, clusterName, namespaceName, item, ..)")
public void requireLockAdvice(String appId, String clusterName, String namespaceName,
ItemDTO item) {
tryUnlock(namespaceService.findOne(appId, clusterName, namespaceName));
}
//update item
@After("@annotation(PreAcquireNamespaceLock) && args(appId, clusterName, namespaceName, itemId, item, ..)")
public void requireLockAdvice(String appId, String clusterName, String namespaceName, long itemId,
ItemDTO item) {
tryUnlock(namespaceService.findOne(appId, clusterName, namespaceName));
}
//update by change set
@After("@annotation(PreAcquireNamespaceLock) && args(appId, clusterName, namespaceName, changeSet, ..)")
public void requireLockAdvice(String appId, String clusterName, String namespaceName,
ItemChangeSets changeSet) {
tryUnlock(namespaceService.findOne(appId, clusterName, namespaceName));
}
//delete item
@After("@annotation(PreAcquireNamespaceLock) && args(itemId, operator, ..)")
public void requireLockAdvice(long itemId, String operator) {
Item item = itemService.findOne(itemId);
if (item == null) {
throw new BadRequestException("item not exist.");
}
tryUnlock(namespaceService.findOne(item.getNamespaceId()));
}
private void tryUnlock(Namespace namespace) {
if (bizConfig.isNamespaceLockSwitchOff()) {
return;
}
if (!isModified(namespace)) {
namespaceLockService.unlock(namespace.getId());
}
}
boolean isModified(Namespace namespace) {
Release release = releaseService.findLatestActiveRelease(namespace);
List<Item> items = itemService.findItemsWithoutOrdered(namespace.getId());
if (release == null) {
return hasNormalItems(items);
}
Map<String, String> releasedConfiguration = gson.fromJson(release.getConfigurations(), GsonType.CONFIG);
Map<String, String> configurationFromItems = generateConfigurationFromItems(namespace, items);
MapDifference<String, String> difference = Maps.difference(releasedConfiguration, configurationFromItems);
return !difference.areEqual();
}
private boolean hasNormalItems(List<Item> items) {
for (Item item : items) {
if (!StringUtils.isEmpty(item.getKey())) {
return true;
}
}
return false;
}
private Map<String, String> generateConfigurationFromItems(Namespace namespace, List<Item> namespaceItems) {
Map<String, String> configurationFromItems = Maps.newHashMap();
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
//parent namespace
if (parentNamespace == null) {
generateMapFromItems(namespaceItems, configurationFromItems);
} else {//child namespace
Release parentRelease = releaseService.findLatestActiveRelease(parentNamespace);
if (parentRelease != null) {
configurationFromItems = gson.fromJson(parentRelease.getConfigurations(), GsonType.CONFIG);
}
generateMapFromItems(namespaceItems, configurationFromItems);
}
return configurationFromItems;
}
private Map<String, String> generateMapFromItems(List<Item> items, Map<String, String> configurationFromItems) {
for (Item item : items) {
String key = item.getKey();
if (StringUtils.isBlank(key)) {
continue;
}
configurationFromItems.put(key, item.getValue());
}
return configurationFromItems;
}
}
这样一来,只需要在方法上加上 @PreAcquireNamespaceLock 注解,并且方法参数满足切点表达式的要求,即可在方法执行前自动加锁、执行后自动尝试释放锁。
网友评论