美文网首页
Python从新手到大师——12:进程和线程

Python从新手到大师——12:进程和线程

作者: 远航天下 | 来源:发表于2018-08-29 11:27 被阅读0次

    代码一

    # Author tag for this snippet; a plain module-level variable.
    author = 'damao'
    
    from random import randint
    from time import time,sleep
    from multiprocessing import Process
    from os import getpid
    
    
    def download_file(file_name):
        """Simulate downloading ``file_name``.

        Prints a start message, sleeps for a random whole number of seconds
        drawn with ``randint(1, 10)``, then prints a completion message that
        reports how long the simulated download took.
        """
        start_message = "开始下载{a}".format(a=file_name)
        print(start_message)
        seconds = randint(1, 10)
        sleep(seconds)
        done_message = "{a}下载完成,共耗时{b}秒".format(a=file_name, b=seconds)
        print(done_message)
    
    """未使用多进程代码"""
    def test():
        """Baseline without multiprocessing.

        Runs the two simulated downloads one after the other and prints the
        total elapsed wall-clock time, rounded to three decimals.
        """
        began = time()
        for name in ("Python核心编程.pdf", "av.mp4"):
            download_file(name)
        elapsed = time() - began
        print("共耗时{a}秒".format(a=round(elapsed, 3)))
    
    """使用多进程代码"""
    def test_use_process():
        """Run the two simulated downloads in separate processes.

        Each download is started in its own ``Process``; the function then
        waits for both to finish and prints the total elapsed wall-clock
        time, rounded to three decimals.
        """
        began = time()
        workers = []
        for name in ("C语言编程.pdf", "tykohot.avi"):
            worker = Process(target=download_file, args=(name,))
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
        elapsed = time() - began
        print("共耗时{a}秒".format(a=round(elapsed, 3)))
    
        
    
    # Script entry point: the multi-process variant is the one enabled;
    # the sequential baseline call is left commented out.
    if __name__ == '__main__':
        # test()
        test_use_process()
    

    代码二

    # Author tag for this snippet; a plain module-level variable.
    author = 'damao'
    
    """控制总‘进程’数"""
    
    from multiprocessing import Process,Queue
    from time import sleep
    
    def func(words, q, timeout=1.0):
        """Take numbers from ``q`` and print each one tagged with ``words``.

        The previous version called ``q.get()`` with no timeout, so once the
        queue was empty the worker blocked forever and its process never
        exited. The worker now returns when no item arrives in time.

        Args:
            words: Label printed after each number.
            q: Queue to read from. It must support ``get(timeout=...)`` and
                raise ``queue.Empty`` on timeout.
            timeout: Seconds to wait for the next item before giving up.

        A falsy item (for example ``0`` or ``None``) still ends the loop,
        exactly as the original ``while num`` condition did.
        """
        # Local import keeps this block self-contained; the file's import
        # lines do not bring ``queue.Empty`` into scope.
        from queue import Empty

        while True:
            try:
                num = q.get(timeout=timeout)
            except Empty:
                # Nothing left to process: finish instead of blocking forever.
                return
            if not num:
                return
            print("{a}:{b}".format(a=num, b=words))
            sleep(0.001)
    
    def test():
        """Share the numbers 1..10 between two worker processes.

        Two processes run ``func`` against the same queue, one labelled
        "good" and one labelled "great". The previous version never told the
        workers to stop and never joined them, so the program hung once the
        queue was drained. Now the workers are started first, the numbers
        are queued, one falsy sentinel is queued per worker, and both
        workers are joined.
        """
        # The argument is the queue's maximum size, not a process count.
        q = Queue(10)
        workers = [
            Process(target=func, args=(words, q)) for words in ("good", "great")
        ]
        # Start consumers before producing so the bounded queue cannot fill
        # up and block the puts below.
        for worker in workers:
            worker.start()
        for num in range(1, 11):
            q.put(num)
        # func's loop ends on a falsy item, so None works as a stop signal.
        for _ in workers:
            q.put(None)
        for worker in workers:
            worker.join()
    
    # Script entry point: run the queue demo only when executed directly.
    if __name__ == '__main__':
        test()  
    

    代码三

    # Author tag for this snippet; a plain module-level variable.
    author = 'damao'
    
    """使用threading模块实现多线程"""
    
    from threading import Thread
    from random import randint
    from time import time,sleep
    
    def download_file(file_name):
        """Simulate downloading ``file_name``.

        Prints a start message, sleeps for a random whole number of seconds
        drawn with ``randint(1, 10)``, then prints a completion message that
        reports how long the simulated download took.
        """
        start_message = "开始下载{a}".format(a=file_name)
        print(start_message)
        seconds = randint(1, 10)
        sleep(seconds)
        done_message = "{a}下载完成,共耗时{b}秒".format(a=file_name, b=seconds)
        print(done_message)
    
    """使用多进程代码"""
    def test_use_thread():
        """Run the two simulated downloads in separate threads.

        Each download is started in its own ``Thread``; the function then
        waits for both to finish and prints the total elapsed wall-clock
        time, rounded to three decimals.
        """
        began = time()
        workers = []
        for name in ("C语言编程.pdf", "tykohot.avi"):
            worker = Thread(target=download_file, args=(name,))
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
        elapsed = time() - began
        print("共耗时{a}秒".format(a=round(elapsed, 3)))
    
    """###########################################################"""
    
    
    """自定义线程类"""
    
    
    class MyThread(Thread):
        """Thread subclass that simulates downloading one file."""

        def __init__(self, file_name):
            """Remember which file this thread pretends to download."""
            super().__init__()
            self.file_name = file_name

        def run(self):
            """Print a start message, sleep 1-10 seconds, print a finish message."""
            target = self.file_name
            print("开始下载{a}".format(a=target))
            seconds = randint(1, 10)
            sleep(seconds)
            print("{a}下载完成,共耗时{b}秒".format(a=target, b=seconds))
    
    def main():
        """Run two ``MyThread`` downloads concurrently and print the elapsed time."""
        began = time()
        workers = []
        for name in ("NNN.pdf", "AAA.mp4"):
            worker = MyThread(name)
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
        elapsed = time() - began
        print("共耗时{a}秒".format(a=round(elapsed, 3)))
    
    
    # Script entry point: runs the MyThread-based demo; the plain-Thread
    # variant is left commented out.
    if __name__ == '__main__':
        # test_use_thread()
        main()
    
    
    

    代码四

    # Author tag for this snippet; a plain module-level variable.
    author = 'damao'
    
    from random import randint
    from threading import Thread
    from time import time, sleep
    
    
    class MyThread(Thread):
        """Thread subclass that simulates downloading one file."""

        def __init__(self, file_name):
            """Store the file name in the non-public ``_file_name`` attribute."""
            super().__init__()
            self._file_name = file_name

        def run(self):
            """Print a start message, sleep 1-10 seconds, print a finish message."""
            target = self._file_name
            print("开始下载{a}".format(a=target))
            seconds = randint(1, 10)
            sleep(seconds)
            print("{a}下载完成,共耗时{b}秒".format(a=target, b=seconds))
    
    def main():
        """Run two ``MyThread`` downloads concurrently and print the elapsed time."""
        began = time()
        workers = []
        for name in ("NNN.pdf", "AAA.mp4"):
            worker = MyThread(name)
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
        elapsed = time() - began
        print("共耗时{a}秒".format(a=round(elapsed, 3)))
    
    
    # Script entry point: only main() is run; the commented-out call below
    # is disabled.
    if __name__ == '__main__':
        # test_use_thread()
        main()
    

    代码五

    # Author tag for this snippet; a plain module-level variable.
    author = 'damao'
    
    from time import sleep
    from threading import Thread,Lock
    
    class SaveMoney(object):
        """Account balance whose deposits are serialized with a lock."""

        def __init__(self):
            """Start with a zero balance and a fresh lock."""
            self._lock = Lock()
            self._balance = 0

        @property
        def balance(self):
            """Current balance; read without taking the lock."""
            return self._balance

        def save_meony(self, moeny):
            """Add ``moeny`` to the balance while holding the lock.

            The short sleep between reading and writing the balance widens
            the window in which unsynchronized threads would overwrite each
            other; the lock keeps the read-modify-write sequence atomic.
            The ``with`` statement releases the lock on normal exit and on
            exceptions alike.
            """
            with self._lock:
                updated = self._balance + moeny
                sleep(0.01)
                self._balance = updated
    
    
    class AddMoneyThread(Thread):
        """Thread that makes one deposit into an account."""

        def __init__(self, account, money):
            """Remember the target account and the amount to deposit."""
            super().__init__()
            self._money = money
            self._account = account

        def run(self):
            """Deposit the stored amount by calling the account's ``save_meony``."""
            deposit = self._account.save_meony
            deposit(self._money)
    
    
    def main():
        """Deposit 1 into a shared account from 100 threads and print the balance."""
        account = SaveMoney()
        depositors = []
        for _ in range(100):
            depositor = AddMoneyThread(account, 1)
            depositors.append(depositor)
            depositor.start()
        # Wait for every deposit before reading the final balance.
        for depositor in depositors:
            depositor.join()
        print("账户余额为:{a}元".format(a=account.balance))
    
    
    # Script entry point: run the deposit demo only when executed directly.
    if __name__ == '__main__':
        main()
    

    代码六

    import time
    import tkinter
    import tkinter.messagebox
    from threading import Thread
    
    
    def main():
        """Show a small window whose download button runs a slow task in a thread.

        The previous version called ``messagebox.showinfo`` and
        ``button1.config`` from inside the worker thread. Tk widgets should
        only be touched from the thread running the main loop, so the worker
        now does nothing but the slow work, and the main loop polls it with
        ``after()`` and updates the UI itself once the worker has finished.
        """

        class DownloadTaskHandler(Thread):
            """Worker that performs the simulated download; it touches no widgets."""

            def run(self):
                time.sleep(10)

        def on_download_finished():
            """Report completion and re-enable the button (main thread only)."""
            tkinter.messagebox.showinfo('提示', '下载完成!')
            # Re-enable the download button.
            button1.config(state=tkinter.NORMAL)

        def poll_download(task):
            """Check the worker from the main loop until it has finished."""
            if task.is_alive():
                top.after(100, poll_download, task)
            else:
                on_download_finished()

        def download():
            # Disable the download button while a download is running.
            button1.config(state=tkinter.DISABLED)
            # daemon=True: the worker does not keep the program alive after
            # the main window is closed.
            task = DownloadTaskHandler(daemon=True)
            task.start()
            poll_download(task)

        def show_about():
            tkinter.messagebox.showinfo('关于', '作者: 骆昊(v1.0)')

        top = tkinter.Tk()
        top.title('单线程')
        top.geometry('200x150')
        top.wm_attributes('-topmost', 1)

        panel = tkinter.Frame(top)
        button1 = tkinter.Button(panel, text='下载', command=download)
        button1.pack(side='left')
        button2 = tkinter.Button(panel, text='关于', command=show_about)
        button2.pack(side='right')
        panel.pack(side='bottom')

        tkinter.mainloop()
    
    
    # Script entry point: open the demo window only when executed directly.
    if __name__ == '__main__':
        main()
    

    代码七

    # Author tag for this snippet; a plain module-level variable.
    author = 'damao'
    
    """完成1~100000000求和"""
    
    import time
    
    # Method 1: explicit loop (recorded timing for the default range: 4.673534870147705 s)
    def to_sum(upper=100000000):
        """Sum the integers 1..upper with an explicit loop.

        Prints the total and the elapsed time, and also returns the total so
        callers can use the result.

        Args:
            upper: Largest integer included in the sum. The default keeps
                the original 1..100000000 range.

        Returns:
            The sum of 1..upper (0 when ``upper`` is below 1).
        """
        start_time = time.perf_counter()
        # Named ``total`` so the builtin ``sum`` is not shadowed.
        total = 0
        for i in range(1, upper + 1):
            total += i
        print(total)
        elapsed = time.perf_counter() - start_time
        print("共耗时{a}".format(a=elapsed))
        return total
    
    # Method 2: builtin sum(). The 11.168128728866577 s figure was recorded
    # with the earlier version that first built a full list of the numbers.
    def test_sum(upper=100000000):
        """Sum the integers 1..upper with the builtin ``sum``.

        The range is summed directly; the previous version first copied it
        into a list of ``upper`` elements, which cost time and memory for no
        benefit. Prints the total and the elapsed time, and also returns the
        total.

        Args:
            upper: Largest integer included in the sum. The default keeps
                the original 1..100000000 range.

        Returns:
            The sum of 1..upper (0 when ``upper`` is below 1).
        """
        start_time = time.perf_counter()
        a = sum(range(1, upper + 1))
        print(a)
        elapsed = time.perf_counter() - start_time
        print("共耗时{a}".format(a=elapsed))
        return a
    
    
    # Script entry point: the builtin-sum variant is the one enabled; the
    # explicit-loop variant is left commented out.
    if __name__ == '__main__':
        # to_sum()
        test_sum()
    

    相关文章

      网友评论

          本文标题:Python从新手到大师——12:进程和线程

          本文链接:https://www.haomeiwen.com/subject/wmuhwftx.html