起因:
公司有一个小项目,大概逻辑如下:
服务器A会不断向队列中push消息,消息主要内容是视频的地址,服务器B则需要不断从队列中pop消息,然后将该视频进行剪辑最终将剪辑后的视频保存到云服务器。个人主要实现B服务器逻辑。
实现思路:
1 线程池+多进程
要求点一:主进程要以daemon的方式运行。
要求点二:利用线程池,设置最大同时运行的worker,每一个线程通过调用subprocess中的Popen来运行wget ffprobe ffmpeg等命令处理视频。
2 消息队列采用redis的list实现
3 主线程从队列中获取到消息后,从线程池中获取空闲从线程(在这里,非主线程统称为从线程,下同),从线程对该消息做一些逻辑上的处理后,然后生成进程对视频进行剪辑,最后上传视频。
要求点三:为了让daemon能在收到signint信号时,处理完当前正在进行的worker后关闭,且不能浪费队列中的数据,需要让主进程在有空闲worker时才从队列中获取数据。
大概就是这样:
基本上主要资源耗费在视频下载以及视频处理上,且同时运行的worker(从线程)不会太多(一般cpu有几个就设置几个worker)。
上面一共有三个要求点,其中要求点二并不费事。所以忽略。
实现
要求点一实现:
要求点三实现:
线程池,采用python的futures模块。该模块提供了线程池的机制。稍微说一下他的线程池实现原理吧,ThreadPoolExecutor该类实现了线程池:
1 每个实例本身有个_work_queue属性,这是一个Queue对象,里面存储了任务。
2 每当我们调用该对象的submit方法时,都会向其_work_queue中放入一个任务,同时生成从线程,直到从线程数达到max_worker所设定的值。
3 该线程池实例中所有的从线程会不断的从_work_queue中获取任务,并执行。同时从线程的daemon属性被设置为True
重点就是那嵌套的while循环。
踩坑&收获:
1 python中只有主线程才能处理信号,如果使用线程中的join方法阻塞主线程,如果从线程运行时间过长可能会导致信号长时间无法处理。所以尽量设置从线程的daemon为True。
2 Queue的底层是deque,而deque的底层是一个双端链表,为什么用双端链表而不用list?答案请在参考中找。
3 学会了进程以daemon方式运行的实现方式:
1 pid文件的来源
2 进程以及进程组的概念
3 信号的捕捉
4 dup2函数以及fcntl函数
4 进程使用Popen()创建时,如果用PIPE作为子进程(stdin stdout stderr)与父进程进行交互时,然后调用wait时,如果子进程的stdin stdout stderr中某个数据过多可能会导致主进程卡死。原因也在参考中找。
5 sudo执行脚本时环境变量去哪了?答案请在参考中找
6 python中的weakref模块很有用啊
网友评论