美文网首页
thrift 入门(2/2)

thrift 入门(2/2)

作者: 李眼镜 | 来源:发表于2018-09-26 02:11 被阅读0次

    上接:thrift 入门(1/2)
    PS:我也不想拆,但是放一起文章太长无法发布。。。


    四、thrift 入门

    4.1 小试牛刀

    首先,我们还是先用 thrift 实现一下前文中的需求。

    按如下步骤操作:

    1. 安装 thrift(安装方式请自行百度,最新版本或稍老的版本都可以,我用的 0.9.3 )。
    2. 编写 demo.thrift 文件,内容如下:
    // demo.thrift
    namespace java com.ann.javas.frameworks.thrifts.demo.beans
    
    
    enum Gender {
        BOY = 1,
        GIRL = 2,
    }
    
    
    struct UserInfo {
        1: required string name,
        3: required Gender gender,
        6: required i32 weight,
    }
    
    service DemoService {
    
        string sayHi(1:required UserInfo userInfo);
    
        string locate();
    
        string adsRecommendation();
    }
    
    1. 打开终端,cd 到 demo.thrift 所在文件目录执行 thrift -r --gen java demo.thrift,thrift 会编译 demo.thrift 生成三个 java 源文件(分别是:DemoService.java、UserInfo.java、Gender.java,源码太多不在这儿贴了)到 ./gen-java 目录下。
    2. maven 引入 libthrift 包,版本须与前面安装的 thrift 版本保持一致。
    3. 把该文件拷贝到工程目录。
    4. 然后创建 3 个 java 源文件:
    • DemoServiceImpl.java,实现 thrift 定义的接口。
    • Demo1Server.java,服务端。
    • Demo1Client.java,客户端。
    // DemoServiceImpl.java
    package com.ann.javas.frameworks.thrifts.demo.impl;
    
    import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
    import com.ann.javas.frameworks.thrifts.demo.beans.Gender;
    import com.ann.javas.frameworks.thrifts.demo.beans.UserInfo;
    
    import org.apache.thrift.TException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    
    /**
     * Created by liyanling on 2018/9/9.
     */
    public class DemoServiceImpl implements DemoService.Iface{
      private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
    
      private final static String SAY_HI_TEMPLATE = "Hi,%s%s!%s";
    
      private final static List<String> CITIES = Arrays.asList("上海", "北京", "广州", "成都", "内蒙古", "香港", "河北", "云南");
    
      private final static List<String> ADS = Arrays.asList("坐红旗车,走中国路", "要想皮肤好,早晚用大宝", "喝汇源果汁,走健康之路", "送礼就送脑白金",
                                                            "飘柔,就是这么自信");
    
    
      @Override
      public String sayHi(UserInfo userInfo) throws TException {
        String name = userInfo.getName();
        String nameSuffix = Gender.BOY == userInfo.getGender() ? "先生" : "女士";
        String weightNotice = userInfo.getWeight() >= 200 ? "你该减肥啦~" : "";
        return String.format(SAY_HI_TEMPLATE, name, nameSuffix, weightNotice);
      }
    
      @Override
      public String locate() throws TException {
        int citySize = CITIES.size();
        int randomIndex = Math.abs((new Random(System.currentTimeMillis())).nextInt()) % citySize;
        return CITIES.get(randomIndex);
      }
    
      @Override
      public String adsRecommendation() throws TException {
    //    logger.info("sleep..........");
    //    try {
    //      Thread.sleep(10000);
    //    } catch (InterruptedException e) {
    //      e.printStackTrace();
    //    }
    //    logger.info("awake..........");
        int adsSize = ADS.size();
        int randomIndex = Math.abs((new Random(System.currentTimeMillis())).nextInt()) % adsSize;
        return ADS.get(randomIndex);
      }
    }
    
    
    // Demo1Server.java
    package com.ann.javas.frameworks.thrifts.demo.impl;
    
    
    import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TSimpleServer;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TServerTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Created by liyanling on 2018/9/9.
     */
    public class Demo1Server {
      private static Logger logger = LoggerFactory.getLogger(Demo1Server.class);
    
      public static final int PORT = 9081;
    
      public static final int CLIENT_TIMEOUT = 100000;
    
    
      public static void main(String[] args) {
        Demo1Server server = new Demo1Server();
        server.startServer();
      }
    
      /**
       * 单线程服务模型:TSimpleServer + TServerSocket
       */
      private void startServer() {
    
        try {
    
          int count = 1;
    
          logger.info("{}:new 一个 TServerSocket 实例,指定端口号为:{} 超时时间:{}", count++, PORT, CLIENT_TIMEOUT);
          TServerTransport tServerTransport = new TServerSocket(PORT, CLIENT_TIMEOUT);
    
          TSimpleServer.Args args = new TSimpleServer.Args(tServerTransport);
          logger.info("{}:服务端初始化 {} 参数...", count++, "TSimpleServer.Args");
    
          TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory(true, true);
          args.protocolFactory(proFactory);
          logger.info("{}:设置协议工厂为 TBinaryProtocol.Factory,即:使用 TBinaryProtocol 协议", count++);
    
          TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
          args.processor(processor);
          logger.info("{}:设置 processor 为 {} 实例", count++, "UserServiceImpl");
    
          TServer server = new TSimpleServer(args);
          logger.info("{}:{} 实例,服务启动", count++, "TSimpleServer");
          server.serve();
    
        } catch (TTransportException e) {
          e.printStackTrace();
        }
      }
    }
    
    
    // Demo1Client.java
    package com.ann.javas.frameworks.thrifts.demo.impl;
    
    import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
    import com.ann.javas.frameworks.thrifts.demo.beans.Gender;
    import com.ann.javas.frameworks.thrifts.demo.beans.UserInfo;
    
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Scanner;
    
    /**
     * Created by liyanling on 2018/9/9.
     */
    public class Demo1Client {
      private static Logger logger = LoggerFactory.getLogger(Demo1Client.class);
    
      public static String LOCAL_HOST = "127.0.0.1";
      public static Scanner scanner;
    
    
      public static void main(String[] args){
        scanner = new Scanner(System.in);
        callServer(LOCAL_HOST, Demo1Server.PORT);
        scanner.close();
      }
    
    
      /**
       * TSocket
       */
      public static void callServer(String IP, int PORT) {
    
        try {
          int count = 1;
    
          logger.info("{}:客户端,创建 TSocket 实例 (IP:{},PORT:{})...", count++, IP, PORT);
          TTransport transport = new TSocket(LOCAL_HOST, PORT);
          transport.open();
          logger.info("{}:打开 socket 连接", count++);
    
          TProtocol protocol = new TBinaryProtocol(transport);
          logger.info("{}:创建 TBinaryProtocol 实例", count++);
    
          DemoService.Iface client = new DemoService.Client(protocol);
          logger.info("{}:创建 Demo1Service.Client 实例", count++);
    
          // 测试 locate
          String locateResult = client.locate();
          logger.info("{}:locate 返回值 {}",count++,locateResult);
    
          // 测试 adsRecommendation
          String adsRecommendationResult = client.adsRecommendation();
          logger.info("{}:adsRecommendation 返回值 {}",count++,adsRecommendationResult);
    
          // 测试 sayHi
          logger.info("{}:请输入姓名:", count++);
          String name = scanner.nextLine();
          logger.info("{}:请输入性别(GIRL / BOY):", count++);
          String gender = scanner.nextLine();
          logger.info("{}:请输入体重(单位:斤):", count++);
          Integer weight = Integer.valueOf(scanner.nextLine());
    
          UserInfo userInfo = new UserInfo();
          userInfo.setName(name);
          userInfo.setGender(Gender.valueOf(gender));
          userInfo.setWeight(weight);
    
          String sayHiResult = client.sayHi(userInfo);
          logger.info("{}:sayHi 返回值 {}",count++,sayHiResult);
    
    
          transport.close();
          logger.info("{}:关闭 socket 连接", count++);
    
        } catch (Throwable t) {
          t.printStackTrace();
        }
    
      }
    }
    
    

    分别测试 sayHi、locate、adsRecommendation 功能,输出日志如下:

    // Demo1Server 日志
    00:35:50.229 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 1:new 一个 TServerSocket 实例,指定端口号为:9081 超时时间:100000
    00:35:50.249 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 2:服务端初始化 TSimpleServer.Args 参数...
    00:35:50.249 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 3:设置协议工厂为 TBinaryProtocol.Factory,即:使用 TBinaryProtocol 协议
    00:35:50.259 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 4:设置 processor 为 UserServiceImpl 实例
    00:35:50.260 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 5:TSimpleServer 实例,服务启动
    
    
    // Demo1Client 日志
    00:35:53.312 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 1:客户端,创建 TSocket 实例 (IP:127.0.0.1,PORT:9081)...
    00:35:53.325 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 2:打开 socket 连接
    00:35:53.326 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 3:创建 TBinaryProtocol 实例
    00:35:53.328 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 4:创建 Demo1Service.Client 实例
    00:35:53.351 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 5:locate 返回值 香港
    00:35:53.363 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 6:adsRecommendation 返回值 喝汇源果汁,走健康之路
    00:35:53.363 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 7:请输入姓名:
    超人
    00:36:21.899 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 8:请输入性别(GIRL / BOY):
    BOY
    00:36:24.783 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 9:请输入体重(单位:斤):
    230
    00:36:27.376 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 10:sayHi 返回值 Hi,超人先生!你该减肥啦~
    00:36:27.376 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 11:关闭 socket 连接
    

    这个例子主要看 client 端日志,server 端的不重要,一般都是 deamon 启动的。

    不需要了解任何原理,光看这个用法你就会发现,thrift 通过某种方式屏蔽了用户对“序列化与反序列化”这个动作的感知,用户只需要:

    1. 按照某种规则、使用 xxx.thrift 文件定义 RPC 接口和数据结构。
    2. 执行命令生成对应的 java 代码。
    3. 然后在此基础上,实现你的业务逻辑就可以了。

    4.2 序列化分析

    关于 thrift 序列化的具体实现,可以在前面 demo 的基础上,通过串调用链路的方式进行了解。

    需要进行序列化和反序列化操作的无非就这么 4 个场景:

    1. client->server,client 序列化参数。
    2. client->server,server 反序列化参数,处理请求。
    3. server->client,server 序列化返回值。
    4. server->client,client 反序列化返回值。

    以 “client 序列化参数” 为例进行简单分析,过程如下:

    1. 以 sayHi() 为例,入口在 Demo1Client.java 客户端代码:
    // Demo1Client.java
    ...
    public class Demo1Client {
      ...
      public static void callServer(String IP, int PORT) {
          ...
           TProtocol protocol = new TBinaryProtocol(transport);
          ...
          DemoService.Iface client = new DemoService.Client(protocol);
          ...
          String sayHiResult = client.sayHi(userInfo);
          ...
      }
    }
    
    1. Command+B 点 sayHi 进到 DemoService.java 中的 Iface 接口定义,这是编译器根据 IDL 自动生成的代码:
    // DemoService.java
    // Autogenerated by Thrift Compiler
    public class DemoService {
      public interface Iface {
        public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException;
        ...
      }
      ...
    }
    
    1. 如果你用的 IDE 是 idea ,在左侧能看到一个绿色的小圆圈,鼠标悬浮在上面,会看到 Iface 接口定义的 sayHi() 方法有两个实现类:
    • ...beans.DemoService.Client,客户端实现,其实是 DemoService 的内部类,编译器自动生成的。
    • ...impl.DemoServiceImpl,服务端实现,继承自 DemoService.Iface,用于实现服务端业务逻辑。


      Iface.sayHi 的两个实现
    1. 点进客户端实现,也就是 DemoService 的内部类 DemoService.Client,然后你会看到下面这段代码:
    // DemoService.java
    // Autogenerated by Thrift Compiler
    ...
    public class DemoService {
      ...
      public static class Client extends org.apache.thrift.TServiceClient implements Iface {
        ...
        public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException
        {
          send_sayHi(userInfo);
          return recv_sayHi();
        }
        ...
      }
      ...
    }
    

    很明显,send_sayHi(userInfo) 是发送请求到服务端,return recv_sayHi() 是从服务端获取返回值。

    1. 本例需关注 send_sayHi(userInfo) 实现:
    • sayHi_args 又是个内部类,定义了 sayHi() 方法参数的数据结构,其唯一成员变量就是 UserInfo 类实例,对应的是 demo.thrift 中定义的 string sayHi(1:required UserInfo userInfo);
    • 先初始化一个 sayHi_args 类实例,准备好参数。
    • sendBase(...) 是 TServiceClient 提供的方法,调用时指明方法名和方法参数实例。
    • 在本例中,方法名即:"sayHi",参数即:sayHi_args 实例。
    // DemoService.java
    // Autogenerated by Thrift Compiler
    ...
    public class DemoService {
      ...
      public static class Client extends org.apache.thrift.TServiceClient implements Iface {
        ...
        public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException
        {
          send_sayHi(userInfo);
          return recv_sayHi();
        }
        public void send_sayHi(UserInfo userInfo) throws org.apache.thrift.TException
        {
          sayHi_args args = new sayHi_args();
          args.setUserInfo(userInfo);
          sendBase("sayHi", args);
        }
        ...
      }
      ...
    }
    
    1. 进到 TServiceClient 看 sendBase(...) 具体实现:
    ...
    public abstract class TServiceClient {
      ...
      protected void sendBase(String methodName, TBase<?, ?> args) throws TException {
        this.sendBase(methodName, args, (byte)1);
      }
      ...
      private void sendBase(String methodName, TBase<?, ?> args, byte type) throws TException {
        this.oprot_.writeMessageBegin(new TMessage(methodName, type, ++this.seqid_));
        args.write(this.oprot_);
        this.oprot_.writeMessageEnd();
        this.oprot_.getTransport().flush();
      }
      ...
    }
    

    可以看到 sendBase(...) 是按照:MessageBegin、MessageBody、MessageEnd 的顺序将消息写入 outputStream 。

    • writeMessageBegin 和 writeMessageEnd 操作,不同协议(TProtocol 子类)的实现各有不同。
    • writeMessageBody 则由方法的参数结构体自己实现,也就是 args.write(this.oprot_); 这一行。
    1. 跟进 args.write(this.oprot_); 里面,其实还是编译器根据 IDL 定义自动生成的代码:
    // DemoService.java
    // Autogenerated by Thrift Compiler
    ...
    public class DemoService {
      ...
        private static class sayHi_argsStandardScheme extends StandardScheme<sayHi_args> {
          ...
          public void write(org.apache.thrift.protocol.TProtocol oprot, sayHi_args struct) throws org.apache.thrift.TException {
            struct.validate();
            oprot.writeStructBegin(STRUCT_DESC);
            if (struct.userInfo != null) {
              oprot.writeFieldBegin(USER_INFO_FIELD_DESC);
              struct.userInfo.write(oprot);
              oprot.writeFieldEnd();
            }
            oprot.writeFieldStop();
            oprot.writeStructEnd();
          }
        }
      ...
    }
    
    • 在 write 操作前,有一个 validate 校验,主要是校验参数格式(required 字段非null)。
    • 剩下的应该不用多介绍了,差不多的规则和实现,到这儿往下跟就比较简单了,不再贴代码过来(有点多)。
    1. 也就是说,一个结构体大致会按照这个模板进行序列化:
    - ${Message Begin}
      - Struct.validate()
      - ${struct Begin}
        - ${Field1 Begin}
        - ${Feild1 Body}
        - ${Field1 End}
        - ${Field2 Begin}
        - ${Feild2 Body}
        - ${Field2 End}
        - ......
        - ${Field Stop}
      - ${struct End}
    - ${Message End}
    

    PS:Struct 和 Feild 是可以嵌套的。
    例如:sayHi_args 的 Feild1 是 userInfo,则 sayHi_args 的 Feild1Body 实际上是包含了一个 UserInfo Struct 内容。
    emm......有点绕,追几遍调用链就能明白了。

    反序列化的流程不在这儿贴分析流程了,感兴趣的同学可以自己跟一下。

    4.3 序列化方案初探

    现在对我们前面的分析做个总结,thrift 序列化方案有3个关键点:

    1. libthrift 包
    2. thrift IDL 语法规则
    3. thrift 编译器
    序列化方案关键点

    序列化和反序列化在 thrift 中是通过 “协议” 来体现的。

    libthrift

    • 首先,libthrift 中有一个抽象类 TProtocol ,定义了一系列 readXXX 和 writeXXX 的方法,包括前面提到过的 Message、Struct、Feild 和 String、Double、Map、List、I32、I64、Binary 等数据结构的读写方法。
    • TProtocol 有多个实现,如:TBinaryProtocol 二进制格式、TCompactProtocol 压缩格式、TJSONProtocol JSON 格式 等。
    • 通常为节约带宽,提高传输效率,一般使用二进制类型的传输协议(TBinaryProtocol);但有时会还是会使用基于文本类型的协议,需根据项目/产品中的实际需求来确定。
    • 对 TProtocol 协议的具体实现感兴趣的同学,可以通过跟踪调用链的方式进行分析,然后使用 Wireshark 等工具抓包数据进行验证。

    若 thrift 提供的协议不能满足要求,用户还可以基于 TProtocol 来实现定制化协议。

    IDL

    • thrift 定义了一套 IDL 语法规则,可参见官方文档:Thrift interface description language
    • 用户需严格按照 thrift IDL 语法规则来定义 RPC 的接口和数据结构。

    编译器

    • 指定需要编译的文件及生成代码的语言,执行 thrift 编译命令,thrift 编译器会对 IDL 文件进行编译并输出指定语言的源码文件。
    • 例如:
      1. IDL 定义的 service 会生成一个单独的 Service 类,Service 类内部的 Iface 接口包含了 IDL service 定义的所有方法。
      2. IDL service 定义的每一个方法,其入参和返回值都会自动生成单独的 Service 内部类(function_args 和 function_result),并提供序列化/反序列化方法(read、write)。
      3. IDL 中的 struct、enum、exception 等定义,也会生成相应的类,并提供 read、write 方法。

    这三个要点结合起来,就构成了 thrift 序列化和反序列化的整套机制。


    五、thrift 服务框架

    至此,thrift 的两大关键点已经搞定一个:
    1、协议(序列化和反序列化)
    2、服务框架(网络通信)

    协议(即:序列化和反序列化)反映的是“数据传输格式”,而对于一个 RPC 框架来说,再上层就是数据传输方式和服务模型了。

    5.1 demo 分析

    在前面的例子中,哪一段代码和“服务模型”或“数据传输方式”有关呢?

    ...
    public class Demo1Server {
      ...
      private void startServer() {
        ...
        TServerTransport tServerTransport = new TServerSocket(PORT, CLIENT_TIMEOUT);
        TSimpleServer.Args args = new TSimpleServer.Args(tServerTransport);
        ...
        TServer server = new TSimpleServer(args);
        server.serve();
        ...
      }
      ...
    }
    
    • 先看哪一句是用来启动服务的,显然是 server.serve();
    • 再往上 TServer server = new TSimpleServer(args); 定义了 server ,是一个 TSimpleServer 实例,这里选定的服务模型就是 TSimpleServer
    • 而初始化 server 实例时用到的 TServerTransport 就是其传输方式。

    这次我们从上往下看,先讲服务模型。

    5.2 TServer 服务模型

    thrift 提供的服务模型可分为:单线程、多线程、事件驱动 3类。从另一个角度也可以划分为:阻塞服务模型、非阻塞服务模型 2类。

    thrift 的服务模型可以拿来对标我们学习网络编程时学到的网络通信模型(IO模型)。

    常见的有:

    • TSimpleServer - 简单的单线程服务模型,常用于测试。
    • TThreadPoolServer - 多线程、阻塞 IO 服务模型。
    • TNonblockingServer - 多线程、非阻塞 IO 服务模型。
    • 以及 TThreadedSelectorServer 和 THsHaServer。

    TSimpleServer

    前例中,Server 端使用的服务模型是 TSimpleServer 简单的单线程服务模型,其特点是:

    • 只有一个工作线程,循环监听新请求的到来,并完成请求的处理。
    • 这种服务模型的优点是使用方法简单、便于理解。
    • 但是一次只能接收和处理一个 socket 连接,效率比较低。
    • 主要用于测试或演示 thrift 的工作过程,在实际开发中很少会用到。

    可以按照下面的步骤简单做个小测试:

    1. 在 DemoServiceImpl 的某个方法里,加一段 Thread.sleep(xxxx) 。
    2. 启动服务并建立一个 clientA 连接并发起请求,当前请求会夯在 sleep 的地方,不给客户端返回结果。
    3. 然后再新建一个 clientB 连接并发起请求。
    4. 你会发现,若 clientA 的请求不处理完,server 端不会与 clientB 建立连接,clientB 需要一直等着,直到 clientA 的请求被处理完并返回结果。
    TSimpleServer 示意图

    TThreadPoolServer

    TThreadPoolServer 靠引入 “工作线程池” 解决了 TSimpleServer 不支持并发和多连接的问题,其特点是:

    • 使用一个专有线程负责监听端口、接受连接,具体的业务处理则由一个工作线程池中的子线程来完成。
    • 优点是将监听请求和业务处理两项工作做了拆分,在并发量较大时新连接也能够被及时接受。
    • 缺点是服务处理能力受限于线程池的工作能力,当并发请求大于线程池中的线程数时,新请求还是要排队等待。
    TThreadPoolServer 示意图

    TThreadServer 实现上还有一些细节没画在图上,比如:
    线程池满如何处理?
    有无异常重试机制?
    有无优雅停服机制?
    ......
    感兴趣的同学可以读源码了解一下。

    TNonblockingServer
    TNonblockingServer 使用非阻塞 I/O 解决了 TSimpleServer 一个客户端阻塞其他所有客户端的问题,其特点是:

    • 服务启动时开一个 SelectAcceptThread 线程,该线程内部通过引入 java.nio.channels.Selector 选择器(Java NIO 核心组件之一)实现单线程管理多个网络连接的功能。
    • 优点是服务端可以同时处理多个客户端连接请求,而不需要像 TSimpleServer 那样排队等待。
    • 缺点是业务处理还是单线程顺序执行,在业务处理比较复杂、耗时较长的场景下,执行效率也很难提升。

    我先去补补 Selector 的课再回来补这里的图......

    THsHaServer
    THsHaServer 是 TNonblockingServer 的子类,可以简单粗暴的理解成 TNonblockingServer 和 TThreadPoolServer 的结合版本:

    • THsHaServer 在 TNonblockingServer 的基础上 ,引入了 TThreadPoolServer 的工作线程池,用于进行业务处理。
    • 优点是将业务处理过程扔到工作线程池处理,主线程可以直接返回进行下一次循环操作,效率大大提升。
    • 缺点是,由于主线程需要完成所有 socekt 的监听及数据读写工作,当并发请求数较大,且发送数据量较多时,监听 socket 上的新连接请求不能被及时接受。(TNonblockingServer 也有这个问题)

    TThreadedSelectorServer
    TThreadedSelectorServer 可以简单看做是 THsHaServer 的扩展(实际上并不是),其特点是:

    • 在 THsHaServer 的基础上,再引入一个 SelectorThread 线程池,以分散网络IO。
    • 并提供一个 SelectorThreadLoadBalancer 用作 SelectorThread 分发。

    最后附上一张 TServer 精“剪”类图:


    TServer 精“剪”类图

    5.3 TTransport 传输方式

    传输方式这部分,首先请同学们区分 TServerTransport 和 TTransport:

    • TServerTransport 定义的是“以何种方式监听和处理请求”;初始化 Server 服务端实例的时候需要明确指定;提供 listen()、accept()、interrupt() 等功能。
    • TTransport 定义的是“以何种形式在网络上传输数据”,底层实现是对 ServerSocket 做了封装;初始化 Client 客户端实例的时候需要明确指定;提供 open()/close()、read()/write()、peek()、flush() 等功能。

    TTransport 和 TServerTransport 需要与 TServer 搭配使用:

    • 服务端为阻塞式服务时,使用 TServerSocket ;客户端使用 TSocket 配合。
    • 服务端为非阻塞式服务时,使用 TNonblockingServerSocket ;客户端使用 TFramedTransport 封装 TSocket 配合。

    这里 TTransport 就是我们说的“传输方式”,即:how is transmitted?

    • TSocket,阻塞 IO 传输。
    • TFramedTransport,非阻塞 I/O,按帧/块传输(支持定长数据发送和接收)。
    • TMemoryTransport,内存IO。
    • TFileTransport,文件格式传输,不提供java的实现。
    • TZlibTransport,zlib压缩传输,不提供java的实现。
    • TNonblockingTransport,非阻塞式I/O,用于构建异步客户端。
      ......

    不同语言对上述 Transport 的支持是不一样的。
    我个人只用过 TSocket 和 TFramedTransport,有点心虚,怂...

    TServerTransport 类图 TTransport 类图

    5.4 小结

    最后总结一下老生常谈的 thrift 分层:

    • Transport 传输层:定义了网络数据的传输方式,负责完成数据的网络传输。
    • TProtocol 协议层:定义了网络传输数据的格式,负责完成“应用数据-网络可传输的数据”的组装和拆解(序列化和反序列化)。
    • TProcessor:不重要,自动生成的处理器。
    • Server 服务层:把 Transport、Protocol、TProcessor 组合在一起,将服务运行起来,在指定端口监听并处理客户端请求。
    thrift 层次图

    六、思考

    曾经在开发过程中遇到的问题,列在这里,供感兴趣的同学思考:

    • 若返回值里有 map,而 map 里写了个 map.put("key",null),就会抛异常,为什么?
    • 若 Server 端和 Client 端使用的 thrift 版本不一致,可能会出现什么问题?
    • IDL 中的 struct 如何扩展升级?

    thrift 入门的内容差不多就到这里了,写的心好累。

    如果文中内容有错误的地方,欢迎各位大佬指出,感恩。


    七、参考

    相关文章

      网友评论

          本文标题:thrift 入门(2/2)

          本文链接:https://www.haomeiwen.com/subject/niwroftx.html