美文网首页
RobotFramework二次开发——Socket推送实时日志

RobotFramework二次开发——Socket推送实时日志

作者: 点点寒彬 | 来源:发表于2017-12-08 16:21 被阅读102次

背景

上一章说道RobotFramework的执行监听,是通过Socket服务来实现的,那么现在就来实现一下简单的Socket服务

思路

Socket服务有一个client端和一个server端,Listener.py我们可以当成一个Client,因为在执行的时候,我们需要Listener来发送实时数据,因此,我们还要另写一段服务端的代码。

改造Listener.py

首先,要开启要一个Socket服务

    def __init__(self):
        self.sock = socket.socket()
        self.conn = self.sock.connect(("127.0.0.1", 50007))

在类的初始化构造中起一个连接,端口可以先暂时指定,如果有需求,再改造成参数化传入。
之前我们获取的实时数据都是用print的方式打印出来,现在我们要把每个print的内容变成send_socket

那么就要构造一个发送消息的方法。

    def _send_socket(self, msg):
        self.sock.sendall(msg)

类似这样的,往连接的端口发送数据。

再把所有的print内容用_send_socket方法发送出去。

    def log_message(self, message):
        # print message['timestamp'] + " :   " + message['level'] + " : " + message['message']
        self._send_socket(message['timestamp'] + " :   " + message['level'] + " : " + message['message'])

这样,我们的Listerner就改造完成了。

Server端

Server端就比较简单了,直接网上找一个代码改改就好。

import socket

HOST = ''  # Symbolic name meaning all available interfaces
PORT = 50007  # Arbitrary non-privileged port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
print 'Connected by', addr
while 1:
    data = conn.recv(1024)
    print data
    if not data: break
conn.close()

说明一下,上面代码中拿到了数据,也是用print方法打印出来,这里是可以构造很多后续操作的,比如写文件,或者集成到web环境时,可以用yield返回stream数据,在web端实时展示数据。

最后没有数据返回的时候,就关闭连接。

完整代码

Listener.py

# ecoding=utf-8
# Author: Sven_Weng
# Email : sven_weng@wengyb.com
# Web   : http://wybblog.applinzi.com
import socket


class RobotListener(object):
    ROBOT_LISTENER_API_VERSION = 2

    def __init__(self):
        self.sock = socket.socket()
        self.conn = self.sock.connect(("127.0.0.1", 50007))

    def start_suite(self, name, args):
        self._send_socket("Starting Suite : " + name + "  " + args['source'])

    def start_test(self, name, args):
        self._send_socket("Starting test : " + name)
        if args['template']:
            print 'Template is : ' + args['template']

    def end_test(self, name, args):
        self._send_socket("Ending test:  " + args['longname'])
        self._send_socket("Test Result is : " + args['status'])
        self._send_socket("Test Time is: " + str(args['elapsedtime']))

    def log_message(self, message):
        self._send_socket(message['timestamp'] + " :   " + message['level'] + " : " + message['message'])

    def _send_socket(self, msg):
        self.sock.sendall(msg)

Server.py

import socket

HOST = ''  # Symbolic name meaning all available interfaces
PORT = 50007  # Arbitrary non-privileged port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
print 'Connected by', addr
while 1:
    data = conn.recv(1024)
    print data
    if not data: break
conn.close()

相关文章

网友评论

      本文标题:RobotFramework二次开发——Socket推送实时日志

      本文链接:https://www.haomeiwen.com/subject/vjneixtx.html