rocksDB 在huagegraph 中应该有很重要的定位:
- 作为高性能的存储,在性能测试中可以与 nebula 做有意义的对比;这个自己也可以在后续做一下,看看同时使用rocksdb 和nebula kv 相比性能怎么样,目前各个厂家 在对比 hugegraph vs nebula 时候,都是用 cassandra vs rocksdb,说实话参考性不是那么强;
- 结合raft +rocksdb ,可以构建真正意义的高可用集群,hugegraph server + rocksdb在一个机器上,能大幅度提升性能
- 所以 rocksdb 应该是 追求性能的首选,也是hugegraph 走向开源商业化的选择
在现有0.11的版本中,我们看到了hugegraph做了优化,之前0.10版本中, 一个节点存储,可能需要存储多个 key-value,[包含属性,以Id作为前缀],使得查询和存储的负担都变大, 目前已经更改为 一个节点, id -> properties 的序列化; 与 nebula 存储的方式基本一致;
因为,使用的是 纯粹KV,没有 table的概念, 在实现图的model的时候,使用的columFamily 来作为隔离的; hugegraph 依旧是 使用 g_v, g_oe, g_ie 这样的表来分割数据;
public static class IndexLabel extends SchemaTable {
public static final String TABLE = HugeType.INDEX_LABEL.string();
public IndexLabel(String database) {
super(database, TABLE);
}
}
public static class Vertex extends RocksDBTable {
public static final String TABLE = HugeType.VERTEX.string();
public Vertex(String database) {
super(database, TABLE);
}
@Override
protected BackendColumnIterator queryById(Session session, Id id) {
return this.getById(session, id);
}
}
不过使用 rocksdb 做后端,运维是个大问题,貌似没有什么成熟的管理工具;只能靠图的工具。
RocksDb 的目录,m/s/g 开头,在cassandra 等中,这个作为 table的命名的前缀;rocksdb作为 目录的前缀;
public static final ConfigOption<String> STORE_SYSTEM =
new ConfigOption<>(
"store.system",
"The system table name, which store system data.",
disallowEmpty(),
"s"
);
public static final ConfigOption<String> STORE_SCHEMA =
new ConfigOption<>(
"store.schema",
"The schema table name, which store meta data.",
disallowEmpty(),
"m"
);
public static final ConfigOption<String> STORE_GRAPH =
new ConfigOption<>(
"store.graph",
"The graph table name, which store vertex, edge and property.",
disallowEmpty(),
"g"
);
因为 rocks本身没有keyspace的概念,所以每个 graph ,都会加到 columnFamily的前缀上;
ETL 的高性能尝试
之前nebula 对比 导入性能的时候,对比的是 rocksdb vs cassandra, 并且 还是使用 ingest sst file 这样的方式, 那么显而易见的是,nebula 离线导入性能会吊打 其他的数据库的;
我们知道rocksdb 插入数据是 memtable 到sstfile, 同时提供API 支持用户 自己生成 sstfile,然后ingest到rocksdb中,这样的数据就是飞快的, 我们给出以一个例子, 通过sstfile 插入数据,但是需要保证 sstFile 中的key 是严格递增的; rocksdb engine,会自动把 现有的文件, 和 ingestFile 进行merge;
public static void main(String[] args) throws RocksDBException {
RocksDB.loadLibrary();
final Random random = new Random();
final EnvOptions envOptions = new EnvOptions();
final StringAppendOperator stringAppendOperator = new StringAppendOperator();
Options options1 = new Options();
SstFileWriter fw = null;
ComparatorOptions comparatorOptions = new ComparatorOptions();
try {
options1 = options1
.setCreateIfMissing(true)
.setEnv(Env.getDefault())
.setComparator(new BytewiseComparator(comparatorOptions));
fw = new SstFileWriter(envOptions, options1);
fw.open("./sst_upload_01");
Map<String, String> data = new HashMap<>();
for (int index = 0; index < 1000; index++) {
data.put("Key-" + random.nextLong(), "Value-" + random.nextDouble());
}
List<String> keys = new ArrayList<String>(data.keySet());
Collections.sort(keys);
for (String key : keys) {
Slice keySlice = new Slice(key);
Slice valueSlice = new Slice(data.get(key));
fw.put(keySlice, valueSlice);
}
fw.finish();
} catch (RocksDBException ex) {
ex.printStackTrace();
} finally {
stringAppendOperator.close();
envOptions.close();
options1.close();
if (fw != null) {
fw.close();
}
}
try {
final Options options = new Options().setCreateIfMissing(true);
final String rocksDirectory = "./rocksdb";
final Path path = Paths.get(rocksDirectory);
RocksDB rocksDB = RocksDB.open(options, path.toString());
List<String> sst = new ArrayList<>();
sst.add("./sst_upload_01");
IngestExternalFileOptions igOptions = new IngestExternalFileOptions();
igOptions.setMoveFiles(true);
rocksDB.ingestExternalFile(sst, igOptions);
final RocksIterator iterator = rocksDB.newIterator();
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
System.out.println(new String(iterator.key()) +"\t" + new String(iterator.value()));
}
} catch (final Exception exception) {
System.exit(1);
}
}
通过 SST file 导入数据到 hugegraph
- 需要明确知道 hugeVertex 到 key-value的映射关系, 这个在 hugegraph-core 中的 BinaryweSerializer 有明确的方法,把图中的节点,转化成 相对应的 byte数组;
- 另外,我们需要明确定义 响应的columnFamily, 我们知道 hugegraph 底层是通过 columnFamily 来达到逻辑表的划分的;
比如 g_v 对应 图中节点数据,在 cassandra 是 keyspace_g_v, 在 rocksdb中, 是 目录 g_hugegraph_v, 这样做的隔离; - 另外,我们如果插入数据,必须要对 hugeVertex 的数据进行排序, 我们可以使用 spark 等工具,利用 sortByKey(RangePratitioner)来划分大量数据,分治排序; 目前来看, rocksdb 默认不支持读hdfs和写hdfs, 貌似要 自己编译添加hadoop支持,可以google看看这个方案; 所以我们可以封装一下,从hdfs读取文件,本地生产 sst file 再导入到 rocksdb 中;
- 另外我们可以在uploader 中 做一个patch,支持生产 sst file, 并导入到rocksdb中;
- 排序的时候,有一个ticker,就是 hugegraph 对vertex 的序列化,会对 ID加入一个长度信息,所以对于字符串ID,线比较长度,然后是字典序,需要满足这个排序;
public static void main(String[] args) throws Exception {
RocksDB.loadLibrary();
final Random random = new Random();
final EnvOptions envOptions = new EnvOptions();
final StringAppendOperator stringAppendOperator = new StringAppendOperator();
Options options1 = new Options();
SstFileWriter fw = null;
ComparatorOptions comparatorOptions = new ComparatorOptions();
options1 = options1
.setCreateIfMissing(true)
.setEnv(Env.getDefault())
.setComparator(new BytewiseComparator(comparatorOptions));
fw = new SstFileWriter(envOptions, options1);
fw.open(targetDir);
List<String> lines = FileUtils.readLines(new File(companyFile));
/// hugegraph 序列化的时候,id 第一个字符串为 id 的长度;所以在排序的时候,短ID需要在前面;
BinarySerializer ser = new BinarySerializer();
for(String line: lines){
BackendEntry entry = ser.writeVertex(parse(line));
BackendEntry.BackendColumn column = entry.columns().iterator().next();
fw.put(column.name, column.value);
}
fw.finish();
}
static HugeGraph graph = Mockito.mock(HugeGraph.class);
static HugeVertex parse(String line){
String id = line.split("@")[0];
String name = line.split("@")[1];
String eid = line.split("@")[2];
PropertyKey pname = newPropertyKey(IdGenerator.of(1), "name");
PropertyKey peid = newPropertyKey(IdGenerator.of(2), "eid");
VertexLabel vl = new VertexLabel(graph,IdGenerator.of(1),"Company");
vl.idStrategy(IdStrategy.CUSTOMIZE_STRING);
vl.properties(pname.id(),peid.id());
HugeVertex vertex = new HugeVertex(graph, IdGenerator.of(id),vl);
vertex.addProperty(pname,name);
vertex.addProperty(peid,eid);
return vertex;
}
public static PropertyKey newPropertyKey(Id id, String name) {
return newPropertyKey(id, name, DataType.TEXT, Cardinality.SINGLE);
}
public static PropertyKey newPropertyKey(Id id, String name,
DataType dataType) {
return newPropertyKey(id, name, dataType, Cardinality.SINGLE);
}
public static PropertyKey newPropertyKey(Id id, String name,
DataType dataType,
Cardinality cardinality) {
PropertyKey schema = new PropertyKey(graph, id, name);
schema.dataType(dataType);
schema.cardinality(cardinality);
return schema;
}
- 另外一个坑, hugegraph rocksdb的ingest 功能有个bug, 就是 hugegraph+s/ , 和 hugegraph+g/ 都是用的是同一个目录;
如果你的sst 文件放在 一个目录下,会被 S和 G 两个空间所ingest,结果是不对的;这个自己修改一下ingest的代码。 - ingest 的速度是真的快, 800万节点,基本上是1分钟级别;
与cassandra 导入性能测试
同一份数据集,3000万节点,5kw边,导入 cassandra/rocksdb 后端,单机;
rocksdb: 1453s,平均 5万/s。
rocksdb: 3个节点集群模式,5546s;
cassandra:4808,3被多的差距,1.6万/s的情况
3跳查询:
g.V().has('ticker','中国').out().out().out().profile()
rocksdb: 1800 ms
cassandra: 24000ms; 估计有10的性能差距;
如果使用中信证券,3跳, rocksdb 24s,cassandra120s;性能差距还是很大;
网友评论