代码一
author = 'damao'
from random import randint
from time import time,sleep
from multiprocessing import Process
from os import getpid
def download_file(file_name):
print("开始下载{a}".format(a=file_name))
download_use_time = randint(1,10)
sleep(download_use_time)
print("{a}下载完成,共耗时{b}秒".format(a=file_name,b=download_use_time))
"""未使用多进程代码"""
def test():
start_time = time()
download_file("Python核心编程.pdf")
download_file("av.mp4")
end_time = time()
totle_time = end_time - start_time
print("共耗时{a}秒".format(a=round(totle_time,3)))
"""使用多进程代码"""
def test_use_process():
start_time = time()
p1 = Process(target=download_file,args=("C语言编程.pdf",))
p1.start()
p2 = Process(target=download_file,args=("tykohot.avi",))
p2.start()
p1.join()
p2.join()
end_time = time()
totle_time = end_time - start_time
print("共耗时{a}秒".format(a=round(totle_time, 3)))
if __name__ == '__main__':
# test()
test_use_process()
代码二
author = 'damao'
"""控制总‘进程’数"""
from multiprocessing import Process,Queue
from time import sleep
def func(words,q):
num = q.get()
while num:
print("{a}:{b}".format(a=num,b=words))
sleep(0.001)
num = q.get()
def test():
q = Queue(10) # 指定进程数
for num in range(1,11):
q.put(num)
Process(target=func,args=("good",q)).start()
Process(target=func,args=("great",q)).start()
if __name__ == '__main__':
test()
代码三
author = 'damao'
"""使用threading模块实现多线程"""
from threading import Thread
from random import randint
from time import time,sleep
def download_file(file_name):
print("开始下载{a}".format(a=file_name))
download_use_time = randint(1,10)
sleep(download_use_time)
print("{a}下载完成,共耗时{b}秒".format(a=file_name,b=download_use_time))
"""使用多进程代码"""
def test_use_thread():
start_time = time()
p1 = Thread(target=download_file,args=("C语言编程.pdf",))
p1.start()
p2 = Thread(target=download_file,args=("tykohot.avi",))
p2.start()
p1.join()
p2.join()
end_time = time()
totle_time = end_time - start_time
print("共耗时{a}秒".format(a=round(totle_time, 3)))
"""###########################################################"""
"""自定义线程类"""
class MyThread(Thread):
def __init__(self,file_name):
super().__init__()
self.file_name = file_name
def run(self):
print("开始下载{a}".format(a=self.file_name))
download_use_time = randint(1, 10)
sleep(download_use_time)
print("{a}下载完成,共耗时{b}秒".format(a=self.file_name, b=download_use_time))
def main():
start_time = time()
p1 = MyThread("NNN.pdf")
p1.start()
p2 = MyThread("AAA.mp4")
p2.start()
p1.join()
p2.join()
end_time = time()
use_time = end_time - start_time
print("共耗时{a}秒".format(a=round(use_time, 3)))
if __name__ == '__main__':
# test_use_thread()
main()
代码四
author = 'damao'
from random import randint
from threading import Thread
from time import time, sleep
class MyThread(Thread):
def __init__(self,file_name):
super().__init__()
self._file_name = file_name
def run(self):
print("开始下载{a}".format(a=self._file_name))
download_use_time = randint(1, 10)
sleep(download_use_time)
print("{a}下载完成,共耗时{b}秒".format(a=self._file_name, b=download_use_time))
def main():
start_time = time()
p1 = MyThread("NNN.pdf")
p1.start()
p2 = MyThread("AAA.mp4")
p2.start()
p1.join()
p2.join()
end_time = time()
use_time = end_time - start_time
print("共耗时{a}秒".format(a=round(use_time, 3)))
if __name__ == '__main__':
# test_use_thread()
main()
代码五
author = 'damao'
from time import sleep
from threading import Thread,Lock
class SaveMoney(object):
def __init__(self):
self._balance = 0
self._lock = Lock()
@property
def balance(self):
return self._balance
def save_meony(self,moeny):
# 先获取锁才能进行后续的操作
self._lock.acquire()
try:
new_balance = self._balance + moeny
sleep(0.01)
self._balance = new_balance
finally:
# 在finally中执行释放锁的操作保证正常异常锁都能释放
self._lock.release()
class AddMoneyThread(Thread):
"""存钱多程序"""
def __init__(self,account,money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.save_meony(self._money)
def main():
my_money = SaveMoney()
save_money_threads = []
for _ in range(100):
t = AddMoneyThread(my_money,1)
save_money_threads.append(t)
t.start()
for i in save_money_threads:
i.join()
print("账户余额为:{a}元".format(a=my_money.balance))
if __name__ == '__main__':
main()
代码六
import time
import tkinter
import tkinter.messagebox
from threading import Thread
def main():
class DownloadTaskHandler(Thread):
def run(self):
time.sleep(10)
tkinter.messagebox.showinfo('提示', '下载完成!')
# 启用下载按钮
button1.config(state=tkinter.NORMAL)
def download():
# 禁用下载按钮
button1.config(state=tkinter.DISABLED)
# 通过daemon参数将线程设置为守护线程(主程序退出就不再保留执行)
# 在线程中处理耗时间的下载任务
DownloadTaskHandler(daemon=True).start()
def show_about():
tkinter.messagebox.showinfo('关于', '作者: 骆昊(v1.0)')
top = tkinter.Tk()
top.title('单线程')
top.geometry('200x150')
top.wm_attributes('-topmost', 1)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下载', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='关于', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')
tkinter.mainloop()
if __name__ == '__main__':
main()
代码七
author = 'damao'
"""完成1~100000000求和"""
import time
# 方法一 共耗时4.673534870147705
def to_sum():
stat_time = time.time()
sum = 0
for i in range(1,100000001):
sum += i
print(sum)
end_time = time.time()
totle_time = end_time - stat_time
print("共耗时{a}".format(a=totle_time))
# 方法二 共耗时11.168128728866577
def test_sum():
stat_time = time.time()
num_list = [x for x in range(1,100000001)]
a = sum(num_list)
print(a)
end_time = time.time()
totle_time = end_time - stat_time
print("共耗时{a}".format(a=totle_time))
if __name__ == '__main__':
# to_sum()
test_sum()
网友评论