测试函数myTest()
#定义重复执行的测试函数:myTest()
def myTest(a):
print("my name is"+a)
#定义实参
name=[" Tony"," Anna"," Daier"]
传统方法多次使用测试函数
>>>myTest(name[0])
>>>"my name is Tony"
>>>myTest(name[1])
>>>"my name is Anna"
>>>myTest(name[2])
>>>"my name is Daier"
使用多线程来并行运行测试函数
1.使用threading.Thread方法
from threading import Thread
threads=[]
class myThread(threading.Thread): #定义一个继承threading的类
def __init__(self,func,args):
Thread.__init__(self)
self.func=func #多次执行的函数
self.args=args #待执行函数的参数
def run(self):
self.func(*self.args)
for _ in name:
threads.append(myThread(myTest,(_)))
threads[-1].start() #开始执行此线程
for thread in threads:
thread.join() #等待线程结束
2.使用ThreadPoolExecutor()方法
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as exe: #max_workers是线程数目
exe.map(myTest,name)
#exe.map(a,b)第一个参数a为重复执行函数,第二个参数b为执行函数的实参列表
3.线程锁
threadLock=threading.Lock()
threadLock.acquire()
待锁的变量或者操作
threadLock.release()
网友评论