美文网首页
Thrift server比较和使用

Thrift server比较和使用

作者: 弋炎 | 来源:发表于2017-09-05 17:01 被阅读200次

    Thrift提供了多种服务器实现。

    WechatIMG20.jpeg
    它们各有特点,适应不同的需求环境。下面我将结合网络资源和自己的理解来梳理一下Thrift各种java server的特点和使用姿势。
    • TSimpleServer

    while (!stopped_) {
      TTransport client = null;
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      ServerContext connectionContext = null;
      try {
        client = serverTransport_.accept();
        if (client != null) {
          processor = processorFactory_.getProcessor(client);
          inputTransport = inputTransportFactory_.getTransport(client);
          outputTransport = outputTransportFactory_.getTransport(client);
          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
          if (eventHandler_ != null) {
            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
          }
          while (true) {
            if (eventHandler_ != null) {
              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
            }
            if(!processor.process(inputProtocol, outputProtocol)) {
              break;
            }
          }
        }
      }
    }
    

    TSimplerServer在while循环中每次接受一个连接,处理连接请求,直到客户端关闭了连接,它才会去接受一个新的连接。由于它只在一个单独的线程中以阻塞I/O的方式完成这些工作,所以它只能服务一个客户端连接,其他所有客户端在被服务器端接受之前都只能等待。其使用方法如下:

    public static void sample() throws Exception {
        ServerSocket socket = new ServerSocket(8912);
        TServerSocket serverTransport = new TServerSocket(socket); // HelloServiceImpl 为自己实现的Thrift服务接口的具体实现  
        TServer server = new TSimpleServer(new TServer.Args(serverTransport).processor(processor));
        System.out.println("Server start...");
        server.serve();
    }
    
    • TNonblockingServer

    public void run() {
      try {
        if (eventHandler_ != null) {
          eventHandler_.preServe();
        }
    
        while (!stopped_) {
          select();
          processInterestChanges();
        }
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        try {
          selector.close();
        } catch (IOException e) {
          LOGGER.error("Got an IOException while closing selector!", e);
        }
        stopped_ = true;
      }
    }
    private void select() {
      try {
        // wait for io events.
        selector.select();
    
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
    
          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }
    
          // if the key is marked Accept, then it has to be the server
          // transport.
          if (key.isAcceptable()) {
            handleAccept();
          } else if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }
    

    TNonblockingServer 使用非阻塞的 I/O 解决了TSimpleServer一个客户端阻塞其他所有客户端的问题。它使用了java.nio.channels.Selector,通过调用select(),它使得你阻塞在多个连接上,而不是阻塞在单一的连接上。当一或多个连接准备好被接受/读/写时,select()调用便会返回。TNonblockingServer处理这些连接的时候,要么接受它,要么从它那读数据,要么把数据写到它那里,然后再次调用select()来等待下一个可用的连接。通用这种方式,server可同时服务多个客户端,而不会出现一个客户端把其他客户端全部“饿死”的情况。
    使用方法:

    public static void nonBlock() throws TTransportException {
        TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8912);
        Hello.Processor processor = new Hello.Processor(new HelloServiceImpl());
        TNonblockingServer.Args args = new TNonblockingServer.Args(serverSocket);
        args.transportFactory(new TFramedTransport.Factory());
        args.protocolFactory(new TCompactProtocol.Factory());
        args.processor(processor);
        TNonblockingServer server = new TNonblockingServer(args);
        server.serve();
    }
    
    • ThreadedSelectorServer

    ThreadedSelectorServer允许你用多个线程来处理网络 I/O。它维护了两个线程池,一个用来处理网络 I/O,另一个用来进行请求的处理。使用方法:

    public static void threadSelector() throws TTransportException, IOException {
        TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8912);
        Hello.Processor processor = new Hello.Processor(new HelloServiceImpl());
        TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverSocket);
        args.transportFactory(new TFramedTransport.Factory());
        args.protocolFactory(new TBinaryProtocol.Factory());
        args.selectorThreads(10);
        args.acceptQueueSizePerThread(10);
        args.processor(processor);
        TThreadedSelectorServer server = new TThreadedSelectorServer(args);
        server.serve();
    }
    
    • TThreadPoolServer

    TThreadPoolServer有一个专用的线程用来接受连接旦接受了一个连接,它就会被放入ThreadPoolExecutor中的一个 worker 线程里处理。worker 线程被绑定到特定的客户端连接上,直到它关闭。一旦连接关闭,该worker线程就又回到了线程池中。你可以配置线程池的最小、最大线程数,默认值分别是5(最小)和Integer.MAX_VALUE(最大)。使用方法:

    public static void threadPool() throws Exception {
        TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8912);
        Hello.Processor processor = new Hello.Processor(new HelloServiceImpl());
        TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket);
        args.transportFactory(new TFramedTransport.Factory());
        args.protocolFactory(new TBinaryProtocol.Factory());
        args.processor(processor);
        args.maxWorkerThreads(10);
        TThreadPoolServer server = new TThreadPoolServer(args);
        server.serve();
    }
    

    相关文章

      网友评论

          本文标题:Thrift server比较和使用

          本文链接:https://www.haomeiwen.com/subject/lfxpjxtx.html