美文网首页程序员
Python 实现TCP长连接,epoll、select通信模式

Python 实现TCP长连接,epoll、select通信模式

作者: Ethansmart | 来源:发表于2018-10-07 10:28 被阅读0次

    epoll 是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它几乎具备众多优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

    epoll的优点:

    1.支持一个进程打开大数目的socket描述符(FD)
    select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

    2.IO效率不随FD数目增加而线性下降
    传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。

    3.使用mmap加速内核与用户空间的消息传递
    这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。

    4.内核微调

    这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。

    linux下epoll如何实现高效处理百万句柄的

    开发高性能网络程序时,windows开发者们言必称iocp,linux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄,比起以前的select和poll效率高大发了。我们用起epoll来都感觉挺爽,确实快,那么,它到底为什么可以高速处理这么多并发连接呢?

    使用起来很清晰,首先要调用epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。epoll_ctl可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。epoll_wait在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。从上面的调用方式就可以看到epoll比select/poll的优越之处:因为后者每次调用时都要传递你所要监控的所有socket给select/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。而我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表。所以,实际上在你调用epoll_create后,内核就已经在内核态开始准备帮你存储要监控的句柄了,每次调用epoll_ctl只是在往内核的数据结构里塞入新的socket句柄。当一个进程调用epoll_creaqte方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:每一个epoll对象都有一个独立的eventpoll结构体,这个结构体会在内核空间中创造独立的内存,用于存储使用epoll_ctl方法向epoll对象中添加进来的事件。这样,重复的事件就可以通过红黑树而高效的识别出来。

    此外,epoll还维护了一个双链表,用户存储发生的事件。当epoll_wait调用时,仅仅观察这个list链表里有没有数据即eptime项即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效 !那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。

    Python Tcp长连接 Server端代码:

    #coding:utf-8
    import socket
    import select
    
    class emsc_server:
        def __init__(self):
            self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.serversocket.bind(('0.0.0.0', 5000))
            self.serversocket.listen(1000)
            self.serversocket.setblocking(0)
    
        def run(self):
            response = "接收成功,返回数据: connecting status: 200 \n"
            response += "haody,client !"
            epoll = select.epoll()
            epoll.register(self.serversocket.fileno(), select.EPOLLIN)
    
            try:
                connections = {}
                requests = {}
                responses = {}
                endflag = '\n\r\n'
    
                while True:
                    events = epoll.poll(1)
                    for fid, event in events:
                        if fid == self.serversocket.fileno():
                            connection, address = self.serversocket.accept()
                            connection.setblocking(0)
                            epoll.register(connection.fileno(), select.EPOLLIN)
                            connections[connection.fileno()] = connection
                            requests[connection.fileno()] = ''
                            responses[connection.fileno()] = response.encode()
    
                        elif event & select.EPOLLIN:
                            try:
                                requests[fid] = connections[fid].recv(1024)
                                if len(str(requests[fid].decode())) == 0:
                                    connections[fid].shutdown(socket.SHUT_RDWR)
                                    break
                                else:
                                    print("2 | ------ : " + str(requests[fid].decode()) + "\n")
                                    byteswritten = connections[fid].send(responses[fid])
    
                                if endflag in requests[fid]:
                                    epoll.modify(fid, select.EPOLLOUT)
                                    connections[fid].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
                                    print('-' * 40 + '\n' + requests[fid].decode()[:-2])
                            except:
                                continue
    
                        elif event & select.EPOLLOUT:
                            byteswritten = connections[fid].send(responses[fid])
                            responses[fid] = responses[fid][byteswritten:]
                            if len(responses[fid]) == 0:
                                connections[fid].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)
                                epoll.modify(fid, 0)
                                connections[fid].shutdown(socket.SHUT_RDWR)
    
                        elif event & select.EPOLLHUP:
                            epoll.unregister(fid)
                            connections[fid].close()
                            del connections[fid]
    
            except:
                print("server excepted ...")
                epoll.unregister(self.serversocket.fileno())
                self.run()
    
            finally:
                print("server closed ...")
    
    if __name__=="__main__":
        emsc = emsc_server()
        emsc.run()
    

    Python Tcp长连接 Client端代码:

    #coding:utf-8
    import socket
    import time
    
    class emsc_client:
    
        def __init__(self):
            self.host = "127.0.0.1"
            self.port = 5000
            self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
        def run(self):
            try:
                self.conn.connect((self.host, self.port))
                while True:
                    self.conn.send(("来自客户端发送的数据 : " + str(time.time())).encode())
                    data = self.conn.recv(1024).decode()
                    print("来自服务端数据 :" + data + "|" + str(time.time()))
                    time.sleep(0.1)
            except:
                print("服务器连接异常,尝试重新连接 (10s) ...")
                self.conn.close()
                time.sleep(10) # 断开连接后,每10s重新连接一次
                emsc_client().run()
    
            finally:
                print("客户端已关闭 ...")
    
    if __name__=="__main__":
        emsc = emsc_client()
        emsc.run()
    

    实现epoll,I/O复用

    服务端断开后,客户端每10s重新连接

    在windows 上

    Python 采用Select 模式 实现Tcp 长连接

    #coding:utf-8
    import select
    import socket
    import queue
    import time
    import os
    
    class emsc_select_server:
        def __init__(self):
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setblocking(False)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_address = ('0.0.0.0', 5000)
            self.server.bind(self.server_address)
            self.server.listen(1000)
            self.inputs = [self.server]
            self.outputs = []
            self.message_queues = {}
            self.timeout = 20
    
        def run(self):
            response = "接收成功,返回数据: connecting status: 200 \n"
            response += "haody,client ! | "
    
            while self.inputs:
                print("waiting for next event")
                # timeout是超时,当前连接要是超过这个时间的话,就会kill
                readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, self.timeout)
    
                if not (readable or writable or exceptional):
                    print("Time out ! ")
                    break
                for ser in readable:
                    if ser is self.server:
                        # 通过inputs查看是否有客户端来
                        connection, client_address = ser.accept()
                        print("connection from ", client_address)
                        connection.setblocking(0)
                        self.inputs.append(connection)
                        self.message_queues[connection] = queue.Queue()
                    else:
                        data = ser.recv(1024)
                        if data:
                            print("收到数据 ", data.decode(), "\n来自:", ser.getpeername())
                            self.message_queues[ser].put(data)
                            # 添加通道
                            if ser not in self.outputs:
                                self.outputs.append(ser)
                        else:
                            print("closing", client_address)
                            if ser in self.outputs:
                                self.outputs.remove(ser)
                            self.inputs.remove(ser)
                            ser.close()
                            # 清除队列信息
                            del self.message_queues[ser]
    
                for ser in writable:
                    try:
                        next_msg = self.message_queues[ser].get_nowait()
                    except queue.Empty:
                        print(ser.getpeername(), 'queue empty')
                        self.outputs.remove(ser)
                    else:
                        print("发送数据 ", str(response + next_msg.decode()), " to ", ser.getpeername(),"\n")
                        ser.send(response.encode()+next_msg)
    
                for ser in exceptional:
                    print(" exception condition on ", ser.getpeername())
                    # stop listening for input on the connection
                    self.inputs.remove(ser)
                    if ser in self.outputs:
                        self.outputs.remove(ser)
                    ser.close()
                    # 清除队列信息
                    del self.message_queues[ser]
    
    if __name__=="__main__":
        select_server = emsc_select_server()
        select_server.run()
    

    客户端:同上

    相关文章

      网友评论

        本文标题:Python 实现TCP长连接,epoll、select通信模式

        本文链接:https://www.haomeiwen.com/subject/emtlaftx.html