Thrift 异步Service的原理
定义一个Thrift Service
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
}
Thrift Service方法会提供两种类型的实现:
-
Iface
-
AsyncIface
按照openSession进行举例说明:
同步接口的方法定义如下:
public TSOpenSessionResp openSession(TSOpenSessionReq req)
throws org.apache.thrift.TException;
异步接口的定义方法如下:
public void openSession(TSOpenSessionReq req, org.apache.thrift.async.AsyncMethodCallback resultHandler)
throws org.apache.thrift.TException;
在Thrift中还定义两种Processor,一种为Processor(同步),一种为AsynProcessor(异步)
同样还按照Processor中的openSession举例:
public static class openSession<I extends Iface> extends org.apache.thrift.ProcessFunction<I, openSession_args> {
public openSession() {
super("openSession");
}
public openSession_args getEmptyArgsInstance() {
return new openSession_args();
}
protected boolean isOneway() {
return false;
}
// 直接被调用
public openSession_result getResult(I iface, openSession_args args) throws org.apache.thrift.TException {
openSession_result result = new openSession_result();
// 调用同步方法
result.success = iface.openSession(args.req);
return result;
}
}
在AsnyProcessor中举例:
最为复杂的是获得一个CallBack对象,通过getResultHandler,方法。
public static class openSession<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, openSession_args, TSOpenSessionResp> {
public openSession() {
super("openSession");
}
public openSession_args getEmptyArgsInstance() {
return new openSession_args();
}
public AsyncMethodCallback<TSOpenSessionResp> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
return new AsyncMethodCallback<TSOpenSessionResp>() {
public void onComplete(TSOpenSessionResp o) {
openSession_result result = new openSession_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
return;
} catch (Exception e) {
LOGGER.error("Exception writing to internal frame buffer", e);
}
fb.close();
}
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
openSession_result result = new openSession_result();
{
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
}
try {
fcall.sendResponse(fb,msg,msgType,seqid);
return;
} catch (Exception ex) {
LOGGER.error("Exception writing to internal frame buffer", ex);
}
fb.close();
}
};
}
protected boolean isOneway() {
return false;
}
public void start(I iface, openSession_args args, org.apache.thrift.async.AsyncMethodCallback<TSOpenSessionResp> resultHandler) throws TException {
// openSession方法只处理请求,不处理恢复
// 所以尽量在openSession方法中把req和handler封装交给另一个线程处理
// 如果没有交给其他线程处理,完全可以按照同步的方法,在当前调用线程中处理req
// 处理结束req,获得resp或者error,然后调用handler的callback方法。
iface.openSession(args.req,resultHandler);
}
}
外部的调用过程就是,先获得一个CallBack,然后调用start方法。
异步使用原则
如果使用了AsynIface实现Service,需要注意几点:
-
不能直接在方法内处理req,req需要和handler(callback)封装交给另外的线程进行处理(暂且把这些线程叫做worker线程)
-
worker线程只做计算逻辑,也就是根据req的要求进行操作,在操作req结束以后获得的resp或者error,不能直接调用handler(callback)的方法(因为callback中的方法是一个网络IO的操作,有可能会block当前线程,如果网络IO操作是一个异步操作的话就不会block当前线程)
网友评论