美文网首页
soul从入门到放弃5--浅析websocket数据同步(一)

soul从入门到放弃5--浅析websocket数据同步(一)

作者: 滴流乱转的小胖子 | 来源:发表于2021-01-19 23:01 被阅读0次

    零、前戏

    最近在“玩”demo的过程中,就一直好奇。插件信息等元数据,soul是怎么从soul-admin同步到soul-bootstrap中的。

    本着大胆瞎猜,小心求证的心态,就有今天的关于websocket这篇文章。

    一、服务端--soul-admin

    • 首先从jar入手,soul-admin中的只有一个spring-boot-starter-websocket官方jar包。

    猜测:服务端实现是在写在soul-admin项目内部

    • 通过全文搜索“websocket”,找到了DataSyncConfiguration
    image

    涉及websocket的配置初始化代码如下:

    image

    与yml中的配置项对应

    image
    • 进入WebsocketCollector中一探究竟
    image

    @ServerEndpoint("/websocket")科普帖子:https://blog.csdn.net/weixin_39793752/article/details/80949128

    @ServerEndpoint("/websocket")
    public class WebsocketCollector {
        private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
        private static final String SESSION_KEY = "sessionKey";
        /**
         * On open.
         *
         * @param session the session
         */
        @OnOpen
        public void onOpen(final Session session) {
            log.info("websocket on open successful....");
            SESSION_SET.add(session);
        }
        /**
         * On message.
         * 
         * @param message the message
         * @param session the session
         */
        @OnMessage
        public void onMessage(final String message, final Session session) {
            if (message.equals(DataEventTypeEnum.MYSELF.name())) {
                try {
                    ThreadLocalUtil.put(SESSION_KEY, session);
                    SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
                } finally {
                    ThreadLocalUtil.clear();
                }
            }
        }
        /**
         * On close.
         *
         * @param session the session
         */
        @OnClose
        public void onClose(final Session session) {
            SESSION_SET.remove(session);
            ThreadLocalUtil.clear();
        }
        /**
         * On error.
         *
         * @param session the session
         * @param error   the error
         */
        @OnError
        public void onError(final Session session, final Throwable error) {
            SESSION_SET.remove(session);
            ThreadLocalUtil.clear();
            log.error("websocket collection error: ", error);
        }
        /**
         * Send.
         *
         * @param message the message
         * @param type    the type
         */
        public static void send(final String message, final DataEventTypeEnum type) {
            if (StringUtils.isNotBlank(message)) {
                if (DataEventTypeEnum.MYSELF == type) {
                    try {
                        Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
                        if (session != null) {
                            session.getBasicRemote().sendText(message);
                        }
                    } catch (IOException e) {
                        log.error("websocket send result is exception: ", e);
                    }
                    return;
                }
                for (Session session : SESSION_SET) {
                    try {
                        session.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        log.error("websocket send result is exception: ", e);
                    }
                }
            }
        }
    }
    

    @OnOpen:连接建立成功调用的方法

    @OnClose:连接关闭调用方法

    @OnMessage:收到客户端消息后调用的方法

    @OnError:发送错误时调用

    • 当服务端收到一条MYSELF类型的消息,将同步全量元数据信息
    • admin 会推送一次全量数据,后续如果配置数据发生变更则将增量数据通过 websocket 主动推送给 soul-web

    详细机制此处暂留坑,后文再续

    image

    二、客户端--soul-bootstrap

    从pom文件入手,找到了soul-spring-boot-starter-sync-data-websocket中的WebsocketSyncDataConfiguration类

    image
    • 深入WebsocketSyncDataService
    image
    • 按照配置soul-admin的个数,创建线程池大小
    • 通过线程池的定时任务,实现心跳机制----保持心跳
    • /使用调度线程池进行断线重连,30秒进行一次。如果连接断开,将client.connectBlocking(3000, TimeUnit.MILLISECONDS);重连,重新发送MYSELF类型信息,进行全量更新。
    image

    线程池科普贴:https://blog.csdn.net/weixin_35756522/article/details/81707276

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    command:执行线程

    initialDelay:初始化延时

    period:两次开始执行最小间隔时间

    unit:计时单位

    • SoulWebsocketClient 是具体处理websocket类
    image
    • 在与服务端建立连接时,会发送MYSELF类型的消息。****如上文所述,服务端接收****MYSELF类****型消息后将发送全量消息用于同步。

    小结:

    • soul-admin与soul-bootstrap基本通信链路,基本捋顺。相关数据处理的细节,将在后续文章中浅析。
    • 日拱一卒,每天进步一点点

    相关文章

      网友评论

          本文标题:soul从入门到放弃5--浅析websocket数据同步(一)

          本文链接:https://www.haomeiwen.com/subject/wjchzktx.html