一、前戏
长轮询同步的代码复杂程度,相较于websocket、zookeeper确实增加不少。之前写数据同步方式时,看过一遍,没有看懂,遂放弃,改写了熔断与限流。 但是在好奇心的趋势下,我又啃起来了这段代码。
个人理解,长轮询这块主要分三个部分:本篇以soul-admin服务端视角,从数据的产生、数据的变更通知、同步到客户端展开分析,下篇以soul-bootstrap视角分析请求连接部分代码。
项目配置可以参照官网:https://dromara.org/zh/projects/soul/use-data-sync/
二、soul-admin启动--全量数据缓存更新
- HttpLongPollingDataChangedListener初始化
按条件创建bean
image- 实现InitializingBean接口的HttpLongPollingDataChangedListener
afterInitialize方法在程序加载完执行
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// 创建定时线程池,每5分钟执行一次,从DB中获取数据,更新缓存
scheduler.scheduleWithFixedDelay(() -> {
log.info("http sync strategy refresh config start.");
try {
this.refreshLocalCache();
log.info("http sync strategy refresh config success.");
} catch (Exception e) {
log.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
三、soul-admin数据更改--增量同步
- DataChangedEventDispatcher订阅分发器
基于spring的发布订阅功能,当后台数据变动时,会有数据变动时间发出。在DataChangedEventDispatcher订阅分发器中,通知长轮询的监听器有数据变动。
- AbstractDataChangedListener处理更新缓存,调用子类实现afterPluginChanged方法
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
// 更新缓存
this.updatePluginCache();
//
this.afterPluginChanged(changed, eventType);
}
}</pre>
* HttpLongPollingDataChangedListener$afterPluginChanged开启线程,根据传入的变动事件类型,处理通知请求。
<pre>public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
@Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// 此处执行Runnable类型的的任务DataChangeTask,实现多线程调用
scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}
}
- 逐个响应clients中请求
数据变动后,操作clients一边遍历一边剔除元素,类似出队。调用sendResponse方法,响应缓存的客户端请求。
class DataChangeTask implements Runnable {
@Override
public void run() {
// 出队:遍历队列clients,逐个发送响应请求
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
// 从集合中移除,类似出队列
iter.remove();
// 说明完成 response 响应了
client.sendResponse(Collections.singletonList(groupKey));
log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}
至于这个clients是怎么来的呢?请接着往下看
四、soul-admin长轮询监听
- 建立连接
ConfigController$listener,提供一个/configs/listener
路径供网关调用,让网关与后台建立请求连接。
@RestController
@RequestMapping("/configs")
public class ConfigController {
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}
}
- 保持连接
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// 比较数据是否变更,具体方式很精巧,后文再提
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// 如果数据有变化,直接将数据构造响应信息并返回
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}
// 将请求转化为异步方式,并且不限制超时时间,监听数据的变换。 hold住请求
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0L);
// 新启一个线程,执行LongPollingClient$run方法
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
- 定时发送数据
@Override
public void run() {
// 线程每60秒执行一次
this.asyncTimeoutFuture = scheduler.schedule(() -> {
// 出队:内存缓存中移除该对象
clients.remove(LongPollingClient.this);
// 获取变化的数据
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
// 推送到远端:释放请求
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
// 入队:自定义请求对象添加内存缓存,等待下个周期被宠信
clients.add(this);
}
小结:
- 长轮询监听定时发送这部分需要在仔细看下,未完待续
- 日拱一卒
网友评论