美文网首页TesterHome测试之家
用 Python 写网络编程(四)

用 Python 写网络编程(四)

作者: TesterHome | 来源:发表于2022-02-17 11:11 被阅读0次

    本文首发于TesterHome社区,作者是资深游戏测试开发工程师陈子昂。用 Python 写网络编程共四篇,今天分享的是第四篇。原文链接:https://testerhome.com/topics/29411

    复习功课

    第一篇地址

    第二篇地址

    第三篇地址

    逐步讲述回包

    直接入正题,第三篇已经提到了如何收到回包,当前这个主题网络编程都是讲 Tcp,协议请求方式本身会决定特性,我们这里先讲和发包收包顺序有关的。

    Tcp 首先是一个双工的,意思就是客户端 C 端发消息给 S 端,S 端也可以发消息给客户端,传输协议数据类型里面传输在网卡那层都是二进制文件流。为啥是流而不是包可以回顾之前的,也可以选择死记硬背。

    那么 C 端是不是发给 S 端消息,S 端一定是要回复的呢,这个是未必的。因为在协议请求/传输方式还会传输协议数据类型,这个类型和业务是直接有关的,大体可以分为需要应答和不需要应答。不需要应答的最常见的就是心跳包。这里为啥要叫包而不是叫心跳流呢,只能说是一种通用的叫法。

    当客户端链接上服务后,每隔约一段时间,往服务器固定传输一段信息,就和人的心电图一样,证实当前客户端是活跃的。如果不活跃服务器会把客户端对象断开(Tcp 挥手),这里不活跃一般是指多少次没有收到,才会判断不活跃。

    断开的好处,这里可以这样记忆,每个 socket 链接是一定有成本的,保持链接状态也一样,每个 socket 做一些事会产生内存变化(这里不考虑软件缓存,硬件的几级缓存),如果在把一些数据存储到数据库,就会把内存数据交换到数据库或者数据库缓存,交互过程也会产生内存碎片和数据库 IO 开销。只是是活跃的,就会根据这个 socket 做得事情产生一系列的非用户态的一些开销,所以有必要剔除不活跃的请求。

    比如 30 分钟没有获得经验/多少分钟没有移动(移动也会产生移动数据包,这个数据包会逐条同步给他周围的活跃用户),然后就会进入挂机状态,这种其实也是类似把 fb 标记为了不活跃 (leave = no alive),不活跃就只需要定期同步一次。

    如果是完全下线 (close),这里撇开业务,完全下线也会自动挂机获取经验。
    以上主要慢慢展开客户端发了 3 次 100 个字节后等待几百 ms,但是服务器只回了一组包,这个是 Tcp 独有黏包的问题。

    但是发现这样跨度有点大,例子比较基础,会先来通过一个 Tcp 心跳的例子(分四,五章节)来慢慢往后延伸,反正最终都会讲清楚的。

    心跳学习和设计

    写个例子来做一些直观性的推理和设计,例子也会结合之前的一些知识点。
    心跳信息:json 字串 # 实战里面是一个 bytes 形式 这个 butes 会比较小,只是为了让服务器计数客户端是活跃的。

    详解 服务器那边记录一个客户端连接句柄的管理字典,链接的客户端对象在服务器那边会是一个"fd",是否活跃会是一个字段"status":"alive",""leave","close" 分别代表 活跃,非活跃暂离,关闭。

    服务器那边进行开发设计,支持状态为一个双向的顺序 ("alive"<-->"leave"<-->"close")

    关于 status:"alive"状态条件 客户端来说是默认的,没在 socket.error 那层被拦截掉,所以链接上了,链接上分为地址正确,写法正确,防火墙允许 (这里测试用不上这个)

    需要服务器开发:
    1.socket 服务器程序
    2.常规接收包
    3.判断 4 个字节后的数据在反序列化 json.loads。服务器比客户端多一个 bind 后开启监听的功能,服务器支持多个客户端。
    4.匹配功能。服务器收到客户端信息后会进行匹配,有一个游标(第 5 章完成该功能)。

    游标比如设置为 3,阈值是 5,多少秒收到一次消息就-1,多少秒没有收到消息就 +1.阈值达到一次 5 计数一次,达到 3 次阈值,修改当前客户端链接管理字典为"leave"。达到阈值小于 3 次那么还是 alive。
    5.会用一个 user 实例对象列表去管理添加模拟链接进来的客户端。每 10 秒加一个,客户端是模拟假的,端口号 +1。

    需要客户端开发的:
    1.socket client 程序
    2.Json 发包 + 数据结构
    3.间隔每 10 秒发一次。

    关于 status:"leave"状态:记录保持 30 分钟(代码会写 30 秒)会到下个状态"close",如果出现心跳稳定多次,没有触发到阈值 5,会回退到"alive"。

    关于到达 status:"close"服务器会把客户端踢掉,在 user 的列表管理里面,服务器会存储客户端的 ip:端口。

    心跳客户端前置

    先写这个例子 只有"alive"状态条件,没有切换条件的。根据过去例子,我们先需要定义一个网络层的数据结构,这次先不用 protobuff。

    协议传输用 Tcp 使用通用性高的 struct,传输数据结构为 Json
    Tcp 数据结构为包头 4 个字节,4 个字节为里面塞包体长度,那个整个包长度也就是 4 个字节(固定的)+ 包头 4 个字节的取值(动态的包体长度)。

    Json C2S {"ip":传递本地 Ip,"status":上面讲的状态使用默认的,"pid":用于客户端管理自己的} pid 这里主要用于自己杀自己进程
    这里需要补一个知识,第三章未讲完的,就是如果 4 个字节包头长度 + 客户端传递对象是如何写的。

    # ip_add给服务器的地址
    ip_addr = socket.gethostbyname(socket.gethostname())
    # 客户端本地自己管理自己用的pid
    pid = os.getpid()
    # 状态切换用的列表,列表有序的,通过idx作为状态机索引,默认是alive
    status = ["alive", "leave", "close"]
    idx = 0
    data = {'ip': ip_addr, 'status': status[idx], 'pid': pid}
    # data的长度,这里有str只是为了下面+使用,int->str不影响下面结果
    pack_len = str(len(data))
    print(pack_len) # 输出为3
    # i是4个字节 +pack_len struct.calcsize是看fmt区域的
    client_len = struct.calcsize("!i" + pack_len + 's')
    print(client_len) # 输出为4+3
    
    

    注意看注释,动态计算长度就这么出来了,但是注意了,struct.pack 压包动态化是面向包头里面有几个对象,如果只有一个对象传入 json 输出应该是这样的
    本次代码关于压包实例

    buffer = json.dumps(data)
    print(len(buffer))
    packet_head = struct.pack("!i",len(buffer))
    buffer_client = packet_head+buffer.encode("utf-8")
    print(buffer_client) # b'\x00\x00\x008{"ip": "192.168.1.105", "status": "alive", "pid": 26464}'
    
    

    心跳客户端

    只包含和服务器进行通信,发送 Tcp-stuct,Json,把 json 数据做为心跳包发送了,注意实际心跳的确会是固定的数据,但不会传入这个。
    服务器只是解析字符串那样去判断心跳是哪个客户端发的。状态切换功能下个文章再写。

    import datetime
    import socket, sys, os
    import time, json,struct
    
    class HeartBeatClient:
        interval = 10  #心跳发送间隔
    
        def __init__(self, addr: tuple):
            try:
                self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.client.connect(addr)
            except socket.error as e:
                print("Error creating socket: %s" % e)
                sys.exit()
    
        def encode_json(self, conn,data:dict):
            """发包"""
            buffer = json.dumps(data)
            print(f"当前客户端发出去包长度{len(buffer)}")
            packet_head = struct.pack("!i", len(buffer))
            conn.send(packet_head + buffer.encode("utf-8"))
    
        def loop_client(self, conn: socket.socket):
            """
            循环客户端
            :param conn:
            :return:
            """
            status = ["alive", "leave", "close"]  #状态列表有序
            idx = 0 #状态索引
            while True:
                ip_addr = socket.gethostbyname(socket.gethostname())
                pid = os.getpid()
                data = {"ip": ip_addr, "status": status[idx], "pid": pid}
                buffer = json.dumps(data)
                try:
                    self.encode_json(conn,buffer)
                except socket.error:
                    print("client send failed")
                    break
                else:
                    now =datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    print({"time":now,"fd":os.getpid(),"status":status[idx]})
                    time.sleep(self.interval)
            self.client_close(conn)
    
        def client_close(self,conn):
            if conn:
                conn.close()
    
    if __name__ == '__main__':
        client = HeartBeatClient(("127.0.0.1", 14000))
        client.loop_client(client.client)
    
    

    加固 strcut 传递使用的例子,建议手敲代码。

    心跳服务器

    主要是对应这次客户端编写的心跳服务器功能,注意调式是先启动服务器,在启动客户端文件。

    为了模拟 fd_port 自增 1,等于好多个客户端连接入,这里如果不熟悉 Threading 的,需要稍微自学下。

    import socket
    import json
    from threading import Thread
    import ast
    
    class HeartBeatServer:
    
        def __init__(self, addr: tuple, max: int):
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind(addr)
            # 最大支持max个链接数
            self.sock.listen(max)
            self.user = []
    
        def encode_buffer(self, conn: socket.socket, data: str or bytes or dict):
            """
            发送数据
            :param conn: 客户端链接到服务器的句柄
            :param data: 数据
            :return:
            """
            if isinstance(data, str):
                data = data.encode("utf-8")
            elif isinstance(data, dict):
                data = str(data).encode("utf-8")
            conn.send(str(data).encode("utf-8"))
    
        def match_data(self,recv)->bool:
            """
            匹配反序列化后的数据
            :param recv:
            :return:
            """
            if isinstance(recv,dict):
                return recv.get("ip") and recv.get("status")
    
        def loop_thread(self, conn: socket.socket, fd_port: int):
            """
            服务器是只判断客户端的包头是否合法。
            :return:
            """
            head_len = 4
            self.user.append("192.168.1.105:88888")
            idx = 1
            while True:
                try:
                    # 不停收包
                    data = conn.recv(1024 * 4)
                    if len(data) > head_len:
                        recv = json.loads(data[4:])
                        recv = ast.literal_eval(recv)
                        if not self.match_data(recv):
                            # pop()列表丢出去并且返回丢出的数据,默认是最后一个
                            print(f"client fd addr:{self.user.pop()} Connected over!")
                            break
                        # 为了模拟fd_port自增1,等于好多个客户端连接入
                        self.user.append(f"{recv.get('ip')}:{fd_port + idx}")
                        print(f"当前服务器管理用户-->{self.user}")
                    else:
                        print(f"client fd addr:{self.user[-1]}数据不合法")
                except socket.error:
                    print(f"client fd addr:{self.user.pop()} Connected over!")
                    break
                else:
                    idx += 1
            conn.close()
    
        def start(self):
            """
            服务启动
            :return:
            """
            while True:
                conn, addr = self.sock.accept()
                t = Thread(target=self.loop_thread, args=(conn, addr[1]))
                t.start()
    
    if __name__ == '__main__':
        server = HeartBeatServer(("127.0.0.1", 14000), 100)
        server.start()
    
    

    这里有一个重要概念

    这里为啥不能用 self.sock 是因为 self.sock 是服务器本身的句柄
    conn 是服务器收到客户端对象的句柄,所以这个 conn 才是被管理的,当客户端断开时,在服务器通过 conn 进行挥手
    如果是 self.sock 服务就关闭了。

    结尾

    第五章也写了一部分草稿中

    以上是今天的分享,你学废了吗~

    想学习更多干货知识和前沿技术?

    想结识测试行业大咖和业界精英?

    欢迎关注2022 MTSC大会(第十届中国互联网测试开发大会)

    业界对MTSC大会的评价:落地、务实、有深度、重分享

    中国互联网测试开发大会 Testing Summit China,是由TesterHome发起的软件测试开发行业技术会议。还有个别名:Make Testing Super Cool!

    MTSC大会以软件质量保障体系和测试研发技术交流为主要目的,始于 2015 年,已成功举办了九届。共有 1000+ 家企业,10000+ 测试工程师、测试经理、CTO 参会,受到了全行业的广泛关注,是中国互联网质量保证行业的顶级会议。

    为了保证议题质量,每年都会提前半年进行议题征集,再经过历时数月的审核,最终确定在大会分享的议题。MTSC 2022的议题还在征集中,诚邀各位资深测试技术专家、质量管理经理和测试新秀们自荐/推荐议题!

    提交议题方式

    直接点击 https://www.wjx.top/vj/wZwCju3.aspx 进入投递入口,按照格式进行填写并提交即可,欢迎自荐/推荐议题。

    议题截止时间

    为便于评审组有更充分时间进行评审及与讲师进行沟通优化,为广大参会者呈现更好的议题,议题投稿(可无 PPT,有大纲等议题信息介绍即可)投递截止时间提前为:2022年3月30日

    议题征集评选流程

    总体流程:案例提交 > 初审核定 > PPT提交 > 确认议题 > 会议演讲

    总体时间节点:

    案例提交 >> 3 月 30 日

    初审核定 >> 4 月 15 日

    ppt 提交 >> 5 月 15 日

    议题确认 >> 5 月 30 日

    会议演讲 >> 7 月 22~23 日

    分享讲师福利

    1. 锻炼演讲能力,增强个人品牌

    2. 获得和业内专家面对面交流的机会,博采众长

    3. 提升公司品牌,给自己的团队增加吸引力

    4. 获取免费的大会门票和资料:

    每位讲师都会获赠 1 张大会门票,大会后续的PPT和视频都会第一时间给到讲师

    5. 外地讲师由MTSC大会组委会承担差旅费用

    相关文章

      网友评论

        本文标题:用 Python 写网络编程(四)

        本文链接:https://www.haomeiwen.com/subject/dgojlrtx.html