美文网首页
02-多任务

02-多任务

作者: 努力爬行中的蜗牛 | 来源:发表于2018-11-04 22:32 被阅读0次
    线程

    python的thread模块是比较底层的模块,python的threading模块是对thread做了一些包装的,可以更加方便的被使用。

    import time
    import threading
    
    
    def sing():
        for i in range(5):
            print("-----正在唱歌-------")
            time.sleep(1)
    
    
    def dance():
        for i in range(5):
            print("-----正在跳舞------")
            time.sleep(1)
    
    
    def main():
        t1 = threading.Thread(target=sing)
        t2 = threading.Thread(target=dance)
        t1.start()  # 启动线程,让线程开始执行
        t2.start()
    
    
    if __name__ == "__main__":
        main()
    
    线程的创建
    • 如果创建target时指定的函数,运行结束,意味着这个子线程结束
    • 当调用Thread的时候,不会创建线程
    • 当调动Thread创建出来的实例对象的start方法时,才会创建线程
    • 主线程等待所有子线程结束后才会结束
    import threading
    import time
    
    
    def test1():
        for i in range(5):
            print("----test1-----%d" % i)
            time.sleep(1)
        # 如果创建target时指定的函数,运行结束,意味着这个子线程结束
    
    
    def test2():
        for i in range(10):
            print("----test2-----%d" % i)
            time.sleep(1)
    
    
    # 当调用Thread的时候,不会创建线程
    # 当调动Thread创建出来的实例对象的start方法时,才会创建线程
    def main():
        t1 = threading.Thread(target=test1)
        t1.start()
    
        t2 = threading.Thread(target=test2)
        t2.start()
    
        while True:
            print(threading.enumerate())
            if len(threading.enumerate()) <= 1:
                break
            time.sleep(1)
    
    
    if __name__ == "__main__":
        main()
    
    通过继承Thread来创建线程
    import threading
    import time
    
    
    class Mythread(threading.Thread):
        # 继承Thread的类必须定义run方法
        def run(self):
            for i in range(3):
                time.sleep(1)
                msg = "I'm " + self.name + ' @ ' + str(i)  # name属性中保存的是当前线程的名字
                print(msg)
    
    
    if __name__ == "__main__":
        t = Mythread()
        t.start()
    
    多线程共享全局变量
    import threading
    import time
    
    # 定义一个全局变量
    g_num = 100
    
    
    def test1():
        global g_num
        g_num =+ 1
        print("----test1 g_num = %d" % g_num)
    
    
    def test2():
        print("---test2 g_num = %d" % g_num)
    
    
    def main():
        t1 = threading.Thread(target=test1)
        t2 = threading.Thread(target=test2)
    
        t1.start()
        time.sleep(1)
    
        t2.start()
        time.sleep(1)
    
        print("*" * 50)
    
    
    if __name__ == "__main__":
        main()
    
    
    多线程共享全局变量2
    import threading
    import time
    
    # 定义一个全局变量
    g_num = 100
    g_nums = [11, 22]
    
    
    def test1(temp):
        global g_num
        g_nums.append(33)
        print("----test1 g_num = %s" % str(temp))
    
    
    def test2(temp):
        print("---test2 g_num = %s" % str(temp))
    
    
    def main():
        # 第一个参数是线程要执行的函数名,第二个参数是传给要执行函数的参数
        t1 = threading.Thread(target=test1, args=(g_nums,))
        t2 = threading.Thread(target=test2, args=(g_nums,))
    
        t1.start()
        time.sleep(1)
    
        t2.start()
        time.sleep(1)
    
        print("*" * 50)
    
    
    if __name__ == "__main__":
        main()
    
    
    
    同步概念、互斥锁
    import threading
    import time
    
    # 定义一个全局变量
    g_num = 0
    
    # 创建一个互斥锁,默认是没有上锁的
    mutex = threading.Lock()
    
    
    def test1(num):
        global g_num
        # 上锁,如果之前没有被上锁,那么此时上锁成功
        # 如果上锁之前,已经被上上了,那么此时会堵塞在这里,知道被解锁开位置
        for i in range(num):
            mutex.acquire()
            g_num += 1
            # 解锁
            mutex.release()
        print("----test1 g_num = %d" % g_num)
    
    
    def test2(num):
        global g_num
    
        for i in range(num):
            mutex.acquire()
            g_num += 1
            mutex.release()
        print("----test1 g_num = %d" % g_num)
    
    
    def main():
        # 第一个参数是线程要执行的函数名,第二个参数是传给要执行函数的参数
        t1 = threading.Thread(target=test1, args=(10000,))
        t2 = threading.Thread(target=test2, args=(10000,))
    
        t1.start()
        t2.start()
    
        time.sleep(2)
    
        print("*" * 50)
    
    
    if __name__ == "__main__":
        main()
    
    多线程udp聊天器
    import socket
    import threading
    
    
    def recvmsg(udp_socket):
        # 接收数据
        while True:
            recv_data = udp_socket.recvfrom(1024)
            print(recv_data)
    
    
    def sendmsg(udp_socket, dest_ip, dest_port):
        # 发送数据
        while True:
            send_data = input("请输入要发送的数据:")
            udp_socket.sendto(send_data.encode("utf-8"), (dest_ip, dest_port))
    
    
    def main():
        # 1. 创建套接字
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
        # 2. 绑定本地信息
        udp_socket.bind(("127.0.0.1", 7890))
    
        # 3. 获取对方的ip
        dest_ip = input("请输入对方的ip:")
        dest_port = int(input("请输入对方的port:"))
    
        # 4. 创建两个线程,去执行响应的功能
        t_recv = threading.Thread(target=recvmsg, args=(udp_socket, ))
        t_send = threading.Thread(target=sendmsg, args=(udp_socket, dest_ip, dest_port))
    
        t_recv.start()
        t_send.start()
    
    
    if __name__ == "__main__":
        main()
    
    
    进程、线程概念

    进程:一个程序运行起来后,代码加用到的资源称为进程,他是操作系统分配资源的基本单位。
    不仅线程可以执行多任务,进程也可以。
    注意:进程占用资源较多,线程执行开销小。
    进程的状态:启动->就绪->运行->结束 (中间还有个等待状态,如sleep)

    进程完成多任务
    import threading
    import time
    import multiprocessing
    
    
    def test1():
        while True:
            print("1--------")
            time.sleep(1)
    
    
    def test2():
        while True:
            print("2--------")
            time.sleep(1)
    
    
    def main():
        p1 = multiprocessing.Process(target=test1)
        p2 = multiprocessing.Process(target=test2)
    
        p1.start()
        p2.start()
    
    
    if __name__ == '__main__':
        main()
    
    
    进程和线程区别

    进程:比如电脑上的多个qq
    线程:比如电脑上的一个qq的多个窗口

    一个进程有1个或者以上的线程。进程是cpu分配资源的基本单位,线程是cpu执行任务的基本单位。
    线程依赖于进程,没有进程就没有线程。

    通过队列完成进程间的通信

    socket是进程间通信的一种方式。
    队列:先进先出
    栈:先进后出

    from multiprocessing import Queue
    
    q = Queue(3)
    # 队列可以往里面放放任意类型的数据
    q.put("111")
    # 判断队列是否满了,队列满了就会堵塞,不能再继续往里面放数据
    print(q.full())
    
    print(q.get())
    # 判断队列是否空了,队列空了也会堵塞,不能再读取数据
    print(q.empty())
    
    import multiprocessing
    
    
    def download_from_web(q):
        data = [11, 23, 23]
        # 向队列中写入数据
        for temp in data:
            q.put(temp)
        print("----数据下载完成-----")
    
    
    def analysis_data(q):
        # 从队列中读取数据
        wating_analysis = list()
        # 从队列中读取数据
        while True:
            wating_analysis.append(q.get())
            if q.empty():
                break
        print(wating_analysis)
        print("----数据读取完成-----")
    
    
    def main():
        # 创建一个队列
        q = multiprocessing.Queue()
    
        p1 = multiprocessing.Process(target=download_from_web, args=(q, ))
        p2 = multiprocessing.Process(target=analysis_data, args=(q, ))
    
        p1.start()
        p2.start()
    
    
    if __name__ == '__main__':
        main()
    
    进程池概念

    初始化pool(进程池)时,可以指定一个最大进程数,当有新的请求提交到Pool时,如果池还没满,就会创建一个进程去执行请求,如果池已满,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

    # -*- coding:utf-8 -*-
    from multiprocessing import Pool
    import os, time, random
    
    
    def worker(msg):
        t_start = time.time()
        print("%s开始执行,进程号为%d" % (msg, os.getpgid()))
        # random.random()随机生成0~1之间的浮点数
        time.sleep(random.random() * 2)
        t_stop = time.time()
        print(msg, "执行完毕,耗时%0.2f" % (t_stop-t_start))
    
    
    po = Pool(3)  # 定义一个进场池,最大进程数是3
    for i in range(0, 10):
        # Pool().apply_async(要调用的目标,(传递给目标的参数元组, ) )
        # 每次循环将会用空闲出来的子进程去调用目标
        po.apply_async(worker, (i, ))
    
    
    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接接收新的请求
    po.join()  # 等待pool中的所有子进程执行完成,必须放在close语句之后
    print("----end-----")
    
    文件拷贝案例
    import os
    import multiprocessing
    
    
    def copy_file(q, file_name, old_folder_name, new_folder_name):
        """完成文件的赋值"""
        # print("---模拟拷贝文件%s 从 %s 到 %s" % (file_name, old_folder_name, new_folder_name))
        old_f = open(old_folder_name + "/" + file_name, "rb")
        content = old_f.read()
        old_f.close()
    
        new_f = open(new_folder_name + "/" + file_name, "wb")
        new_f.write(content)
        new_f.close()
    
        # 如果拷贝完了文件,就向对列中写入一个消息,表示完成copy
        q.put(file_name)
    
    
    def main():
        # 1. 获取要拷贝的文件夹的名字
        old_folder_name = input("情书要copy的文件夹的名字:")
    
        # 2. 创建一个新的文件夹
        try:
            new_folder_name = old_folder_name + "[复件]"
            os.mkdir(new_folder_name)
        except:
            pass
    
        # 3. 获取文件夹的所有的待拷贝的文件的名字 os.listdir()
        file_names = os.listdir(old_folder_name)
        print(file_names)
        # 4. 创建进程池
        po = multiprocessing.Pool(5)
    
        # 5. 创建一个队列
        q = multiprocessing.Manager().Queue()
    
        # 向进程池中添加copy文件的任务
        for file_name in file_names:
            po.apply_async(copy_file, args=(q, file_name, old_folder_name, new_folder_name))
    
        po.close()
        # po.join()
        all_file_num = len(file_names)  # 测试下所有文件的个数
        copy_ok_num = 0
        while True:
            print("文件拷贝进度:%.2f %%" % (copy_ok_num * 100 / all_file_num))
            copy_ok_num += 1
            if copy_ok_num == all_file_num:
                print("文件拷贝进度:100 %")
                break
    
    
    if __name__ == "__main__":
        main()
    
    协程
    迭代器

    迭代是访问集合元素的一种方式,迭代器是一个可以记住遍历的位置的对象,迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不能往后。

    我们已经知道可以对list、tuple、str等类型的数据使用for...in 的循环语法从中依次拿到数据进行使用,我们把这样的过程称为遍历,也叫迭代。

    判断一个对象是否为可以迭代的对象,如果是就可以用for循环

    >>> from collections import Iterable
    >>> isinstance([11, 12, 13],Iterable)
    True
    >>> isinstance("abc", Iterable)
    True
    
    迭代器demo
    from collections import Iterable
    from collections import Iterator
    import time
    
    
    # 判断一个xxx_obj是否是可以迭代条件:
    # 调用iter函数得到xxx_obj对象的__iter__方法的返回值
    # iter方法的返回值是一个迭代器
    class Classmate(object):
        def __init__(self):
            self.names = list()
            self.current_num = 0
    
        def add(self, name):
            self.names.append(name)
    
        # 如果想要一个对象成为一个可以迭代的对象,即可以使用for,那么必须实现__iter__方法
        def __iter__(self):
            return self
    
        def __next__(self):
            if self.current_num < len(self.names):
                ret = self.names[self.current_num]
                self.current_num += 1
                return ret
            else:
                raise StopIteration  # 当迭代器的值取完的时候,抛出StopIteration异常,for循环自动停止
    
    
    # class ClassIterator(object):
    #     def __init__(self, obj):
    #         self.obj = obj
    #         self.current_num = 0
    #
    #     def __iter__(self):
    #         pass
    #
    #     def __next__(self):
    #         if self.current_num < len(self.obj.names):
    #             ret = self.obj.names[self.current_num]
    #             self.current_num += 1
    #             return ret
    #         else:
    #             raise StopIteration  # 当迭代器的值取完的时候,抛出StopIteration异常,for循环自动停止
    
    
    classmate = Classmate()
    
    classmate.add("zyx1")
    classmate.add("zyx2")
    classmate.add("zyx3")
    
    print("判断Classmate是否是可以迭代的对象:", isinstance(classmate, Iterable))
    classmate_iterator = iter(classmate)
    print("判断classmate_iterator是否是迭代器:", isinstance(classmate_iterator, Iterator))
    
    for name in classmate:
        print(name)
        time.sleep(1)
    
    迭代器的应用

    迭代器存储的生成数据的方法。

    import time
    
    
    class Fibonacci(object):
        def __init__(self, all_num):
            self.all_num = all_num
            self.current_num = 0
            self.a = 0
            self.b = 1
    
        # 如果想要一个对象成为一个可以迭代的对象,即可以使用for,那么必须实现__iter__方法
        def __iter__(self):
            return self
    
        def __next__(self):
            if self.current_num < self.all_num:
                ret = self.a
                self.a, self.b = self.b, self.a + self.b
                self.current_num += 1
                return ret
            else:
                raise StopIteration  # 当迭代器的值取完的时候,抛出StopIteration异常,for循环自动停止
    
    
    fibo = Fibonacci(10)
    
    for name in fibo:
        print(name)
        time.sleep(1)
    
    生成器

    生成器是一种特殊的迭代器。

    
    
    def create_num(all_num):
        print("----1----")
        a, b = 0, 1
        current_num = 0
        while current_num < all_num:
            # print(a)
            print("----2----")
            yield a  # 如果一个函数中又yield语句,那么这就不是一个函数,而是一个生成器模板
            print("----3----")
            a, b = b, a + b
            current_num += 1
            print("----4----")
    
    
    # 如果在调用这个函数的时候,发现这个函数中有yield,那么不是调用函数,而是创建一个生成器
    obj = create_num(10)
    
    ret = next(obj)
    print(ret)
    
    ret = next(obj)
    print(ret)
    
    # for num in obj:
    #     print(num)
    
    def create_num(all_num):
        a, b = 0, 1
        current_num = 0
        while current_num < all_num:
            yield a  # 如果一个函数中又yield语句,那么这就不是一个函数,而是一个生成器模板
            a, b = b, a + b
            current_num += 1
    
    
    # 如果在调用这个函数的时候,发现这个函数中有yield,那么不是调用函数,而是创建一个生成器
    obj = create_num(50)
    
    while True:
        try:
            ret = next(obj)
            print(ret)
        except Exception as result:
            break
    
    通过send来启动生成器
    def create_num(all_num):
        a, b = 0, 1
        current_num = 0
        while current_num < all_num:
            ret = yield a  # 如果一个函数中又yield语句,那么这就不是一个函数,而是一个生成器模板
            print(">>>>>>>", ret)
            a, b = b, a + b
            current_num += 1
    
    
    # 如果在调用这个函数的时候,发现这个函数中有yield,那么不是调用函数,而是创建一个生成器
    obj = create_num(10)
    
    # obj.send(None)  # send一般不会放到第一次启动生成器,如果非要这么做,那么传递None
    
    ret = next(obj)
    print(ret)
    
    # send里面的数据胡传递给第5行,当做yield a的结果,然后ret保存这个结果
    # send的结果是一下一次调用yield时 yield后面的值
    ret = obj.send("haha")
    print(ret)
    
    使用yield完成多任务
     import time
    
    
    def task_1():
        while True:
            print("----1-----")
            time.sleep(0.1)
            yield
    
    
    def task_2():
        while True:
            print("----2-----")
            time.sleep(0.1)
            yield
    
    
    def main():
        t1 = task_1()
        t2 = task_2()
        # 先让t1运行一会,当t1中遇到yield的时候,再返回到24行,然后
        # 执行t2,当遇到yield的时候,再次切换到t1中
        # 这样t1/t2/t1/t2的交替的交替运行,最终实现了多任务的处理
        while True:
            next(t1)
            next(t2)
    
    
    if __name__ == '__main__':
        main()
    
    greenlet、gevent实现多任务操作
    import gevent
    import time
    from gevent import monkey
    
    # 有耗时操作时需要
    monkey.patch_all()  # 保证将原来的代码中的耗时操作改成gevent的耗时操作,这样就不用改代码
    
    
    # gevent 要有延时操作就会自动进行切换,是gevent的延时操作
    def f1(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
            time.sleep(0.5)
            # gevent.sleep(0.5)
    
    
    def f2(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
            time.sleep(0.5)
            # gevent.sleep(0.5)
    
    
    def f3(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
            time.sleep(0.5)
            # gevent.sleep(0.5)
    
    
    # print("----1----")
    # g1 = gevent.spawn(f1, 5)
    # print("----2----")
    # g2 = gevent.spawn(f2, 5)
    # print("----3----")
    # g3 = gevent.spawn(f3, 5)
    # print("----4----")
    # g1.join()
    # g2.join()
    # g2.join()
    
    def corountine_work(corountine_name):
        for i in range(10):
            print(corountine_name, i)
            time.sleep(1)
    
    
    gevent.joinall([
        gevent.spawn(corountine_work, "work1"),
        gevent.spawn(corountine_work, "work2")
    ])
    
    图片下载器demo
    import urllib.request
    import gevent
    from gevent import monkey
    
    monkey.patch_all()
    
    
    def download(imag_name, img_url):
        req = urllib.request.urlopen(img_url)
        img_content = req.read()
        with open(imag_name, "wb") as f:
            f.write(img_content)
    
    
    def main():
        gevent.joinall([
            gevent.spawn(download, "1.jgp", "https://rpic.douyucdn.cn/asrpic/181107/3426612_750313_65cf3_2_2019.jpg"),
            gevent.spawn(download, "2.jpg", "https://rpic.douyucdn.cn/live-cover/appCovers/2018/10/18/5246011_20181018090809_small.jpg")
        ])
    
    
    if __name__ == '__main__':
        main()
    
    进程、线程以及斜程之间的区别
    • 进程是系统分配资源的基本单位
    • 线程是系统调度资源的基本单位
    • 进程切换需要的资源很多大,效率很低
    • 线程切换需要的资源一般,效率一般(在不考虑GIL的情况)
    • 协程切换任务需要的资源很小,效率很高
    • 多进程、多线程根据cpu核数不一样可能是并发的,但是协程在一个线程中,所以是并发

    相关文章

      网友评论

          本文标题:02-多任务

          本文链接:https://www.haomeiwen.com/subject/yrrlxqtx.html