美文网首页
python线程的实现,线程池

python线程的实现,线程池

作者: 昆仑草莽 | 来源:发表于2019-05-07 09:28 被阅读0次

    python通过标准库threading实现多线程的运行。
    程序的运行总要考虑并发,并行数。在多线程程序中为了确保程序在运行中出现争抢资源的现象,使用线程锁或者线程池来规避资源的争抢。

    线程的实现

    Python通过两个标准库_thread 和threading,提供对线程的支持 , threading对_thread进行了封装。threading模块中提供了Thread , Lock ,RLock , Condition等组件
    因此在实际的使用中我们一般都是使用threading
    Thread类的说明:
    参数说明:

    参数 描述
    target 表示调用对象,即子线程要执行的任务
    name 子线程的名称
    args 传入target函数的位置参数,是一个元组,参数后面必须加 逗号

    常用实例方法:

    实例方法 描述
    Thread.run(self) 线程启动时运行的方法,由该方法调用target参数所指定的函数
    Thread.start(self) 启动线程,start就是帮你去调用run方法
    Thread.terminate(self) 强制终止线程
    Thread.join(self,timeout=None) 阻塞调用,主线程进行等待
    Thread.setDeamon(self,deamonic) 将子线程设置为守护线程
    Thread.getname(self,name) 获取线程名称
    Thread.setname(self,name) 设置线程名称

    创建线程:
    在python中由两种方法创建线程,实例Thread类和重写Thread类
    1、实例Thread类

    import threading
    import time
    #定义线程要运行的函数
    def thread_print(startname,endname):
        print('我是: {}'.format(startname))
        time.sleep(2) #为了便于观察,使程序睡2秒
        print('线程结束')
    
    #建立线程实例,args是一个元组,必须加逗号
    t1 = threading.Thread(target=thread_print,args=('一','二'))
    t2 = threading.Thread(target=thread_print,args=('开始',0))
    t1.setDaemon(True)
    #t2.setDaemon(True)
    t1.start()
    t2.start()
    输出:
    我是: 一
    我是: 开始 #睡眠2秒
    线程结束
    线程结束
    Process finished with exit code 0
    

    2、继承Thread类

    import threading
    import time
    
    #继承threading类中的Thread类
    class MyThread(threading.Thread):
       # 线程中需要的参数
       def __init__(self,name):
           super().__init__()
           self.name = name
       #重构run方法
       def run(self):
           print('I am {}'.format(self.name))
           time.sleep(3)
    
    #创建实例化线程
    t1 = MyThread('apple')
    t2 = MyThread('banana')
    #启动线程,调用类中的run方法
    t1.start()
    t2.start()
    #获取线程名称
    print(t1.getName())
    print(t2.getName())
    输出:
    I am apple
    I am banana
    apple
    banana
    Process finished with exit code 0
    

    在知道了这两种方法后,我们来看子线程和主线程以及一些线程定义
    主线程:当一个程序启动时 , 就有一个线程开始运行 , 该线程通常叫做程序的主线程
    子线程:因为程序是开始时就执行的 , 如果你需要再创建线程 , 那么创建的线程就是这个主线程的子线程
    主线程的重要性体现在两方面 :
    1、是产生其他子线程的线程
    2、通常它必须最后完成执行比如执行各种关闭操作
    join:阻塞调用程序 , 直到调用join () 方法的线程执行结束, 才会继续往下执行

    import threading
    import time
    
    #继承threading类中的Thread类
    class MyThread(threading.Thread):
        # 线程中需要的参数
        def __init__(self,name):
            super().__init__()
            self.name = name
        #重构run方法
        def run(self):
            print('I am {}'.format(self.name))
            time.sleep(3)
            print('子线程结束!!!')
    
    #创建实例化线程
    t1 = MyThread('apple')
    #启动线程,调用类中的run方法
    t1.start()
    t1.join() #只有等待子线程结束,主线程才能结束
    print('主线程结束!!!')
    输出:
    I am apple
    子线程结束!!!
    主线程结束!!!
    Process finished with exit code 0
    

    setDeamon():setDaemon() 与 join() 基本上是相对的 , join会等子线程执行完毕 ; 而setDaemon则不会等

    import threading
    import time
    
    #继承threading类中的Thread类
    class MyThread(threading.Thread):
        # 线程中需要的参数
        def __init__(self,name):
            super().__init__()
            self.name = name
        #重构run方法
        def run(self):
            print('I am {}'.format(self.name))
            time.sleep(3)
            print('子线程结束!!!')
    
    #创建实例化线程
    t1 = MyThread('apple')
    #启动线程,调用类中的run方法
    t1.setDaemon(True) #放在子线程启动之前,否则会报错
    t1.start()
    print('主线程结束!!!')
    输出:
    I am apple  #子线程没有结束,主线程就已经结束
    主线程结束!!!
    Process finished with exit code 0
    

    线程间的通信

    互斥锁:在多线程中 , 所有变量对于所有线程都是共享的 , 因此 , 线程之间共享数据最大的危险在于多个线程同时修改一个变量 , 那就乱套了 , 所以我们需要互斥锁 , 来锁住数据。
    提示!
    因为线程属于同一个进程,因此它们之间共享内存区域。因此全局变量是公共的。

    from threading import Thread
    
    a = 1
    def func():
        global a
        a = 2
    
    t = Thread(target=func())
    t.start()
    t.join()
    print(a)
    输出:
    2
    
    Process finished with exit code 0
    

    上面是一个共享参数,也就是共享内存的问题,多线程下,共享内存会出现互相竞争的问题

    from threading import Thread
    
    a = 0
    def add_func():
        global a
        for i in range(1000000):
            a += 1
    def sub_func():
        global a
        for i in range(1000000):
            a -= 1
    
    t_add = Thread(target=add_func)
    t_sub = Thread(target=sub_func)
    t_add.start()
    t_sub.start()
    t_add.join()
    t_sub.join()
    print(a)
    输出说明:
    当取值小于10000时,结果为0 是正常的
    当取值大于10000时,结果就会别的数,有时为正,有时为负数,这就说明这两个线程在互相抢占资源造成结果的不正确。
    

    使用锁来控制共享资源的访问

    from threading import Thread,Lock
    
    a = 0
    lock = Lock()
    def add_func():
        global a
        for i in range(1000000):
            lock.acquire()
            a += 1
            lock.release()
    def sub_func():
        global a
        for i in range(1000000):
            lock.acquire()
            a -= 1
            lock.release()
    
    t_add = Thread(target=add_func)
    t_sub = Thread(target=sub_func)
    t_add.start()
    t_sub.start()
    t_add.join()
    t_sub.join()
    print(a)
    输出:
    0
    
    Process finished with exit code 0
    

    或者

    from threading import Thread,Lock
    
    a = 0
    lock = Lock()
    def add_func():
        global a
        for i in range(1000000):
            with lock: #上下文管理器,会自动关闭锁
                a += 1
    def sub_func():
        global a
        for i in range(1000000):
            with lock:
                a -= 1
    
    t_add = Thread(target=add_func)
    t_sub = Thread(target=sub_func)
    t_add.start()
    t_sub.start()
    t_add.join()
    t_sub.join()
    print(a)
    输出:
    0
    
    Process finished with exit code 0
    

    但是,加锁的缺点是程序会运行非常缓慢。

    队列:队列就好像是排队,有一个入口,一个出口,先入先出。 线程队列操作命令:
    操作命令 描述
    put(item) 入队
    get() 出队
    empty() 测试空 #近似
    full() 测试满 #近似
    qsize() 队列长度
    task_done() 任务完成
    join() 等待完成 (此处的join和线程的阻塞不是一回事)
    from threading import Thread
    from queue import Queue
    from random import randint
    
    #创建队列,指定长度
    my_q = Queue(10)
    
    def func_put(q):
        '''生产数据'''
        for i in range(10):
            num = randint(0,100)
            q.put(num)
    
    def func_get(qq):
        '''取出数据'''
        for j in range(5): #每次取出5个数值
            num = qq.get()
            print(num)
    
    
    t1 = Thread(target=func_put, args=(my_q,))
    t2 = Thread(target=func_get, args=(my_q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    输出:
    48
    10
    78
    41
    27
    Process finished with exit code 0
    

    队列保证了数据的安全性,没有争抢资源的现象。是按照顺序依次取出的。

    from queue import Queue
    
    my_q = Queue(4)
    my_q.put(1,)
    print(my_q.qsize())
    my_q.get()
    print(my_q.qsize())
    print(my_q.empty())
    my_q.put(1,)
    my_q.put(1,)
    my_q.put(1,)
    my_q.put(1,)
    print(my_q.full())
    my_q.task_done()
    my_q.task_done()
    my_q.task_done()
    my_q.task_done()
    my_q.task_done()#每put一次,就需要任务完成一次
    my_q.join() #检测put和task_dane()是否相同,不相同就会阻塞,相同就会完成
    print('ok')
    输出:
    1
    0
    True
    True
    ok
    Process finished with exit code 0
    

    线程池

    线程池:

    主线程: 相当于生产者,只管向线程池提交任务。并不关心线程池是如何执行任务的。因此,并不关心是哪一个线程执行的这个任务
    线程池: 相当于消费者,负责接收任务,并将任务分配到一个空闲的线程中去执行

    线程的简单实现:
    from threading import Thread
    from queue import Queue
    import time
    
    class ThreadPool:
        def __init__(self,n):
            self.queue = Queue()
            for i in range(n):
                Thread(target=self.worker).start()
    
        def worker(self):
            while True:
                func,args,kwargs = self.queue.get()
                func(*args,**kwargs)
                self.queue.task_done()
        def apply_sy(self,func,args = (),kwargs = {}):
            self.queue.put((func,args,kwargs))
        def join(self):
            self.queue.join()
    
    def t1():
        print('任务1')
        time.sleep(2)
        print('任务1完成')
    def t2 (*args,**kwargs):
        print('任务2',args,kwargs)
        time.sleep(2)
        print('任务2完成',args,kwargs)
    
    pool = ThreadPool(4)
    
    pool.apply_sy(t1)
    pool.apply_sy(t2,args = (1,2),kwargs = {'a':3,'b':4})
    print('任务提交')
    pool.join()
    print('任务完成')
    输出:
    任务提交
    任务1任务2 
    (1, 2) {'a': 3, 'b': 4}
    任务2完成 任务1完成
    (1, 2) {'a': 3, 'b': 4}
    任务完成
    Process finished with exit code -1
    

    python内置线程池:

    from multiprocessing.pool import ThreadPool
    import time
    
    def t1():
        print('任务1')
        time.sleep(2)
        print('任务1完成')
    def t2 (*args,**kwargs):
        print('任务2',args,kwargs)
        time.sleep(2)
        print('任务2完成',args,kwargs)
    
    pool = ThreadPool(4)
    
    pool.apply_async(t1)
    pool.apply_async(t2,args = (1,2),kwds = {'a':3,'b':4})
    print('任务提交')
    pool.close() #在join前必须要有close,这样就不允许提交任务了
    pool.join()
    print('任务完成')
    输出:
    任务提交任务1
    任务2 (1, 2) {'a': 3, 'b': 4}
    
    任务2完成任务1完成
     (1, 2) {'a': 3, 'b': 4}
    任务完成
    Process finished with exit code 0
    

    池的其他操作:
    1、关闭操作:close -关闭提交通道,不允许在提交任务
    2、终止操作:terminate -终止进程池,终止所有任务。

    from multiprocessing.pool import ThreadPool
    import time
    
    def t1():
        print('任务1')
        time.sleep(2)
        print('任务1完成')
    def t2 (*args,**kwargs):
        print('任务2',args,kwargs)
        time.sleep(2)
        print('任务2完成',args,kwargs)
    
    pool = ThreadPool(4)
    
    pool.apply_async(t1)
    pool.apply_async(t2,args = (1,2),kwds = {'a':3,'b':4})
    print('任务提交')
    pool.terminate() #终止进程池,终止所有任务
    pool.join()
    print('任务完成')
    输出:
    任务提交
    任务完成
    Process finished with exit code 0
    

    相关文章

      网友评论

          本文标题:python线程的实现,线程池

          本文链接:https://www.haomeiwen.com/subject/lorhoqtx.html