美文网首页
Python版本Zinx——(6)消息管理模块

Python版本Zinx——(6)消息管理模块

作者: 爱折腾的胖子 | 来源:发表于2023-08-27 00:34 被阅读0次

      最近想研究一下关于长链接的相关内容,在B站上看到了Zinx框架的视频,是Golang语言的框架,本着更好的理解框架的内容,按照整个Zinx课程的进度,制作一个Python版本的Zinx框架。有关于Zinx框架的具体内容,可以看框架作者的介绍
      python版本的Zinx,基于Gevent 22.10.2,使用协程功能。

      golang版本的Zinx项目,项目中两个文件夹,ziface和znet。

    • ziface主要是存放一些Zinx框架的全部模块的抽象层接口类。
    • znet模块是zinx框架中网络相关功能的实现,所有网络相关模块都会定义在znet模块中。
      └── zinx
       ├── ziface
       │  └──
       └── znet
          ├──

      python中的关键字没有interface,但是可以使用抽象基类(abstract base class)和第三方库来实现类似于接口的功能。在实际开发中,我们可以根据具体需求选择合适的实现方式。
      暂时使用抽象基类的形式模拟接口的实现。


      在第3节中,已经简单的封装了基础的路由模块,使Server可以加载用户自定义的Router,但是只能加载1个Router,本节就将路由模块继续丰富一下,升级成可以加载多个Router的功能。
      在ziface中,新增消息管理接口imsghandler。

    # -*- coding: utf-8 -*-
    
    from ziface.irequest import IRequest
    from ziface.irouter import IRouter
    from abc import ABC, abstractmethod
    
    
    class IMsgHandler(ABC):
        """
        消息管理抽象层
        """
        @abstractmethod
        def DoMsgHandler(self, request: IRequest):
            """
            调度/执行对应的Router消息处理方法
            :param request:
            :return:
            """
            pass
    
        @abstractmethod
        def AddRouter(self, msgID: int, router: IRouter):
            """
            为消息添加具体的处理逻辑
            :param msgID:
            :param router:
            :return:
            """
            pass
    
    

      在znet中,实现一下imsghandler。

    # -*- coding: utf-8 -*-
    from typing import Dict, List, Optional
    
    
    from ziface.irequest import IRequest
    from ziface.irouter import IRouter
    from ziface.imsghandler import IMsgHandler
    from utils.globalobj import GlobalObject
    
    
    class MsgHandler(IMsgHandler):
        """
        消息管理实现层
        """
    
        def __init__(self):
            self.Apis: Dict[int, IRouter] = {}  # type : dict[int, IRouter]
    
        def DoMsgHandler(self, request: IRequest):
            """
            调度/执行对应的Router消息处理方法
            :param request:
            :return:
            """
            # 判断当前request绑定的API处理方法是否已经存在
            if request.GetMsgID() not in self.Apis.keys():
                print("接口 msgId = ", request.GetMsgID(), " 未找到!")
                return
    
            # 执行对应处理方法
            self.Apis[request.GetMsgID()].PreHandle(request)
            self.Apis[request.GetMsgID()].Handle(request)
            self.Apis[request.GetMsgID()].PostHandle(request)
    
        def AddRouter(self, msgID: int, router: IRouter):
            """
            为消息添加具体的处理逻辑
            :param msgID:
            :param router:
            :return:
            """
            # 1 判断当前msg绑定的API处理方法是否已经存在
            if msgID in self.Apis.keys():
                print("接口 msgId = ", msgID, " 已存在,无法再次添加!")
                return
            # 2 添加msg与api的绑定关系
            self.Apis[msgID] = router
            print("接口 msgId = ", msgID, " 添加成功!")
    
    
    def NewMsgHandler() -> IMsgHandler:
        return MsgHandler()
    
    

      消息管理模块封装好了,接下来就是集成了。
      在IServer中调整一下AddRouter,新增一个msgID的参数。

        @abstractmethod
        def AddRouter(self, msgID: int, router: IRouter):
            """
            添加路由
            :param router:
            :return:
            """
            pass
    

      znet中的Server,需要修改一下成员变量,删除Router,新增一个msgHandler。

    class Server(IServer):
        def __init__(self, name: str, ip: str, port: int,
                     family: socket.AddressFamily = socket.AF_INET,
                     socket_kind: socket.SocketKind = socket.SOCK_STREAM):
            self.name: str = name
            self.family: socket.AddressFamily = family
            self.socket_kind: socket.SocketKind = socket_kind
            self.ip: str = ip
            self.port: int = port
            # self.Router: Optional[IRouter] = None
            self.msgHandler: IMsgHandler = NewMsgHandler()
    

      znet中的Server在InitTCP中,创建Connection的时候,之前是传入了Router,现在修改为传入msgHandler。

        def InitTCP(self):
            """
            初始化TCP链接
            :return:
            """
            # 1.获取一个TCP的Addr
            with socket.socket(self.family, self.socket_kind) as tcp:
                # 2.监听服务器的地址
                tcp.bind((self.ip, self.port))
                tcp.listen(128)
                print("[启动] %s服务启动成功,监听中......" % self.name)
                # 3.阻塞的等待客户端链接,处理客户端链接业务(读写)。
                cid: int = 0
                while True:
                    print("开启接收")
                    remote_socket, remote_addr = tcp.accept()
                    dealConn = NewConnection(remote_socket, cid, remote_addr, self.msgHandler)
                    cid += 1
                    g2 = gevent.spawn(dealConn.Start())
                    GlobalGevents.append(g2)
    

      znet中的Server在AddRouter中,实现一下IServer的参数新增。

        def AddRouter(self, msgID: int, router: IRouter):
            """
            添加路由
            :param msgID:
            :param router:
            :return:
            """
            self.msgHandler.AddRouter(msgID, router)
            print("添加路由成功")
    

      Server的InitTCP方法中,调用了NewConnection,并且传入了新的参数,就说明Connection需要进行响应的修改。修改了init、RunHandler、NewConnection函数。

    # -*- coding: utf-8 -*-
    import socket
    import gevent
    from ziface.iconnection import IConnection
    from ziface.imsghandler import IMsgHandler
    from ziface.irequest import IRequest
    from znet.message import NewMessage
    from znet.request import NewRequest
    from znet.datapack import NewDataPack
    from utils.globalobj import GlobalGevents
    
    
    class Connection(IConnection):
        def __init__(self, conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler):
            self.Conn: socket.socket = conn  # 当前链接的socket TCP套接字
            self.ConnID: int = connID  # 链接的ID
            # self.HandlerAPI = handlerAPI  # 当前链接绑定的业务处理方法的API
            self.is_closed: bool = False  # 链接状态
            self.Remote_Addr: tuple = remote_addr
            # self.Router: IRouter = router
            self.msgHandler: IMsgHandler = msgHandler
    
        def Start(self):
            """
            启动链接 让当前的链接准备开始工作
            :return:
            """
            print("链接开启,ID=", self.ConnID)
            # 开启读业务
            g1 = gevent.spawn(self.StartReader)
            GlobalGevents.append(g1)
    
        def StartReader(self):
            """
            处理读业务
            :return:
            """
            print("开启读业务")
            while True:
                try:
                    dp = NewDataPack()
                    # 读取客户端的Msg head
                    head_data = self.Conn.recv(dp.GetHeadLen())
                    if len(head_data) == 0:
                        # head_data 长度为0表示客户端已经退出
                        break
                    # 拆包,得到msgID 和 dataLen 放在msg中
                    msg = dp.Unpack(head_data)
                    # 根据 dataLen 读取 data,放在msg.Data中
                    data_content = self.Conn.recv(msg.GetDataLen())
                    msg.SetData(data_content)
                    # 得到当前conn数据的Request请求数据
                    req = NewRequest(self, msg)
                    g2 = gevent.spawn(self.RunHandler(req))
                    GlobalGevents.append(g2)
                except Exception as e:
                    print("读取数据异常 ", e)
                    break
            print("读业务关闭,ID", self.ConnID)
            self.Stop()
    
        def RunHandler(self, request: IRequest):
            """
            采用协程运行Handler
            :param request:
            :return:
            """
            self.msgHandler.DoMsgHandler(request)
    
        def Stop(self):
            """
            停止链接 结束当前链接的工作
            :return:
            """
            print("链接关闭,ID=", self.ConnID)
            if self.is_closed:
                return
            self.is_closed = True
            # 关闭socket链接,回收资源
            self.Conn.close()
    
        def GetTCPConnection(self) -> socket.socket:
            """
            获取当前链接绑定的socket conn
            :return:
            """
            return self.Conn
    
        def GetConnID(self) -> int:
            """
            获取当前链接模块的链接ID
            :return:
            """
            return self.ConnID
    
        def GetRemoteAddr(self) -> tuple:
            """
            获取远程客户端的TCP状态 IP Port
            :return:
            """
            return self.Remote_Addr
    
        def SendMsg(self, msgID: int, data: bytes):
            """
            发送数据 将数据发送给远程的客户端
            :param msgID:
            :param data:
            :return:
            """
            if self.is_closed:
                raise Exception("发送数据时客户端链接被关闭")
            try:
                dp = NewDataPack()
                msg = dp.Pack(NewMessage(msgID, len(data), data))
                self.Conn.send(msg)
            except Exception as e:
                print(e)
    
    
    def NewConnection(conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler) -> IConnection:
        c = Connection(conn, connID, remote_addr, msgHandler)
        return c
    
    

      服务接口做完了。在demo\msghandler中做一个客户端client.py测试一下

    import socket
    import time
    import sys
    
    sys.path.append("../../")
    from znet.datapack import NewDataPack
    from znet.message import NewMessage
    
    if __name__ == '__main__':
        HOST = '127.0.0.1'  # 服务器IP地址
        PORT = 8986  # 服务器端口号
        # 创建socket对象
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 连接服务器
        s.connect((HOST, PORT))
        print(f"Connected to {HOST}:{PORT}")
        # 发送数据
        data = "Hello, Server"
        msg = NewMessage(1, len(data), data.encode("utf-8"))
        dp = NewDataPack()
        send_data = dp.Pack(msg)
        s.send(send_data)
        # 接收响应数据
        response = s.recv(1024)
        print("Received from server: ", response)
        # 关闭连接
        # s.close()
        time.sleep(100)
    

      再做一个服务端。

    # -*- coding: utf-8 -*-
    import sys
    
    sys.path.append("../../")
    from znet.server import NewServer
    from znet.router import BaseRouter
    from ziface.irequest import IRequest
    
    
    class PingRouter(BaseRouter):
        """
        用户自定义路由
        """
    
        def __init__(self):
            super().__init__()
    
        def Handle(self, request: IRequest):
            """
            处理conn业务的方法
            :param request:
            :return:
            """
            try:
                print("调用Handle")
                # request.GetConnection().GetTCPConnection().send("ping\n".encode("GB2312"))
                print(request.GetData())
                request.GetConnection().SendMsg(1, "ping\n".encode("GB2312"))
                request.GetConnection().SendMsg(1, request.GetData())
            except Exception as e:
                print("ping异常", e)
    
    
    if __name__ == '__main__':
        server = NewServer()
        server.AddRouter(1, PingRouter())
        server.Serve()
    
    

      此时发送和接收都正常。消息管理模块完成。

    相关文章

      网友评论

          本文标题:Python版本Zinx——(6)消息管理模块

          本文链接:https://www.haomeiwen.com/subject/ttfpmdtx.html