美文网首页
pika WorkerThread线程

pika WorkerThread线程

作者: 白馨_1114 | 来源:发表于2020-04-21 17:52 被阅读0次

    WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果,每个线程执行写命令后,追加到binlog中。

    WorkerThread是在DispatchThread中创建的线程,开始多少线程,由配置决定的。
    DispatchThread初始化时,根据配置创建了WorkerThread(与DispatchThread一样继承third/pink/pink/src/pink_thread.cc)
    |third/pink/pink/src/dispatch_thread.cc|worker_thread_[i] = new WorkerThread(conn_factory, this, cron_interval)|
    |-|-|-|

    以下分为两个部分,一、创建PinkConn 二、处理请求阐述整个流程。
    一、创建PinkConn
    pika tcp连接都是在DispatchThread中accept的,将tcp连接封装成PinkConn是在WorkerThread中完成的。
    在WorkerThread类中定义了conn_queue_
    |./third/pink/pink/src/worker_thread.h|std::queue<PinkItem> conn_queue_
    |-|-|-

    1. DispatchThread创建链接之后,将连接放入WorkerThread的conn_queue_中
    third/pink/pink/src/dispatch_thread.cc 
    std::queue<PinkItem> *q = &(worker_thread_[next_thread]->conn_queue_);
        if (q->size() < static_cast<size_t>(queue_limit_)) {
          log_info("queue limit is %d", queue_limit_);
          q->push(ti);
          find = true;
          break;
        }
    
    1. 接着使用管道的方式,通知WorkerThread,将放入的连接封装成PinkConn(原因不明,复杂)
      DispatchThread通知:

    |third/pink/pink/src/dispatch_thread.cc|write(worker_thread_[next_thread]->notify_send_fd(), "", 1)
    |-|-|-

    WorkerThread响应:
    ./third/pink/pink/src/worker_thread.cc

    if (pfe->fd == notify_receive_fd_) {
            if (pfe->mask & EPOLLIN) {
              read(notify_receive_fd_, bb, 1);
              {
                slash::MutexLock l(&mutex_);
                ti = conn_queue_.front();
                conn_queue_.pop();
              }
              PinkConn *tc = conn_factory_->NewPinkConn( //创建pinkConn
                  ti.fd(), ti.ip_port(),
                  server_thread_, private_data_);
              if (!tc || !tc->SetNonblock()) {
                delete tc;
                continue;
              }
    

    二、处理请求
    如前文所说,WorkerThread是继承third/pink/pink/src/pink_thread.cc。它的主要工作都在void *WorkerThread::ThreadMain() { }中实现。
    调用流程如下:
    1.third/pink/pink/src/worker_thread.cc

    
     if (pfe->mask & EPOLLOUT && in_conn->is_reply()) {  //回复客户端请求
              WriteStatus write_status = in_conn->SendReply();
              in_conn->set_last_interaction(now);
              …...
            }
    
            if (!should_close && pfe->mask & EPOLLIN) { //处理客户端请求
              ReadStatus getRes = in_conn->GetRequest();  
              in_conn->set_last_interaction(now);
             …...
            }
    }
    

    2.third/pink/pink/src/redis_conn.cc

    ReadStatus RedisConn::GetRequest() {
    …...
      nread = read(fd(), rbuf_ + next_read_pos, remain); //从socket中读取数据
    …...
      ReadStatus ret = ProcessInputBuffer();  //处理从socket中读取的数据
    …...
    }
    
    third/pink/pink/src/redis_conn.cc
    ReadStatus RedisConn::ProcessInputBuffer() {
    …...
        if (req_type_ == REDIS_REQ_INLINE) { //区分命令是INLINE还是MULTIBULK
          ret = ProcessInlineBuffer();
          if (ret != kReadAll) {
            return ret;
          }
        } else if (req_type_ == REDIS_REQ_MULTIBULK) {
          ret = ProcessMultibulkBuffer();
          if (ret != kReadAll) { // FULL_ERROR || HALF || PARSE_ERROR
            return ret;
          }
        }
    
        if (!argv_.empty()) {
          if (DealMessage(argv_, &response_) != 0) { //处理redis消息
            return kDealError;
          }
        }
    
    int PikaClientConn::DealMessage(
      if (response->empty()) {
        // Avoid memory copy
        *response = std::move(DoCmd(argv, opt)); //处理redis命令
      } else {
        // Maybe pipeline
        response->append(DoCmd(argv, opt));
      }
    }
    

    3.src/pika_client_conn.cc

    std::string PikaClientConn::DoCmd(
        PikaCmdArgsType& argv, const std::string& opt) {
      // Get command info
      const CmdInfo* const cinfo_ptr = GetCmdInfo(opt);
      Cmd* c_ptr = GetCmdFromTable(opt, *cmds_table_);
    ……
    c_ptr->Do(); // 根据命令决定调用哪个命令的Do,以set请求为例子说明处理请求的流程。
    ……
    if (cinfo_ptr->is_write()
        && g_pika_conf->write_binlog()) {。//命令可写,且配置过binlog,
        std::string binlog = c_ptr->ToBinlog(…)  //生成binlog格式
          if (!binlog.empty()) {
            s = g_pika_server->logger_->Put(binlog); //追加到binlog中
          }
    }
    

    以上是pika处理命令的大体流程,接下来详细写两点:
    如何调用rocksdb接口,将数据写入
    如何追加到binlog中

    1.假设是set请求,那么Do的实现在src/pika_kv.cc中的

    void SetCmd::Do() {
      rocksdb::Status s;
      int32_t res = 1;
      switch (condition_) {
        case SetCmd::kXX:
          s = g_pika_server->db()->Setxx(key_, value_, &res, sec_);
    //db的类型为BlackWidow,在third/blackwidow/src/blackwidow.cc中,
    //根据命令会设置调用相应类型的rocksdb(  strings_db_,hashes_db_, sets_db_, zsets_db_,lists_db_)
    //比如比如Status BlackWidow::Setnx(){return strings_db_->Setnx(key, value, ret, ttl);}                                                                                   
    }
          break;
        case SetCmd::kNX:
          s = g_pika_server->db()->Setnx(key_, value_, &res, sec_);
          break;
        case SetCmd::kVX:
          s = g_pika_server->db()->Setvx(key_, target_, value_, &success_, sec_);
          break;
        default:
          s = g_pika_server->db()->Set(key_, value_, sec_);
          break;
      }
    

    最终将在third/blackwidow/src/redis_strings.cc中调用Set(),写入rocksdb中

    Status RedisStrings::Set(const Slice& key,
                             const Slice& value,
                             const int32_t ttl) {
      StringsValue strings_value(value);
      ScopeRecordLock l(lock_mgr_, key);
      if (ttl > 0) {
        strings_value.SetRelativeTimestamp(ttl);
      }
      return db_->Put(default_write_options_, key, strings_value.Encode());
    }
    

    2.追加到binlog中
    src/pika_binlog.cc中实现,具体在后面binlog线程删除中展开。
    binlog 用slash::WritableFile queue_追加数据,具体在third/slash/slash/src/env.cc中实现。
    third/slash/slash/src/env.cc中NewWritableFile中可与看到binlog是通过mmap方式实现的,
    追加数据并不是写入磁盘,而且写入共享内存中。
    void
    ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); //mmap起始地址由系统决定

    相关文章

      网友评论

          本文标题:pika WorkerThread线程

          本文链接:https://www.haomeiwen.com/subject/rvepihtx.html