WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果,每个线程执行写命令后,追加到binlog中。
WorkerThread是在DispatchThread中创建的线程,开始多少线程,由配置决定的。
DispatchThread初始化时,根据配置创建了WorkerThread(与DispatchThread一样继承third/pink/pink/src/pink_thread.cc)
|third/pink/pink/src/dispatch_thread.cc|worker_thread_[i] = new WorkerThread(conn_factory, this, cron_interval)|
|-|-|-|
以下分为两个部分,一、创建PinkConn 二、处理请求阐述整个流程。
一、创建PinkConn
pika tcp连接都是在DispatchThread中accept的,将tcp连接封装成PinkConn是在WorkerThread中完成的。
在WorkerThread类中定义了conn_queue_
|./third/pink/pink/src/worker_thread.h|std::queue<PinkItem> conn_queue_
|-|-|-
- DispatchThread创建链接之后,将连接放入WorkerThread的conn_queue_中
third/pink/pink/src/dispatch_thread.cc
std::queue<PinkItem> *q = &(worker_thread_[next_thread]->conn_queue_);
if (q->size() < static_cast<size_t>(queue_limit_)) {
log_info("queue limit is %d", queue_limit_);
q->push(ti);
find = true;
break;
}
- 接着使用管道的方式,通知WorkerThread,将放入的连接封装成PinkConn(原因不明,复杂)
DispatchThread通知:
|third/pink/pink/src/dispatch_thread.cc|write(worker_thread_[next_thread]->notify_send_fd(), "", 1)
|-|-|-
WorkerThread响应:
./third/pink/pink/src/worker_thread.cc
if (pfe->fd == notify_receive_fd_) {
if (pfe->mask & EPOLLIN) {
read(notify_receive_fd_, bb, 1);
{
slash::MutexLock l(&mutex_);
ti = conn_queue_.front();
conn_queue_.pop();
}
PinkConn *tc = conn_factory_->NewPinkConn( //创建pinkConn
ti.fd(), ti.ip_port(),
server_thread_, private_data_);
if (!tc || !tc->SetNonblock()) {
delete tc;
continue;
}
二、处理请求
如前文所说,WorkerThread是继承third/pink/pink/src/pink_thread.cc。它的主要工作都在void *WorkerThread::ThreadMain() { }中实现。
调用流程如下:
1.third/pink/pink/src/worker_thread.cc
if (pfe->mask & EPOLLOUT && in_conn->is_reply()) { //回复客户端请求
WriteStatus write_status = in_conn->SendReply();
in_conn->set_last_interaction(now);
…...
}
if (!should_close && pfe->mask & EPOLLIN) { //处理客户端请求
ReadStatus getRes = in_conn->GetRequest();
in_conn->set_last_interaction(now);
…...
}
}
2.third/pink/pink/src/redis_conn.cc
ReadStatus RedisConn::GetRequest() {
…...
nread = read(fd(), rbuf_ + next_read_pos, remain); //从socket中读取数据
…...
ReadStatus ret = ProcessInputBuffer(); //处理从socket中读取的数据
…...
}
third/pink/pink/src/redis_conn.cc
ReadStatus RedisConn::ProcessInputBuffer() {
…...
if (req_type_ == REDIS_REQ_INLINE) { //区分命令是INLINE还是MULTIBULK
ret = ProcessInlineBuffer();
if (ret != kReadAll) {
return ret;
}
} else if (req_type_ == REDIS_REQ_MULTIBULK) {
ret = ProcessMultibulkBuffer();
if (ret != kReadAll) { // FULL_ERROR || HALF || PARSE_ERROR
return ret;
}
}
if (!argv_.empty()) {
if (DealMessage(argv_, &response_) != 0) { //处理redis消息
return kDealError;
}
}
int PikaClientConn::DealMessage(
if (response->empty()) {
// Avoid memory copy
*response = std::move(DoCmd(argv, opt)); //处理redis命令
} else {
// Maybe pipeline
response->append(DoCmd(argv, opt));
}
}
3.src/pika_client_conn.cc
std::string PikaClientConn::DoCmd(
PikaCmdArgsType& argv, const std::string& opt) {
// Get command info
const CmdInfo* const cinfo_ptr = GetCmdInfo(opt);
Cmd* c_ptr = GetCmdFromTable(opt, *cmds_table_);
……
c_ptr->Do(); // 根据命令决定调用哪个命令的Do,以set请求为例子说明处理请求的流程。
……
if (cinfo_ptr->is_write()
&& g_pika_conf->write_binlog()) {。//命令可写,且配置过binlog,
std::string binlog = c_ptr->ToBinlog(…) //生成binlog格式
if (!binlog.empty()) {
s = g_pika_server->logger_->Put(binlog); //追加到binlog中
}
}
以上是pika处理命令的大体流程,接下来详细写两点:
如何调用rocksdb接口,将数据写入
如何追加到binlog中
1.假设是set请求,那么Do的实现在src/pika_kv.cc中的
void SetCmd::Do() {
rocksdb::Status s;
int32_t res = 1;
switch (condition_) {
case SetCmd::kXX:
s = g_pika_server->db()->Setxx(key_, value_, &res, sec_);
//db的类型为BlackWidow,在third/blackwidow/src/blackwidow.cc中,
//根据命令会设置调用相应类型的rocksdb( strings_db_,hashes_db_, sets_db_, zsets_db_,lists_db_)
//比如比如Status BlackWidow::Setnx(){return strings_db_->Setnx(key, value, ret, ttl);}
}
break;
case SetCmd::kNX:
s = g_pika_server->db()->Setnx(key_, value_, &res, sec_);
break;
case SetCmd::kVX:
s = g_pika_server->db()->Setvx(key_, target_, value_, &success_, sec_);
break;
default:
s = g_pika_server->db()->Set(key_, value_, sec_);
break;
}
最终将在third/blackwidow/src/redis_strings.cc中调用Set(),写入rocksdb中
Status RedisStrings::Set(const Slice& key,
const Slice& value,
const int32_t ttl) {
StringsValue strings_value(value);
ScopeRecordLock l(lock_mgr_, key);
if (ttl > 0) {
strings_value.SetRelativeTimestamp(ttl);
}
return db_->Put(default_write_options_, key, strings_value.Encode());
}
2.追加到binlog中
src/pika_binlog.cc中实现,具体在后面binlog线程删除中展开。
binlog 用slash::WritableFile queue_追加数据,具体在third/slash/slash/src/env.cc中实现。
third/slash/slash/src/env.cc中NewWritableFile中可与看到binlog是通过mmap方式实现的,
追加数据并不是写入磁盘,而且写入共享内存中。
void ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); //mmap起始地址由系统决定
网友评论