美文网首页
DispatchThread线程

DispatchThread线程

作者: 白馨_1114 | 来源:发表于2020-04-21 17:51 被阅读0次

    pika使用的是多线程模型,使用多个工作线程来进行读写操作,由底层blackwidow引擎来保证线程安全,线程分为11种:
    PikaServer:主线程
    DispatchThread:监听端口1个端口,接收用户连接请求
    WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果,每个线程执行写命令后,追加到binlog中
    Trysync:尝试与master建立首次连接,并在以后出现故障后发起重连
    BinlogSender:存在多个(动态创建销毁,本master节点挂多少个slave节点就有多少个),每个线程根据slave节点发来的同步偏移量,从binlog指定的偏移开始实时同步命令给slave节点
    BinlogReceiver:存在1个(动态创建销毁,一个slave节点同时只能有一个master),将用户指定或当前的偏移量发送给master节点并开始接收执行master实时发来的同步命令,在本地使用和master完全一致的偏移量来追加binlog
    SlavePing:slave用来向master发送心跳进行存活检测
    bgsave:后台dump线程
    HeartBeat:master用来接收所有slave发送来的心跳并回复进行存活检测
    scan:后台扫描keyspace线程
    purge:后台删除binlog线程
    Monitor: 实时打印出Pika服务器接收到的命令
    Pub/Sub: 用来支持Pika的订阅功能

    在看这一系列线程之前,
    看一下最简单的线程编程,我们只用关心线程创建、线程资源回收以及线程终止。
    posix接口:
    一、线程创建:
    int pthread_create(pthread_t *thread, const pthread_attr_t attr, void (start_routine)(void) void *arg);
    线程资源回收:
    int pthread_join(pthread_t thread, void **retval);
    不回收:
    1)已经退出的线程,其空间没有被释放,仍然在进程地址空间;
    2)新创建的线程,无法复用退出线程的地址空间。
    1,2同时发生就会造成内存泄露。所以同多进程编程一样,所有的高级语言都需要调用wait,不然会造成僵尸进程。
    线程终止:
    void pthread_exit( void * value_ptr );

    线程DispatchThread是用来bind一个服务端口,listen客户端connot,并创建连接的一个线程。

    再pika(server端)中负责建立连接的线程底层实现为:
    third/pink/pink/src/pink_thread.cc

    void* Thread::RunThread(void *arg) {
      Thread* thread = reinterpret_cast<Thread*>(arg);
      if (!(thread->thread_name().empty())) {
        SetThreadName(pthread_self(), thread->thread_name());
      }
      thread->ThreadMain();
      return nullptr;
    }
    int Thread::StartThread() { //pthread_create
      slash::MutexLock l(&running_mu_);
      should_stop_ = false;
      if (!running_) {
        running_ = true;
        return pthread_create(&thread_id_, nullptr, RunThread, (void *)this);
      }
      return 0;
    }
    int Thread::StopThread() { //线程退出,这个不是正真意义上的pthread_exit。其实最终还是调用pthread_join,也就是在整个服务的过程中,该线程是不会退出的,只会随着进程退出而退出。
      slash::MutexLock l(&running_mu_);
      should_stop_ = true;
      if (running_) {
        running_ = false;
        return pthread_join(thread_id_, nullptr);
      }
      return 0;
    }
    int Thread::JoinThread() { //pthread_join
      return pthread_join(thread_id_, nullptr);
    }
    

    在代码pthread_create(&thread_id_, nullptr, RunThread, (void *)this)中,RunThread是创建的线程需要执行的函数。
    thread->ThreadMain(),这个thread其实就是 ServerThread。

    二、Server端的网络编程实现很简单:
    bind()—>listen()—>accept()
    在 ServerThread中实现了socket 的逻辑,具体不展开。
    third/pink/pink/src/server_thread.cc)
    connfd = accept(fd, (struct sockaddr *) &cliaddr, &clilen);
    socket_p = new ServerSocket(port_)中实现listen(),bind()逻辑。

    三、接着看如何从pikaServer入口,调用到最底层实现。

    文件 代码部分 关键调用部分
    ./src/pika_server.cc pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, 3000, worker_queue_limit)
    src/pika_dispatch_thread.cc pink::NewDispatchThread(ips, port, work_num, &conn_factory_,cron_interval, queue_limit, &handles_);
    third/pink/pink/src/dispatch_thread.cc return new DispatchThread(port, work_num, conn_factory,
    cron_interval, queue_limit, handle)
    hird/pink/pink/src/dispatch_thread.cc DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle) : ServerThread::ServerThread(ips, port, cron_interval, handle),last_thread_(0), work_num_(work_num), queue_limit_(queue_limit). ServerThread::ServerThread(ips, port, cron_interval, handle)
    third/pink/pink/src/server_thread.cc int ServerThread::StartThread() {
    int ret = 0; ret = InitHandle(); //bind,listern实现
    if (ret != kSuccess) return ret; return Thread::StartThread(); //线程accept实现
    }
    return Thread::StartThread();
    third/pink/pink/src/pink_thread.cc void* Thread::RunThread(void arg) { Thread thread = reinterpret_cast<Thread*>(arg);
    if (!(thread->thread_name().empty())) {
    SetThreadName(pthread_self(),
    thread->thread_name());
    }
    thread->ThreadMain();
    third/pink/pink/src/server_thread.cc ThreadMain()

    相关文章

      网友评论

          本文标题:DispatchThread线程

          本文链接:https://www.haomeiwen.com/subject/pyepihtx.html