美文网首页爬虫专题python基础Python中文社区
Python 线程、线程通信、多线程

Python 线程、线程通信、多线程

作者: 梅花鹿数据rieuse | 来源:发表于2018-09-18 23:12 被阅读97次

    这是一篇学习Python 线程相关的内容,记录一下以备复习和开发使用,技术有限,如有问题欢迎指出,多谢。


    一.GIL 全局解释器锁(cpython)

    1.为什么会有这个锁:为了线程安全,减少python使用者的上手难度
    GIL 使得同一个时刻只有一个线程在一个cpu上执行字节码,无法隐射到多个cpu,多核上执行。
    2.特殊情况下会释放GIL:达到特定字节码行数、到底特定数目时间片、IO操作(主动)

    二:并发和并行的区别
    • 并发:描述程序的组织结构,指程序要被设计成多个可独立执行的子任务
    • 并行:描述程序的执行状态,指多任务需要同时执行
    三:守护线程&线程阻塞
    • 守护线程:thread.setDaemon(true),当主程序退出的时候让子程序也一并退出
    • 子线程阻塞:thread.join(),当子程序都结束后主程序再退出
    四:多线程的写法
    • 实例化Threading,调用Threading的方法去进行多线程编程
    • 写子类继承Theading,重写相应的方法
      说明:当程序简单时可使用实例化方法,当程序较复杂的时候,实现逻辑较多,第二种方法。
    五:线程间通信
    • 1.共享变量:
      方法简单,也可以写入到单独的py文件中。问题:线程不安全,易出问题。
    • 2.queue 队列:
      使用queue 的 Queue,这个是线程安全的,多线程取数据不会出错。
      内部使用的是deque Python 的双端队列,在字节码的层面上就已经到达了线程安全。
    q = Queue()
    # 方法:
    q.put()  # 放入数据
    q.get()  # 取出数据
    q.put_nowait()  # 放入数据,不用等待它完成再返回,异步的方法
    q.get_nowait()  # 取出数据,不用等待它完成再返回,异步的方法
    

    get() put(),可以设置是否阻塞的,默认是阻塞的

    q.join()方法:
    只有q.task_done()调用了join()才会让主线程退出,成对使用。

    六:线程同步
    • Lock 锁
    lock= Theading.Lock()
    # 获取锁:
    lock.acquire()
    lock.release()
    
    # 另一种方法:
    with lock:
        # do something
    

    加锁的代码段同时只有这一个代码段在执行,方式数据出问题。
    缺点:1.用锁会影响性能 2. 可能引起死锁
    死锁情况:1.有acquire 没有release 2. 相互等待

    • RLock 可重入锁
      当在一个线程中多个地方需要加锁的时候用Lock 是不行的,需要用到RLock ,但是要注意的是获取和释放锁的数量要一致,成对出现。
    • Condition 条件变量
      用于复杂的线程间同步,是一个同步锁。例如:先后顺序的多线程通信。
      重点函数:wait() notify()
    con = theading.Condition()
    with con:
        # do something
        cond.notify()   #通知其他线程
        cond.wait()    # 等待其他线程通知
        # do something
    

    注意:
    1.先con.acquire()或者with con,获取condition锁,不然wait() notify() 不能用
    2.Condition 有两把锁:一把底层锁会在线程调用了wait() 的时候释放,上面的锁会在每次调用wait的时候分配一把并放入condition的等待队列中,等到notify()的唤醒

    • Semaphore 信号量
      用于控制某段代码进入线程的数量,比如控制爬虫的并发量。
    import threading
    import time
    
    
    class HtmlSppier(threading.Thread):
        def __init__(self, url, sem):
            super().__init__()
            self.sem = sem
            self.url = url
    
        def run(self):
            time.sleep(2)
            print('download html success')
            self.sem.release()
    
    class UrlProducer(threading.Thread):
        def __init__(self,sem):
            super().__init__()
            self.sem = sem
    
    
        def run(self):
            for i in range(20):
                self.sem.acquire()
                html_thread = HtmlSppier(f'http://www.qq.com/pn={i}',self.sem)
                html_thread.start()
    
    
    if __name__ == '__main__':
        sem = threading.Semaphore(3)
        url_produce = UrlProducer(sem)
        url_produce.start()
    
    七:线程池

    为什么要使用线程池?
    主线程中可以获取某一个线程的状态或者某一个的任务的状态以及返回值
    当一个线程完成的时候主线程能立即知道

    import requests
    
    def download_html(i):
        url = f'https://www.baidu.com/s?ie=UTF-8&wd={i}'
        response = requests.get(url).text
        print(response)
    
    ids = list(range(100))
    
    
    # 线程池方式一:
    import threadpool
    def thread_main(item):
        pool = threadpool.ThreadPool(30)
        tasks = threadpool.makeRequests(download_html, ids)
        [pool.putRequest(req) for req in tasks]
        pool.wait()
    
    
    # 线程池方式二:
    from multiprocessing.dummy import   Pool as thpool
    
    def thread_pool(item):
        pool = thpool(20)
        pool.map(download_html, ids)
        pool.close()
        pool.join()
    
    
    # 线程池方式三(推荐):
    from concurrent.futures import ThreadPoolExecutor
    
    with ThreadPoolExecutor(max_workers=8) as exe:
        exe.map(download_html,ids)
    

    推荐使用 concurrent.futures 模块,线程池和进程池的接口很相似,方便使用。

    ThreadPoolExecutor 其他方法使用:
    # 其他接口使用:
    from concurrent.futures import ThreadPoolExecutor, as_completed,wait
    
    
    executor = ThreadPoolExecutor(max_workers=8)
    
    # 通过 submit 提交执行的函数到线程中
    task1 = executor.submit(download_html, (1))
    task2 = executor.submit(download_html, (3))
    
    # done() 判断 task 是否完成
    print(task1.done())
    time.sleep(4)
    print(task1.done())
    
    # result() 获取 task 的执行结果 阻塞
    print(task1.result())
    
    # cancel() 取消任务,如果任务在执行中或者执行完了是不能取消的
    # 现在线程池是8 两个任务都会被提交任务去执行,如果 max_workers = 1,执行task2.cancel()就会成功取消
    print(task2.cancel())
    
    
    # as_completed() 获取已经成功的task的返回数据,阻塞
    # as_completed实际上是一个生成器,里面有 yield 会把已经完成的 future (task) 返回结果
    ids = list(range(10))
    all_task = [executor.submit(download_html,(i)) for i in ids]
    time.sleep(8)
    # 这是异步的,谁完成就处理谁
    for future in as_completed(all_task):
        data = future.result()
        print(f'html response {data}')
    
    
    # 通过 executor 获取已经完成的task
    for data in executor.map(download_html,ids):
        print(f'html response {data}')
    
    
    # wait() 等待task完成
    ids = list(range(10))
    all_task = [executor.submit(download_html,(i)) for i in ids]
    
    #  wait 的 return_when 可选项
    FIRST_COMPLETED = 'FIRST_COMPLETED'
    FIRST_EXCEPTION = 'FIRST_EXCEPTION'
    ALL_COMPLETED = 'ALL_COMPLETED'
    _AS_COMPLETED = '_AS_COMPLETED'
    
    wait(all_task, return_when=ALL_COMPLETED)
    
    八:总结

    Python 多线程首选concurrent.futures 中的 ThreadPoolExecutor,使用简单方便,而且切换多进程也是很快速的,后面继续记录多进程方面的知识点。
    代码位置:github.com/rieuse/learnPython

    相关文章

      网友评论

        本文标题:Python 线程、线程通信、多线程

        本文链接:https://www.haomeiwen.com/subject/rwxunftx.html