While filtering log data across several files at once, I looked at Python multiprocessing to speed things up. Compared with a single process there was no great improvement on my own machine (a Pentium processor), but on a server (6 cores / 6 threads) the speedup was significant.
Here are some quick notes on basic multiprocessing usage, for future reference.
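For the log-filtering use case above, the natural pattern is to hand each file to a worker in a Pool and collect the matches. A minimal sketch, where the file names and the "ERROR" keyword are made up for illustration:

from multiprocessing import Pool


def filter_log(path):
    # The keyword is hardcoded for the sketch; a real version would parameterize it
    with open(path, encoding="utf-8") as f:
        return [line for line in f if "ERROR" in line]


if __name__ == "__main__":
    paths = ["app1.log", "app2.log", "app3.log"]  # hypothetical file names
    with Pool() as pool:
        # One worker filters each file; results come back in input order
        results = pool.map(filter_log, paths)
    for path, lines in zip(paths, results):
        print("{}: {} matching lines".format(path, len(lines)))

pool.map distributes the paths across the worker processes and blocks until all per-file results are in.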
- Pool
from multiprocessing import Pool, cpu_count
import os
import time


def test(num):
    # Print the PID of the process running this task
    print("{} is running...".format(os.getpid()))
    print(num)
    # Sleep for 5 seconds to simulate work
    time.sleep(5)


if __name__ == "__main__":
    start = time.time()
    # Print the number of CPU cores
    print("CPU counters: {}".format(cpu_count()))
    if cpu_count() > 1:
        p = Pool()
        p.apply_async(test, args=(1, ))
        p.apply_async(test, args=(2, ))
        # Close the pool (no new tasks) and wait for the workers
        p.close()
        p.join()
        print("Multi Cost: {}".format(time.time() - start))

    # The same two tasks, run sequentially for comparison
    start = time.time()
    test(1)
    test(2)
    print("Single Cost: {}".format(time.time() - start))
The output:
CPU counters: 2
22168 is running...
25172 is running...
2
1
Multi Cost: 7.875996828079224
21360 is running...
1
21360 is running...
2
Single Cost: 10.007039546966553
As the timings show, the two tasks did overlap: about 7.9 s in parallel versus about 10 s sequentially. The gap from the ideal ~5 s is largely the overhead of starting the pool's worker processes.
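One detail the example glosses over: apply_async returns an AsyncResult, which the code above discards because test returns nothing. When the task computes a value, keep the handle and call get(). A minimal sketch, with square as a stand-in task:

from multiprocessing import Pool


def square(n):
    return n * n


if __name__ == "__main__":
    with Pool() as pool:
        # Submit the tasks and keep the AsyncResult handles
        handles = [pool.apply_async(square, args=(n,)) for n in range(4)]
        # get() blocks until each result is ready (and re-raises worker errors)
        results = [h.get() for h in handles]
    print(results)  # [0, 1, 4, 9]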
- Sharing data between processes
from multiprocessing import Pool, Manager, cpu_count
import os
import time


def test(num, l, d):
    print("{} is running...".format(os.getpid()))
    print(num)
    # Write into the shared list and dict through their Manager proxies
    l.append(num * 2)
    d[num] = num * 2
    # Sleep for 5 seconds to simulate work
    time.sleep(5)


if __name__ == "__main__":
    start = time.time()
    # Print the number of CPU cores
    print("CPU counters: {}".format(cpu_count()))
    if cpu_count() > 1:
        l = Manager().list()
        d = Manager().dict()
        p = Pool()
        p.apply_async(test, args=(1, l, d))
        p.apply_async(test, args=(2, l, d))
        # Close the pool and wait for the workers
        p.close()
        p.join()
        print("List: {}".format(l))
        print("Dict: {}".format(d))
        print("Multi Cost: {}".format(time.time() - start))
The output:
CPU counters: 2
13924 is running...
1
27060 is running...
2
List: [2, 4]
Dict: {1: 2, 2: 4}
Multi Cost: 11.537004232406616
The convenience of Manager is that you don't need your own lock for single operations: each call on a proxy (append, item assignment, and so on) is already synchronized through the manager process. Compound read-modify-write sequences, however, still need an explicit lock.
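A minimal sketch of that caveat, using a shared counter under the arbitrary key "n": each d["n"] += 1 is a read followed by a write, so without the Manager's Lock some increments would be lost.

from multiprocessing import Manager, Process


def bump(d, lock):
    for _ in range(1000):
        # The read-modify-write must be guarded as a whole, even
        # though each single proxy operation is synchronized on its own
        with lock:
            d["n"] += 1


if __name__ == "__main__":
    m = Manager()
    d = m.dict(n=0)
    lock = m.Lock()
    ps = [Process(target=bump, args=(d, lock)) for _ in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(d["n"])  # 4000 with the lock; often less without it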
- A "gotcha" when sharing data between processes
The broken code:
from multiprocessing import Manager, Process, cpu_count
import os
import time


def test(l):
    print("{} is running...".format(os.getpid()))
    print("inner list: {}".format(l))
    # Mutate the dict nested inside the managed list
    # (this change is NOT propagated back to the parent)
    l[0][1] = 9999
    print("inner has changed")


if __name__ == "__main__":
    start = time.time()
    # Print the number of CPU cores
    print("CPU counters: {}".format(cpu_count()))
    if cpu_count() > 1:
        l = Manager().list()
        l.append({1: 2})
        p1 = Process(target=test, args=(l, ))
        p1.start()
        p1.join()
        print("Outer list: {}".format(l[0]))
        print("Multi Cost: {}".format(time.time() - start))
The output:
CPU counters: 2
27608 is running...
inner list: [{1: 2}]
inner has changed
Outer list: {1: 2}
Multi Cost: 4.526983976364136
Notice that the dict in the list was not changed to 9999. Indexing the managed list (l[0]) hands the child process a plain copy of the dict, so mutating that copy never travels back through the proxy.
The corrected code:
from multiprocessing import Manager, Process, cpu_count
import os
import time


def test(l):
    print("{} is running...".format(os.getpid()))
    print("inner list: {}".format(l))
    # Wrong: l[0] returns a copy of the dict, so the
    # assignment below only modifies that throwaway copy
    temp = l[0][1]
    temp = 9999
    l[0][1] = temp
    # Right: take the copy out, modify it, then assign it back
    # with l[0] = ... so the write goes through the proxy
    temp = l[0]
    temp[1] = 9999
    l[0] = temp
    print("inner has changed")


if __name__ == "__main__":
    start = time.time()
    # Print the number of CPU cores
    print("CPU counters: {}".format(cpu_count()))
    if cpu_count() > 1:
        l = Manager().list()
        l.append({1: 2})
        p1 = Process(target=test, args=(l, ))
        p1.start()
        p1.join()
        print("Outer list: {}".format(l))
        print("Multi Cost: {}".format(time.time() - start))
The output:
CPU counters: 2
7784 is running...
inner list: [{1: 2}]
inner has changed
Outer list: [{1: 9999}]
Multi Cost: 4.427980184555054
This time we get the expected result (the function also keeps the broken variant for contrast). The takeaway: the Manager cannot detect in-place mutation of a plain object nested inside a shared container. Read the element out, modify it, then assign it back so the change goes through the proxy.
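As an aside, the standard-library docs note that since Python 3.6 manager proxies can be nested: if the inner dict is itself a managed dict rather than a plain one, in-place mutation does propagate. A minimal sketch of that alternative:

from multiprocessing import Manager, Process


def test(l):
    # l[0] is itself a DictProxy here, so this write goes
    # through the manager and is visible to the parent
    l[0][1] = 9999


if __name__ == "__main__":
    m = Manager()
    l = m.list()
    l.append(m.dict({1: 2}))  # nest a managed dict, not a plain dict
    p = Process(target=test, args=(l,))
    p.start()
    p.join()
    print(l[0])  # {1: 9999} on Python 3.6+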
References:
https://www.jianshu.com/p/52676b93430d
https://blog.csdn.net/qhd1994/article/details/79864087