声明:并非标题党,确实是一件实际的案例,这里只是详细捋一下自己的分析过程
-
背景
某日,进行git checkout xxx_branch
时,总是报出Unlink of file 'logs/Crawler_2019-11-02.log' failed. Should I try again? (y/n)
的错误,这是一份多线程爬虫的日志报告,查看stackoverflow,发现原因是:有程序在使用这个文件导致不能正确地进行迁移,常规解决办法:查看任务管理器,找到占用这个文件的进程,把它kill,但这又治标不治本,想到了每次出现这个的原因是:爬虫任务每次都不能正常结束,强制结束导致的资源没有被正确释放,且该任务为多线程任务,导致出现很多僵尸线程。 -
问题:
对占用.log文件的程序正确退出并释放资源,该程序分为两个阶段:- 多线程收集各个具体页面的URL
- 多线程收集各个具体页面的contents
表现出来的行为是每次程序都在第二个阶段卡住,强制结束
-
解决思路:
-
首先怀疑是
queue.Queue()
的问题,不放心它的task_done()
与join()
的配合,打印出的信息发现确实是task_done()
使qsize()
的数目-1,第一阶段执行结束,qsize()
到0,queue这边没问题 -
其次怀疑是多线程这边实现的问题,使用的
concurrrent.futures
,相比于threading
这种自己动手丰衣足食的做法,有点不放心,查看stackoverflow1 stackoverflow2,这个东西确实也是使用了threading
;甚至怀疑过使用这种多线程的做法是不是把默认的主线程阻塞了,但试了一下这种想法也太傻了
-
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
看到以上代码段的时候来源,特别是#stop workers这一段,确信:join()
的作用只是起到barrier的作用,用于同步,并不能结束子线程的执行,在考虑这种多线程问题的时候,还要设置子线程退出机制,对于子线程的退出机制:
- 对于
threading
,可以设置daemon=True
,将子线程设置为守护线程,主线程退出时,子线程自动退出; - 对于
concurrent.futures
, 在stackoverflow1中,讲到关于守护进程的设置,并不能真正的daemon
对于子线程退出的问题,我采用的做法,是通过判断queue为空,子线程退出,这适用于“快生产、慢消费”场景(不同生产消费场景思考)
if self.news_detail_url_queue.empty():
break
-
关键
这些并不能解决阻塞问题,后来检查一部分多线程并发实现的代码时发现:def parallel_do(func, args_list, max_workers=None, mode='thread'): max_workers = thread_max_workers if not max_workers else max_workers exe = cf.ThreadPoolExecutor(max_workers=max_workers) if mode == 'thread' else cf.ProcessPoolExecutor(max_workers=max_workers) with exe as executor: if args_list is None: for t in range(max_workers): executor.submit(func) else: executor.map(func, args_list)
使用with statement
虽然保证了子线程的退出和资源的正确释放,这也是导致整个执行过程被阻塞的根源。 也就是不能使parallel_do
之间实现并发,要想并发,可以考虑去除with
statement。
-
总结
- 使用多线程时要注意子线程的退出条件,否则出现 zombie thread;
-
concurrent.futures
内部使用的仍然是threading
实现的多线程; - 使用
queue.join()
要注意对task_done()
的正确调用,否则会阻塞;
网友评论