美文网首页
Thrift基本原理

Thrift基本原理

作者: 夏目手札 | 来源:发表于2018-04-17 10:05 被阅读0次

前言

基于上一次的简单介绍做一次稍微深入的分析

正文

  1. 客户端如何发送
    由上次结尾的一个简单demo作为分析的例子,看下作为客户端如何发送数据的。
public class HelloServiceClient {
    public static void main(String[] args) throws TException {
        System.out.println("客户端启动.....");
        //1.初始化传输工具
        TTransport transport =new TSocket("10.10.163.11", 8999, 30000);
        //2.绑定传输协议
        TProtocol protocol = new TBinaryProtocol(transport);
        //3.client绑定传输协议
        Hello.Client client = new Hello.Client(protocol);
        transport.open();
        //4.client调用,发送消息
        String result = client.helloWorld("TOM");
        System.out.println(result);
    }
}

看一下Hello.Client调用方法

public String helloWorld(String para) throws org.apache.thrift.TException
    {
      send_helloWorld(para);
      return recv_helloWorld();
    }

    public void send_helloWorld(String para) throws org.apache.thrift.TException
    {
      //初始化并设置参数对象
      helloWorld_args args = new helloWorld_args();
      args.setPara(para);
      //发送信息
      sendBase("helloWorld", args);
    }

Hello.Client中sendBase方法

protected void sendBase(String methodName, TBase<?,?> args) throws TException {
    sendBase(methodName, args, TMessageType.CALL);
  }
private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
    //封装消息并写入传输协议中
    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    //发送消息
    oprot_.getTransport().flush();
  }

TMessage对象定义了三个成员变量,重点还是在writeMessageBegin方法中
由于我们的demo绑定的传输协议是TBinaryProtocol(二进制编码格式进行数据传输)
所以我们查看TBinaryProtocol的writeMessageBegin方法

public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }
public void writeI32(int i32) throws TException {
    inoutTemp[0] = (byte)(0xff & (i32 >> 24));
    inoutTemp[1] = (byte)(0xff & (i32 >> 16));
    inoutTemp[2] = (byte)(0xff & (i32 >> 8));
    inoutTemp[3] = (byte)(0xff & (i32));
    trans_.write(inoutTemp, 0, 4);
  }
public void writeString(String str) throws TException {
    try {
      byte[] dat = str.getBytes("UTF-8");
      writeI32(dat.length);
      trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
  }

基本上就是写入版本,写入方法长度,写入方法名,最后在写入seqid
最后再由Hello.helloWorld_args类调用helloWorld_argsStandardScheme中write方法写入参数信息

public void write(org.apache.thrift.protocol.TProtocol oprot, helloWorld_args struct) throws org.apache.thrift.TException {
        struct.validate();//空方法
        oprot.writeStructBegin(STRUCT_DESC);//空方法
        if (struct.para != null) {
          //写入参数信息
          oprot.writeFieldBegin(PARA_FIELD_DESC);
          oprot.writeString(struct.para);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldStop();
        oprot.writeStructEnd();
      }

下面看下接受信息的处理

//Hello.Client
public String recv_helloWorld() throws org.apache.thrift.TException
    {
      helloWorld_result result = new helloWorld_result();
      receiveBase(result, "helloWorld");
      if (result.isSetSuccess()) {
        return result.success;
      }
     ...
    }
protected void receiveBase(TBase<?,?> result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      ...
    }
    System.out.format("Received %d%n", msg.seqid);
    if (msg.seqid != seqid_) {
    ...
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
  }

以上就是客户端的整个处理流程,大概时序图如下


client-process.png

下面看下服务端的处理流程
先看下服务端的代码

public class HelloServiceServer {
    public static void main(String[] args) throws TTransportException {
        System.out.println("服务端开启......");
        TProcessor tProcessor=new Hello.Processor<Hello.Iface>(new HelloServiceImpl());
        TServerSocket serverSocket = new TServerSocket(8999);
        TServer.Args tArgs = new TServer.Args(serverSocket);
        tArgs.processor(tProcessor);
        tArgs.protocolFactory(new TBinaryProtocol.Factory());
        TServer server = new TSimpleServer(tArgs);
        server.serve();
    }
}

前面的都是一些绑定processor,protocol还有启动绑定serversocket,主要看一下server.serve();

public void serve() {
   ....
        client = serverTransport_.accept();
        if (client != null) {
          processor = processorFactory_.getProcessor(client);
          inputTransport = inputTransportFactory_.getTransport(client);
          outputTransport = outputTransportFactory_.getTransport(client);
          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
          if (eventHandler_ != null) {
            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
          }
          while (true) {
            if (eventHandler_ != null) {
              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
            }
            if(!processor.process(inputProtocol, outputProtocol)) {
              break;
            }
          }
        }
     ....
  }

前面的代码主要是从客户端获取对应的数据,主要看下处理流程即processor.process()

public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();//获取TMessage
    ProcessFunction fn = processMap.get(msg.name);//根据方法名获取对应的方法处理类
    if (fn == null) {
      .....
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
  }

当获取方法处理类时,执行处理方法

//Hello.java 中helloWorld类
public helloWorld_args getEmptyArgsInstance() {
        return new helloWorld_args();
      }
//helloWorld_args类中read方法,与客户端对应
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
    }
//Hello.java 中Processor类执行了实现类HelloServiceImpl
public helloWorld_result getResult(I iface, helloWorld_args args) throws org.apache.thrift.TException {
        helloWorld_result result = new helloWorld_result();
        result.success = iface.helloWorld(args.para);
        return result;
      }
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    T args = getEmptyArgsInstance();
    try {
      args.read(iprot);
    } catch (TProtocolException e) {
     ....
    }
    iprot.readMessageEnd();
    TBase result = null;

    try {
      result = getResult(iface, args);
    } catch(TException tex) {
     ....
    }

    if(!isOneway()) {//跟客户端差不多了
      oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
      result.write(oprot);//结果写到输出流
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
    }
  }

总结

我们再来看一下Thrift的协议栈以及上述流程的抓包信息


thrift.PNG
Thrift_protocol.png

基本上对Thrift的流程熟悉了。

相关文章

  • Thrift基本原理

    前言 基于上一次的简单介绍做一次稍微深入的分析 正文 客户端如何发送由上次结尾的一个简单demo作为分析的例子,看...

  • Thirft

    一、About thrift二、什么是thrift,怎么工作?三、Thrift IDL四、Thrift D...

  • Docker&k8s微服务学习实践(二)

    一、下载配置thrift 从Thrift官网 http://thrift.apache.org/ 下载thrift...

  • thrift 指南

    /usr/local/opt/thrift@0.9/bin/thrift -gen java a.thrift会在...

  • Thrift学习

    Thrift源码剖析 Thrift源码分析及一个完整的例子 CSDN Thrift源码分析 Thrift二进制序列...

  • thrift 简介

    thrift 基本概念、数据类型thrift 简介一 thrift 基本类概述、序列化协议thrift 简介二 t...

  • Thrift 异步Service的原理

    Thrift 异步Service的原理 定义一个Thrift Service Thrift Service方法会提...

  • C# 通过Thrift访问Hbase

    1、确保Hbase中已经开启Thrift服务2、Thrift官网( http://thrift.apache.or...

  • 聊一聊序列化-Thrift

    认识Thrift Thrift is an interface definition language and b...

  • Thrift入门

    原文链接:thrift入门 转载请注明出处~ Thrift简介 什么是thrift 简单来说,是Facebook公...

网友评论

      本文标题:Thrift基本原理

      本文链接:https://www.haomeiwen.com/subject/dhyekftx.html