美文网首页Ceph学习笔记
站在 rados tools 看 ceph(3)

站在 rados tools 看 ceph(3)

作者: DaemonXiao | 来源:发表于2020-12-23 10:14 被阅读0次

    2. global_init()

    global_init() 是全局初始化函数,所有的 ceph 相关进程(rados,ceph,rbd等)都需要执行该操作,msgr-worker 和 log 线程都是在这一步创建的。该函数主要目的是进行参数的初始化解析工作、创建 CephContext、开启一些基础线程以及执行一些通用的预处理工作。

    run_pre_init 标志位默认是 true,所以通常情况下,都会执行 global_pre_init()。下文会详细介绍。

    block_signals() ,此函数屏蔽了 siglist[] 中的信号,这里是 SIGPIPE 信号。SIGPIPE 信号是服务端断开链接后发送给客户端的信号,一般来说客户端收到 SIGPIPE 信号会立刻中断进程。Ceph 不希望出现客户端突然中断,所以这里屏蔽了该信号。install_standard_sighandlers() 则是制定了某些信号的处理程序,包括:SIGSEGV、SIGABRT、SIGBUS、SIGILL、SIGFPE、SIGXCPU、SIGCFSZ、SIGSYS,具体处理方法见 handle_fatal_signal() 函数(本文这里不做分析)。

    g_ceph_context->_log->set_flush_on_exit() 向退出函数注册 log_on_exit() 回调函数,该函数旨在退出时清空 log 指针,避免内存泄漏。

    在 getuid() 循环体中,如果当前是 root 用户,则更改配置文件中的 userid 和 gruopid 属性为0(最高权限),并设置环境变量 HOME 为当前家目录。

    globla_init() 最终返回一个 g_ceph_context 指针,该指针实际指向一个 CephContext 对象。CephContext表示由单个库用户持有的上下文。在同一个进程中可以有多个CephContexts。对于守护进程和实际程序(如本文最开始的 rados bench...),只有一个CephContext。CephContext包含配置、dout对象以及您可能希望在每次函数调用时传递给libcommon的任何其他内容。

    boost::intrusive_ptr<CephContext>
    global_init(const std::map<std::string,std::string> *defaults,
            std::vector < const char* >& args,
            uint32_t module_type, code_environment_t code_env,
            int flags,
            const char *data_dir_option, bool run_pre_init)
    {
        //标志位,避免一个进程中多次执行 global_init()
        static bool first_run = true;
        //执行 global_pre_init()...
        if (run_pre_init) {
            ceph_assert(!g_ceph_context && first_run);
            global_pre_init(defaults, args, module_type, code_env, flags);
        } else {
            ceph_assert(g_ceph_context && first_run);
        }
        //更改标志位,避免重复执行global_init()
        first_run = false;
        //屏蔽 SIGPIPE 信号
        int siglist[] = { SIGPIPE, 0 };
        block_signals(siglist, NULL);
        //加载信号处理器
        if (g_conf()->fatal_signal_handlers) {
            install_standard_sighandlers();
        }
        //退出时删除 log 指针,注意是内存中的 log 指针,不是日志文件
        if (g_conf()->log_flush_on_exit)
            g_ceph_context->_log->set_flush_on_exit();
        
        //读取 --setuser 配置参数,如果当前不是 root 则什么也不做,如果时 root,则根据配置参数更改
        if (getuid() != 0) {
            ...
        }
        else if (g_conf()->setgroup.length() ||
                 g_conf()->setuser.length()) {
            ...
            g_ceph_context->set_uid_gid(uid, gid);
            g_ceph_context->set_uid_gid_strings(uid_string, gid_string);
            ...
            setenv("HOME", home_directory.c_str(), 1)
            ...
        }
        //生效更改的配置
        // Expand metavariables. Invoke configuration observers. Open log file.
        g_conf().apply_changes(nullptr);
    
        // call all observers now.  this has the side-effect of configuring
        // and opening the log file immediately.
        g_conf().call_all_observers();
        //打印先前发生的错误到日志文件中
        g_conf().complain_about_parse_errors(g_ceph_context);
        ...
        //内存泄漏检测
        if (g_conf()->debug_deliberately_leak_memory) {
            derr << "deliberately leaking some memory" << dendl;
            char *s = new char[1234567];
            (void)s;
            // cppcheck-suppress memleak
        } 
        ...
        return boost::intrusive_ptr<CephContext>{g_ceph_context, false};
    }
    

    2.1 global_pre_init()

    global_pre_init() 是 global_init() 的预执行步骤,主要解析配置、启动 MonClient、启动 log 线程。这里的解析配置的顺序是:部分命令行参数、默认配置、配置文件 ceph.conf、环境变量、命令行参数。后一步的配置可以覆盖前一步获得的配置参数。因此命令行参数的优先级最高,默认配置的优先级最低。

    ceph_argparse_early_args() 就是用来预解释命令含参数的,包括:conf、cluster、id、name。此外遇到“--version”则会直接打印版本号,然后退出程序;遇到“--show_args”则会打印出所有的命令行参数。

    common_preinit() 返回了一个 CephContext 实例,对于一个实例进程来说,贯穿一生的唯一 CephContext 其实就是在这里创建的,上文 global_init() 返回的 g_ceph_context 指针实际就是这个 CephContext 对象。其中包括了 log、配置参数、网络通信等等一个 Ceph 进程所包含的各种模块的信息或者实例指针。

    conf.parse_config_files() 读取配置文件的参数。默认使用 /etc/ceph/ceph.conf。conf.parse_env() 从环境读取“CEPH_ARGS”参数。conf.parse_argv() 再从命令行读取配置参数。

    cct->_log->start() 启动 log 线程,开启日志。

    mc_bootstrap.get_monmap_and_config() 配置网络通信,并开启3个 msgr_worker 线程。

    do_argv_commands() 执行 --show-config[-val] 命令,打印指定参数值。 g_conf().complain_about_parse_errors() 则是向日志中写入错误信息。因为日志刚刚开启,所以没有在一开始发生错误的时候就写日志,而是收集起来,最后一起写入日志文件。

    void global_pre_init(
      const std::map<std::string,std::string> *defaults,
      std::vector < const char* >& args,
      uint32_t module_type, code_environment_t code_env,
      int flags)
    {
        ...
        //预读取命令行参数
        CephInitParameters iparams = ceph_argparse_early_args(
        args, module_type,
        &cluster, &conf_file_list);
        ...
        //创建 CephContext
        CephContext *cct = common_preinit(iparams, code_env, flags);
        cct->_conf->cluster = cluster;
        global_init_set_globals(cct);
        auto& conf = cct->_conf;
        ...
        //读取配置文件参数
        int ret = conf.parse_config_files(c_str_or_null(conf_file_list),&cerr, flags);
        ...
        //读取环境变量参数
        conf.parse_env(cct->get_module_type());
        //读取命令行参数
        conf.parse_argv(args);
        //开启 log
        if (conf->log_early &&!cct->_log->is_started()) {
            cct->_log->start();
        }
        //开启网络通信
        if (!conf->no_mon_config) {
            conf.apply_changes(nullptr);
            MonClient mc_bootstrap(g_ceph_context);
            if (mc_bootstrap.get_monmap_and_config() < 0) {
                cct->_log->flush();
                cerr << "failed to fetch mon config (--no-mon-config to skip)"
                    << std::endl;
                _exit(1);
            }
        }
        // do the --show-config[-val], if present in argv
        conf.do_argv_commands();
    
        // Now we're ready to complain about config file parse errors
       g_conf().complain_about_parse_errors(g_ceph_context); 
    }
    

    2.1.1 log

    Ceph 对每个子系统的日志都预先定义了日志级别,支持动态修改。每条 log 都带有日志级别,低于预先定义的级别才会被打印,预定义级别可以在配置文件中修改,并且可以针对某个模块,如 osd、rbd 等。log 模块有单独的线程,只需要向该线程提交 job 即可,是异步的过程,对系统的性能影响很小。

    下面给出了 log 类的头文件,里面定义了 log 所需的变量以及方法。DEFAULT_MAX_NEW =100: 表示新建日志的队列深度为100条,DEFAULT_MAX_RECENT=10000 表示最多记录最近写的10000条日志记录。

    log 类其实可以简单的通过三个队列来理解。m_new 储存新加入的待写日志,m_recent 存储最近写完的日志,m_flush 表示即将写入日志文件的日志。就像是3个杯子,首先向 new 杯子倒水,如果不把 new 杯子里的水倒掉,最多倒100次 new 杯子就满了。new 杯子的水则倒向向 flush 杯子,flush杯子像是一个临时的中转杯,水在这里短暂停留,一旦用小本本记下了这杯水,就立刻把 flush 杯子的水倒入 recent 杯子。看起来 recent 杯子好像是个垃圾桶,明明都已经记载在小本上了,却还要留着这杯水,这是为了方便我们后续可以直接通过 recent 杯子查看倒了多少水、什么水。但是 recent 杯子也不是无底洞,它最多可以存10000次,满了就把杯底的开关打开,把水从底部放掉,这样可以优先把最不新鲜的水倒掉。

    log 线程则通过 start() 来创建,具体调用链:cct->_log->start() -> Thread::create() -> Thread::try_create() -> pthread_create()。线程具体方法则是 *Log::entry()。

    class Log : private Thread
    {
      ...
      static const std::size_t DEFAULT_MAX_NEW = 100;
      static const std::size_t DEFAULT_MAX_RECENT = 10000;
      ...
      //三个队列,分别是最新的entries(待写日志)队列,最近写完的日志队列和即将写的日志队列(m_new flush后得到)
      EntryVector m_new;    ///< new entries
      EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
      EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)
      ...  
    
      //线程入口
      void *entry() override;
      ...
    
    public:
      using Thread::is_started;
    
      Log(const SubsystemMap *s);
      ~Log() override;
      //退出时 flush 一次,清空 m_new,保证写入所有日志
      void set_flush_on_exit();
      ...
      //刷新
      void flush();
      //打印最近的日志
      void dump_recent();
    
      ...
      //提交日志
      void submit_entry(Entry&& e);
      //开启线程和终止线程
      void start();
      void stop();
      ...
    }
    

    注意上面 log 的构造函数: Log(const SubsystemMap *s),创建 Log 实例需要传入 SubsystemMap 。这个 map 实际也是在 CephContext 构造函数初始化列表中创建的。主要用来描述单个子系统的日志信息(即子模块,类似 rbd、rbd bench)。此处的日志级别区别与每条日志的级别,只有当日志级别小于等于该子系统日志级别时才会被打印,可在配置文件中更改子系统日志级别。

    struct Subsystem {
      int log_level; // 日志级别
      int gather_level; // gather级别
      std::string name; // 子系统名称
      
      Subsystem() : log_level(0), gather_level(0) {}     
    };
    

    下面给出了日志写入文件操作的部分源代码。简单叙述整个过程,就是把 m_new 队列数据转移到 m_flush,读取 m_flush 队列中的每条记录,判断是否满足日志级别小于子系统级别的要求,同时把数据复制到 m_recent 队列,然后经过层层调用,最终通过 write() 写入日志文件。

    void *Log::entry()
    {
      reopen_log_file(); //打开日志文件,返回 fd 文件号,记录在 Log.cc 中的 m_fd 局部变量中
      {
        ...
        //循环刷新日志,m_new 转移到 m_flush,具体在 flush() 函数实现。
        while (!m_stop) {
          if (!m_new.empty()) {
            ...
            flush();
            ...
            continue;
          }
          ...
        }
        ...
      }
      //线程结束时,再次刷新
      flush();
      return NULL;
    }
    
    
    void Log::flush()
    {
      {
        ...
        m_flush.swap(m_new);//把 m_new 队列数据转移到 m_flush
        m_cond_loggers.notify_all();//唤醒所有线程
        ...
      }
      _flush(m_flush, true, false);//实例化 m_flush 队列,数据记录在 m_recent
     ...
    }
    
    
    void Log::_flush(EntryVector& t, bool requeue, bool crash)
    {
      ...
      for (auto& e : t) {
        //判断本条日志级别是否小于等于子系统日志级别,小于则为true
        bool should_log = crash || m_subs->get_log_level(sub) >= prio;
        //日志文件是否成功打开,是否写本条日志
        bool do_fd = m_fd >= 0 && should_log;
        bool do_syslog = m_syslog_crash >= prio && should_log;
        bool do_stderr = m_stderr_crash >= prio && should_log;
        bool do_graylog2 = m_graylog_crash >= prio && should_log;
    
        if (do_fd || do_syslog || do_stderr) {
          ...
          if (do_syslog) {
            syslog(LOG_USER|LOG_INFO, "%s", pos);
          }
          if (do_stderr) {
            std::cerr << m_log_stderr_prefix << std::string_view(pos, used) << std::endl;
          }
          ...
          if (do_fd) {
          ...
          if (m_log_buf.size() > MAX_LOG_BUF) {
            _flush_logbuf();
          }
        }
        //把数据存储压入m_recent
        if (requeue) {
          m_recent.push_back(std::move(e));
        }
      }
      t.clear();
    
      _flush_logbuf();
    }
    
    
    //再经过几次调用,最终通过 write() 函数把日志序列化到文件中。
    void Log::_flush_logbuf()
    {
      if (m_log_buf.size()) {
        _log_safe_write(std::string_view(m_log_buf.data(), m_log_buf.size()));
        m_log_buf.resize(0);
      }
    }
    

    大致了解了 log 线程的工作模式后,让我们来看看如何通过代码的方式记录日志,以及它是如何实现的?

    随机挑选一条日志记录代码,具体如下。

    ldout(cct, 20) << "Log depth=" << depth << " x=" << x << dendl;
    

    可以看到和我们通常使用的 cout 命令有点像。<< 拼接字符串,dendl 结尾,ldout() 中传入两个参数,第一个是固定的 cct (CephContext),第二个代表日志级别:error = -1、warn = 0、info = 1、debug = 5、20 = trace。了解这些,已经能够完成记录日志的操作了。

    ldout 主要在dout.h 文件中通过宏定义来实现,如下给出了3个日志输出方式:lsubdout、ldout、lderr,都通过 dout_impl() 方法实现。should_gather 标志位,判定是否满足输出级别要求。创建 MutableEntry 实例,通过submit_entr() 方法提交到 m_new 队列,交由 log 线程处理。

    #define lsubdout(cct, sub, v)  dout_impl(cct, ceph_subsys_##sub, v) dout_prefix
    #define ldout(cct, v)  dout_impl(cct, dout_subsys, v) dout_prefix
    #define lderr(cct) dout_impl(cct, ceph_subsys_, -1) dout_prefix
    
    #define dout_impl(cct, sub, v)                      \
      do {                                  \
      const bool should_gather = [&](const auto cctX) {         \
        if constexpr (ceph::dout::is_dynamic<decltype(sub)>::value ||   \
              ceph::dout::is_dynamic<decltype(v)>::value) {     \
          return cctX->_conf->subsys.should_gather(sub, v);         \
        } else {                                \
          /* The parentheses are **essential** because commas in angle  \
           * brackets are NOT ignored on macro expansion! A language's  \
           * limitation, sorry. */                      \
          return (cctX->_conf->subsys.template should_gather<sub, v>());    \
        }                                   \
      }(cct);                               \
                                        \
      if (should_gather) {                          \
        ceph::logging::MutableEntry _dout_e(v, sub);                        \
        static_assert(std::is_convertible<decltype(&*cct),          \
                          CephContext* >::value,        \
              "provided cct must be compatible with CephContext*"); \
        auto _dout_cct = cct;                       \
        std::ostream* _dout = &_dout_e.get_ostream();
    
    #define dendl_impl std::flush;                                          \
        _dout_cct->_log->submit_entry(std::move(_dout_e));                  \
      }                                                                     \
      } while (0)
    

    2.1.2 msgr-worker-x

    说到 msgr-worker,它实际是 Messenger 模块创建的通信线程,用来监听和发送对象。在《Ceph 源码分析》一书中有简单介绍 SimpleMessenger,通过 pipe 传递消息。但是在目前的N版中,主要使用 AsyncMessenger,AsynsMessenger 使用 epoll 来监听端口发送消息,有异步、多路IO复用等功能,能够提升端口监听的数量以及降低资源消耗。

    msgr-worker 创建过程的具体调用链如下:
    global_init() -> global_pre_init() -> mc_bootstrap.get_monmap_and_config() -> Messenger::create_client_messenger() -> Messenger::create() -> new AsyncMessenger -> Stack->start()。

    网络通信模块实在是纷繁复杂,想要顺畅、完整地介绍它,本人还做不到。这里就简单的介绍下 AsyncMessenger,关于网络通信模块详细介绍留待后续。

    就从 Messenger::create() 开始吧。Meseenger 支持三种模式:simple、async、xio,可以通过指定 --ms_type=XX 参数来选择模式。AsyncMessenger 除了支持 posix 传输模式外,还支持 rdma 和 dpdk,通过 --ms_type=async+XX来配置。AsyncMessenger 首先创建一个 StackSingleton 单例,再调用 single->ready(),再到 NetworkStack::create() 函数创建 NetworkStack。

    AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                   const std::string &type, string mname, uint64_t _nonce)
      : SimplePolicyMessenger(cct, name,mname, _nonce),
        dispatch_queue(cct, this, mname),
        lock("AsyncMessenger::lock"),
        nonce(_nonce), need_addr(true), did_bind(false),
        global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
        cluster_protocol(0), stopped(true)
    {
      std::string transport_type = "posix";
      //支持 rdma 和 dpdk 传输模式
      if (type.find("rdma") != std::string::npos)
        transport_type = "rdma";
      else if (type.find("dpdk") != std::string::npos)
        transport_type = "dpdk";
            
      //创建 StackSingleton 单例
      auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
        "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
      //创建 NetworkStack
      single->ready(transport_type);
      //创建 PosixWorker
      stack = single->stack.get();
      //开启 msgr-worker 线程
      stack->start();
      ...
    }
    
    struct StackSingleton {
      ...
      void ready(std::string &type) {
        if (!stack)
          stack = NetworkStack::create(cct, type);
      }
      ...
    };
    

    NetworkStack 构造函数如下。NetworkStack 中的 workers 用来保存多个 worker,每个 worker 都会创建一个 epoll(大多的网络编程中,都会使用基于事件通知的异步网络 IO 方式来实现,比如 Epoll 和 Kqueue,ceph 的网络模块使用的是Epoll)。worker 的作用主要就是监听端口和建立连接,使用 epoll 来进行 socket 事件驱动。3个 worker 保证了负载均衡,每次调用 connection 都会寻找连接数最少的 worker 线程。

    NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
    {
      //读取 msger_worker 线程的数量配置
      num_workers = cct->_conf->ms_async_op_threads;
      ...
     //循环创建 woker,压入 workers 队列,在事件中心初始化
      for (unsigned i = 0; i < num_workers; ++i) {
        Worker *w = create_worker(cct, type, i);
        w->center.init(InitEventNumber, i, type);
        workers.push_back(w);
      }
    }
    
    class PosixWorker : public Worker {
      //监听端口
      int listen(entity_addr_t &sa,
             unsigned addr_slot,
             const SocketOptions &opt,
             ServerSocket *socks) override;
      //连接端口
      int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
    };
    

    在 stack->start() 方法中,add_thread() 创建了线程的方法体(线程开启时,执行该方法),spawn_worker() 中通过 std::thread() 方法创建线程。线程的数量 num_workers 根据配置中的 ms.async.op.threads 来决定,默认为3。

     void NetworkStack::start(){
         ...
          for (unsigned i = 0; i < num_workers; ++i) { 
              if (workers[i]->is_init())
                  continue;
              //创建线程主体
              std::function<void ()> thread = add_thread(i);
              //开启线程
              spawn_worker(i, std::move(thread));
          }
         ...
     }
    

    add_thread() 中设置了当前线程的名称:msgr-worker-{num},在事件中心注册 worker 线程,进行初始化工作。while 循环中则一直轮询处理 EventCenter 中的事件。EventCenter 是一个存储事件的容器,它通过注册的回调函数 EventCallbackRef 来针对的处理事件。事件类型有三种:time_events(定时事件)、external_events(外部事件,发送消息)、可读事件(epoll 监听的事件,接收消息)。

    std::function<void ()> NetworkStack::add_thread(unsigned i)
    {
      Worker *w = workers[i];
      return [this, w]() {
          char tp_name[16];
          sprintf(tp_name, "msgr-worker-%u", w->id);
          //线程改名:msgr-worker-x
          ceph_pthread_setname(pthread_self(), tp_name);
          const unsigned EventMaxWaitUs = 30000000;
          w->center.set_owner();
          ldout(cct, 10) << __func__ << " starting" << dendl;
          w->initialize();
          w->init_done();
          //轮询事件中心,处理消息
          while (!w->done) {
            ldout(cct, 30) << __func__ << " calling event process" << dendl;
    
            ceph::timespan dur;
            int r = w->center.process_events(EventMaxWaitUs, &dur);
            if (r < 0) {
              ldout(cct, 20) << __func__ << " process events failed: "
                             << cpp_strerror(errno) << dendl;
              // TODO do something?
            }
            w->perf_logger->tinc(l_msgr_running_total_time, dur);
          }
          w->reset();
          w->destroy();
      };
    }
    

    至此,建立了网络通信模块的基础,后续需要连接某个 ip 端口时,需要从 stack 中申请一个负载最小的 msgr_worker 线程,通过 new AsyncConnection() 方式建立连接。可以在上述代码中看到消息处理的蛛丝马迹:w->center.process_events(),消息的传递都经由 EventCenter 事件中心,我们在实际操作中,只需要把消息封装成 message ,通过 dispatch_event_external() 提交给事件中心并注册为外部事件即可,然后由 connection 建立连接时指定的 msgr_worker 线程去处理消息的传递。

    总结:global_init() 函数是通用了全局初始化函数,无论是客户端还是服务端,Ceph 的任意子系统都需要进行该初始化。主要内容包括:解析配置(默认配置、命令行参数、配置文件)、开启 log、开启网络通信、加载信号处理器,创建 CephContext。

    相关文章

      网友评论

        本文标题:站在 rados tools 看 ceph(3)

        本文链接:https://www.haomeiwen.com/subject/dvfenktx.html