……续上回Fibonacci数列高效解法大全及时间复杂度分析 连载【8】
在家用电脑、手机都是一堆核心,数框框的今日
自然会想到的一项提速手段是——并行运算
就以 Fibonacci数列高效解法大全及时间复杂度分析 连载【7】中第14节的“生成数列的GMP内置fib2函数解法”来说
生成从0至n的斐波那契数列是一个数算完放入结果列表,再算下一个数
这完全可以用我们的CPU多核同时算几个数,四核的话同时算四个数多好多快,完美
这就是分布在多核的多进程并行算法
四核下会是单核的4倍速吗?也就是加速比能到达4吗?
以四核为例,当实现加速比为4,那么并行算法效率就为100%
呃,那是理想状态,实际远远到不了
到不了的原因在于变成多核多进程,一个大任务要在主进程分解成几个子任务(子进程)分配到各核运算,子任务算完后再汇总回主进程,这就有了附加开销。包括任务分解、任务调度、传参、子进程上下文切换、进程间传递数据、进程间同步锁、结果合并等,这些开销很大。让效率下降不少
就生成斐波那契数列而言,实现其并行算法,中间开销最大的应该是进程间传递数据,子进程要返回主进程超大规模的数据
那么对于单机这情况,都知道通常是共享内存方式最快
可以套用时髦的概念“内存计算”
那么下面我来实现一下,一个没有多少优化的原型共享内存并行算法
17. 并行共享内存解法
import time
import multiprocessing as mp
import gmpy2
import ctypes
def fib2_processes(shared_variables_for_fib_seq_results: 'array ctypes.c_ubyte', element_length_record: 'array ctypes.c_ulonglong', write_protection_pointer: 'ctypes.c_ulonglong', unread_quantity: 'ctypes.c_ulonglong', overflow_of_n: 'ctypes.c_longlong', mutexlock: 'Lock', n_iterable: 'iterable') -> None:
start_time = time.process_time_ns()
Fib_seq_array_size = len(shared_variables_for_fib_seq_results)
write_pointer = 0
for n in n_iterable:
if (Fib_seq_array_size - write_pointer) < (n // 4): #预估第n和n-1项两fib数总共可能占用的字节。公式是 n * 2 / 8 -> n / 4
write_pointer = 0
overflow_of_n.value = n
for element in gmpy2.fib2(n):
element_bytes = gmpy2.to_binary(element)[2:]
element_length = len(element_bytes)
next_write_pointer = write_pointer + element_length
while (overflow_of_n.value > 0) and (next_write_pointer > write_protection_pointer.value):
pass #在溢出状态未消除情况下,如果要写入的范围高于写保护,就空转等待。
element_length_record[n] = element_length
with mutexlock:
shared_variables_for_fib_seq_results[write_pointer: next_write_pointer] = element_bytes
unread_quantity.value += 1
write_pointer = next_write_pointer
n -= 1
end_time = time.process_time_ns()
print ('计算和序列化用时 %.1f seconds.' % ((end_time - start_time)/10**9))
return None
def Fibonacci_sequence_21 (n: int) -> list: #参数n是表示求n项Fibonacci数列
'多进程共享内存的返回列表的GMP内置fib函数解法'
assert isinstance(n, int), 'n is an error of non-integer type.'
if n>=0:
start_time = time.process_time_ns()
Number_of_processes = max(mp.cpu_count() - 1, 1)
size_of_shared_variables_for_fib_processes = 1 * (1024 ** 2) // Number_of_processes #就是占用的内存数量,单位Byte。注意太大的n也要随之加大这个尺寸
list_of_shared_variables_for_fib_seq_results = []
list_of_element_length_record = []
list_of_write_protection_pointer = []
list_of_unread_quantity = []
list_of_overflow_n = []
list_of_mutexlock = []
for i in range(Number_of_processes):
list_of_shared_variables_for_fib_seq_results.append(mp.RawArray(ctypes.c_ubyte, size_of_shared_variables_for_fib_processes))
list_of_element_length_record.append(mp.RawArray(ctypes.c_ulonglong, n + 1))
list_of_write_protection_pointer.append(mp.RawValue(ctypes.c_ulonglong, size_of_shared_variables_for_fib_processes - 1))
list_of_unread_quantity.append(mp.RawValue(ctypes.c_ulonglong, 0))
list_of_overflow_n.append(mp.RawValue(ctypes.c_longlong, -1))
list_of_mutexlock.append(mp.Lock())
list_of_n_iterable_for_fib2_processes = [range(n - i * 2, 0, -(Number_of_processes * 2)) for i in range(Number_of_processes)]
fib2_process_list = [None] * Number_of_processes
for i in range(Number_of_processes):
fib2_process_list[i] = mp.Process(target = fib2_processes, args = (list_of_shared_variables_for_fib_seq_results[i], list_of_element_length_record[i], list_of_write_protection_pointer[i], list_of_unread_quantity[i], list_of_overflow_n[i], list_of_mutexlock[i], list_of_n_iterable_for_fib2_processes[i]))
fib2_process_list[i].start()
fib_list = [None] * (n + 1)
n_list_for_fib2_processes = []
for n_iterable in list_of_n_iterable_for_fib2_processes:
n_list_for_fib2_processes.append(list(n_iterable))
list_of_pointers_to_n_list = [0] * Number_of_processes
list_of_number_of_reciprocating = [0] * Number_of_processes
list_of_read_pointer = [0] * Number_of_processes
while True:
if mp.active_children() == []: #检测 当全部子进程都不是活的以后,进行下一个检测准备结束循环
for unread_quantity in list_of_unread_quantity:
if unread_quantity.value != 0: break
else: #检测当所有进程结果的未读数都等于零就结束循环
break
for i in range(Number_of_processes):
if list_of_unread_quantity[i].value != 0:
n_for_fib2_processes = n_list_for_fib2_processes[i][list_of_pointers_to_n_list[i]] - list_of_number_of_reciprocating[i]
if n_for_fib2_processes != list_of_overflow_n[i].value:
next_read_pointer = list_of_read_pointer[i] + list_of_element_length_record[i][n_for_fib2_processes]
fib_list[n_for_fib2_processes] = int.from_bytes(bytes(list_of_shared_variables_for_fib_seq_results[i][list_of_read_pointer[i]: next_read_pointer]), 'little')
with list_of_mutexlock[i]:
list_of_write_protection_pointer[i].value = next_read_pointer
list_of_unread_quantity[i].value -= 1
list_of_read_pointer[i] = next_read_pointer
list_of_pointers_to_n_list[i] += list_of_number_of_reciprocating[i]
list_of_number_of_reciprocating[i] ^= 1
else:
list_of_read_pointer[i] = 0
with list_of_mutexlock[i]:
list_of_write_protection_pointer[i].value = 0
list_of_overflow_n[i].value = -1
if n & 1 == 0:
fib_list[0] = 0
end_time = time.process_time_ns()
print ('主进程用时 %.1f seconds.' % ((end_time - start_time)/10**9))
return fib_list
else:
return None
if __name__ == '__main__':
start_time = time.perf_counter()
Fibonacci_sequence_21(1000000)
end_time = time.perf_counter()
print ('最终用时 %.1f seconds.' % (end_time - start_time))
在我的四核16GB物理内存电脑上算100万斐波那契数列(数据总占用内存40GB,所以包括虚拟内存)用时为:
1384秒
在之前14节单进程解法算100万斐波那契数列用时为:
1857秒
缩短用时25.5%
然而这是4核并行,并行效率为33.5%。并不高效的并行算法
各位大佬,以14节那个单进程解法为基础,来尝试写出你们的高效并行解法,在下方留言吧
结尾,嘿,欢迎点赞和赞赏支持
未完待续……
网友评论