美文网首页
redis6.0的multi threading设计

redis6.0的multi threading设计

作者: 耐寒 | 来源:发表于2021-01-29 14:39 被阅读0次

    先上图,给个整体设计:


    image.png

    画外音:以下内容凌乱,仅是让自己看懂而已。

    引入多线程IO后的两个问题:

    1. 原子性;
    2. 顺序性;

    主线程

    • 调用initServer干初始化工作

    initServer时,调用aeCreateFileEvent创建accept事件,当该server fd可读时(即client连接时),会调用到acceptTcpHandler函数,即创建client对象,并且每一个client的连接会创建一个aeFileEvent,并且设置该连接(或者说事件)的rfileProc为readQueryFromClient,readQueryFromClient处理该连接发送过来的所有数据。

    /* initServer (article excerpt): during startup Redis registers an
     * AE_READABLE file event for every listening TCP fd (and for the
     * optional Unix domain socket). When a client connects, the event
     * fires and acceptTcpHandler / acceptUnixHandler accepts it. */
    void initServer(void) {

        ...  // code omitted

        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
        /* Same registration for the optional Unix domain socket. */
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

        ... // code omitted

    }
    
    /* Readable-event callback for listening TCP sockets: drains up to
     * MAX_ACCEPTS_PER_CALL pending connections from the backlog and hands
     * each accepted fd to acceptCommonHandler(). */
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        char client_ip[NET_IP_STR_LEN];
        int client_port;
        UNUSED(el);
        UNUSED(mask);
        UNUSED(privdata);
        for (int remaining = MAX_ACCEPTS_PER_CALL; remaining > 0; remaining--) {
            int client_fd = anetTcpAccept(server.neterr, fd, client_ip,
                                          sizeof(client_ip), &client_port);
            if (client_fd == ANET_ERR) {
                /* EWOULDBLOCK just means the backlog is drained;
                 * anything else deserves a warning. */
                if (errno != EWOULDBLOCK)
                    serverLog(LL_WARNING,
                        "Accepting client connection: %s", server.neterr);
                return;
            }
            serverLog(LL_VERBOSE,"Accepted %s:%d", client_ip, client_port);
            acceptCommonHandler(client_fd,0,client_ip);
        }
    }
    
    /* Common path for every accepted connection (excerpt): wraps the fd
     * into a client object. On failure the fd is closed and the error is
     * logged; the remaining per-client setup is elided in this excerpt. */
    static void acceptCommonHandler(int fd, int flags, char *ip) {
        client *c;
        if ((c = createClient(fd)) == NULL) {
            serverLog(LL_WARNING,
                "Error registering fd event for the new client: %s (fd=%d)",
                strerror(errno),fd);
            close(fd); /* May be already closed, just ignore errors */
            return;
        }
        ... // code omitted
    }
    
    /* Allocates a client object for fd (excerpt). For a real connection
     * (fd != -1) the socket is made non-blocking, Nagle is disabled,
     * keepalive optionally enabled, and an AE_READABLE file event bound to
     * readQueryFromClient is registered; if registration fails the fd is
     * closed and NULL returned. The field initialization that follows is
     * elided in this excerpt. */
    client *createClient(int fd) {
        client *c = zmalloc(sizeof(client));
        /* passing -1 as fd it is possible to create a non connected client.
         * This is useful since all the commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        if (fd != -1) {
            anetNonBlock(NULL,fd);
            anetEnableTcpNoDelay(NULL,fd);
            if (server.tcpkeepalive)
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
        ... // code omitted
    }
    

    这里有两个非常重要的概念:

    • event,即事件,这些事件分成了三类:

      • aeFileEvent :File event

      • aeTimeEvent :Time event

      • aeFiredEvent :fired event

    他们的定义如下,比如file event即为网络事件,封装了read和write的回调函数,mask即为可读或者可写事件(AE_READABLE, AE_WRITABLE)。当我们调用aeCreateFileEvent时会把该event添加到全局的eventloop里,哦,那eventloop是什么东西,以及如何管理file event的呢?

    /* File event structure: a registered interest in a file descriptor,
     * with separate callbacks for the readable and writable cases. */
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        aeFileProc *rfileProc; /* called when the fd becomes readable */
        aeFileProc *wfileProc; /* called when the fd becomes writable */
        void *clientData; /* opaque pointer handed back to the callbacks */
    } aeFileEvent;
    
    /* Time event structure: a timer with an absolute deadline, kept in a
     * doubly linked list headed by aeEventLoop.timeEventHead. */
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc; /* callback fired when the deadline is reached */
        aeEventFinalizerProc *finalizerProc; /* called when the event is deleted */
        void *clientData; /* opaque pointer handed back to the callbacks */
        struct aeTimeEvent *prev; /* doubly linked list links */
        struct aeTimeEvent *next;
    } aeTimeEvent;
    
    /* A fired event: records which fd became ready and with what mask,
     * filled in by the polling backend for the loop to dispatch. */
    typedef struct aeFiredEvent {
        int fd;   /* descriptor that became ready */
        int mask; /* AE_READABLE and/or AE_WRITABLE */
    } aeFiredEvent;
    
    • aeEventLoop
    
    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId; /* next id to assign to a time event */
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead; /* head of the time event list */
        int stop; /* nonzero asks aeMain to exit its loop */
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep; /* run before blocking in the poll */
        aeBeforeSleepProc *aftersleep;  /* run right after the poll returns */
    } aeEventLoop;
    

    eventloop就是事件循环,上文中的事件都是保存在这个结构里,多路复用,如select和epoll的私有数据就是放在apidata里。还有两个重要的回调函数 beforesleep和aftersleep,这两个函数的作用等下再说。

    • 调用aeMain函数开始一直努力干活了,
    /* Entry point (excerpt): after initialization the beforeSleep and
     * afterSleep hooks are installed, then control is handed to aeMain,
     * which only returns once the event loop has been stopped. */
    int main(int argc, char **argv) {
        ...
        initServer();
        ...
        aeSetBeforeSleepProc(server.el,beforeSleep);
        aeSetAfterSleepProc(server.el,afterSleep);
        aeMain(server.el);
        aeDeleteEventLoop(server.el);
        return 0;
    }
    
    /* Main loop of the event-driven program: repeatedly runs the
     * beforesleep hook (when installed) and then processes all pending
     * events, until something sets eventLoop->stop. */
    void aeMain(aeEventLoop *eventLoop) {
        for (eventLoop->stop = 0; !eventLoop->stop; ) {
            aeBeforeSleepProc *hook = eventLoop->beforesleep;
            if (hook != NULL)
                hook(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    /* Processes ready events (article excerpt: the time-event handling and
     * part of the surrounding control flow are elided, which is why the
     * braces below do not balance). Flow: block in aeApiPoll, run the
     * aftersleep hook, then dispatch each fired fd's read/write callbacks,
     * honoring AE_BARRIER which inverts the usual read-before-write order. */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
            /* Call the multiplexing API, will return only on timeout or when
             * some event fires. */
            numevents = aeApiPoll(eventLoop, tvp);
            /* After sleep callback. */
            if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
                eventLoop->aftersleep(eventLoop);
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int fired = 0; /* Number of events fired for current fd. */
                /* AE_BARRIER requests the readable handler to run AFTER
                 * the writable one. */
                int invert = fe->mask & AE_BARRIER;
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
                /* Fire the writable event. */
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
                /* If we have to invert the call, fire the readable event now
                 * after the writable one. */
                if (invert && fe->mask & mask & AE_READABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
                processed++;
            }
        }
        ...
        return processed; /* return the number of processed file/time events */
    }
    

    对于client的请求来讲,rfileProc就是readQueryFromClient,那readQueryFromClient在multi thread IO的配置下,是如何工作的呢?

    /* Readable-event handler for client connections (excerpt). With
     * threaded I/O enabled, postponeClientRead() queues the client for the
     * I/O threads and we return immediately; otherwise the normal inline
     * read/parse path (elided here) runs on the calling thread. */
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        /* Check if we want to read from the client later when exiting from
         * the event loop. This is the case if threaded I/O is enabled. */
        if (postponeClientRead(c)) return;
        ... // code omitted
    }
    
    /* Return 1 if we want to handle the client read later using threaded I/O.
     * This is called by the readable handler of the event loop.
     * As a side effect of calling this function the client is put in the
     * pending read clients and flagged as such. */
    int postponeClientRead(client *c) {
        /* Threaded reads must be active and enabled by configuration. */
        if (!io_threads_active || !server.io_threads_do_reads)
            return 0;
        /* Master/slave links are never deferred, and a client that is
         * already queued must not be queued twice. */
        if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))
            return 0;
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    }
    

    也就是说在多线程IO的情况下,主线程仅仅是把每一个可读连接的客户端放在一个队列里,即server.clients_pending_read里。等到事件循环的beforeSleep回调执行时,主线程再把这些待读客户端分发给各个IO线程处理。

    IO线程

    每个IO线程的入口函数IOThreadMain,对于客户端发送数据到redis时,主线程把所有的读事件按照Round Robin的方式放入每一个IO线程,IO线程干三件比较重要的事情:

    1. 从它的io_threads_list读事件链表里把客户端一个个取出来,并且read每一个fd的数据,然后放在每个连接的querybuf;

    2. 解析为一个个可执行command,详见 readQueryFromClient的实现;

    3. 设置io_threads_pending[id]为0,即“通知”主线程,已经完成所有的读事件处理了。

    /* Entry point of each I/O thread. Spins until the main thread posts
     * work (io_threads_pending[id] != 0), then drains io_threads_list[id],
     * performing either the write or the read half of client I/O depending
     * on io_threads_op, and finally publishes completion by resetting the
     * pending counter to 0. NOTE(review): the pending counters appear to be
     * plain variables here rather than C11 atomics — this matches the early
     * 6.0 code being discussed; confirm against the actual Redis version. */
    void *IOThreadMain(void *myid) {
        /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
         * used by the thread to just manipulate a single sub-array of clients. */
        long id = (unsigned long)myid;
        while(1) {
            /* Wait for start: busy-spin a bounded number of iterations
             * before falling back to the mutex handshake below. */
            for (int j = 0; j < 1000000; j++) {
                if (io_threads_pending[id] != 0) break;
            }
            /* Give the main thread a chance to stop this thread: if the
             * main thread holds io_threads_mutex[id], we park here until
             * it is released. */
            if (io_threads_pending[id] == 0) {
                pthread_mutex_lock(&io_threads_mutex[id]);
                pthread_mutex_unlock(&io_threads_mutex[id]);
                continue;
            }
            serverAssert(io_threads_pending[id] != 0);
            if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
            /* Process: note that the main thread will never touch our list
             * before we drop the pending count to 0. */
            listIter li;
            listNode *ln;
            listRewind(io_threads_list[id],&li);
            while((ln = listNext(&li))) {
                client *c = listNodeValue(ln);
                if (io_threads_op == IO_THREADS_OP_WRITE) {
                    writeToClient(c->fd,c,0);
                } else if (io_threads_op == IO_THREADS_OP_READ) {
                    readQueryFromClient(NULL,c->fd,c,0);
                } else {
                    serverPanic("io_threads_op value is unknown");
                }
            }
            listEmpty(io_threads_list[id]);
            /* Publishing 0 signals the main thread this batch is done. */
            io_threads_pending[id] = 0;
            if (tio_debug) printf("[%ld] Done\n", id);
        }
    }
    
    • 什么是Pending Read和Pending Write

    最后给个图:


    redis6.0_multi_threading.jpg

    相关文章

      网友评论

          本文标题:redis6.0的multi threading设计

          本文链接:https://www.haomeiwen.com/subject/rivqtltx.html