美文网首页
Python 线程学习

Python 线程学习

作者: vckah | 来源:发表于2018-07-10 17:24 被阅读0次

    Python 有关线程的模块主要有两个,一个是 thread,另一个是 Threadingthread 更趋向于底层,官方建议能力高的人去操作这个模块,对于我等初学者来说,threading 模块已经够我们折腾一阵子了。threading 模块与 Process 模块差不多,只是从结构上来说,用法有点类似。
    其实 Python 的多线程有点鸡肋,准确地说,是 CPython 的多线程有点鸡肋。在 Python 诞生的那个年代,哪有什么多线程技术,一个 CPU 就够搞了,用不到什么特别高深的技术,所以这也算是个历史遗留问题吧。Python 有一个 全局解释器锁机制,就是俗称的 GIL。每个线程在执行的过程中都需要先获取 GIL。简单来说,就是 Python 将所有线程都加了一把锁,保证在同一时刻只能有一条线程去操纵数据。释放 GIL 情况:在 IO 操作等可能会引起阻塞的系统调用之前,可以释放 GIL。所以,Python 在调用一些 C 代码写的程序时,解释器都会释放 GIL。

    from threading import Thread
    
    def func(n):
        pass
    t = Thread(target=func, args=(,))
    t.start()
    t.join()
    
    # 还有类实现的
    class MyThread(Thread):
        def __init__(slef, )
            super().__init__()
            pass
        
        def run(self):
            pass
    

    实现一个 socket 服务端:

    import socket
    from threading import Thread
    
    def chat(conn):
        con.send(b'hellow')
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.close()
    
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8000))
    sk.listen()
    while True:
        conn, addr = sk.accept()
        Thread(target=chat, args=(conn, )).start()
    sk.close()
    
    • 守护线程
      守护进程随着主进程代码执行结束而结束
      而守护线程会在主线程结束之后等待其他子线程的结束才结束
      t.start() 之前添加 t.daemon=True
    • 线程锁
    from threading import Thread, Lock
    
    def func(lock):
        global n
        lock.acquire()
        n -= 1
        lock.release()
    n = 10
    f_lst = []
    lock = Lock()
    for i in range(10):
        t = Thread(target=func, args=(lock, ))
        t.start()
        t_lst.append(t)
    for t in t_lst:
        t.join()
    print(n)
    

    注意,以上代码并不能确保最后一定能输出 0,因为假设有一个线程取到了数据,在准备向回赋值的时候,突然时间片轮转到了,导致这个线程暂停,另一个线程取到了数据。但是第一个线程还没有写数据,所以数据会污染。解决的办法是加锁即可。

    # 再来看一个哲学家吃饭问题
    from threading import Thread, Lock
    first_lock  = Lock()
    second_lock = Lock()
    def eat1(name):
        first_lock.acquire()
        print('%s拿到第一个筷子啦'%name)
        second_lock .acquire()
        print('%s拿到第二个筷子了'%name)
        print('%s吃面'%name)
        second_lock .release()
        first_lock.release()
    
    def eat2(name):
        second_lock.acquire()
        print('%s拿到第二个筷子啦'%name)
        time.sleep(1)
        first_lock.acquire()
        print('%s拿到第一个筷子了'%name)
        print('%s吃面'%name)
        first_lock.release()
        second_lock .release()
    
    Thread(target=eat1,args=('a',)).start()
    Thread(target=eat2,args=('b',)).start()
    Thread(target=eat1,args=('c',)).start()
    Thread(target=eat2,args=('d',)).start()
    

    以上代码会出现死锁问题。为了解决这个,可以使用递归锁:

    from threading import Thread, RLock
    first_lock = second_lock = RLock()
    def eat1(name):
        first_lock.acquire()
        print('%s拿到第一个筷子啦'%name)
        second_lock .acquire()
        print('%s拿到第二个筷子了'%name)
        print('%s吃面'%name)
        second_lock .release()
        first_lock.release()
    
    def eat2(name):
        second_lock.acquire()
        print('%s拿到第二个筷子啦'%name)
        time.sleep(1)
        first_lock.acquire()
        print('%s拿到第一个筷子了'%name)
        print('%s吃面'%name)
        first_lock.release()
        second_lock .release()
    
    Thread(target=eat1,args=('a',)).start()
    Thread(target=eat2,args=('b',)).start()
    Thread(target=eat1,args=('c',)).start()
    Thread(target=eat2,args=('d',)).start()
    

    这样就不会发生死锁问题了。因为 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require 。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。在同一个线程中使用两把以上的锁时会出现死锁问题,可以使用递归。

    • 事件和信号量
    import time
    from threading import Semaphore, Thread
    def func(sem, a, b):
        sem.acquire()
        time.sleep(1)
        print(a+b)
        sem.release()
    
    sem = Semaphore(4)
    for i in range(10):
        t = Thread(target=func, args=(sem, i, 2*i))
        t.start()
    

    同一时间只能有 n 个线程访问那段代码。

    • 事件
      e = Event()e.is_set() 判断事件是否为真,e.set() 设置事件为真,e.clear() 清除事件状态,保留为真
    #  第一个线程 : 连接数据库
        # 等待一个信号 告诉我我们之间的网络是通的
        # 连接数据库
    #  第二个线程 : 检测与数据库之间的网络是否连通
        # time.sleep(0,2) 2
        # 将事件的状态设置为True
    import time
    import random
    from threading import Thread,Event
    
    def connect_db(e):
        count = 0
        while count < 3:
            e.wait(0.5)   # 状态为False的时候,只等待 0.5s 就结束
            if e.is_set() == True:
                print('连接数据库')
                break
            else:
                count += 1
                print('第%s次连接失败'%count)
        else:
            raise TimeoutError('数据库连接超时')
    
    def check_web(e):
        time.sleep(random.randint(0,3))
        e.set()
    
    e = Event()
    t1 = Thread(target=connect_db,args=(e,))
    t2 = Thread(target=check_web,args=(e,))
    t1.start()
    t2.start()
    
    • 条件锁
      一个条件被创建之初,默认有一个 False 状态。False 状态会影响 wait 一直处于等待状态 notify(int n) 制造 n 个锁。但是线程消耗完锁之后并不归还锁。
    con = Condition()
    conn.acquire()
    conn.notify(n)
    conn.release()
    # 在线程中
    conn.acquire()
    conn.wait()
    conn.release()
    # 要进去的时候需要先获得钥匙,进去之后判断钥匙时候得到
    
    • 定时器
    from threading import Timer
    
    def func():
        print('定时器')
    
    Timer(2, func).start()
    # 过两秒过后会打印 定时器
    
    • 队列
      主要是为了线程间的数据安全问题。内置了锁,保证线程安全
    # Python 3 中的
    import queue
    # Python 2 中的
    import Queue
    q = queue.Queue()    # 先进先出
    q.get()
    q.put()
    q.put_nowait()    # 没有阻塞
    q.get_nowait()    # 没有阻塞
    
    q = queue.LifoQueue()    # 栈 先进后出
    q = queue.PriorityQueue() # 优先级队列
    q.put((10, 'a'))
    q.put((20, 'b'))    # 越小优先级越高,负数也可以
    # 优先级一样,看第二项
    

    线程池

    concurrent.futures 这是 Python 3 提供的。里面提供了线程池和进程池,接口完全一样,只是本质上不同。

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    def func(n)
        time.sleep(2)
        print(n)          --> 打印的时候不是固定的,因为线程开启的事件不同
        return n*n
    
    tpool = ThreadPoolExecutor(max_workers=n)    #默认不要超过 n = CPU 个数 * 5
    tlst = []
    for i in range(20):
        t =  tpool.submit(func, 1)      --> 异步的
        tlist.append(t)
    # tpool .shutdown() --> 完成 close + join 的工作
    print('over')
    for i in tlist:
        print(t.result())      --> 获取返回值    一定按照顺序打印,因为 tlist 里面顺序是固定的
    
    # 如果使用 map 
    tpool.map(func, range(20))    # 拿不到返回值
    

    另外它也有 回调函数的用法
    tpool.submit(func, 1).add_done_callback(call_back)
    线程池的缺点:
    任务队列是无界的,所以需要控制好。如果队列的生产者任务生产的太快,而线程池消费太慢处理不过来,任务就会堆积。如果堆积一直持续下去,内存就会持续增长直到OOM,任务队列里堆积的所有任务全部彻底丢失。
    另外如果要查看成功的任务:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    executor = ThreadPoolExecutor(max_workers=2)
    urls = []
    all_task = [executor.submit(function, (url)) for url in urls]
    for future in as_completed(all_task):
        data = future.result()
        print('success')
        # 顺序不固定,谁先完成就处理谁
    
    # 还有一种方法,只不过这种方法是严格按照同步顺序来的
    for data in executor.map(function, urls):
        print(data)
    顺序严格按照 urls 的顺序来的
    

    另外还有一个 wait

    from concurrent.futures import wait
    wait(all_task, return_when=)
    这里 return_when 有第一个完成后,所有完成后等。
    阻塞某一个 task 或者一些 task 完成
    

    相关文章

      网友评论

          本文标题:Python 线程学习

          本文链接:https://www.haomeiwen.com/subject/dxlqpftx.html