上接:thrift 入门(1/2)
PS:我也不想拆,但是放一起文章太长无法发布。。。
四、thrift 入门
4.1 小试牛刀
首先,我们还是先用 thrift 实现一下前文中的需求。
按如下步骤操作:
- 安装 thrift(安装方式请自行百度,最新版本或稍老的版本都可以,我用的 0.9.3 )。
- 编写 demo.thrift 文件,内容如下:
// demo.thrift
namespace java com.ann.javas.frameworks.thrifts.demo.beans
enum Gender {
BOY = 1,
GIRL = 2,
}
struct UserInfo {
1: required string name,
3: required Gender gender,
6: required i32 weight,
}
service DemoService {
string sayHi(1:required UserInfo userInfo);
string locate();
string adsRecommendation();
}
- 打开终端,cd 到 demo.thrift 所在文件目录执行
thrift -r --gen java demo.thrift
,thrift 会编译 demo.thrift 生成三个 java 源文件(分别是:DemoService.java、UserInfo.java、Gender.java,源码太多不在这儿贴了)到 ./gen-java 目录下。 - maven 引入
libthrift
包,版本须与前面安装的 thrift 版本保持一致。 - 把该文件拷贝到工程目录。
- 然后创建 3 个 java 源文件:
- DemoServiceImpl.java,实现 thrift 定义的接口。
- Demo1Server.java,服务端。
- Demo1Client.java,客户端。
// DemoServiceImpl.java
package com.ann.javas.frameworks.thrifts.demo.impl;
import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
import com.ann.javas.frameworks.thrifts.demo.beans.Gender;
import com.ann.javas.frameworks.thrifts.demo.beans.UserInfo;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
* Created by liyanling on 2018/9/9.
*/
public class DemoServiceImpl implements DemoService.Iface{
private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
private final static String SAY_HI_TEMPLATE = "Hi,%s%s!%s";
private final static List<String> CITIES = Arrays.asList("上海", "北京", "广州", "成都", "内蒙古", "香港", "河北", "云南");
private final static List<String> ADS = Arrays.asList("坐红旗车,走中国路", "要想皮肤好,早晚用大宝", "喝汇源果汁,走健康之路", "送礼就送脑白金",
"飘柔,就是这么自信");
@Override
public String sayHi(UserInfo userInfo) throws TException {
String name = userInfo.getName();
String nameSuffix = Gender.BOY == userInfo.getGender() ? "先生" : "女士";
String weightNotice = userInfo.getWeight() >= 200 ? "你该减肥啦~" : "";
return String.format(SAY_HI_TEMPLATE, name, nameSuffix, weightNotice);
}
@Override
public String locate() throws TException {
int citySize = CITIES.size();
int randomIndex = Math.abs((new Random(System.currentTimeMillis())).nextInt()) % citySize;
return CITIES.get(randomIndex);
}
@Override
public String adsRecommendation() throws TException {
// logger.info("sleep..........");
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// logger.info("awake..........");
int adsSize = ADS.size();
int randomIndex = Math.abs((new Random(System.currentTimeMillis())).nextInt()) % adsSize;
return ADS.get(randomIndex);
}
}
// Demo1Server.java
package com.ann.javas.frameworks.thrifts.demo.impl;
import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by liyanling on 2018/9/9.
*/
public class Demo1Server {
private static Logger logger = LoggerFactory.getLogger(Demo1Server.class);
public static final int PORT = 9081;
public static final int CLIENT_TIMEOUT = 100000;
public static void main(String[] args) {
Demo1Server server = new Demo1Server();
server.startServer();
}
/**
* 单线程服务模型:TSimpleServer + TServerSocket
*/
private void startServer() {
try {
int count = 1;
logger.info("{}:new 一个 TServerSocket 实例,指定端口号为:{} 超时时间:{}", count++, PORT, CLIENT_TIMEOUT);
TServerTransport tServerTransport = new TServerSocket(PORT, CLIENT_TIMEOUT);
TSimpleServer.Args args = new TSimpleServer.Args(tServerTransport);
logger.info("{}:服务端初始化 {} 参数...", count++, "TSimpleServer.Args");
TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory(true, true);
args.protocolFactory(proFactory);
logger.info("{}:设置协议工厂为 TBinaryProtocol.Factory,即:使用 TBinaryProtocol 协议", count++);
TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
args.processor(processor);
logger.info("{}:设置 processor 为 {} 实例", count++, "UserServiceImpl");
TServer server = new TSimpleServer(args);
logger.info("{}:{} 实例,服务启动", count++, "TSimpleServer");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
// Demo1Client.java
package com.ann.javas.frameworks.thrifts.demo.impl;
import com.ann.javas.frameworks.thrifts.demo.beans.DemoService;
import com.ann.javas.frameworks.thrifts.demo.beans.Gender;
import com.ann.javas.frameworks.thrifts.demo.beans.UserInfo;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Scanner;
/**
* Created by liyanling on 2018/9/9.
*/
public class Demo1Client {
private static Logger logger = LoggerFactory.getLogger(Demo1Client.class);
public static String LOCAL_HOST = "127.0.0.1";
public static Scanner scanner;
public static void main(String[] args){
scanner = new Scanner(System.in);
callServer(LOCAL_HOST, Demo1Server.PORT);
scanner.close();
}
/**
* TSocket
*/
public static void callServer(String IP, int PORT) {
try {
int count = 1;
logger.info("{}:客户端,创建 TSocket 实例 (IP:{},PORT:{})...", count++, IP, PORT);
TTransport transport = new TSocket(LOCAL_HOST, PORT);
transport.open();
logger.info("{}:打开 socket 连接", count++);
TProtocol protocol = new TBinaryProtocol(transport);
logger.info("{}:创建 TBinaryProtocol 实例", count++);
DemoService.Iface client = new DemoService.Client(protocol);
logger.info("{}:创建 Demo1Service.Client 实例", count++);
// 测试 locate
String locateResult = client.locate();
logger.info("{}:locate 返回值 {}",count++,locateResult);
// 测试 adsRecommendation
String adsRecommendationResult = client.adsRecommendation();
logger.info("{}:adsRecommendation 返回值 {}",count++,adsRecommendationResult);
// 测试 sayHi
logger.info("{}:请输入姓名:", count++);
String name = scanner.nextLine();
logger.info("{}:请输入性别(GIRL / BOY):", count++);
String gender = scanner.nextLine();
logger.info("{}:请输入体重(单位:斤):", count++);
Integer weight = Integer.valueOf(scanner.nextLine());
UserInfo userInfo = new UserInfo();
userInfo.setName(name);
userInfo.setGender(Gender.valueOf(gender));
userInfo.setWeight(weight);
String sayHiResult = client.sayHi(userInfo);
logger.info("{}:sayHi 返回值 {}",count++,sayHiResult);
transport.close();
logger.info("{}:关闭 socket 连接", count++);
} catch (Throwable t) {
t.printStackTrace();
}
}
}
分别测试 sayHi、locate、adsRecommendation 功能,输出日志如下:
// Demo1Server 日志
00:35:50.229 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 1:new 一个 TServerSocket 实例,指定端口号为:9081 超时时间:100000
00:35:50.249 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 2:服务端初始化 TSimpleServer.Args 参数...
00:35:50.249 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 3:设置协议工厂为 TBinaryProtocol.Factory,即:使用 TBinaryProtocol 协议
00:35:50.259 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 4:设置 processor 为 UserServiceImpl 实例
00:35:50.260 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Server - 5:TSimpleServer 实例,服务启动
// Demo1Client 日志
00:35:53.312 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 1:客户端,创建 TSocket 实例 (IP:127.0.0.1,PORT:9081)...
00:35:53.325 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 2:打开 socket 连接
00:35:53.326 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 3:创建 TBinaryProtocol 实例
00:35:53.328 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 4:创建 Demo1Service.Client 实例
00:35:53.351 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 5:locate 返回值 香港
00:35:53.363 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 6:adsRecommendation 返回值 喝汇源果汁,走健康之路
00:35:53.363 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 7:请输入姓名:
超人
00:36:21.899 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 8:请输入性别(GIRL / BOY):
BOY
00:36:24.783 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 9:请输入体重(单位:斤):
230
00:36:27.376 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 10:sayHi 返回值 Hi,超人先生!你该减肥啦~
00:36:27.376 [main] INFO com.ann.javas.frameworks.thrifts.demo.impl.Demo1Client - 11:关闭 socket 连接
这个例子主要看 client 端日志,server 端的不重要,一般都是 deamon 启动的。
不需要了解任何原理,光看这个用法你就会发现,thrift 通过某种方式屏蔽了用户对“序列化与反序列化”这个动作的感知,用户只需要:
- 按照某种规则、使用 xxx.thrift 文件定义 RPC 接口和数据结构。
- 执行命令生成对应的 java 代码。
- 然后在此基础上,实现你的业务逻辑就可以了。
4.2 序列化分析
关于 thrift 序列化的具体实现,可以在前面 demo 的基础上,通过串调用链路的方式进行了解。
需要进行序列化和反序列化操作的无非就这么 4 个场景:
- client->server,client 序列化参数。
- client->server,server 反序列化参数,处理请求。
- server->client,server 序列化返回值。
- server->client,client 反序列化返回值。
以 “client 序列化参数” 为例进行简单分析,过程如下:
- 以 sayHi() 为例,入口在 Demo1Client.java 客户端代码:
// Demo1Client.java
...
public class Demo1Client {
...
public static void callServer(String IP, int PORT) {
...
TProtocol protocol = new TBinaryProtocol(transport);
...
DemoService.Iface client = new DemoService.Client(protocol);
...
String sayHiResult = client.sayHi(userInfo);
...
}
}
- Command+B 点 sayHi 进到 DemoService.java 中的 Iface 接口定义,这是编译器根据 IDL 自动生成的代码:
// DemoService.java
// Autogenerated by Thrift Compiler
public class DemoService {
public interface Iface {
public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException;
...
}
...
}
- 如果你用的 IDE 是 idea ,在左侧能看到一个绿色的小圆圈,鼠标悬浮在上面,会看到 Iface 接口定义的 sayHi() 方法有两个实现类:
- ...beans.DemoService.Client,客户端实现,其实是 DemoService 的内部类,编译器自动生成的。
-
...impl.DemoServiceImpl,服务端实现,继承自 DemoService.Iface,用于实现服务端业务逻辑。
Iface.sayHi 的两个实现
- 点进客户端实现,也就是 DemoService 的内部类 DemoService.Client,然后你会看到下面这段代码:
// DemoService.java
// Autogenerated by Thrift Compiler
...
public class DemoService {
...
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
...
public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException
{
send_sayHi(userInfo);
return recv_sayHi();
}
...
}
...
}
很明显,send_sayHi(userInfo) 是发送请求到服务端,return recv_sayHi() 是从服务端获取返回值。
- 本例需关注 send_sayHi(userInfo) 实现:
- sayHi_args 又是个内部类,定义了 sayHi() 方法参数的数据结构,其唯一成员变量就是 UserInfo 类实例,对应的是 demo.thrift 中定义的
string sayHi(1:required UserInfo userInfo);
。 - 先初始化一个 sayHi_args 类实例,准备好参数。
- sendBase(...) 是 TServiceClient 提供的方法,调用时指明方法名和方法参数实例。
- 在本例中,方法名即:"sayHi",参数即:sayHi_args 实例。
// DemoService.java
// Autogenerated by Thrift Compiler
...
public class DemoService {
...
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
...
public String sayHi(UserInfo userInfo) throws org.apache.thrift.TException
{
send_sayHi(userInfo);
return recv_sayHi();
}
public void send_sayHi(UserInfo userInfo) throws org.apache.thrift.TException
{
sayHi_args args = new sayHi_args();
args.setUserInfo(userInfo);
sendBase("sayHi", args);
}
...
}
...
}
- 进到 TServiceClient 看 sendBase(...) 具体实现:
...
public abstract class TServiceClient {
...
protected void sendBase(String methodName, TBase<?, ?> args) throws TException {
this.sendBase(methodName, args, (byte)1);
}
...
private void sendBase(String methodName, TBase<?, ?> args, byte type) throws TException {
this.oprot_.writeMessageBegin(new TMessage(methodName, type, ++this.seqid_));
args.write(this.oprot_);
this.oprot_.writeMessageEnd();
this.oprot_.getTransport().flush();
}
...
}
可以看到 sendBase(...) 是按照:MessageBegin、MessageBody、MessageEnd 的顺序将消息写入 outputStream 。
- writeMessageBegin 和 writeMessageEnd 操作,不同协议(TProtocol 子类)的实现各有不同。
- writeMessageBody 则由方法的参数结构体自己实现,也就是
args.write(this.oprot_);
这一行。
- 跟进 args.write(this.oprot_); 里面,其实还是编译器根据 IDL 定义自动生成的代码:
// DemoService.java
// Autogenerated by Thrift Compiler
...
public class DemoService {
...
private static class sayHi_argsStandardScheme extends StandardScheme<sayHi_args> {
...
public void write(org.apache.thrift.protocol.TProtocol oprot, sayHi_args struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.userInfo != null) {
oprot.writeFieldBegin(USER_INFO_FIELD_DESC);
struct.userInfo.write(oprot);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
...
}
- 在 write 操作前,有一个 validate 校验,主要是校验参数格式(required 字段非null)。
- 剩下的应该不用多介绍了,差不多的规则和实现,到这儿往下跟就比较简单了,不再贴代码过来(有点多)。
- 也就是说,一个结构体大致会按照这个模板进行序列化:
- ${Message Begin}
- Struct.validate()
- ${struct Begin}
- ${Field1 Begin}
- ${Feild1 Body}
- ${Field1 End}
- ${Field2 Begin}
- ${Feild2 Body}
- ${Field2 End}
- ......
- ${Field Stop}
- ${struct End}
- ${Message End}
PS:Struct 和 Feild 是可以嵌套的。
例如:sayHi_args 的 Feild1 是 userInfo,则 sayHi_args 的 Feild1Body 实际上是包含了一个 UserInfo Struct 内容。
emm......有点绕,追几遍调用链就能明白了。
反序列化的流程不在这儿贴分析流程了,感兴趣的同学可以自己跟一下。
4.3 序列化方案初探
现在对我们前面的分析做个总结,thrift 序列化方案有3个关键点:
- libthrift 包
- thrift IDL 语法规则
- thrift 编译器
序列化和反序列化在 thrift 中是通过 “协议” 来体现的。
libthrift
- 首先,libthrift 中有一个抽象类
TProtocol
,定义了一系列 readXXX 和 writeXXX 的方法,包括前面提到过的 Message、Struct、Feild 和 String、Double、Map、List、I32、I64、Binary 等数据结构的读写方法。 - TProtocol 有多个实现,如:
TBinaryProtocol
二进制格式、TCompactProtocol
压缩格式、TJSONProtocol
JSON 格式 等。 - 通常为节约带宽,提高传输效率,一般使用二进制类型的传输协议(TBinaryProtocol);但有时会还是会使用基于文本类型的协议,需根据项目/产品中的实际需求来确定。
- 对 TProtocol 协议的具体实现感兴趣的同学,可以通过跟踪调用链的方式进行分析,然后使用 Wireshark 等工具抓包数据进行验证。
若 thrift 提供的协议不能满足要求,用户还可以基于 TProtocol 来实现定制化协议。
IDL
- thrift 定义了一套 IDL 语法规则,可参见官方文档:Thrift interface description language。
- 用户需严格按照 thrift IDL 语法规则来定义 RPC 的接口和数据结构。
编译器
- 指定需要编译的文件及生成代码的语言,执行 thrift 编译命令,thrift 编译器会对 IDL 文件进行编译并输出指定语言的源码文件。
- 例如:
- IDL 定义的 service 会生成一个单独的 Service 类,Service 类内部的 Iface 接口包含了 IDL service 定义的所有方法。
- IDL service 定义的每一个方法,其入参和返回值都会自动生成单独的 Service 内部类(function_args 和 function_result),并提供序列化/反序列化方法(read、write)。
- IDL 中的 struct、enum、exception 等定义,也会生成相应的类,并提供 read、write 方法。
这三个要点结合起来,就构成了 thrift 序列化和反序列化的整套机制。
五、thrift 服务框架
至此,thrift 的两大关键点已经搞定一个:
1、协议(序列化和反序列化)
2、服务框架(网络通信)
协议(即:序列化和反序列化)反映的是“数据传输格式”,而对于一个 RPC 框架来说,再上层就是数据传输方式和服务模型了。
5.1 demo 分析
在前面的例子中,哪一段代码和“服务模型”或“数据传输方式”有关呢?
...
public class Demo1Server {
...
private void startServer() {
...
TServerTransport tServerTransport = new TServerSocket(PORT, CLIENT_TIMEOUT);
TSimpleServer.Args args = new TSimpleServer.Args(tServerTransport);
...
TServer server = new TSimpleServer(args);
server.serve();
...
}
...
}
- 先看哪一句是用来启动服务的,显然是
server.serve();
。 - 再往上
TServer server = new TSimpleServer(args);
定义了 server ,是一个 TSimpleServer 实例,这里选定的服务模型就是TSimpleServer
。 - 而初始化 server 实例时用到的
TServerTransport
就是其传输方式。
这次我们从上往下看,先讲服务模型。
5.2 TServer 服务模型
thrift 提供的服务模型可分为:单线程、多线程、事件驱动 3类。从另一个角度也可以划分为:阻塞服务模型、非阻塞服务模型 2类。
thrift 的服务模型可以拿来对标我们学习网络编程时学到的网络通信模型(IO模型)。
常见的有:
- TSimpleServer - 简单的单线程服务模型,常用于测试。
- TThreadPoolServer - 多线程、阻塞 IO 服务模型。
- TNonblockingServer - 多线程、非阻塞 IO 服务模型。
- 以及 TThreadedSelectorServer 和 THsHaServer。
TSimpleServer
前例中,Server 端使用的服务模型是 TSimpleServer 简单的单线程服务模型,其特点是:
- 只有一个工作线程,循环监听新请求的到来,并完成请求的处理。
- 这种服务模型的优点是使用方法简单、便于理解。
- 但是一次只能接收和处理一个 socket 连接,效率比较低。
- 主要用于测试或演示 thrift 的工作过程,在实际开发中很少会用到。
可以按照下面的步骤简单做个小测试:
- 在 DemoServiceImpl 的某个方法里,加一段 Thread.sleep(xxxx) 。
- 启动服务并建立一个 clientA 连接并发起请求,当前请求会夯在 sleep 的地方,不给客户端返回结果。
- 然后再新建一个 clientB 连接并发起请求。
- 你会发现,若 clientA 的请求不处理完,server 端不会与 clientB 建立连接,clientB 需要一直等着,直到 clientA 的请求被处理完并返回结果。
TThreadPoolServer
TThreadPoolServer 靠引入 “工作线程池” 解决了 TSimpleServer 不支持并发和多连接的问题,其特点是:
- 使用一个专有线程负责监听端口、接受连接,具体的业务处理则由一个工作线程池中的子线程来完成。
- 优点是将监听请求和业务处理两项工作做了拆分,在并发量较大时新连接也能够被及时接受。
- 缺点是服务处理能力受限于线程池的工作能力,当并发请求大于线程池中的线程数时,新请求还是要排队等待。
TThreadServer 实现上还有一些细节没画在图上,比如:
线程池满如何处理?
有无异常重试机制?
有无优雅停服机制?
......
感兴趣的同学可以读源码了解一下。
TNonblockingServer
TNonblockingServer 使用非阻塞 I/O 解决了 TSimpleServer 一个客户端阻塞其他所有客户端的问题,其特点是:
- 服务启动时开一个 SelectAcceptThread 线程,该线程内部通过引入 java.nio.channels.Selector 选择器(Java NIO 核心组件之一)实现单线程管理多个网络连接的功能。
- 优点是服务端可以同时处理多个客户端连接请求,而不需要像 TSimpleServer 那样排队等待。
- 缺点是业务处理还是单线程顺序执行,在业务处理比较复杂、耗时较长的场景下,执行效率也很难提升。
我先去补补 Selector 的课再回来补这里的图......
THsHaServer
THsHaServer 是 TNonblockingServer 的子类,可以简单粗暴的理解成 TNonblockingServer 和 TThreadPoolServer 的结合版本:
- THsHaServer 在 TNonblockingServer 的基础上 ,引入了 TThreadPoolServer 的工作线程池,用于进行业务处理。
- 优点是将业务处理过程扔到工作线程池处理,主线程可以直接返回进行下一次循环操作,效率大大提升。
- 缺点是,由于主线程需要完成所有 socekt 的监听及数据读写工作,当并发请求数较大,且发送数据量较多时,监听 socket 上的新连接请求不能被及时接受。(TNonblockingServer 也有这个问题)
TThreadedSelectorServer
TThreadedSelectorServer 可以简单看做是 THsHaServer 的扩展(实际上并不是),其特点是:
- 在 THsHaServer 的基础上,再引入一个 SelectorThread 线程池,以分散网络IO。
- 并提供一个 SelectorThreadLoadBalancer 用作 SelectorThread 分发。
最后附上一张 TServer 精“剪”类图:
TServer 精“剪”类图
5.3 TTransport 传输方式
传输方式这部分,首先请同学们区分 TServerTransport 和 TTransport:
-
TServerTransport
定义的是“以何种方式监听和处理请求”;初始化 Server 服务端实例的时候需要明确指定;提供 listen()、accept()、interrupt() 等功能。 -
TTransport
定义的是“以何种形式在网络上传输数据”,底层实现是对 ServerSocket 做了封装;初始化 Client 客户端实例的时候需要明确指定;提供 open()/close()、read()/write()、peek()、flush() 等功能。
TTransport 和 TServerTransport 需要与 TServer 搭配使用:
- 服务端为阻塞式服务时,使用 TServerSocket ;客户端使用 TSocket 配合。
- 服务端为非阻塞式服务时,使用 TNonblockingServerSocket ;客户端使用 TFramedTransport 封装 TSocket 配合。
这里 TTransport 就是我们说的“传输方式”,即:how is transmitted?
- TSocket,阻塞 IO 传输。
- TFramedTransport,非阻塞 I/O,按帧/块传输(支持定长数据发送和接收)。
- TMemoryTransport,内存IO。
- TFileTransport,文件格式传输,不提供java的实现。
- TZlibTransport,zlib压缩传输,不提供java的实现。
- TNonblockingTransport,非阻塞式I/O,用于构建异步客户端。
......
TServerTransport 类图 TTransport 类图不同语言对上述 Transport 的支持是不一样的。
我个人只用过 TSocket 和 TFramedTransport,有点心虚,怂...
5.4 小结
最后总结一下老生常谈的 thrift 分层:
- Transport 传输层:定义了网络数据的传输方式,负责完成数据的网络传输。
- TProtocol 协议层:定义了网络传输数据的格式,负责完成“应用数据-网络可传输的数据”的组装和拆解(序列化和反序列化)。
- TProcessor:不重要,自动生成的处理器。
- Server 服务层:把 Transport、Protocol、TProcessor 组合在一起,将服务运行起来,在指定端口监听并处理客户端请求。
六、思考
曾经在开发过程中遇到的问题,列在这里,供感兴趣的同学思考:
- 若返回值里有 map,而 map 里写了个 map.put("key",null),就会抛异常,为什么?
- 若 Server 端和 Client 端使用的 thrift 版本不一致,可能会出现什么问题?
- IDL 中的 struct 如何扩展升级?
thrift 入门的内容差不多就到这里了,写的心好累。
如果文中内容有错误的地方,欢迎各位大佬指出,感恩。
网友评论