美文网首页
Python版本Zinx——(8)消息队列及多任务机制

Python版本Zinx——(8)消息队列及多任务机制

作者: 爱折腾的胖子 | 来源:发表于2023-08-28 16:14 被阅读0次

      最近想研究一下关于长链接的相关内容,在B站上看到了Zinx框架的视频,是Golang语言的框架,本着更好的理解框架的内容,按照整个Zinx课程的进度,制作一个Python版本的Zinx框架。有关于Zinx框架的具体内容,可以看框架作者的介绍
      python版本的Zinx,基于Gevent 22.10.2,使用协程功能。

      golang版本的Zinx项目,项目中两个文件夹,ziface和znet。

    • ziface主要是存放一些Zinx框架的全部模块的抽象层接口类。
    • znet模块是zinx框架中网络相关功能的实现,所有网络相关模块都会定义在znet模块中。
      └── zinx
       ├── ziface
       │  └──
       └── znet
          ├──

      python中的关键字没有interface,但是可以使用抽象基类(abstract base class)和第三方库来实现类似于接口的功能。在实际开发中,我们可以根据具体需求选择合适的实现方式。
      暂时使用抽象基类的形式模拟接口的实现。


      在之前的章节中,服务端读取到消息之后,就会开启一个协程进行消息的处理。本节就是将流程改为读取到消息后,将待处理的消息存放到每一个worker的任务队列中,池中空闲的worker会领取自身任务队列中的消息进行处理,防止开启过多的协程进行处理。
      让用户可以在配置文件中配置worker的数量,需要再GlobalObj中添加一个成员变量WorkerPoolSize 。

    # -*- coding: utf-8 -*-
    from typing import Optional
    
    from ziface.iserver import IServer
    import json
    import os
    
    
    class GlobalObj:
        def __init__(self, host: str, tcp_port: int, name: str, version: str, max_packet_size: int, max_conn: int, worker_pool_size: int):
            self.TcpServer: Optional[IServer] = None  # 当前zinx的全局Server对象
            self.Host: str = host  # 当前服务器主机IP
            self.TcpPort: int = tcp_port  # 当前服务器主机监听端口号
            self.Name: str = name  # 当前服务器名称
            self.Version: str = version  # 当前Zinx版本号
            self.MaxPacketSize: int = max_packet_size  # 都需数据包的最大值
            self.MaxConn: int = max_conn  # 当前服务器主机允许的最大链接个数
            self.WorkerPoolSize = worker_pool_size
            self.Reload()
    
        def Reload(self):
            """
            读取用户的配置文件,覆盖默认配置
            :return:
            """
            root_path = os.getcwd()
            conf_path = "/conf/"
            conf_name = "zinx.json"
            real_path = root_path + conf_path + conf_name
            if not os.path.exists(real_path):
                return
    
            with open(real_path, 'r', encoding='utf-8') as jsonfile:
                data = json.load(jsonfile)
                for key in data.keys():
                    if key in self.__dict__.keys():
                        self.__dict__[key] = data[key]
    
        def SetTcpServer(self, tcp):
            """
            记录一下TCPServer
            :param tcp:
            :return:
            """
            self.TcpServer = tcp
    
    
    # 全局对象
    GlobalObject = GlobalObj("0.0.0.0", 8986, "ZinxServerApp", "测试版本", 4096, 12000, 10)
    # 顺便做一个全局Events
    GlobalGevents: list = []
    
    

      imsghandler中添加两个函数,StartWorkerPool开启工作池、SendMsgToTaskQueue将消息放入工作池队列。

    # -*- coding: utf-8 -*-
    
    from ziface.irequest import IRequest
    from ziface.irouter import IRouter
    from abc import ABC, abstractmethod
    
    
    class IMsgHandler(ABC):
        """
        消息管理抽象层
        """
        @abstractmethod
        def DoMsgHandler(self, request: IRequest):
            """
            调度/执行对应的Router消息处理方法
            :param request:
            :return:
            """
            pass
    
        @abstractmethod
        def AddRouter(self, msgID: int, router: IRouter):
            """
            为消息添加具体的处理逻辑
            :param msgID:
            :param router:
            :return:
            """
            pass
    
        @abstractmethod
        def StartWorkerPool(self):
            """
            启动worker工作池
            :return:
            """
            pass
    
        @abstractmethod
        def SendMsgToTaskQueue(self, request: IRequest):
            """
            将消息交给TaskQueue, 由worker进行处理
            :param request:
            :return:
            """
            pass
    
    

     &esmp;在msgHandler实现一下,添加了两个成员函数WorkerPoolSize表示worker数量、TaskQueue每一个worker对应的消息队列。StartWorkerPool开启了工作池,StartOneWorker将每一个worker放入协程中运行,SendMsgToTaskQueue将消息放入对应worker的消息队列。

    # -*- coding: utf-8 -*-
    from typing import Dict, List
    
    import gevent
    from ziface.irequest import IRequest
    from ziface.irouter import IRouter
    from ziface.imsghandler import IMsgHandler
    from utils.globalobj import GlobalObject, GlobalGevents
    from gevent.queue import Queue
    
    
    class MsgHandler(IMsgHandler):
        """
        消息管理实现层
        """
    
        def __init__(self):
            self.Apis: Dict[int, IRouter] = {}  # type : dict[int, IRouter]
            self.WorkerPoolSize = GlobalObject.WorkerPoolSize       # worker数量
            self.TaskQueue: List[Queue] = []    # 每一个worker对应一个Queue,所有Queue用List管理
    
        def DoMsgHandler(self, request: IRequest):
            """
            调度/执行对应的Router消息处理方法
            :param request:
            :return:
            """
            # 判断当前request绑定的API处理方法是否已经存在
            if request.GetMsgID() not in self.Apis.keys():
                print("接口 msgId = ", request.GetMsgID(), " 未找到!")
                return
    
            # 执行对应处理方法
            self.Apis[request.GetMsgID()].PreHandle(request)
            self.Apis[request.GetMsgID()].Handle(request)
            self.Apis[request.GetMsgID()].PostHandle(request)
    
        def AddRouter(self, msgID: int, router: IRouter):
            """
            为消息添加具体的处理逻辑
            :param msgID:
            :param router:
            :return:
            """
            # 1 判断当前msg绑定的API处理方法是否已经存在
            if msgID in self.Apis.keys():
                print("接口 msgId = ", msgID, " 已存在,无法再次添加!")
                return
            # 2 添加msg与api的绑定关系
            self.Apis[msgID] = router
            print("接口 msgId = ", msgID, " 添加成功!")
    
        def StartWorkerPool(self):
            # 遍历需要启动worker的数量,依此启动
            for i in range(0, self.WorkerPoolSize):
                # 一个worker被启动
                # 给当前worker对应的任务队列开辟空间
                self.TaskQueue.append(Queue())
                # 启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
                g1 = gevent.spawn(self.StartOneWorker, i)
                GlobalGevents.append(g1)
    
        def StartOneWorker(self, workerID: int):
            print("Worker ID = ", workerID, " 已启动。")
            # 不断的等待队列中的消息
            while True:
                # 有消息则取出队列的Request,并执行绑定的业务方法
                try:
                    temp_req = self.TaskQueue[workerID].get_nowait()
                    self.DoMsgHandler(temp_req)
                except:
                    # get_nowait并不会主动切换其他协程,使用sleep切换到其他协程,保证不在当前协程中循环运行
                    gevent.sleep(0)
                    continue
    
        def SendMsgToTaskQueue(self, request: IRequest):
            """
            根据ConnID来分配当前的连接应该由哪个worker负责处理 轮询的平均分配法则
            :return:
            """
            # 得到需要处理此条连接的workerID
            worker_id = request.GetConnection().GetConnID() % self.WorkerPoolSize
            print("添加 ConnID=", request.GetConnection().GetConnID(), " 请求 msgID=", request.GetMsgID(), "到 workerID=",
                  worker_id)
            # 将请求消息发送给任务队列
            self.TaskQueue[worker_id].put(request)
    
    
    def NewMsgHandler() -> IMsgHandler:
        return MsgHandler()
    
    

      消息队列和多任务模块封装好了,接下来就是集成了。
      在Server的Start中,完成率先启动。

        def Start(self):
            """
            启动服务器方法
            :return:
            """
            print("[启动] 服务监听,IP地址:%s, 端口:%s,已经启动\n" % (self.ip, self.port))
            self.msgHandler.StartWorkerPool()
            g1 = gevent.spawn(self.InitTCP)
            GlobalGevents.append(g1)
    

      在Connection中StartReader中,将消息队列集成进去。设置worker数量则开启任务队列,不设置则不开启。

        def StartReader(self):
            """
            处理读业务
            :return:
            """
            print("开启读业务")
            while True:
                try:
                    dp = NewDataPack()
                    # 读取客户端的Msg head
                    head_data = self.Conn.recv(dp.GetHeadLen())
                    if len(head_data) == 0:
                        # head_data 长度为0表示客户端已经退出
                        break
                    # 拆包,得到msgID 和 dataLen 放在msg中
                    msg = dp.Unpack(head_data)
                    # 根据 dataLen 读取 data,放在msg.Data中
                    data_content = self.Conn.recv(msg.GetDataLen())
                    msg.SetData(data_content)
                    # 得到当前conn数据的Request请求数据
                    req = NewRequest(self, msg)
                    # 执行注册的路由方法
                    self.RunHandlerForWorker(req)
                    g1 = gevent.spawn(self.RunHandler(req))
                    GlobalGevents.append(g1)
                except Exception as e:
                    print("读取数据异常 ", e)
                    break
            print("读业务关闭,ID", self.ConnID)
            self.Stop()
    
        def RunHandlerForWorker(self, req):
            """
            采用任务队列运行Handler,如果未设置worker数量或数量为0时,默认不开启队列运行
            :param req:
            :return:
            """
            if GlobalObject.WorkerPoolSize <= 0:
                return
            # 已经启动工作池机制,将消息交给Worker处理
            self.msgHandler.SendMsgToTaskQueue(req)
    
        def RunHandler(self, request: IRequest):
            """
            采用协程运行Handler,如果设置了worker数量,默认不开启协程运行。
            :param request:
            :return:
            """
            if GlobalObject.WorkerPoolSize > 0:
                return
            self.msgHandler.DoMsgHandler(request)
    

      服务接口做完了。在demo\taskqueue中做一个客户端和服务端测试一下,代码与demo\msghandler一样即可。
      此时发送和接收都正常。消息队列和多任务完成。

    相关文章

      网友评论

          本文标题:Python版本Zinx——(8)消息队列及多任务机制

          本文链接:https://www.haomeiwen.com/subject/fvrmmdtx.html