基于有限约束的多线程并发BFS算法

作者: M23 | 来源:发表于2016-04-30 14:05 被阅读532次

BFS(广度优先搜索)是图论中的一个基础搜索算法,对于下图,BFS将按照节点的数字大小逐一遍历。

BFS-search.png

单线程中的实现

借用队列的先进先出特性实现,伪代码实现如下,代码清晰且易于理解。

 1 Breadth-First-Search(Graph, root):
 2 
 3     for each node n in Graph:            
 4         n.distance = INFINITY        
 5         n.parent = NIL
 6 
 7     create empty queue Q      
 8 
 9     root.distance = 0
10     Q.enqueue(root)                      
11 
12     while Q is not empty:        
13     
14         current = Q.dequeue()
15     
16         for each node n that is adjacent to current:
17             if n.distance == INFINITY:
18                 n.distance = current.distance + 1
19                 n.parent = current
20                 Q.enqueue(n)
21

多线程中的实现

很多搜索场景下,比如网络爬虫算法,单线程的BFS无法充分利用现有的多核多线程的优势。
算法导论第三版新增了一章多线程算法的内容,主要描述了在多线程环境下如何实现某种算法。主要思想可以通过递归计算斐波那契数列(Fibonacci)来描述,众所周知的fib递归算法如下:

def fib(n):
  if n < 2:
    return n
  a = fib(n - 1)
  b = fib(n - 2)
  return a + b

通过增加sync和spawn原语,可以将其转为多线程算法。spawn的含义代表在新线程中运行,sync的含义代表所有线程的汇聚,即所有线程都运行至此才继续执行下一行代码。

将上述fib转为多线程算法是这样的:

def fib(n):
  if n < 2:
    return n
  a = spawn fib(n - 1)
  b = fib(n - 2)
  sync
  return a + b

基于这个理论,需要首先找到哪些可并发,哪些需要汇聚,考虑下面这种情况:

bfs-multi-access.png

B和C,D、E和E是分别满足并发条件的,但B和C并发后的节点E同时属于二者的后继,如果多个线程同时访问时,会将E两次推送至造成错误,并有可能使得多个线程同时取得对E的访问。
在既保证遍历是按照广度优先,而又不至于发生多次访问的错误的情况下,可以采用一种基于有限约束的并发算法,即只允许当前节点的所有子节点并发。这种约束反馈在代码实现上:

 1 Breadth-First-Search(Graph, root):
 2-11         ... ...
12     while Q is not empty:        
13     
14         current = Q.dequeue()
15     
16         for spaw each node n that is adjacent to current:
17             if n.distance == INFINITY:
18                 n.distance = current.distance + 1
19                 n.parent = current
20                 Q.enqueue(n)
21         sync

注意上面for循环中的spaw,这里指的是对每个节点的访问都将在单独的线程中进行,仅当所有节点完成遍历后进行sync。这样,所有的并发既不会乱序,也不会带来访问冲突。

Actor模型实现spawn和sync

什么是Actor

七周七并发一书中详细介绍了Actor模型。首先它是一个通用并发编程模型,也被Erlang借鉴。其次,它封装了状态并通过消息与其他actor通信,可以适应共享内存架构和分布式内存架构,而且有很好的容错性。简单来讲,Actor就是一个可以接收和发送消息的异步执行体。

使用python来模拟Actor

鉴于Actor是一个可收发消息执行体,我们需要2个python的概念与其对应:threadqueue。 thread是一个执行体,而Queue恰好是一个消息存储体。

class Actor(Thread):
    def __init__(s):
        s.queue = Queue()
        Thread.__init__(s)
    def send(s, obj):
        s.queue.put(obj)
    def recv(s):
        return s.queue.get()
    def work_done(s):
        s.queue.task_done()
    def sync(s):
        s.queue.join()
    def has_more_work(s):
        return not s.queue.empty()
    def work():
        pass
    def run(s):
        s.work()

Python内置类Queue的join方法说明:

Queue.join()

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

配合task_done(),恰好可以模拟前文提及的sync方法,比如这样同步所有工作线程:

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()
q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

多线程BFS实现

有了可以sync的Actor,那么多线程的BFS就可以如下方式实现了。

首先将遍历节点的动作封装在Actor中,比如称之为Vistor,它是一个Actor:

class Visitor(Actor):
    def __init__(s, monitor, vid = 0):
        s.monitor = monitor
        s.sid = sid
        Actor.__init__(s)
    def work(s):
        while True:
            v = s.recv()
            nexts = visit(v)
            s.send(nexts)
            s.work_done()

对于发起BFS的线程,我们称之为monitor线程,当然,它也是一个Actor:

class Monitor(Actor):
    def __init__(s, n_visitors, start_vertex = [], depth = 8):
        Actor.__init__(s)
        s.n_visitors = n_visitors
        s.depth = depth
        s.choice = 0
        s.visit_history = set()
        for v in start_vertex:
            s.send(v)
        s.visitors = [Visitor(s, x) for x in range(n_visitors)]
        for sp in s.n_visitors:
            sp.start()
    def wait_all(s):
        for sp in s.n_visitors:
            sp.sync()
    def dispatch(s,v):
        if v not in s.visit_history:
            s.n_visitors[s.choice % s.n_visitors].send(v)
            s.choice += 1
            s.visit_history.add(v)
    
    def work(s):
        while s.has_more_work():
            for v in chain.from_iterable(s.recv_all()):
                if has_visit_depth == s.depth:
                    break
                s.dispatch(v)
            s.wait_all()
        s.done_work()

至此,我们就实现了这种基于有限约束的BFS了。

相关文章

  • 基于有限约束的多线程并发BFS算法

    BFS(广度优先搜索)是图论中的一个基础搜索算法,对于下图,BFS将按照节点的数字大小逐一遍历。 单线程中的实现 ...

  • BFS

    [TOC] BFS 和 DFS BFS广度有限搜索和DFS深度优先搜索算法是特别常用的两种算法 DFS 算法就是回...

  • A*算法 和 最佳优先搜索算法(Best-First-Searc

    BFS算法 算法原理 最佳优先搜索算法是一种启发式搜索算法(Heuristic Algorithm),其基于广度优...

  • 走迷宫(BFS例题)

    BFS例题---走迷宫 BFS对应数据结构queue BFS算法时间复杂度O(n^2) BFS算法不走走过的点 的...

  • Optimistic Concurrency Control f

    1. Abstract 机器学习算法的两个极端:严格的并发约束;没有约束 提出一种中间的方法:算法假设冲突很少发生...

  • G - 7 UVA - 11624

    所用算法:BFS

  • H - 8 POJ - 3984

    所用算法:BFS

  • ORID27

    [127] Word Ladder解题报告 BFS算法 用什么算法?这道题需要用 BFS为什么用这个算法(那些条件...

  • LeetCode 第207题:课程表

    1、前言 2、思路 使用拓扑排序的方法,拓扑排序其实是使用的 BFS 算法,简而言之使用 BFS 算法解题。算法流...

  • CountDownLatch简笔

    前言 JDK的并发包java.util.concurrent中提供了并发编程&多线程编程非常有用的工具类。本文基于...

网友评论

    本文标题:基于有限约束的多线程并发BFS算法

    本文链接:https://www.haomeiwen.com/subject/atekrttx.html