美文网首页Ceph学习笔记
站在 rados tools 看 ceph(5)

站在 rados tools 看 ceph(5)

作者: DaemonXiao | 来源:发表于2020-12-24 09:54 被阅读0次

    4. rados_tool_common()

    rados_tool_common() 中封装了每个 rados 命令的具体处理方式。通过解析命令参数,来判断具体的处理方法。rados_tool_common() 中也做了一些基础通用操作,用于初始化 rados 和与集群建立连接。

    rados.init_with_context() 主要工作就是根据配置文件中的参数,初始化 rados。具体调用链:rados.init_with_context() -> rados_create_with_context() -> _rados_create_with_context() -> librados::RadosClient::RadosClient()。

    rados_connect() 作用是与集群建立连接,在 librados.h 中有提示注意:在调用其他相关通讯函数之前,必须调用此函数,否则会导致进程崩溃。在建立连接的过程中,还创建了6种线程:rados、ms_dispatcher、ms_local、safe_timer、fn_anonymous、fn_radosclient。后几小节将具体介绍这6个线程创建的时机与作用。

    static int rados_tool_common(const std::map < std::string, std::string > &opts,
                                 std::vector<const char*> &nargs)
    {
        ...;
        Rados rados; 
        IoCtx io_ctx; //新建了 Rados 和 IoCtx 对象,方便后续对 librados 调用
        ...;
        //解析参数,opts 是在 main 中构造的 map,里面保留了命令中所有的有效参数
        i = opts.find("create");
        if (i != opts.end()) {
            create_pool = true;
        }
        i = opts.find("pool");
        if (i != opts.end()) {
            pool_name = i->second.c_str();
        }
        ...;
        //打开 rados
        ret = rados.init_with_context(g_ceph_context);
        ...;
        ret = rados.connect();
        ...;
        //创建新 pool
        if (create_pool) {
            ret = rados.pool_create(pool_name);
            if (ret < 0) {
                cerr << "error creating pool " << pool_name << ": " << cpp_strerror(ret) << std::endl;
                return 1;
            }
        }
        ...
        //打开io 上下文
            ret = pool_name ? rados.ioctx_create(pool_name, io_ctx) : rados.ioctx_create2(pgid->pool(), io_ctx);
    
        
        //通过判断传入的第一个参数命令,来决定调用什么方法,如果第一个参数为“bench”(rados bench ...),则进入一下代码模块。
        else if (strcmp(nargs[0], "bench") == 0) {
            ...;
        }
    }
    

    4.1 rados.connect()

    先来解析下 rados_connect() 的调用链,找到具体的函数方法。rados.connect() -> client->connect() -> librados::RadosClient::connect()。实际上,connect() 进行网络通信需要做的一些预处理,尽管它的名字叫做 connnect “链接”,但是实际的网络链接是发生在后面的操作中:_op_submit() -> _get_session() -> messenger->get_connection()。

    这里简单介绍下一个完整的 client 端发送消息的流程。
    第一步:创建 messenger 并设置策略,Messenger::create() 和 messenger->set_default_policy()。
    第二步:创建 objecter,new Object()。
    第三步:设置消息分发器, messenger->add_dispatcher_head() 和 messenger->add_dispatcher_tail()。
    第四步:启动消息,messenger->start()。
    第五步:发送请求 objecter->op_submit(),在发送请求之前,要把数据封装成 Objecter::Op 对象。

    在 connect() 方法中,完成了前四步的过程,相当于为发送消息做了预处理,后续再发送请求时只需要调用 op_submit() 即可。此外,connect() 中还做了 monclient、mgrclient 和 timer 的初始化工作。其中 monclient 的初始化函数 init() 中将 monclient 实例对象注册到 dispatcher 中心(通过 add_dispatcher_head() 方法,后文有介绍),创建路由密钥,启动定时器 timer 和开启 finisher 线程等。mgrclient 的初始化函数 init() 中做的工作就简单许多,只是启动了定时器。

    int librados::RadosClient::connect()
    {
      //获取monmap并配置
      {
        MonClient mc_bootstrap(cct);
        err = mc_bootstrap.get_monmap_and_config();
        if (err < 0)
          return err; 
      }
    
      // get monmap
      err = monclient.build_initial_monmap();
      if (err < 0)
        goto out;
    
      //新建 messenger
      messenger = Messenger::create_client_messenger(cct, "radosclient");
    
      //设置messenger策略
      // require OSDREPLYMUX feature.  this means we will fail to talk to
      // old servers.  this is necessary because otherwise we won't know
      // how to decompose the reply data into its constituent pieces.
      messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
    
      //创建object,用来处理发送的数据
      objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
                  &finisher,
                  cct->_conf->rados_mon_op_timeout,
                  cct->_conf->rados_osd_op_timeout);
      if (!objecter)
        goto out;
      // 根据配置,设置启用 throttle 来控制发出的op数
      objecter->set_balanced_budget();
      //为 monclient 和 mgrclient 提供消息通信层实例
      monclient.set_messenger(messenger);
      mgrclient.set_messenger(messenger);
      //初始化objecter perfcounter 并提供能够查询正在处理的op状况的钩子
      objecter->init();
      //为消息层实例添加Dispatcher
      messenger->add_dispatcher_head(&mgrclient);
      messenger->add_dispatcher_tail(objecter);
      messenger->add_dispatcher_tail(this);
      //启动messenger实例
      messenger->start();
    
      ldout(cct, 1) << "setting wanted keys" << dendl;
      monclient.set_want_keys(
          CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
      ldout(cct, 1) << "calling monclient init" << dendl;
      //初始化 monclient
      err = monclient.init();
      if (err) {
        ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
        shutdown();
        goto out;
      }
        
      //验证客户端的权限
      err = monclient.authenticate(conf->client_mount_timeout);
      if (err) {
        ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
        shutdown();
        goto out;
      }
      messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
    
      // Detect older cluster, put mgrclient into compatible mode
      mgrclient.set_mgr_optional(
          !get_required_monitor_features().contains_all(
            ceph::features::mon::FEATURE_LUMINOUS));
    
      // MgrClient needs this (it doesn't have MonClient reference itself)
      monclient.sub_want("mgrmap", 0, 0);
      monclient.renew_subs();
    
      if (service_daemon) {
        ldout(cct, 10) << __func__ << " registering as " << service_name << "."
               << daemon_name << dendl;
        mgrclient.service_daemon_register(service_name, daemon_name,
                          daemon_metadata);
      }
        
      //初始化 mgrclient
      mgrclient.init();
    
      objecter->set_client_incarnation(0);
      objecter->start();
      lock.Lock();
      //初始化timer
      timer.init();
    
      finisher.start();
    
      //状态更新为已连接
      state = CONNECTED;
      instance_id = monclient.get_global_id();
    
      return err;
    }
    

    4.1.1 ms_dispatcher 与 ms_local

    dispatcher 是消息分发中心,所有收到的消息都经由该模块,并由该模块转发给相应的处理模块(moncliet、mdsclient、osd等)。其实现方式比较简单,就是把所有的模块及其处理消息的方法 handle 注册到分发中心,具体函数为 add_dispatcher_head/tail(),这样就像 dispaatcher_queue 中添加了指定模块。后续在分发消息时,对 dispatcher_queue 进行轮询,直到有一个处理模块能够处理该消息,通过 message->get_type() 来指定消息的处理函数。所有的消息分发都在 dispatcher 线程中完成。

    在 add_dispatcher_head() 和 add_dispatcher_tail() 函数中,都做了 dispatcher 队列是否为空的判断(通过 dispatchers.empty() == true)。如果判定结果为空,说明需要重新创建 dispatcher 线程并绑定服务端地址,加入事件中心监听端口,具体方法在 ready() 中。

      void add_dispatcher_head(Dispatcher *d) {
        bool first = dispatchers.empty();
        dispatchers.push_front(d);
        if (d->ms_can_fast_dispatch_any())
          fast_dispatchers.push_front(d);
        if (first)
          ready();
      }
    

    这里给出了 AsyncMessenger::ready() 方法。p->start()(Processor::start())方法中监听 EVENT_READABLE 事件并把事件提交到 EventCenter 事件中心,由上文介绍的 msgr-worker-x 线程去轮询事件中心的队列,监听端口是否收到消息。收到的消息则由 dispatcher 线程分发给指定的处理程序,其分发消息的接口为 ms_dispatch() 和 ms_fast_dispatch()。

    dispatch_queue.start() 中开启了消息分发线程,分别为处理外部消息的 ms_dispatch 线程和处理本地消息的 ms_local 线程。相应的,它们有各自的优先级队列(注意:分发消息的队列时有优先级的,优先级越高,发送时机越早),分别是存储外部消息的 mqueue 和本地消息队列的 local_messages。消息队列的添加方式也有两种:mqueue.enqueue() 和 local_queue.emplace()。

    void AsyncMessenger::ready()
    {
      ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
    
      stack->ready();
      //绑定端口
      if (pending_bind) {
        int err = bindv(pending_bind_addrs);
        if (err) {
          lderr(cct) << __func__ << " postponed bind failed" << dendl;
          ceph_abort();
        }
      }
    
      Mutex::Locker l(lock);
      //调用 worker 线程,监听端口
      for (auto &&p : processors)  
        p->start();
      //开启 ms_dispatcher 和 ms_locla 线程
      dispatch_queue.start();
    }
    
    void DispatchQueue::start()
    {
      ceph_assert(!stop);
      ceph_assert(!dispatch_thread.is_started());
      //开启 ms_dispatch 和 ms_local 线程
      dispatch_thread.create("ms_dispatch");
      local_delivery_thread.create("ms_local");
    }
    

    4.1.2 safe_timer

    在 rados.connect() 中,monclient、mgrclient、radsoclient 都创建了 safe_time 线程。这其实一个定时器,每个模块都有自己的定时器(例如 mgrclient 在类中声明了:SafeTime timer 对象),通过调用 SafeTime::init() 方法来开启线程启动定时器模块。SafeTimer::timer_thread() 是 safe_time 线程方法,可以看到内部采用while 死循环,重复轮询事件表 schedule,检查是否到达任务的执行事件。任务在 schedule 中按照事件升序排列。首先检查,如果第一任务没有到事件,后面的任务就不用检查,直接 break。如果任务到了事件,则执行 callback 任务,并在 schedule 中删除该定时任务,然后继续循环。

    添加定时任务函数:add_event_after()、add_event_at()。取消任务:cancel_event()、cancel_all_events()。

    void SafeTimer::init()
    {
      ldout(cct,10) << "init" << dendl;
      //创建并开启 timer,线程名为 safe_timer
      thread = new SafeTimerThread(this);
      thread->create("safe_timer");
    }
    
    //线程方法
    void SafeTimer::timer_thread()
    {
      lock.lock();
      ldout(cct,10) << "timer_thread starting" << dendl;
      while (!stopping) {
        utime_t now = ceph_clock_now();
    
        while (!schedule.empty()) {
          scheduled_map_t::iterator p = schedule.begin();
    
          // is the future now?
          if (p->first > now)
        break;
    
          Context *callback = p->second;
          events.erase(callback);
          schedule.erase(p);
          ldout(cct,10) << "timer_thread executing " << callback << dendl;
          
          if (!safe_callbacks)
        lock.unlock();
          callback->complete(0);
          if (!safe_callbacks)
        lock.lock();
        }
    
        // recheck stopping if we dropped the lock
        if (!safe_callbacks && stopping)
          break;
    
        ldout(cct,20) << "timer_thread going to sleep" << dendl;
        if (schedule.empty())
          cond.Wait(lock);
        else
          cond.WaitUntil(lock, schedule.begin()->first);
        ldout(cct,20) << "timer_thread awake" << dendl;
      }
      ldout(cct,10) << "timer_thread exiting" << dendl;
      lock.unlock();
    }
    

    4.1.3 fn_anonymous 与 fn-radosclient

    fn_anonymous 线程是在 monclient() 构造函数中创建,在 monclient.init() 初始化函数中启动的线程。fn-radosclient 线程则是 radosclient() 构造函数中创建的线程,在 rados.connect() 中启动的线程。它们都归属于 finisher 线程,只是一个属于匿名对象,一个属于命名对象。finisher 类主要用来完成回调函数 context 的执行,通过开启新线程的方式,异步处理 context 的收尾工作。

    MonClient::MonClient(CephContext *cct_) :
      ...
      //创建 fn_anonymous 线程对象 
      finisher(cct_),
      ...
    {}
    
    librados::RadosClient::RadosClient(CephContext *cct_)
      : Dispatcher(cct_->get()),
        ...
        //fn-radosclient 线程在此初始化,后续调用 finisher.start() 开启线程
        finisher(cct, "radosclient", "fn-radosclient")
    {
    }
    

    finisher 线程的方法实体为 Finisher::finisher_thread_entry(),其主要功能为循环处理 finisher_queue 队列中的结束任务。注意:这里把 finisher_queue 中的任务提取到局部变量 ls 队列中,减少了锁的使用,提高了性能。通过调用 context 的 complete 处理方法,来执行 context 的收尾函数。可以设置多种 context 实例,并个性化它们的 complete 函数。

    添加一个 context 至完成队列:Finisher::queue()。

    void *Finisher::finisher_thread_entry()
    {
      ...
      while (!finisher_stop) {
        /// Every time we are woken up, we process the queue until it is empty.
        while (!finisher_queue.empty()) {
          // To reduce lock contention, we swap out the queue to process.
          // This way other threads can submit new contexts to complete
          // while we are working.
          //把 finisher_queue 中的任务提取到 ls。这样就不必锁住 finisher_queue,在finish 线程处理任务的同时,其他线程可以向 finisher_queue 提交任务,提高了性能。
          vector<pair<Context*,int>> ls;
          ls.swap(finisher_queue);
          ...
          // Now actually process the contexts.
          for (auto p : ls) {
         //执行 context 收尾函数
        p.first->complete(p.second);
          }
          ldout(cct, 10) << "finisher_thread done with " << ls << dendl;
          ls.clear();
          ...
        }
        ...
      }
      // If we are exiting, we signal the thread waiting in stop(),
      // otherwise it would never unblock
      finisher_empty_cond.notify_all();
      ...
      return 0;
    }
    

    相关文章

      网友评论

        本文标题:站在 rados tools 看 ceph(5)

        本文链接:https://www.haomeiwen.com/subject/abfenktx.html