美文网首页
Flask-SocketIO传输二进制单元测试的Bug和修改

Flask-SocketIO传输二进制单元测试的Bug和修改

作者: JianMing | 来源:发表于2017-11-16 15:44 被阅读165次

最近用了Flask-SocketIO,因为要和一些性能比较差的机器通信,所以数据格式并没有采用传统的json,而是采用Google的跨平台序列化工具FlatBuffers,它的结构化数据都以二进制形式保存,所以需要用SocketIO传输二进制格式,这个Flask-SocketIO支持的。

但在写单元测试的的时候发现Flask-SocketIO的单元测试会异常,代码和异常如下:

  • test.py
from flask import Flask, render_template
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socket_io = SocketIO(binary=True)
socket_io.init_app(app)


@app.route('/')
def index():
    return 'hello'


@socket_io.on('event')
def test_message(message):
    print("Success: %s", str(message))

if __name__ == '__main__':
    socket_io.run(app)
  • test.py
import unittest
from test_app import app, socket_io


class FlaskrTestCase(unittest.TestCase):

    def setUp(self):
        self.app = app
        self.app.config['TESTING'] = True

        self.socket_io_client = socket_io.test_client(self.app)

    def test_socket_io(self):

        self.socket_io_client.emit('event', bytes(b'1111111'), binary=True)

    def tearDown(self):
        pass

if __name__ == '__main__':
    unittest.main()

  • 异常信息
Testing started at 5:39 PM ...
decode: encoded_packet[['51-["event",{"_placeholder":true,"num":0}]', b'1111111']]

Error
Traceback (most recent call last):
  File "/mnt/hgfs/Share/fogstor-server/test.py", line 15, in test_socket_io
    self.socket_io_client.emit('event', bytes(b'1111111'), binary=True)
  File "/home/jeff/develop-env/fosstor_3.5/lib/python3.5/site-packages/flask_socketio/test_client.py", line 117, in emit
    self.socketio.server._handle_eio_message(self.sid, pkt.encode())
  File "/home/jeff/develop-env/fosstor_3.5/lib/python3.5/site-packages/socketio/server.py", line 522, in _handle_eio_message
    raise ValueError('Unknown packet type.')
ValueError: Unknown packet type.


Process finished with exit code 0

这是由于Flask-SocketIO在二进制传输应用不多,所以代码并没有很好的覆盖到,只需要将

# flask_socketio文件夹下的test_client.py
......

def emit(self, event, *args, **kwargs):
        """Emit an event to the server.

        :param event: The event name.
        :param *args: The event arguments.
        :param callback: ``True`` if the client requests a callback, ``False``
                         if not. Note that client-side callbacks are not
                         implemented, a callback request will just tell the
                         server to provide the arguments to invoke the
                         callback, but no callback is invoked. Instead, the
                         arguments that the server provided for the callback
                         are returned by this function.
        :param namespace: The namespace of the event. The global namespace is
                          assumed if this argument is not provided.
        """
        namespace = kwargs.pop('namespace', None)
        callback = kwargs.pop('callback', False)
        id = None
        if callback:
            self.callback_counter += 1
            id = self.callback_counter
        pkt = packet.Packet(packet.EVENT, data=[event] + list(args),
                            namespace=namespace, id=id, binary=False)

        self.ack = None
        with self.app.app_context():
            self.socketio.server._handle_eio_message(self.sid, pkt.encode())

        if self.ack is not None:
            return self.ack['args'][0] if len(self.ack['args']) == 1 \
                else self.ack['args']

......

修改为

# flask_socketio文件夹下的test_client.py
......

def emit(self, event, *args, **kwargs):
        """Emit an event to the server.

        :param event: The event name.
        :param *args: The event arguments.
        :param callback: ``True`` if the client requests a callback, ``False``
                         if not. Note that client-side callbacks are not
                         implemented, a callback request will just tell the
                         server to provide the arguments to invoke the
                         callback, but no callback is invoked. Instead, the
                         arguments that the server provided for the callback
                         are returned by this function.
        :param namespace: The namespace of the event. The global namespace is
                          assumed if this argument is not provided.
        """
        namespace = kwargs.pop('namespace', None)
        callback = kwargs.pop('callback', False)
        binary = kwargs.pop('binary', False)
        id = None
        if callback:
            self.callback_counter += 1
            id = self.callback_counter
        pkt = packet.Packet(packet.EVENT, data=[event] + list(args),
                            namespace=namespace, id=id, binary=binary)

        self.ack = None
        with self.app.app_context():
            if binary:
                encoded_packet = pkt.encode()
                self.socketio.server._handle_eio_message(self.sid, encoded_packet[0])
                self.socketio.server._handle_eio_message(self.sid, encoded_packet[1])
            else:
                self.socketio.server._handle_eio_message(self.sid, pkt.encode())

        if self.ack is not None:
            return self.ack['args'][0] if len(self.ack['args']) == 1 \
                else self.ack['args']

......

就可以单元测试了:

Testing started at 5:40 PM ...
decode: encoded_packet[51-["event",{"_placeholder":true,"num":0}]]
reconstruct_binary: data[['event', {'_placeholder': True, 'num': 0}]], attachments[[b'1111111']]
Success: %s b'1111111'

Process finished with exit code 0

相关文章

  • Flask-SocketIO传输二进制单元测试的Bug和修改

    最近用了Flask-SocketIO,因为要和一些性能比较差的机器通信,所以数据格式并没有采用传统的json,而是...

  • Android单元测试总结

    单元测试小总结 ​ 单元测试往往在产品赶着上线的情况下被忽视。然后单元测试往往会节约大量修改bug的时间。还有一点...

  • karma+jasmine前端单元测试

    karma+jasmine前端单元测试 Q:为何要单元测试?A:为了提升代码的质量、减少bug、快速定位bug、减...

  • SDK版本管理方案

    1.禁止将开发功能的分支合并到修改bug的分支2.在本地分别拉取开发功能的分支和修改bug的分支3.修改bug的分...

  • JS单元测试及原理

    单元测试 单元测试是指对软件中的最小可测试单元进行检查和验证,通过单元测试可以检测出潜在的bug,还可以快速反馈功...

  • http2笔记

    http2的特点: 二进制传输:所有请求响应都以二进制编码,流式传输,最小单位是帧;传统http1传输的是文本,最...

  • python3编码转换

    进行编码传输的原因 因为网络传输是采用二进制信号进行传输的, 所以字符串、图片等格式的数据都需要编码成为二进制字节...

  • 2018-05-05 五月第一周

    时间:2018.04.16~2018.04.22 员工:周一 工作内容: 1、可研单元测试 2、概算的开发和bug...

  • swift 单元测试1

    swift 单元测试1 1、为什么要进行单元测试? 答:单元测试是为了避免你的app变成充满bug的软件,让我们在...

  • URL 编、解码(EncodedString、DecodedSt

    开发中对文本传输或二进制传输,都需要将传输的对象进行二进制字节的转化操作,所以无异于编、解码便会经常用到的操作; ...

网友评论

      本文标题:Flask-SocketIO传输二进制单元测试的Bug和修改

      本文链接:https://www.haomeiwen.com/subject/pxnlvxtx.html