美文网首页
Thrift 异步Service的原理

Thrift 异步Service的原理

作者: qingshuiting | 来源:发表于2018-11-30 15:57 被阅读0次

Thrift 异步Service的原理

定义一个Thrift Service

service TSIService {
    TSOpenSessionResp openSession(1:TSOpenSessionReq req);
}

Thrift Service方法会提供两种类型的实现:

  1. Iface

  2. AsyncIface

按照openSession进行举例说明:

同步接口的方法定义如下:

public TSOpenSessionResp openSession(TSOpenSessionReq req) 
throws org.apache.thrift.TException;

异步接口的定义方法如下:

public void openSession(TSOpenSessionReq req, org.apache.thrift.async.AsyncMethodCallback resultHandler) 
throws org.apache.thrift.TException;

在Thrift中还定义两种Processor,一种为Processor(同步),一种为AsynProcessor(异步)

同样还按照Processor中的openSession举例:

    public static class openSession<I extends Iface> extends org.apache.thrift.ProcessFunction<I, openSession_args> {
      public openSession() {
        super("openSession");
      }

      public openSession_args getEmptyArgsInstance() {
        return new openSession_args();
      }

      protected boolean isOneway() {
        return false;
      }
        // 直接被调用
      public openSession_result getResult(I iface, openSession_args args) throws org.apache.thrift.TException {
        openSession_result result = new openSession_result();
        // 调用同步方法
        result.success = iface.openSession(args.req);
        return result;
      }
    }

在AsnyProcessor中举例:

最为复杂的是获得一个CallBack对象,通过getResultHandler,方法。

    public static class openSession<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, openSession_args, TSOpenSessionResp> {
      public openSession() {
        super("openSession");
      }

      public openSession_args getEmptyArgsInstance() {
        return new openSession_args();
      }

      public AsyncMethodCallback<TSOpenSessionResp> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
        final org.apache.thrift.AsyncProcessFunction fcall = this;
        return new AsyncMethodCallback<TSOpenSessionResp>() { 
          public void onComplete(TSOpenSessionResp o) {
            openSession_result result = new openSession_result();
            result.success = o;
            try {
              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
              return;
            } catch (Exception e) {
              LOGGER.error("Exception writing to internal frame buffer", e);
            }
            fb.close();
          }
          public void onError(Exception e) {
            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
            org.apache.thrift.TBase msg;
            openSession_result result = new openSession_result();
            {
              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
            }
            try {
              fcall.sendResponse(fb,msg,msgType,seqid);
              return;
            } catch (Exception ex) {
              LOGGER.error("Exception writing to internal frame buffer", ex);
            }
            fb.close();
          }
        };
      }

      protected boolean isOneway() {
        return false;
      }

      public void start(I iface, openSession_args args, org.apache.thrift.async.AsyncMethodCallback<TSOpenSessionResp> resultHandler) throws TException {
        // openSession方法只处理请求,不处理恢复
        // 所以尽量在openSession方法中把req和handler封装交给另一个线程处理
        // 如果没有交给其他线程处理,完全可以按照同步的方法,在当前调用线程中处理req
        // 处理结束req,获得resp或者error,然后调用handler的callback方法。
        iface.openSession(args.req,resultHandler);
      }
    }

外部的调用过程就是,先获得一个CallBack,然后调用start方法。

异步使用原则

如果使用了AsynIface实现Service,需要注意几点:

  1. 不能直接在方法内处理req,req需要和handler(callback)封装交给另外的线程进行处理(暂且把这些线程叫做worker线程)

  2. worker线程只做计算逻辑,也就是根据req的要求进行操作,在操作req结束以后获得的resp或者error,不能直接调用handler(callback)的方法(因为callback中的方法是一个网络IO的操作,有可能会block当前线程,如果网络IO操作是一个异步操作的话就不会block当前线程)

相关文章

网友评论

      本文标题:Thrift 异步Service的原理

      本文链接:https://www.haomeiwen.com/subject/luphcqtx.html