美文网首页
hugegraph 整合 spark+graphX

hugegraph 整合 spark+graphX

作者: NazgulSun | 来源:发表于2019-08-01 19:49 被阅读0次

当数据量太大的时候,对hugegraph 进行一些统计查询或者算法遍历的时候,经常会超时,或者时间很久。
这个时候需要借助与大数据相关的技术。hugegraph 之前提供了一个hugegraph-spark的组件,
但是变态的是,目前该组件已经商业化,还好其团队在git上大概介绍了实现的思路。

  • 使用hugegraph 的切片查询功能。 因为底层使用cassandra,可以先查出整个集群有多少个切片。
  • 然后使用并行框架并发的去遍历每个切片,这样就可以快速的把整个集群的节点和边获取出来。
  • 把所有的节点和边导入到HDFS中。
  • 编写代码把导入的文件转化为graphx 支持的RDD格式,并生成graph对象,然后就可以调用graph对象的方法。
  • 把整个程序打包成jar包提交给spark平台,就可以分布式的调用graphx的相关方法,比如统计节点。

我们构建了一个3000万个节点,4000万边的数据集。在10个节点的spark集群上,大概需要1分钟。

大概的实现:

  • 使用java+scala的混合方式。
  • java线程池读遍历hugegraph分片,并导入数据到hdfs
edgeShards.forEach(shard->
            {
                try
                {
                    producerSemaphore.acquire();
                } catch (InterruptedException e)
                {
                    throw new RuntimeException("can't acquire consumer semaphore");
                }

                HugeEdgeHandler edgeHandler = new HugeEdgeHandler(shard,graphQueryDao,graphSerializer,
                        hadoopProperties,hdfsDir);
                CompletableFuture.runAsync(
                        edgeHandler,
                        producerExecutor
                ).whenComplete((r,e)->{ producerSemaphore.release();});
            }
        );
        
    @Override
    public void run()
    {
        //TODO add Retry
        List<Edge> edges = graphQueryDao.getEdgesByShard(this.currentShard);
        log.info("add {} edges to queue using shard {}", edges.size(), this.currentShard);
        Collection<String> lines = edges.stream().map(e-> graphxLine(graphSerializer.writeEdge(e))).collect(Collectors.toList());
        HadoopFileUtils.writeFromLocalToHdfs(hadoopProperties, lines.iterator(), getHdfsFileName());
        log.info("write {} lines into  hdfs file {}",lines.size(), getHdfsFileName());
    }
  • 构建GraphX的RDD对象,生成Graph并调用算法,这部分由scala实现。
object GraphXTraversal {
  def run(context : SparkContext, vertexFileName: String, edgeFileName: String, traversalFileName: String): Unit = {
    val vertexRdd : RDD[(VertexId, String)] = context.textFile(vertexFileName)
      .map(line => {
        val parts = line.split("\t")
        (parts(1).toLong, parts(0))
      })
    val edgeRdd : RDD[Edge[String]] = context.textFile(edgeFileName)
      .map(line => {
        val parts = line.split("\t")
        new Edge(parts(1).toLong, parts(2).toLong, parts(0))
      })

    val graph : Graph[String, String]  =  Graph(vertexRdd, edgeRdd)

    val vConunt = graph.vertices.count();
    val eCount = graph.edges.count();
    val output=context.makeRDD(List(vConunt,eCount));
    output.saveAsTextFile(traversalFileName);
  }


}
  • java+scala混合打包,pom.xml 的配置是关键,需要加入scala插件,并且显示指定main方法,否则 spark-submit的时候会报 class not found 的问题。
<plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <skipAssembly>false</skipAssembly>
            </configuration>
            <executions>
                <execution>
                    <id>package</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

  • spark 集群自己调用hugegraph并生成rdd
    使用如下的方式 让spark 的集群并发获取shard的信息直接构建,RDD,这样是最快的。
 JavaRDD<Edge<String>> edgeRDD = rddShards.flatMap((shard)->
        {
            HugeClient  Xcv =new HugeClient("http://10.20.205.167:8080", "hugegraph",1200);
            List<com.baidu.hugegraph.structure.graph.Edge> baiduEdges =  Xcv.traverser().edges(shard);
            List<Edge<String>> sparkEdge = Lists.newArrayList();
            baiduEdges.stream().forEach(e->
            {
                sparkEdge.add(new Edge<String>(Long.parseLong(String.valueOf(e.source()))
                        ,Long.parseLong(String.valueOf(e.target())),e.label()));
            });
            return sparkEdge.iterator();
        }
        );

但是打包运行的时候遇到了各种问题

  • 其中一个是 Hugeclient checkVersion失败。

原因 hugegraph的组件启动的时候会去 自己的包里面 manifest.mf 里面找一个 implemnt-version的变量。
看和程序的版本是否一致。 hugegraph-client, hugegraph-common 的包里都有这个文件。

但是当使用 maven assembly 和maven shade的时候, 依赖的包都会被解压, manifest 都没有copy过来,或者说被覆盖了。

造成 hugegraph启动的时候一直就是 version 无法match。

  • 解决的办法, 不能unpack 依赖的jar 包,否则manifest信息会丢失。
    在不unpack 的情况下,只能将jar 拷贝到目录,然后指定 classpath。
 <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                         <addClasspath>true</addClasspath>
                         <classpathPrefix>lib/</classpathPrefix>
                         <mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
                     </manifest>
                   </archive>
                </configuration>
            </plugin>
        <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-dependency-plugin</artifactId>
             <version>2.10</version>
             <executions>
                 <execution>
                     <id>copy-dependencies</id>
                     <phase>package</phase>
                     <goals>
                         <goal>copy-dependencies</goal>
                     </goals>
                     <configuration>
                         <outputDirectory>${project.build.directory}/lib</outputDirectory>
                     </configuration>
                 </execution>
             </executions>
         </plugin>

在spak-submit的时候,指定 --jar a.jar,b.jar myapplication.jar

--jar 参数一定要在 myapplication.jar 之前。 为啥还有这种限制,真的是无语了。坑巨多。

相关文章

网友评论

      本文标题:hugegraph 整合 spark+graphX

      本文链接:https://www.haomeiwen.com/subject/hczcdctx.html