美文网首页
RobotFramework二次开发——Socket推送实时日志

RobotFramework二次开发——Socket推送实时日志

作者: 点点寒彬 | 来源:发表于2017-12-08 16:21 被阅读102次

    背景

    上一章说道RobotFramework的执行监听,是通过Socket服务来实现的,那么现在就来实现一下简单的Socket服务

    思路

    Socket服务有一个client端和一个server端,Listener.py我们可以当成一个Client,因为在执行的时候,我们需要Listener来发送实时数据,因此,我们还要另写一段服务端的代码。

    改造Listener.py

    首先,要开启要一个Socket服务

        def __init__(self):
            self.sock = socket.socket()
            self.conn = self.sock.connect(("127.0.0.1", 50007))
    

    在类的初始化构造中起一个连接,端口可以先暂时指定,如果有需求,再改造成参数化传入。
    之前我们获取的实时数据都是用print的方式打印出来,现在我们要把每个print的内容变成send_socket

    那么就要构造一个发送消息的方法。

        def _send_socket(self, msg):
            self.sock.sendall(msg)
    

    类似这样的,往连接的端口发送数据。

    再把所有的print内容用_send_socket方法发送出去。

        def log_message(self, message):
            # print message['timestamp'] + " :   " + message['level'] + " : " + message['message']
            self._send_socket(message['timestamp'] + " :   " + message['level'] + " : " + message['message'])
    

    这样,我们的Listerner就改造完成了。

    Server端

    Server端就比较简单了,直接网上找一个代码改改就好。

    import socket
    
    HOST = ''  # Symbolic name meaning all available interfaces
    PORT = 50007  # Arbitrary non-privileged port
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((HOST, PORT))
    s.listen(1)
    conn, addr = s.accept()
    print 'Connected by', addr
    while 1:
        data = conn.recv(1024)
        print data
        if not data: break
    conn.close()
    

    说明一下,上面代码中拿到了数据,也是用print方法打印出来,这里是可以构造很多后续操作的,比如写文件,或者集成到web环境时,可以用yield返回stream数据,在web端实时展示数据。

    最后没有数据返回的时候,就关闭连接。

    完整代码

    Listener.py
    
    # ecoding=utf-8
    # Author: Sven_Weng
    # Email : sven_weng@wengyb.com
    # Web   : http://wybblog.applinzi.com
    import socket
    
    
    class RobotListener(object):
        ROBOT_LISTENER_API_VERSION = 2
    
        def __init__(self):
            self.sock = socket.socket()
            self.conn = self.sock.connect(("127.0.0.1", 50007))
    
        def start_suite(self, name, args):
            self._send_socket("Starting Suite : " + name + "  " + args['source'])
    
        def start_test(self, name, args):
            self._send_socket("Starting test : " + name)
            if args['template']:
                print 'Template is : ' + args['template']
    
        def end_test(self, name, args):
            self._send_socket("Ending test:  " + args['longname'])
            self._send_socket("Test Result is : " + args['status'])
            self._send_socket("Test Time is: " + str(args['elapsedtime']))
    
        def log_message(self, message):
            self._send_socket(message['timestamp'] + " :   " + message['level'] + " : " + message['message'])
    
        def _send_socket(self, msg):
            self.sock.sendall(msg)
    
    
    Server.py
    
    import socket
    
    HOST = ''  # Symbolic name meaning all available interfaces
    PORT = 50007  # Arbitrary non-privileged port
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((HOST, PORT))
    s.listen(1)
    conn, addr = s.accept()
    print 'Connected by', addr
    while 1:
        data = conn.recv(1024)
        print data
        if not data: break
    conn.close()
    

    相关文章

      网友评论

          本文标题:RobotFramework二次开发——Socket推送实时日志

          本文链接:https://www.haomeiwen.com/subject/vjneixtx.html