美文网首页
dubbo服务的调用过程之consumer端发送消息

dubbo服务的调用过程之consumer端发送消息

作者: 一滴水的坚持 | 来源:发表于2018-03-10 14:16 被阅读0次

    从dubbo的引用过程中可以看到,最终通过<dubbo:reference>引用到的bean是一个代理对象,其调用处理器为InvokerInvocationHandler。
    在调用过程中,组装调用的参数组成RpcInvocation,然后传递到对应的Invoker中。

    /**
     * {@link InvocationHandler} that forwards every proxied method call to a wrapped
     * {@link Invoker}.
     */
    public class InvokerInvocationHandler implements InvocationHandler {

        /** Target that receives each call once it has been packaged as an RpcInvocation. */
        private final Invoker<?> invoker;

        /**
         * @param handler the invoker that proxied calls are delegated to
         */
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }

        /**
         * Packages the reflective call as an {@link RpcInvocation}, sends it through the
         * invoker and converts the returned result via {@code recreate()}.
         *
         * @param proxy  the proxy instance the method was called on (not used here)
         * @param method the interface method being called
         * @param args   the call arguments
         * @return whatever {@code recreate()} yields for the invocation result
         * @throws Throwable anything raised by the invoker or by {@code recreate()}
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // (a lot of code is omitted from this excerpt)
            RpcInvocation invocation = new RpcInvocation(method, args);
            return invoker.invoke(invocation).recreate();
        }
    }
    
    //RpcInvocation的格式
    //RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={}]
    

    实质在调用过程中,首先会调用MockClusterInvoker,在其中判断是否需要Mock,若不需要Mock,则直接调用FailoverClusterInvoker处理逻辑。

    public class MockClusterInvoker<T> implements Invoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class);
    
        private final Directory<T> directory;
    
        private final Invoker<T> invoker;
    
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
            //是否需要Mock
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                //no mock
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            return result;
        }   
    }
    

    FailoverClusterInvoker继承于AbstractClusterInvoker,首先会调用该父类的invoke方法,根据扩展类,获取LoadBalance算法,和下游系统的列表,然后交由FailoverClusterInvoker处理具体逻辑。

    /**
     * Base class for cluster invokers: resolves the candidate invokers and the
     * load-balance extension, then lets the subclass carry out the call in
     * {@code doInvoke}.
     */
    public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

        /**
         * Lists the invokers for this call, picks the load-balance extension and
         * delegates to {@code doInvoke}.
         *
         * @param invocation the call being made
         * @return the result produced by {@code doInvoke}
         * @throws RpcException propagated from listing or invoking
         */
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
            // All invokers currently available for this invocation.
            List<Invoker<T>> invokers = list(invocation);

            // The load-balance name is read from the first invoker's URL (per method);
            // with no invokers at all, the default strategy is used.
            final String loadbalanceName;
            if (invokers == null || invokers.isEmpty()) {
                loadbalanceName = Constants.DEFAULT_LOADBALANCE;
            } else {
                loadbalanceName = invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE);
            }
            LoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);

            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }
    }
    
    /**
     * Asks the directory for the invokers that can serve the given invocation.
     *
     * @param invocation the call being made
     * @return whatever the directory returns for this invocation
     * @throws RpcException propagated from the directory
     */
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return directory.list(invocation);
    }
    

    FailoverClusterInvoker处理中,首先根据负载均衡算法选择需要调用的invoker,然后将选择的invoker保存在RpcContext上下文中,最后调用选择的invoker。

    /**
     * Cluster invoker that retries a failed call, selecting an invoker again for each
     * attempt.
     */
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }

        /**
         * Invokes the call on a selected invoker, retrying up to "retries" more times
         * when an attempt fails with a non-business {@link RpcException}.
         *
         * @param invocation  the call being made
         * @param invokers    candidate invokers for the first attempt
         * @param loadbalance strategy used to pick one invoker per attempt
         * @return the result of the first successful attempt
         * @throws RpcException a business exception from any attempt, or a summary
         *                      exception once every attempt has failed
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            // Validate the candidate list before the first attempt.
            checkInvokers(copyinvokers, invocation);
            // Total attempts = configured retries + the initial call.
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            for (int i = 0; i < len; i++) {
                if (i > 0) {
                    // Before a retry, list the invokers again and re-validate them.
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    checkInvokers(copyinvokers, invocation);
                }
                // Pick one invoker via the load-balance strategy.
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                // Remember it as already tried.
                invoked.add(invoker);
                // Expose the tried invokers through the RpcContext.
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    return invoker.invoke(invocation);
                } catch (RpcException e) {
                    // Business exceptions are a real answer from the provider; retrying
                    // on another node would not help, so surface them immediately.
                    if (e.isBiz()) {
                        throw e;
                    }
                    le = e;
                }
            }
            // Every attempt failed: report the last failure instead of falling off the
            // end of the method.
            throw new RpcException(le != null ? le.getCode() : 0,
                    "Failed to invoke the method " + invocation.getMethodName()
                            + " after " + len + " attempt(s). Last error is: "
                            + (le != null ? le.getMessage() : ""),
                    le != null && le.getCause() != null ? le.getCause() : le);
        }
    }
    /**
     * Chooses one invoker for the current attempt.
     *
     * @param loadbalance strategy used for the choice
     * @param invocation  the call being made
     * @param invokers    candidates to choose from
     * @param selected    invokers already tried for this call
     * @return the chosen invoker, or {@code null} when there are no candidates
     * @throws RpcException propagated from the selection
     */
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        // (this excerpt omits code both before and after the doselect call)
        return doselect(loadbalance, invocation, invokers, selected);
    }
    
    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.size() == 0)
                return null;
            if (invokers.size() == 1)
                return invokers.get(0);
            if (invokers.size() == 2 && selected != null && selected.size() > 0) {
                //如果有两个服务提供,则用交替提供服务
                return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
            }
            //路由算法,选择一个invoker,默认随机算法
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
            if ((selected != null && selected.contains(invoker))
                    || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
                try {
                    Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                    if (rinvoker != null) {
                        invoker = rinvoker;
                    } else {
                        int index = invokers.indexOf(invoker);
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                     
                    }
                } 
            }
            return invoker;
        }
    }
    
    /**
     * Load-balance strategy that picks an invoker at a uniformly random index.
     * The weight handling is omitted from this excerpt.
     */
    public class RandomLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "random";

        private final Random random = new Random();

        /**
         * @param invokers   candidates to choose from
         * @param url        not used in this excerpt
         * @param invocation not used in this excerpt
         * @return the invoker at a random position of {@code invokers}
         */
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // (weight algorithm omitted)
            int pick = random.nextInt(invokers.size());
            return invokers.get(pick);
        }
    }
    

    FailoverClusterInvoker调用完成之后,经过一系列的Wrapper和Filter(InvokerWrapper,ListenerInvokerWrapper,ProtocolFilterWrapper,ConsumerContextFilter等),最终调用了AbstractInvoker的invoke方法,然后调用到了DubboInvoker的doInvoke方法。在ConsumerContextFilter中,设置了RpcContext的相关信息,代码如下:

    /**
     * Filter that fills the current {@link RpcContext} with the invoker, the invocation
     * and both addresses before the call, and clears the context attachments afterwards.
     */
    public class ConsumerContextFilter implements Filter {
        /**
         * @param invoker    the next invoker in the chain
         * @param invocation the call being made
         * @return the result of {@code invoker.invoke(invocation)}
         * @throws RpcException propagated from the invoker
         */
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext context = RpcContext.getContext();
            // Local port is set to 0; the remote side is taken from the invoker's URL.
            context.setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                RpcInvocation rpcInvocation = (RpcInvocation) invocation;
                rpcInvocation.setInvoker(invoker);
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                // Runs whether the call returned or threw.
                RpcContext.getContext().clearAttachments();
            }
        }
    }
    
    //AbstractInvoker.java
    /**
     * Prepares the invocation (invoker reference and attachments) and hands it to
     * {@code doInvoke}.
     *
     * @param inv the call being made; cast to {@link RpcInvocation} without a check
     * @return the result of {@code doInvoke}
     * @throws RpcException raised by {@code doInvoke}, or wrapping any other throwable it
     *                      raises
     */
    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        // Attachments held by this invoker are added only where the invocation has none.
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        // Attachments placed on the RpcContext by the caller travel with the invocation too.
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
            invocation.addAttachmentsIfAbsent(context);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Delegate to the protocol-specific implementation (DubboInvoker in the article).
        try {
            return doInvoke(invocation);
        } catch (RpcException e) {
            throw e;
        } catch (Throwable e) {
            // doInvoke is declared `throws Throwable`; keep this method's contract by
            // wrapping anything else, preserving the cause.
            throw new RpcException(e.getMessage(), e);
        }
    }
    
    //DubboInvoker.java
    /**
     * Sends the invocation through one of this invoker's exchange clients and waits for
     * the response.
     *
     * @param invocation the call being made; cast to {@link RpcInvocation} without a check
     * @return the value obtained from the response future, cast to {@code Result}
     * @throws Throwable anything raised while sending or waiting
     */
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        // A single client is used directly; otherwise rotate through the array.
        ExchangeClient currentClient = clients.length == 1
                ? clients[0]
                : clients[index.getAndIncrement() % clients.length];

        RpcContext.getContext().setFuture(null);
        // Network call (netty, per the article); inv now carries all call information, e.g.
        // RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={path=com.alibaba.dubbo.config.api.DemoService, interface=com.alibaba.dubbo.config.api.DemoService, version=0.0.0}]
        Object response = currentClient.request(inv, timeout).get();
        return (Result) response;
    }
    

    currentClient为ReferenceCountExchangeClient实例,经过HeaderExchangeClient、HeaderExchangeChannel的层层包装,最终在HeaderExchangeChannel中封装Request,再依次通过AbstractPeer、NettyClient、NettyChannel将数据发送给服务端(调用netty通信)。

    //HeaderExchangeClient.java
    /**
     * Forwards the request to the wrapped channel.
     *
     * @param request the payload to send
     * @return the future returned by {@code channel.request(request)}
     * @throws RemotingException propagated from the channel
     */
    public ResponseFuture request(Object request) throws RemotingException {
        return channel.request(request);
    }
    
    
    //HeaderExchangeChannel.java
    /**
     * Wraps the payload in a two-way {@code Request}, sends it over the channel and
     * returns the future that will receive the response.
     *
     * @param request the payload (an RpcInvocation on the consumer call path)
     * @param timeout timeout handed to the future
     * @return the future tied to the sent request
     * @throws RemotingException if sending fails; the future is cancelled first
     */
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // (some code omitted from this excerpt)
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            // The request never left, so no response can arrive: cancel the future
            // rather than leaving it pending until the timeout.
            future.cancel();
            throw e;
        }
        // Example of req at this point:
        // Request [id=10, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={path=com.alibaba.dubbo.config.api.DemoService, interface=com.alibaba.dubbo.config.api.DemoService, version=0.0.0}]]
        return future;
    }
    
    //NettyClient.java
    /**
     * Sends a message over the client's channel, reconnecting first when
     * {@code send_reconnect} is set and the client is not connected.
     *
     * @param message the message to send
     * @param sent    passed through to the channel's {@code send}
     * @throws RemotingException if no connected channel is available or sending fails
     */
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        // getChannel() yields null while disconnected; fail with a clear error instead
        // of a NullPointerException.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        channel.send(message, sent);
    }
        
    //NettyClient.java
    /**
     * Returns the wrapper for the current underlying channel.
     *
     * @return the {@code NettyChannel} for the current channel, or {@code null} when
     *         there is no channel or it is not connected
     */
    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        // Read the field once so the check and the use see the same instance.
        Channel current = channel;
        boolean usable = current != null && current.isConnected();
        if (!usable) {
            return null;
        }
        return NettyChannel.getOrAddChannel(current, getUrl(), this);
    }
    
    //NettyChannel.java
    /**
     * Writes the message to the underlying channel. The code before and after the write
     * is omitted from this excerpt.
     *
     * @param message the message to write
     * @param sent    not used in the visible part of the method
     * @throws RemotingException declared by the method; the raising code is not shown here
     */
    public void send(Object message, boolean sent) throws RemotingException {
        // (some code omitted)
        // channel is an org.jboss.netty.channel.Channel
        // Example message: Request [id=6, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={path=com.alibaba.dubbo.config.api.DemoService, interface=com.alibaba.dubbo.config.api.DemoService, version=0.0.0}]]
        ChannelFuture future = channel.write(message);
        // (some code omitted)
    }
    

    到这里,消费方就将信息已经发送给了服务端。编解码部分暂时不分析。

    相关文章

      网友评论

          本文标题:dubbo服务的调用过程之consumer端发送消息

          本文链接:https://www.haomeiwen.com/subject/bdbvfftx.html