美文网首页程序员软件测试Python专家之路工具癖
python工具库介绍-dubbo:通过telnet接口访问du

python工具库介绍-dubbo:通过telnet接口访问dubbo服务

作者: python测试开发 | 来源:发表于2018-10-10 16:42 被阅读97次

    简介

    dubbo服务发布之后,我们可以利用telnet命令进行调试、管理。更多资料参见 Telnet命令参考手册

    telnet 调用示例:

    
    $ telnet 172.17.103.110 9097
    Trying 172.17.103.110...
    Connected to 172.17.103.110.
    Escape character is '^]'.
    
    dubbo>ls com.oppo.sso.service.onekey.IOnekeyRegister
    register
    dubbo>invoke com.oppo.sso.service.onekey.IOnekeyRegister.register({"applicationKey":"mac","imei":"","mobile":"13244448888","createIP":"127.0.0.1","createBy":"172.17.0.1"})
    {"configCountry":null,"userIdLong":0,"appPackage":null,"appVersion":null,"accountName":null,"romVersion":null,"resultCode":3001,"thirdStatus":null,"registerType":0,"sendChannel":null,"operator":null,"manufacturer":null,"password":null,"osVersion":null,"lock":false,"model":null,"visitorLocked":false,"OK":false,"brand":null,"email":null,"createIP":null,"deny":false,"encryptEmail":null,"sessionKey":null,"thirdId":null,"passwordOriginal":null,"mobile":null,"applicationKey":null,"thirdpartyOk":false,"userAgent":null,"userName":null,"resultDesc":"应用不存在","userId":0,"encryptMobile":null,"emailStatus":null,"createBy":null,"freePwd":false,"changeTimes":0,"createTime":null,"mobileStatus":null,"oldLock":false,"codeTimeout":null,"lastUpdate":null,"imei":null,"sessionTimeout":0,"sdkVersion":null,"networkID":0,"status":null}
    elapsed: 98 ms.
    dubbo>
    
    

    Python实现

    源码:

    
    """
    Name: dubbo.py
    
    Tested in python3.5
    """
    
    import json
    import telnetlib
    import socket
    
    
    class Dubbo(telnetlib.Telnet):
        """Small client for the Dubbo telnet console.

        Extends ``telnetlib.Telnet`` with helpers that send an ``invoke``
        command and parse the first line of the reply as JSON.
        """

        # Text the console prints when it is ready for the next command.
        prompt = 'dubbo>'
        # Codec used to encode outgoing commands and decode replies.
        coding = 'utf-8'

        def __init__(self, host=None, port=0,
                     timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            """Create the client and connect when ``host`` is given.

            Args:
                host: Server address; ``None`` defers connecting to ``open()``.
                port: Server port.
                timeout: Connect timeout in seconds, forwarded to
                    ``telnetlib.Telnet``.
            """
            # The timeout used to be accepted but silently dropped. The base
            # class calls self.open() when host is not None, which also
            # sends the priming newline (see open()).
            super().__init__(host, port, timeout)

        def open(self, host, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            """Connect, then send a newline to request the first prompt.

            Every command waits for ``prompt`` before it is written, so a
            prompt has to be pending right after the connection is made.
            Doing this here (instead of in ``__init__``) keeps ``Dubbo()``
            without a host usable and covers a later explicit ``open()``.
            """
            super().open(host, port, timeout)
            self.write(b'\n')

        def command(self, flag, str_=""):
            """Wait for ``flag``, then send ``str_`` followed by a newline.

            Args:
                flag: Text to wait for (normally ``prompt``).
                str_: Command line to send once ``flag`` has been read.

            Returns:
                The raw bytes returned by ``read_until`` for ``flag``.
            """
            data = self.read_until(flag.encode(self.coding))
            self.write(str_.encode(self.coding) + b"\n")
            return data

        def invoke(self, service_name, method_name, arg):
            """Call ``service_name.method_name(arg)`` and return the result.

            Args:
                service_name: Fully qualified service interface name.
                method_name: Method to invoke on that service.
                arg: JSON-serialisable argument placed between the
                    parentheses of the ``invoke`` command.

            Returns:
                The first line of the reply, parsed as JSON.

            Raises:
                ValueError: If the first reply line is not valid JSON
                    (raised by ``json.loads``).
            """
            command_str = "invoke {0}.{1}({2})".format(
                service_name, method_name, json.dumps(arg))
            # Consume the pending prompt, then send the invoke command.
            self.command(self.prompt, command_str)
            # Read the reply up to the next prompt; the empty command sent
            # afterwards leaves a fresh prompt pending for the next call.
            raw = self.command(self.prompt, "")
            text = raw.decode(self.coding, errors='ignore')
            # Only the first line is parsed; later lines are ignored.
            return json.loads(text.split('\n')[0].strip())
    
    if __name__ == '__main__':

        # Demo: invoke IOnekeyRegister.register over the Dubbo telnet console.
        conn = Dubbo('172.17.103.110', 9097)
        try:
            json_data = {
                "applicationKey":"mac",
                "imei":"",
                "mobile":"13244448888",
                "createIP":"127.0.0.1",
                "createBy":"172.17.0.1"
            }

            result = conn.invoke(
                "com.oppo.sso.service.onekey.IOnekeyRegister",
                "register",
                json_data
            )
            print(result)
        finally:
            # Always release the socket, even if invoke() raises.
            conn.close()
    

    执行结果:

    
    # python3 dubbo.py 
    {'manufacturer': None, 'applicationKey': None, 'OK': False, 'codeTimeout': None, 'password': None, 'encryptEmail': None, 'passwordOriginal': None, 'thirdId': None, 'emailStatus': None, 'freePwd': False, 'sessionTimeout': 0, 'createTime': None, 'osVersion': None, 'lastUpdate': None, 'email': None, 'sdkVersion': None, 'registerType': 0, 'sendChannel': None, 'visitorLocked': False, 'createIP': None, 'thirdStatus': None, 'encryptMobile': None, 'networkID': 0, 'resultCode': 3001, 'brand': None, 'changeTimes': 0, 'userAgent': None, 'imei': None, 'operator': None, 'romVersion': None, 'model': None, 'lock': False, 'sessionKey': None, 'configCountry': None, 'deny': False, 'userIdLong': 0, 'resultDesc': '应用不存在', 'status': None, 'createBy': None, 'thirdpartyOk': False, 'appPackage': None, 'appVersion': None, 'accountName': None, 'userId': 0, 'oldLock': False, 'userName': None, 'mobile': None, 'mobileStatus': None}
    

    更复杂的例子。

    源码:

    
    import json
    import telnetlib
    import socket
    
    
    class Dubbo(telnetlib.Telnet):
        """Small client for the Dubbo telnet console (GBK-encoded replies).

        Extends ``telnetlib.Telnet`` with helpers that send an ``invoke``
        command and parse the first line of the reply as JSON.
        """

        # Text the console prints when it is ready for the next command.
        prompt = 'dubbo>'
        # Codec used to encode outgoing commands and decode replies.
        coding = 'gbk'

        def __init__(self, host=None, port=0,
                     timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            """Create the client and connect when ``host`` is given.

            Args:
                host: Server address; ``None`` defers connecting to ``open()``.
                port: Server port.
                timeout: Connect timeout in seconds, forwarded to
                    ``telnetlib.Telnet``.
            """
            # The timeout used to be accepted but silently dropped. The base
            # class calls self.open() when host is not None, which also
            # sends the priming newline (see open()).
            super().__init__(host, port, timeout)

        def open(self, host, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            """Connect, then send a newline to request the first prompt.

            Every command waits for ``prompt`` before it is written, so a
            prompt has to be pending right after the connection is made.
            Doing this here (instead of in ``__init__``) keeps ``Dubbo()``
            without a host usable and covers a later explicit ``open()``.
            """
            super().open(host, port, timeout)
            self.write(b'\n')

        def command(self, flag, str_=""):
            """Wait for ``flag``, then send ``str_`` followed by a newline.

            Args:
                flag: Text to wait for (normally ``prompt``).
                str_: Command line to send once ``flag`` has been read.

            Returns:
                The raw bytes returned by ``read_until`` for ``flag``.
            """
            data = self.read_until(flag.encode(self.coding))
            self.write(str_.encode(self.coding) + b"\n")
            return data

        def invoke(self, service_name, method_name, arg):
            """Call ``service_name.method_name(arg)`` and return the result.

            Args:
                service_name: Fully qualified service interface name.
                method_name: Method to invoke on that service.
                arg: JSON-serialisable argument placed between the
                    parentheses of the ``invoke`` command.

            Returns:
                The first line of the reply, parsed as JSON.

            Raises:
                ValueError: If the first reply line is not valid JSON
                    (raised by ``json.loads``).
            """
            command_str = "invoke {0}.{1}({2})".format(
                service_name, method_name, json.dumps(arg))
            # Consume the pending prompt, then send the invoke command.
            self.command(self.prompt, command_str)
            # Read the reply up to the next prompt; the empty command sent
            # afterwards leaves a fresh prompt pending for the next call.
            raw = self.command(self.prompt, "")
            # Undecodable bytes are dropped (errors='ignore'), so a reply in
            # a different encoding yields garbled or missing characters.
            text = raw.decode(self.coding, errors='ignore')
            # Only the first line is parsed; later lines are ignored.
            return json.loads(text.split('\n')[0].strip())
    
    if __name__ == '__main__':

        # Demo: invoke NotifyApplyService.httpNotify with a nested JSON payload.
        conn = Dubbo('183.131.22.99', 21881)
        try:
            content = {
                "sign": "FKeKnMEPybHujjBTzz11BrulB5av7pLhJpk=",
                "partnerOrder": "0511e0d38f334319a96920fa02be02a7",
                "productDesc": "hello",
                "paySuccessTime": "2016-08-25 18:33:04",
                "price": "1",
                "count": "1",
                "attach": "from_pay_demo",
                "date": "20160825",
            }
            # The nested payload is sent as a JSON string with its quotes
            # backslash-escaped.
            # NOTE(review): invoke() runs json.dumps() on the whole argument
            # again, so these backslashes get escaped a second time - confirm
            # the service really expects this double escaping.
            content_json = json.dumps(content).replace('"', '\\"')

            json_data = {
                "requestId": "0511e0d38f334319a96920fa02be02a7",
                "reqUrl": 'http://www.oppo.com',
                "httpReqType": "POST",
                "headerMap": None,
                "reqContent": content_json,
                "appId": "10001",
                "productName": "test",
                "timeStamp": "1472121184957",
                "partnerId": "2031",
                "signType": "MD5",
                "sign": "23B621FBBF887373C65E553C2222258F",
                "successResult": "OK",
            }

            result = conn.invoke(
                "com.oppo.pay.notify.api.facade.NotifyApplyService",
                "httpNotify",
                json_data
            )
            print(result)
        finally:
            # Always release the socket, even if invoke() raises.
            conn.close()
    

    执行结果:

    
    $ python3 dubbo.py 
    {'resMsg': 'ǩ', 'data': None, 'errorDetail': 'sign check fail!', 'resCode': '100003', 'success': False}
    
    

    待改进

    • 进行多个项目的实验,增加异常处理

    参考资料

    相关文章

      网友评论

        本文标题:python工具库介绍-dubbo:通过telnet接口访问du

        本文链接:https://www.haomeiwen.com/subject/gftmaftx.html