美文网首页
无标题文章

无标题文章

作者: 河自清 | 来源:发表于2014-05-09 23:28 被阅读0次

    1. Stackless Python简介 

      

    Stackless Python是CPython的增强版本,它实现了一个合作式多任务系统。和抢占式多任 

    务系统不同,Stackless中的每个任务(tasklet)必须主动放弃对处理器的控制,才能实现 

    多任务。 

      

    严格地说,合作式多任务系统都可以通过简单的循环实现。但是使用Stackless可以让代码 

    更清楚,同时也让程序员不必考虑复杂的同步控制,因为实际上实现合作式多任务系统的循 

    环并不简单。另一方面,这种合作式多任务系统的资源消耗比传统线程小得多,加上 

    Python有伟大的GIL,所以Stackless的效率还是很不错的。 

      

    Stackless Python的主页是

    http://www.stackless.com

    ,上面可以下载源代码和预编译的版 

    本。同时也有不少文档——多是介绍性质的,缺少手册。幸好,使用Stackless Python不需 

    要厚厚的手册,看了主页上的入门加上help()就足够了。Python的self-documenting就是好 

    !下面有关Stackless的介绍可能尚不敷用,所以如果对Stackless Python感兴趣最好还是 

    啃啃主页上的英文。 

      

    1.1. tasklet & run 

      

    Stackless中一个任务被称作一个tasklet,可以这样创建一个tasklet: 

    stackless.tasklet(a_callable)(args)。之后处理这个任务就是运行a_callable(args) 

      

    创建了一个或多个tasklet之后,可以用stackless.run()来运行这些任务。实际上可以认为 

    run()是进入了多任务系统,如果还有可运行的任务(没有运行完或者阻塞在某个channel上 

    ),那么run()就会持续运行。 

      

    1.2. schedule 

      

    当在一个tasklet中运行stackless.schedule(),当前的任务就被挂起,并放在运行循环的 

    最后。下面一个程序是一个死循环: 

      

    import stackless 

    def idle(): 

         while True: 

             print 'idle' 

             stackless.schedule() 

    stackless.tasklet(idle)() 

    stackless.run() 

      

    1.3. channel: send & receive 

      

    channel是tasklet之间的同步方式,使用stackless.channel()就能建立一个通信用的 

    channel。channel可以传送任何Python对象。下面假设ch = stackless.channel()。 

      

    ch.receive()返回从ch中收到的对象,如果ch中没有对象,则该tasklet被阻塞。 

      

    ch.send(obj)把obj送入ch,如果已经有其他tasklet执行了ch.receive(),则当前tasklet 

    停止运行并成为最后一个tasklet,而阻塞在ch.receive()的tasklet继续运行。如果没有, 

    则当前tasklet阻塞在ch.send(obj)。 

      

    这又是一个死循环: 

      

    import stackless 

    ch = stackless.channel() 

    def ping(): 

         while True: 

             print ch.receive() 

             ch.send('ping') 

    def pong(): 

         while True: 

             print ch.receive() 

             ch.send('pong') 

    stackless.tasklet(ping)() 

    stackless.tasklet(pong)() 

    ch.send('start') 

    stackless.run() 

      

    2. 基于Stackless的多任务:三种行为模式 

      

    概念基本上来自Introduction to Concurrent Programming with Stackless Python,分别 

    是: 

      

    Daemon——每个周期都要运行。 

    class Daemon(object): 

         def __init__(self): 

             stackless.tasklet(self)() 

      

         def __call__(self): 

             while True: 

                 self.run() 

                 stackless.schedule() 

      

    Task——只运行一次。 

    class Task(object): 

         def __init__(self): 

             stackless.tasklet(self)() 

      

         def __call__(self): 

             self.run() 

      

    Handler——通过channel处理请求,没有请求就阻塞。如果收到一个nil请求,则结束运行。 

    class Handler(object): 

         def __init__(self): 

             self.channel = stackless.channel() 

             stackless.tasklet(self)() 

      

         def __call__(self): 

             is_alive = True 

             while is_alive: 

                 message = self.channel.receive() 

                 if message: 

                     is_alive = self.run(message) 

                 else: 

                     is_alive = False 

      

    在游戏中,Daemon实现一些管理功能,每个Object可能都要有一个Handler与之对应(反正 

    大多数Handler都是阻塞的),但因为Handler模型需要分析收到的message,所以有的时候 

    也可以用Task。 

      

    3. 网络连接 

      

    网络连接当然要使用异步socket,我从Python In A Nutshell抄了一个asyncore+asynchat 

    的例子,放在子线程里运行asyncore.loop()。我没有用www.stackless.com提供的一个 

    stacklesssocket,因为大概看了一下stacklesssocket.py,觉得好像有些问题。 

      

    4. PyMud01 

      

    代码:实现了一个echo server,利用了Daemon-Handler-Task三级模型。 

    network_thread通过conn_manager.queue和主线程通信, 

    EchoDaemon给每个不同的连接建立一个EchoHandler, 

    收到信息时发给相应的EchoHandler,EchoHandler建立一个EchoTask。 

    最后由EchoTask完成回显。 

      

    import stackless 

    import asyncore, asynchat, socket 

    import threading, Queue 

      

    class Monitor(asyncore.dispatcher): 

         def __init__(self, conn_manager, addr='', port=9000): 

             asyncore.dispatcher.__init__(self) 

             self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 

             self.bind((addr, port)) 

             self.listen(5) 

             self.conn_manager = conn_manager 

              

         def handle_accept(self): 

             conn, addr_port = self.accept() 

             self.conn_manager.Create(conn, addr_port) 

      

    class Connection(asynchat.async_chat): 

         def __init__(self, conn, addr_port, conn_manager): 

             asynchat.async_chat.__init__(self, conn) 

             self.set_terminator('\r\n') 

             self.addr_port = addr_port 

             self.data_pieces = [] 

             self.conn_manager = conn_manager 

              

         def collect_incoming_data(self, data): 

             self.data_pieces.append(data) 

      

         def found_terminator(self): 

             self.conn_manager.Receive(self.addr_port, ''.join(self.data_pieces)) 

             self.data_pieces = [] 

      

         def handle_close(self): 

             self.conn_manager.Close(self.addr_port) 

             self.close() 

              

    class ConnectionManager(object): 

         def __init__(self): 

             self.queue = Queue.Queue() 

             self.conn_table = {} 

              

         def Create(self, conn, addr_port): 

             self.conn_table[addr_port] = Connection(conn, addr_port, self) 

      

         def Receive(self, addr_port, data): 

             self.queue.put_nowait((addr_port, data)) 

      

         def Send(self, addr_port, data): 

             self.conn_table[addr_port].push(data) 

      

         def Close(self, addr_port): 

             self.conn_table[addr_port] = None 

      

         def IsConnect(self, addr_port): 

             if self.conn_table.has_key(addr_port): 

                 return bool(self.conn_table[addr_port]) 

             else: 

                 return False 

              

    conn_manager = ConnectionManager() 

      

    class NetworkThread(threading.Thread): 

         def run(self): 

             Monitor(conn_manager) 

             asyncore.loop() 

          

    network_thread = NetworkThread() 

    network_thread.start() 

      

    class Daemon(object): 

         def __init__(self): 

             stackless.tasklet(self)() 

      

         def __call__(self): 

             while True: 

                 self.run() 

                 stackless.schedule() 

      

    class Task(object): 

         def __init__(self): 

             stackless.tasklet(self)() 

      

         def __call__(self): 

             self.run() 

      

    class Handler(object): 

         def __init__(self): 

             self.channel = stackless.channel() 

             stackless.tasklet(self)() 

      

         def __call__(self): 

             is_alive = True 

             while is_alive: 

                 message = self.channel.receive() 

                 if message: 

                     is_alive = self.run(message) 

                 else: 

                     is_alive = False 

      

    class EchoTask(Task): 

         def __init__(self, addr_port, data): 

             Task.__init__(self) 

             self.addr_port = addr_port 

             self.data = data 

      

         def run(self): 

             print 'task:', stackless.getruncount(), self.addr_port, self.data 

             conn_manager.Send(self.addr_port, self.data+'\r\n') 

      

    class EchoHandler(Handler): 

         def run(self, message): 

             print 'handler:', stackless.getruncount(), message 

             EchoTask(message[0], message[1]) 

             return True 

          

    class EchoDaemon(Daemon): 

         def __init__(self): 

             Daemon.__init__(self) 

             self.handler_table = {} 

              

         def run(self): 

             for key in self.handler_table.keys(): 

                 if not conn_manager.IsConnect(key): 

                     self.handler_table[key].channel.send(None) 

                     del self.handler_table[key] 

             try: 

                 message = conn_manager.queue.get_nowait() 

             except Queue.Empty: 

                 return 

             print 'daemon:', message 

              

             if not self.handler_table.has_key(message[0]): 

                 self.handler_table[message[0]] = EchoHandler() 

                 print 'daemon:', len(self.handler_table) 

      

             self.handler_table[message[0]].channel.send(message) 

      

    EchoDaemon() 

      

    stackless.run() 

    相关文章

      网友评论

          本文标题:无标题文章

          本文链接:https://www.haomeiwen.com/subject/hhirtttx.html