美文网首页
socketserver源码分析

socketserver源码分析

作者: 落羽归尘 | 来源:发表于2019-09-22 19:38 被阅读0次

    背景

    socket模块不同同时处理多个客户端,为了在网络编程中处理多个客户端,有了socketserver这个模块,socketserver是对socket的进一步包装,能处理多个客户端请求。

    示例

    服务端代码

    import socketserver
    
    class MyServer(socketserver.BaseRequestHandler):
        def handle(self):
            flag = 0
            while not flag:
                print(111,self.client_address)
                from_client_data = self.request.recv(1024).decode("utf-8") 
                if from_client_data == "byebye":
                    self.request.send("byebye".encode("utf-8"))
                    flag = 1
                else:
                    print("对方说:", from_client_data)
                    server_input = input(">>>")
                    self.request.send(server_input.encode("utf-8"))       
            print("已结束!")
            self.request.close()                                    
    
    if __name__ == '__main__':
        ip_port = ("127.0.0.1", 8001)
        socketserver.TCPServer.allow_reuse_address = True
        server = socketserver.ThreadingTCPServer(ip_port, MyServer)
        server.serve_forever()
    
    • 上述例子中服务端代码利用多线程处理多个客户端请求,我们根据源码进行分析讲解
    • 首先编写请求类MyServer继承socketserver.BaseRequestHandler,必须实现handle方法,后面我们会说明为何如此
    • 初始化ThreadingTCPServer类,这个是多线程TCP的,UDP有单独的类,传入我们写的MyServer
    • 调用这个方法serve_forever,这样每个客户端请求来了,都会开一个线程
    我们接下来分析一下背后的源码:

    首先看下我们需要继承的BaseRequestHandler类:

    class BaseRequestHandler:
    
        """Base class for request handler classes.
    
        This class is instantiated for each request to be handled.  The
        constructor sets the instance variables request, client_address
        and server, and then calls the handle() method.  To implement a
        specific service, all you need to do is to derive a class which
        defines a handle() method.
    
        The handle() method can find the request as self.request, the
        client address as self.client_address, and the server (in case it
        needs access to per-server information) as self.server.  Since a
        separate instance is created for each request, the handle() method
        can define other arbitrary instance variables.
    
        """
    
        def __init__(self, request, client_address, server):
            self.request = request
            self.client_address = client_address
            self.server = server
            self.setup()
            try:
                self.handle()
            finally:
                self.finish()
    
        def setup(self):
            pass
    
        def handle(self):
            pass
    
        def finish(self):
            pass
    

    当实例化MyServer时,会执行self.handle()方法,因此我们一定要在MyServer类中定义handle方法

    server = socketserver.ThreadingTCPServer(ip_port, MyServer),这条语句,会实例化ThreadingTCPServer,

    源码
    class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
    

    TCPServer又继承基类BaseServer,两者的初始化方法做的事情:主要是self.server_bind()和self.server_activate()

    def server_bind(self):
        """Called by constructor to bind the socket.
    
        May be overridden.
    
        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()
    
    def server_activate(self):
        """Called by constructor to activate the server.
    
        May be overridden.
    
        """
        self.socket.listen(self.request_queue_size)
    

    server_bind这个方法是绑定地址
    server_activate调用了socket的listen方法。

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.
    
        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
    
                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()
    
                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
    

    调用这个方法,根据io多路复用机制,有数据的时候会执行self._handle_request_noblock()
    进而执行process_request,而这个方法又被ThreadingMixIn类重载,其实这个mixin类的作用就是请求来了,就创建一个线程进行处理。

    class ThreadingMixIn:
        """Mix-in class to handle each request in a new thread."""
    
        # Decides how threads will act upon termination of the
        # main process
        daemon_threads = False
        # If true, server_close() waits until all non-daemonic threads terminate.
        _block_on_close = False
        # For non-daemonic threads, list of threading.Threading objects
        # used by server_close() to wait for all threads completion.
        _threads = None
    
        def process_request_thread(self, request, client_address):
            """Same as in BaseServer but as a thread.
    
            In addition, exception handling is done here.
    
            """
            try:
                self.finish_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
            finally:
                self.shutdown_request(request)
    
        def process_request(self, request, client_address):
            """Start a new thread to process the request."""
            t = threading.Thread(target = self.process_request_thread,
                                 args = (request, client_address))
            t.daemon = self.daemon_threads
            if not t.daemon and self._block_on_close:
                if self._threads is None:
                    self._threads = []
                self._threads.append(t)
            t.start()
    
        def server_close(self):
            super().server_close()
            if self._block_on_close:
                threads = self._threads
                self._threads = None
                if threads:
                    for thread in threads:
                        thread.join()
    

    process_request调用process_request_thread,,而这个方法就是请求来了,就创建一个线程进行处理,这个请求代表的是每个客户端的连接conn,s=socket.socket() ; conn,addr = s.accept()
    这就实现了服务端的多线程模式,使得可以同时处理多个客户端。

    总结

    每次有客户端连接,数据到了,就会实例化一下我们定义的MyServer类,每个客户端都有自己的MyServer对象。这样就做到了每个服务端可处理多个客户端。

    相关文章

      网友评论

          本文标题:socketserver源码分析

          本文链接:https://www.haomeiwen.com/subject/hzxductx.html