美文网首页
python线程的一些知识

python线程的一些知识

作者: DayBreakL | 来源:发表于2020-05-16 11:31 被阅读0次

python threading模块

import threading
import time

def thread_demo():
    print("===线程,来自方法==={}".format(threading.current_thread()))
    sum = 0
    for i in range(1000):
        time.sleep(0.01) # 测试子线程
        sum += i

    print(sum)

print("===线程,创建子线程前==={}".format(threading.current_thread()))
# 创建线程
t1 = threading.Thread(target=thread_demo)  # 参数target 指定要执行的任务;参数args 指定方法传递的参数
t2 = threading.Thread(target=thread_demo)
# 启动创建的线程,让线程去执行任务
t1.start()
t2.start()
print("====线程数===={}".format(threading.enumerate()))
print("====线程数===={}".format(threading.active_count()))
# 阻塞当前线程,默认等待指定的线程执行结束
t1.join() # 等t1回来再继续往下执行
t2.join() # 等t2回来再继续往下执行
print("===线程==={}".format(threading.current_thread()))
# 主线程就是代码从上往下运行
print("主线程结束")

重写threading.Thread来实现:

import threading
import time

def thread_demo():
    print("===线程,来自方法==={}".format(threading.current_thread()))
    sum = 0
    for i in range(1000):
        time.sleep(0.01) # 测试子线程
        sum += i

    print(sum)
# 重写threading,继承threading.Thread
class MyThread(threading.Thread):

    # 重写run方法,待执行的任务
    def run(self):
        thread_demo() #

print("===线程,创建子线程前==={}".format(threading.current_thread()))
# 创建线程
m1=MyThread()
m2=MyThread()
# 运行线程
m1.start()
m2.start()
print("====线程数===={}".format(threading.enumerate()))
print("====线程数===={}".format(threading.active_count()))
# 阻塞当前线程,默认等待指定的线程执行结束
m1.join()
m2.join()
print("主线程结束")

守护线程:

# 守护线程
def thread_demo():
    print("===线程,来自方法==={}".format(threading.current_thread()))
    sum = 0
    for i in range(1000):
        time.sleep(0.01) # 测试子线程
        sum += i

    print(sum)


t1=threading.Thread(target=thread_demo)
t1.daemon=True # 表示当前线程就是t1的守护线程 ,通过注释更好理解,主线程结束,守护线程也会结束
t1.start()

time.sleep(2)
print("主线程结束")

线程的生命周期:


  • 实例化threading.Thread时,创建了线程 --->新建
  • start() 准备干活-->就绪 ,等待CPU的调度
  • cpu的切换分为2种:
    1. 就绪 <---> 运行 反复切换 (高频率计算的)
    2. 就绪<--->运行--->阻塞--->就绪--->运行。对于一些需要等待结果的,比如等待数据库返回、比如time.sleep(),cpu就会把状态由运行改为阻塞,不会再去调度你。
  • 直到你等待的结果回来,比如数据库返回了、sleep()结束。你的状态由阻塞变为就绪,等待cpu重新调度。
  • 运行-->死亡 计算结束、报异常等,线程死亡

同步锁和互斥锁

a = 0
def add_one():
    global a
    for i in range(10000000):
        b = a + 1
        a = b


# 创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]

for i in thread_list:
    i.start()

for i in thread_list:
    i.join()

print(a)  # 远小于500000000,
为什么a远小于500000000
  • e.g. 线程1 b=1000 线程2 b=2000 线程3 b=3000 线程4 b=4000 线程5 b=5000 ,此时cpu再开始到线程1 a=b=1000,那么相当于后面线程的都是无用功
  • 怎么办呢?加锁
互斥锁
a = 0

lock=threading.Lock() # 创建一把锁 互斥锁
def add_one():
    # 加锁
    lock.acquire() # 已锁定,其他线程就无法加锁成功
    global a
    for i in range(10000000):
        b = a + 1
        a = b
    lock.release() # 结束锁定

#  创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]

for i in thread_list:
    i.start()

for i in thread_list:
    i.join()

print(a)  # 500000000,
死锁
a = 0

lock=threading.Lock() # 创建一把锁 互斥锁
def add_one():
    # 加锁
    lock.acquire() # 已锁定,其他线程就无法加锁成功
    global a
    for i in range(10000000):
        b = a + 1
        raise NameError("线程挂掉了") # 假设在加锁后、未释放锁之前挂掉了
        a = b
    lock.release() # 结束锁定

#  创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]

for i in thread_list:
    i.start()

for i in thread_list:
    i.join()

print(a)  # 500000000,

加了代码raise NameError("线程挂掉了") 模拟线程在加锁后、未释放锁之前挂掉了,那么程序会一直卡在那里,该线程无法继续,其他线程要等待加锁的线程释放,导致也无法运行。这种情况就是死锁。

通过try... finally... ,无论结果如何,都要释放锁,这样程序不会卡在那里

lock=threading.Lock() # 创建一把锁 互斥锁
def add_one():
    # 加锁
    lock.acquire() # 已锁定,其他线程就无法加锁成功
    try:
        global a
        for i in range(10000000):
            b = a + 1
            raise NameError("线程挂掉了")
            a = b
    finally:
        lock.release() # 结束锁定
逻辑死锁
a = 0

lock_1=threading.Lock() # 互斥锁
lock_2=threading.Lock() # 互斥锁

def add_one():

    print("线程{}进入方法add_one".format(threading.current_thread()))
    lock_1.acquire() # 线程2 lock_1 加锁
    lock_2.acquire() # 
    print("线程{}开始计算".format(threading.current_thread()))
    lock_1.release()  # 
    time.sleep(0.1) # 线程1 阻塞  lock_2锁定,休眠时间过了之后,lock_1加锁,但lock_1已被占用,
    lock_1.acquire()
    global a
    for i in range(10000000):
        b = a + 1
        raise NameError("线程挂掉了")
        a = b

    lock_2.acquire()
    lock_1.release()
    print("线程{}退出方法add_one".format(threading.current_thread()))


# 创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]

for i in thread_list:
    i.start()

for i in thread_list:
    i.join()

print(a)  

锁的滥用容易出现逻辑死锁,故应避免使用多把锁。

线程间的通信

线程间的通信:

Condition线程通信

消费者-生产者设计模式
con=threading.Condition() # Condition默认自带互斥锁

ball=[]
def set_data():

    for i in range(10):
        con.acquire()
        for j in range(20):
            ball.append("丸子")
        print("第{}个丸子放进去了,吃货们快来吃".format(len(ball)))
        con.notify_all() # 激活其他所有线程进入就绪状态
        con.wait() # wait()释放当前锁,并进入阻塞状态

def get_data():
    while True:
        con.acquire() #加锁
        time.sleep(0.5)
        if(len(ball)==0):
            print("锅里没丸子了")
            con.notify_all()
            con.wait()
        else:
            ball.pop()
            print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),len(ball)))
        con.release()

product=threading.Thread(target=set_data)
customers=[threading.Thread(target=get_data) for i in range(3)]

product.start()
for i in customers:
    i.start()

product.join()

Queue队列实现线程通信


con=threading.Condition() # Condition默认自带互斥锁

# 用队列储存丸子
q=queue.Queue(maxsize=10) # 创建队列
def set_data():

    for i in range(10):
        con.acquire()
        for j in range(20):
            if q.full(): # 判断队列是否已满

                print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                con.notify_all()  # 激活其他所有线程进入就绪状态
                con.wait()  # wait()释放当前锁,并进入阻塞状态
            else:
                q.put("丸子") # 往队列里放东西



def get_data():

    while True:
        con.acquire() #加锁
        time.sleep(0.5)
        if q.empty(): #判断队列是否为空
            print("锅里没丸子了")
            con.notify_all()
            con.wait()
        else:
            q.get()  # 获取队列中的数据
            print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
        con.release()

product=threading.Thread(target=set_data)
customers=[threading.Thread(target=get_data) for i in range(3)]

product.start()
for i in customers:
    i.start()

product.join()

通过Event实现通信 Event可以代替Condition

product_env=threading.Event() # 生产者和消费者不能共用一个evn
customer_env=threading.Event()

# 用队列储存丸子
q=queue.Queue(maxsize=10) # 创建队列
def set_data():

    for i in range(10):
        for j in range(20):
            if q.full(): # 判断队列是否已满

                print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                customer_env.set() # 通知消费者
                product_env.clear() # 把生产者线程里的flag设置为 False
                product_env.wait() # 阻塞当前线程
            else:
                q.put("丸子") # 往队列里放东西



def get_data():

    while True:
        time.sleep(0.5)
        if q.empty(): #判断队列是否为空
            print("锅里没丸子了")
            product_env.set()
            customer_env.clear()
            customer_env.wait()
        else:
            q.get()  # 获取队列中的数据
            print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))


product=threading.Thread(target=set_data)
customers=[threading.Thread(target=get_data) for i in range(3)]

product.start()
for i in customers:
    i.start()

product.join()

线程池

线程池
product_env=threading.Event() # 生产者和消费者不能共用一个evn
customer_env=threading.Event()

q=queue.Queue(maxsize=10) # 创建队列
def set_data():

    for i in range(10):
        for j in range(20):
            if q.full(): # 判断队列是否已满

                print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                customer_env.set() # 通知消费者
                product_env.clear() # 把生产者线程里的flag设置为 False
                product_env.wait() # 阻塞当前线程
            else:
                q.put("丸子") # 往队列里放东西
    return "没丸子,别吃了"

def get_data():

    while True:
        time.sleep(0.5)
        if q.empty(): #判断队列是否为空
            print("锅里没丸子了")
            product_env.set()
            customer_env.clear()
            customer_env.wait()
        else:
            q.get()  # 获取队列中的数据
            print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))

    def get_result(feature):
        print(feature.result())

pool=ThreadPoolExecutor(max_workers=4) # 创建线程池,max_workers 指定最大线程数
product_future=pool.submit(set_data)# 往线程池中添加任务,并激活线程中的线程,即改为就绪状态
product_future.add_done_callback() # 等同于result()
customer_future1=pool.submit(get_data) #
customer_future2=pool.submit(get_data)
customer_future3=pool.submit(get_data)

# res=product_future.result()  #result方法,获取程序的返回值
# print(res)

pool.shutdown() # 关闭线程池

map方法:

# 使用map方法,map使用于有参的方法
product_env=threading.Event() # 生产者和消费者不能共用一个evn
customer_env=threading.Event()

q=queue.Queue(maxsize=10) # 创建队列
def set_data(d=None):

    for i in range(10):
        for j in range(20):
            if q.full(): # 判断队列是否已满

                print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                customer_env.set() # 通知消费者
                product_env.clear() # 把生产者线程里的flag设置为 False
                product_env.wait() # 阻塞当前线程
            else:
                q.put("丸子") # 往队列里放东西
    return "没丸子,别吃了"

def get_data(d=None):

    while True:
        time.sleep(0.5)
        if q.empty(): #判断队列是否为空
            print("锅里没丸子了")
            product_env.set()
            customer_env.clear()
            customer_env.wait()
        else:
            q.get()  # 获取队列中的数据
            print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))

    def get_result(feature):
        print(feature.result())

pool=ThreadPoolExecutor(max_workers=4) # 创建线程池,max_workers 指定最大线程数

product=pool.map(set_data,(None,))
customer=pool.map(get_data,(None,None,None)) #启动三次

# 获取返回值
print(next(product))

pool.shutdown() # 关闭线程池

相关文章

网友评论

      本文标题:python线程的一些知识

      本文链接:https://www.haomeiwen.com/subject/ljulohtx.html