JanusGraph Bulk Loading with Spark in yarn-client Mode

Author: westfire | Published: 2017-07-29 18:09

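    JanusGraph is a distributed graph database forked from Titan. By default its bulk loader runs on Spark in local mode, and yarn-cluster mode is not supported. yarn-client mode is supported, but the official documentation does not explain how to configure it, and there are many pitfalls along the way. This article describes how to configure bulk loading in yarn-client mode.
    It first covers the basic configuration, then the bulk-loading configuration, and finally bulk-loading performance tuning.

    Software versions used in this article: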
    janusgraph: 0.1.1
    hbase: 1.1.2
    hadoop: 2.7.1

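    Basic configuration

    1. First, download JanusGraph from the official site and unpack it into the local directory /data/janusgraph/.
    2. Next, configure the storage and index backends of the graph database. Since we use Elasticsearch and HBase, edit /data/janusgraph/conf/janusgraph-hbase-es.properties directly:
    # Important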
    gremlin.graph=org.janusgraph.core.JanusGraphFactory
    # HBase configuration
    storage.batch-loading=true
    storage.backend=hbase
    storage.hostname=c1-nn1.bdp.idc,c1-nn2.bdp.idc,c1-nn3.bdp.idc
    storage.hbase.ext.hbase.zookeeper.property.clientPort=2181
    storage.hbase.table = yisou:test_graph
    # Elasticsearch configuration
    index.search.backend=elasticsearch
    # Elasticsearch is installed only on the local machine; this is the local host's IP.
    index.search.hostname=10.120.64.69
    index.search.elasticsearch.client-only=true
    index.search.index-name=yisou_test_graph
    # Default cache configuration
    cache.db-cache = true
    cache.db-cache-clean-wait = 20
    cache.db-cache-time = 180000
    cache.db-cache-size = 0.5
    

    3. Adjust the jars under /data/janusgraph/lib. Running the bulk load in yarn-client mode ran into conflicts with guava and other jars, so I adjusted the jars under lib according to each conflict. Three jars were changed:

    1. hbase-client-1.2.4.jar ==> yisou-hbase-1.0-SNAPSHOT.jar
      The guava used by the bundled hbase-client-1.2.4.jar conflicts with the guava version on our YARN cluster, so we used yisou-hbase-1.0-SNAPSHOT.jar, an in-house build of hbase-client with guava removed.
      Without this replacement, the error is "Caused by: java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.hbase.zookeeper.MetaTableLocator"
    2. spark-assembly-1.6.1-hadoop2.6.0.jar ==> spark-assembly-1.6.2-hadoop2.6.0.jar
      The bundled spark-assembly-1.6.1-hadoop2.6.0.jar also causes a guava conflict, so I replaced it with spark-assembly-1.6.2-hadoop2.6.0.jar.
      Without this replacement, the error is "java.lang.NoSuchMethodError: groovy.lang.MetaClassImpl.hasCustomStaticInvokeMethod()Z"
    3. Delete hbase-protocol-1.2.4.jar.
      Without this deletion, the error is "com.google.protobuf.ServiceException: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.protobuf.generated.RPCProtos$ConnectionHeader$Builder.setVersionInfo(Lorg/apache/hadoop/hbase/protobuf/generated/RPCProtos$VersionInfo;)Lorg/apache/hadoop/hbase/protobuf/generated/RPCProtos$ConnectionHeader$Builder;"
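
    The jar changes above can be scripted so that they are repeatable and reversible. The following is a minimal sketch, not the author's procedure: the helper names retire_jar and swap_jar and the backup directory are hypothetical, and the replacement jars are assumed to be available locally already. Removed jars are moved to a backup directory outside lib rather than deleted, so a wrong guess about a conflict can be undone.

```shell
#!/bin/sh
# Sketch: move conflicting jars out of JanusGraph's lib/ and install replacements.
# Function names and the backup location are illustrative assumptions.

# retire_jar LIB_DIR BACKUP_DIR JAR_NAME
# Moves LIB_DIR/JAR_NAME into BACKUP_DIR if it exists; safe to run twice.
retire_jar() {
    mkdir -p "$2"
    if [ -f "$1/$3" ]; then
        mv "$1/$3" "$2/$3"
        echo "retired $3"
    else
        echo "skip $3 (not present)"
    fi
}

# swap_jar LIB_DIR BACKUP_DIR OLD_JAR_NAME NEW_JAR_PATH
# Retires the old jar, then copies the replacement into LIB_DIR.
# Fails without touching anything if the replacement is missing.
swap_jar() {
    if [ ! -f "$4" ]; then
        echo "missing replacement: $4" >&2
        return 1
    fi
    retire_jar "$1" "$2" "$3"
    cp "$4" "$1/"
    echo "installed $(basename "$4")"
}

# Example calls mirroring the three changes in the text (paths are assumptions):
# LIB=/data/janusgraph/lib; BAK=/data/janusgraph/lib-backup
# swap_jar   "$LIB" "$BAK" hbase-client-1.2.4.jar /path/to/yisou-hbase-1.0-SNAPSHOT.jar
# swap_jar   "$LIB" "$BAK" spark-assembly-1.6.1-hadoop2.6.0.jar /path/to/spark-assembly-1.6.2-hadoop2.6.0.jar
# retire_jar "$LIB" "$BAK" hbase-protocol-1.2.4.jar
```

    Keeping the backup directory outside lib matters because the log further down shows HADOOP_GREMLIN_LIBS pointing at lib, so anything left there may still be shipped to the executors.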

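    4. Define the graph schema (edges and vertex properties). See the official documentation for details; this article does not cover it.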

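    Bulk-loading configuration

    Because the load job runs on YARN, the Hadoop environment has to be configured. Two files need changes: JanusGraph's launch script /data/janusgraph/lib/gremlin.sh, and the Hadoop and Spark configuration in /data/janusgraph/conf/hadoop-graph/hadoop-script.properties.

    1. Copy /data/janusgraph/lib/gremlin.sh to a new file, say yarn-gremlin.sh, then add the Hadoop configuration to JAVA_OPTIONS and CLASSPATH. This ensures the program can read the Hadoop configuration, so the Spark job starts on YARN correctly.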

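    #!/bin/bash
    # HADOOP_HOME must be set before HADOOP_CONF_DIR, which is derived from it.
    export HADOOP_HOME=/usr/local/hadoop-2.7.1
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    # Let the JVM find Hadoop's native libraries.
    export JAVA_OPTIONS="$JAVA_OPTIONS -Djava.library.path=$HADOOP_HOME/lib/native"
    # Put the Hadoop configuration directory on the classpath.
    export CLASSPATH=$HADOOP_CONF_DIR
    # JANUSGRAPH_HOME is the directory where JanusGraph is installed, /data/janusgraph/.
    cd "$JANUSGRAPH_HOME" || exit 1
    ./bin/gremlin.sh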
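
    Since the whole point of this script is to make the Hadoop configuration visible to the console, it can help to verify the configuration directory before launching. The sketch below is an optional pre-flight check under stated assumptions: the function name check_hadoop_conf is hypothetical, and the three file names are the standard Hadoop client configuration files; which of them a given cluster actually needs may differ.

```shell
#!/bin/sh
# Sketch: fail early if the Hadoop client configuration directory looks incomplete.
# check_hadoop_conf is an illustrative helper, not part of JanusGraph or Hadoop.

# check_hadoop_conf CONF_DIR
# Returns 0 if all expected files exist, 1 otherwise; lists missing files on stderr.
check_hadoop_conf() {
    missing=0
    for f in core-site.xml hdfs-site.xml yarn-site.xml; do
        if [ ! -f "$1/$f" ]; then
            echo "missing: $1/$f" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Example use inside a launch script (assumes HADOOP_CONF_DIR is already exported):
# check_hadoop_conf "$HADOOP_CONF_DIR" || exit 1
```

    An empty or misspelled configuration directory does not stop the console from starting, so a check like this surfaces the problem before a job is submitted.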
    

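    2. Edit /data/janusgraph/conf/hadoop-graph/hadoop-script.properties.
    The main changes are: set the inputFormat to match the format of the file being imported, specify the HDFS path of the input file and the path of the parse function, and set the Spark master to yarn-client.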

    #
    # Hadoop Graph Configuration
    #
    gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
    gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
    gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
    gremlin.hadoop.jarsInDistributedCache=true
    
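    # HDFS path of the file to import. It can also be set after this configuration file is loaded.
    gremlin.hadoop.inputLocation=/user/yisou/taotian1/janus/data/fewData.test.dup
    # Path of the parse function that parses the HDFS file. It can also be set after this configuration file is loaded.
    gremlin.hadoop.scriptInputFormat.script=/user/yisou/taotian1/janus/data/conf/vertex_parse.groovy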
    #gremlin.hadoop.outputLocation=output
    
    #
    # SparkGraphComputer with Yarn Configuration
    #
    spark.master=yarn-client
    spark.executor.memory=6g
    spark.executor.instances=10
    spark.executor.cores=2
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    # spark.kryo.registrationRequired=true
    # spark.storage.memoryFraction=0.2
    # spark.eventLog.enabled=true
    # spark.eventLog.dir=/tmp/spark-event-logs
    # spark.ui.killEnabled=true
    
    #cache config
    gremlin.spark.persistContext=true
    gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
    #gremlin.spark.persistStorageLevel=DISK_ONLY
    
    
    #####################################
    # GiraphGraphComputer Configuration #
    #####################################
    giraph.minWorkers=2
    giraph.maxWorkers=3
    giraph.useOutOfCoreGraph=true
    giraph.useOutOfCoreMessages=true
    mapred.map.child.java.opts=-Xmx1024m
    mapred.reduce.child.java.opts=-Xmx1024m
    giraph.numInputThreads=4
    giraph.numComputeThreads=4
    # giraph.maxPartitionsInMemory=1
    # giraph.userPartitionCount=2
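
    The properties above point gremlin.hadoop.inputLocation at an HDFS path, so the data file has to be on HDFS before the job runs. A minimal sketch of staging files with the standard hdfs dfs commands follows, assuming the parse script is staged the same way. The helper names run and stage_to_hdfs and the DRY_RUN switch are hypothetical; by default the sketch only prints the commands, so they can be reviewed before anything is written to the cluster.

```shell
#!/bin/sh
# Sketch: copy a local file into an HDFS directory, creating the directory if needed.
# run, stage_to_hdfs and DRY_RUN are illustrative names, not part of Hadoop.

# run CMD...
# Prints the command when DRY_RUN is 1 (the default here), executes it otherwise.
run() {
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$@"
    else
        "$@"
    fi
}

# stage_to_hdfs LOCAL_FILE HDFS_DIR
stage_to_hdfs() {
    run hdfs dfs -mkdir -p "$2" &&
    run hdfs dfs -put -f "$1" "$2/"
}

# Example calls (the local file names are assumptions; set DRY_RUN=0 to execute):
# stage_to_hdfs ./fewData.test.dup   /user/yisou/taotian1/janus/data
# stage_to_hdfs ./vertex_parse.groovy /user/yisou/taotian1/janus/data/conf
```

    The -f flag overwrites an existing file of the same name, which is convenient while iterating on the parse script but worth dropping once the input data is final.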
    

    Running the bulk load

    Launch command:

    sh /data/janusgraph/lib/yarn-gremlin.sh
    

    Bulk-load commands:

    local_root="/data/janusgraph"
    hdfs_root="/user/yisou/taotian1/janus"
    social_graph="${local_root}/conf/janusgraph-hbase-es.properties"
    graph = GraphFactory.open("${local_root}/conf/hadoop-graph/hadoop-script.properties")
    graph.configuration().setProperty("gremlin.hadoop.inputLocation","/user/yisou/taotian1/janus/data/fewData.test.dup")
    graph.configuration().setProperty("gremlin.hadoop.scriptInputFormat.script", "${hdfs_root}/conf/vertex_parse.groovy")
    blvp = BulkLoaderVertexProgram.build().writeGraph(social_graph).create(graph)
    graph.compute(SparkGraphComputer).program(blvp).submit().get()
    

    Output:

    sh /data/janusgraph/lib/yarn-gremlin.sh
    \,,,/
    (o o)
    -----oOOo-(3)-oOOo-----
    plugin activated: janusgraph.imports
    plugin activated: tinkerpop.server
    plugin activated: tinkerpop.utilities
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/data2/janusgraph-0.1.1-hadoop2/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/data2/janusgraph-0.1.1-hadoop2/lib/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/data2/janusgraph-0.1.1-hadoop2/lib/spark-assembly-1.6.2-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/data2/janusgraph-0.1.1-hadoop2/lib/yisou-hbase-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    21:22:00,392  INFO HadoopGraph:87 - HADOOP_GREMLIN_LIBS is set to: /data2/janusgraph-0.1.1-hadoop2/lib
    plugin activated: tinkerpop.hadoop
    plugin activated: tinkerpop.spark
    plugin activated: tinkerpop.tinkergraph
    gremlin>
    gremlin> local_root="/data2/janusgraph-0.1.1-hadoop2/social"
    ==>/data2/janusgraph-0.1.1-hadoop2/social
    gremlin> hdfs_root="/user/yisou/taotian1/janus"
    ==>/user/yisou/taotian1/janus
    gremlin> social_graph="${local_root}/conf/janusgraph-hbase-es-social.properties"
    ==>/data2/janusgraph-0.1.1-hadoop2/social/conf/janusgraph-hbase-es-social.properties
    gremlin> graph = GraphFactory.open("${local_root}/conf/hadoop-yarn.properties")
    ==>hadoopgraph[scriptinputformat->graphsonoutputformat]
    gremlin> graph.configuration().setProperty("gremlin.hadoop.inputLocation","/user/yisou/taotian1/janus/tmp1person/")
    ==>null
    gremlin> graph.configuration().setProperty("gremlin.hadoop.scriptInputFormat.script", "${hdfs_root}/person_parse.groovy")
    ==>null
    gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph(social_graph).create(graph)
    ==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader, vertexIdProperty=bulkLoader.vertex.id, userSuppliedIds=false, keepOriginalIds=true, batchSize=0]
    gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()
    21:25:04,666  INFO deprecation:1173 - mapred.reduce.child.java.opts is deprecated. Instead, use mapreduce.reduce.java.opts
    21:25:04,667  INFO deprecation:1173 - mapred.map.child.java.opts is deprecated. Instead, use mapreduce.map.java.opts
    21:25:04,680  INFO KryoShimServiceLoader:117 - Set KryoShimService provider to org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService@4cb2918c (class org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService) because its priority value (0) is the highest available
    21:25:04,680  INFO KryoShimServiceLoader:123 - Configuring KryoShimService provider org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService@4cb2918c with user-provided configuration
    21:25:10,479  WARN SparkConf:70 - The configuration key 'spark.yarn.user.classpath.first' has been deprecated as of Spark 1.3 and may be removed in the future. Please use spark.{driver,executor}.userClassPathFirst instead.
    21:25:10,505  INFO SparkContext:58 - Running Spark version 1.6.2
    21:25:10,524  WARN SparkConf:70 - The configuration key 'spark.yarn.user.classpath.first' has been deprecated as of Spark 1.3 and may be removed in the future. Please use spark.{driver,executor}.userClassPathFirst instead.
    21:25:10,564  INFO SecurityManager:58 - Changing view acls to: yisou
    21:25:10,565  INFO SecurityManager:58 - Changing modify acls to: yisou
    21:25:10,566  INFO SecurityManager:58 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yisou); users with modify permissions: Set(yisou)
    21:25:10,833  WARN SparkConf:70 - The configuration key 'spark.yarn.user.classpath.first' has been deprecated as of Spark 1.3 and may be removed in the future. Please use spark.{driver,executor}.userClassPathFirst instead.
    21:25:10,835  WARN SparkConf:70 - The configuration key 'spark.yarn.user.classpath.first' has been deprecated as of Spark 1.3 and may be removed in the future. Please use spark.{driver,executor}.userClassPathFirst instead.
    21:25:11,035  INFO Utils:58 - Successfully started service 'sparkDriver' on port 36502.
    21:25:11,576  INFO Slf4jLogger:80 - Slf4jLogger started
    21:25:11,646  INFO Remoting:74 - Starting remoting
    ............
    21:25:20,736  INFO Client:58 - Submitting application 2727164 to ResourceManager
    21:25:20,771  INFO YarnClientImpl:273 - Submitted application application_1466564207556_2727164
    21:25:21,780  INFO Client:58 - Application report for application_1466564207556_2727164 (state: ACCEPTED)
    21:25:21,785  INFO Client:58 -
    client token: N/A
    diagnostics: N/A
    ApplicationMaster host: N/A
    ApplicationMaster RPC port: -1
    queue: root.yisou
    start time: 1500297920750
    final status: UNDEFINED
    tracking URL: http://c1-nn3.bdp.idc:8981/proxy/application_1466564207556_2727164/
    21:25:22,787  INFO Client:58 - Application report for application_1466564207556_2727164 (state: ACCEPTED)
    21:25:23,789  INFO Client:58 - Application report for application_1466564207556_2727164 (state: ACCEPTED)
    21:25:24,791  INFO Client:58 - Application report for application_1466564207556_2727164 (state: ACCEPTED)
    21:25:25,793  INFO Client:58 - Application report for application_1466564207556_2727164 (state: ACCEPTED)
    21:25:39,585  INFO JettyUtils:58 - Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    21:25:39,823  INFO Client:58 - Application report for application_1466564207556_2727164 (state: RUNNING)
    21:25:39,824  INFO Client:58 -
    client token: N/A
    diagnostics: N/A
    ApplicationMaster host: 10.130.1.50
    ApplicationMaster RPC port: 0
    queue: root.yisou
    start time: 1500297920750
    final status: UNDEFINED
    tracking URL: http://c1-nn3.bdp.idc:8981/proxy/application_1466564207556_2727164/
    ..........
    21:25:42,864  INFO SparkContext:58 - Added JAR /data2/janusgraph-0.1.1-hadoop2/lib/commons-codec-1.7.jar at http://10.130.64.69:38209/jars/commons-codec-1.7.jar with timestamp 1500297942864
    21:25:42,866  INFO SparkContext:58 - Added JAR /data2/janusgraph-0.1.1-hadoop2/lib/commons-lang-2.5.jar at http://10.130.64.69:38209/jars/commons-lang-2.5.jar with timestamp 1500297942866
    21:25:42,869  INFO SparkContext:58 - Added JAR /data2/janusgraph-0.1.1-hadoop2/lib/commons-collections-3.2.2.jar at http://10.130.64.69:38209/jars/commons-collections-3.2.2.jar with timestamp 1500297942869
    21:25:42,872  INFO SparkContext:58 - Added JAR /data2/janusgraph-0.1.1-hadoop2/lib/commons-io-2.3.jar at http://10.130.64.69:38209/jars/commons-io-2.3.jar with timestamp 1500297942872
    21:25:42,874  INFO SparkContext:58 - Added JAR /data2/janusgraph-0.1.1-hadoop2/lib/jetty-util-6.1.26.jar at http://10.130.64.69:38209/jars/jetty-util-6.1.26.jar with timestamp 1500297942874
    21:25:42,879  INFO SparkContext:58 - Added JAR /data2/janusgraph-0.1.1-hadoop2/lib/htrace-core-3.1.0-incubating.jar at http://10.130.64.69:38209/jars/htrace-core-3.1.0-incubating.jar with timestamp 1
    ............
    21:26:14,751  INFO MapOutputTrackerMaster:58 - Size of output statuses for shuffle 2 is 146 bytes
    21:26:14,767  INFO TaskSetManager:58 - Finished task 0.0 in stage 6.0 (TID 4) in 40 ms on c1-dn31.bdp.idc (1/1)
    21:26:14,767  INFO YarnScheduler:58 - Removed TaskSet 6.0, whose tasks have all completed, from pool
    21:26:14,767  INFO DAGScheduler:58 - ResultStage 6 (foreachPartition at SparkExecutor.java:173) finished in 0.042 s
    21:26:14,768  INFO DAGScheduler:58 - Job 1 finished: foreachPartition at SparkExecutor.java:173, took 1.776125 s
    21:26:14,775  INFO ShuffledRDD:58 - Removing RDD 2 from persistence list
    21:26:14,785  INFO BlockManager:58 - Removing RDD 2
    ==>result[hadoopgraph[scriptinputformat->graphsonoutputformat],memory[size:0]]
    gremlin> 21:26:22,515  INFO YarnClientSchedulerBackend:58 - Registered executor NettyRpcEndpointRef(null) (c1-dn9.bdp.idc:60762) with ID 8
    

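    Bulk-loading performance tuning

    Without tuning, JanusGraph bulk loading is very slow: importing 40 million records takes about 3.5 hours. After tuning, this drops to about 1 hour.
    1. Increase ids.block-size and storage.buffer-size (set in janusgraph-hbase-es.properties).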
    ids.block-size=100000000
    storage.buffer-size=102400

    2. Specify the initial number of HBase regions (set in janusgraph-hbase-es.properties).
    storage.hbase.region-count = 50

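    3. Import edges and vertices together, rather than splitting vertices and edges into different files and importing them separately. For the format, see /data/janusgraph/data/grateful-dead.txt.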
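
    If the source data already exists as separate vertex and edge files, the third tip requires merging them into one file in which each line carries a vertex together with its outgoing edges. The sketch below shows one way to do that with awk. The layout is an assumption for illustration only: vertices as "id,label,name", edges as "sourceId,edgeLabel,targetId", and output lines of the form "vertex fields, a tab, then edges separated by |". It is not necessarily the layout of grateful-dead.txt, and the parse function has to be written to match whatever layout is chosen. The function name merge_graph_files is hypothetical.

```shell
#!/bin/sh
# Sketch: join a vertex file and an edge file into one adjacency-style file.
# Input and output layouts are illustrative assumptions (see the text above).

# merge_graph_files VERTEX_FILE EDGE_FILE
# Writes one line per vertex to stdout: "<vertex line>\t<label,target>|<label,target>..."
merge_graph_files() {
    awk -F',' '
        # First file (edges): collect outgoing edges per source vertex id.
        FILENAME == ARGV[1] {
            if ($1 in out) out[$1] = out[$1] "|" $2 "," $3
            else           out[$1] = $2 "," $3
            next
        }
        # Second file (vertices): emit the vertex followed by its edges, if any.
        {
            edges = ($1 in out) ? out[$1] : ""
            printf "%s\t%s\n", $0, edges
        }
    ' "$2" "$1"
}

# Example (file names are assumptions):
# merge_graph_files vertices.csv edges.csv > merged.txt
```

    The edge file is held in memory, so for very large edge sets a sort-and-join approach or a Spark job would be a better fit than this single awk pass.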

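    Summary

    This article explained how to configure JanusGraph to bulk load vertices and edges in yarn-client mode.

    The setup has two parts: the basic configuration and the bulk-loading configuration. In the basic configuration, watch for conflicts between the jars bundled with JanusGraph and the jars in your YARN environment; replace or delete the offending jars.

    In the bulk-loading configuration, the key point is to add the Hadoop settings to gremlin.sh, putting the Hadoop environment into JAVA_OPTIONS and CLASSPATH.

    (End)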

