美文网首页
tornado多线程共享CPU的踩坑经历

tornado多线程共享CPU的踩坑经历

作者: 欠我的都给我吐出来 | 来源:发表于2018-11-26 15:50 被阅读0次

    需求分析

    需求是写一个简单的tornado服务器端程序,内部调用了人脸检测模块的代码。对外提供一个接口,接口的输入是base64的一张或者多张图片即可。当输入多张图片时,要求输出的顺序和输入的图片顺序一致。输出是一个list,其中的每一项是dict,里面保存了人脸的位置信息以及人脸关键点的二维坐标信息。这个基本的服务最终会被封装到docker中,这样就可以开启多个服务,被外界调用。

    性能分析

    我需要完成的是将被封装在docker镜像中的tornado服务。docker的容器本身是且仅仅是一个进程,因此tornado启动时不需要开启多进程了,没有意义。但是在handler函数内部可以使用异步的方式来实现多线程,减少阻塞,实现并发。

    开始踩坑

    第一坑:list不是线程安全的

    python的list不是线程安全的,因此给定的一个list[image],如果想要内部实现多线程并发,那么无法保证按照输入的顺序输出。因此需要先创建一个image_id的字典,并且将id的顺序保存下来。这样在多线程并发执行完成之后,按照id的顺序重新排序输出。这里还需要注意的是,如果传入的图片是同一图片传入多次,那么在保存image_id的时候,相同key的value会被覆盖,因此设置键值的时候不能直接以图片指针为键,而需要仔细的设定。

    第二坑:tornado自带的multi函数

    我们希望多线程去完成人脸识别的功能,主线程等待多线程完成之后,统一进行整理工作。因此如何在python中让主线程等待子线程完成之后再返回是一个问题。阅读tornado的官网,发现可以使用multi方法,这个方法可以保证内部多线程的执行,同时保证输出的顺序和输入的顺序一致。

    multi是协程模式中的一个函数。后面跟是dict或者list的Future返回对象。然后并行等待输出,且如果后面跟的是list,那么返回的list顺序和原始list一致。

    if isinstance(images,list):
                task_res=await multi([image_processing(image) for image in images])
    

    我以为问题就这么圆满的解决了,但是,请注意!!!!
    问题并没有解决!!!
    实际测试的时候,使用multi运行100张图片的时间是6.9s,而如果改用for循环同步运行的方式耗时6.6s。这意味着使用异步的方式反而比使用同步的方式要慢!

    分析:查看了multi函数的源码,内部将所有的list中Future传给了一个multi_future的方法。

    if _contains_yieldpoint(children):
        return MultiYieldPoint(children, quiet_exceptions=quiet_exceptions)
    else:
        return multi_future(children, quiet_exceptions=quiet_exceptions)
    

    查看mulit_future方法,维护着多个子future,每次有一个子future完成后,就会调用callback,将其从unfinished_children中移除,当所有子Future的callback都执行完后,会真正调用set_result方法。

    也就是说,multi确实是实现了异步的。那么为什么在这里没有实现异步的效果?再次在网上搜索原因,可能的原因如下:

    1. 首先我们要知道Tornado是个单线程的服务器,他作为服务端是没法使用多线程处理并发。tornado 本身就是利用ioloop的异步回调解决io阻塞的问题。也就是说,如果多个任务中,每个任务既有IO耗时操作又有CPU计算操作,那么使用ioloop的事件注册循环机制,就可以实现在一个任务IO时,另一个任务去执行CPU。而如果我们的任务都是高CPU的,而没有IO阻塞操作,那么使用异步反而导致更多的事件花在任务切换上。所以时间反而变长了。
    2. 如果希望tornado开启多个进程,那么可以在main函数中使用如下形式:
    def main():
        app = make_app()
        server = tornado.httpserver.HTTPServer(app)
        server.bind(8888)
        server.start(0)  # forks one process per cpu
        IOLoop.current().start()
    

    但是遗憾的是,如果是window环境下,不能使用这样的方式,因为server.start会调用底层的os.fork函数,而windows没有这个函数。

    第三弹-使用线程池concurrent.futures.ThreadPoolExecutor

    使用线程池可以实现多线程功能是之前就明白的。如何实现主线程在多线程执行完毕之后在进行之后的操作时这个解决方式的需要考虑的。通过阅读官网说明可以看到,官网提供了一个wait函数

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
    

    函数等待fs这些future实例完成,并且返回两个命名元组集合( a named 2-tuple of sets),第一个命名元组done是在timeout时间内返回的已经完成或者取消的future集合。第二个命名原则not_done是在timeout时间内没有完成的future集合。
    timeout用于控制等待时间,默认None表示没有限定等待时间。
    return_when用于指定什么时候这个函数返回,其中ALL_COMPLETE表示会等到所有的Future实例都完成或者取消之后才返回。

    因此我们可以先定义一个Future的list,然后使用这个wait方法等待所有的任务都完成,最后整理done这个集合中的返回结果。

                with ThreadPoolExecutor(args.executor) as executor:
                    future_result = [executor.submit(image_processing, image=image[0], num=id) for image, id in image_id.items()]
    
                    #主线程等待全部执行完毕
                    done, undone = concurrent.futures.wait(future_result, return_when=ALL_COMPLETED)
                    for future in done:
                        res = future.result()
                        id_res[res[0]] = res[1]
    

    在设置线程池的线程数量为36时,100张图片的处理时间为3.6s。顺序处理的时间为6.6s.

    第四弹:实现参数的动态输入

    我们希望在开启这个服务的时候,可以通过参数的方式设置例如监听端口、线程池数量等参数。可以使用argparse包。

    parser = argparse.ArgumentParser(description='Start the face dection web server')
    parser.add_argument('--port', type=int, default=3000, help='The web port to listen.')
    parser.add_argument('--executor', type=int, default=36, help='The num of thread in ThreadPoolExecutor')
    args = parser.parse_args()
    

    这样,就可以使用args.port, args.executor来代替下面代码出现的对应变量值。然后在开启服务时,可以通过python XXX.py --port 8080 --executor 4来改变

    完成任务!!!

    相关文章

      网友评论

          本文标题:tornado多线程共享CPU的踩坑经历

          本文链接:https://www.haomeiwen.com/subject/ffdbqqtx.html