4. rados_tool_common()
rados_tool_common() 中封装了每个 rados 命令的具体处理方式。通过解析命令参数,来判断具体的处理方法。rados_tool_common() 中也做了一些基础通用操作,用于初始化 rados 和与集群建立连接。
rados.init_with_context() 主要工作就是根据配置文件中的参数,初始化 rados。具体调用链:rados.init_with_context() -> rados_create_with_context() -> _rados_create_with_context() -> librados::RadosClient::RadosClient()。
rados_connect() 作用是与集群建立连接,在 librados.h 中有提示注意:在调用其他相关通讯函数之前,必须调用此函数,否则会导致进程崩溃。在建立连接的过程中,还创建了6种线程:rados、ms_dispatcher、ms_local、safe_timer、fn_anonymous、fn_radosclient。后几小节将具体介绍这6个线程创建的时机与作用。
static int rados_tool_common(const std::map < std::string, std::string > &opts,
std::vector<const char*> &nargs)
{
...;
Rados rados;
IoCtx io_ctx; //新建了 Rados 和 IoCtx 对象,方便后续对 librados 调用
...;
//解析参数,opts 是在 main 中构造的 map,里面保留了命令中所有的有效参数
i = opts.find("create");
if (i != opts.end()) {
create_pool = true;
}
i = opts.find("pool");
if (i != opts.end()) {
pool_name = i->second.c_str();
}
...;
//打开 rados
ret = rados.init_with_context(g_ceph_context);
...;
ret = rados.connect();
...;
//创建新 pool
if (create_pool) {
ret = rados.pool_create(pool_name);
if (ret < 0) {
cerr << "error creating pool " << pool_name << ": " << cpp_strerror(ret) << std::endl;
return 1;
}
}
...
//打开io 上下文
ret = pool_name ? rados.ioctx_create(pool_name, io_ctx) : rados.ioctx_create2(pgid->pool(), io_ctx);
//通过判断传入的第一个参数命令,来决定调用什么方法,如果第一个参数为“bench”(rados bench ...),则进入一下代码模块。
else if (strcmp(nargs[0], "bench") == 0) {
...;
}
}
4.1 rados.connect()
先来解析下 rados_connect() 的调用链,找到具体的函数方法。rados.connect() -> client->connect() -> librados::RadosClient::connect()。实际上,connect() 进行网络通信需要做的一些预处理,尽管它的名字叫做 connnect “链接”,但是实际的网络链接是发生在后面的操作中:_op_submit() -> _get_session() -> messenger->get_connection()。
这里简单介绍下一个完整的 client 端发送消息的流程。
第一步:创建 messenger 并设置策略,Messenger::create() 和 messenger->set_default_policy()。
第二步:创建 objecter,new Object()。
第三步:设置消息分发器, messenger->add_dispatcher_head() 和 messenger->add_dispatcher_tail()。
第四步:启动消息,messenger->start()。
第五步:发送请求 objecter->op_submit(),在发送请求之前,要把数据封装成 Objecter::Op 对象。
在 connect() 方法中,完成了前四步的过程,相当于为发送消息做了预处理,后续再发送请求时只需要调用 op_submit() 即可。此外,connect() 中还做了 monclient、mgrclient 和 timer 的初始化工作。其中 monclient 的初始化函数 init() 中将 monclient 实例对象注册到 dispatcher 中心(通过 add_dispatcher_head() 方法,后文有介绍),创建路由密钥,启动定时器 timer 和开启 finisher 线程等。mgrclient 的初始化函数 init() 中做的工作就简单许多,只是启动了定时器。
int librados::RadosClient::connect()
{
//获取monmap并配置
{
MonClient mc_bootstrap(cct);
err = mc_bootstrap.get_monmap_and_config();
if (err < 0)
return err;
}
// get monmap
err = monclient.build_initial_monmap();
if (err < 0)
goto out;
//新建 messenger
messenger = Messenger::create_client_messenger(cct, "radosclient");
//设置messenger策略
// require OSDREPLYMUX feature. this means we will fail to talk to
// old servers. this is necessary because otherwise we won't know
// how to decompose the reply data into its constituent pieces.
messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
//创建object,用来处理发送的数据
objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
&finisher,
cct->_conf->rados_mon_op_timeout,
cct->_conf->rados_osd_op_timeout);
if (!objecter)
goto out;
// 根据配置,设置启用 throttle 来控制发出的op数
objecter->set_balanced_budget();
//为 monclient 和 mgrclient 提供消息通信层实例
monclient.set_messenger(messenger);
mgrclient.set_messenger(messenger);
//初始化objecter perfcounter 并提供能够查询正在处理的op状况的钩子
objecter->init();
//为消息层实例添加Dispatcher
messenger->add_dispatcher_head(&mgrclient);
messenger->add_dispatcher_tail(objecter);
messenger->add_dispatcher_tail(this);
//启动messenger实例
messenger->start();
ldout(cct, 1) << "setting wanted keys" << dendl;
monclient.set_want_keys(
CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
ldout(cct, 1) << "calling monclient init" << dendl;
//初始化 monclient
err = monclient.init();
if (err) {
ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
shutdown();
goto out;
}
//验证客户端的权限
err = monclient.authenticate(conf->client_mount_timeout);
if (err) {
ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
shutdown();
goto out;
}
messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
// Detect older cluster, put mgrclient into compatible mode
mgrclient.set_mgr_optional(
!get_required_monitor_features().contains_all(
ceph::features::mon::FEATURE_LUMINOUS));
// MgrClient needs this (it doesn't have MonClient reference itself)
monclient.sub_want("mgrmap", 0, 0);
monclient.renew_subs();
if (service_daemon) {
ldout(cct, 10) << __func__ << " registering as " << service_name << "."
<< daemon_name << dendl;
mgrclient.service_daemon_register(service_name, daemon_name,
daemon_metadata);
}
//初始化 mgrclient
mgrclient.init();
objecter->set_client_incarnation(0);
objecter->start();
lock.Lock();
//初始化timer
timer.init();
finisher.start();
//状态更新为已连接
state = CONNECTED;
instance_id = monclient.get_global_id();
return err;
}
4.1.1 ms_dispatcher 与 ms_local
dispatcher 是消息分发中心,所有收到的消息都经由该模块,并由该模块转发给相应的处理模块(moncliet、mdsclient、osd等)。其实现方式比较简单,就是把所有的模块及其处理消息的方法 handle 注册到分发中心,具体函数为 add_dispatcher_head/tail(),这样就像 dispaatcher_queue 中添加了指定模块。后续在分发消息时,对 dispatcher_queue 进行轮询,直到有一个处理模块能够处理该消息,通过 message->get_type() 来指定消息的处理函数。所有的消息分发都在 dispatcher 线程中完成。
在 add_dispatcher_head() 和 add_dispatcher_tail() 函数中,都做了 dispatcher 队列是否为空的判断(通过 dispatchers.empty() == true)。如果判定结果为空,说明需要重新创建 dispatcher 线程并绑定服务端地址,加入事件中心监听端口,具体方法在 ready() 中。
void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_front(d);
if (d->ms_can_fast_dispatch_any())
fast_dispatchers.push_front(d);
if (first)
ready();
}
这里给出了 AsyncMessenger::ready() 方法。p->start()(Processor::start())方法中监听 EVENT_READABLE 事件并把事件提交到 EventCenter 事件中心,由上文介绍的 msgr-worker-x 线程去轮询事件中心的队列,监听端口是否收到消息。收到的消息则由 dispatcher 线程分发给指定的处理程序,其分发消息的接口为 ms_dispatch() 和 ms_fast_dispatch()。
dispatch_queue.start() 中开启了消息分发线程,分别为处理外部消息的 ms_dispatch 线程和处理本地消息的 ms_local 线程。相应的,它们有各自的优先级队列(注意:分发消息的队列时有优先级的,优先级越高,发送时机越早),分别是存储外部消息的 mqueue 和本地消息队列的 local_messages。消息队列的添加方式也有两种:mqueue.enqueue() 和 local_queue.emplace()。
void AsyncMessenger::ready()
{
ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
stack->ready();
//绑定端口
if (pending_bind) {
int err = bindv(pending_bind_addrs);
if (err) {
lderr(cct) << __func__ << " postponed bind failed" << dendl;
ceph_abort();
}
}
Mutex::Locker l(lock);
//调用 worker 线程,监听端口
for (auto &&p : processors)
p->start();
//开启 ms_dispatcher 和 ms_locla 线程
dispatch_queue.start();
}
void DispatchQueue::start()
{
ceph_assert(!stop);
ceph_assert(!dispatch_thread.is_started());
//开启 ms_dispatch 和 ms_local 线程
dispatch_thread.create("ms_dispatch");
local_delivery_thread.create("ms_local");
}
4.1.2 safe_timer
在 rados.connect() 中,monclient、mgrclient、radsoclient 都创建了 safe_time 线程。这其实一个定时器,每个模块都有自己的定时器(例如 mgrclient 在类中声明了:SafeTime timer 对象),通过调用 SafeTime::init() 方法来开启线程启动定时器模块。SafeTimer::timer_thread() 是 safe_time 线程方法,可以看到内部采用while 死循环,重复轮询事件表 schedule,检查是否到达任务的执行事件。任务在 schedule 中按照事件升序排列。首先检查,如果第一任务没有到事件,后面的任务就不用检查,直接 break。如果任务到了事件,则执行 callback 任务,并在 schedule 中删除该定时任务,然后继续循环。
添加定时任务函数:add_event_after()、add_event_at()。取消任务:cancel_event()、cancel_all_events()。
void SafeTimer::init()
{
ldout(cct,10) << "init" << dendl;
//创建并开启 timer,线程名为 safe_timer
thread = new SafeTimerThread(this);
thread->create("safe_timer");
}
//线程方法
void SafeTimer::timer_thread()
{
lock.lock();
ldout(cct,10) << "timer_thread starting" << dendl;
while (!stopping) {
utime_t now = ceph_clock_now();
while (!schedule.empty()) {
scheduled_map_t::iterator p = schedule.begin();
// is the future now?
if (p->first > now)
break;
Context *callback = p->second;
events.erase(callback);
schedule.erase(p);
ldout(cct,10) << "timer_thread executing " << callback << dendl;
if (!safe_callbacks)
lock.unlock();
callback->complete(0);
if (!safe_callbacks)
lock.lock();
}
// recheck stopping if we dropped the lock
if (!safe_callbacks && stopping)
break;
ldout(cct,20) << "timer_thread going to sleep" << dendl;
if (schedule.empty())
cond.Wait(lock);
else
cond.WaitUntil(lock, schedule.begin()->first);
ldout(cct,20) << "timer_thread awake" << dendl;
}
ldout(cct,10) << "timer_thread exiting" << dendl;
lock.unlock();
}
4.1.3 fn_anonymous 与 fn-radosclient
fn_anonymous 线程是在 monclient() 构造函数中创建,在 monclient.init() 初始化函数中启动的线程。fn-radosclient 线程则是 radosclient() 构造函数中创建的线程,在 rados.connect() 中启动的线程。它们都归属于 finisher 线程,只是一个属于匿名对象,一个属于命名对象。finisher 类主要用来完成回调函数 context 的执行,通过开启新线程的方式,异步处理 context 的收尾工作。
MonClient::MonClient(CephContext *cct_) :
...
//创建 fn_anonymous 线程对象
finisher(cct_),
...
{}
librados::RadosClient::RadosClient(CephContext *cct_)
: Dispatcher(cct_->get()),
...
//fn-radosclient 线程在此初始化,后续调用 finisher.start() 开启线程
finisher(cct, "radosclient", "fn-radosclient")
{
}
finisher 线程的方法实体为 Finisher::finisher_thread_entry(),其主要功能为循环处理 finisher_queue 队列中的结束任务。注意:这里把 finisher_queue 中的任务提取到局部变量 ls 队列中,减少了锁的使用,提高了性能。通过调用 context 的 complete 处理方法,来执行 context 的收尾函数。可以设置多种 context 实例,并个性化它们的 complete 函数。
添加一个 context 至完成队列:Finisher::queue()。
void *Finisher::finisher_thread_entry()
{
...
while (!finisher_stop) {
/// Every time we are woken up, we process the queue until it is empty.
while (!finisher_queue.empty()) {
// To reduce lock contention, we swap out the queue to process.
// This way other threads can submit new contexts to complete
// while we are working.
//把 finisher_queue 中的任务提取到 ls。这样就不必锁住 finisher_queue,在finish 线程处理任务的同时,其他线程可以向 finisher_queue 提交任务,提高了性能。
vector<pair<Context*,int>> ls;
ls.swap(finisher_queue);
...
// Now actually process the contexts.
for (auto p : ls) {
//执行 context 收尾函数
p.first->complete(p.second);
}
ldout(cct, 10) << "finisher_thread done with " << ls << dendl;
ls.clear();
...
}
...
}
// If we are exiting, we signal the thread waiting in stop(),
// otherwise it would never unblock
finisher_empty_cond.notify_all();
...
return 0;
}
网友评论