美文网首页
dpvs学习笔记: 7 tc流控的实现

dpvs学习笔记: 7 tc流控的实现

作者: 董泽润 | 来源:发表于2018-11-12 17:30 被阅读96次

    经常听说网络的 QOS(Quality Of Service), 进来的流量我们是无法控制的,只能根据策略去丢弃处理,一般只能对出去的流量进行整形 (Shaping),Traffic Control 就是一种方式,可以控制速率,丢包率等等。内核中有两种:无类别排队规则 CLASSLESS QDISCS,和有类别排队规则 CLASSFUL QDISCS

    无类别排队规则

    1. [p|b]fifo
      最简单的qdsic,先进先出策略,p 是 packet 数据包,b 是 byte 字节。pfifo 是普通的先入先出。而pfifo_fast包含三个波段,band0~band2,每个波段也使用先进先出策略,band0优先级最高,band2最低,如果band0中有数据包,系统就不会去处理band1的数据包,直到band0的数据包处理完。同理band1和band2。数据包会根据 TOS 字段值放入不同波段。


      pfifo_fast
    2. sfb(Stochastic Fairness Queueing)
      根据 tcp/udp 流量计算 hash 值,然后分配到不同队列。然后轮循发送每个队列的数据包,所以是随机公平队列。


      sfb
    3. tfb(Token Bucket Filter)
      基于令牌桶原理,做的限速。


      tfb

    有类别排队规则

    1. HTB(Hierarchy Token Bucket)


      层级令牌桶

      层级的也很好理解,根 root 有一个流控限速,子节点又可以对这个流量进行细分,叶子节点又是不同的规则。dpvs 使用 HTB

    2. CBQ(Class Based Queueing)

    TC小结

    这块还是比较复杂的,常用的就是 pfifo_fast 和 HTB 感兴趣可以参考官方说明

    dpvs tc 初始化及调用

    在 main 函数调用 tc_init 初始化,主要注册几个规则和分类器

        tc_register_qsch(&pfifo_sch_ops);
        tc_register_qsch(&bfifo_sch_ops);
        tc_register_qsch(&pfifo_fast_ops);
        tc_register_qsch(&tbf_sch_ops);
        /* classifier */
        tc_register_cls(&match_cls_ops);
    

    在网卡初始化时,会调用 tc_init_dev 初始化 tc

    int tc_init_dev(struct netif_port *dev)
    {
        int hash, size;
        struct netif_tc *tc = netif_tc(dev);
    
        memset(tc, 0, sizeof(*tc));
    
        rte_rwlock_init(&tc->lock);
    
        rte_rwlock_write_lock(&tc->lock);
    
        tc->dev = dev;
        tc->tc_mbuf_pool = tc_mbuf_pools[dev->socket];
    
        /* egress "root" Qsch, which handle is 0, parent is TC_H_ROOT. */
        tc->qsch = qsch_create_dflt(dev, default_qsch_ops, TC_H_ROOT);
        if (!tc->qsch) {
            rte_rwlock_write_unlock(&tc->lock);
            tc_destroy_dev(dev);
            return EDPVS_NOMEM;
        }
    
        tc->qsch_cnt = 1;
        tc->qsch_ingress = NULL;
    
        tc->qsch_hash_size = tc_qsch_hash_size;
        size = sizeof(struct hlist_head) * tc->qsch_hash_size;
    
        tc->qsch_hash = rte_malloc(NULL, size, RTE_CACHE_LINE_SIZE);
        if (!tc->qsch_hash) {
            rte_rwlock_write_unlock(&tc->lock);
            tc_destroy_dev(dev);
            return EDPVS_NOMEM;
        }
    
        for (hash = 0; hash < tc->qsch_hash_size; hash++)
            INIT_HLIST_HEAD(&tc->qsch_hash[hash]);
    
        rte_rwlock_write_unlock(&tc->lock);
        return EDPVS_OK;
    }
    
    1. qsch_create_dflt 创建默认的 qsch,根是 TC_H_ROOT
    2. 分配 qsch_hash 哈希桶,就是个二维数组,每个元素又是链表
      从源码可以看到,默认的就是 pfifo_fast. 这里还有小细节就是 RTE_CACHE_LINE_SIZE, cpu cache 对齐
    static struct Qsch_ops *default_qsch_ops = &pfifo_fast_ops;
    

    创建默认策略 qsch_create_dflt

    默认使用 pfifo_fast_ops 策略,那么看一下如何初始化的

    struct Qsch *qsch_create_dflt(struct netif_port *dev, struct Qsch_ops *ops,
                                  tc_handle_t parent)
    {
        int err;
        struct Qsch *sch;
        assert(dev && ops);
    
        tc_qsch_ops_get(ops);
    
        sch = sch_alloc(&dev->tc, ops);
        if (!sch) {
            tc_qsch_ops_put(ops);
            return NULL;
        }
    
        sch->parent = parent;
    
        if (ops->init && (err = ops->init(sch, NULL)) != EDPVS_OK) {
            tc_qsch_ops_put(ops);
            qsch_destroy(sch);
            return NULL;
        }
    
        return sch;
    }
    
    1. 调用 sch_alloc 分配 Qsch 结构体,这里还有细节
    2. 如果 ops 有 init 函数指针,那么执行
    static inline struct Qsch *sch_alloc(struct netif_tc *tc, struct Qsch_ops *ops)
    {
        struct Qsch *sch;
        unsigned int size = TC_ALIGN(sizeof(*sch)) + ops->priv_size;
        lcoreid_t cid;
    
        sch = rte_zmalloc(NULL, size, RTE_CACHE_LINE_SIZE);
        if (!sch)
            return NULL;
    
        for (cid = 0; cid < NELEMS(sch->q); cid++)
            tc_mbuf_head_init(&sch->q[cid]);
    
        INIT_LIST_HEAD(&sch->cls_list);
        INIT_HLIST_NODE(&sch->hlist);
        sch->tc = tc;
        sch->ops = ops;
        rte_atomic32_set(&sch->refcnt, 1);
    
        return sch;
    }
    
    1. 可以看到 rte_zmalloc 分配 Qsch 结构体时,还带上了 ops->priv_size 大小的内存区域,这块是给不同规则自己使用的
    2. 初始化 mbuf 队列
    static int pfifo_fast_init(struct Qsch *sch, const void *arg)
    {
        int band;
        lcoreid_t cid;
        struct pfifo_fast_priv *priv = qsch_priv(sch);
    
        for (cid = 0; cid < NELEMS(priv->q); cid++) {
            for (band = 0; band < PFIFO_FAST_BANDS; band++) {
                tc_mbuf_head_init(band2list_cpu(priv, band, cid));
            }
        }
    
        /* FIXME: txq_desc_nb is not set when alloc device.
         * we can move tc_init_dev to dev start phase but not
         * all dev will be start now, netif need be modified. */
    #if 0
        sch->limit = qsch_dev(sch)->txq_desc_nb;
    #else
        sch->limit = 128;
    #endif
        return EDPVS_OK;
    }
    
    struct pfifo_fast_priv {
        uint32_t bitmap[RTE_MAX_LCORE];
        struct tc_mbuf_head q[RTE_MAX_LCORE][PFIFO_FAST_BANDS];
    
    #define this_bitmap bitmap[rte_lcore_id()]
    #define this_pff_q  q[rte_lcore_id()]
    };
    

    再回头看 pfifo_fast 的初始化函数和 pfifo_fast_priv 结构体

    1. qsch_priv 获取私有内存区域,这块区域就是上面额外分配的 ops->priv_size 大小
    2. 初始化队列,每个队列有三个波段,即对应的 band0, band1, band2. 具体使用后面就能看到,sch->limit 写死的 128

    tc 数据出口

    在发送数据包 netif_xmit 时会调用 tc 控制函数 tc_handle_egress

    struct rte_mbuf *tc_handle_egress(struct netif_tc *tc,
                                      struct rte_mbuf *mbuf, int *ret)
    {
        int err = EDPVS_OK;
        struct Qsch *sch, *child_sch = NULL;
        struct tc_cls *cls;
        struct tc_cls_result cls_res;
        const int max_reclassify_loop = 8;
        int limit = 0;
    
        assert(tc && mbuf && ret);
    
        /* start from egress root qsch */
        sch = tc->qsch;
        if (unlikely(!sch)) {
            *ret = EDPVS_OK;
            return mbuf;
        }
    
        qsch_get(sch);
    
        /*
         * classify the traffic first.
         * support classify for child schedulers only.
         * it no classifier matchs, than use current scheduler.
         */
    again:
        list_for_each_entry(cls, &sch->cls_list, list) {
            if (unlikely(mbuf->packet_type != cls->pkt_type &&
                         cls->pkt_type != htons(ETH_P_ALL)))
                continue;
    
            err = cls->ops->classify(cls, mbuf, &cls_res);
            switch (err) {
            case TC_ACT_OK:
                break;
            case TC_ACT_SHOT:
                goto drop;
            default:
                continue;
            }
    
            if (unlikely(cls_res.drop))
                goto drop;
    
            child_sch = qsch_lookup(sch->tc, cls_res.sch_id);
    
            if (unlikely(!child_sch)) {
                RTE_LOG(WARNING, TC, "%s: target Qsch not exist.\n",
                        __func__);
                continue;
            }
    
            if (unlikely(child_sch->parent != sch->handle)) {
                RTE_LOG(WARNING, TC, "%s: classified to non-children scheduler\n",
                        __func__);
                qsch_put(child_sch);
                continue;
            }
    
            /* pass the packet to child scheduler */
            qsch_put(sch);
            sch = child_sch;
    
            if (unlikely(limit++ >= max_reclassify_loop)) {
                RTE_LOG(DEBUG, TC, "%s: exceed reclassify max loop.\n",
                        __func__);
                goto drop;
            }
    
            /* classify again for new selected Qsch */
            goto again;
        }
    
        /* this scheduler has no queue (for classify only) ? */
        if (unlikely(!sch->ops->enqueue))
            goto out; /* no need to set @ret */
    
        /* mbuf is always consumed (queued or dropped) */
        err = sch->ops->enqueue(sch, mbuf);
        mbuf = NULL;
        *ret = err;
    
        /* try dequeue and xmit */
        qsch_do_sched(sch);
    
    out:
        qsch_put(sch);
        return mbuf;
    
    drop:
        *ret = qsch_drop(sch, mbuf);
        qsch_put(sch);
        return NULL;
    }
    

    为了分析起来不那么复杂,先不考滤层级关系,忽略 again 循环。那么函数完成两个操作:

    1. enqueue 函数指针,将 mbuf 数据包入队,实际调用 pfifo_fast_enqueue
    2. qsch_do_sched 尝试将己有的数据包从队列取出,发送到网卡。

    tc 数据出口 enqueue 入队

    static int pfifo_fast_enqueue(struct Qsch *sch, struct rte_mbuf *mbuf)
    {
        int band, err;
        uint8_t prio = 0;
        struct pfifo_fast_priv *priv;
        struct tc_mbuf_head *qh;
    
        /* sch->limit is same as dev->txq_desc_nb */
        if (unlikely(sch->this_q.qlen >= sch->limit)) {
    #if defined(CONFIG_TC_DEBUG)
            RTE_LOG(WARNING, TC, "%s: queue is full.\n", __func__);
    #endif
            return qsch_drop(sch, mbuf);
        }
    
        if (unlikely(mbuf->udata64 > 0 && mbuf->udata64 <= TC_PRIO_MAX &&
                     mbuf->packet_type == ETH_P_IP))
            prio = (uint8_t)mbuf->udata64;
    
        band = prio2band[prio];
        priv = qsch_priv(sch);
        qh = band2list(priv, band);
        
        err = __qsch_enqueue_tail(sch, mbuf, qh);
        if (err == EDPVS_OK) {
            priv->this_bitmap |= (1 << band);
            sch->this_q.qlen++;
            sch->this_qstats.qlen++;
        }
    
        return err;
    }
    
    1. 首先判断是否超限 sch->this_q.qlen >= sch->limit, 如果超了直接丢弃
    2. prio 获取 TOS 字段值,这是一个四位的数,根据这个数来选择数据包落到哪个波段。prio2band 是值到波段的映射,具体值可以参考源码,一共 16 个
    3. __qsch_enqueue_tail 将 mbuf 添加到对应波段队列
    4. 将对应波段 map 置位,this_bitmap,表示这个波段有数据了

    tc 数据出口 qsch_do_sched 尝试发送数据

    void qsch_do_sched(struct Qsch *sch)
    {
        int quota = dev_tx_weight;
        int npkt;
    
        while (quota > 0 && sch_dequeue_xmit(sch, &npkt))
            quota -= npkt;
    
        return;
    }
    

    quota 是发送配额,代码写死 64 个,也就是说最多一次发送 64 个 mbuf

    static inline int sch_dequeue_xmit(struct Qsch *sch, int *npkt)
    {
        struct rte_mbuf *mbuf;
    
        *npkt = 1; /* TODO: bulk dequeue */
        mbuf = sch->ops->dequeue(sch);
        if (unlikely(!mbuf))
            return 0;
    
        netif_hard_xmit(mbuf, netif_port_get(mbuf->port));
        return sch_qlen(sch);
    }
    

    调用 dequeue 回调函数,然后将数据发送到网卡,这里回调 pfifo_fast_dequeue

    static struct rte_mbuf *pfifo_fast_dequeue(struct Qsch *sch)
    {
        struct pfifo_fast_priv *priv = qsch_priv(sch);
        int band = bitmap2band[priv->this_bitmap];
        struct tc_mbuf_head *qh;
        struct rte_mbuf *mbuf;
    
        if (unlikely(band < 0))
            return NULL;
    
        qh = band2list(priv, band);
        mbuf = __qsch_dequeue_head(sch, qh);
    
        if (mbuf) {
            sch->this_q.qlen--;
            sch->this_qstats.qlen--;
        }
    
        if (likely(qh->qlen == 0))
            priv->this_bitmap &= ~(1 << band);
    
        return mbuf;
    }
    

    this_bitmap 上文说过,如果对应波段有数据,那么对应位会置为1,一共 3 个波段,所以这个值范围是从 0 ~ 7,那么根据优先级原理,波段 0 发送完才能发送 波段 1,然后才是波段 2

    static const int bitmap2band[] = {-1, 0, 1, 0, 2, 0, 1, 0};
    

    所以这个 bitmap2band 数组实现的很巧妙。出队后如果 qh->qlen 长度为 0 了,那么再将 this_bitmap 的波段位置空。

    pfifo_fast 小结

    struct pfifo_fast_priv {
        uint32_t bitmap[RTE_MAX_LCORE];
        struct tc_mbuf_head q[RTE_MAX_LCORE][PFIFO_FAST_BANDS];
    
    #define this_bitmap bitmap[rte_lcore_id()]
    #define this_pff_q  q[rte_lcore_id()]
    };
    

    这是最简单,也是系统默认的流控规则。这里有一点要注意,dpdk 将工作锁在了每个核上,尽量不和其它核交互,所以队列 q 是每个核一个,每个队列有三个波段。

    其它规则实现

    1. [p|b]fifo 也不分什么波段,就是队列,但是入队出队有区别。pfifo 看数据包个数bfifo 看数据包大小。入队源码,唯一区别就是 if 判断。
    2. TBF 看了一下代码,标准的令牌桶算法实现。rate 速率,还有一个 peek 代表最大 burst 突发速率

    层级控制的实现

    刚才看到的都是单个规则实现,其实 tc 还有种层级控制,HTB 就是层级令牌桶的实现。就像一棵树一样,在 tc_handle_egress 有一段 again 代码,递归根据分类器去查找,最大查找深度是 max_reclassify_loop. 在 tc_init 时得知,当前只有一个 match_cls_ops 分类器。

    again:
        list_for_each_entry(cls, &sch->cls_list, list) {
            if (unlikely(mbuf->packet_type != cls->pkt_type &&
                         cls->pkt_type != htons(ETH_P_ALL)))
                continue;
            err = cls->ops->classify(cls, mbuf, &cls_res);
            switch (err) {
            case TC_ACT_OK:
                break;
            case TC_ACT_SHOT:
                goto drop;
            default:
                continue;
            }
            if (unlikely(cls_res.drop))
                goto drop;
    
            child_sch = qsch_lookup(sch->tc, cls_res.sch_id);
            ......
            /* pass the packet to child scheduler */
            qsch_put(sch);
            sch = child_sch;
    
            if (unlikely(limit++ >= max_reclassify_loop)) {
                RTE_LOG(DEBUG, TC, "%s: exceed reclassify max loop.\n",
                        __func__);
                goto drop;
            }
    
            /* classify again for new selected Qsch */
            goto again;
        }
    

    首先如果存在分类器,根据 classify 回调去分类,然后查找子规则 child_sch,直到匹配成功,返回 TC_ACT_OK

    层级控制 classify

    查看源码得知,classify 真正调用 match_classify

    static int match_classify(struct tc_cls *cls, struct rte_mbuf *mbuf,
                              struct tc_cls_result *result)
    {
        struct match_cls_priv *priv = tc_cls_priv(cls);
        struct dp_vs_match *m = &priv->match;
        struct ether_hdr *eh = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
        struct iphdr *iph = NULL;
        struct tcphdr *th;
        struct udphdr *uh;
        int offset = sizeof(*eh);
        __be16 pkt_type = eh->ether_type;
        __be16 sport, dport;
        struct netif_port *idev, *odev;
        struct vlan_ethhdr *veh;
        int err = TC_ACT_RECLASSIFY; /* by default */
    
        idev = netif_port_get_by_name(m->iifname);
        odev = netif_port_get_by_name(m->oifname);
        sport = dport = 0;
    
        /* support IPv4 and 802.1q/IPv4 */
    l2parse:
        switch (ntohs(pkt_type)) {
        case ETH_P_IP:
            if (mbuf_may_pull(mbuf, offset + sizeof(struct iphdr)) != 0) {
                err = TC_ACT_SHOT;
                goto done;
            }
    
            iph = rte_pktmbuf_mtod_offset(mbuf, struct iphdr *, offset);
    
            /* check if source/dest IP in range */
            if (m->srange.max_addr.in.s_addr != htonl(INADDR_ANY)) {
                if (ntohl(iph->saddr) < ntohl(m->srange.min_addr.in.s_addr) ||
                    ntohl(iph->saddr) > ntohl(m->srange.max_addr.in.s_addr))
                    goto done;
            }
    
            if (m->drange.max_addr.in.s_addr != htonl(INADDR_ANY)) {
                if (ntohl(iph->daddr) < ntohl(m->drange.min_addr.in.s_addr) ||
                    ntohl(iph->daddr) > ntohl(m->drange.max_addr.in.s_addr))
                    goto done;
            }
    
            offset += (iph->ihl << 2);
            break;
    
        case ETH_P_8021Q:
            veh = (struct vlan_ethhdr *)eh;
            pkt_type = veh->h_vlan_encapsulated_proto;
            offset += VLAN_HLEN;
            goto l2parse;
    
        default:
            goto done;
        }
        switch (iph->protocol) {
        case IPPROTO_TCP:
            if (mbuf_may_pull(mbuf, offset + sizeof(struct tcphdr)) != 0) {
                err = TC_ACT_SHOT;
                goto done;
            }
    
            th = rte_pktmbuf_mtod_offset(mbuf, struct tcphdr *, offset);
            sport = th->source;
            dport = th->dest;
            break;
    
        case IPPROTO_UDP:
            if (mbuf_may_pull(mbuf, offset + sizeof(struct udphdr)) != 0) {
                err = TC_ACT_SHOT;
                goto done;
            }
    
            uh = rte_pktmbuf_mtod_offset(mbuf, struct udphdr *, offset);
            sport = uh->source;
            dport = uh->dest;
            break;
    
        default: /* priv->proto is not assigned */
            goto match;
        }
    
        /* check if source/dest port in range */
        if (m->srange.max_port) {
            if (ntohs(sport) < ntohs(m->srange.min_port) ||
                ntohs(sport) > ntohs(m->srange.max_port))
                goto done;
        }
    
        if (m->drange.max_port) {
            if (ntohs(dport) < ntohs(m->drange.min_port) ||
                ntohs(dport) > ntohs(m->drange.max_port))
                goto done;
        }
    
    match:
        /* all matchs */
        *result = priv->result;
        err = TC_ACT_OK;
    
    done:
        return err;
    }
    
    1. 根据 dp_vs_match,获取数据包进入的网卡,和要出去的网卡
    2. 根据 dp_vs_match,判断数据包的 sport, dport 是否在范围之内
    3. 判断 pkt_type 网络包类型,当前仅支持 ipv4, ETH_P_8021Q (马上要支持 ipv6)
    4. 获取 tcp 或是 udp 的源目的端口用于 debug

    层级控制 查找qsch_lookup

    child_sch = qsch_lookup(sch->tc, cls_res.sch_id);
    

    根据 cls_res 匹配结果,查找子规则

    struct Qsch *qsch_lookup_noref(const struct netif_tc *tc, tc_handle_t handle)
    {
        int hash;
        struct Qsch *sch;
        assert(tc->qsch_hash && tc->qsch_hash_size);
    
        if (likely(tc->qsch && tc->qsch->handle == handle))
            return tc->qsch;
    
        hash = sch_hash(handle, tc->qsch_hash_size);
        hlist_for_each_entry(sch, &tc->qsch_hash[hash], hlist) {
            if (likely(sch->handle == handle))
                return sch;
        }
    
        if (tc->qsch_ingress && tc->qsch_ingress->handle == handle)
            return tc->qsch_ingress;
    
        return NULL;
    }
    

    可以看到,从 qsch_hash 就是个哈希桶,找到后返回。最终处理和单个规则执行一样。

    总结

    这块比较复杂,暂时也不深入了,以后有机会看看内核 tc 的实现

    相关文章

      网友评论

          本文标题:dpvs学习笔记: 7 tc流控的实现

          本文链接:https://www.haomeiwen.com/subject/sranxqtx.html