美文网首页
Python多任务(高级编程六)

Python多任务(高级编程六)

作者: 冷煖自知 | 来源:发表于2020-01-11 18:35 被阅读0次

并行:真的多任务 cpu大于当前执行的任务
并发:假的多任务 cpu小于当前执行的任务

多线程

import threading

def demo():
    # 子线程
    print("hello girls")

if __name__ == "__main__":
    for i in range(5):
        t = threading.Thread(target=demo)
        t.start()   
查看线程数量

threading.enumerate()
至少包含一个主线程

继承Thread类创建线程

必须重写run()方法

import threading
import time

class A(threading.Thread):
    
    def __init__(self,name):
        super().__init__(name=name)
    
    def run(self):
        for i in range(5):
            print(i)

if __name__ == "__main__":
    t = A('test_name')    
    t.start()       
多线程共享全局变量(线程间通信)
import threading
import time

num = 100

# 多线程共享全局变量的!!!
def demo1():
    global num
    num += 1
    print("demo1---%d" % num)


def demo2():
    print("demo2---%d" % num)


def main():
    t1 = threading.Thread(target=demo1)
    t2 = threading.Thread(target=demo2)
    t1.start()
    # time.sleep(1)

    t2.start()
    # time.sleep(1)

    print("main---%d" % num)
多线程参数-args , kwargs
threading.Thread(target=test, args=(num,))
threading.Thread(target=demo, kwargs={'name': "join"})
Python字节码

查看Python转字节码的顺序

import dis
# 1 load a
# 2 load 1
# 3 执行add
# 4 赋值给a

def add_num(a):
    a+=1


print(dis.dis(add_num))
互斥锁

当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
创建锁
mutex = threading.Lock()
锁定
mutex.acquire()
解锁
mutex.release()

import threading

num = 0
mutex = threading.Lock()

def demo(nums):
    global num
    # 加锁
    mutex.acquire()
    for i in range(nums):
        num += 1
    # 解锁
    mutex.release()


t = threading.Thread(target=demo1, args=(100,))
t.start()

可使用threading.RLock()创建可重入的锁,多次加锁。(解锁次数和加锁次数相对应)

死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

  • 避免死锁
    • 程序设计时要尽量避免(银行家算法)
    • 添加超时时间等
线程同步

Lock是比较低级的同步原语,当被锁定以后不属于特定的线程。一个所有两种状态:locked和unlocked。如果锁处于unlocked状态,acquire()方法将其修改为locked并立即返回;如果锁已处于locked状态,则阻塞当前线程并等待其他线程释放锁,然后将其修改为locked并立即返回。release()方法用来将锁的状态由locked修改为unlocked并立即返回,如果锁已经处于unlocked状态,调用该方法将抛出异常。
可重入锁RLock对象也是一种常用的线程同步原语,可以被同一个线程acquire()多次。当处于locked状态时,某线程拥有该锁;当处于unlocked状态时,该锁不属于任何线程。
RLock对象的acquire() / release()调用对可以嵌套,仅当最后一个或者最外层release()执行结束后,锁才被设置为unlocked。

  • acquire()
    获得锁。该方法等待锁被解锁,将其设置为locked并返回True。
  • release()
    释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发RuntimeError。
  • locked()
    如果锁被锁定,返回True。
import threading
import time


# 自定义线程类
class MyThread1(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='MyThread')
        self.cond = cond

    # 重写线程代码
    def run(self):
        global x
        # 获得锁
        self.cond.acquire()
        # 等待
        self.cond.wait()
        for i in range(3):
            x = x + i
        time.sleep(2)
        print(x)
        # 通知
        self.cond.notify()

class MyThread2(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='MyThread')
        self.cond = cond

    # 重写线程代码
    def run(self):
        global x
        # 获得锁
        self.cond.acquire()
        # 通知
        self.cond.notify()
        for i in range(3):
            x = x + i
        time.sleep(2)
        print(x)
        # 等待
        self.cond.wait()
        print(x)


# 创建锁
cond = threading.Condition()
x = 0
# 先等待再通知,顺序不能反
t1 = MyThread1(cond)
t1.start()
t2 = MyThread2(cond)
t2.start()
多任务版udp聊天
  1. 创建套接字
  2. 绑定本地信息
  3. 获取对方IP和端口
  4. 发送、接收数据
  5. 创建两个线程,去执行功能
import socket
import threading

def recv_msg(udp_socket):
    """发送数据"""
    while True:
        recv_data = udp_socket.recvfrom(1024)
        print(recv_data)


def send_msg(udp_socket, dest_ip, dest_port):
    """接收数据"""
    while True:
        send_data = input("输入要发送的数据:")
        udp_socket.sendto(send_data.encode('gbk'), (dest_ip, dest_port))


def main():
    """完成udp聊天器"""
    # 创建套接字
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 绑定
    udp_socket.bind(("", 7890))

    # 获取对方的IP和端口
    dest_ip = input('请输入对方的IP:')
    dest_port = int(input('请输入对方的port:'))

    t_recv = threading.Thread(target=recv_msg, args=(udp_socket,))
    t_send = threading.Thread(target=send_msg, args=(udp_socket, dest_ip, dest_port))

    t_recv.start()
    t_send.start()


if __name__ == '__main__':
    main()

多进程

  • 进程和程序
    进程:正在执行的程序
    程序:没有执行的代码,是一个静态的
import os
import time

# pid
pid = os.fork()

print("123")

# pid == 0 子进程
if pid == 0:
    print("子进程:{}, 父进程:{}".format(os.getpid(), os.getppid()))
else:
    print("父进程:{}".format(os.getpid()))

time.sleep(2)

'''
123
父进程:3456
123
子进程:3457,父进程:3456
'''
使用进程实现多任务

multiprocessing模块就是跨平台的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。

import multiprocessing

def demo():
    while True:
        print("--1--")
        time.sleep(1)


def demo1():
    while True:
        print("--2--")
        time.sleep(1)


p1 = multiprocessing.Process(target=demo)
p2 = multiprocessing.Process(target=demo1)
p1.start()
p2.start()
线程和进程之间的对比
  • 进程:能够完成多任务,一台电脑上可以同时运行多个QQ
  • 线程:能够完成多任务,一个QQ中的多个聊天窗口
  • 根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
进程间通信-Queue

Queue-队列 先进先出

from multiprocessing import Queue

# 创建队列  最多可以存放3条数据
q = Queue(3)

# 存数据
q.put(1)
q.put("name")
q.put([11, 22])
# 获取大小
print(q.qsize())
# 判断队列是否为满
print(q.full())
# 判断队列是否为空
print(q.empty())
队列间简单通信

模拟下载数据,与数据处理

import multiprocessing


def download(q):
    """下载数据"""
    lis = [11, 22, 33]
    for item in lis:
        q.put(item)

    print("下载完成,并且保存到队列中...")


def analysis(q):
    """数据处理"""
    analysis_data = list()
    while True:
        data = q.get()
        print(data)
        analysis_data.append(data)

        if q.empty():
            break

    print(analysis_data)

# 创建一个队列  跨进程通信的队列
q = multiprocessing.Queue(2)

t1 = multiprocessing.Process(target=download, args=(q, ))
t2 = multiprocessing.Process(target=analysis, args=(q, ))

t1.start()
t2.start()
多进程共享全局变量

共享全局变量不适用于多进程编程

from queue import Queue

普通队列无法多进程通信

进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但是如果是上百甚至上千个目标,手动的去创建的进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务

from multiprocessing import Pool

import os, time, random


def worker(msg):
    t_start = time.time()
    print('%s开始执行,进程号为%d' % (msg, os.getpid()))

    time.sleep(random.random() * 2)
    t_stop = time.time()
    # 0.2f
    print(msg, "执行完成,耗时%0.2f" % (t_stop - t_start))


def demo():
    pass


if __name__ == '__main__':
    po = Pool(4)  # 定义一个进程池  3个进程

    for i in range(0, 10):
        po.apply_async(worker, (i,))

    print("--start--")
    # 关闭进程池 不在接收新的请求
    po.close()

    # po.apply_async(demo)

    # 等待子进程执行完成
    po.join()
    print("--end--")
进程池间的进程通信

进程池通信需要用到multiprocessing.Manager().Queue()实现

import multiprocessing


def demo1(q):
    # 进程池里面的进程 如果出现异常 不会主动抛出
    try:
        q.put('a')
    except Exception as e:
        print(e)


def demo2(q):
    try:
        data = q.get()
        print(data)
    except Exception as e:
        print(e)


if __name__ == '__main__':
    # q = Queue()  不能完成进程之间的通信
    # q = multiprocessing.Queue()   # 进程间通信
    q = multiprocessing.Manager().Queue()       # 进程池中的进程通信

    po = multiprocessing.Pool(2)

    po.apply_async(demo1, args=(q,))
    po.apply_async(demo2, args=(q,))

    po.close()

    po.join()
多任务文件夹复制
  1. 获取用户要复制的文件夹名字
  2. 创建一个新的文件夹
  3. 获取文件夹的所有待拷贝的文件名字
  4. 创建进程池
  5. 添加拷贝任务
import multiprocessing
import os


def copy_file(q, file_name, new_folder_name, old_folder_name):
    """完成文件拷贝"""

    # print("拷贝的文件名称为:%s" % file_name)

    with open(old_folder_name + "/" + file_name, "rb") as f:
        content = f.read()

    # 保存到新的文件夹中
    new_file = open(new_folder_name + "/" + file_name, "wb")
    new_file.write(content)
    new_file.close()

    q.put(file_name)


def main():
    # 获取用户要复制的文件夹名字  test
    old_folder_name = input("请输入要复制的文件夹名字:")

    # 创建一个新的文件夹   test[复件]
    new_folder_name = old_folder_name + "复件"
    if not os.path.exists(new_folder_name):
        os.mkdir(new_folder_name)

    # 获取文件夹的所有待拷贝的文件名字
    file_names = os.listdir(old_folder_name)
    # print(file_names)

    # 创建进程池
    po = multiprocessing.Pool(5)

    # 创建队列
    q = multiprocessing.Manager().Queue()

    # 添加拷贝任务
    for file_name in file_names:
        po.apply_async(copy_file, args=(q, file_name, new_folder_name, old_folder_name))

    po.close()

    # 文件的总数
    file_count = len(file_names)

    coly_file_num = 0
    while True:
        file_name = q.get()
        coly_file_num += 1
        # 拷贝的进度
        print("拷贝的进度%f%%" % (coly_file_num*100/file_count), end='')
        if coly_file_num >= file_count:
            break


if __name__ == '__main__':
    main()

协程

同步、异步
  • 同步:是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式
  • 异步:是指代码调用IO操作时,不必等IO操作完成就返回的调用方式
阻塞、非阻塞
  • 阻塞:从调用者的角度出发,如果在调用的时候,被卡住,不能再继续向下运行,需要等待,就说是阻塞
  • 非阻塞:从调用者的角度出发, 如果在调用的时候,没有被卡住,能够继续向下运行,无需等待,就说是非阻塞
生成器-send方法
  • send()方法有一个参数,该参数指定的是上一次被挂起的yield语句的返回值
  • 生成器启动时候send()参数必须为None
def a():
    print('aaa')
    p = yield '123'
    print(p)
    print('bbb')
    p1 = yield '789'


r = a()
print(r.send(None))
print(r.send('456'))

>> aaa
>> 123
>> 456
>> bbb
>> 789
  • 接收生成器return的值需要用异常抛出
def a():
    print('aaa')
    yield '789'
    return "end"


r = a()
print(r.send(None))
try:
    print(next(r))
except Exception as e:
    print(e)

>> aaa
>> 789
>> end
  • close()方法关闭生成器后,nextsend直接抛出异常
使用yield完成多任务
import time

def task1():
    while True:
        print("--1--")
        time.sleep(0.1)
        yield

def task2():
    while True:
        print("--2--")
        time.sleep(0.1)
        yield

def main():
    t1 = task1()
    t2 = task2()
    while True:
        next(t1)
        next(t2)

if __name__ == "__main__":
    main()
yield from介绍

python3.3新加了yield from语法
【子生成器】:yield from后的generator_1()生成器函数是子生成器
【委托生成器】:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
【调用方】:main()是程序中的调用方,负责调用委托生成器。

lis = [1, 2, 3]


def generator_1(lis):
    yield lis


def generator_2(lis):
    yield from lis


for i in generator_1(lis):
    print(i)    # [1, 2, 3]


for i in generator_2(lis):
    print(i)    # 1, 2, 3

yield from还可以做委托生成器,获取return返回的值

def generator_1():
    total = 0
    while True:
        x = yield
        print('加', x)
        if not x:
            break
        total += x
    return total


def generator_2():  # 委托生成器
    while True:
        total = yield from generator_1()  # 子生成器
        print('加和总数是:', total)


def main():  # 调用方
    # g1 = generator_1()
    # g1.send(None)
    # g1.send(2)
    # g1.send(3)
    # g1.send(None)
    g2 = generator_2()
    g2.send(None)
    g2.send(2)
    g2.send(3)
    g2.send(None)


if __name__ == '__main__':
    main()
协程

协程,又称微线程
协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)

  • Python中的协程大概经历了如下三个阶段:
    1. 最初的生成器变形yield/send
    2. yield from
    3. 在最近的Python3.5版本中引入async/await关键字
使用greenlet完成多任务

greenlet必须手动切换

from greenlet import greenlet
import time

# 协程利用程序的IO 来切换任务
def demo1():
    while True:
        print("demo1")
        gr2.switch()
        time.sleep(0.5)


def demo2():
    while True:
        print("demo2")
        gr1.switch()
        time.sleep(0.5)


gr1 = greenlet(demo1)
# print(greenlet.__doc__)
gr2 = greenlet(demo2)

gr1.switch()
使用gevent完成多任务

使用gevent必须使用gevent.sleep()睡眠后才会切换
gevent启动会在time.sleep()之后执行,gevent.sleep()才会打断
monkey.patch_all()会将程序中用到的耗时操作 换为gevent中实现的模块

import gevent
import time
from gevent import monkey
# 将程序中用到的耗时操作  换为gevent中实现的模块
monkey.patch_all()

def f1(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
        # gevent.sleep(0.5)


def f2(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
        # gevent.sleep(0.5)

def f3(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
        # gevent.sleep(0.5)


print("--1--")
g1 = gevent.spawn(f1, 5)
print("--2--")
time.sleep(1)
# gevent.sleep(0.5)
g2 = gevent.spawn(f2, 5)
print("--3--")
g3 = gevent.spawn(f3, 5)
print("--4--")


g1.join()
g2.join()
g3.join()
下载器
import gevent

from gevent import monkey
monkey.patch_all()
# 必须放到patch_all()之下,否则会报警告
import requests     # urllib 进行封装 爬虫 80-90

# 并行  并发
# 协程  并发

def download(url):
    print("get: %s" % url)
    res = requests.get(url)
    data = res.text
    print(len(data), url)


# g1 = gevent.spawn(download, 'https://www.baidu.com/')
# g2 = gevent.spawn(download, 'https://www.python.org/')
# g3 = gevent.spawn(download, 'https://www.baidu.com/')
# g1.join()
# g2.join()
# g3.join()
gevent.joinall([
    gevent.spawn(download, 'https://www.baidu.com/'),
    gevent.spawn(download, 'https://www.python.org/'),
    gevent.spawn(download, 'https://www.baidu.com/')
])

总结

  • 进程是资源分配的单位
  • 线程是操作系统调度的单位
  • 进程切换需要的资源很最大,效率很低
  • 线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
  • 协程切换任务资源很小,效率高
  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发

相关文章

  • Python多任务(高级编程六)

    并行:真的多任务 cpu大于当前执行的任务并发:假的多任务 cpu小于当前执行的任务 多线程 查看线程数量 thr...

  • GIL

    Python的GIL是什么鬼 学习编程的时候,我们少会涉及到多任务。可是在python中使用多任务经常会提...

  • day24系统编程

    1python系统编程 1.1进程 1.1.1多任务的引入 单任务: 多任务: 说明: ·程序执行到os.fork...

  • Python从入门到精通

    Python语法的三个阶段 Python基础语法函数是编程 Python进阶语法面向对象编程 Python高级语法...

  • 13个Python GUI库

    Python Python是一门高级编程语言。它用于通用编程。Python语言由Guido van Rossum创...

  • Python简介与IDE

    Python 简介 Python是一种广泛使用的解释型、高级编程、通用型编程语言。Python主要分为Python...

  • Python开发者必知的13个Python GUI库(转)

    源 | Python编程 Python是一种高级编程语言,它用于通用编程,由Guido van Rossum 在1...

  • Python高级编程[www.rejoiceblog.com].

    下载地址:Python高级编程[www.rejoiceblog.com].pdf

  • 为什么要学习python

    Python的简介 Python是一种广泛使用的解释型、高级编程、通用型编程语言。 Python的设计哲学强调代码...

  • python_多任务编程

    多任务 多任务:在同一时间内执行多个任务多任务的目的:多任务的最大好处是充分利用CPU资源,提高程序的执行效率 ...

网友评论

      本文标题:Python多任务(高级编程六)

      本文链接:https://www.haomeiwen.com/subject/ztuvactx.html