美文网首页
hsf笔记-InvocationHandler

hsf笔记-InvocationHandler

作者: 兴浩 | 来源:发表于2018-08-14 15:35 被阅读75次

    InvocationHandler使用责任链的方式来调用,总体思路就是一(出口)带多(拦截器)

    1.InvocationHandlerChainFactory

    InvocationHandlerChainFactory用于构建InvocationHandler链路

    /**
     * An {@link InvocationHandler} that can additionally be handed another
     * {@link InvocationHandler}.
     *
     * <p>{@code InvocationHandlerChainFactory.buildHandlerChain} uses the setter to pass
     * each interceptor the handler that follows it in the chain.
     */
    public interface InvocationHandlerInterceptor extends InvocationHandler {
        /**
         * Supplies the handler this interceptor is linked to.
         *
         * @param var1 the handler to link to; presumably the one implementations
         *             forward invocations to (confirm against the implementations)
         */
        void setInvocationHandler(InvocationHandler var1);
    }
    
    /**
     * Wires the {@link InvocationHandlerInterceptor}s obtained from
     * {@code HSFServiceContainer} into a single handler chain.
     */
    public class InvocationHandlerChainFactory {
        public InvocationHandlerChainFactory() {
        }
    
        /**
         * Links every interceptor returned by {@code HSFServiceContainer.getInstances}
         * in front of {@code invocationHandler} and returns the head of the chain.
         *
         * <p>The lookup is made with {@code "server"} when {@code metadata.isProvider()}
         * is true and with {@code "client"} otherwise. Interceptors are linked from the
         * last list element back to the first, so the first element ends up as the head
         * and {@code invocationHandler} stays at the tail. An interceptor that is
         * {@code ServiceMetadataAware} receives {@code metadata}; one that is
         * {@code LifeCycle} is started right after it has been linked.
         *
         * @param metadata          metadata of the service the chain is built for
         * @param invocationHandler handler placed at the end of the chain
         * @return the first handler of the chain, or {@code invocationHandler} itself
         *         when the lookup yields an empty list
         */
        public static InvocationHandler buildHandlerChain(ServiceMetadata metadata, InvocationHandler invocationHandler) {
            String side = metadata.isProvider() ? "server" : "client";
            List interceptors = HSFServiceContainer.getInstances(InvocationHandlerInterceptor.class, new String[]{side});
    
            // Walk backwards so that each interceptor can be pointed at its successor.
            InvocationHandler head = invocationHandler;
            int index = interceptors.size();
            while (index > 0) {
                index--;
                InvocationHandlerInterceptor current = (InvocationHandlerInterceptor)interceptors.get(index);
                current.setInvocationHandler(head);
    
                if (current instanceof ServiceMetadataAware) {
                    ((ServiceMetadataAware)current).setServiceMetadata(metadata);
                }
                if (current instanceof LifeCycle) {
                    ((LifeCycle)current).start();
                }
    
                head = current;
            }
    
            return head;
        }
    }
    

    以Consumer为例,获取到6个InvocationHandler

    转换成责任链模式

    2.InvocationHandler

    2.1 DubboAsyncInvocationHandler

    根据InvokeMode类型分发执行;这里是同步调用,因此直接调用下一个InvocationHandler

        /**
         * Dispatches the invocation according to {@code invocation.getInvokeType()}.
         *
         * <ul>
         *   <li>{@code CALLBACK}: handled by {@code callbackInvoke} with the delegate and
         *       the thread pool service's callback executor.</li>
         *   <li>{@code FUTURE}: the delegate is invoked and the resulting future is then
         *       passed to {@code futureInvoke}.</li>
         *   <li>any other mode: the delegate is invoked directly.</li>
         * </ul>
         *
         * @param invocation the call to dispatch
         * @return the future produced by the selected path
         * @throws Throwable whatever the selected path throws
         */
        public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            InvokeMode mode = invocation.getInvokeType();
    
            if (InvokeMode.CALLBACK == mode) {
                ListenableFuture callbackFuture = callbackInvoke(invocation, this.delegate, this.threadPoolService.callbackExecutor());
                return callbackFuture;
            }
    
            // FUTURE and all remaining modes both start by calling the delegate.
            ListenableFuture delegateFuture = this.delegate.invoke(invocation);
            if (InvokeMode.FUTURE == mode) {
                this.futureInvoke(invocation, delegateFuture);
            }
            return delegateFuture;
        }
    

    2.2 FutureInvocationHandler

    将UserThreadPreferedExecutor设置为Invocation的executor属性

        /**
         * Installs a {@code UserThreadPreferedExecutor} on the invocation for the
         * {@code SYNC} and {@code NEW_FUTURE} modes before calling the delegate.
         *
         * <ul>
         *   <li>{@code SYNC}: the executor wraps
         *       {@code threadPoolService.internalCallbackExecutor()}; the delegate's future
         *       is returned wrapped in a {@code UserThreadPreferedListenableFuture}.</li>
         *   <li>{@code NEW_FUTURE}: the executor wraps
         *       {@code FutureInvocationHelper.futureListenerExecutor}, falling back to the
         *       executor manager's default executor when that is {@code null}. When the
         *       service's config style is {@code HSF}, a wrapped view of the future is
         *       stored in {@code FutureInvocationHelper.FUTURE_HOLDER}. The delegate's own
         *       future is returned.</li>
         *   <li>any other mode: the delegate is invoked unchanged.</li>
         * </ul>
         *
         * @param invocation the call to dispatch
         * @return the future for the call (wrapped only in the {@code SYNC} case)
         * @throws Throwable whatever the delegate throws
         */
        public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            InvokeMode invokeType = invocation.getInvokeType();
            if (InvokeMode.SYNC == invokeType) {
                UserThreadPreferedExecutor userThreadPreferedExecutor = new UserThreadPreferedExecutor(this.threadPoolService.internalCallbackExecutor());
                invocation.setExecutor(userThreadPreferedExecutor);
                // Distinct names: the original declared two locals called rpcFuture in this
                // scope, which does not compile.
                ListenableFuture<RPCResult> delegateFuture = this.delegate.invoke(invocation);
                ListenableFuture<RPCResult> rpcFuture = new UserThreadPreferedListenableFuture(userThreadPreferedExecutor, delegateFuture);
                return rpcFuture;
            } else if (InvokeMode.NEW_FUTURE == invokeType) {
                Executor futureListenerExecutor = FutureInvocationHelper.futureListenerExecutor;
                if (futureListenerExecutor == null) {
                    futureListenerExecutor = this.threadPoolService.getExecutorManager().getDefaultExecutor();
                }
    
                UserThreadPreferedExecutor userThreadPreferedExecutor = new UserThreadPreferedExecutor(futureListenerExecutor);
                invocation.setExecutor(userThreadPreferedExecutor);
                ListenableFuture<RPCResult> future = this.delegate.invoke(invocation);
                if (this.serviceMetadata.getConfigStyle() == EnumConfigStyle.HSF) {
                    // Only the holder receives the wrapped future; the caller still gets
                    // the delegate's future below.
                    ListenableFuture<Object> futureWrapper = new FutureInvocationHandler.BizAdapterFuture(MoreExecutors.directExecutor(), future, invocation);
                    UserThreadPreferedListenableFuture<Object> wrapper = new UserThreadPreferedListenableFuture(userThreadPreferedExecutor, futureWrapper);
                    FutureInvocationHelper.FUTURE_HOLDER.set(wrapper);
                }
    
                return future;
            } else {
                return this.delegate.invoke(invocation);
            }
        }
    

    2.3 AsyncInvocationHandler

    有点类似DubboAsyncInvocationHandler,也是根据InvokeMode类型执行

        /**
         * Dispatches the invocation according to {@code invocation.getInvokeType()}.
         *
         * <ul>
         *   <li>{@code FUTURE}: handled by {@code futureInvoke}.</li>
         *   <li>{@code CALLBACK}: handled by {@code CallbackClient.invoke} with the delegate
         *       and the thread pool service's callback executor.</li>
         *   <li>any other mode: the delegate is invoked directly.</li>
         * </ul>
         *
         * @param invocation the call to dispatch
         * @return the future produced by the selected path
         * @throws Throwable whatever the selected path throws
         */
        public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            InvokeMode mode = invocation.getInvokeType();
    
            if (InvokeMode.FUTURE == mode) {
                ListenableFuture futureResult = this.futureInvoke(invocation);
                return futureResult;
            }
    
            if (InvokeMode.CALLBACK == mode) {
                ListenableFuture callbackResult = CallbackClient.invoke(invocation, this.delegate, this.threadPoolService.callbackExecutor());
                return callbackResult;
            }
    
            return this.delegate.invoke(invocation);
        }
    

    2.4 TestAddressInvocationHandler

    测试环境下更改目标地址targetAddress

        /**
         * Resolves the test address from the service's {@code "target"} property.
         *
         * <p>Nothing happens unless the environment's run mode is {@code RunMode.TEST}
         * and the property is non-blank. In that case the property value is passed
         * through the metadata's protocol filter chain and the result is stored in
         * {@code testAddress}.
         *
         * @param serviceMetadata metadata whose service properties are inspected
         */
        public void setServiceMetadata(ServiceMetadata serviceMetadata) {
            if (this.env.getRunMode() != RunMode.TEST) {
                return;
            }
    
            String target = (String)serviceMetadata.getServiceProperties().get("target");
            if (StringUtils.isBlank(target)) {
                return;
            }
    
            this.testAddress = serviceMetadata.getProtocolFilterChain().accept(serviceMetadata, target);
        }
    
        /**
         * Overrides the invocation's target address with {@code testAddress} when one
         * has been set, then forwards the invocation to the delegate.
         *
         * @param invocation the call to forward
         * @return the delegate's future
         * @throws Throwable whatever the delegate throws
         */
        public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            // testAddress is only assigned by setServiceMetadata; null means "leave the target alone".
            if (this.testAddress != null) {
                invocation.setTargetAddress(this.testAddress);
            }
            return this.delegate.invoke(invocation);
        }
    

    2.5 FilterInvocationHandler

    又是一个责任链,后续再分析

    /**
     * Interceptor that routes every invocation through a separate handler built by
     * {@code RPCFilterBuilder.buildInvokerChain} when this handler is started.
     */
    public class FilterInvocationHandler extends AbstractInvocationHandlerInterceptor implements ServiceMetadataAware, LifeCycle {
        // Stored by setServiceMetadata and handed to RPCFilterBuilder in start().
        private ServiceMetadata serviceMetadata;
        // Assigned in start(); invoke() forwards every call to it.
        private InvocationHandler filterDelegate;
    
        public FilterInvocationHandler() {
        }
    
        /**
         * Forwards the invocation to {@code filterDelegate}.
         *
         * <p>NOTE(review): {@code filterDelegate} is only assigned in {@link #start()},
         * so this presumably must not be called before the handler is started. Confirm.
         *
         * @param invocation the call to forward
         * @return the future returned by {@code filterDelegate}
         * @throws Throwable whatever {@code filterDelegate} throws
         */
        public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            return this.filterDelegate.invoke(invocation);
        }
    
        /**
         * Remembers the metadata for later use in {@link #start()}.
         *
         * @param serviceMetadata metadata of the service this handler belongs to
         */
        public void setServiceMetadata(ServiceMetadata serviceMetadata) {
            this.serviceMetadata = serviceMetadata;
        }
    
        /**
         * Builds {@code filterDelegate} from {@code this.delegate} and the stored metadata.
         * {@code delegate} is not declared in this class; presumably it is inherited
         * from {@code AbstractInvocationHandlerInterceptor} (confirm).
         */
        public void start() {
            this.filterDelegate = RPCFilterBuilder.buildInvokerChain(this.delegate, this.serviceMetadata);
        }
    
        /** Intentionally does nothing. */
        public void stop() {
        }
    }
    

    2.6 LocalInvocationHandler

    判断是否本地执行

       /**
        * Serves the invocation locally when {@code isLocalInvoke} returns metadata for
        * the method's unique service name; otherwise forwards it to the delegate.
        *
        * @param invocation the call to dispatch
        * @return the future from {@code localInvoke} or from the delegate
        * @throws Throwable whatever the selected path throws
        */
       public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            String uniqueName = invocation.getClientInvocationContext().getMethodModel().getUniqueName();
            ServiceMetadata localMetadata = this.isLocalInvoke(uniqueName, invocation);
    
            // A null result means there is nothing local to call.
            if (localMetadata == null) {
                return this.delegate.invoke(invocation);
            }
            return this.localInvoke(localMetadata, invocation, uniqueName);
        }
    

    2.7 DelegateInvocationHandler

    纯代理跳转:不做任何处理,直接转交给下一个InvocationHandler

    /**
     * Interceptor that adds no behavior of its own: every invocation is passed
     * straight to {@code delegate}.
     */
    public class DelegateInvocationHandler extends AbstractInvocationHandlerInterceptor {
        public DelegateInvocationHandler() {
        }
    
        /**
         * Forwards the invocation to {@code delegate} unchanged.
         *
         * @param invocation the call to forward
         * @return the delegate's future
         * @throws Throwable whatever the delegate throws
         */
        public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
            return this.delegate.invoke(invocation);
        }
    }
    

    3.service的refer

    上面的流程走到DelegateInvocationHandler就结束了,但并没有进入远程调用流程,所以上面只是整个流程的一部分;service调用refer方法后,才会接上并执行后续流程

        /**
         * Obtains a handler from {@code protocolFilterChain.refer(this)}, installs it
         * on {@code consumerHandlerHitch}, and returns {@code this.invocationHandler}.
         *
         * @return the value of {@code this.invocationHandler}
         * @throws RuntimeException wrapping any {@code Throwable} raised while
         *         referring; the failure is logged under code {@code HSF-0060} first
         */
        public InvocationHandler refer() {
            try {
                InvocationHandler referred = this.protocolFilterChain.refer(this);
                this.consumerHandlerHitch.setInvocationHandler(referred);
                return this.invocationHandler;
            } catch (Throwable failure) {
                String detail = "refer service: " + this.getUniqueName() + ", group=" + this.getGroup() + " got error";
                String codedMessage = LoggerHelper.getErrorCodeStr("HSF", "HSF-0060", "HSF", detail);
                LOGGER.error("HSF-0060", codedMessage, failure);
                throw new RuntimeException(failure);
            }
        }
    

    3.1 RegistryInvocationHandler

    设置Invocation的目标地址(targetAddress)

    3.2 RemotingRPCProtocolComponent

    真正实现RPC请求的类

    来看一下执行的堆栈

    send方法调用了netty中channel的writeAndFlush方法

        /**
         * Writes and flushes {@code packet} on the channel unless the client has
         * water-mark checking enabled and this connection is currently not writable.
         *
         * @param packet the object handed to {@code channel.writeAndFlush}
         * @throws HSFException when water-mark checking is enabled and
         *         {@code isWritable()} is false; the same message is logged under
         *         code {@code HSF-0102} before the exception is thrown
         */
        public void send(Object packet) {
            boolean overflow = this.client.isWaterMarkEnabled() && !this.isWritable();
            if (!overflow) {
                this.channel.writeAndFlush(packet);
                return;
            }
    
            // The message labels this value "bytesBeforeWritable", so report that quantity.
            // The original passed bytesBeforeUnwritable(), which on a Netty channel is 0
            // whenever the channel is not writable.
            // NOTE(review): assumes this.channel is a Netty Channel - confirm.
            String errorMsg = MessageFormat.format("write overflow, client gave up writing request to channel {0}, bytesBeforeWritable: {1}, watermark: [{2}-{3}] bytes", this.channel, this.channel.bytesBeforeWritable(), this.client.getLowWaterMark(), this.client.getHighWaterMark());
            LOGGER.error("HSF-0102", errorMsg);
            throw new HSFException(errorMsg);
        }
    

    4.SyncInvocationHandler

    同步执行器,同样也是一个责任链

    // Build the interceptor chain with consumerHandlerHitch as its last handler.
    this.invocationHandler =
            InvocationHandlerChainFactory.buildHandlerChain(this, this.consumerHandlerHitch);
    // Create a SyncInvocationHandler from that chain.
    SyncInvocationHandler clientSyncInvocationHandler =
            invocationHandlerFactory.createSyncInvocationHandler(this.invocationHandler);
    // Build the sync handler chain from it.
    this.syncInvocationHandler =
            SyncInvocationHandlerChainFactory.buildSyncInvocationHandlerChain(this, clientSyncInvocationHandler);
    

    最终形成的执行链堆栈如下

    相关文章

      网友评论

          本文标题:hsf笔记-InvocationHandler

          本文链接:https://www.haomeiwen.com/subject/qfqvbftx.html