美文网首页
Thrift 异步Service的原理

Thrift 异步Service的原理

作者: qingshuiting | 来源:发表于2018-11-30 15:57 被阅读0次

    Thrift 异步Service的原理

    定义一个Thrift Service

    service TSIService {
        TSOpenSessionResp openSession(1:TSOpenSessionReq req);
    }
    

    Thrift Service方法会提供两种类型的实现:

    1. Iface

    2. AsyncIface

    按照openSession进行举例说明:

    同步接口的方法定义如下:

    public TSOpenSessionResp openSession(TSOpenSessionReq req) 
    throws org.apache.thrift.TException;
    

    异步接口的定义方法如下:

    public void openSession(TSOpenSessionReq req, org.apache.thrift.async.AsyncMethodCallback resultHandler) 
    throws org.apache.thrift.TException;
    
    

    在Thrift中还定义两种Processor,一种为Processor(同步),一种为AsynProcessor(异步)

    同样还按照Processor中的openSession举例:

        public static class openSession<I extends Iface> extends org.apache.thrift.ProcessFunction<I, openSession_args> {
          public openSession() {
            super("openSession");
          }
    
          public openSession_args getEmptyArgsInstance() {
            return new openSession_args();
          }
    
          protected boolean isOneway() {
            return false;
          }
            // 直接被调用
          public openSession_result getResult(I iface, openSession_args args) throws org.apache.thrift.TException {
            openSession_result result = new openSession_result();
            // 调用同步方法
            result.success = iface.openSession(args.req);
            return result;
          }
        }
    

    在AsnyProcessor中举例:

    最为复杂的是获得一个CallBack对象,通过getResultHandler,方法。

        public static class openSession<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, openSession_args, TSOpenSessionResp> {
          public openSession() {
            super("openSession");
          }
    
          public openSession_args getEmptyArgsInstance() {
            return new openSession_args();
          }
    
          public AsyncMethodCallback<TSOpenSessionResp> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
            final org.apache.thrift.AsyncProcessFunction fcall = this;
            return new AsyncMethodCallback<TSOpenSessionResp>() { 
              public void onComplete(TSOpenSessionResp o) {
                openSession_result result = new openSession_result();
                result.success = o;
                try {
                  fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
                  return;
                } catch (Exception e) {
                  LOGGER.error("Exception writing to internal frame buffer", e);
                }
                fb.close();
              }
              public void onError(Exception e) {
                byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
                org.apache.thrift.TBase msg;
                openSession_result result = new openSession_result();
                {
                  msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
                  msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
                }
                try {
                  fcall.sendResponse(fb,msg,msgType,seqid);
                  return;
                } catch (Exception ex) {
                  LOGGER.error("Exception writing to internal frame buffer", ex);
                }
                fb.close();
              }
            };
          }
    
          protected boolean isOneway() {
            return false;
          }
    
          public void start(I iface, openSession_args args, org.apache.thrift.async.AsyncMethodCallback<TSOpenSessionResp> resultHandler) throws TException {
            // openSession方法只处理请求,不处理恢复
            // 所以尽量在openSession方法中把req和handler封装交给另一个线程处理
            // 如果没有交给其他线程处理,完全可以按照同步的方法,在当前调用线程中处理req
            // 处理结束req,获得resp或者error,然后调用handler的callback方法。
            iface.openSession(args.req,resultHandler);
          }
        }
    

    外部的调用过程就是,先获得一个CallBack,然后调用start方法。

    异步使用原则

    如果使用了AsynIface实现Service,需要注意几点:

    1. 不能直接在方法内处理req,req需要和handler(callback)封装交给另外的线程进行处理(暂且把这些线程叫做worker线程)

    2. worker线程只做计算逻辑,也就是根据req的要求进行操作,在操作req结束以后获得的resp或者error,不能直接调用handler(callback)的方法(因为callback中的方法是一个网络IO的操作,有可能会block当前线程,如果网络IO操作是一个异步操作的话就不会block当前线程)

    相关文章

      网友评论

          本文标题:Thrift 异步Service的原理

          本文链接:https://www.haomeiwen.com/subject/luphcqtx.html