Thrift 为服务器端提供了多种工作模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadedSelectorServer
TSimpleServer
- TSimpleServer 是一个单线程阻塞 I/O 的 Server,它循环监听新请求的到来并对请求进行处理
- 但一次只能接收和处理一个 Socket 连接,效率低,主要用于测试和学习,实际开发很少用到
TNonblockingServer
- TNonblockingServer 是一个单线程 NIO 的 Server,NIO 通过 Selector 循环监听所有 Socket,每次 selector 结束时,处理所有就绪状态的 Socket
- 一个阻塞 I/O 线程一次只能处理一个 Socket 连接,而一个 NIO 线程可以处理多个 Socket 连接
- 虽然一个 NIO 线程可以同时接收多个 Socket 连接,但在处理任务时仍然是阻塞的
public class Server {
public static void main(String[] args) throws Exception {
TProcessor processor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8181);
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverSocket)
.processor(processor)
.protocolFactory(protocolFactory);
TServer server = new TNonblockingServer(serverArgs);
System.out.println("开启Thrift服务器,监听端口:8181");
server.serve();
}
}
TThreadPoolServer
- TThreadPoolServer 模式采用阻塞 Socket 方式工作,主线程负责阻塞监听是否有新的 Socket 连接,业务交由一个线程池进行处理
- 该模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新的请求会进入队列中排队等待处理
public class Server {
public static void main(String[] args) throws Exception {
TProcessor processor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
TServerSocket serverSocket = new TServerSocket(8181);
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverSocket)
.processor(processor)
.protocolFactory(protocolFactory);
TServer server = new TThreadPoolServer(serverArgs);
System.out.println("开启Thrift服务器,监听端口:8181");
server.serve();
}
}
THsHaServer
- THsHaServer 是 TNonBlockingServer 的子类,在 TNonBlockingServer 模式中,采用一个线程来完成对所有的 Socket 监听和业务处理,造成了效率低下,而 THsHaServer 模式使用了一个线程池来专门进行业务处理
public class Server {
public static void main(String[] args) throws Exception {
TProcessor processor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8181);
TTransportFactory transportFactory = new TFramedTransport.Factory();
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport)
.processor(processor)
.protocolFactory(protocolFactory)
.transportFactory(transportFactory);
TServer server = new THsHaServer(serverArgs);
System.out.println("开启Thrift服务器,监听端口:8181");
server.serve();
}
}
TThreadedSelectorServer
- TThreadedSelectorServer 是 Thrift 目前最高级的模式,它内部由几个部分构成:
- 一个 AcceptThread 线程对象,专门用于处理 Socket 上的新连接
- 若干个 SelectorThread 对象专门用于处理业务 Socket 的网络 I/O 操作,所有网络数据的读写都是由这些线程来完成的
- 一个负载均衡器 SelectorThreadLoadBalancer 对象,主要用于 AcceptThread 线程接收到新的 Socket 连接请求时,将请求分配给 SelectorThread 线程
- 一个 ExecutorService 线程池,负责完成业务逻辑处理
public class Server {
public static void main(String[] args) throws Exception {
TProcessor processor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8181);
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverSocket)
.processor(processor)
.protocolFactory(protocolFactory);
TServer server = new TThreadedSelectorServer(serverArgs);
System.out.println("开启Thrift服务器,监听端口:8181");
server.serve();
}
}
网友评论