美文网首页程序员
RPC 框架 Thrift 的探索与使用

RPC 框架 Thrift 的探索与使用

作者: 醉里挑灯A | 来源:发表于2017-08-14 16:24 被阅读363次

1. 概述

Thrift是跨语言的RPC框架,现在是一个Apache的顶级项目。Thrift通过一个中间语言--IDL接口定义语言,来定义RPC的接口和数据类型。使用Thrift的代码生成工具(thrift-0.10.0.exe编译器)读取IDL文件,生成不同语言的服务端与客户端代码,并由生成的代码负责RPC协议层和传输层的实现。目前支持语言C++,Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk

2. 下载使用

官网地址:thrift.apache.org

  1. 如果是Win OS , 在官网下载exe文件
  2. 然后编辑.thrift 文件,比如:
/**
* 实现功能:创建一个查询结果struct和一个服务接口service
* 基于:thrift-0.10.0
**/
namespace java com.thrift
struct QryResult {
        /**
        *返回码, 1成功,0失败
        */
        1:i32 code;
        /**
        *响应信息
        */
        2:string msg;
}
service TestQry{
        /**
        * 测试查询接口,当qryCode值为1时返回"成功"的响应信息,qryCode值为其他值时返回"失败"的响应信息
        * @param qryCode测试参数
        */
        QryResult qryTest(1:i32 qryCode)
}
  1. 进入命令行,执行thrift-0.10.0.exe --gen java TestQry.thrift,之后会生成基本代码.
  2. 在工程中引入thrift jar 包,把生成的文件引入工程中
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>0.10.0</version>
</dependency>

3. 基本概念

3.1 数据类型

基本类型:
bool:布尔值,true 或 false,对应 Java 的 boolean
byte:8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:utf-8编码的字符串,对应 Java 的 String

结构体类型:
struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean

容器类型:
list:对应 Java 的 ArrayList
set:对应 Java 的 HashSet
map:对应 Java 的 HashMap

异常类型:
exception:对应 Java 的 Exception

服务类型:
service:对应服务的类

3.2 传输协议

Thrift 支持多种传输协议,用户可以根据实际需求选择合适的类型。Thrift 传输协议上总体可划分为文本 (text) 和二进制 (binary) 传输协议两大类,一般在生产环境中使用二进制类型的传输协议为多数(相对于文本和 JSON 具有更高的传输效率)。常用的协议包含:

TBinaryProtocol:是Thrift的默认协议,使用二进制编码格式进行数据传输,基本上直接发送原始数据
TCompactProtocol:压缩的、密集的数据传输协议,基于Variable-length quantity的zigzag 编码格式
TJSONProtocol:以JSON (JavaScript Object Notation)数据编码协议进行数据传输
TDebugProtocol:常常用以编码人员测试,以文本的形式展现方便阅读

3.3 服务

Thrift 包含三个主要的组件:protocol,transport 和 server。
其中,protocol 定义了消息是怎样序列化的;transport 定义了消息是怎样在客户端和服务器端之间通信的;server 用于从 transport 接收序列化的消息,根据 protocol 反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回 transport。
Thrift 模块化的结构使得它能提供各种 server 实现。下面列出了 Java 中可用的 server 实现:

TSimpleServer
TNonblockingServer
THsHaServer
TThreadedSelectorServer
TThreadPoolServer

3.4 编码步骤

3.4.1 服务端基本步骤

  1. 实现服务处理接口
  2. 创建TProcessor
  3. 创建TServerTransport
  4. 创建TProtocol
  5. 创建TServer
  6. 启动Server

3.4.2 客户端基本步骤

  1. 创建Transport
  2. 创建TProtocol
  3. 基于TTransport和TProtocol创建Client
  4. 调用Client的相应方法
  5. 服务方式的选择需要根据具体的业务需求.

4. 实例演示

4.1 一些说明

  1. 服务端采用的是多接口的实现,因此支持多个.thrift的文件的实现.对应的在客户端,也要使用多接口的方式实现
  2. 在实际运行中,会出现org.apache.thrift.transport.TTransportException: Frame size (40792739) larger than max length (16384000)!异常,所以在代码中会修改一次性传输的大小(1638400000),这个需在客户端和服务端同时设定.
  3. 服务模型的选择
Thrift的TThreadedSelectorServer 服务模式.png

采用Thrift的TThreadedSelectorServer 服务模式,提高并发请求的响应.TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况;Thrift的TThreadedSelectorServer,用业务之外的小demo进行测试,并发提高能很快返回结果。

4.2 接口的实现

public class QueryImp implements TestQry.Iface{
    public QryResult qryTest(int qryCode) throws TException {
        QryResult result = new QryResult();
        if(qryCode == 1){
            result.code = 1;
            result.msg = "success";
        }else {
            result.code = 0;
            result.msg = "fail";
        }
        for(int i=0;i<10000;i++){
               System.out.println("2");
        }
        return result;
    }
}

4.3 服务端的实现

private final static int DEFAULT_PORT = 30002;
    private static TServer server = null;
    public static void main(String[] args) {
        TNonblockingServerSocket socket = null;
        try {
            socket = new TNonblockingServerSocket(DEFAULT_PORT);
        } catch (TTransportException e) {
            e.printStackTrace();
        }
        //多接口的实现
        TProcessor  tProcessor1 = new TestQry.Processor<TestQry.Iface>(new QueryImp());
        TProcessor tProcessor2 = new TestQry1.Processor<TestQry1.Iface>(new QueryImp1());
        TThreadedSelectorServer.Args arg = new TThreadedSelectorServer.Args(socket);
        TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor();
        multiplexedProcessor.registerProcessor("processor1",tProcessor1);
        multiplexedProcessor.registerProcessor("processor2",tProcessor2);
        arg.processor(multiplexedProcessor);
        arg.protocolFactory(new TCompactProtocol.Factory());
        //如果传输数据量过大,需要修改这个地方的参数,默认16M
        arg.transportFactory(new TFramedTransport.Factory(1638400000));
        arg.processorFactory(new TProcessorFactory(multiplexedProcessor));
        //监听线程数
        arg.selectorThreads(10);
        //工作线程数
        ExecutorService pool = Executors.newFixedThreadPool(100);
        arg.executorService(pool);
        arg.getExecutorService();
        server = new TThreadedSelectorServer(arg);
        System.out.println("Starting server on port " + DEFAULT_PORT + "......");
        server.serve();
    }

4.4 客户端实现

private final static int DEFAULT_QRY_CODE = 1;
    public void startClient() {
        TTransport tTransport = null;
        try {
            tTransport = getTTransport();
        } catch (Exception e) {
            e.printStackTrace();
        }
        TProtocol protocol = new TCompactProtocol(tTransport);
        //对应的客户端也要用多接口的方式实现
        TMultiplexedProtocol q1 = new TMultiplexedProtocol(protocol,"processor1");
        TMultiplexedProtocol q2 = new TMultiplexedProtocol(protocol,"processor2");
        TestQry.Client client1 = new TestQry.Client(q1);
        TestQry1.Client client2 = new TestQry1.Client(q2);
        try {
            QryResult result = client1.qryTest(DEFAULT_QRY_CODE);
            System.out.println("code="+result.code+" msg="+result.msg);
            close(tTransport);
        } catch (TException e) {
            e.printStackTrace();
        }


    }

    private static TTransport getTTransport() throws Exception{
        TTransport tTransport = getTTransport("127.0.0.1",30002,300000);
        if(tTransport != null && !tTransport.isOpen()){
            tTransport.open();
        }
        return tTransport;
    }

    private static TTransport getTTransport(String host, int port, int timeout) {
        final TSocket tSocket = new TSocket(host,port,timeout);
        final TTransport tTransport = new TFramedTransport(tSocket,1638400000);
        return tTransport;
    }
    private void close(TTransport transport){
        if(transport !=null && transport.isOpen()){
            transport.close();
        }
    }

5. 参考资料

  1. 官网
  2. Thrift入门及Java实例演示
  3. 和 Thrift 的一场美丽邂逅
  4. 由浅入深了解Thrift(三)——Thrift server端的几种工作模式分析
  5. Thrift框架服务端并发处理模式的java示例
  6. Thrift多接口支持例子

相关文章

  • RPC 框架 Thrift 的探索与使用

    1. 概述 Thrift是跨语言的RPC框架,现在是一个Apache的顶级项目。Thrift通过一个中间语言--I...

  • Netty学习笔记

    公司使用thrift作为RPC框架,其中通信框架使用netty取代thrift自带的通信,所以看了官网文档,然后翻...

  • Thrift 初探

    引言 知乎使用的 RPC 框架是基于 Thrift 构建的。自然就很有必要了解下 Thrift 是什么?如何使用?...

  • 小白入门微服务(1) - RPC 初体验,python、node

    概述 前言 什么是 RPC RPC 原理 常用 RPC 框架对比 thrift 基础 python、nodejs ...

  • thrift事件交互

    一、thrift 1 thrift简介Thrift是一个RPC框架,由facebook开发。它支持可扩展且跨语言的...

  • golang 网络框架之 thrift

    thrift 最初是 facebook 开发使用的 rpc 通信框架,后来贡献给了 apache 基金会,出来得比...

  • Thrift(一):快速入门

    什么是 Thrift? Thrift 是 Facebook 开源的一款跨语言 RPC 框架,它通过 IDL(接口描...

  • Thrift

    什么是Thrift? 简单来说,是Facebook公布的一款开源跨语言的RPC框架。 【什么是RPC框架?】 RP...

  • Python RPC 之 Thrift

    Thrift 简介: Thrift 是一款高性能、开源的 RPC 框架,产自 Facebook 后贡献给了 Apa...

  • Golang RPC 之 Thrift

    Thrift 简介: Thrift 是一款高性能、开源的 RPC 框架,产自 Facebook 后贡献给了 Apa...

网友评论

    本文标题:RPC 框架 Thrift 的探索与使用

    本文链接:https://www.haomeiwen.com/subject/agburxtx.html