美文网首页
十七 RocksDB 作为存储后端

十七 RocksDB 作为存储后端

作者: NazgulSun | 来源:发表于2021-10-25 21:08 被阅读0次

rocksDB 在huagegraph 中应该有很重要的定位:

  • 作为高性能的存储,在性能测试中可以与 nebula 做有意义的对比;这个自己也可以在后续做一下,看看同时使用rocksdb 和nebula kv 相比性能怎么样,目前各个厂家 在对比 hugegraph vs nebula 时候,都是用 cassandra vs rocksdb,说实话参考性不是那么强;
  • 结合raft +rocksdb ,可以构建真正意义的高可用集群,hugegraph server + rocksdb在一个机器上,能大幅度提升性能
  • 所以 rocksdb 应该是 追求性能的首选,也是hugegraph 走向开源商业化的选择

在现有0.11的版本中,我们看到了hugegraph做了优化,之前0.10版本中, 一个节点存储,可能需要存储多个 key-value,[包含属性,以Id作为前缀],使得查询和存储的负担都变大, 目前已经更改为 一个节点, id -> properties 的序列化; 与 nebula 存储的方式基本一致;
因为,使用的是 纯粹KV,没有 table的概念, 在实现图的model的时候,使用的columFamily 来作为隔离的; hugegraph 依旧是 使用 g_v, g_oe, g_ie 这样的表来分割数据;

    public static class IndexLabel extends SchemaTable {
        public static final String TABLE = HugeType.INDEX_LABEL.string();
        public IndexLabel(String database) {
            super(database, TABLE);
        }
    }
    public static class Vertex extends RocksDBTable {
        public static final String TABLE = HugeType.VERTEX.string();
        public Vertex(String database) {
            super(database, TABLE);
        }
        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }
    }

不过使用 rocksdb 做后端,运维是个大问题,貌似没有什么成熟的管理工具;只能靠图的工具。

RocksDb 的目录,m/s/g 开头,在cassandra 等中,这个作为 table的命名的前缀;rocksdb作为 目录的前缀;

    public static final ConfigOption<String> STORE_SYSTEM =
            new ConfigOption<>(
                    "store.system",
                    "The system table name, which store system data.",
                    disallowEmpty(),
                    "s"
            );

    public static final ConfigOption<String> STORE_SCHEMA =
            new ConfigOption<>(
                    "store.schema",
                    "The schema table name, which store meta data.",
                    disallowEmpty(),
                    "m"
            );

    public static final ConfigOption<String> STORE_GRAPH =
            new ConfigOption<>(
                    "store.graph",
                    "The graph table name, which store vertex, edge and property.",
                    disallowEmpty(),
                    "g"
            );

因为 rocks本身没有keyspace的概念,所以每个 graph ,都会加到 columnFamily的前缀上;

ETL 的高性能尝试

之前nebula 对比 导入性能的时候,对比的是 rocksdb vs cassandra, 并且 还是使用 ingest sst file 这样的方式, 那么显而易见的是,nebula 离线导入性能会吊打 其他的数据库的;

我们知道rocksdb 插入数据是 memtable 到sstfile, 同时提供API 支持用户 自己生成 sstfile,然后ingest到rocksdb中,这样的数据就是飞快的, 我们给出以一个例子, 通过sstfile 插入数据,但是需要保证 sstFile 中的key 是严格递增的; rocksdb engine,会自动把 现有的文件, 和 ingestFile 进行merge;

   public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        final Random random = new Random();
        final EnvOptions envOptions = new EnvOptions();
        final StringAppendOperator stringAppendOperator = new StringAppendOperator();
        Options options1 = new Options();
        SstFileWriter fw = null;
        ComparatorOptions comparatorOptions = new ComparatorOptions();

        try {

            options1 = options1
                    .setCreateIfMissing(true)
                    .setEnv(Env.getDefault())
                    .setComparator(new BytewiseComparator(comparatorOptions));

            fw = new SstFileWriter(envOptions, options1);

            fw.open("./sst_upload_01");
            Map<String, String> data = new HashMap<>();
            for (int index = 0; index < 1000; index++) {
                data.put("Key-" + random.nextLong(), "Value-" + random.nextDouble());
            }
            List<String> keys = new ArrayList<String>(data.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Slice keySlice = new Slice(key);
                Slice valueSlice = new Slice(data.get(key));
                fw.put(keySlice, valueSlice);
            }

            fw.finish();
        } catch (RocksDBException ex) {
            ex.printStackTrace();
        } finally {
            stringAppendOperator.close();
            envOptions.close();
            options1.close();
            if (fw != null) {
                fw.close();
            }
        }

        try {
            final Options options = new Options().setCreateIfMissing(true);
            final String rocksDirectory = "./rocksdb";
            final Path path = Paths.get(rocksDirectory);
            RocksDB rocksDB = RocksDB.open(options, path.toString());

            List<String> sst = new ArrayList<>();
            sst.add("./sst_upload_01");

            IngestExternalFileOptions igOptions = new IngestExternalFileOptions();
            igOptions.setMoveFiles(true);
            rocksDB.ingestExternalFile(sst, igOptions);

            final RocksIterator iterator = rocksDB.newIterator();
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                System.out.println(new String(iterator.key()) +"\t" + new String(iterator.value()));
            }
            } catch (final Exception exception) {
                System.exit(1);
            }

    }

通过 SST file 导入数据到 hugegraph

  • 需要明确知道 hugeVertex 到 key-value的映射关系, 这个在 hugegraph-core 中的 BinaryweSerializer 有明确的方法,把图中的节点,转化成 相对应的 byte数组;
  • 另外,我们需要明确定义 响应的columnFamily, 我们知道 hugegraph 底层是通过 columnFamily 来达到逻辑表的划分的;
    比如 g_v 对应 图中节点数据,在 cassandra 是 keyspace_g_v, 在 rocksdb中, 是 目录 g_hugegraph_v, 这样做的隔离;
  • 另外,我们如果插入数据,必须要对 hugeVertex 的数据进行排序, 我们可以使用 spark 等工具,利用 sortByKey(RangePratitioner)来划分大量数据,分治排序; 目前来看, rocksdb 默认不支持读hdfs和写hdfs, 貌似要 自己编译添加hadoop支持,可以google看看这个方案; 所以我们可以封装一下,从hdfs读取文件,本地生产 sst file 再导入到 rocksdb 中;
  • 另外我们可以在uploader 中 做一个patch,支持生产 sst file, 并导入到rocksdb中;
  • 排序的时候,有一个ticker,就是 hugegraph 对vertex 的序列化,会对 ID加入一个长度信息,所以对于字符串ID,线比较长度,然后是字典序,需要满足这个排序;
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();

        final Random random = new Random();

        final EnvOptions envOptions = new EnvOptions();
        final StringAppendOperator stringAppendOperator = new StringAppendOperator();
        Options options1 = new Options();
        SstFileWriter fw = null;
        ComparatorOptions comparatorOptions = new ComparatorOptions();

        options1 = options1
                .setCreateIfMissing(true)
                .setEnv(Env.getDefault())
                .setComparator(new BytewiseComparator(comparatorOptions));

        fw = new SstFileWriter(envOptions, options1);
        fw.open(targetDir);
        List<String> lines = FileUtils.readLines(new File(companyFile));


        /// hugegraph 序列化的时候,id 第一个字符串为 id 的长度;所以在排序的时候,短ID需要在前面;
        BinarySerializer ser = new BinarySerializer();
        for(String line: lines){
            BackendEntry entry = ser.writeVertex(parse(line));
            BackendEntry.BackendColumn column = entry.columns().iterator().next();
            fw.put(column.name, column.value);
        }

        fw.finish();



    }

    static HugeGraph graph = Mockito.mock(HugeGraph.class);

    static HugeVertex parse(String line){
        String id = line.split("@")[0];
        String name = line.split("@")[1];
        String eid = line.split("@")[2];

        PropertyKey pname = newPropertyKey(IdGenerator.of(1), "name");
        PropertyKey peid = newPropertyKey(IdGenerator.of(2), "eid");
        VertexLabel vl = new VertexLabel(graph,IdGenerator.of(1),"Company");
        vl.idStrategy(IdStrategy.CUSTOMIZE_STRING);
        vl.properties(pname.id(),peid.id());

        HugeVertex vertex = new HugeVertex(graph, IdGenerator.of(id),vl);
        vertex.addProperty(pname,name);
        vertex.addProperty(peid,eid);

        return vertex;
    }


    public static PropertyKey newPropertyKey(Id id, String name) {
        return newPropertyKey(id, name, DataType.TEXT, Cardinality.SINGLE);
    }

    public static PropertyKey newPropertyKey(Id id, String name,
                                             DataType dataType) {
        return newPropertyKey(id, name, dataType, Cardinality.SINGLE);
    }

    public static PropertyKey newPropertyKey(Id id, String name,
                                             DataType dataType,
                                             Cardinality cardinality) {
        PropertyKey schema = new PropertyKey(graph, id, name);
        schema.dataType(dataType);
        schema.cardinality(cardinality);
        return schema;
    }
  • 另外一个坑, hugegraph rocksdb的ingest 功能有个bug, 就是 hugegraph+s/ , 和 hugegraph+g/ 都是用的是同一个目录;
    如果你的sst 文件放在 一个目录下,会被 S和 G 两个空间所ingest,结果是不对的;这个自己修改一下ingest的代码。
  • ingest 的速度是真的快, 800万节点,基本上是1分钟级别;

与cassandra 导入性能测试

同一份数据集,3000万节点,5kw边,导入 cassandra/rocksdb 后端,单机;
rocksdb: 1453s,平均 5万/s。
rocksdb: 3个节点集群模式,5546s;
cassandra:4808,3被多的差距,1.6万/s的情况
3跳查询:
g.V().has('ticker','中国').out().out().out().profile()
rocksdb: 1800 ms
cassandra: 24000ms; 估计有10的性能差距;
如果使用中信证券,3跳, rocksdb 24s,cassandra120s;性能差距还是很大;

相关文章

网友评论

      本文标题:十七 RocksDB 作为存储后端

      本文链接:https://www.haomeiwen.com/subject/scesaltx.html