背景
socket模块不同同时处理多个客户端,为了在网络编程中处理多个客户端,有了socketserver这个模块,socketserver是对socket的进一步包装,能处理多个客户端请求。
示例
服务端代码
import socketserver
class MyServer(socketserver.BaseRequestHandler):
def handle(self):
flag = 0
while not flag:
print(111,self.client_address)
from_client_data = self.request.recv(1024).decode("utf-8")
if from_client_data == "byebye":
self.request.send("byebye".encode("utf-8"))
flag = 1
else:
print("对方说:", from_client_data)
server_input = input(">>>")
self.request.send(server_input.encode("utf-8"))
print("已结束!")
self.request.close()
if __name__ == '__main__':
ip_port = ("127.0.0.1", 8001)
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.ThreadingTCPServer(ip_port, MyServer)
server.serve_forever()
- 上述例子中服务端代码利用多线程处理多个客户端请求,我们根据源码进行分析讲解
- 首先编写请求类
MyServer
继承socketserver.BaseRequestHandler
,必须实现handle方法,后面我们会说明为何如此 - 初始化
ThreadingTCPServer
类,这个是多线程TCP的,UDP有单独的类,传入我们写的MyServer
- 调用这个方法
serve_forever
,这样每个客户端请求来了,都会开一个线程
我们接下来分析一下背后的源码:
首先看下我们需要继承的BaseRequestHandler
类:
class BaseRequestHandler:
"""Base class for request handler classes.
This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method.
The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define other arbitrary instance variables.
"""
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
当实例化MyServer时,会执行self.handle()方法,因此我们一定要在MyServer类中定义handle方法
。
server = socketserver.ThreadingTCPServer(ip_port, MyServer)
,这条语句,会实例化ThreadingTCPServer
,
源码
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
TCPServer
又继承基类BaseServer
,两者的初始化方法做的事情:主要是self.server_bind()和self.server_activate()
def server_bind(self):
"""Called by constructor to bind the socket.
May be overridden.
"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
self.socket.listen(self.request_queue_size)
server_bind
这个方法是绑定地址
server_activate
调用了socket的listen方法。
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
调用这个方法,根据io多路复用机制,有数据的时候会执行self._handle_request_noblock()
,
进而执行process_request
,而这个方法又被ThreadingMixIn
类重载,其实这个mixin类的作用就是请求来了,就创建一个线程进行处理。
class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""
# Decides how threads will act upon termination of the
# main process
daemon_threads = False
# If true, server_close() waits until all non-daemonic threads terminate.
_block_on_close = False
# For non-daemonic threads, list of threading.Threading objects
# used by server_close() to wait for all threads completion.
_threads = None
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
if not t.daemon and self._block_on_close:
if self._threads is None:
self._threads = []
self._threads.append(t)
t.start()
def server_close(self):
super().server_close()
if self._block_on_close:
threads = self._threads
self._threads = None
if threads:
for thread in threads:
thread.join()
process_request
调用process_request_thread
,,而这个方法就是请求来了,就创建一个线程进行处理,这个请求代表的是每个客户端的连接conn,s=socket.socket() ; conn,addr = s.accept()
这就实现了服务端的多线程模式,使得可以同时处理多个客户端。
总结
每次有客户端连接,数据到了,就会实例化一下我们定义的MyServer
类,每个客户端都有自己的MyServer对象。这样就做到了每个服务端可处理多个客户端。
网友评论