美文网首页Python
多任务服务器的几种实现方式

多任务服务器的几种实现方式

作者: 柏丘君 | 来源:发表于2017-06-30 17:46 被阅读18次

    前两篇文章介绍了 Python 中 UDP 套接字和 TCP 套接字,并在此基础上实现了简单的客户端和服务端,本文接着介绍服务端多任务处理的几种实现方式,算是对这方面知识的一个总结,本文将介绍以下几种实现方式:

    • 多进程服务端
    • 多线程服务端
    • 单进程/线程非阻塞服务端
    • select 实现 IO 多路复用型服务端
    • epoll 事件订阅型服务端
    • gevent 协程型服务端

    客户端编写

    在开始编写服务端之前,首先准备一份通用的客户端代码,后面创建的服务端都使用这份代码测试:

    from socket import *
    
    def main():
        cSocket = socket(AF_INET, SOCK_STREAM)
        cSocket.connect(("192.168.2.142",3001))
        while True:
            msg = input("Enter Message:")
            if msg == "q!":
                break
            else:
                cSocket.send(msg.encode("utf-8"))
    
        cSocket.close()
    
    if __name__ == '__main__':
        main()
    

    多进程型服务端

    多进程型服务端的特点是每收到一个 Socket 连接,就为其创建一个独立的进程进行服务:

    from socket import *
    from multiprocessing import Process
    
    # Server 类负责接收客户端请求,并未每个客户端创建套接字
    class Server():
        @classmethod
        def __prepareSocket(cls):
            cls.sSocket = socket(AF_INET, SOCK_STREAM)
            cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            cls.sSocket.bind(("",3001))
            cls.sSocket.listen(5)
    
        @classmethod
        def startServer(cls):
            cls.__prepareSocket()
            while True:
                # 监听客户端请求
                clientSocket,clientAddr = cls.sSocket.accept()
                print("%s 已连入,正在接受消息..."%clientAddr[1])
                # 创建 SocketHander 实例,并将新创建的套接字传入
                cp = SocketHander(clientSocket,clientAddr)
                cp.start()
    
    # SocketHander 类用来为每一个客户端提供独立的服务
    class SocketHander(Process):
        def __init__(self,clientSocket,clientAddr):
            Process.__init__(self)
            self.clientSocket = clientSocket
            self.clientAddr = clientAddr
    
        def run(self):
            # 监听异常信息
            try:
                while True:
                    recvMsg = self.clientSocket.recv(1024)
                    print("%s:%s"%(self.clientAddr[0],recvMsg.decode("utf-8")))
                    self.clientSocket.send("ding~".encode("utf-8"))
            except:
                print("%s 已断开连接~"%self.clientAddr[0])
            finally:
                self.clientSocket.close()
    
    if __name__ == '__main__':
        Server.startServer()
    

    多线程型服务器

    只需对上面的代码做一丁点儿修改,就可以实现一个多线程服务器:

    from socket import *
    from threading import Thread
    
    # Server 类负责接收客户端请求,并未每个客户端创建套接字
    class Server():
        @classmethod
        def __prepareSocket(cls):
            cls.sSocket = socket(AF_INET, SOCK_STREAM)
            cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            cls.sSocket.bind(("",3001))
            cls.sSocket.listen(5)
    
        @classmethod
        def startServer(cls):
            cls.__prepareSocket()
            while True:
                # 监听客户端请求
                clientSocket,clientAddr = cls.sSocket.accept()
                print("%s 已连入,正在接受消息..."%clientAddr[1])
                # 创建 SocketHander 实例,并将新创建的套接字传入
                cp = SocketHander(clientSocket,clientAddr)
                cp.start()
    
    # SocketHander 类用来为每一个客户端提供独立的服务
    class SocketHander(Thread):
        def __init__(self,clientSocket,clientAddr):
            Thread.__init__(self)
            self.clientSocket = clientSocket
            self.clientAddr = clientAddr
    
        def run(self):
            # 监听异常信息
            try:
                while True:
                    recvMsg = self.clientSocket.recv(1024)
                    print("%s:%s"%(self.clientAddr[0],recvMsg.decode("utf-8")))
                    self.clientSocket.send("ding~".encode("utf-8"))
            except:
                print("%s 已断开连接~"%self.clientAddr[0])
            finally:
                self.clientSocket.close()
    
    if __name__ == '__main__':
        Server.startServer()
    

    单进程/线程非阻塞服务端

    前面的代码中为什么要使用多进程或者多线程呢?这是因为 Socket 对象的 accept 方法和 recv 方法是阻塞的,如果将二者放在一个进程或线程中,势必会造成阻塞。于是我们把 accept 方法放在一个进程/线程中执行,将 recv 方法放在另一个进程/线程中执行,就避免了阻塞。
    如果 accpetrecv 方法是不阻塞的,不就可以解决这个问题了吗?是的,我们可以在创建套接字后,调用其的 setblocking 方法,传入一个参数 False,这时这两个方法就不阻塞了。
    另外,客户端连接成功后不一定立马向服务端发送消息,因此我们并不能确定合适调用 Socket 对象的 recv 方法,为了解决这个问题,我们可以在客户端连接成功后,将创建好的客户端 Socket 装入一个列表中,然后每隔一段时间遍历此列表即可。
    下面是实现代码:

    from socket import *
    from time import sleep
    
    class Server():
        # 存放客户端 Socket 对象
        clientSockets = []
    
        @classmethod
        def __prepareSocket(cls):
            cls.sSocket = socket(AF_INET, SOCK_STREAM)
            cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            # 将服务端 socket 设置为非阻塞
            cls.sSocket.setblocking(False)
            cls.sSocket.bind(("",3001))
            cls.sSocket.listen(5)
        @classmethod
        def startServer(cls):
            cls.__prepareSocket()
            # 轮询 检查有没有新的连接
            while True:
                # 监听客户端请求
                try:
                    clientSocket,clientAddr = cls.sSocket.accept()
                except:
                    pass
                else:
                    print("%s 已连入,正在接受消息..."%clientAddr[1])
                    # 如果不发生异常,说明有了新的连接,将新建的 Socket 对象设置为非阻塞
                    clientSocket.setblocking(False)
                    # 将此对象添加到列表中,等待轮询
                    cls.clientSockets.append((clientSocket,clientAddr))
                # 每个 0.3 秒遍历一次列表,判断是否能接受消息
                sleep(0.3)
                cls.__handleSocket()
    
        @classmethod
        def __handleSocket(cls):
            for clientSocket,clientAddr in cls.clientSockets:
                try:
                    recvMsg = clientSocket.recv(1024)
                except:
                    pass
                else:
                    if not len(recvMsg):
                        print("%s 已断开连接..."%clientAddr[1])
                        # 关闭客户端 Socket
                        clientSocket.close()
                        # 断开连接后,从列表中移除该 Socket 对象
                        cls.clientSockets.remove((clientSocket,clientAddr))
                    else:
                        clientSocket.send("ding~".encode("utf-8"))
                        print("%s:%s"%(clientAddr[1],recvMsg.decode("utf-8")))
    
    if __name__ == '__main__':
        Server.startServer()
    

    需要注意的是,将 Socket 对象设置为非阻塞后,在使用其的 acceptrecv 方法时需要加上异常处理,这是因为使用非阻塞 Socket 后,轮询时如果没有新的连接或者没有客户端发送消息,将会引发异常,需要我们进行异常捕获。

    select 实现 IO 多路复用型服务端

    上面我们采用非阻塞 Socket 和轮询列表的方式实现了一个单进程/线程的非阻塞服务器,其实,操作系统底层也提供了一个 select 模块,用来帮我们检测哪些套接字发生了变化,可以对其进行操作,由于是操作系统底层帮我们完成的,效率上比上面的手动循环更高。
    select 的用法很简单,我们只需调用 select 模块中的 select 方法,该方法接收三个列表作为参数,并对这三个列表进行监听。这三个列表依次为可读的套接字列表、可写的套接字列表、异常套接字列表。
    select 方法是阻塞的,在其接收的三个列表中有状态变化时,它会返回三个列表,列表中的元素是发生了状态变化的元素,返回的三个列表一次对应于发生变化的刻度套接字列表、发生变化的可写套接字列表和发生变化的异常套接字列表。
    基本用法:

    readble,writeable,exceptional = select(readbleList, writeableList, exceptionalList)
    

    下面看一个例子:

    from socket import *
    from select import select
    
    class Server():
        # 存放客户端 Socket 对象
        readableSocketsList = []
    
        @classmethod
        def __prepareSocket(cls):
            cls.sSocket = socket(AF_INET, SOCK_STREAM)
            cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            cls.sSocket.bind(("",3001))
            cls.sSocket.listen(5)
            cls.readableSocketsList.append(cls.sSocket)
        @classmethod
        def startServer(cls):
            cls.__prepareSocket()
            # 轮询 检查有没有新的连接
            while True:
                readableSockets,writeableSockets,exceptionalSocket = select(cls.readableSocketsList,[],[])
                cls.__handleSocket(readableSockets)
    
        @classmethod
        def __handleSocket(cls,readableSockets):
            for sock in readableSockets:
                # 如果是 sSocket 发生了变化,说明有新的客户端连接
                if sock == cls.sSocket:
                    clientSocket,clientAddr = cls.sSocket.accept()
                    print("%s 已连入..."%clientAddr[1])
                    # 将新建的客户端 Sockets 对象存入 readableSocketsList 中
                    cls.readableSocketsList.append(clientSocket)
                # 如果不是 sSocket 发生了变化,这说明有客户端向服务端发送了消息
                else:
                    try:
                        recvMsg = sock.recv(1024)
                        # 判断客户端是否断开了了解
                        if not len(recvMsg):
                            sock.close()
                            cls.readableSocketsList.remove(sock)
                        else:
                            # 向客户端回执消息
                            sock.send("ding~".encode("utf-8"))
                            print("%s:%s"%(sock.getpeername()[1],recvMsg.decode("utf-8")))
                    except:
                        pass
    
    if __name__ == '__main__':
        Server.startServer()
    

    上面我们给 select 函数传入一个 readableSocketsList 列表,当此列表中有 Socket 对象发生了状态变化时,我们会立马得到一个发生了变化的 Socket 对象的列表,可以对此列表中的 Socket 对象进行操作。

    epoll 事件订阅型服务端

    上面我们使用 select 模块完成了一个IO多路复用,对套接字对象变化的检测是操作系统内部检测的,但本质上仍然是对套接字列表进行遍历操作,效率并不高,并且使用 select 实现并发时还有并发量限制,一般 32 位机器是 1024 个,64 位机器是 2048 个。
    select 的作用是什么呢?无非就是通过对套接字列表的遍历,找出发生状态变化的套接字,那么我们也可以换一种方式:不主动遍历套接字,而是在哪个套接字发生变化的时候再通知系统,系统拿到变化的套接字后再通知我们的 Python 程序,效率自然更高。
    epoll 的使用仍然依赖于 select,在使用 epoll 前需要创建一个 epoll 对象:

    epoll = select.epoll()
    

    然后向 epoll 中注册事件:

    epoll.register( Socket 对象的文件描述符, 操作方式 )
    

    获取 Socket 对象的文件描述符:

    fno = socketObj.fileno()
    

    epoll 对文件描述符的操作是由三个常量来表示的:

    • select.EPOLLIN (可读)
    • select.EPOLLOUT (可写)
    • select.EPOLLET (ET模式)

    epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

    • LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。
    • ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。
    from socket import *
    import select
    
    class Server():
        # 存放已连接的客户端 Socket 对象
        clientSocktsList = {}
        # 存放客户端的地址信息
        clietnAddrList = {}
    
        @classmethod
        def __prepareSocket(cls):
            cls.sSocket = socket(AF_INET, SOCK_STREAM)
            cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            cls.sSocket.bind(("",3001))
            cls.sSocket.listen(5)
            # 创建 epoll 对象
            cls.epoll = select.epoll()
            # 将服务端 Socket 添加到 epoll 事件监听
            cls.epoll.register(cls.sSocket.fileno(),select.EPOLLIN | select.EPOLLET)
    
        @classmethod
        def startServer(cls):
            cls.__prepareSocket()
            # 进行轮询操作,处理事件响应
            while True:
                # poll 方法是阻塞方法
                # 当有套接字发生状态变化时,会通过时间通知操作系统,操作系统将这些发生变化的套接字对象返还给程序
                socketList = cls.epoll.poll()
                cls.__handleSocket(socketList)
    
    
        @classmethod
        def __handleSocket(cls,socketList):
            for fno,event in socketList:
                # 如果文件描述符等于服务端 Socket 对象的文件描述符,说明有新的客户端连接
                if fno == cls.sSocket.fileno():
                    clientSocket,clientAddr = cls.sSocket.accept()
                    print("%s 已连接..."%clientAddr[1])
                    # 将客户端 Socket 对象和客户端地址信息添加到 clientSocktsList 和 clietnAddrList
                    # 以该套接字的文件描述符作为key
                    socketFno = clientSocket.fileno()
                    cls.clientSocktsList[socketFno] = clientSocket
                    cls.clietnAddrList[socketFno] = clientAddr
                    # 将新建的客户端 Socket 对象注册到 epoll 事件监听
                    cls.epoll.register(clientSocket.fileno(),select.EPOLLIN | select.EPOLLET)
                
                # 如果 Socket 对象的文件描述符和服务端 Socket 对象不一致,说明客户端向服务端发送了消息
                # 判断 event 类型,作出相应的处理
                elif event == select.EPOLLIN:
                    clientSocket = cls.clientSocktsList[fno]
                    addr = cls.clietnAddrList[fno][1]
                    try:
                        recvMsg = clientSocket.recv(1024)
                        if not len(recvMsg):
                            print("%s 已断开连接"%addr)
                            clientSocket.close()
                            del cls.clientSocktsList[fno]
                        else:
                            print("%s:%s"%(addr,recvMsg.decode("utf-8")))
                    except:
                        pass
    
    if __name__ == '__main__':
        Server.startServer()
    

    可见,使用 epoll 和使用 select 的处理方式都是一致:获取到发生了变化的套接字列表,然后进行相应的处理。区别只在于操作系统内部对于 epollselect 的处理方式的不同。

    gevent 协程型服务器

    我们也可以 gevent 这个协程库来实现一个多任务处理的服务器,首先需要安装 gevent

    pip install gevent
    

    需要注意,使用 gevent 实现服务器时,需要使用 gevent 库提供的 socket,而不是系统自带的 socket
    代码如下:

    from gevent import socket,monkey,spawn
    # 使用 gevent 在执行代码之前,需要首先调用 monkey 下的 patch_all 方法
    monkey.patch_all()
    
    class Server():
        @classmethod
        def __prepareSocket(cls):
            # 使用 genvent 提供的 socket
            cls.sSocket = socket.socket()
            # 将服务端 socket 设置为非阻塞
            cls.sSocket.bind(("",3001))
            cls.sSocket.listen(5)
    
        @classmethod
        def startServer(cls):
            cls.__prepareSocket()
            while True:
                clientSocket,clientAddr = cls.sSocket.accept()
                print("%s 已连接..."%clientAddr[1])
                # 处理连接,需要使用 gevent 的 spawn 方法调用
                spawn(cls.__handleSocket,clientSocket,clientAddr)
    
        @classmethod
        def __handleSocket(cls,clientSocket,clientAddr):
            while True:
                try:
                    recvMsg = clientSocket.recv(1024)
                    if not len(recvMsg):
                        print("%s 已断开连接..."%clientAddr[1])
                        clientSocket.close()
                        break
                    else:
                        print("%s:%s"%(clientAddr[1],recvMsg.decode("utf-8")))
                except:
                    pass
    
    if __name__ == '__main__':
        Server.startServer()
    

    使用 gevent 协程实现多任务服务器有以下几个注意点:

    • 需要使用 gevent 模块中提供的 socket
    • 需要使用 gevent 模块中的 spawn 函数去调用目标函数
    • 在代码执行之前需要调用 monkey 下的 patch_all 方法

    总结

    本文介绍了 Python 中实现多任务服务器的几种常用方式,包括多进程/多线程实现、非阻塞 Socket 实现、select 实现、epoll 实现和 gevent 实现。算是一个记录,以后忘记了可以随时回头查阅。

    完。

    相关文章

      网友评论

        本文标题:多任务服务器的几种实现方式

        本文链接:https://www.haomeiwen.com/subject/kfmbcxtx.html