美文网首页
python进阶-05-并发编程

python进阶-05-并发编程

作者: 西海岸虎皮猫大人 | 来源:发表于2020-03-30 18:50 被阅读0次

1 概念

概念

并发即同时执行
真正的并发是在多核CPU上执行,任务数多于核心数则多核轮流执行

模拟并发
# coding=utf-8
from time import sleep


def sing():
    for i in range(3):
        print('唱歌...%d'%i)
        dance()
        sleep(1)


def dance():
    for i in range(3):
        print('跳舞...%d'%i)
        sleep(1)


if __name__ == '__main__':
    sing()

2.进程创建及传参

多进程包 multiprocessing

通过Process创建,start方法启动进程
# coding=utf-8
# 导入模块
from multiprocessing import Process


def run_test():
    print('test...')


if __name__ == '__main__':
    print('主进程执行')
    # 创建子进程 target接收执行的任务
    p = Process(target=run_test)
    # 启动子进程
    p.start()
创建子进程,传递参数
# coding=utf-8
from multiprocessing import Process


def run_test(name, age, **kwargs):
    print('子进程, name:%s, age:%s'%(name,age))
    print('字典kwargs:', kwargs)


if __name__ == '__main__':
    print('主进程')
    # 创建子进程 参数使用元组 可变参数使用字典
    p = Process(target=run_test, args=('test',18), kwargs={'key': 12})
    p.start()

3.join方法

主进程要等待join的方法停止

# coding=utf-8
from multiprocessing import Process
from time import sleep


def worker(interval):
    print('worker start...')
    sleep(interval)
    print('worker end...')


if __name__ == '__main__':
    print('主进程 start...')
    # 创建子进程
    p = Process(target=worker, args=(3,))
    p.start()
    p.join()
    # 主进程等待1秒即执行
    # p.join(1)
    print('主进程 end...')

4.进程属性

name 名称
id

# coding=utf-8
from multiprocessing import Process
import time


def clock(interval):
    for i in range(3):
        print('当前时间:{}'.format(time.ctime()))
        time.sleep(interval)


if __name__ == '__main__':
    # 创建子进程
    p = Process(target=clock, args=(1,))
    p.start()
    print('p.id', p.pid)
    # 进程名称
    print('p.name', p.name)
    # 进程是否存活
    print('p.is_alive', p.is_alive())
创建多个任务
# coding=utf-8
from multiprocessing import Process
from time import sleep


def work1(interval):
    print('work1 start...')
    sleep(interval)
    print('work1 end...')


def work2(interval):
    print('work2 start...')
    sleep(interval)
    print('work2 end...')


def work3(interval):
    print('work3 start...')
    sleep(interval)
    print('work3 end...')


if __name__ == '__main__':
    print('主进程 start...')
    p1 = Process(target=work1, args=(4,))
    p2 = Process(target=work2, args=(2,))
    p3 = Process(target=work3, args=(3,))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print('p1.name', p1.name)
    print('p2.name', p2.name)
    print('p3.name', p3.name)

    print('主进程 end...')

执行结果:

主进程 start...
work2 start...
work3 start...
work1 start...
work2 end...
work3 end...
work1 end...
p1.name Process-1
p2.name Process-2
p3.name Process-3
主进程 end...

说明:
系统会为进程指定一个默认的名字

5.使用继承的方式创建进程

继承Process类,重写run方法

# coding=utf-8
from multiprocessing import Process
from time import sleep
import time


class ClockProcess(Process):
    # 重写init方法
    def __init__(self, inteval):
        Process.__init__(self)
        self.inteval = inteval

    # 重写run方法
    def run(self):
        print('子进程开始时间{}'.format(time.ctime()))
        sleep(self.inteval)
        print('子进程结束时间{}'.format(time.ctime()))


if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()
    p.join()
    print('主进程结束')

6.进程池的使用

限制进程数量,超过数量则等待

进程池使用(非阻塞)
# coding=utf-8
import multiprocessing
import time


def func(msg):
    print('start:', msg)
    time.sleep(3)
    print('end:', msg)


if __name__ == '__main__':
    # 创建初始化3的进程池
    pool = multiprocessing.Pool(3)
    # 添加任务
    for i in range(5):
        msg = '进程%d'%i
        # 异步执行
        pool.apply_async(func, (msg,))

    # 如果进程池不再接收新的请求,调用close
    pool.close()
    # 等待子进程结束
    pool.join()

说明:
第3个任务执行完毕后,再执行第4个任务

进程池使用(阻塞)
# 上述代码修改
# pool.apply_async(func, (msg,))
pool.apply(func, (msg,))

说明:
阻塞即单进程,一个进程结束后再执行第二个进程

7.全局变量在进程之间不共享

# coding=utf-8
from multiprocessing import Process
num = 10


def work1():
    global num
    num += 5
    print('子进程1: num=', num)


def work2():
    global num
    num += 10
    print('子进程2: num=', num)


if __name__ == '__main__':
    print('主进程 start...')
    p1 = Process(target=work1)
    p2 = Process(target=work2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
    print('主进程 end...')

执行结果:

主进程 start...
子进程1: num= 15
子进程2: num= 20
10
主进程 end...

说明:
全局变量在进程间不共享

8.队列方法

队列,先进先出

# coding=utf-8
from multiprocessing import Queue

# 创建一个队列,指定队列大小为3,默认为无限
q = Queue(3)
q.put('消息1')
q.put('消息2')
q.put('消息3')
# put方法可选参数 队列已满等待1秒后抛异常
# q.put('消息4', block=True, timeout=1)
# 如果队列已满
if not q.full():
    q.put('消息4', block=True, timeout=1)

# 读取并删除元素
# print(q.get())
# print(q.get())
# print(q.get())
# 如果队列已空等待1秒
# print(q.get(block=True, timeout=1))
# 如果队列非空
# if not q.empty():
#     print(q.get(block=True, timeout=1))

# 查看队列大小
print(q.qsize())

# 循环获取队列元素
for i in range(q.qsize()):
    print(q.get())

9.进程之间通信

使用队列实现
# coding=utf-8
from multiprocessing import Process,Queue
from time import sleep


def write(q):
    a = ['a', 'b', 'c', 'd', 'e']
    for i in a:
        print('开始写入的值:%s'%i)
        q.put(i)
        sleep(1)


def read(q):
    for i in range(q.qsize()):
        print('读取到的值:%s'%q.get())
        sleep(1)


if __name__ == '__main__':
    # 创建队列
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pw.join()
    pr.start()
    pr.join()
进程池进程间通信

注意是Manager模块的Queue

# coding=utf-8
from multiprocessing import Process, Pool, Manager
from time import sleep


def write(q):
    a = ['a', 'b', 'c', 'd', 'e']
    for i in a:
        print('开始写入的值:%s'%i)
        q.put(i)
        sleep(1)


def read(q):
    for i in range(q.qsize()):
        print('读取到的值:%s'%q.get())
        sleep(1)


if __name__ == '__main__':
    # 创建队列
    q = Manager().Queue()
    pool = Pool(3)
    pool.apply(write, (q,))
    pool.apply(read, (q,))
    pool.close()
    pool.join()

10.进程和线程的区别

操作系统的一个任务就是一个进程,比如QQ
子任务就是线程
进程具有独立的代码和数据空间,线程共享内存
进程可以拥有多个并发的线程

11._thread创建线程

python3使用_thread替换了thread模块

# coding=utf-8
import _thread
import time


def fun1():
    print('fun1 start...')
    time.sleep(4)
    print('fun1 end...')


def fun2():
    print('fun2 start...')
    time.sleep(2)
    print('fun2 end...')


if __name__ == '__main__':
    print('main start...')
    # 创建线程
    _thread.start_new_thread(fun1, ())
    _thread.start_new_thread(fun2, ())
    time.sleep(7)
为线程传递参数
# coding=utf-8
import _thread
import time


def fun1(thread_name, delay):
    print('fun1 start, 线程名:', thread_name)
    time.sleep(delay)
    print('fun1 end...')


def fun2(thread_name, delay):
    print('fun2 start, 线程名:', thread_name)
    time.sleep(delay)
    print('fun2 end...')


if __name__ == '__main__':
    print('main start...')
    # 创建线程
    _thread.start_new_thread(fun1, ('线程1', 4))
    _thread.start_new_thread(fun2, ('线程2', 2))
    time.sleep(7)

12.threading模块创建线程

run start join等方法与进程类似

# coding=utf-8
import threading
import time


def fun1(thread_name, delay):
    print('fun1 start, 线程名:', thread_name)
    time.sleep(delay)
    print('fun1 end...')


def fun2(thread_name, delay):
    print('fun2 start, 线程名:', thread_name)
    time.sleep(delay)
    print('fun2 end...')


if __name__ == '__main__':
    print('main start...')
    # 创建线程
    t1 = threading.Thread(target=fun1, args=('thread-1', 2))
    t2 = threading.Thread(target=fun1, args=('thread-2', 3))
    t1.start()
    t2.start()
    # thread-1休眠时,thread-2会抢占CPU
    t1.join()
    t2.join()
通过继承的方式创建线程
# coding=utf-8
import threading
import time


def fun1(delay):
    # 获取当前线程名
    print('fun1 start, 线程名:', threading.current_thread().getName())
    time.sleep(delay)
    print('fun1 end...')


def fun2(delay):
    print('fun2 start, 线程名:', threading.current_thread().getName())
    time.sleep(delay)
    print('fun2 end...')


class MyThread(threading.Thread):
    # 重写父类构造方法,func要执行的函数,name表示线程名,args线程参数
    def __init__(self, func, name, args):
        super().__init__(target=func, name=name, args=args)
        
    # 重写run方法
    def run(self):
        # 元组参数为*
        self._target(*self._args)


if __name__ == '__main__':
    print('main start...')
    # 创建线程,注意args为元组
    t1 = MyThread(fun1, 'thread-1', (2,))
    t2 = MyThread(fun2, 'thread-2', (4,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

13.线程之间共享全局变量

# coding=utf-8
from threading import Thread

# 定义全局变量num
num = 10


def test1():
    global num
    for i in range(3):
        num += 1
    print('test1, num=', num)


def test2():
    global num
    for i in range(3):
        num += 1
    print('test2, num=', num)


if __name__ == '__main__':
    print('main start...')
    t1 = Thread(target=test1)
    t2 = Thread(target=test2)
    t1.start()
    t1.join()
    t2.start()
    t2.join()
    print('main num=', num)

执行结果:

main start...
test1, num= 13
test2, num= 16
main num= 16

说明:
进程1对全局变量的修改影响进程2
同进程的线程之间共享全局变量,不同进程间不共享
多个线程修改同一变量值可能造成混乱

多线程操作变量造成混乱
# coding=utf-8
from threading import Thread

# 定义全局变量num
num = 0


def test1():
    global num
    for i in range(1000000):
        num += 1
    print('test1, num=', num)


def test2():
    global num
    for i in range(1000000):
        num += 1
    print('test2, num=', num)


if __name__ == '__main__':
    print('main start...')
    t1 = Thread(target=test1)
    t2 = Thread(target=test2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('main num=', num)

执行结果:

main start...
test1, num= 1158345
test2, num= 1475121
main num= 1475121

说明:
在本人电脑执行10万次并没有造成混乱现象
混乱的原因,线程1获取到num的值要执行num+1时,前程2获取到CPU资源,线程1随后获取到CPU资源执行num+1时操作的是过时的num
解决方式,加锁

14.互斥锁

线程操作时对变量加锁,其他线程不能操作
线程释放锁后,其他线程才能操作
基于13中的代码修改

# coding=utf-8
from threading import Thread,Lock

# 定义全局变量num
num = 0
lock = Lock()


def test1():
    global num
    # 上锁
    lock.acquire()
    for i in range(1000000):
        num += 1
    # 释放锁
    lock.release()
    print('test1, num=', num)


def test2():
    global num
    lock.acquire()
    for i in range(1000000):
        num += 1
    lock.release()
    print('test2, num=', num)


if __name__ == '__main__':
    print('main start...')
    t1 = Thread(target=test1)
    t2 = Thread(target=test2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('main num=', num)

说明:
上述方式效率低,线程1循环过程中,线程2无法操作
改进:
只对num += 1上锁

def test1():
    global num
    for i in range(1000000):
        # 上锁
        lock.acquire()
        num += 1
        # 释放锁
        lock.release()
    print('test1, num=', num)


def test2():
    global num
    for i in range(1000000):
        lock.acquire()
        num += 1
        lock.release()
    print('test2, num=', num)

15.线程同步

同步,按照预定顺序执行线程
适用于线程B依赖于线程A结果的场景

# coding=utf-8
from threading import Thread,Lock
import time

# 创建3把互斥锁
lock1 = Lock()
lock2 = Lock()
lock3 = Lock()

# 对lock2 lock3上锁
lock2.acquire()
lock3.acquire()


class Task1(Thread):
    def run(self):
        while True:
            if lock1.acquire():
                print('task1...')
                time.sleep(1)
                lock2.release()


class Task2(Thread):
    def run(self):
        while True:
            if lock2.acquire():
                print('task2...')
                time.sleep(1)
                lock3.release()


class Task3(Thread):
    def run(self):
        while True:
            if lock3.acquire():
                print('task3...')
                time.sleep(1)
                lock1.release()


if __name__ == '__main__':
    print('main start...')
    t1 = Task1()
    t2 = Task2()
    t3 = Task3()
    t1.start()
    t2.start()
    t3.start()

说明:
task1获取锁1,然后释放锁2
task2获取锁2,然后释放锁3
task3获取锁3,然后释放锁1
保证task1-2-3的执行顺序

16.生产者消费者模式

可以通过队列完成生产者消费者之间的通信

# coding=utf-8
from threading import Thread
from queue import Queue
import time


class Producer(Thread):
    def run(self):
        global queue
        count = 0
        while True:
            # 判断队列大小
            if queue.qsize() < 1000:
                for i in range(100):
                    count += 1
                    msg = '生产第' + str(count)+ '个产品'
                    queue.put(msg)
                    print(msg)
                time.sleep(0.5)


class Consumer(Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(10):
                    msg = self.name + '消费' + queue.get()
                    print(msg)
                time.sleep(1)


if __name__ == '__main__':
    queue = Queue()
    p = Producer()
    c = Consumer()
    p.start()
    time.sleep(1)
    c.start()

说明:
首先,启动生产者线程
生产者每生产100个产品后休眠0.5秒
队列数大于100时,消费者开始消费
每消费10个产品后休眠1秒

17.ThreadLocal

多线程对全局变量修改会影响其他线程,通过加锁解决并发问题
ThreadLocal每个线程的私有数据,其他线程不可见

# coding=utf-8
import threading


# 创建ThreadLocal对象
local = threading.local()


def process_student():
    student_name = local.name
    print('线程名:%s 学生姓名:%s'%(threading.current_thread().getName(), student_name))


def process_thread(name):
    # 传入的name绑定到local的name
    local.name = name
    process_student()


t1 = threading.Thread(target=process_thread, args=('张三',), name="Thread-A")
t2 = threading.Thread(target=process_thread, args=('李四',), name="Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()

执行结果:

线程名:Thread-A 学生姓名:张三
线程名:Thread-B 学生姓名:李四

相关文章

网友评论

      本文标题:python进阶-05-并发编程

      本文链接:https://www.haomeiwen.com/subject/pvxbuhtx.html