Graphx的最短路径算法只针对非带权图(即边的权重相等),采用迪杰斯特拉算法。
4.1 简介
最短路径算法用来计算任意两个节点之间的最短距离,给定一组节点集合,求图中所有节点与集合中节点的最短路径。
4.2 算法场景
(一)交通路线查询
貌似最短路径算法是图计算工具普遍会提供的算法,但好像直接使用它的业务场景相对较少,了解有限,还请有熟悉最短路径算法应用的小伙伴帮忙普及一下。
4.3 算法流程
假设需要进行求最短距离的顶点集合中的顶点为t
核心思想:将自己与顶点t的最短路径告诉所有邻居节点,邻居节点来决定是否接收该消息并确定更新自身与顶点t的最短路径。
计算步骤:
1. 初始化,给每个节点一个空的Map用来存储与顶点t的最短距离,而顶点t与自身的距离设置为0;
2. 每个节点将自己与顶点t的距离+1之后发送给所有的邻居节点;
3. 每个节点 i 将会收到来自邻居认为的 i 与顶点 t 的最短距离,对收到有同样key值的Map信息进行汇总,根据Map中标识最短距离的key值大小取最小值作为自己要接收的消息;
4. 节点将收到的消息进行处理,同样是比较收到消息中顶点t的最短距离与自己当前到顶点t的最短距离,取最小者作为自己到顶点t的新的距离。
不断迭代上述2,3,4步。
4.4 源码分析
object ShortestPaths extends Serializable {
/** Stores a map from the vertex id of a landmark to the distance to that landmark. */
type SPMap = Map[VertexId, Int]
// _*表示变长参数,表示前面的x参数是一个参数序列,这里用来存储最终结果:每个节点与给定landmark集中节点的最短路径
private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
// 向前走一步
private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
// 每个节点i对于收到的多个邻居节点认为的i与指定landmark集中节点的路径长度信息,取最短的路径。
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
}
/**
* 计算图中所有节点与给定节点集合中每个顶点的最短路径
* landmarks是进行求最短路径顶点id的集合,会计算landmarks集合中每个元素
* 返回结果是一个图,每个顶点的属性是到landmarks点之间的最短距离
*/
def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
// 将图中landmarks集合中的节点标记为自身到自身的最短距离为0,其余顶点属性初始化为空的Map()
val spGraph = graph.mapVertices { (vid, attr) =>
if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
}
// 初始化时给每个节点传入一个空的Map属性
val initialMessage = makeMap()
def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
addMaps(attr, msg)
}
// 向邻居发送消息时,会把自身与指定节点的距离+1之后再发送
def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
val newAttr = incrementMap(edge.dstAttr)
// 如果src节点与指定节点距离和dst+1之后的距离相等,则不发送消息,否则向src节点发送新距离信息
if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
else Iterator.empty
}
Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
}
}
网友评论