Author: Lo_ading | Published 2021-07-14 14:01

    NebulaSparkConnector 2.0

    Reference: https://docs.nebula-graph.com.cn/spark-connector/sc-ug-what-is-spark-connector/

    Code: https://github.com/Loading-Life/nebula-spark-java-demo

    Overview

    Nebula Spark Connector is a Spark connector that lets you read data from and write data to the Nebula Graph database through standard Spark interfaces. It consists of two parts:

    • Reader: provides a Spark SQL interface for programmatically reading Nebula Graph data. Each read fetches the data of one tag or edge type and assembles the result into a Spark DataFrame.

    • Writer: provides a Spark SQL interface for programmatically writing data in DataFrame format to Nebula Graph, either row by row or in batches.

    Use cases

    • Migrating data between different Nebula Graph clusters (a minimal sketch follows this list).
    • Migrating data between graph spaces within the same Nebula Graph cluster.
    • Migrating data between Nebula Graph and other data sources.
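
    For example, a cluster-to-cluster migration is just a read from the source cluster followed by a write to the target cluster. The snippet below is a minimal, untested sketch of that pattern: the addresses, space, and tag names are placeholders, it reuses the imports and SparkSession setup from the full examples under "Usage" below, and it assumes the reader exposes the vertex ID in a column named _vertexId.

       // minimal cluster-to-cluster migration sketch; addresses and names are placeholders
       NebulaConnectionConfig sourceConfig = NebulaConnectionConfig
           .builder()
           .withMetaAddress("source-meta-host:meta-port")
           .build();
       NebulaConnectionConfig targetConfig = NebulaConnectionConfig
           .builder()
           .withMetaAddress("target-meta-host:meta-port")
           .withGraphAddress("target-graph-host:graph-port")
           .build();

       // read tag "person" from the source cluster into a DataFrame
       ReadNebulaConfig readConfig = ReadNebulaConfig
           .builder()
           .withSpace("test")
           .withLabel("person")
           .withNoColumn(false)
           .build();
       Dataset<Row> vertices = package$.MODULE$
           .NebulaDataFrameReader(sparkSession.read())
           .nebula(sourceConfig, readConfig)
           .loadVerticesToDF();

       // write the same rows into the target cluster
       WriteNebulaVertexConfig writeConfig = WriteNebulaVertexConfig
           .builder()
           .withSpace("test")
           .withTag("person")
           .withVidField("_vertexId")  // assumed name of the reader's vertex-ID column
           .withBatch(1000)
           .build();
       package$.MODULE$
           .NebulaDataFrameWriter(vertices.write())
           .nebula(targetConfig, writeConfig)
           .writeVertices();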

    Environment

    • NebulaGraph : 2.0.0
    • Apache Spark™ : 2.4.4
    • Scala : 2.11.12
    • Java : 1.8

    Usage

    1. Clone & install

       $ git clone https://github.com/vesoft-inc/nebula-spark-utils.git
       $ cd nebula-spark-utils
       $ git checkout v2.0.0
       $ mvn clean install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
      
    2. Create your Maven project

    3. Add the Maven dependency

       <dependency>
         <groupId>com.vesoft</groupId>
         <artifactId>nebula-spark-connector</artifactId>
         <version>2.0.0</version>
       </dependency>
      
    4. Write the test classes and adjust the configuration to your environment

      • Sample data

        edge

          {"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"}
          {"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"}
          {"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"}
          {"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"}
          {"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"}
          {"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"}
          {"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"}
          {"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"}
          {"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"}
          {"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"}
          {"src":21,"dst":22,"degree":24, "descr": "aaa","timep": "2021-04-09"}
        

        vertex

          {"id":"aa","name":"Tom","age":10,"timep": "2020-01-01"}
          {"id":"\ns","name":"Bob","age":11,"timep": "2020-01-02"}
          {"id":14,"name":"Jane","age":12,"timep": "2020-01-03"}
          {"id":15,"name":"Jena","age":13,"timep": "2020-01-04"}
          {"id":16,"name":"Ni\tc","age":14,"timep": "2020-01-05"}
          {"id":17,"name":"Mei","age":15,"timep": "2020-01-06"}
          {"id":18,"name":"HH","age":16,"timep": "2020-01-07"}
          {"id":19,"name":"Ty\nler","age":17,"timep": "2020-01-08"}
          {"id":20,"name":"Ber","age":18,"timep": "2020-01-09"}
          {"id":21,"name":"Mercy","age":19,"timep": "2020-01-10"}
          {"id":22,"name":"why","age":27,"timep": "2021-04-09"}
        
      • Write example

          package com.loading.nebula;
        
          import com.facebook.thrift.protocol.TCompactProtocol;
          import com.vesoft.nebula.connector.NebulaConnectionConfig;
          import com.vesoft.nebula.connector.WriteNebulaEdgeConfig;
          import com.vesoft.nebula.connector.WriteNebulaVertexConfig;
          import org.apache.spark.SparkConf;
          import org.apache.spark.SparkContext;
          import org.apache.spark.sql.DataFrameWriter;
          import org.apache.spark.sql.Dataset;
          import org.apache.spark.sql.Row;
          import org.apache.spark.sql.SparkSession;
          import org.apache.spark.storage.StorageLevel;
          import com.vesoft.nebula.connector.connector.package$;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          
          
          /**
           * desc:
           *
           * @author Lo_ading
           * @version 1.0.0
           * @date 2021/4/9
           */
          public class NebulaSparkWriterExample {
          
            private final Logger logger = LoggerFactory.getLogger(NebulaSparkWriterExample.class);
          
            public static void main(String[] args) {
          
              SparkConf sparkConf = new SparkConf();
              sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
          
              Class<?>[] classes = {TCompactProtocol.class};
              sparkConf.registerKryoClasses(classes);
          
              SparkContext sparkContext = new SparkContext("local", "NebulaSparkWriterExample", sparkConf);
              SparkSession sparkSession = new SparkSession(sparkContext);
          
              writeData(sparkSession);
          
              sparkSession.close();
              System.exit(0);
          
            }
          
            private static void writeData(SparkSession sparkSession) {
          
              // build the Nebula connection config; note withConenctionRetry is the connector's own method spelling
              NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig
                  .builder()
                  .withMetaAddress("your-meta-host:meta-port")
                  .withGraphAddress("your-graph-host:graph-port")
                  .withConenctionRetry(2)
                  .build();
          
              //write vertex data
              System.out.println("Start to write nebula data [vertex]");
              
              // path to your data file (JSON Lines, one object per line)
              Dataset<Row> vertexDataset = sparkSession.read().json("/data/vertex");
              vertexDataset.show();
              vertexDataset.persist(StorageLevel.MEMORY_ONLY_SER());
              WriteNebulaVertexConfig writeNebulaVertexConfig = WriteNebulaVertexConfig
                  .builder()
                  .withSpace("test")
                  .withTag("person")
                  .withVidField("id")
                  .withVidAsProp(true)
                  .withBatch(1000)
                  .build();
              DataFrameWriter<Row> vertexDataFrameWriter = vertexDataset.write();
              // the connector's Scala implicit is invoked explicitly from Java via package$.MODULE$
              package$.MODULE$.NebulaDataFrameWriter(vertexDataFrameWriter)
                  .nebula(nebulaConnectionConfig, writeNebulaVertexConfig).writeVertices();
              System.out.println("End to write nebula data [vertex]");
          
              // write edge data
              System.out.println("Start to write nebula data [edge]");
              Dataset<Row> edgeDataset = sparkSession.read().json("/data/edge");
              edgeDataset.show();
              edgeDataset.persist(StorageLevel.MEMORY_ONLY_SER());
              WriteNebulaEdgeConfig writeNebulaEdgeConfig = WriteNebulaEdgeConfig
                  .builder()
                  .withSpace("test")
                  .withEdge("friend")
                  .withSrcIdField("src")
                  .withDstIdField("dst")
                  .withRankField("degree")
                  .withSrcAsProperty(true)
                  .withDstAsProperty(true)
                  .withRankAsProperty(true)
                  .withBatch(1000)
                  .build();
              DataFrameWriter<Row> edgeDataFrameWriter = edgeDataset.write();
              package$.MODULE$.NebulaDataFrameWriter(edgeDataFrameWriter)
                  .nebula(nebulaConnectionConfig, writeNebulaEdgeConfig).writeEdges();
              System.out.println("End to write nebula data [edge]");
        
            }
          
          
          }
        
      • Read example

          package com.loading.nebula;
          
          import com.facebook.thrift.protocol.TCompactProtocol;
          import com.vesoft.nebula.connector.NebulaConnectionConfig;
          import com.vesoft.nebula.connector.ReadNebulaConfig;
          import org.apache.spark.SparkConf;
          import org.apache.spark.SparkContext;
          import org.apache.spark.graphx.Edge;
          import org.apache.spark.rdd.RDD;
          import org.apache.spark.sql.DataFrameReader;
          import org.apache.spark.sql.Dataset;
          import org.apache.spark.sql.Row;
          import org.apache.spark.sql.SparkSession;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import scala.Tuple2;
          import scala.collection.JavaConverters;
          import scala.collection.immutable.List;
          import com.vesoft.nebula.connector.connector.package$;

          import java.util.Arrays;
          
          /**
           * desc:
           *
           * @author Lo_ading
           * @version 1.0.0
           * @date 2021/4/9
           */
          public class NebulaSparkReaderExample {
          
            private final Logger logger = LoggerFactory.getLogger(NebulaSparkReaderExample.class);
          
            public static void main(String[] args) {
          
              SparkConf sparkConf = new SparkConf();
              sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
          
              Class<?>[] classes = {TCompactProtocol.class};
              sparkConf.registerKryoClasses(classes);
          
              SparkContext sparkContext = new SparkContext("local", "NebulaSparkReaderExample", sparkConf);
              SparkSession sparkSession = new SparkSession(sparkContext);
          
              readData(sparkSession);
          
              sparkSession.close();
              System.exit(0);
          
            }
          
            private static void readData(SparkSession sparkSession) {
          
              // build connection config
              NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig
                  .builder()
                  .withMetaAddress("your-meta-host:meta-port")
                  .withConenctionRetry(2)
                  .withTimeout(600)
                  .build();
          
              readVertex(sparkSession, nebulaConnectionConfig);
              readEdges(sparkSession, nebulaConnectionConfig);
              readVertexGraph(sparkSession, nebulaConnectionConfig);
              readEdgeGraph(sparkSession, nebulaConnectionConfig);
            }
          
            private static void readVertex(SparkSession sparkSession,
                NebulaConnectionConfig nebulaConnectionConfig) {
          
              // build the Scala immutable List of return columns from a Java list
              List<String> cols = JavaConverters
                  .asScalaBufferConverter(Arrays.asList("name", "age", "id", "timep"))
                  .asScala()
                  .toList();
              ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                  .builder()
                  .withSpace("test")
                  .withLabel("person")
                  .withNoColumn(false)
                  .withReturnCols(cols)
                  .build();
              DataFrameReader dataFrameReader = sparkSession.read();
              Dataset<Row> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader)
                  .nebula(nebulaConnectionConfig, readNebulaConfig).loadVerticesToDF();
              System.out.println("Vertices schema");
              dataset.printSchema();
              dataset.show(20);
              System.out.println("Vertices nums:" + dataset.count());
          
            }
          
            private static void readEdges(SparkSession sparkSession,
                NebulaConnectionConfig nebulaConnectionConfig) {
              ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                  .builder()
                  .withSpace("test")
                  .withLabel("friend")
                  .withNoColumn(true)
                  .build();
              DataFrameReader dataFrameReader = sparkSession.read();
              Dataset<Row> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader)
                  .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToDF();
              System.out.println("Edge schema");
              dataset.printSchema();
              dataset.show(20);
              System.out.println("Edge nums:" + dataset.count());
            }
          
            private static void readVertexGraph(SparkSession sparkSession,
                NebulaConnectionConfig nebulaConnectionConfig) {
              List<String> cols = JavaConverters
                  .asScalaBufferConverter(Arrays.asList("name"))
                  .asScala()
                  .toList();
              ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                  .builder()
                  .withSpace("test")
                  .withLabel("person")
                  .withReturnCols(cols)
                  .build();
              DataFrameReader dataFrameReader = sparkSession.read();
              RDD<Tuple2<Object, List<Object>>> vertexRdd = package$.MODULE$
                  .NebulaDataFrameReader(dataFrameReader).nebula(nebulaConnectionConfig, readNebulaConfig)
                  .loadVerticesToGraphx();
              System.out.println("Vertices RDD nums:" + vertexRdd.count());
            }
          
            private static void readEdgeGraph(SparkSession sparkSession,
                NebulaConnectionConfig nebulaConnectionConfig) {
              ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                  .builder()
                  .withSpace("test")
                  .withLabel("friend")
                  .withNoColumn(true)
                  .build();
              DataFrameReader dataFrameReader = sparkSession.read();
              RDD<Edge<Tuple2<Object, List<Object>>>> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader)
                  .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToGraphx();
              System.out.println("Edge RDD nums:" + dataset.count());
          
            }
          
          }
        
    5. Run the demo
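
    Both example classes hard-code a local master, so they can be run straight from the IDE. Alternatively, package the project and hand it to spark-submit. A minimal sketch; the jar name is an assumption about your build output:

       $ mvn clean package
       $ spark-submit --master local --class com.loading.nebula.NebulaSparkWriterExample \
             target/nebula-spark-java-demo-1.0-SNAPSHOT.jar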

    In version 2.0, writes are batched by default; set the batch size with withBatch when building WriteNebulaVertexConfig or WriteNebulaEdgeConfig.
    Before writing, the graph space, the tag and edge type, and their properties must already exist (see the schema sketch below).
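
    For reference, below is a minimal nGQL sketch of the schema the examples above expect. The vid type and property types are assumptions; adjust them to your data. Because withVidAsProp, withSrcAsProperty, withDstAsProperty, and withRankAsProperty are set to true, the id, src, dst, and degree fields must also exist as properties:

       CREATE SPACE IF NOT EXISTS test (vid_type = FIXED_STRING(32));
       USE test;
       CREATE TAG IF NOT EXISTS person (id string, name string, age int, timep string);
       CREATE EDGE IF NOT EXISTS friend (src string, dst string, degree int, descr string, timep string);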
