美文网首页
Dubbo源码分析(十五) Merger实现

Dubbo源码分析(十五) Merger实现

作者: skyguard | 来源:发表于2018-11-13 11:30 被阅读0次

    下面我们来说一下Dubbo的Merger实现。在开发中,有这么一种情况,先定义了一个UserService接口,有UserServiceImpl和CategoryUserServiceImpl两种实现,它们又分别属于user和category两个组,consumer将调用这两个服务,并按照自定义策略合并返回结果,作为最终结果。这就需要Dubbo的Merger来实现了。我们先来看一下MergeableClusterInvoker的invoke方法

    public Result invoke(final Invocation invocation) throws RpcException {
        // 获得 Invoker 集合
        List<Invoker<T>> invokers = directory.list(invocation);
        // 获得 Merger 拓展名
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
        // 若果未配置拓展,直接调用首个可用的 Invoker 对象
        if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
            for (final Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }
    
        // 通过反射,获得返回类型
        Class<?> returnType;
        try {
            returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }
    
        // 提交线程池,并行执行,发起 RPC 调用,并添加到 results 中
        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for (final Invoker<T> invoker : invokers) {
            Future<Result> future = executor.submit(new Callable<Result>() {
                public Result call() {
                    // RPC 调用
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            results.put(invoker.getUrl().getServiceKey(), future);
        }
    
        // 阻塞等待执行执行结果,并添加到 resultList 中
        List<Result> resultList = new ArrayList<Result>(results.size());
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
            Future<Result> future = entry.getValue();
            try {
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) { // 异常 Result ,打印错误日志,忽略
                    log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());
                } else { // 正常 Result ,添加到 resultList 中
                    resultList.add(r);
                }
            } catch (Exception e) { // 异常,抛出 RpcException 异常
                throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);
            }
        }
    
        // 结果大小为空,返回空的 RpcResult
        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        // 结果大小为 1 ,返回首个 RpcResult
        } else if (resultList.size() == 1) {
            return resultList.iterator().next();
        }
        // 返回类型为 void ,返回空的 RpcResult
        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }
    
        Object result;
        // 【第 1 种】基于合并方法
        if (merger.startsWith(".")) {
            // 获得合并方法 Method
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
            }
            // 有 Method ,进行合并
            if (method != null) {
                if (!Modifier.isPublic(method.getModifiers())) {
                    method.setAccessible(true);
                }
                result = resultList.remove(0).getValue();
                try {
                    // 方法返回类型匹配,合并时,修改 result
                    if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
                        for (Result r : resultList) {
                            result = method.invoke(result, r.getValue());
                        }
                    // 方法返回类型不匹配,合并时,不修改 result
                    } else {
                        for (Result r : resultList) {
                            method.invoke(result, r.getValue());
                        }
                    }
                } catch (Exception e) {
                    throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);
                }
            // 无 Method ,抛出 RpcException 异常
            } else {
                throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
            }
        // 【第 2 种】基于 Merger
        } else {
            Merger resultMerger;
            // 【第 2.1 种】根据返回值类型自动匹配 Merger
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            // 【第 2.2 种】指定 Merger
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            // 有 Merger ,进行合并
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<Object>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            // 无 Merger ,抛出 RpcException 异常
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        // 返回 RpcResult 结果
        return new RpcResult(result);
    }
    

    调用了Merger进行结果的合并处理。我们再来看一个类MergerFactory,这个类是生产Merger的工厂类

     public static <T> Merger<T> getMerger(Class<T> returnType) {
        Merger result;
        // 数组类型
        if (returnType.isArray()) {
            Class type = returnType.getComponentType();
            // 从缓存中获得 Merger 对象
            result = mergerCache.get(type);
            if (result == null) {
                loadMergers();
                result = mergerCache.get(type);
            }
            // 获取不到,使用 ArrayMerger
            if (result == null && !type.isPrimitive()) {
                result = ArrayMerger.INSTANCE;
            }
        // 普通类型
        } else {
            // 从缓存中获得 Merger 对象
            result = mergerCache.get(returnType);
            if (result == null) {
                loadMergers();
                result = mergerCache.get(returnType);
            }
        }
        return result;
    }
    

    我们再来看一个类ArrayMerger,这个类是数组结果合并的处理类,实现了merge方法

    if (others.length == 0) {
            return null;
        }
        int totalLen = 0;
        for (int i = 0; i < others.length; i++) {
            Object item = others[i];
            if (item != null && item.getClass().isArray()) {
                totalLen += Array.getLength(item);
            } else {
                throw new IllegalArgumentException(
                        new StringBuilder(32).append(i + 1)
                                .append("th argument is not an array").toString());
            }
        }
    
        if (totalLen == 0) {
            return null;
        }
    
        Class<?> type = others[0].getClass().getComponentType();
    
        Object result = Array.newInstance(type, totalLen);
        int index = 0;
        for (Object array : others) {
            for (int i = 0; i < Array.getLength(array); i++) {
                Array.set(result, index++, Array.get(array, i));
            }
        }
        return (Object[]) result;
    

    再来看一个类MapMerger,这个类是Map结果合并的处理类

    if (items.length == 0) {
            return null;
        }
        // 创建结果 Map
        Map<Object, Object> result = new HashMap<Object, Object>();
        // 合并多个 Map
        for (Map<?, ?> item : items) {
            if (item != null) {
                result.putAll(item);
            }
        }
        return result;
    

    还有很多数据类型的处理类,比如BooleanArrayMerger,ByteArrayMerger,CharArrayMerger等,就不在这里具体说了。
    通过Merger实现了对多个返回结果的处理。
    Dubbo的Merger机制就介绍到这里了。

    相关文章

      网友评论

          本文标题:Dubbo源码分析(十五) Merger实现

          本文链接:https://www.haomeiwen.com/subject/lzpkfqtx.html