美文网首页
soul从入门到放弃16--长轮询同步(一)

soul从入门到放弃16--长轮询同步(一)

作者: 滴流乱转的小胖子 | 来源:发表于2021-02-05 07:15 被阅读0次

一、前戏

长轮询同步的代码复杂程度,相较于websocket、zookeeper确实增加不少。之前写数据同步方式时,看过一遍,没有看懂,遂放弃,改写了熔断与限流。 但是在好奇心的趋势下,我又啃起来了这段代码。

个人理解,长轮询这块主要分三个部分:本篇以soul-admin服务端视角,从数据的产生、数据的变更通知、同步到客户端展开分析,下篇以soul-bootstrap视角分析请求连接部分代码。

项目配置可以参照官网:https://dromara.org/zh/projects/soul/use-data-sync/

二、soul-admin启动--全量数据缓存更新

  • HttpLongPollingDataChangedListener初始化

按条件创建bean

image
  • 实现InitializingBean接口的HttpLongPollingDataChangedListener
image

afterInitialize方法在程序加载完执行

@Override
protected void afterInitialize() {
    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
    // 创建定时线程池,每5分钟执行一次,从DB中获取数据,更新缓存
    scheduler.scheduleWithFixedDelay(() -> {
        log.info("http sync strategy refresh config start.");
        try {
            this.refreshLocalCache();
            log.info("http sync strategy refresh config success.");
        } catch (Exception e) {
            log.error("http sync strategy refresh config error!", e);
        }
    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
    log.info("http sync strategy refresh interval: {}ms", syncInterval);
}

三、soul-admin数据更改--增量同步

  • DataChangedEventDispatcher订阅分发器

基于spring的发布订阅功能,当后台数据变动时,会有数据变动时间发出。在DataChangedEventDispatcher订阅分发器中,通知长轮询的监听器有数据变动。

  • AbstractDataChangedListener处理更新缓存,调用子类实现afterPluginChanged方法
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
    @Override
  public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    if (CollectionUtils.isEmpty(changed)) {
      return;
    }
    // 更新缓存
    this.updatePluginCache();
    // 
    this.afterPluginChanged(changed, eventType);
  }
}</pre>

*   HttpLongPollingDataChangedListener$afterPluginChanged开启线程,根据传入的变动事件类型,处理通知请求。

<pre>public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

    @Override
  protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
  // 此处执行Runnable类型的的任务DataChangeTask,实现多线程调用
    scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
  }
}
  • 逐个响应clients中请求

数据变动后,操作clients一边遍历一边剔除元素,类似出队。调用sendResponse方法,响应缓存的客户端请求。

class DataChangeTask implements Runnable {

  @Override
  public void run() {
    // 出队:遍历队列clients,逐个发送响应请求
    for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
      LongPollingClient client = iter.next();
      // 从集合中移除,类似出队列
      iter.remove();
      // 说明完成 response 响应了
      client.sendResponse(Collections.singletonList(groupKey));
      log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
  }
}

至于这个clients是怎么来的呢?请接着往下看

四、soul-admin长轮询监听

  • 建立连接

ConfigController$listener,提供一个/configs/listener 路径供网关调用,让网关与后台建立请求连接。

@RestController
@RequestMapping("/configs")
public class ConfigController {

  @PostMapping(value = "/listener")
  public void listener(final HttpServletRequest request, final HttpServletResponse response) {
    longPollingListener.doLongPolling(request, response);
  }
}
  • 保持连接
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // 比较数据是否变更,具体方式很精巧,后文再提
    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
    String clientIp = getRemoteIp(request);
    // 如果数据有变化,直接将数据构造响应信息并返回
    if (CollectionUtils.isNotEmpty(changedGroup)) {
        this.generateResponse(response, changedGroup);
        log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
        return;
    }
    // 将请求转化为异步方式,并且不限制超时时间,监听数据的变换。  hold住请求
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    // 新启一个线程,执行LongPollingClient$run方法
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
  • 定时发送数据
@Override
public void run() {
    // 线程每60秒执行一次
    this.asyncTimeoutFuture = scheduler.schedule(() -> {
        // 出队:内存缓存中移除该对象
        clients.remove(LongPollingClient.this);
        // 获取变化的数据
        List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
        // 推送到远端:释放请求
        sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);
    // 入队:自定义请求对象添加内存缓存,等待下个周期被宠信
    clients.add(this);
}

小结:

  • 长轮询监听定时发送这部分需要在仔细看下,未完待续
  • 日拱一卒

相关文章

网友评论

      本文标题:soul从入门到放弃16--长轮询同步(一)

      本文链接:https://www.haomeiwen.com/subject/hshwtltx.html