美文网首页
thrift 入门(2/2)

thrift 入门(2/2)

作者: 李眼镜 | 来源:发表于2018-09-26 02:11 被阅读0次

上接:thrift 入门(1/2)
PS:我也不想拆,但是放一起文章太长无法发布。。。


四、thrift 入门

4.1 小试牛刀

首先,我们还是先用 thrift 实现一下前文中的需求。

按如下步骤操作:

  1. 安装 thrift(安装方式请自行百度,最新版本或稍老的版本都可以,我用的 0.9.3 )。
  2. 编写 demo.thrift 文件,内容如下:
// demo.thrift
namespace java com.ann.javas.frameworks.thrifts.demo.beans


enum Gender {
    BOY = 1,
    GIRL = 2,
}


struct UserInfo {
    1: required string name,
    3: required Gender gender,
    6: required i32 weight,
}

service DemoService {

    string sayHi(1:required UserInfo userInfo);

    string locate();

    string adsRecommendation();
}
  1. 打开终端,cd 到 demo.thrift 所在文件目录执行 thrift -r --gen java demo.thrift,thrift 会编译 demo.thrift 生成三个 java 源文件(分别是:DemoService.java、UserInfo.java、Gender.java,源码太多不在这儿贴了)到 ./gen-java 目录下。
  2. maven 引入 libthrift 包,版本须与前面安装的 thrift 版本保持一致。
  3. 把该文件拷贝到工程目录。
  4. 然后创建 3 个 java 源文件:
  • DemoServiceImpl.java,实现 thrift 定义的接口。
  • Demo1Server.java,服务端。
  • Demo1Client.java,客户端。
// DemoServiceImpl.java
package com.ann.javas.frameworks.thrifts.demo.impl;

import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
import com.ann.javas.frameworks.thrifts.demo.beans.Gender;
import com.ann.javas.frameworks.thrifts.demo.beans.UserInfo;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Created by liyanling on 2018/9/9.
 */
public class DemoServiceImpl implements DemoService.Iface{
  private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

  private final static String SAY_HI_TEMPLATE = "Hi,%s%s!%s";

  private final static List<String> CITIES = Arrays.asList("上海", "北京", "广州", "成都", "内蒙古", "香港", "河北", "云南");

  private final static List<String> ADS = Arrays.asList("坐红旗车,走中国路", "要想皮肤好,早晚用大宝", "喝汇源果汁,走健康之路", "送礼就送脑白金",
                                                        "飘柔,就是这么自信");


  @Override
  public String sayHi(UserInfo userInfo) throws TException {
    String name = userInfo.getName();
    String nameSuffix = Gender.BOY == userInfo.getGender() ? "先生" : "女士";
    String weightNotice = userInfo.getWeight() >= 200 ? "你该减肥啦~" : "";
    return String.format(SAY_HI_TEMPLATE, name, nameSuffix, weightNotice);
  }

  @Override
  public String locate() throws TException {
    int citySize = CITIES.size();
    int randomIndex = Math.abs((new Random(System.currentTimeMillis())).nextInt()) % citySize;
    return CITIES.get(randomIndex);
  }

  @Override
  public String adsRecommendation() throws TException {
//    logger.info("sleep..........");
//    try {
//      Thread.sleep(10000);
//    } catch (InterruptedException e) {
//      e.printStackTrace();
//    }
//    logger.info("awake..........");
    int adsSize = ADS.size();
    int randomIndex = Math.abs((new Random(System.currentTimeMillis())).nextInt()) % adsSize;
    return ADS.get(randomIndex);
  }
}

// Demo1Server.java
package com.ann.javas.frameworks.thrifts.demo.impl;


import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by liyanling on 2018/9/9.
 */
public class Demo1Server {
  private static Logger logger = LoggerFactory.getLogger(Demo1Server.class);

  public static final int PORT = 9081;

  public static final int CLIENT_TIMEOUT = 100000;


  public static void main(String[] args) {
    Demo1Server server = new Demo1Server();
    server.startServer();
  }

  /**
   * 单线程服务模型:TSimpleServer + TServerSocket
   */
  private void startServer() {

    try {

      int count = 1;

      logger.info("{}:new 一个 TServerSocket 实例,指定端口号为:{} 超时时间:{}", count++, PORT, CLIENT_TIMEOUT);
      TServerTransport tServerTransport = new TServerSocket(PORT, CLIENT_TIMEOUT);

      TSimpleServer.Args args = new TSimpleServer.Args(tServerTransport);
      logger.info("{}:服务端初始化 {} 参数...", count++, "TSimpleServer.Args");

      TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory(true, true);
      args.protocolFactory(proFactory);
      logger.info("{}:设置协议工厂为 TBinaryProtocol.Factory,即:使用 TBinaryProtocol 协议", count++);

      TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
      args.processor(processor);
      logger.info("{}:设置 processor 为 {} 实例", count++, "UserServiceImpl");

      TServer server = new TSimpleServer(args);
      logger.info("{}:{} 实例,服务启动", count++, "TSimpleServer");
      server.serve();

    } catch (TTransportException e) {
      e.printStackTrace();
    }
  }
}

// Demo1Client.java
package com.ann.javas.frameworks.thrifts.demo.impl;

import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
import com.ann.javas.frameworks.thrifts.demo.beans.Gender;
import com.ann.javas.frameworks.thrifts.demo.beans.UserInfo;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Scanner;

/**
 * Created by liyanling on 2018/9/9.
 */
public class Demo1Client {
  private static Logger logger = LoggerFactory.getLogger(Demo1Client.class);

  public static String LOCAL_HOST = "127.0.0.1";
  public static Scanner scanner;


  public static void main(String[] args){
    scanner = new Scanner(System.in);
    callServer(LOCAL_HOST, Demo1Server.PORT);
    scanner.close();
  }


  /**
   * TSocket
   */
  public static void callServer(String IP, int PORT) {

    try {
      int count = 1;

      logger.info("{}:客户端,创建 TSocket 实例 (IP:{},PORT:{})...", count++, IP, PORT);
      TTransport transport = new TSocket(LOCAL_HOST, PORT);
      transport.open();
      logger.info("{}:打开 socket 连接", count++);

      TProtocol protocol = new TBinaryProtocol(transport);
      logger.info("{}:创建 TBinaryProtocol 实例", count++);

      DemoService.Iface client = new DemoService.Client(protocol);
      logger.info("{}:创建 Demo1Service.Client 实例", count++);

      // 测试 locate
      String locateResult = client.locate();
      logger.info("{}:locate 返回值 {}",count++,locateResult);

      // 测试 adsRecommendation
      String adsRecommendationResult = client.adsRecommendation();
      logger.info("{}:adsRecommendation 返回值 {}",count++,adsRecommendationResult);

      // 测试 sayHi
      logger.info("{}:请输入姓名:", count++);
      String name = scanner.nextLine();
      logger.info("{}:请输入性别(GIRL / BOY):", count++);
      String gender = scanner.nextLine();
      logger.info("{}:请输入体重(单位:斤):", count++);
      Integer weight = Integer.valueOf(scanner.nextLine());

      UserInfo userInfo = new UserInfo();
      userInfo.setName(name);
      userInfo.setGender(Gender.valueOf(gender));
      userInfo.setWeight(weight);

      String sayHiResult = client.sayHi(userInfo);
      logger.info("{}:sayHi 返回值 {}",count++,sayHiResult);


      transport.close();
      logger.info("{}:关闭 socket 连接", count++);

    } catch (Throwable t) {
      t.printStackTrace();
    }

  }
}

分别测试 sayHi、locate、adsRecommendation 功能,输出日志如下:

// Demo1Server 日志
00:35:50.229 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 1:new 一个 TServerSocket 实例,指定端口号为:9081 超时时间:100000
00:35:50.249 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 2:服务端初始化 TSimpleServer.Args 参数...
00:35:50.249 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 3:设置协议工厂为 TBinaryProtocol.Factory,即:使用 TBinaryProtocol 协议
00:35:50.259 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 4:设置 processor 为 UserServiceImpl 实例
00:35:50.260 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 5:TSimpleServer 实例,服务启动

// Demo1Client 日志
00:35:53.312 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 1:客户端,创建 TSocket 实例 (IP:127.0.0.1,PORT:9081)...
00:35:53.325 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 2:打开 socket 连接
00:35:53.326 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 3:创建 TBinaryProtocol 实例
00:35:53.328 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 4:创建 Demo1Service.Client 实例
00:35:53.351 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 5:locate 返回值 香港
00:35:53.363 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 6:adsRecommendation 返回值 喝汇源果汁,走健康之路
00:35:53.363 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 7:请输入姓名:
超人
00:36:21.899 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 8:请输入性别(GIRL / BOY):
BOY
00:36:24.783 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 9:请输入体重(单位:斤):
230
00:36:27.376 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 10:sayHi 返回值 Hi,超人先生!你该减肥啦~
00:36:27.376 [main] INFO  com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 11:关闭 socket 连接

这个例子主要看 client 端日志,server 端的不重要,一般都是 deamon 启动的。

不需要了解任何原理,光看这个用法你就会发现,thrift 通过某种方式屏蔽了用户对“序列化与反序列化”这个动作的感知,用户只需要:

  1. 按照某种规则、使用 xxx.thrift 文件定义 RPC 接口和数据结构。
  2. 执行命令生成对应的 java 代码。
  3. 然后在此基础上,实现你的业务逻辑就可以了。

4.2 序列化分析

关于 thrift 序列化的具体实现,可以在前面 demo 的基础上,通过串调用链路的方式进行了解。

需要进行序列化和反序列化操作的无非就这么 4 个场景:

  1. client->server,client 序列化参数。
  2. client->server,server 反序列化参数,处理请求。
  3. server->client,server 序列化返回值。
  4. server->client,client 反序列化返回值。

以 “client 序列化参数” 为例进行简单分析,过程如下:

  1. 以 sayHi() 为例,入口在 Demo1Client.java 客户端代码:
// Demo1Client.java
...
public class Demo1Client {
  ...
  public static void callServer(String IP, int PORT) {
      ...
       TProtocol protocol = new TBinaryProtocol(transport);
      ...
      DemoService.Iface client = new DemoService.Client(protocol);
      ...
      String sayHiResult = client.sayHi(userInfo);
      ...
  }
}
  1. Command+B 点 sayHi 进到 DemoService.java 中的 Iface 接口定义,这是编译器根据 IDL 自动生成的代码:
// DemoService.java
// Autogenerated by Thrift Compiler
public class DemoService {
  public interface Iface {
    public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException;
    ...
  }
  ...
}
  1. 如果你用的 IDE 是 idea ,在左侧能看到一个绿色的小圆圈,鼠标悬浮在上面,会看到 Iface 接口定义的 sayHi() 方法有两个实现类:
  • ...beans.DemoService.Client,客户端实现,其实是 DemoService 的内部类,编译器自动生成的。
  • ...impl.DemoServiceImpl,服务端实现,继承自 DemoService.Iface,用于实现服务端业务逻辑。


    Iface.sayHi 的两个实现
  1. 点进客户端实现,也就是 DemoService 的内部类 DemoService.Client,然后你会看到下面这段代码:
// DemoService.java
// Autogenerated by Thrift Compiler
...
public class DemoService {
  ...
  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    ...
    public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException
    {
      send_sayHi(userInfo);
      return recv_sayHi();
    }
    ...
  }
  ...
}

很明显,send_sayHi(userInfo) 是发送请求到服务端,return recv_sayHi() 是从服务端获取返回值。

  1. 本例需关注 send_sayHi(userInfo) 实现:
  • sayHi_args 又是个内部类,定义了 sayHi() 方法参数的数据结构,其唯一成员变量就是 UserInfo 类实例,对应的是 demo.thrift 中定义的 string sayHi(1:required UserInfo userInfo);
  • 先初始化一个 sayHi_args 类实例,准备好参数。
  • sendBase(...) 是 TServiceClient 提供的方法,调用时指明方法名和方法参数实例。
  • 在本例中,方法名即:"sayHi",参数即:sayHi_args 实例。
// DemoService.java
// Autogenerated by Thrift Compiler
...
public class DemoService {
  ...
  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    ...
    public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException
    {
      send_sayHi(userInfo);
      return recv_sayHi();
    }
    public void send_sayHi(UserInfo userInfo) throws org.apache.thrift.TException
    {
      sayHi_args args = new sayHi_args();
      args.setUserInfo(userInfo);
      sendBase("sayHi", args);
    }
    ...
  }
  ...
}
  1. 进到 TServiceClient 看 sendBase(...) 具体实现:
...
public abstract class TServiceClient {
  ...
  protected void sendBase(String methodName, TBase<?, ?> args) throws TException {
    this.sendBase(methodName, args, (byte)1);
  }
  ...
  private void sendBase(String methodName, TBase<?, ?> args, byte type) throws TException {
    this.oprot_.writeMessageBegin(new TMessage(methodName, type, ++this.seqid_));
    args.write(this.oprot_);
    this.oprot_.writeMessageEnd();
    this.oprot_.getTransport().flush();
  }
  ...
}

可以看到 sendBase(...) 是按照:MessageBegin、MessageBody、MessageEnd 的顺序将消息写入 outputStream 。

  • writeMessageBegin 和 writeMessageEnd 操作,不同协议(TProtocol 子类)的实现各有不同。
  • writeMessageBody 则由方法的参数结构体自己实现,也就是 args.write(this.oprot_); 这一行。
  1. 跟进 args.write(this.oprot_); 里面,其实还是编译器根据 IDL 定义自动生成的代码:
// DemoService.java
// Autogenerated by Thrift Compiler
...
public class DemoService {
  ...
    private static class sayHi_argsStandardScheme extends StandardScheme<sayHi_args> {
      ...
      public void write(org.apache.thrift.protocol.TProtocol oprot, sayHi_args struct) throws org.apache.thrift.TException {
        struct.validate();
        oprot.writeStructBegin(STRUCT_DESC);
        if (struct.userInfo != null) {
          oprot.writeFieldBegin(USER_INFO_FIELD_DESC);
          struct.userInfo.write(oprot);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldStop();
        oprot.writeStructEnd();
      }
    }
  ...
}
  • 在 write 操作前,有一个 validate 校验,主要是校验参数格式(required 字段非null)。
  • 剩下的应该不用多介绍了,差不多的规则和实现,到这儿往下跟就比较简单了,不再贴代码过来(有点多)。
  1. 也就是说,一个结构体大致会按照这个模板进行序列化:
- ${Message Begin}
  - Struct.validate()
  - ${struct Begin}
    - ${Field1 Begin}
    - ${Feild1 Body}
    - ${Field1 End}
    - ${Field2 Begin}
    - ${Feild2 Body}
    - ${Field2 End}
    - ......
    - ${Field Stop}
  - ${struct End}
- ${Message End}

PS:Struct 和 Feild 是可以嵌套的。
例如:sayHi_args 的 Feild1 是 userInfo,则 sayHi_args 的 Feild1Body 实际上是包含了一个 UserInfo Struct 内容。
emm......有点绕,追几遍调用链就能明白了。

反序列化的流程不在这儿贴分析流程了,感兴趣的同学可以自己跟一下。

4.3 序列化方案初探

现在对我们前面的分析做个总结,thrift 序列化方案有3个关键点:

  1. libthrift 包
  2. thrift IDL 语法规则
  3. thrift 编译器
序列化方案关键点

序列化和反序列化在 thrift 中是通过 “协议” 来体现的。

libthrift

  • 首先,libthrift 中有一个抽象类 TProtocol ,定义了一系列 readXXX 和 writeXXX 的方法,包括前面提到过的 Message、Struct、Feild 和 String、Double、Map、List、I32、I64、Binary 等数据结构的读写方法。
  • TProtocol 有多个实现,如:TBinaryProtocol 二进制格式、TCompactProtocol 压缩格式、TJSONProtocol JSON 格式 等。
  • 通常为节约带宽,提高传输效率,一般使用二进制类型的传输协议(TBinaryProtocol);但有时会还是会使用基于文本类型的协议,需根据项目/产品中的实际需求来确定。
  • 对 TProtocol 协议的具体实现感兴趣的同学,可以通过跟踪调用链的方式进行分析,然后使用 Wireshark 等工具抓包数据进行验证。

若 thrift 提供的协议不能满足要求,用户还可以基于 TProtocol 来实现定制化协议。

IDL

  • thrift 定义了一套 IDL 语法规则,可参见官方文档:Thrift interface description language
  • 用户需严格按照 thrift IDL 语法规则来定义 RPC 的接口和数据结构。

编译器

  • 指定需要编译的文件及生成代码的语言,执行 thrift 编译命令,thrift 编译器会对 IDL 文件进行编译并输出指定语言的源码文件。
  • 例如:
    1. IDL 定义的 service 会生成一个单独的 Service 类,Service 类内部的 Iface 接口包含了 IDL service 定义的所有方法。
    2. IDL service 定义的每一个方法,其入参和返回值都会自动生成单独的 Service 内部类(function_args 和 function_result),并提供序列化/反序列化方法(read、write)。
    3. IDL 中的 struct、enum、exception 等定义,也会生成相应的类,并提供 read、write 方法。

这三个要点结合起来,就构成了 thrift 序列化和反序列化的整套机制。


五、thrift 服务框架

至此,thrift 的两大关键点已经搞定一个:
1、协议(序列化和反序列化)
2、服务框架(网络通信)

协议(即:序列化和反序列化)反映的是“数据传输格式”,而对于一个 RPC 框架来说,再上层就是数据传输方式和服务模型了。

5.1 demo 分析

在前面的例子中,哪一段代码和“服务模型”或“数据传输方式”有关呢?

...
public class Demo1Server {
  ...
  private void startServer() {
    ...
    TServerTransport tServerTransport = new TServerSocket(PORT, CLIENT_TIMEOUT);
    TSimpleServer.Args args = new TSimpleServer.Args(tServerTransport);
    ...
    TServer server = new TSimpleServer(args);
    server.serve();
    ...
  }
  ...
}
  • 先看哪一句是用来启动服务的,显然是 server.serve();
  • 再往上 TServer server = new TSimpleServer(args); 定义了 server ,是一个 TSimpleServer 实例,这里选定的服务模型就是 TSimpleServer
  • 而初始化 server 实例时用到的 TServerTransport 就是其传输方式。

这次我们从上往下看,先讲服务模型。

5.2 TServer 服务模型

thrift 提供的服务模型可分为:单线程、多线程、事件驱动 3类。从另一个角度也可以划分为:阻塞服务模型、非阻塞服务模型 2类。

thrift 的服务模型可以拿来对标我们学习网络编程时学到的网络通信模型(IO模型)。

常见的有:

  • TSimpleServer - 简单的单线程服务模型,常用于测试。
  • TThreadPoolServer - 多线程、阻塞 IO 服务模型。
  • TNonblockingServer - 多线程、非阻塞 IO 服务模型。
  • 以及 TThreadedSelectorServer 和 THsHaServer。

TSimpleServer

前例中,Server 端使用的服务模型是 TSimpleServer 简单的单线程服务模型,其特点是:

  • 只有一个工作线程,循环监听新请求的到来,并完成请求的处理。
  • 这种服务模型的优点是使用方法简单、便于理解。
  • 但是一次只能接收和处理一个 socket 连接,效率比较低。
  • 主要用于测试或演示 thrift 的工作过程,在实际开发中很少会用到。

可以按照下面的步骤简单做个小测试:

  1. 在 DemoServiceImpl 的某个方法里,加一段 Thread.sleep(xxxx) 。
  2. 启动服务并建立一个 clientA 连接并发起请求,当前请求会夯在 sleep 的地方,不给客户端返回结果。
  3. 然后再新建一个 clientB 连接并发起请求。
  4. 你会发现,若 clientA 的请求不处理完,server 端不会与 clientB 建立连接,clientB 需要一直等着,直到 clientA 的请求被处理完并返回结果。
TSimpleServer 示意图

TThreadPoolServer

TThreadPoolServer 靠引入 “工作线程池” 解决了 TSimpleServer 不支持并发和多连接的问题,其特点是:

  • 使用一个专有线程负责监听端口、接受连接,具体的业务处理则由一个工作线程池中的子线程来完成。
  • 优点是将监听请求和业务处理两项工作做了拆分,在并发量较大时新连接也能够被及时接受。
  • 缺点是服务处理能力受限于线程池的工作能力,当并发请求大于线程池中的线程数时,新请求还是要排队等待。
TThreadPoolServer 示意图

TThreadServer 实现上还有一些细节没画在图上,比如:
线程池满如何处理?
有无异常重试机制?
有无优雅停服机制?
......
感兴趣的同学可以读源码了解一下。

TNonblockingServer
TNonblockingServer 使用非阻塞 I/O 解决了 TSimpleServer 一个客户端阻塞其他所有客户端的问题,其特点是:

  • 服务启动时开一个 SelectAcceptThread 线程,该线程内部通过引入 java.nio.channels.Selector 选择器(Java NIO 核心组件之一)实现单线程管理多个网络连接的功能。
  • 优点是服务端可以同时处理多个客户端连接请求,而不需要像 TSimpleServer 那样排队等待。
  • 缺点是业务处理还是单线程顺序执行,在业务处理比较复杂、耗时较长的场景下,执行效率也很难提升。

我先去补补 Selector 的课再回来补这里的图......

THsHaServer
THsHaServer 是 TNonblockingServer 的子类,可以简单粗暴的理解成 TNonblockingServer 和 TThreadPoolServer 的结合版本:

  • THsHaServer 在 TNonblockingServer 的基础上 ,引入了 TThreadPoolServer 的工作线程池,用于进行业务处理。
  • 优点是将业务处理过程扔到工作线程池处理,主线程可以直接返回进行下一次循环操作,效率大大提升。
  • 缺点是,由于主线程需要完成所有 socekt 的监听及数据读写工作,当并发请求数较大,且发送数据量较多时,监听 socket 上的新连接请求不能被及时接受。(TNonblockingServer 也有这个问题)

TThreadedSelectorServer
TThreadedSelectorServer 可以简单看做是 THsHaServer 的扩展(实际上并不是),其特点是:

  • 在 THsHaServer 的基础上,再引入一个 SelectorThread 线程池,以分散网络IO。
  • 并提供一个 SelectorThreadLoadBalancer 用作 SelectorThread 分发。

最后附上一张 TServer 精“剪”类图:


TServer 精“剪”类图

5.3 TTransport 传输方式

传输方式这部分,首先请同学们区分 TServerTransport 和 TTransport:

  • TServerTransport 定义的是“以何种方式监听和处理请求”;初始化 Server 服务端实例的时候需要明确指定;提供 listen()、accept()、interrupt() 等功能。
  • TTransport 定义的是“以何种形式在网络上传输数据”,底层实现是对 ServerSocket 做了封装;初始化 Client 客户端实例的时候需要明确指定;提供 open()/close()、read()/write()、peek()、flush() 等功能。

TTransport 和 TServerTransport 需要与 TServer 搭配使用:

  • 服务端为阻塞式服务时,使用 TServerSocket ;客户端使用 TSocket 配合。
  • 服务端为非阻塞式服务时,使用 TNonblockingServerSocket ;客户端使用 TFramedTransport 封装 TSocket 配合。

这里 TTransport 就是我们说的“传输方式”,即:how is transmitted?

  • TSocket,阻塞 IO 传输。
  • TFramedTransport,非阻塞 I/O,按帧/块传输(支持定长数据发送和接收)。
  • TMemoryTransport,内存IO。
  • TFileTransport,文件格式传输,不提供java的实现。
  • TZlibTransport,zlib压缩传输,不提供java的实现。
  • TNonblockingTransport,非阻塞式I/O,用于构建异步客户端。
    ......

不同语言对上述 Transport 的支持是不一样的。
我个人只用过 TSocket 和 TFramedTransport,有点心虚,怂...

TServerTransport 类图 TTransport 类图

5.4 小结

最后总结一下老生常谈的 thrift 分层:

  • Transport 传输层:定义了网络数据的传输方式,负责完成数据的网络传输。
  • TProtocol 协议层:定义了网络传输数据的格式,负责完成“应用数据-网络可传输的数据”的组装和拆解(序列化和反序列化)。
  • TProcessor:不重要,自动生成的处理器。
  • Server 服务层:把 Transport、Protocol、TProcessor 组合在一起,将服务运行起来,在指定端口监听并处理客户端请求。
thrift 层次图

六、思考

曾经在开发过程中遇到的问题,列在这里,供感兴趣的同学思考:

  • 若返回值里有 map,而 map 里写了个 map.put("key",null),就会抛异常,为什么?
  • 若 Server 端和 Client 端使用的 thrift 版本不一致,可能会出现什么问题?
  • IDL 中的 struct 如何扩展升级?

thrift 入门的内容差不多就到这里了,写的心好累。

如果文中内容有错误的地方,欢迎各位大佬指出,感恩。


七、参考

相关文章

  • thrift 入门(2/2)

    上接:thrift 入门(1/2)PS:我也不想拆,但是放一起文章太长无法发布。。。 四、thrift 入门 4....

  • thrift 入门(1/2)

    一、thrift 定义 “什么是 thrift” 这个问题,我曾问过别人,也有人拿来问过我。无论是官网、百度、谷歌...

  • C# 通过Thrift访问Hbase

    1、确保Hbase中已经开启Thrift服务2、Thrift官网( http://thrift.apache.or...

  • Thrift入门

    原文链接:thrift入门 转载请注明出处~ Thrift简介 什么是thrift 简单来说,是Facebook公...

  • PHP使用Thrift操作Hbase

    HBase 启动 Thrift服务 hbase启动thrift服务 需要注意的是,这里启动的是thrift2服务,...

  • thrift入门教程

    概述 本文是入门教程,想要了解thrift的源码实现可以移步我的CSDN专栏thrift源码解析 Thrift最初...

  • 2.Thrift指南 .thrift文件

    前言:在一个 .thrift 文件内定义服务,并用 thrift 工具生成服务接口。 1..thrift文件2.t...

  • thrift python语言官方实例跑通

    1. 配置好python环境后安装thrift库 2. 将tutorial.thrift、shared.thrif...

  • Python 访问 Hbase

    一、前提要求 1、thrift2、hbase-thrift 注:我通过pycharm安装 二、启动Hbase cd...

  • cdh的beeline使用spark作为执行引擎

    Unknown HS2 problem when communicating with Thrift server...

网友评论

      本文标题:thrift 入门(2/2)

      本文链接:https://www.haomeiwen.com/subject/niwroftx.html