美文网首页
dpvs学习笔记: 6 定时器实现及连接老化超时

dpvs学习笔记: 6 定时器实现及连接老化超时

作者: 董泽润 | 来源:发表于2018-11-08 18:20 被阅读105次

    定时器实现方式很多种,定时器个数也不同。比如 go 老版本只有一个全局定时器,所以有些高性能项目抛弃了,自己实现用户级别的定时器,并且开启 n 个。但是在新版本没必要了,默认 64 个。dpvs 由于流表可能巨大,并且处在不同状态的 tcp 连接超时时间还不一样,如果实现的低效,会非常影响性能。所以 dpvs 在利用 dpdk 定时器的基础上,自实现了一个,算法是常见的时间轮。

    数据结构

    /* Per-lcore (and one global) hierarchical timing wheel.
     * cursors[l] is the current tick position in wheel level l;
     * hashs[l] points to an array of LEVEL_SIZE list heads, each
     * holding the dpvs_timer entries hashed onto that slot. */
    struct timer_scheduler {
        /* wheels and cursors */
        rte_spinlock_t      lock;                   /* guards cursors/hashs on add/del/tick */
        uint32_t            cursors[LEVEL_DEPTH];   /* tick position of each wheel level */
        struct list_head    *hashs[LEVEL_DEPTH];    /* per-level slot array, LEVEL_SIZE lists each */
    
        /* leverage dpdk rte_timer to drive us */
        struct rte_timer    rte_tim;                /* periodic dpdk timer providing the tick */
    };
    

    lock 是锁,增删改查都要锁。rte_tim 是 dpdk 库自身的定时器,他只负责时间嘀嗒,也就是 tick ++ 操作。cursors 和 hashs 配合使用,构成时间轮。

    cursors 保存嘀嗒计数,从 0 到 LEVEL_SIZE (2<<18), LEVEL_DEPTH 值为 2,也就是说 cursors[0] 可以保存 2<<18 个嘀嗒,如果 DPVS_TIMER_HZ = 1000,那么就是 524s. cursors[0] 驱动 cursors[1] 时间轮,两个轮一共 8.7 年时间。hashs 是一个长度为 2 的数组,每个元素是一个链表数组,长度是 LEVEL_SIZE (2<<18), 链表成员就是具体的定时器消息。

    初始化

    首先,dpvs 调用 rte_timer_subsystem_init 初始化 dpdk 库的定时器。然后 dpvs_timer_init 初始化 dpvs 定时器。

    /*
     * Bring up the timer subsystem: run timer_lcore_init on every slave
     * lcore (each gets its own scheduler), then initialize the global
     * scheduler driven by the master lcore.  Returns the first per-lcore
     * error, or the result of initializing the global scheduler.
     */
    int dpvs_timer_init(void)
    {
        lcoreid_t lcore;
    
        /* launch the per-lcore initializers in parallel on all slaves */
        rte_eal_mp_remote_launch(timer_lcore_init, NULL, SKIP_MASTER);
    
        /* join each slave and bail out on the first failure */
        RTE_LCORE_FOREACH_SLAVE(lcore) {
            int ret = rte_eal_wait_lcore(lcore);
            if (ret < 0) {
                RTE_LOG(ERR, DTIMER, "%s: lcore %d: %s.\n", 
                        __func__, lcore, dpvs_strerror(ret));
                return ret;
            }
        }
    
        /* lastly, the global scheduler on the master lcore */
        return timer_init_schedler(&g_timer_sched, rte_get_master_lcore());
    }
    

    看源码得知,每个 slave lcore 都要调 timer_lcore_init 初始化自己的。然后再初始化全局的 g_timer_sched.

    初始化 timer_lcore_init

    /*
     * Initialize one timer scheduler: zero the wheel cursors, allocate and
     * init each level's slot array (LEVEL_SIZE list heads per level), then
     * register a periodic dpdk rte_timer on lcore @cid to drive the ticks.
     * Returns EDPVS_OK, EDPVS_NOMEM on allocation failure, or EDPVS_INVAL
     * if the rte_timer cannot be armed.
     *
     * Fix vs. original: the allocation-failure path used to return with
     * sched->lock still held and leaked the levels already allocated;
     * now we free them and unlock before returning.
     */
    static int timer_init_schedler(struct timer_scheduler *sched, lcoreid_t cid)
    {
        int i, l;
    
        rte_spinlock_init(&sched->lock);
    
        rte_spinlock_lock(&sched->lock);
        for (l = 0; l < LEVEL_DEPTH; l++) {
            sched->cursors[l] = 0;
    
            sched->hashs[l] = rte_malloc(NULL,
                                         sizeof(struct list_head) * LEVEL_SIZE, 0);
            if (!sched->hashs[l]) {
                RTE_LOG(ERR, DTIMER, "[%02d] no memory.\n", cid);
                /* undo partial allocation and release the lock, otherwise
                 * the caller is left holding a locked, half-built wheel. */
                while (--l >= 0) {
                    rte_free(sched->hashs[l]);
                    sched->hashs[l] = NULL;
                }
                rte_spinlock_unlock(&sched->lock);
                return EDPVS_NOMEM;
            }
    
            for (i = 0; i < LEVEL_SIZE; i++)
                INIT_LIST_HEAD(&sched->hashs[l][i]);
        }
        rte_spinlock_unlock(&sched->lock);
    
        rte_timer_init(&sched->rte_tim);
        /* ticks should be exactly same with precision */
        if (rte_timer_reset(&sched->rte_tim, rte_get_timer_hz() / DPVS_TIMER_HZ,
                            PERIODICAL, cid, rte_timer_tick_cb, sched) != 0) {
            RTE_LOG(ERR, DTIMER, "[%02d] fail to reset rte timer.\n", cid);
            return EDPVS_INVAL;
        }
    
        RTE_LOG(DEBUG, DTIMER, "[%02d] timer initialized %p.\n", cid, sched);
        return EDPVS_OK;
    }
    

    代码蛮详细了,需要注意两点

    1. hashs 是一个二维数组,大小是 2 * sizeof(struct list_head) * LEVEL_SIZE,空间换时间
    2. 利用了 dpdk 自身的定时器去维护嘀嗒,通过 rte_timer_reset 来注册,回调函数是 rte_timer_tick_cb,间隔是 rte_get_timer_hz() / DPVS_TIMER_HZ ticks

    回调函数 rte_timer_tick_cb

    /*
     * Periodic callback fired by the dpdk rte_timer every
     * rte_get_timer_hz()/DPVS_TIMER_HZ hardware ticks: advance the timing
     * wheel by one tick, expire due timers on the slot we land on, and
     * migrate not-yet-due timers down to a lower wheel level.  Higher
     * levels are only processed when the lower level's cursor wraps
     * (the "carry").
     */
    static void rte_timer_tick_cb(struct rte_timer *tim, void *arg)
    {
        struct timer_scheduler *sched = arg;
        struct dpvs_timer *timer, *next;
        uint64_t left, hash, off;
        int level, lower;
        uint32_t *cursor;
        bool carry;
    
        rte_spinlock_lock(&sched->lock);
        /* drive timer to move and handle expired timers. */
        for (level = 0; level < LEVEL_DEPTH; level++) {
            cursor = &sched->cursors[level];
            (*cursor)++; /* advance this wheel by one tick */
    
            if (*cursor < LEVEL_SIZE) {
                carry = false;
            } else {
                /* reset the cursor and handle next level later. */
                *cursor = 0;
                carry = true;
            }
            /* walk every timer hashed onto the slot we just reached;
             * _safe variant because entries may be deleted/moved. */
            list_for_each_entry_safe(timer, next,
                                     &sched->hashs[level][*cursor], list) {
                /* is all lower levels ticks empty ? */
                left = timer->delay % get_level_ticks(level);
                if (!left) {
                    /* no residual ticks below this level: due now */
                    timer_expire(sched, timer);
                } else {
                    /* drop to lower level wheel, note it may not drop to
                     * "next" lower level wheel. */
                    list_del(&timer->list);
    
                    lower = level;
                    while (--lower >= 0) {
                        /* how many slots of the lower wheel the delay spans */
                        off = timer->delay / get_level_ticks(lower);
                        if (!off)
                            continue; /* next lower level */
    
                        /* NOTE(review): the slot is based on the CURRENT
                         * level's cursor, not sched->cursors[lower] —
                         * confirm against upstream dpvs that this is the
                         * intended rehash base. */
                        hash = (*cursor + off) % LEVEL_SIZE;
                        list_add_tail(&timer->list, &sched->hashs[lower][hash]);
                        break;
                    }
                    /* level 0 always has get_level_ticks()==1, so off>0
                     * and the loop must have placed the timer */
                    assert(lower >= 0);
                }
            }
            if (!carry)
                break;
        }
        rte_spinlock_unlock(&sched->lock);
        return;
    }
    
    1. 当 level 0 时,使用第一个轮,并调用 (*cursor)++ 将嘀嗒加一。如果嘀嗒值大于 LEVEL_SIZE 说明第一个轮用完了,重置为 0 并设置 carry 进位标记。
    2. 遍历当前轮的 hash 数组,get_level_ticks 获取每一层的轮步进一个单位代表多少嘀嗒,当 level=0 时返回 1, 当 level=1时返回 LEVEL_SIZE。left = timer->delay % get_level_ticks(level) 如果 left 为 0 说明这个定时任务属于当前时间轮,并且已经过期了,触发 timer_expire 回调。
    3. 如果 left 有值,说明还没到超时时间。但是时间轮已经触发一次了,所以要降级到 --lower 层。从原有的链表中调用 list_del 删除,然后 timer->delay / get_level_ticks(lower) 求出在当前层步进单位个数,(*cursor + off) % LEVEL_SIZE 求出 hash 索引,然后 list_add_tail 添加到对应链表中。
    4. 如果没有进位,也就是 carry 未标记,那么退出循环。否则处理下一层的轮。

    添加超时任务

    dp_vs_conn_new 新建连接时会调用 dpvs_timer_sched 添加超时任务

    dpvs_timer_sched(&new->timer, &new->timeout, conn_expire, new, true);
    

    看一下具体的 dpvs_timer_sched 实现

    /*
     * Internal helper: fill in @timer (handler, arg, periodic flag, delay
     * in ticks) and hash it onto the proper wheel level of @sched.
     * Returns EDPVS_OK on success, EDPVS_INVAL for out-of-range or zero
     * delays.
     */
    static int __dpvs_timer_sched(struct timer_scheduler *sched,
                                  struct dpvs_timer *timer, struct timeval *delay,
                                  dpvs_timer_cb_t handler, void *arg, bool period)
    {
        uint32_t steps, idx;
        int lv;
    
        assert(timer && delay && handler);
    
        if (timer_pending(timer))
            RTE_LOG(WARNING, DTIMER, "schedule a pending timer ?\n");
    
        /* populate the timer before hooking it onto a wheel */
        timer->handler   = handler;
        timer->priv      = arg;
        timer->is_period = period;
        timer->delay     = timeval_to_ticks(delay);
    
        if (unlikely(timer->delay >= TIMER_MAX_TICKS)) {
            RTE_LOG(WARNING, DTIMER, "exceed timer range\n");
            return EDPVS_INVAL;
        }
    
        /*
         * A zero-delay timer makes no sense: a periodic one could never
         * be stopped, and a one-shot one would never be triggered.
         */
        if (unlikely(!timer->delay)) {
            RTE_LOG(WARNING, DTIMER, "schedule 0 timeout timer.\n");
            return EDPVS_INVAL;
        }
    
        /* pick the highest wheel level where the delay spans >= 1 slot */
        for (lv = LEVEL_DEPTH - 1; lv >= 0; lv--) {
            steps = timer->delay / get_level_ticks(lv);
            if (!steps)
                continue; /* too short for this level, try a lower one */
            idx = (sched->cursors[lv] + steps) % LEVEL_SIZE;
            list_add_tail(&timer->list, &sched->hashs[lv][idx]);
            return EDPVS_OK;
        }
    
        /* unreachable: the TIMER_MAX_TICKS check guarantees some level fits */
        return EDPVS_INVAL;
    }
    
    1. 生成 timer 任务结构体,最重要的是 timeval_to_ticks 根据超时时间生成嘀嗒,也就是说过了 delay 个嘀嗒后超时。
    2. 各种异常 case 检测,不能为 0,不能超过最大 TIMER_MAX_TICKS
    3. 从最大层开始遍历时间轮,添加到指定链表中。off 是在当前轮中需要多少步进单位,hash 求出索引。

    连接老化处理

    在 tcp 每个阶段,都有不同的超时时间,并且超时到期后动作也有所不同,

    static int conn_expire(void *priv)
    {
        struct dp_vs_conn *conn = priv;
        struct dp_vs_proto *pp;
        struct rte_mbuf *cloned_syn_mbuf;
        struct dp_vs_synproxy_ack_pakcet *ack_mbuf, *t_ack_mbuf;
        struct rte_mempool *pool;
        assert(conn);
    
        /* set proper timeout */
        unsigned conn_timeout = 0;
    
        pp = dp_vs_proto_lookup(conn->proto);
        if (((conn->proto == IPPROTO_TCP) &&
            (conn->state == DPVS_TCP_S_ESTABLISHED)) ||
            ((conn->proto == IPPROTO_UDP) &&
            (conn->state == DPVS_UDP_S_NORMAL))) {
            conn_timeout = dp_vs_get_conn_timeout(conn);
            if (unlikely(conn_timeout > 0))
                conn->timeout.tv_sec = conn_timeout;
            else if (pp && pp->timeout_table)
                conn->timeout.tv_sec = pp->timeout_table[conn->state];
            else
                conn->timeout.tv_sec = 60;
        }
        else if (pp && pp->timeout_table)
            conn->timeout.tv_sec = pp->timeout_table[conn->state];
        else
            conn->timeout.tv_sec = 60;
    

    dp_vs_get_conn_timeout 获取超时时间,查看源码是在配置 dpvs 时后端服务的超时时间

        dpvs_time_rand_delay(&conn->timeout, 1000000);
        rte_atomic32_inc(&conn->refcnt);
    
        /* retransmit syn packet to rs */
        if (conn->syn_mbuf && rte_atomic32_read(&conn->syn_retry_max) > 0) {
            if (likely(conn->packet_xmit != NULL)) {
                pool = get_mbuf_pool(conn, DPVS_CONN_DIR_INBOUND);
                if (unlikely(!pool)) {
                    RTE_LOG(WARNING, IPVS, "%s: no route for syn_proxy rs's syn "
                            "retransmit\n", __func__);
                } else {
                    cloned_syn_mbuf = rte_pktmbuf_clone(conn->syn_mbuf, pool);
                    if (unlikely(!cloned_syn_mbuf)) {
                        RTE_LOG(WARNING, IPVS, "%s: no memory for syn_proxy rs's syn "
                                "retransmit\n", __func__);
                    } else {
                        cloned_syn_mbuf->userdata = NULL;
                        conn->packet_xmit(pp, conn, cloned_syn_mbuf);
                    }
                }
            }
    
            rte_atomic32_dec(&conn->syn_retry_max);
            dp_vs_estats_inc(SYNPROXY_RS_ERROR);
    
            /* expire later */
            dp_vs_conn_put(conn);
            return DTIMER_OK;
        }
    

    如果开启 syn proxy,并且当前 syn_mbuf 还在、syn_retry_max 重试次数大于 0,则克隆并把保存的 syn 包重发给后端 RS(对应代码注释 retransmit syn packet to rs),同时推迟本次连接过期

        /* somebody is controlled by me, expire later */
        if (rte_atomic32_read(&conn->n_control)) {
            dp_vs_conn_put(conn);
            return DTIMER_OK;
        }
    

    其实不太理解这个 n_control 有什么用,慢慢看吧

        /* unhash it then no further user can get it,
         * even we cannot del it now. */
        conn_unhash(conn);
    

    这是重点,把连接从流表中摘除(unhash),之后新报文就查不到它了;注意此处只是摘除,并没有释放连接本身,释放要看后面的引用计数判断

        /* refcnt == 1 means we are the only referer.
         * no one is using the conn and it's timed out. */
        if (rte_atomic32_read(&conn->refcnt) == 1) {
            struct dp_vs_proto *proto = dp_vs_proto_lookup(conn->proto);
    
            if (conn->flags & DPVS_CONN_F_TEMPLATE)
                dpvs_timer_cancel(&conn->timer, true);
            else
                dpvs_timer_cancel(&conn->timer, false);
    
            /* I was controlled by someone */
            if (conn->control)
                dp_vs_control_del(conn);
    
            if (proto && proto->conn_expire)
                proto->conn_expire(proto, conn);
    
            if (conn->dest->fwdmode == DPVS_FWD_MODE_SNAT
                    && conn->proto != IPPROTO_ICMP) {
                struct sockaddr_in daddr, saddr;
    
                memset(&daddr, 0, sizeof(daddr));
                daddr.sin_family = AF_INET;
                daddr.sin_addr = conn->caddr.in;
                daddr.sin_port = conn->cport;
    
                memset(&saddr, 0, sizeof(saddr));
                saddr.sin_family = AF_INET;
                saddr.sin_addr = conn->vaddr.in;
                saddr.sin_port = conn->vport;
    
                sa_release(conn->out_dev, &daddr, &saddr);
            }
    
            conn_unbind_dest(conn);
            dp_vs_laddr_unbind(conn);
    
            /* free stored ack packet */
            list_for_each_entry_safe(ack_mbuf, t_ack_mbuf, &conn->ack_mbuf, list) {
                list_del_init(&ack_mbuf->list);
                rte_pktmbuf_free(ack_mbuf->mbuf);
                sp_dbg_stats32_dec(sp_ack_saved);
                rte_mempool_put(this_ack_mbufpool, ack_mbuf);
            }
            conn->ack_num = 0;
    
            /* free stored syn mbuf */
            if (conn->syn_mbuf) {
                rte_pktmbuf_free(conn->syn_mbuf);
                sp_dbg_stats32_dec(sp_syn_saved);
            }
    
            rte_atomic32_dec(&conn->refcnt);
    
            rte_mempool_put(conn->connpool, conn);
            this_conn_count--;
    
    #ifdef CONFIG_DPVS_IPVS_STATS_DEBUG
            conn_stats_dump("del conn", conn);
    #endif
    #ifdef CONFIG_DPVS_IPVS_DEBUG
            conn_dump("del conn: ", conn);
    #endif
            return DTIMER_STOP;
        }
    

    如果引用计数 1,说明只有自己在用,获取协议,调用协议的连接老化函数 proto->conn_expire,这里还要兼容 DPVS_FWD_MODE_SNAT,我觉得代码写的丑... conn_unbind_dest 解除连接和后端的引用,dp_vs_laddr_unbind 释放本地地址。最后清理 ack_mbuf, syn_mbuf 等等

        conn_hash(conn);
    

    如果走到这里,说明连接还有人在用,那么加回流表

        /* some one is using it when expire,
         * try del it again later */
        if (conn->flags & DPVS_CONN_F_TEMPLATE)
            dpvs_timer_update(&conn->timer, &conn->timeout, true);
        else
            dpvs_timer_update(&conn->timer, &conn->timeout, false);
    
        rte_atomic32_dec(&conn->refcnt);
        return DTIMER_OK;
    }
    

    如果走到这里,说明连接还有人在用,那么更新超时时间加回定时器。

    tcp连接清除

    /*
     * TCP hook run when a connection expires.  In NAT/FNAT forwarding
     * modes, send a RST toward each side so both ends tear the session
     * down; other forwarding modes need no packet on expiry.
     */
    static int tcp_conn_expire(struct dp_vs_proto *proto, 
                           struct dp_vs_conn *conn)
    {
        assert(proto && conn && conn->dest);
    
        if (conn->dest->fwdmode != DPVS_FWD_MODE_NAT 
                && conn->dest->fwdmode != DPVS_FWD_MODE_FNAT)
            return EDPVS_OK;
    
        /* send RST to RS and client */
        if (tcp_send_rst(proto, conn, DPVS_CONN_DIR_INBOUND) != EDPVS_OK)
            RTE_LOG(WARNING, IPVS, "%s: fail RST RS.\n", __func__);
        if (tcp_send_rst(proto, conn, DPVS_CONN_DIR_OUTBOUND) != EDPVS_OK)
            RTE_LOG(WARNING, IPVS, "%s: fail RST Client.\n", __func__);
    
        return EDPVS_OK;
    }
    

    代码很好理解,双向发送 rst

    小结

    这块实现的比较标准,大量连接时流表就是会很膨胀,很考验定时器性能。时间轮+多个定时器,想要高性能必备。

    相关文章

      网友评论

          本文标题:dpvs学习笔记: 6 定时器实现及连接老化超时

          本文链接:https://www.haomeiwen.com/subject/lqavxqtx.html