1. 需求背景
在支付系统中,因为上游通道的差异化,有时会出现这种情形:
-
上游提供的接口是异步的,即上游接收请求后,立即响应处理中(中间状态),最终处理结果以异步通知的形式告知;
-
平台对接多家上游机构,有同步响应也有异步响应,为了标准统一,平台对下游统一封装了同步响应接口,方便下游处理业务。
此时,对于异步响应的通道,平台需要通过异步转同步功能,让下游无感知。
2. 方案简介
请求与响应流程具体流程见上图:
- 客户端同步请求服务端(集群);
-
2.1 将orderNo/uniqueId作为key/value 放入redis缓存,其中uniqueId用于异步转同步识别请求线程;
-
2.2 服务端请求第三方服务;
(中间忽略了同步响应的中间状态响应)
- 第三方发起异步响应到服务端集群;
(此时,不能保证异步响应路由到源请求的节点上)
-
4.1 按照orderNo查询redis,获取请求的uniqueId;
-
4.2 按约定组装结果放入MQ中(此处使用RabbitMq演示);
- MQ向消费者广播;
-
5.1 节点接收广播后,识别是该节点发出的请求,则进行响应处理;
-
5.2 节点接收广播后,识别不是该节点发出的请求,则丢弃该通知;
- 在未超时的情况下,同步响应客户端。
3. 技术要点
参考Dubbo 2.5.x com.alibaba.dubbo.remoting.exchange.support. DefaultFuture类
主要使用 ReentrantLock解决互斥问题,使用 Condition 实现超时等待功能。对DefaultFuture做适当的简化之后,示例如下:
public class DefaultFuture {
private static final Map<String, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private final String id;
private final int timeout;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private final long start = System.currentTimeMillis();
private volatile Response response;
private final Request request;
public DefaultFuture(Request request, int timeout){
this.id = request.getUniqueId();
this.timeout = timeout > 0 ? timeout : Constants.DEFAULT_TIMEOUT;
this.request = request;
// put into waiting map.
FUTURES.put(id, this);
}
/**
* 阻塞获取响应
* @return
* @throws TimeoutException
*/
public Object get() throws Exception {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 超时等待
done.await(timeout, TimeUnit.MILLISECONDS);
// 如果有返回结果了,或者,超时了,就退出循环
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 如果是超时了,就抛出异常
if (!isDone()) {
throw new TimeoutException(request.getUniqueId());
}
}
// 远程服务正常返回结果,则返回给调用方
return returnFromResponse();
}
private Object returnFromResponse() throws Exception {
Response res = response;
if (res.getStatus() == Response.OK) {
return response.getResult();
}
if (res.getStatus() == Response.TIMEOUT) {
throw new TimeoutException(res.getErrorMessage());
}
FUTURES.remove(id);
throw new Exception(res.getErrorMessage());
}
/**
* 是否响应
* @return
*/
private boolean isDone(){
return this.response != null;
}
public static void received(Response response) {
try {
// 根据请求id从FUTURES中获取DefaultFuture,并删除
DefaultFuture future = FUTURES.remove(response.getUniqueId());
if (future != null) {
future.doReceived(response);
} else {
log.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response);
}
} finally {
}
}
private void doReceived(Response response) {
lock.lock();
try {
this.response = response;
this.response.setStatus(Response.OK);
if (done != null) {
// 唤醒阻塞的线程
done.signal();
}
} finally {
lock.unlock();
}
}
/**
* 是否是正确的响应
* @param id
* @return
*/
public static boolean isCorrectResponse(String id){
return FUTURES.containsKey(id);
}
private int getTimeout() {
return timeout;
}
private long getStartTimestamp() {
return start;
}
public String getId() {
return id;
}
private static class RemotingInvocationTimeoutScan implements Runnable {
@Override
public void run() {
while (true) {
try {
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setErrorMessage("响应超时");
// handle response.
DefaultFuture.received(timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
log.error("Exception when scan the timeout invocation of remoting.", e);
}
}
}
}
static {
// 自动清除超时任务
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "remoting-invocation-timeout-scan");
th.setDaemon(true);
th.start();
}
}
4. 测试
- 测试类接收请求后,直接组装响应,将响应放入mq,然后调用DefaultFuture.get()阻塞等待响应;
- mq发送广播,测试项目接收后,按流程通过DefaultFuture.received()处理响应;
- DefaultFuture.doReceived()方法中,唤醒等待的请求线程,测试完成。
线程组 TPS RabbitMQ测试环境: Macbook Pro 8G + Spring boot(单实例)+ jmeter(共100个线程)
网友评论