美文网首页
Netty Websocket client接收数据分片(数据帧

Netty Websocket client接收数据分片(数据帧

作者: facio | 来源:发表于2020-03-27 17:43 被阅读0次

    1.问题

    11月21日,云南勐腊发生地震,ICL在推送相关地震数据后,Netty客户端收到的数据在日志中显示为不完整的JSON数据,导致json反序列化失败:

    pis.png

    2.分析

    PIS作为长链接的客户端,目前与ICL的通信协议是websocket,对应的数据处理报文类:io.netty.handler.codec.http.websocketx.TextWebSocketFrame

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
     String text = textFrame.text();
     LOGGER.info("【PIS-client】 received message:{}", text);
     TypeProtocol typeProtocol = JSON.parseObject(text, TypeProtocol.class);
     MessageAction<String, String> action = dispatcher.getMsgAction(typeProtocol.getType());
     String response = action.process(text);
     if (StringUtils.isNotBlank(response)) {
            TextWebSocketFrame socketFrame = new TextWebSocketFrame(response);
     ch.writeAndFlush(socketFrame);
     }
    } else if (frame instanceof PongWebSocketFrame) {
        LOGGER.info("【PIS-client】 received pong");
    } else if (frame instanceof CloseWebSocketFrame) {
        LOGGER.info("【PIS-client】 received closing");
     ch.close();
    } else if (frame instanceof BinaryWebSocketFrame) {
        LOGGER.info("【PIS-client】 received Binary");
    }
    

    初步怀疑是因为ICL发送的数据超出一定长度导致数据分片发送或被截断。

    3.处理

    本地启动Netty 服务和Netty Client,尝试使用相似的数据复现该问题,未成功复现。

    4.解决

    查阅相关网上资料后,得到如下信息:

    websocket定义了一种“分片帧”,如果消息比较大,一次性接收不完,可以把数据分片传输。netty的实现就是:ContinuationWebSocketFrame 它有个属性:finalFragment 表示当前的数据,是否是最后一帧数据。

    针对该信息,调整了处理逻辑:

    WebSocketFrame frame = (WebSocketFrame) msg;
    LOGGER.info("Received incoming frame [{}]", frame.getClass().getName());
    
    if (frame instanceof CloseWebSocketFrame) {
        LOGGER.info("【PIS-client】 received CloseWebSocketFrame");
     if (frameBuffer != null) {
            handleMessageCompleted(ctx, frameBuffer.toString());
     }
        frameBuffer = null;
     handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
     return;
    } else if (frame instanceof PongWebSocketFrame) {
        LOGGER.info("【PIS-client】 received pong");
     return;
    } else if (frame instanceof BinaryWebSocketFrame) {
        LOGGER.info("【PIS-client】 received Binary");
     return;
    }
    
    //正常业务数据
    if (frame instanceof TextWebSocketFrame) {
        frameBuffer = new StringBuilder();
     TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
     String text = textFrame.text();
     LOGGER.info("【PIS-client】received TextWebSocketFrame message:{}", text);
     frameBuffer.append(text);
    } else if (frame instanceof ContinuationWebSocketFrame) {
        ContinuationWebSocketFrame continuationWebSocketFrame = (ContinuationWebSocketFrame) frame;
     String cwsText = continuationWebSocketFrame.text();
     LOGGER.info("【PIS-client】 received ContinuationWebSocketFrame ,value:{}", cwsText);
     if (frameBuffer != null) {
            frameBuffer.append(cwsText);
     } else {
            LOGGER.warn("【PIS-client】Continuation frame received without initial frame.");
     }
    } else {
        LOGGER.warn("【PIS-client】 received unsupported frame {}", frame.toString());
     return;
    }
    
    // Check if Text or Continuation Frame is final fragment and handle if needed.
    if (frame.isFinalFragment()) {
        handleMessageCompleted(ctx, frameBuffer.toString());
     frameBuffer = null;
    }
    

    其中,frameBuffer为在当前的handler定义的成员变量stringBuilder,用来拼接TextWebSocketFrameContinuationWebSocketFrame中出现的文本字符。

    目前,使用修改后的逻辑接收ICL提供的数据,未再发现问题。

    5.疑问

    网上对ContinuationWebSocketFrame的介绍比较少,目前还有以下疑点:

    1. websocketframe的分片行为,到底是服务端行为,还是客户端行为

    2. “分片帧”的大小是多少,如何控制

      后续如果对这些问题有进展,会在本文进行更新。

    相关文章

      网友评论

          本文标题:Netty Websocket client接收数据分片(数据帧

          本文链接:https://www.haomeiwen.com/subject/evkruhtx.html