美文网首页网络IO
实现非阻塞IO—select解析

实现非阻塞IO—select解析

作者: LazzMan | 来源:发表于2017-04-17 23:28 被阅读19次

    一、select原理

    网络通信被Unix系统抽象为文件的读写,通常是一个设备,由设备驱动程序提供,驱动可以知道自身的数据是否可用。支持阻塞操作的设备驱动通常会实现一组自身的等待队列,如读/写等待队列用于支持上层(用户层)所需的block或non-block操作。设备的文件的资源如果可用(可读或者可写)则会通知进程,反之则会让进程睡眠,等到数据到来可用的时候,再唤醒进程。

    这些设备的文件描述符被放在一个数组中,然后select调用的时候遍历这个数组,如果对于的文件描述符可读则会返回改文件描述符。当遍历结束之后,如果仍然没有一个可用设备文件描述符,select让用户进程则会睡眠,直到等待资源可用的时候在唤醒,遍历之前那个监视的数组。每次遍历都是线性的。


    二、select方法

    Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

    示例(只支持Unix,不支持Windows):
    服务端:

    import select
    import socket
    import sys
    import Queue
    
    # Create a TCP/IP socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(0)
     
    # Bind the socket to the port
    server_address = ('localhost', 10000)
    print >>sys.stderr, 'starting up on %s port %s' % server_address
    server.bind(server_address)
     
    # Listen for incoming connections
    server.listen(5)
    

    select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select().

    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
    

    所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到数据之后就立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取出来再发出去

    当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将他们分别赋值为readable,writable,exceptional, 所有在readable list中的socket连接代表有数据可接收(recv),所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接,当连接通信出现error时会把error写到exceptional列表中。

    Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。

    # Handle inputs
    for s in readable:
     
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print >>sys.stderr, 'new connection from', client_address
            connection.setblocking(0)
            inputs.append(connection)
     
            # Give the connection a queue for data we want to send
            message_queues[connection] = Queue.Queue()
    

    第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。

    else:
         data = s.recv(1024)
         if data:
             # A readable client socket has data
             print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
             message_queues[s].put(data)
             # Add output channel for response
             if s not in outputs:
                 outputs.append(s)
    

    第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。

    else:
        # Interpret empty result as closed connection
        print >>sys.stderr, 'closing', client_address, 'after reading no data'
        # Stop listening for input on the connection
        if s in outputs:
            outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
        inputs.remove(s)    #inputs中也删除掉
        s.close()           #把这个连接关闭掉
     
        # Remove message queue
        del message_queues[s] 
    
    
    else:
        # Interpret empty result as closed connection
        print >>sys.stderr, 'closing', client_address, 'after reading no data'
        # Stop listening for input on the connection
        if s in outputs:
            outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
        inputs.remove(s)    #inputs中也删除掉
        s.close()           #把这个连接关闭掉
     
        # Remove message queue
        del message_queues[s]  
    

    对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态

    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # No messages waiting so stop checking for writability.
            print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
            outputs.remove(s)
        else:
            print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
            s.send(next_msg)
    

    最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉

    # Handle "exceptional conditions"
    for s in exceptional:
        print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
     
        # Remove message queue
        del message_queues[s]
    

    客户端

    下面的这个是客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互

    import socket
    import sys
     
    messages = [ 'This is the message. ',
                 'It will be sent ',
                 'in parts.',
                 ]
    server_address = ('localhost', 10000)
     
    # Create a TCP/IP socket
    socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
              socket.socket(socket.AF_INET, socket.SOCK_STREAM),
              ]
    
    # Connect the socket to the port where the server is listening
    print >>sys.stderr, 'connecting to %s port %s' % server_address
    for s in socks:
        s.connect(server_address)
    

    接下来通过循环通过每个socket连接给server发送和接收数据

    for message in messages:
     
        # Send messages on both sockets
        for s in socks:
            print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
            s.send(message)
     
        # Read responses on both sockets
        for s in socks:
            data = s.recv(1024)
            print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
            if not data:
                print >>sys.stderr, 'closing socket', s.getsockname()
    

    服务端完整代码

    #_*_coding:utf-8_*_
     
    import select
    import socket
    import sys
    import queue
     
    # Create a TCP/IP socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
     
    # Bind the socket to the port
    server_address = ('localhost', 10000)
    print(sys.stderr, 'starting up on %s port %s' % server_address)
    server.bind(server_address)
     
    # Listen for incoming connections
    server.listen(5)
     
    # Sockets from which we expect to read
    inputs = [ server ]
     
    # Sockets to which we expect to write
    outputs = [ ]
     
    message_queues = {}
    while inputs:
     
        # Wait for at least one of the sockets to be ready for processing
        print( '\nwaiting for the next event')
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        # Handle inputs
        for s in readable:
     
            if s is server:
                # A "readable" server socket is ready to accept a connection
                connection, client_address = s.accept()
                print('new connection from', client_address)
                connection.setblocking(False)
                inputs.append(connection)
     
                # Give the connection a queue for data we want to send
                message_queues[connection] = queue.Queue()
            else:
                data = s.recv(1024)
                if data:
                    # A readable client socket has data
                    print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                    message_queues[s].put(data)
                    # Add output channel for response
                    if s not in outputs:
                        outputs.append(s)
                else:
                    # Interpret empty result as closed connection
                    print('closing', client_address, 'after reading no data')
                    # Stop listening for input on the connection
                    if s in outputs:
                        outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
                    inputs.remove(s)    #inputs中也删除掉
                    s.close()           #把这个连接关闭掉
     
                    # Remove message queue
                    del message_queues[s]
        # Handle outputs
        for s in writable:
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                # No messages waiting so stop checking for writability.
                print('output queue for', s.getpeername(), 'is empty')
                outputs.remove(s)
            else:
                print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
                s.send(next_msg)
        # Handle "exceptional conditions"
        for s in exceptional:
            print('handling exceptional condition for', s.getpeername() )
            # Stop listening for input on the connection
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
     
            # Remove message queue
            del message_queues[s]
    

    客户端完整代码

    import socket
    import sys
     
    messages = [ 'This is the message. ',
                 'It will be sent ',
                 'in parts.',
                 ]
    server_address = ('localhost', 10000)
     
    # Create a TCP/IP socket
    socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
              socket.socket(socket.AF_INET, socket.SOCK_STREAM),
              ]
     
    # Connect the socket to the port where the server is listening
    print >>sys.stderr, 'connecting to %s port %s' % server_address
    for s in socks:
        s.connect(server_address)
     
    for message in messages:
     
        # Send messages on both sockets
        for s in socks:
            print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
            s.send(message)
     
        # Read responses on both sockets
        for s in socks:
            data = s.recv(1024)
            print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
            if not data:
                print >>sys.stderr, 'closing socket', s.getsockname()
                s.close()
    

    [本文转自:http://www.cnblogs.com/alex3714/p/4372426.html#top]

    相关文章

      网友评论

        本文标题:实现非阻塞IO—select解析

        本文链接:https://www.haomeiwen.com/subject/wrvxzttx.html