美文网首页
pika WorkerThread线程

pika WorkerThread线程

作者: 白馨_1114 | 来源:发表于2020-04-21 17:52 被阅读0次

WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果,每个线程执行写命令后,追加到binlog中。

WorkerThread是在DispatchThread中创建的线程,开始多少线程,由配置决定的。
DispatchThread初始化时,根据配置创建了WorkerThread(与DispatchThread一样继承third/pink/pink/src/pink_thread.cc)
|third/pink/pink/src/dispatch_thread.cc|worker_thread_[i] = new WorkerThread(conn_factory, this, cron_interval)|
|-|-|-|

以下分为两个部分,一、创建PinkConn 二、处理请求阐述整个流程。
一、创建PinkConn
pika tcp连接都是在DispatchThread中accept的,将tcp连接封装成PinkConn是在WorkerThread中完成的。
在WorkerThread类中定义了conn_queue_
|./third/pink/pink/src/worker_thread.h|std::queue<PinkItem> conn_queue_
|-|-|-

  1. DispatchThread创建链接之后,将连接放入WorkerThread的conn_queue_中
third/pink/pink/src/dispatch_thread.cc 
std::queue<PinkItem> *q = &(worker_thread_[next_thread]->conn_queue_);
    if (q->size() < static_cast<size_t>(queue_limit_)) {
      log_info("queue limit is %d", queue_limit_);
      q->push(ti);
      find = true;
      break;
    }
  1. 接着使用管道的方式,通知WorkerThread,将放入的连接封装成PinkConn(原因不明,复杂)
    DispatchThread通知:

|third/pink/pink/src/dispatch_thread.cc|write(worker_thread_[next_thread]->notify_send_fd(), "", 1)
|-|-|-

WorkerThread响应:
./third/pink/pink/src/worker_thread.cc

if (pfe->fd == notify_receive_fd_) {
        if (pfe->mask & EPOLLIN) {
          read(notify_receive_fd_, bb, 1);
          {
            slash::MutexLock l(&mutex_);
            ti = conn_queue_.front();
            conn_queue_.pop();
          }
          PinkConn *tc = conn_factory_->NewPinkConn( //创建pinkConn
              ti.fd(), ti.ip_port(),
              server_thread_, private_data_);
          if (!tc || !tc->SetNonblock()) {
            delete tc;
            continue;
          }

二、处理请求
如前文所说,WorkerThread是继承third/pink/pink/src/pink_thread.cc。它的主要工作都在void *WorkerThread::ThreadMain() { }中实现。
调用流程如下:
1.third/pink/pink/src/worker_thread.cc


 if (pfe->mask & EPOLLOUT && in_conn->is_reply()) {  //回复客户端请求
          WriteStatus write_status = in_conn->SendReply();
          in_conn->set_last_interaction(now);
          …...
        }

        if (!should_close && pfe->mask & EPOLLIN) { //处理客户端请求
          ReadStatus getRes = in_conn->GetRequest();  
          in_conn->set_last_interaction(now);
         …...
        }
}

2.third/pink/pink/src/redis_conn.cc

ReadStatus RedisConn::GetRequest() {
…...
  nread = read(fd(), rbuf_ + next_read_pos, remain); //从socket中读取数据
…...
  ReadStatus ret = ProcessInputBuffer();  //处理从socket中读取的数据
…...
}

third/pink/pink/src/redis_conn.cc
ReadStatus RedisConn::ProcessInputBuffer() {
…...
    if (req_type_ == REDIS_REQ_INLINE) { //区分命令是INLINE还是MULTIBULK
      ret = ProcessInlineBuffer();
      if (ret != kReadAll) {
        return ret;
      }
    } else if (req_type_ == REDIS_REQ_MULTIBULK) {
      ret = ProcessMultibulkBuffer();
      if (ret != kReadAll) { // FULL_ERROR || HALF || PARSE_ERROR
        return ret;
      }
    }

    if (!argv_.empty()) {
      if (DealMessage(argv_, &response_) != 0) { //处理redis消息
        return kDealError;
      }
    }

int PikaClientConn::DealMessage(
  if (response->empty()) {
    // Avoid memory copy
    *response = std::move(DoCmd(argv, opt)); //处理redis命令
  } else {
    // Maybe pipeline
    response->append(DoCmd(argv, opt));
  }
}

3.src/pika_client_conn.cc

std::string PikaClientConn::DoCmd(
    PikaCmdArgsType& argv, const std::string& opt) {
  // Get command info
  const CmdInfo* const cinfo_ptr = GetCmdInfo(opt);
  Cmd* c_ptr = GetCmdFromTable(opt, *cmds_table_);
……
c_ptr->Do(); // 根据命令决定调用哪个命令的Do,以set请求为例子说明处理请求的流程。
……
if (cinfo_ptr->is_write()
    && g_pika_conf->write_binlog()) {。//命令可写,且配置过binlog,
    std::string binlog = c_ptr->ToBinlog(…)  //生成binlog格式
      if (!binlog.empty()) {
        s = g_pika_server->logger_->Put(binlog); //追加到binlog中
      }
}

以上是pika处理命令的大体流程,接下来详细写两点:
如何调用rocksdb接口,将数据写入
如何追加到binlog中

1.假设是set请求,那么Do的实现在src/pika_kv.cc中的

void SetCmd::Do() {
  rocksdb::Status s;
  int32_t res = 1;
  switch (condition_) {
    case SetCmd::kXX:
      s = g_pika_server->db()->Setxx(key_, value_, &res, sec_);
//db的类型为BlackWidow,在third/blackwidow/src/blackwidow.cc中,
//根据命令会设置调用相应类型的rocksdb(  strings_db_,hashes_db_, sets_db_, zsets_db_,lists_db_)
//比如比如Status BlackWidow::Setnx(){return strings_db_->Setnx(key, value, ret, ttl);}                                                                                   
}
      break;
    case SetCmd::kNX:
      s = g_pika_server->db()->Setnx(key_, value_, &res, sec_);
      break;
    case SetCmd::kVX:
      s = g_pika_server->db()->Setvx(key_, target_, value_, &success_, sec_);
      break;
    default:
      s = g_pika_server->db()->Set(key_, value_, sec_);
      break;
  }

最终将在third/blackwidow/src/redis_strings.cc中调用Set(),写入rocksdb中

Status RedisStrings::Set(const Slice& key,
                         const Slice& value,
                         const int32_t ttl) {
  StringsValue strings_value(value);
  ScopeRecordLock l(lock_mgr_, key);
  if (ttl > 0) {
    strings_value.SetRelativeTimestamp(ttl);
  }
  return db_->Put(default_write_options_, key, strings_value.Encode());
}

2.追加到binlog中
src/pika_binlog.cc中实现,具体在后面binlog线程删除中展开。
binlog 用slash::WritableFile queue_追加数据,具体在third/slash/slash/src/env.cc中实现。
third/slash/slash/src/env.cc中NewWritableFile中可与看到binlog是通过mmap方式实现的,
追加数据并不是写入磁盘,而且写入共享内存中。
void
ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); //mmap起始地址由系统决定

相关文章

  • pika WorkerThread线程

    WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果...

  • DispatchThread线程

    pika使用的是多线程模型,使用多个工作线程来进行读写操作,由底层blackwidow引擎来保证线程安全,线程分为...

  • Java多线程WorkerThread模式

    工作没来就一直等,工作来了就干活。 示例程序 Main 测试程序行为的类 ClientThread 表示发出工作...

  • PIKA PIKA PIKA PIKA

    大家好,我是菜鸟小白菌。 今天给大家推荐的电影是大侦探皮卡丘。PIKA-PIKA, PIKA PIKA PIKAP...

  • pika中锁的应用

    pika作为类redis的存储系统,为了弥补在性能上的不足,在整个系统中大量使用多线程的结构,涉及到多线程编程,势...

  • pika_to_redis数据迁移工具设计与实现过程

    背景 为满足运维人员对于pika可以更好地运维,需要方便地将数据从pika迁移到redis,在pika系统中之前已...

  • Python3 操作RabbitMQ

    python使用pika模块操作RabbitMQ,我们可以通过sudo pip3 install pika来安装p...

  • pika

    本次分享主题 pika 是360 DBA和基础架构组联合开发的类redis 存储系统, 完全支持Redis协议,用...

  • pika支持codis的slot迁移

    概述 pika使用redis协议,命令操作和redis一致,因而可以直接使用codis作为pika的集群解决方案,...

  • pika 链接问题

    使用celery结合pika处理异步任务,并将处理任务结果发布到其他队列中时遇到错误获取pika 链接 过一段时间...

网友评论

      本文标题:pika WorkerThread线程

      本文链接:https://www.haomeiwen.com/subject/rvepihtx.html