美文网首页
[nebula-graph] nebula spark for

[nebula-graph] nebula spark for

作者: Lo_ading | 来源:发表于2021-07-14 14:01 被阅读0次

NebulaSparkConnector2.0

[参考链接]
[参考链接]:https://docs.nebula-graph.com.cn/spark-connector/sc-ug-what-is-spark-connector/

[代码]
[代码]:https://github.com/Loading-Life/nebula-spark-java-demo

说明

Nebula Spark Connector 是一个 Spark 连接器,提供了通过 Spark 标准形式读写 Nebula Graph 数据库的能力,由以下两部分组成:

  • Reader:为您提供了一个 Spark SQL 接口,您可以使用 Spark SQL 接口编程读取 Nebula Graph 图数据,单次读取一个点或边类型的数据,并将读取的结果组装成 Spark 的 DataFrame。

  • Writer:为您提供了一个 Spark SQL 接口,您可以使用 Spark SQL 接口编程将 DataFrame 格式的数据逐条或批量写入 Nebula Graph。

适用场景

  • 在不同的 Nebula Graph 集群之间迁移数据。
  • 在同一个 Nebula Graph 集群内不同图空间之间迁移数据。
  • Nebula Graph 与其他数据源之间迁移数据。

环境

  • NebulaGraph : 2.0.0
  • Apache Spark™ : 2.4.4
  • Scala : 2.11.12
  • Java : 1.8

使用

  1. clone & install

     $ git clone https://github.com/vesoft-inc/nebula-spark-utils.git
     $ cd nebula-spark-utils
     $ git checkout v2.0.0
     $ mvn clean install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
    
  2. 创建你的maven项目

  3. 添加maven依赖

     <dependency>
       <groupId>com.vesoft</groupId>
       <artifactId>nebula-spark</artifactId>
       <version>2.0.0</version>
     </dependency>
    
  4. 编写测试类并修改成你的配置

    • 数据样例

      edge

        {"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"}
        {"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"}
        {"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"}
        {"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"}
        {"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"}
        {"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"}
        {"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"}
        {"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"}
        {"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"}
        {"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"}
        {"src":21,"dst":22,"degree":24, "descr": "aaa","timep": "2021-04-09"}
      

      vertex

        {"id":"aa","name":"Tom","age":10,"timep": "2020-01-01"}
        {"id":"\ns","name":"Bob","age":11,"timep": "2020-01-02"}
        {"id":14,"name":"Jane","age":12,"timep": "2020-01-03"}
        {"id":15,"name":"Jena","age":13,"timep": "2020-01-04"}
        {"id":16,"name":"Ni\tc","age":14,"timep": "2020-01-05"}
        {"id":17,"name":"Mei","age":15,"timep": "2020-01-06"}
        {"id":18,"name":"HH","age":16,"timep": "2020-01-07"}
        {"id":19,"name":"Ty\nler","age":17,"timep": "2020-01-08"}
        {"id":20,"name":"Ber","age":18,"timep": "2020-01-09"}
        {"id":21,"name":"Mercy","age":19,"timep": "2020-01-10"}
        {"id":22,"name":"why","age":27,"timep": "2021-04-09"}
      
    • write代码示例

        package com.loading.nebula;
      
        import com.facebook.thrift.protocol.TCompactProtocol;
        import com.vesoft.nebula.connector.NebulaConnectionConfig;
        import com.vesoft.nebula.connector.WriteNebulaEdgeConfig;
        import com.vesoft.nebula.connector.WriteNebulaVertexConfig;
        import org.apache.spark.SparkConf;
        import org.apache.spark.SparkContext;
        import org.apache.spark.sql.DataFrameWriter;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.storage.StorageLevel;
        import com.vesoft.nebula.connector.connector.package$;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        
        /**
         * Example job that reads vertex and edge records from JSON files with Spark and writes
         * them into Nebula Graph through the Nebula Spark Connector.
         *
         * <p>The meta/graph addresses, the space, tag and edge names and the input paths used below
         * are placeholders; replace them with the values of your own deployment.
         *
         * @author Lo_ading
         * @version 1.0.0
         * @date 2021/4/9
         */
        public class NebulaSparkWriterExample {

          // Static: every method of this example is static, so an instance field could never be used.
          private static final Logger LOGGER = LoggerFactory.getLogger(NebulaSparkWriterExample.class);

          /**
           * Starts a local Spark session, writes the sample data and shuts the session down.
           *
           * @param args command line arguments (ignored)
           */
          public static void main(String[] args) {

            SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("NebulaSparkWriterExample")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[] {TCompactProtocol.class});

            // Use the public builder API rather than the Scala-internal SparkSession constructor.
            // SparkSession is Closeable, so try-with-resources stops it even when the write fails.
            try (SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()) {
              writeData(sparkSession);
            }

            // Kept from the original flow; presumably forces the JVM to exit even if client
            // threads are still alive -- confirm before removing.
            System.exit(0);

          }

          /**
           * Builds the Nebula connection settings and writes first the vertices, then the edges.
           *
           * @param sparkSession active Spark session used to load the input files
           */
          private static void writeData(SparkSession sparkSession) {

            // build and set the nebula connection config
            NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig
                .builder()
                .withMetaAddress("your-meta-host:meta-port")
                .withGraphAddress("your-graph-host:graph-port")
                // method name kept exactly as in the original call (spelling included)
                .withConenctionRetry(2)
                .build();

            writeVertices(sparkSession, nebulaConnectionConfig);
            writeEdges(sparkSession, nebulaConnectionConfig);

          }

          /**
           * Loads the vertex JSON file and writes it to tag {@code person} of space {@code test}.
           *
           * @param sparkSession active Spark session
           * @param nebulaConnectionConfig connection settings shared by all writes
           */
          private static void writeVertices(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {

            System.out.println("Start to write nebula data [vertex]");

            // your data file
            Dataset<Row> vertexDataset = sparkSession.read().json("/data/vertex");
            // Cache before the first action so show() and the write can share the parsed data.
            vertexDataset.persist(StorageLevel.MEMORY_ONLY_SER());
            try {
              vertexDataset.show();
              WriteNebulaVertexConfig writeNebulaVertexConfig = WriteNebulaVertexConfig
                  .builder()
                  .withSpace("test")
                  .withTag("person")
                  .withVidField("id")
                  .withVidAsProp(true)
                  .withBatch(1000)
                  .build();
              // Dataset.write() is the public way to obtain a DataFrameWriter.
              package$.MODULE$.NebulaDataFrameWriter(vertexDataset.write())
                  .nebula(nebulaConnectionConfig, writeNebulaVertexConfig).writeVertices();
            } finally {
              // Release the cached blocks once the write is finished (or has failed).
              vertexDataset.unpersist();
            }

            System.out.println("End to write nebula data [vertex]");

          }

          /**
           * Loads the edge JSON file and writes it to edge type {@code friend} of space {@code test}.
           *
           * @param sparkSession active Spark session
           * @param nebulaConnectionConfig connection settings shared by all writes
           */
          private static void writeEdges(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {

            // write edge data
            System.out.println("Start to write nebula data [edge]");

            Dataset<Row> edgeDataset = sparkSession.read().json("/data/edge");
            // Cache before the first action so show() and the write can share the parsed data.
            edgeDataset.persist(StorageLevel.MEMORY_ONLY_SER());
            try {
              edgeDataset.show();
              WriteNebulaEdgeConfig writeNebulaEdgeConfig = WriteNebulaEdgeConfig
                  .builder()
                  .withSpace("test")
                  .withEdge("friend")
                  .withSrcIdField("src")
                  .withDstIdField("dst")
                  .withRankField("degree")
                  .withSrcAsProperty(true)
                  .withDstAsProperty(true)
                  .withRankAsProperty(true)
                  .withBatch(1000)
                  .build();
              package$.MODULE$.NebulaDataFrameWriter(edgeDataset.write())
                  .nebula(nebulaConnectionConfig, writeNebulaEdgeConfig).writeEdges();
            } finally {
              // Release the cached blocks once the write is finished (or has failed).
              edgeDataset.unpersist();
            }

            System.out.println("End to write nebula data [edge]");

          }

        }
      
    • read代码示例

        package com.loading.nebula;
        
        import com.facebook.thrift.protocol.TCompactProtocol;
        import com.vesoft.nebula.connector.NebulaConnectionConfig;
        import com.vesoft.nebula.connector.ReadNebulaConfig;
        import com.vesoft.nebula.connector.connector.package$;
        import java.util.Arrays;
        import org.apache.spark.SparkConf;
        import org.apache.spark.SparkContext;
        import org.apache.spark.graphx.Edge;
        import org.apache.spark.rdd.RDD;
        import org.apache.spark.sql.DataFrameReader;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import scala.Tuple2;
        import scala.collection.JavaConverters;
        import scala.collection.immutable.List;
        import scala.collection.immutable.List$;
        import scala.collection.mutable.StringBuilder;
        
        /**
         * Example job that reads vertices and edges from Nebula Graph through the Nebula Spark
         * Connector, both as Spark DataFrames and as GraphX-style RDDs, and prints their sizes.
         *
         * <p>The meta address, the space and the tag/edge labels used below are placeholders;
         * replace them with the values of your own deployment.
         *
         * @author Lo_ading
         * @version 1.0.0
         * @date 2021/4/9
         */
        public class NebulaSparkReaderExample {

          // Static, and bound to this class (the original was bound to the writer example).
          private static final Logger LOGGER = LoggerFactory.getLogger(NebulaSparkReaderExample.class);

          /**
           * Starts a local Spark session, runs all read examples and shuts the session down.
           *
           * @param args command line arguments (ignored)
           */
          public static void main(String[] args) {

            SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("NebulaSparkReaderExample")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[] {TCompactProtocol.class});

            // Use the public builder API rather than the Scala-internal SparkSession constructor.
            // SparkSession is Closeable, so try-with-resources stops it even when a read fails.
            try (SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()) {
              readData(sparkSession);
            }

            // Kept from the original flow; presumably forces the JVM to exit even if client
            // threads are still alive -- confirm before removing.
            System.exit(0);

          }

          /**
           * Builds the Nebula connection settings and runs the four read examples in order.
           *
           * @param sparkSession active Spark session
           */
          private static void readData(SparkSession sparkSession) {

            // build connection config
            NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig
                .builder()
                .withMetaAddress("your-meta-host:meta-port")
                // method name kept exactly as in the original call (spelling included)
                .withConenctionRetry(2)
                .withTimeout(600)
                .build();

            readVertex(sparkSession, nebulaConnectionConfig);
            readEdges(sparkSession, nebulaConnectionConfig);
            readVertexGraph(sparkSession, nebulaConnectionConfig);
            readEdgeGraph(sparkSession, nebulaConnectionConfig);
          }

          /**
           * Converts Java strings into the immutable Scala list expected by {@code withReturnCols}.
           *
           * <p>Scala's immutable {@code List} cannot be filled in place; in particular
           * {@code addString} writes the list's elements into a builder and leaves the list
           * untouched, so the list has to be created from the complete set of values.
           *
           * @param values column names, in the desired order
           * @return immutable Scala list holding {@code values}
           */
          private static List<String> toScalaList(String... values) {
            return JavaConverters.asScalaBufferConverter(Arrays.asList(values)).asScala().toList();
          }

          /**
           * Reads tag {@code person} into a DataFrame and prints its schema, first rows and count.
           *
           * @param sparkSession active Spark session
           * @param nebulaConnectionConfig connection settings
           */
          private static void readVertex(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {

            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("person")
                .withNoColumn(false)
                .withReturnCols(toScalaList("name", "age", "id", "timep"))
                .build();
            // sparkSession.read() is the public way to obtain a DataFrameReader.
            Dataset<Row> vertices = package$.MODULE$.NebulaDataFrameReader(sparkSession.read())
                .nebula(nebulaConnectionConfig, readNebulaConfig).loadVerticesToDF();
            System.out.println("Vertices schema");
            vertices.printSchema();
            vertices.show(20);
            System.out.println("Vertices nums:" + vertices.count());

          }

          /**
           * Reads edge type {@code friend} into a DataFrame with property columns disabled and
           * prints its schema, first rows and count.
           *
           * @param sparkSession active Spark session
           * @param nebulaConnectionConfig connection settings
           */
          private static void readEdges(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("friend")
                .withNoColumn(true)
                .build();
            Dataset<Row> edges = package$.MODULE$.NebulaDataFrameReader(sparkSession.read())
                .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToDF();
            System.out.println("Edge schema");
            edges.printSchema();
            edges.show(20);
            System.out.println("Edge nums:" + edges.count());
          }

          /**
           * Reads tag {@code person} (column {@code name}) as a GraphX vertex RDD and prints its count.
           *
           * @param sparkSession active Spark session
           * @param nebulaConnectionConfig connection settings
           */
          private static void readVertexGraph(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("person")
                .withReturnCols(toScalaList("name"))
                .build();
            RDD<Tuple2<Object, List<Object>>> vertexRdd = package$.MODULE$
                .NebulaDataFrameReader(sparkSession.read())
                .nebula(nebulaConnectionConfig, readNebulaConfig)
                .loadVerticesToGraphx();
            System.out.println("Vertices RDD nums:" + vertexRdd.count());
          }

          /**
           * Reads edge type {@code friend} as a GraphX edge RDD and prints its count.
           *
           * @param sparkSession active Spark session
           * @param nebulaConnectionConfig connection settings
           */
          private static void readEdgeGraph(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("friend")
                .withNoColumn(true)
                .build();
            RDD<Edge<Tuple2<Object, List<Object>>>> edgeRdd = package$.MODULE$
                .NebulaDataFrameReader(sparkSession.read())
                .nebula(nebulaConnectionConfig, readNebulaConfig)
                .loadEdgesToGraphx();
            System.out.println("Edge RDD nums:" + edgeRdd.count());

          }

        }
      
  5. 运行demo

2.0版本的写入默认为批量写入,构造 WriteNebulaVertexConfig / WriteNebulaEdgeConfig 时使用 withBatch 设置每批写入的条数。
写入前需在 Nebula Graph 中提前创建好图空间、Tag(点类型)、Edge type(边类型)及其相关属性。

相关文章

网友评论

      本文标题:[nebula-graph] nebula spark for

      本文链接:https://www.haomeiwen.com/subject/fqsypltx.html