美文网首页
二十 Gremlin 如何并发执行

二十 Gremlin 如何并发执行

作者: NazgulSun | 来源:发表于2021-11-08 14:42 被阅读0次

    Gremlin 多跳的查询效率问题

    从上一篇我们知道,gremlin 的执行引擎,是一个 dfs 模式,对于
    g.V().out().out() 这样的查询, 会从 图中开始节点, A,B,C
    依次调用每个节点的邻居矩阵, getAdj(A|B|C), 获得level1 之后,然后再依次调用, 这样遍历的效率非特别慢;
    之前在 paypal 上看到 他们的工程师说道对gremlin 的执行引擎进行了并行化的优化;我之前一直想不通,直到从上一篇梳理了执行模式之后, 目前我想到优化的tip就是在 processNextStart 的时候,做并发;

    以 hugegVertexStep 为例

    hugeVertexStep 是hugegraph 获取节点的邻居节点的核心类, 关键方法在flatMap, 也就是给定一个节点,会 返回 他的邻居节点; Vertexstep继承自 flatMapStep:

        @Override
        protected Traverser.Admin<E> processNextStart() {
            while (true) {
                if (this.iterator.hasNext()) {
                    return this.head.split(this.iterator.next(), this);
                } else {
                    closeIterator();
                    this.head = this.starts.next();
                    this.iterator = this.flatMap(this.head);
                }
            }
        }
    

    他的实现方式就是 start 节点 获取 邻居节点,然后一个个的返回给下一层; 每次调用flatMap, hugegVertex 就是对 存储层的一次访问;
    那么这里我们会自然而然的想到一种并发的查询;

    第一种思路,就是先收集所有start 节点,然后并发的去查询邻居节点;

        private Traverser.Admin<Vertex> head = null;
        private Iterator<E> iterator = EmptyIterator.instance();
        private ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() *2);
        //可以并发执行
        @Override
        protected Traverser.Admin<E> processNextStart() {
    
            while (true) {
                if (this.iterator.hasNext()) {
                    return this.head.split(this.iterator.next(), this);
                } else {
                    closeIterator();
                    // 这里可以并发执行,也可以用inList
                    this.head = this.starts.next();
                    this.iterator = this.flatMap(this.head);
                    List<Future<Iterator<E>>> futures = Lists.newArrayList();
                    while(this.starts.hasNext()){
                        Traverser.Admin<Vertex> start = this.starts.next();
                        Future<Iterator<E>> its = executorService.submit(()->{
                            Iterator<E>  end = this.flatMap(start);
                            return end;
                        });
                        futures.add(its);
                    }
                    try {
                        for(Future<Iterator<E>> its: futures) {
                            this.iterator = Iterators.concat(iterator, its.get());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
    
                }
            }
        }
    
    

    第二种思路,就是batch模式,依赖与底层的存储层;
    也是收集了所有的 开始节点, 我们能不能提供一个 getAdjByBatchStarts(List starts) , 也就是一次拿出来所有的邻居节点,
    如果底层的数据结构支持,类似与 id in list 这种功能的化,也是一个很好的优化点,效果可能比多线程并发要好。

    但是目前来看,hugegraph,对于 in List 的实现,都是flattern,我估计是为了适配不同的底层存储,如果我们不flattern效果应该不错;
    尝试: g.V().has('ticker','000002').out().out().out().out().profile()
    1, 直接 one by one 的模式, 11s 左右;
    2, inList, 6-7s 左右, 随着 out 展开的数据越多,这个差距会越来越明显; 数据量很少的时候,差距不是很大;

    测试需要修改的代码:

    HugeVertextStep
    
    
        private Traverser.Admin<Vertex> head = null;
        private Iterator<E> iterator = EmptyIterator.instance();
        private Set<Id> sourceIds = Sets.newHashSet();
        //可以并发执行
        @Override
        protected Traverser.Admin<E> processNextStart() {
    
            while (true) {
                if (this.iterator.hasNext()) {
                    return this.head.split(this.iterator.next(), this);
                } else {
                    closeIterator();
                    // 这里可以并发执行,也可以用inList
                    this.head = this.starts.next();
                    sourceIds.add((Id) this.head.get().id());
                    while(this.starts.hasNext()){
                        Traverser.Admin<Vertex> start = this.starts.next();
                        sourceIds.add((Id)start.get().id());
                    }
                    //使用sourceIds
                    this.iterator = this.flatMap(this.head);
                }
            }
        }
    
        public static ConditionQuery stateEdgesQueryWithIDS(Set<Id> sourceIds,
                                                            Directions direction,
                                                            Id... edgeLabels) {
            ConditionQuery query = new ConditionQuery(HugeType.EDGE);
    
            // Edge source vertex
    //        sourceIds.forEach(sourceVertex->{
    //            query.eq(HugeKeys.OWNER_VERTEX, sourceVertex);
    //        });
    
            query.query(Condition.in(HugeKeys.OWNER_VERTEX, Lists.newArrayList(sourceIds)));
    
            // Edge direction
            if (direction == Directions.BOTH) {
                query.query(Condition.or(
                        Condition.eq(HugeKeys.DIRECTION, Directions.OUT),
                        Condition.eq(HugeKeys.DIRECTION, Directions.IN)));
            } else {
                assert direction == Directions.OUT || direction == Directions.IN;
                query.eq(HugeKeys.DIRECTION, direction);
            }
            // Edge labels
            if (edgeLabels.length == 1) {
                query.eq(HugeKeys.LABEL, edgeLabels[0]);
            } else if (edgeLabels.length > 1) {
                query.query(Condition.in(HugeKeys.LABEL,
                        Arrays.asList(edgeLabels)));
            } else {
                assert edgeLabels.length == 0;
            }
    
            return query;
        }
    
    ConditionQueryFlatten
     public static List<ConditionQuery> flatten(ConditionQuery query) {
            if (query.isFlattened() && !query.mayHasDupKeys(SPECIAL_KEYS)) {
                return Arrays.asList(query);
            }
    
            List<ConditionQuery> queries = new ArrayList<>();
    
            // Flatten IN/NOT_IN if needed
            Set<Condition> conditions = InsertionOrderUtil.newSet();
    //        for (Condition condition : query.conditions()) {
    //            Condition cond = flattenIn(condition);
    //            if (cond == null) {
    //                // Process 'XX in []'
    //                return ImmutableList.of();
    //            }
    //            conditions.add(cond);
    //        }
            for (Condition condition : query.conditions()) {
    //            Condition cond = flattenIn(condition);
    //            if (cond == null) {
    //                // Process 'XX in []'
    //                return ImmutableList.of();
    //            }
                conditions.add(condition);
            }
    
    GraphTransaction ->  optimizeQuery  
    comment  verifyEdgesConditionQuery() method
    
    CassandraSerializer: rewrite query 
        //查询edge 的时候,增加InList
        @Override
        protected Query writeQueryEdgeCondition(Query query) {
            ConditionQuery result = (ConditionQuery) query;
            for (Condition.Relation r : result.relations()) {
                Object value = r.value();
                if(value instanceof List){
                    serializeListValue(r,value);
                }else{
                    serializeSingleValue(r,value);
                }
            }
            return null;
        }
    
        private void serializeSingleValue(Condition.Relation r, Object value) {
            if (value instanceof Id) {
                if (r.key() == HugeKeys.OWNER_VERTEX ||
                        r.key() == HugeKeys.OTHER_VERTEX) {
                    // Serialize vertex id
                    r.serialValue(this.writeId((Id) value));
                }
                else {
                    // Serialize label id
                    r.serialValue(((Id) value).asObject());
                }
            }
            else if (value instanceof Directions) {
                r.serialValue(((Directions) value).type().code());
            }
        }
    
        private void serializeListValue(Condition.Relation r, Object value) {
            assert value instanceof List;
            List<Object> lv = (List)value;
            Object singleValue = lv.get(0);
    
            if(singleValue instanceof  Id){
                if (r.key() == HugeKeys.OWNER_VERTEX ||
                        r.key() == HugeKeys.OTHER_VERTEX) {
                    List<Object> ids = Lists.newArrayList();
                    for(Object val : lv){
                        ids.add(this.writeId((Id) val));
                    }
                    // Serialize vertex id
                    r.serialValue(ids);
                }
                else {
                    // Serialize label id
                    List<Object> ids = Lists.newArrayList();
                    for(Object val : lv){
                        ids.add(((Id) val).asObject());
                    }
                    r.serialValue(ids);
                }
            }else if (singleValue instanceof Directions) {
                r.serialValue(((Directions) singleValue).type().code());
            }
        }
    
    cassandraTable: 
    
                case IN:
                    if(value instanceof  List){
                        return QueryBuilder.in(key, (List)value);
                    }else{
                        return QueryBuilder.in(key,value);
                    }
    
    

    思路现在有了,可以具体去实现一下,然后测试性能到底差多少。

    存在的问题
    • return this.head.split(this.iterator.next(), this); head 与 iterator 对应,在path 的时候才能得到正确的信息,需要自己维护,head 与iterator的关系;
    • 开启 8个线程 查询 start 之后(BFS),性能不如 in List, 和原来的 one vertex one query 的模式。 有点奇怪;

    最终的方案:

    • Iterators.concat(iterator, its.get()) 会耗时,用map 存起来;
    • map 存起来 head - > adj 的关系,那么路径信息就会保留;
        private Traverser.Admin<Vertex> head = null;
        private Iterator<E> iterator = EmptyIterator.instance();
        private static ExecutorService executorService = Executors.newFixedThreadPool(8);
        private Map<Traverser.Admin<Vertex>, Iterator<E>> adj = new HashMap<>();
        //可以并发执行
        @Override
        protected Traverser.Admin<E> processNextStart() {
            while (true) {
                if(!adj.isEmpty()){
                    Set<Traverser.Admin<Vertex>> toRemove = Sets.newHashSet();
                    for(Traverser.Admin<Vertex> head: adj.keySet()){
                        Iterator<E> iterator = adj.get(head);
                        if(iterator.hasNext()){
                            return head.split(iterator.next(), this);
                        }else{
                            toRemove.add(head);
                        }
                    }
                    toRemove.forEach(k-> adj.remove(k));
                }
                else {
    
                    closeIterator();
                    // 这里可以并发执行,也可以用inList
                    this.head = this.starts.next();
                    this.iterator = this.flatMap(this.head);
                    adj.put(head,this.iterator);
                    List<Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>>> futures = Lists.newArrayList();
                    while(this.starts.hasNext()){
                        Traverser.Admin<Vertex> start = this.starts.next();
                        Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>> its = executorService.submit(()->{
                            Iterator<E>  end = this.flatMap(start);
                            return new ImmutablePair<>(start, end);
                        });
                        futures.add(its);
                    }
    
                    try {
                        for(Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>> its: futures) {
                            Pair<Traverser.Admin<Vertex>,Iterator<E>> pair = its.get();
                            this.adj.put(pair.getKey(), pair.getValue());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
    
                }
            }
    
        }
    
    

    相关文章

      网友评论

          本文标题:二十 Gremlin 如何并发执行

          本文链接:https://www.haomeiwen.com/subject/wopuzltx.html