美文网首页
由git checkout引发的python多线程思考

由git checkout引发的python多线程思考

作者: wanncy | 来源:发表于2019-11-03 13:50 被阅读0次

声明:并非标题党,确实是一件实际的案例,这里只是详细捋一下自己的分析过程

  • 背景
    某日,进行git checkout xxx_branch时,总是报出Unlink of file 'logs/Crawler_2019-11-02.log' failed. Should I try again? (y/n) 的错误,这是一份多线程爬虫的日志报告,查看stackoverflow,发现原因是:有程序在使用这个文件导致不能正确地进行迁移,常规解决办法:查看任务管理器,找到占用这个文件的进程,把它kill,但这又治标不治本,想到了每次出现这个的原因是:爬虫任务每次都不能正常结束,强制结束导致的资源没有被正确释放,且该任务为多线程任务,导致出现很多僵尸线程。

  • 问题:
    对占用.log文件的程序正确退出并释放资源,该程序分为两个阶段:

    • 多线程收集各个具体页面的URL
    • 多线程收集各个具体页面的contents

    表现出来的行为是每次程序都在第二个阶段卡住,强制结束

  • 解决思路:

    • 首先怀疑是queue.Queue()的问题,不放心它的task_done()join()的配合,打印出的信息发现确实是task_done()使qsize()的数目-1,第一阶段执行结束,qsize()到0,queue这边没问题

    • 其次怀疑是多线程这边实现的问题,使用的concurrrent.futures,相比于threading这种自己动手丰衣足食的做法,有点不放心,查看stackoverflow1 stackoverflow2,这个东西确实也是使用了threading

      甚至怀疑过使用这种多线程的做法是不是把默认的主线程阻塞了,但试了一下这种想法也太傻了

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

看到以上代码段的时候来源,特别是#stop workers这一段,确信:join()的作用只是起到barrier的作用,用于同步,并不能结束子线程的执行,在考虑这种多线程问题的时候,还要设置子线程退出机制,对于子线程的退出机制:

  • 对于threading,可以设置daemon=True,将子线程设置为守护线程,主线程退出时,子线程自动退出;
  • 对于concurrent.futures, 在stackoverflow1中,讲到关于守护进程的设置,并不能真正的daemon

对于子线程退出的问题,我采用的做法,是通过判断queue为空,子线程退出,这适用于“快生产、慢消费”场景(不同生产消费场景思考)

  if self.news_detail_url_queue.empty():
     break
  • 关键
    这些并不能解决阻塞问题,后来检查一部分多线程并发实现的代码时发现:
    def parallel_do(func, args_list, max_workers=None, mode='thread'):
      max_workers = thread_max_workers if not max_workers else max_workers
      exe = cf.ThreadPoolExecutor(max_workers=max_workers) if mode == 'thread' else cf.ProcessPoolExecutor(max_workers=max_workers)
      with exe as executor:
          if args_list is None:
              for t in range(max_workers):
                  executor.submit(func)
          else:
              executor.map(func, args_list)
    

使用with statement虽然保证了子线程的退出和资源的正确释放,这也是导致整个执行过程被阻塞的根源。 也就是不能使parallel_do之间实现并发,要想并发,可以考虑去除with statement。

  • 总结

    • 使用多线程时要注意子线程的退出条件,否则出现 zombie thread;
    • concurrent.futures内部使用的仍然是threading实现的多线程;
    • 使用queue.join()要注意对task_done()的正确调用,否则会阻塞;

相关文章

网友评论

      本文标题:由git checkout引发的python多线程思考

      本文链接:https://www.haomeiwen.com/subject/kgmkbctx.html