业务背景
公司需要将n个api接口返回的数据在分析后封装成个信报告。原先的设计比较死板,几个版本需要查的东西是固定的。为了以后的扩展性及尽量满足可能的需求变化,原先的设计显然是不够,因此就有了本次重构设计。
词语定义
模块:由一个或者多个接口返回的数据解析后封装成的一类数据,如基础信息模块,学历信息模块等。
产品:由多个模块组合成的一个东西,可以理解成“套餐”。这个可以在后台系统通过自由组合模块来形成。
一些关键问题
- 模块和api接口是多对多的关系,有的模块需要调用多个接口,有的接口可以产生多个模块。
- 为了尽量减少查询时间,需要采用多线程,让模块或者api并发调用。
- 尽量将一些关键内容封装起来,在日后扩展模块的时候,只要将精力放在获取api响应结果及分析数据上。
- 对于同一个api接口,同一次查询只能查一次
- 各api接口的有效响应,需要存储到数据库缓存,在相同查询条件的响应缓存的有效期内,可以以缓存作为该次查询的响应结果;无效响应不存储。
- 模块有主接口,可能有备用接口(预防主接口无法查询的情况,有且仅有一个)。
关键思路
- 基于策略模式来处理,面向抽象编程
- 由“模块管理器”来管理模块,该管理器持有具体策略类的class name,通过java反射在运行时获取到具体的策略子类
- 用一个Map作为资源池pool,api.code作为key,对应的响应作为value,各线程共享同一个pool。用一个空的JsonObject(记做EMPTY_JSON)作为占位符,表示api查询中。对于各线程而言,如果pool里不存在key,则表示本次查询该api接口还未查询,那么就可以查询api并将EMPTY_JSON放入pool里告知其他线程该接口正在查询中;如果pool里存在key且EMPTY_JSON.equal(value),则表示该api接口有其他线程正在查询;如果pool里存在key且!EMPTY_JSON.equal(value),则表示该api接口已经由其他线程查询完毕,该value可作为响应直接取到。
- 由于是多线程,在读取pool是否存在key和放入放入EMPTY_JSON这两个步骤需要合并一起作为原子性操作。
类图
//TODO 日后再补
关键代码
由ModuleManager反射出具体的策略类,并启动线程。
public void start(PostMethodWrapper postMethodWrapper, Object postBody, Object oldInstance) {
Report report = (Report) oldInstance;
ProductionManager productionManager = report.getProductionManager();
Set<ModuleManager> moduleManagers = productionManager.getModuleManagers();
List<Future> futures = new ArrayList<>();
Lock lock = new ReentrantLock();
Map<String, JSONObject> pool = new HashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(8);
for (ModuleManager moduleManager : moduleManagers) {
String strategyName = moduleManager.getStrategyName();
try {
Object o = Class.forName(strategyName).getConstructor(Report.class, SupplyAPIRepository.class, APISearchRepository.class)
.newInstance(report, supplyAPIRepository, apiSearchRepository);
if (o instanceof BaseStrategy) {
BaseStrategy strategy = (BaseStrategy) o;
Future<BaseModule> future = executorService.submit(new ModuleCallable(strategy, pool, lock, moduleManager.getDispalyOrder()));
futures.add(future);
} else {
logger.warn("strategyName配置错误,非BaseStrategy子类,module:" + moduleManager.getName());
}
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) {
logger.warn("strategyName配置错误,module:" + moduleManager.getName());
}
}
for (Future future : futures) {
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
}
面向抽象编程
public class ModuleCallable implements Callable<BaseModule> {
private static final Logger logger = LoggerFactory.getLogger(ModuleCallable.class);
private BaseStrategy strategy;
private Map<String, JSONObject> pool;
private final Lock lock;
private Integer displayOrder;
public ModuleCallable(BaseStrategy strategy, Map<String, JSONObject> pool, Lock lock, Integer displayOrder) {
this.strategy = strategy;
this.pool = pool;
this.lock = lock;
this.displayOrder = displayOrder;
}
@Override
public BaseModule call() throws Exception {
BaseModule result = null;
if (strategy.isAPIActive()) {
boolean isWaitForOther;
do {
lock.lock();
isWaitForOther = strategy.isContainsAPI(pool) && strategy.isAPIUnfinished(pool);
if (isWaitForOther) {
lock.unlock();
try {
logger.info(strategy.getClass().getSimpleName() + "其他线程在查询主接口,休眠1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
strategy.tryPutEmptyAPI(pool);
lock.unlock();
}
} while (isWaitForOther);
try {
if (strategy.fetchData(pool)) {
strategy.putAPIResponseIntoPool(pool);
} else {
strategy.removeEmptyAPI(pool);
}
} catch (Exception e) {
//如果发生异常的话,从pool里移出空的api
strategy.removeEmptyAPI(pool);
}
result = strategy.analyseData();
}
// 如果结果为null且备用接口可用
if (result == null && strategy.isSpareAPIActive()) {
boolean isWaitForOther;
do {
lock.lock();
isWaitForOther = strategy.isContainsSpareAPI(pool) && !strategy.isSpareAPIFinished(pool);
if (isWaitForOther) {
lock.unlock();
try {
logger.info(strategy.getClass().getSimpleName() + "其他线程在查询备用接口,休眠1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
strategy.tryPutEmptySpareAPI(pool);
lock.unlock();
}
} while (isWaitForOther);
try {
if (strategy.fetchSpareData(pool)) {
strategy.putSpareAPIReponseIntoPool(pool);
} else {
strategy.removeSpareAPI(pool);
}
} catch (Exception e) {
//如果发生异常的话,从pool里移出空的api
strategy.removeSpareAPI(pool);
}
result = strategy.analyseSpareData();
}
strategy.setModuleIntoReport(result, displayOrder);
return result;
}
}
抽象策略类
public abstract class BaseStrategy {
protected Report report;
protected SupplyAPIRepository supplyAPIRepository;
protected APISearchRepository apiSearchRepository;
protected final static JSONObject EMPTY_JSON = new JSONObject();
public BaseStrategy(Report report, SupplyAPIRepository supplyAPIRepository, APISearchRepository apiSearchRepository) {
this.report = report;
this.supplyAPIRepository = supplyAPIRepository;
this.apiSearchRepository = apiSearchRepository;
}
public abstract boolean isAPIActive();
public abstract boolean isContainsAPI(Map<String, JSONObject> pool);
public abstract boolean isAPIUnfinished(Map<String, JSONObject> pool);
public abstract void tryPutEmptyAPI(Map<String, JSONObject> pool);
public abstract void removeEmptyAPI(Map<String, JSONObject> pool);
public abstract void putAPIResponseIntoPool(Map<String, JSONObject> pool);
public abstract boolean fetchData(Map<String, JSONObject> pool);
public abstract BaseModule analyseData();
public abstract boolean isSpareAPIActive();
public abstract boolean isContainsSpareAPI(Map<String, JSONObject> pool);
public abstract boolean isSpareAPIFinished(Map<String, JSONObject> pool);
public abstract void tryPutEmptySpareAPI(Map<String, JSONObject> pool);
public abstract void removeSpareAPI(Map<String, JSONObject> pool);
public abstract void putSpareAPIReponseIntoPool(Map<String, JSONObject> pool);
public abstract boolean fetchSpareData(Map<String, JSONObject> pool);
public abstract BaseModule analyseSpareData();
public abstract void setModuleIntoReport(BaseModule module, Integer displayOrder);
public static class API {
private SupplyAPI supplyAPI;
private HashMap<String, String> parameters = new HashMap<>();
private HashMap<String, String> headers = new HashMap<>();
public SupplyAPI getSupplyAPI() {
return supplyAPI;
}
public void setSupplyAPI(SupplyAPI supplyAPI) {
this.supplyAPI = supplyAPI;
}
public HashMap<String, String> getParameters() {
return parameters;
}
public void setParameters(HashMap<String, String> parameters) {
this.parameters = parameters;
}
public HashMap<String, String> getHeaders() {
return headers;
}
public void setHeaders(HashMap<String, String> headers) {
this.headers = headers;
}
}
protected void putCache(API api, JSONObject apiResponse) {
APISearch apiSearchLog = new APISearch();
apiSearchLog.setCode(api.getSupplyAPI().getCode());
apiSearchLog.setCreatedAt(new Date());
apiSearchLog.setParameters(parametersToString(api.getParameters()));
apiSearchLog.setResult(apiResponse.toJSONString());
apiSearchRepository.save(apiSearchLog);
}
protected JSONObject getCache(API api) {
String parameters = parametersToString(api.getParameters());
APISearch ret = apiSearchRepository
.findFirstByCodeAndParametersOrderByCreatedAtDesc(api.getSupplyAPI().getCode(), parameters);
if (ret != null) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(ret.getCreatedAt());
calendar.add(Calendar.SECOND, api.getSupplyAPI().getEffectiveTime());
Calendar calendarNow = Calendar.getInstance();
if (calendar.compareTo(calendarNow) < 0) {
return null;
}
return JSONObject.parseObject(ret.getResult());
} else {
return null;
}
}
private String parametersToString(Map<String, String> paramMap) {
// 用于排序的list
List<String> keyList = new ArrayList<>();
for (String key : paramMap.keySet()) {
if (paramMap.get(key) != null && !paramMap.get(key).equals("")) {
keyList.add(key);
}
}
// 对list进行排序
Collections.sort(keyList);
List<String> parameterList = new ArrayList<>();
// 将排序后的参数组成字符串
for (String key : keyList) {
parameterList.add(key + "=" + paramMap.get(key));
}
return StringUtils.join(parameterList, '&');
}
}
一个具体策略子类例子
public class FraudModuleStrategy extends BaseStrategy {
private static final Logger logger = LoggerFactory.getLogger(FraudModuleStrategy.class);
private final static long API_ID = 17L;
private final SupplyAPI api;
private JSONObject apiResponse;
public FraudModuleStrategy(Report report, SupplyAPIRepository supplyAPIRepository, APISearchRepository apiSearchRepository) {
super(report, supplyAPIRepository, apiSearchRepository);
this.api = this.supplyAPIRepository.findOne(API_ID);
}
@Override
public boolean isAPIActive() {
return api.isActive() && isParameterComplete();
}
private boolean isParameterComplete() {
return super.report.getCustomerName() != null && super.report.getCustomerIdCard() != null
&& super.report.getCustomerMobile() != null;
}
@Override
public boolean isContainsAPI(Map<String, JSONObject> pool) {
return pool.containsKey(api.getCode());
}
@Override
public boolean isAPIUnfinished(Map<String, JSONObject> pool) {
return EMPTY_JSON.equals(pool.get(api.getCode()));
}
@Override
public void tryPutEmptyAPI(Map<String, JSONObject> pool) {
if (!pool.containsKey(api.getCode())) {
pool.put(api.getCode(), EMPTY_JSON);
}
}
@Override
public void removeEmptyAPI(Map<String, JSONObject> pool) {
pool.remove(api.getCode());
}
@Override
public void putAPIResponseIntoPool(Map<String, JSONObject> pool) {
if (apiResponse != null && !EMPTY_JSON.equals(apiResponse)) {
pool.put(api.getCode(), apiResponse);
}
}
@Override
public boolean fetchData(Map<String, JSONObject> pool) {
JSONObject cacheData = pool.get(api.getCode());
if (cacheData != null && !EMPTY_JSON.equals(cacheData)) {
this.apiResponse = cacheData;
return true;
}
API api = new API();
api.setSupplyAPI(this.api);
api.getParameters().put("name", super.report.getCustomerName());
api.getParameters().put("idCard", super.report.getCustomerIdCard());
api.getParameters().put("mobile", super.report.getCustomerMobile());
if (!StringUtils.isEmpty(report.getCustomerBankCard())) {
api.getParameters().put("bankCardNo", super.report.getCustomerBankCard());
}
cacheData = getCache(api);
if (cacheData != null) {
logger.info("缓存查询" + this.api.getCode());
} else {
logger.info("即时查询" + this.api.getCode());
XinShuCallable xinShuCallable = new XinShuCallable(api.getSupplyAPI(), new HashMap<>(), api.getParameters(),
"");
cacheData = xinShuCallable.call();
logger.info(this.api.getCode() + " response:" + cacheData);
if (cacheData != null && "0000".equals(cacheData.getString("rc"))) {
putCache(api, cacheData);
}
}
if (cacheData == null) {
return false;
} else {
this.apiResponse = cacheData;
return true;
}
}
@Override
public BaseModule analyseData() {
//分析及封装操作,省略
}
@Override
public boolean isSpareAPIActive() {
return false;
}
@Override
public boolean isContainsSpareAPI(Map<String, JSONObject> pool) {
return false;
}
@Override
public boolean isSpareAPIFinished(Map<String, JSONObject> pool) {
return false;
}
@Override
public void tryPutEmptySpareAPI(Map<String, JSONObject> pool) {
}
@Override
public void removeSpareAPI(Map<String, JSONObject> pool) {
}
@Override
public void putSpareAPIReponseIntoPool(Map<String, JSONObject> pool) {
}
@Override
public boolean fetchSpareData(Map<String, JSONObject> pool) {
return false;
}
@Override
public BaseModule analyseSpareData() {
return null;
}
@Override
public void setModuleIntoReport(BaseModule module, Integer displayOrder) {
module.setDisplaySort(displayOrder);
report.setFraudModule((FraudModule) module);
}
}
过程中碰到的问题
- 在start方法里,lock和pool的new写在for循环里了,导致没有公用一个pool和lock,查了好久才发现。一个低级错误
- BaseStrategy的tryPutEmptyAPI方法,原先是putEmptyAPI,没有判断pool.containsKey(api.getCode()),这会导致EMPTY_JSON覆盖掉原来正确的响应
if (isWaitForOther) {
lock.unlock();
try {
logger.info(strategy.getClass().getSimpleName() + "其他线程在查询主接口,休眠1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
strategy.tryPutEmptyAPI(pool);
lock.unlock();
}
@Override
public void tryPutEmptyAPI(Map<String, JSONObject> pool) {
if (!pool.containsKey(api.getCode())) {
pool.put(api.getCode(), EMPTY_JSON);
}
}
一些不足
- 对于
一些关键问题
的第3点,具体策略子类还是需要完成调用api和分析api以外的一些操作,还是有些烦。 - 此外还有备用接口也需要写,这个看起来有点冗余。
实际中模块和接口大部分情况下是一对一的关系的,可以考虑再写个抽象类,把isAPIActive(), isContainsAPI()等方法再封装一下,这样就只剩下fetchData()和analyseData()方法需要写了。这个可以解决问题1。而对于问题2,可以考虑把备用接口从策略里分开,单独看成一个策略,即一个策略类里只有一组接口,ModuleCallable需要再改造一下。
网友评论