美文网首页
fbthrift--ThriftServer解析

fbthrift--ThriftServer解析

作者: zlcook | 来源:发表于2020-09-01 20:27 被阅读0次

    使用案例

    GraphDaemon.cpp

    /*
    DEFINE_int32(num_netio_threads, 0,
                    "Number of networking threads, 0 for number of physical CPU cores");
    DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
    DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
    DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
    DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
    */
    
       auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
        auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
                                FLAGS_num_netio_threads, std::move(threadFactory));
        gServer = std::make_unique<apache::thrift::ThriftServer>();
        gServer->setIOThreadPool(ioThreadPool);
    
        auto interface = std::make_shared<GraphService>();
        status = interface->init(ioThreadPool);
        if (!status.ok()) {
            LOG(ERROR) << status;
            return EXIT_FAILURE;
        }
    
        gServer->setInterface(std::move(interface));
        gServer->setAddress(localIP, FLAGS_port);
        // fbthrift-2018.08.20 always enables SO_REUSEPORT once `setReusePort' is called
        // which had been fixed in later version.
        if (FLAGS_reuse_port) {
            gServer->setReusePort(FLAGS_reuse_port);
        }
        gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
        gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
        gServer->setCPUWorkerThreadName("executor");
        gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
        gServer->setListenBacklog(FLAGS_listen_backlog);
        gServer->setThreadStackSizeMB(5);
    
        FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port);
        try {
            gServer->serve();  // Blocking wait until shut down via gServer->stop()
        } catch (const std::exception &e) {
            FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
            return EXIT_FAILURE;
        }
    

    Thrift中的工作模型

    ThriftServer.h 相关option设置

    • ThriftServer服务端,关于其部分方法说明可参考: ThriftServer
    可选:
    auto handler = std::make_shared<ServiceHandler>(kvStore);
    auto proc_factory = std::make_shared<ThriftServerAsyncProcessorFactory<ServiceHandler>>(
                               handler);
    server->setProcessorFactory(proc_factory);
    
    • ip,端口 : setAddress

    • setIOThreadPool: 配置IO线程池, 线程池用来处理网络IO任务,从连接中读取请求数据并反序列化,然后就交给CPUWorker处理,将结果数据序列化并通过连接发送出去。

    • setNumCPUWorkerThreads: 配置cpu线程数,负责处理用户的业务逻辑,如果请求过多可以调大该值,和线程名称:用来处理非IO任务,比如计算任务:setNumCPUWorkerThreads、setCPUWorkerThreadName。 setNumCPUWorkerThreads和setThreadManager是互斥,只能选择设置一个。因为setNumCPUWorkerThreads设置后,内部会创建ThreadManager。

    serve方法中:根据setNumCPUWorkerThreads的结果(getNumCPUWorkerThreads)
    创建ThreadManager然后调用setThreadManager。
    
    void ThriftServer::setupThreadManager() {
      if (!threadManager_) {
        std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
            PriorityThreadManager::newPriorityThreadManager(
                getNumCPUWorkerThreads(), true /*stats*/));
        threadManager->enableCodel(getEnableCodel());
        // If a thread factory has been specified, use it.
        if (threadFactory_) {
          threadManager->threadFactory(threadFactory_);
        }
        auto poolThreadName = getCPUWorkerThreadName();
        if (!poolThreadName.empty()) {
          threadManager->setNamePrefix(poolThreadName);
        }
        threadManager->start();
        setThreadManager(threadManager);
      }
    }
    
    • setIdleTimeout:配置客户端连接成功后,如果连接上没有请求空闲到该值后,则断开连接,默认一直等待:
    • setListenBacklog: 配置socket的全连接队列大小,但是需要考虑系统参数net.core.somaxconn的值(默认50),最终socket的全连接队列大小选其中小的一个,当同一个时间点同时出现的连接数很大时,可以调高这两个值:setListenBacklog
    # The number of threads to execute user queries, 0 for # of CPU cores
    gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
    gServer->setCPUWorkerThreadName("executor");
    
    # The number of threads to accept incoming connections
    gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
    

    相关文章

      网友评论

          本文标题:fbthrift--ThriftServer解析

          本文链接:https://www.haomeiwen.com/subject/gkmvsktx.html