1、多线程-threading
python的thread模块是比较底层的模块,python的threading模块是对thread做了一些包装的,可以更加方便的被使用
- 多线程执行
from threading import Thread
from time import sleep
def saySorry():
print('亲爱的,我错了,我能吃饭了么?')
sleep(1)
if __name__ == '__main__':
for i in range(5):
t = Thread(target=saySorry)
t.start() #启动线程,即让线程开始执行
- 主线程等待所有的子线程结束后才结束
from threading import Thread
from time import sleep,ctime
def sing():
for i in range(3):
print('正在唱歌……%s'%i)
sleep(1)
def dance():
for i in range(3):
print('正在跳舞……%s'%i)
sleep(1)
if __name__ == '__main__':
print('开始了……%s'%ctime())
t1 = Thread(target=sing)
t2 = Thread(target=dance)
t1.start()
t2.start()
sleep(5)
print('结束了……%s' % ctime())
运行结果:
开始了……Tue Jul 25 17:57:54 2017
正在唱歌……0
正在跳舞……0
正在唱歌……1
正在跳舞……1
正在跳舞……2
正在唱歌……2
结束了……Tue Jul 25 17:57:59 2017
2、线程类的封装
通过上一小节,能够看出,通过使用threading模块能完成多任务的程序开发,为了让每个线程的封装性更完美,所以使用threading模块时,往往会定义一个新的子类class,只要继承 threading.Thread 就可以了,然后重写 run 方法
from threading import Thread
from time import sleep
#唱歌的类
class sing(Thread):
def __init__(self,name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(5):
print("%s sing..."%self.name)
sleep(1)
class dance(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(5):
print("%s dance..." % self.name)
sleep(1)
if __name__ == '__main__':
t1 = sing('gx')
t2 = dance('gx')
t1.start()
t2.start()
运行结果:
gx sing...
gx dance...
gx sing...
gx dance...
……
import os
import time
from threading import Thread,enumerate
def myRun():
print('线程运行中...')
time.sleep(3)
print('线程结束...')
class MyThread(Thread):
def __init__(self,tname):
Thread.__init__(self,name = tname)
def run(self):
self.myRun()
def myRun(self):
print('线程运行中...')
time.sleep(3)
print('线程结束...')
if __name__ == '__main__':
print('进程 %d.' % os.getpid())
t1 = MyThread('老王的线程')
t1.start()
t1.join()
print(t1.getName())
print(enumerate())
print('game over...')
运行结果:
进程 11276.
线程运行中...
线程结束...
老王的线程
[<_MainThread(MainThread, started 10496)>]
game over...
说明:
python的threading.Thread类有一个run方法,用于定义线程的功能函数可以在自己的线程类中覆盖该方法。而创建自己的线程实例后,通过Thread类的start方法,可以启动该线程,交给python虚拟机进行调度,当该线程获得执行的机会时就会调用run方法执行线程。
总结
- 每个线程一定会有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。
- 当线程的run()方法结束时该线程完成。
- 无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式。
- 线程的几种状态
3、同步
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
from threading import Thread
from time import sleep
def sing(name, ls):
for item in ls:
print("%s sing... %s"%(name,item))
sleep(1)
def kcup(name, ls):
for item in ls:
print("zm:%s:%s"%(name,item))
sleep(1)
if __name__ == '__main__':
f = open('song.txt', 'r', encoding='utf-8')
content = f.readlines()
f.close()
#print(content)
t1 = Thread(target=sing,args=('lzy',content))
t2 = Thread(target=kcup,args=('lzy',content))
t1.start()
t2.start()
4、多线程共享全局变量
4.1全局变量
from threading import Thread
import time
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("----in work1, g_num is %d---"%g_num)
def work2():
#global g_num
print("----in work2, g_num is %d---"%g_num)
print("---线程创建之前g_num is %d---"%g_num)
t1 = Thread(target=work1)
t1.start()
#延时一会,保证t1线程中的事情做完
time.sleep(1)
t2 = Thread(target=work2)
t2.start()
运行结果:
---线程创建之前g_num is 100---
----in work1, g_num is 103---
----in work2, g_num is 103---
4.2列表作为实参
from threading import Thread
import time
def work1(nums):
nums.append(44)
print("----in work1---",nums)
def work2(nums):
#延时⼀会, 保证t1线程中的事情做完
time.sleep(1)
print("----in work2---",nums)
g_nums = [11,22,33]
t1 = Thread(target=work1, args=(g_nums,))
t1.start()
t2 = Thread(target=work2, args=(g_nums,))
t2.start()
运行结果:
----in work1--- [11, 22, 33, 44]
----in work2--- [11, 22, 33, 44]
总结:
- 在一个进程内的所有线程共享全局变量,能够完成多线程之间的数据共享(这点要比多进程要好)
- 缺点就是,线程是对全局变量随意修改可能造成多线程之间对全局变量的混乱( 即线程非安全)
4.3多线程操作全局变量——互斥锁
当多个线程一乎同时修改某一个共享数据的时候, 需要进行同步控制
线程同步能够保证多个线程安全访问竞争资源, 最简单的同步机制是引入互斥锁。
互斥锁为资源引入一个状态: 锁定/非锁定。
某个线程要更改共享数据时, 先将其锁定, 此时资源的状态为“锁定”, 其他线程不能更改; 直到该线程释放资源, 将资源的状态变成“非锁定”, 其他的线程才能再次锁定该资源。 互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
threading模块中定义了Lock类, 可以方便的处理锁定:
创建锁
mutex = threading.Lock()
锁定
mutex.acquire([blocking])
释放
mutex.release()
其中, 锁定方法acquire可以有一个blocking参数。
如果设定blocking为True, 则当前线程会堵塞, 直到获取到这个锁为止
( 如果没有指定, 那么默认为True)
如果设定blocking为False, 则当前线程不会堵塞
使用互斥锁实现上面的例子的代码如下:
from threading import Thread,enumerate,Lock
import time
g_num = 0
# 创建一个互斥锁
# 这个锁默认是未上锁的状态
g_lock = Lock()
def test1():
global g_num
for i in range(1000000):
#True表示堵塞 即如果这个锁在上锁之前已经被上锁了, 那么这个线程会在这里一直等
#False表示非堵塞, 即不管本次调⽤能够成功上锁, 都不会卡在这,而是继续执行下面的
isLock = g_lock.acquire()
if isLock:
g_num += 1
g_lock.release()
print("---test1---g_num=%d"%g_num) #这个在循环外!!!
def test2():
global g_num
for i in range(1000000):
isLock = g_lock.acquire()
if isLock:
g_num += 1
g_lock.release()
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print(enumerate())
print("---g_num=%d---"%g_num)
运行结果:
[<Thread(Thread-1, started 1632)>, <Thread(Thread-2, started 4808)>, <_MainThread(MainThread, started 7156)>]
---g_num=32952---
---test2---g_num=1757467
---test1---g_num=2000000
from threading import Thread,enumerate,Lock
import time
g_num = 0
g_lock = Lock()
def test1():
global g_num
isLock = g_lock.acquire()
if isLock:
for i in range(1000000):
g_num += 1
print("---test1---g_num=%d" % g_num) #这句话放到循环中,最后运行结果就是想要的了
g_lock.release()
def test2():
global g_num
isLock = g_lock.acquire()
if isLock:
for i in range(1000000):
g_num += 1
print("---test2---g_num=%d" % g_num)
g_lock.release()
else:
print('---test2---没有获取到锁,那我就走了')
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print(enumerate())
print("---g_num=%d---"%g_num)
运行结果:
[<Thread(Thread-1, started 3760)>, <Thread(Thread-2, started 3780)>, <_MainThread(MainThread, started 10068)>]
---g_num=183906---
---test1---g_num=1000000
---test2---g_num=2000000
可以看到, 加入互斥锁后, 运行结果与预期相符。
上锁解锁过程
当一个线程调用锁的acquire()方法获得锁时, 锁就进入“locked”状态。
每次只有一个线程可以获得锁。 如果此时另一个线程试图获得这个锁, 该线程就会变为“blocked”状态, 称为“阻塞”, 直到拥有锁的线程调用锁的release()方法释放锁之后, 锁进入“unlocked”状态。
线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁, 并使得该线程进入运行( running) 状态。
总结
锁的好处:
确保了某段关键代码只能由一个线程从头到尾完整地执行
锁的坏处:
阻止了多线程并发执行, 包含锁的某段代码实际上只能以单线程模式执行, 效率就大大地下降了
由于可以存在多个锁, 不同的线程持有不同的锁, 并试图获取对方持有的锁时, 可能会造成死锁
4.4死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
尽管死锁很少发生, 但一旦发生就会造成应用的停止响应。 下面看一个死锁的例子
from threading import Thread,Lock
from time import sleep
#美国
class America(Thread):
def __init__(self):
Thread.__init__(self)
def run(self):
while True:
if mutex1.acquire():
print("军事演戏...")
sleep(1)
if mutex2.acquire():
print('放弃军演')
mutex1.release()
#朝鲜
class NorthKorea(Thread):
def __init__(self):
Thread.__init__(self)
def run(self):
while True:
flag = mutex2.acquire()
if flag:
print("核武开发")
sleep(1)
if mutex1.acquire():
print('放弃核武开发')
mutex2.release()
if __name__ == '__main__':
#lock:军事威胁 unlock:取消军事威胁
mutex1 = Lock()
#lock:开发核武 unlock:放弃核武
mutex2 = Lock()
#创建线程一
t1 = America()
#创建线程二
t2 = NorthKorea()
#启动线程
t1.start()
t2.start()
运行代码:
军事演戏...
核武开发
避免死锁:
程序设计时要尽量避免( 银行家算法),
添加超时时间等
5、多线程-局部变量非共享数据
对于全局变量, 在多线程中要格外小心,否则容易造成数据错乱的情况发生
非全局变量是否要加锁呢?看下面一个例子
from threading import Thread
from time import sleep
class MyThread(Thread):
def __init__(self,num):
Thread.__init__(self)
self.num = num
def run(self):
self.num += 1
sleep(1)
print('num=%d'%(self.num))
if __name__ == '__main__':
t1 = MyThread(100)
t1.start()
t2 = MyThread(300)
t2.start()
运行结果:
num=301
num=101
总结
在多线程开发中, 全局变量是多个线程都共享的数据,而局部变量等是各自线程的,是非共享的
6、ThreadLocal
6.1 使用函数传参的方法
但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦:
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
每个函数一层一层调用都这么传参数那还得了? 用全局变量? 也不行,因为
每个线程处理不同的Student对象,不能共享。
6.2使用全局字典的方法
如果一个个全局dict存放所有的Student对象, 然后以thread自身作为key获得线程对应的Student对象如何?
from threading import Thread,current_thread
class Student(object):
def __init__(self,name):
self.name = name
def stu_process(name):
stu = Student(name)
print(current_thread())
global_dict[current_thread()] = stu
stu_dotask1()
stu_dotask2()
def stu_dotask1():
name = global_dict[current_thread()].name
print('stu_dotask1:%s'%name)
stu_dosubtask1()
def stu_dotask2():
name = global_dict[current_thread()].name
print('stu_dotask2:%s'%name)
stu_dosubtask2()
def stu_dosubtask1():
name = global_dict[current_thread()].name
print('stu_dosubtask1:%s'%name)
def stu_dosubtask2():
name = global_dict[current_thread()].name
print('stu_dosubtask2:%s' %name)
global_dict = {}
if __name__ == '__main__':
t1 = Thread(target=stu_process,args=('xs1',))
t1.start()
t2 = Thread(target=stu_process,args=('xs2',))
t2.start()
这种方式理论上是可行的,它最大的优点是消除了std对象在每层函数中的传递问题,但是,每个函数获取std的代码有点low。
6.3使用ThreadLocal的方法
ThreadLocal应运而生,不用查找dict,ThreadLocal帮你自动做这件事:(可以看做是包装了dict的方法)
from threading import Thread,local,current_thread
class Student(object):
def __init__(self,name):
self.name = name
def stu_process(name):
#在local_school添加一个student属性
local_shool.student = name
#stu_dotask1()
stu_dotask2()
def stu_dotask1():
name = local_shool.student
print('stu_dotask1:%s'%name)
stu_dosubtask1()
def stu_dotask2():
name = local_shool.student
print('stu_dotask2:%s'%name)
stu_dosubtask2()
def stu_dosubtask1():
name = local_shool.student
print('stu_dosubtask1:%s'%name)
def stu_dosubtask2():
name = local_shool.student
print('stu_dosubtask2:%s' %name)
#定义Threadlocal对象
local_shool = local()
if __name__ == '__main__':
t1 = Thread(target=stu_process,args=('xs1',))
t1.start()
t2 = Thread(target=stu_process,args=('xs2',))
t2.start()
运行结果:
stu_dotask2:xs1
stu_dosubtask2:xs1
stu_dotask2:xs2
stu_dosubtask2:xs2
说明
全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。 你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
6.4总结
一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题
7、异步
同步调用就是你 喊 你朋友吃饭 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你们一起去
异步调用就是你 喊 你朋友吃饭 ,你朋友说知道了 ,待会忙完去找你 ,你就去做别的了。
from multiprocessing import Pool
import time
import os
def test():
print("---进程池中的进程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid())
for i in range(3):
print("----%d---"%i)
time.sleep(1)
return "hahah"
def test2(args):
print("---callback func--pid=%d"%os.getpid())
print("---callback func--args=%s"%args)
pool = Pool(3)
pool.apply_async(func=test,callback=test2)
time.sleep(5)
print("----主进程-pid=%d----"%os.getpid())
运行结果:
---进程池中的进程---pid=9401,ppid=9400--
----0---
----1---
----2---
---callback func--pid=9400
---callback func--args=hahah
----主进程-pid=9400----
作业题
模拟多线程下载,10个线程,每个线程下载10个数据,(线程1:10,12...19,线程2:20,21...29,类推),最后合成存入一个文件
from threading import Thread
from time import sleep
ls = ['','','','','','','','','','']
#下载的线程类
class Down(Thread):
def __init__(self, index):
Thread.__init__(self)
self.index = index
def run(self):
result = ""
for i in range(10):
s = str(self.index) + str(i)
print(s)
result = result + s
sleep(0.5)
print(result)
ls[self.index] = result
if __name__ == '__main__':
for i in range(10):
t = Down(i)
t.start()
sleep(8)
print(ls)
#保存
f = open('data.txt', 'w', encoding='utf-8')
#join:列表元素链接成一个字符串,
s = ':'.join(ls)
print(s)
f.write(s)
f.close()
网友评论