1 概念
概念
并发即同时执行
真正的并发是在多核CPU上执行,任务数多于核心数则多核轮流执行
模拟并发
# coding=utf-8
from time import sleep
def sing():
for i in range(3):
print('唱歌...%d'%i)
dance()
sleep(1)
def dance():
for i in range(3):
print('跳舞...%d'%i)
sleep(1)
if __name__ == '__main__':
sing()
2.进程创建及传参
多进程包 multiprocessing
通过Process创建,start方法启动进程
# coding=utf-8
# 导入模块
from multiprocessing import Process
def run_test():
print('test...')
if __name__ == '__main__':
print('主进程执行')
# 创建子进程 target接收执行的任务
p = Process(target=run_test)
# 启动子进程
p.start()
创建子进程,传递参数
# coding=utf-8
from multiprocessing import Process
def run_test(name, age, **kwargs):
print('子进程, name:%s, age:%s'%(name,age))
print('字典kwargs:', kwargs)
if __name__ == '__main__':
print('主进程')
# 创建子进程 参数使用元组 可变参数使用字典
p = Process(target=run_test, args=('test',18), kwargs={'key': 12})
p.start()
3.join方法
主进程要等待join的方法停止
# coding=utf-8
from multiprocessing import Process
from time import sleep
def worker(interval):
print('worker start...')
sleep(interval)
print('worker end...')
if __name__ == '__main__':
print('主进程 start...')
# 创建子进程
p = Process(target=worker, args=(3,))
p.start()
p.join()
# 主进程等待1秒即执行
# p.join(1)
print('主进程 end...')
4.进程属性
name 名称
id
# coding=utf-8
from multiprocessing import Process
import time
def clock(interval):
for i in range(3):
print('当前时间:{}'.format(time.ctime()))
time.sleep(interval)
if __name__ == '__main__':
# 创建子进程
p = Process(target=clock, args=(1,))
p.start()
print('p.id', p.pid)
# 进程名称
print('p.name', p.name)
# 进程是否存活
print('p.is_alive', p.is_alive())
创建多个任务
# coding=utf-8
from multiprocessing import Process
from time import sleep
def work1(interval):
print('work1 start...')
sleep(interval)
print('work1 end...')
def work2(interval):
print('work2 start...')
sleep(interval)
print('work2 end...')
def work3(interval):
print('work3 start...')
sleep(interval)
print('work3 end...')
if __name__ == '__main__':
print('主进程 start...')
p1 = Process(target=work1, args=(4,))
p2 = Process(target=work2, args=(2,))
p3 = Process(target=work3, args=(3,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print('p1.name', p1.name)
print('p2.name', p2.name)
print('p3.name', p3.name)
print('主进程 end...')
执行结果:
主进程 start...
work2 start...
work3 start...
work1 start...
work2 end...
work3 end...
work1 end...
p1.name Process-1
p2.name Process-2
p3.name Process-3
主进程 end...
说明:
系统会为进程指定一个默认的名字
5.使用继承的方式创建进程
继承Process类,重写run方法
# coding=utf-8
from multiprocessing import Process
from time import sleep
import time
class ClockProcess(Process):
# 重写init方法
def __init__(self, inteval):
Process.__init__(self)
self.inteval = inteval
# 重写run方法
def run(self):
print('子进程开始时间{}'.format(time.ctime()))
sleep(self.inteval)
print('子进程结束时间{}'.format(time.ctime()))
if __name__ == '__main__':
p = ClockProcess(3)
p.start()
p.join()
print('主进程结束')
6.进程池的使用
限制进程数量,超过数量则等待
进程池使用(非阻塞)
# coding=utf-8
import multiprocessing
import time
def func(msg):
print('start:', msg)
time.sleep(3)
print('end:', msg)
if __name__ == '__main__':
# 创建初始化3的进程池
pool = multiprocessing.Pool(3)
# 添加任务
for i in range(5):
msg = '进程%d'%i
# 异步执行
pool.apply_async(func, (msg,))
# 如果进程池不再接收新的请求,调用close
pool.close()
# 等待子进程结束
pool.join()
说明:
第3个任务执行完毕后,再执行第4个任务
进程池使用(阻塞)
# 上述代码修改
# pool.apply_async(func, (msg,))
pool.apply(func, (msg,))
说明:
阻塞即单进程,一个进程结束后再执行第二个进程
7.全局变量在进程之间不共享
# coding=utf-8
from multiprocessing import Process
num = 10
def work1():
global num
num += 5
print('子进程1: num=', num)
def work2():
global num
num += 10
print('子进程2: num=', num)
if __name__ == '__main__':
print('主进程 start...')
p1 = Process(target=work1)
p2 = Process(target=work2)
p1.start()
p2.start()
p1.join()
p2.join()
print(num)
print('主进程 end...')
执行结果:
主进程 start...
子进程1: num= 15
子进程2: num= 20
10
主进程 end...
说明:
全局变量在进程间不共享
8.队列方法
队列,先进先出
# coding=utf-8
from multiprocessing import Queue
# 创建一个队列,指定队列大小为3,默认为无限
q = Queue(3)
q.put('消息1')
q.put('消息2')
q.put('消息3')
# put方法可选参数 队列已满等待1秒后抛异常
# q.put('消息4', block=True, timeout=1)
# 如果队列已满
if not q.full():
q.put('消息4', block=True, timeout=1)
# 读取并删除元素
# print(q.get())
# print(q.get())
# print(q.get())
# 如果队列已空等待1秒
# print(q.get(block=True, timeout=1))
# 如果队列非空
# if not q.empty():
# print(q.get(block=True, timeout=1))
# 查看队列大小
print(q.qsize())
# 循环获取队列元素
for i in range(q.qsize()):
print(q.get())
9.进程之间通信
使用队列实现
# coding=utf-8
from multiprocessing import Process,Queue
from time import sleep
def write(q):
a = ['a', 'b', 'c', 'd', 'e']
for i in a:
print('开始写入的值:%s'%i)
q.put(i)
sleep(1)
def read(q):
for i in range(q.qsize()):
print('读取到的值:%s'%q.get())
sleep(1)
if __name__ == '__main__':
# 创建队列
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pw.join()
pr.start()
pr.join()
进程池进程间通信
注意是Manager模块的Queue
# coding=utf-8
from multiprocessing import Process, Pool, Manager
from time import sleep
def write(q):
a = ['a', 'b', 'c', 'd', 'e']
for i in a:
print('开始写入的值:%s'%i)
q.put(i)
sleep(1)
def read(q):
for i in range(q.qsize()):
print('读取到的值:%s'%q.get())
sleep(1)
if __name__ == '__main__':
# 创建队列
q = Manager().Queue()
pool = Pool(3)
pool.apply(write, (q,))
pool.apply(read, (q,))
pool.close()
pool.join()
10.进程和线程的区别
操作系统的一个任务就是一个进程,比如QQ
子任务就是线程
进程具有独立的代码和数据空间,线程共享内存
进程可以拥有多个并发的线程
11._thread创建线程
python3使用_thread替换了thread模块
# coding=utf-8
import _thread
import time
def fun1():
print('fun1 start...')
time.sleep(4)
print('fun1 end...')
def fun2():
print('fun2 start...')
time.sleep(2)
print('fun2 end...')
if __name__ == '__main__':
print('main start...')
# 创建线程
_thread.start_new_thread(fun1, ())
_thread.start_new_thread(fun2, ())
time.sleep(7)
为线程传递参数
# coding=utf-8
import _thread
import time
def fun1(thread_name, delay):
print('fun1 start, 线程名:', thread_name)
time.sleep(delay)
print('fun1 end...')
def fun2(thread_name, delay):
print('fun2 start, 线程名:', thread_name)
time.sleep(delay)
print('fun2 end...')
if __name__ == '__main__':
print('main start...')
# 创建线程
_thread.start_new_thread(fun1, ('线程1', 4))
_thread.start_new_thread(fun2, ('线程2', 2))
time.sleep(7)
12.threading模块创建线程
run start join等方法与进程类似
# coding=utf-8
import threading
import time
def fun1(thread_name, delay):
print('fun1 start, 线程名:', thread_name)
time.sleep(delay)
print('fun1 end...')
def fun2(thread_name, delay):
print('fun2 start, 线程名:', thread_name)
time.sleep(delay)
print('fun2 end...')
if __name__ == '__main__':
print('main start...')
# 创建线程
t1 = threading.Thread(target=fun1, args=('thread-1', 2))
t2 = threading.Thread(target=fun1, args=('thread-2', 3))
t1.start()
t2.start()
# thread-1休眠时,thread-2会抢占CPU
t1.join()
t2.join()
通过继承的方式创建线程
# coding=utf-8
import threading
import time
def fun1(delay):
# 获取当前线程名
print('fun1 start, 线程名:', threading.current_thread().getName())
time.sleep(delay)
print('fun1 end...')
def fun2(delay):
print('fun2 start, 线程名:', threading.current_thread().getName())
time.sleep(delay)
print('fun2 end...')
class MyThread(threading.Thread):
# 重写父类构造方法,func要执行的函数,name表示线程名,args线程参数
def __init__(self, func, name, args):
super().__init__(target=func, name=name, args=args)
# 重写run方法
def run(self):
# 元组参数为*
self._target(*self._args)
if __name__ == '__main__':
print('main start...')
# 创建线程,注意args为元组
t1 = MyThread(fun1, 'thread-1', (2,))
t2 = MyThread(fun2, 'thread-2', (4,))
t1.start()
t2.start()
t1.join()
t2.join()
13.线程之间共享全局变量
# coding=utf-8
from threading import Thread
# 定义全局变量num
num = 10
def test1():
global num
for i in range(3):
num += 1
print('test1, num=', num)
def test2():
global num
for i in range(3):
num += 1
print('test2, num=', num)
if __name__ == '__main__':
print('main start...')
t1 = Thread(target=test1)
t2 = Thread(target=test2)
t1.start()
t1.join()
t2.start()
t2.join()
print('main num=', num)
执行结果:
main start...
test1, num= 13
test2, num= 16
main num= 16
说明:
进程1对全局变量的修改影响进程2
同进程的线程之间共享全局变量,不同进程间不共享
多个线程修改同一变量值可能造成混乱
多线程操作变量造成混乱
# coding=utf-8
from threading import Thread
# 定义全局变量num
num = 0
def test1():
global num
for i in range(1000000):
num += 1
print('test1, num=', num)
def test2():
global num
for i in range(1000000):
num += 1
print('test2, num=', num)
if __name__ == '__main__':
print('main start...')
t1 = Thread(target=test1)
t2 = Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
print('main num=', num)
执行结果:
main start...
test1, num= 1158345
test2, num= 1475121
main num= 1475121
说明:
在本人电脑执行10万次并没有造成混乱现象
混乱的原因,线程1获取到num的值要执行num+1时,前程2获取到CPU资源,线程1随后获取到CPU资源执行num+1时操作的是过时的num
解决方式,加锁
14.互斥锁
线程操作时对变量加锁,其他线程不能操作
线程释放锁后,其他线程才能操作
基于13中的代码修改
# coding=utf-8
from threading import Thread,Lock
# 定义全局变量num
num = 0
lock = Lock()
def test1():
global num
# 上锁
lock.acquire()
for i in range(1000000):
num += 1
# 释放锁
lock.release()
print('test1, num=', num)
def test2():
global num
lock.acquire()
for i in range(1000000):
num += 1
lock.release()
print('test2, num=', num)
if __name__ == '__main__':
print('main start...')
t1 = Thread(target=test1)
t2 = Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
print('main num=', num)
说明:
上述方式效率低,线程1循环过程中,线程2无法操作
改进:
只对num += 1上锁
def test1():
global num
for i in range(1000000):
# 上锁
lock.acquire()
num += 1
# 释放锁
lock.release()
print('test1, num=', num)
def test2():
global num
for i in range(1000000):
lock.acquire()
num += 1
lock.release()
print('test2, num=', num)
15.线程同步
同步,按照预定顺序执行线程
适用于线程B依赖于线程A结果的场景
# coding=utf-8
from threading import Thread,Lock
import time
# 创建3把互斥锁
lock1 = Lock()
lock2 = Lock()
lock3 = Lock()
# 对lock2 lock3上锁
lock2.acquire()
lock3.acquire()
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print('task1...')
time.sleep(1)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print('task2...')
time.sleep(1)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print('task3...')
time.sleep(1)
lock1.release()
if __name__ == '__main__':
print('main start...')
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
说明:
task1获取锁1,然后释放锁2
task2获取锁2,然后释放锁3
task3获取锁3,然后释放锁1
保证task1-2-3的执行顺序
16.生产者消费者模式
可以通过队列完成生产者消费者之间的通信
# coding=utf-8
from threading import Thread
from queue import Queue
import time
class Producer(Thread):
def run(self):
global queue
count = 0
while True:
# 判断队列大小
if queue.qsize() < 1000:
for i in range(100):
count += 1
msg = '生产第' + str(count)+ '个产品'
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(10):
msg = self.name + '消费' + queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
p = Producer()
c = Consumer()
p.start()
time.sleep(1)
c.start()
说明:
首先,启动生产者线程
生产者每生产100个产品后休眠0.5秒
队列数大于100时,消费者开始消费
每消费10个产品后休眠1秒
17.ThreadLocal
多线程对全局变量修改会影响其他线程,通过加锁解决并发问题
ThreadLocal每个线程的私有数据,其他线程不可见
# coding=utf-8
import threading
# 创建ThreadLocal对象
local = threading.local()
def process_student():
student_name = local.name
print('线程名:%s 学生姓名:%s'%(threading.current_thread().getName(), student_name))
def process_thread(name):
# 传入的name绑定到local的name
local.name = name
process_student()
t1 = threading.Thread(target=process_thread, args=('张三',), name="Thread-A")
t2 = threading.Thread(target=process_thread, args=('李四',), name="Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()
执行结果:
线程名:Thread-A 学生姓名:张三
线程名:Thread-B 学生姓名:李四
网友评论