美文网首页
pigeon源码分析-同步调用和异步调用

pigeon源码分析-同步调用和异步调用

作者: WhiteBase | 来源:发表于2018-04-07 19:44 被阅读0次

pigeon源码分析-同步调用和异步调用

Pigeon是美团点评内部广泛使用的一个分布式服务通信框架(RPC),本文所有分析基于 pigeon 开源版本:RPC framework of DIANPING,具体版本为:2.9.12-SNAPSHOT

概要:先从调用方视角 RPC 框架基本原理,然后介绍动态代理的生成,从这里切入之后再详细介绍同步、异步调用;

RPC就是要在分布式场景下完成这样一个动作: Result result = service.call(args) ,在这个= 等号背后 RPC 框架做了很多事。

RPC 调用在客户端视角下的分解

简单来说可以分成这几步:

  • Client client = findClient(); // 这一步主要是服务治理相关,本文不关注;
  • client.write(request);
  • Object resp = waitResponse();
  • Result result = convert2Result(resp);

pigeon 在客户端是如何生成代理的?

远程调用离不开动态代理,其实 pigeon service 在客户端视角就是生成了一个被调用接口的实现类,通过网络请求来返回数据,这样就好像实现类在本地一样;

生成代理的逻辑:
在 bean 初始化的时候,获取 Proxy 实例,代码如下:

// com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean#init
    public void init() throws Exception {
            // 进行初始化配置
        this.obj = ServiceFactory.getService(invokerConfig);
        // 略
    }

使用 JDK 动态代理,真正执行的逻辑都在 java.lang.reflect.InvocationHandler 中,

可以看到 invoke() 时执行的是 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult,执行的逻辑是一个 FilterChain,成员见 com.dianping.pigeon.remoting.invoker.process.InvokerProcessHandlerFactory#init,每个 InvocationInvokeFilter 的作用基本见名知意,完成一些统计、上下文准备、监控等功能;

终于到了 RemoteCallInvokeFilter,这是执行远程调用最关键的一步,主要代码如下:

//com.dianping.pigeon.remoting.invoker.process.filter.RemoteCallInvokeFilter#invoke

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
            // 略
        try {
            switch (callMethod) {
                case SYNC:
                    // do call
                     break;
                case CALLBACK:
                    // do call
                    break;
                case FUTURE:
                    //do call
                    break;
                case ONEWAY:
                    //do call
                    break;
                default:
                    throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!");

            }

            ((DefaultInvokerContext)invocationContext).setResponse(response);
            afterInvoke(invocationContext);
        } catch (Throwable t) {
            afterThrowing(invocationContext, t);
            throw t;
        }

        return response;
    }

可以看到,pigeon 支持四种调用类型:sync, callback, future, oneway

这里主要分析 sync 和 future,分别对应一般意义上的同步、异步调用模式,其他两种是在这两种上做了特殊处理;

SYNC-同步调用

同步调用,主要逻辑:

case SYNC:
    CallbackFuture future = new CallbackFuture();
    response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
    invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
    if (response == null) {
        response = future.getResponse(request.getTimeout());
    }
    break;

先初始化了一个 callbackFuture,之后 sendRequest(),最后获取 response

进一步查看 sendRequest() 实现:

com.dianping.pigeon.remoting.invoker.util.InvokerUtils#sendRequest(com.dianping.pigeon.remoting.invoker.Client, com.dianping.pigeon.remoting.common.domain.InvocationRequest, com.dianping.pigeon.remoting.invoker.concurrent.Callback)
 
 --> com.dianping.pigeon.remoting.invoker.AbstractClient#write
 实际的 doWrite() 有两种实现,HTTP / netty,这里看 NettyClient
 
 --> com.dianping.pigeon.remoting.netty.invoker.NettyClient#doWrite
 
 // 调用 netty client 写请求
     @Override
    public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
        NettyChannel channel = null;

        try {

            channel = channelPool.selectChannel();

            ChannelFuture future = channel.write0(request);

            afterWrite(request, channel);

            if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
                    || request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
                future.addListener(new MessageWriteListener(request, channel));
            }

        } catch (Exception e) {
            throw new NetworkException("[doRequest] remote call failed:" + request, e);
        }
        return null;
    }

仔细看这里永远返回 null,然后再看 sync 调用执行的下一步,就是 response = future.getResponse(request.getTimeout()); , 判断 sendRequest() 返回结果为 null,就从 future 获取结果,这两步就联系起来了;

而 getResponse(timeout) 所做的事情只有一件:while 循环等待到 isDone == true || timeout,这里不详细展示,看代码即可;

拓展一下,这里只做等待,那么返回值从哪里来呢?答案是等待 socket 的另一端也就是 server 回写;

回过头来看,remoteCallInvocationFilter 所做的就是写请求,然后等待接受返回值;

FUTURE-异步调用

Future 调用是 pigeon 提供的标准异步调用模式。使用方式如下:

//调用ServiceA的method1
serviceA.method1("aaa");
//获取ServiceA的method1调用future状态
Future future1OfServiceA = InvokerHelper.getFuture();
//调用ServiceA的method2
serviceA.method2("bbb");
//获取ServiceA的method2调用future状态
Future future2OfServiceA = InvokerHelper.getFuture();

//获取ServiceA的method2调用结果
Object result2OfServiceA = future2OfServiceA.get();
//获取ServiceA的method1调用结果
Object result1OfServiceA = future1OfServiceA.get();

简单来说就是:调用 —> 获取 Future —> 获取结果。

因为每次调用不用阻塞等待结果,而是拿到 Future 实例之后由开发者在合适的时机获取返回值,所以合理使用能提高并发性;

来看下实现:

case FUTURE:
    ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout());
    InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl);
    FutureFactory.setFuture(futureImpl);
    response = InvokerUtils.createFutureResponse(futureImpl);
    invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
break;

可以看到只有 sendRequest(),没有等待返回,开发文档中有介绍,Future 调用之后需要开发者自行通过 future.get(timeout); 获取返回值;

FutureFactory.setFuture(futureImpl); 这一步做了什么?

public class FutureFactory {
    private static ThreadLocal<Future<?>> threadFuture = new ThreadLocal<Future<?>>();

    public static void setFuture(Future<?> future) {
        threadFuture.set(future);
    }

其实就是将 Future 实例设置到 ThreadLocal 中,所以每次调用结束之后立即通过 Future future1OfServiceA = InvokerHelper.getFuture(); 获取Future实例,否则如果紧接着进行了下一次future调用,就会因为ThreadLocal 中的 future 被覆盖而无法正确获取返回值;

思考 :返回值的 Future 是什么作用?

答案:其实 pigeon 中的 Future 实现了 JDK 的标准语义,即:A Future represents the result of an asynchronous computation . 代表了一次异步计算的结果。简单来说,就是一个暂时代表结果的占位符。

需要结果的时候就去看看有没有 response 了,有就返回,没有就阻塞到超时时间到;

pigeon 同步调用与异步调用处理有哪些不同

同步、异步,都是先发送完请求,然后等待结果。

SYNC 模式下,立即等待直到拿到结果或者超时,然后将结果(超时情况下则是异常)返回给调用方;

FUTURE 模式下,立即返回个占位符 Future,这里的实现类是:com.dianping.pigeon.remoting.invoker.concurrent.ServiceFutureImpl#ServiceFutureImpl,将真正获取返回值的时机交给调用方控制;

这里也因为 Future 模式实现时使用了 ThreadLocal 暂时存储占位符,所以多次 Future 调用,一定要顺序获取结果;

相关文章

  • pigeon源码分析-同步调用和异步调用

    pigeon源码分析-同步调用和异步调用 Pigeon是美团点评内部广泛使用的一个分布式服务通信框架(RPC),本...

  • 关于高并发的几个重要概念

    1.1 同步和异步 首先这里说的同步和异步是指函数/方法调用方面。 很明显,同步调用会等待方法的返回,异步调用会瞬...

  • 实战Java高并发程序设计笔记第一章

    相关术语 同步和异步 同步和异步用来形容一次方法调用。 同步:方法调用一旦开始,调用者必须等到方法调用结束后,才能...

  • 同步异步/阻塞非阻塞

    同步异步 同步和异步体现的是消息通知的方式 同步:调用者调用被调用者,被调用者在返回其结果前,调用者不会返回。实际...

  • JAVA语言系列:组合式异步编程

    1. 导论 同步API和异步API:同步/异步关注的是消息通知的机制。 同步:调用了某个方法,调用方在被调用方运行...

  • OkHttp 任务调度 Dispatcher 源码分析

    我们在前面的同步异步请求源码分析中经常会到 Dispatcher 类中去调用一些方法。 OkHttp如何实现同步异...

  • 第一章 走进并行世界

    基本概念 1. 同步和异步 同步和异步通常用来形容一次方法调用 同步方法调用一旦开始,调用者必须等到方法调用返回后...

  • BIO,NIO,AIO

    同步、异步、阻塞、非阻塞 同步与异步 同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回。 异步...

  • 并发,你需要知道的几个概念

    同步(Synchronous)和异步(Asynchronous) 同步方法调用一旦开始,调用者必须等待方法调用返回...

  • 多线程基本概念

    1.同步&异步 同步和异步通常用来形容一次方法的调用。 同步方法一旦开始调用,调用者必须等待调用方法的返回,才能继...

网友评论

      本文标题:pigeon源码分析-同步调用和异步调用

      本文链接:https://www.haomeiwen.com/subject/uhtuhftx.html