使用案例
/*
DEFINE_int32(num_netio_threads, 0,
"Number of networking threads, 0 for number of physical CPU cores");
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
*/
auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
FLAGS_num_netio_threads, std::move(threadFactory));
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setIOThreadPool(ioThreadPool);
auto interface = std::make_shared<GraphService>();
status = interface->init(ioThreadPool);
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}
gServer->setInterface(std::move(interface));
gServer->setAddress(localIP, FLAGS_port);
// fbthrift-2018.08.20 always enables SO_REUSEPORT once `setReusePort' is called
// which had been fixed in later version.
if (FLAGS_reuse_port) {
gServer->setReusePort(FLAGS_reuse_port);
}
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port);
try {
gServer->serve(); // Blocking wait until shut down via gServer->stop()
} catch (const std::exception &e) {
FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
return EXIT_FAILURE;
}
Thrift中的工作模型
- apache的Thrift(二):各种 Server 实现
- Fbthrift: 单acceptor + 多io线程池 + 多业务线程池
ThriftServer.h 相关option设置
- ThriftServer服务端,关于其部分方法说明可参考: ThriftServer
- 配置服务端的处理器: setInterface, 每次新的请求都使用同一个实例来处理。参考:AsyncProcessorFactory 的getProcessor返回一个新的实例
可选:
auto handler = std::make_shared<ServiceHandler>(kvStore);
auto proc_factory = std::make_shared<ThriftServerAsyncProcessorFactory<ServiceHandler>>(
handler);
server->setProcessorFactory(proc_factory);
-
ip,端口 : setAddress
-
setIOThreadPool: 配置IO线程池, 线程池用来处理网络IO任务,从连接中读取请求数据并反序列化,然后就交给CPUWorker处理,将结果数据序列化并通过连接发送出去。
-
setNumCPUWorkerThreads: 配置cpu线程数,负责处理用户的业务逻辑,如果请求过多可以调大该值,和线程名称:用来处理非IO任务,比如计算任务:setNumCPUWorkerThreads、setCPUWorkerThreadName。 setNumCPUWorkerThreads和setThreadManager是互斥,只能选择设置一个。因为setNumCPUWorkerThreads设置后,内部会创建ThreadManager。
serve方法中:根据setNumCPUWorkerThreads的结果(getNumCPUWorkerThreads)
创建ThreadManager然后调用setThreadManager。
void ThriftServer::setupThreadManager() {
if (!threadManager_) {
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
PriorityThreadManager::newPriorityThreadManager(
getNumCPUWorkerThreads(), true /*stats*/));
threadManager->enableCodel(getEnableCodel());
// If a thread factory has been specified, use it.
if (threadFactory_) {
threadManager->threadFactory(threadFactory_);
}
auto poolThreadName = getCPUWorkerThreadName();
if (!poolThreadName.empty()) {
threadManager->setNamePrefix(poolThreadName);
}
threadManager->start();
setThreadManager(threadManager);
}
}
- setIdleTimeout:配置客户端连接成功后,如果连接上没有请求空闲到该值后,则断开连接,默认一直等待:
- setListenBacklog: 配置socket的全连接队列大小,但是需要考虑系统参数
net.core.somaxconn
的值(默认50),最终socket的全连接队列大小选其中小的一个,当同一个时间点同时出现的连接数很大时,可以调高这两个值:setListenBacklog
# The number of threads to execute user queries, 0 for # of CPU cores
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");
# The number of threads to accept incoming connections
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
网友评论