经常听说网络的 QOS(Quality Of Service), 进来的流量我们是无法控制的,只能根据策略去丢弃处理,一般只能对出去的流量进行整形 (Shaping),Traffic Control 就是一种方式,可以控制速率,丢包率等等。内核中有两种:无类别排队规则 CLASSLESS QDISCS,和有类别排队规则 CLASSFUL QDISCS
无类别排队规则
-
[p|b]fifo
最简单的qdsic,先进先出策略,p 是 packet 数据包,b 是 byte 字节。pfifo 是普通的先入先出。而pfifo_fast包含三个波段,band0~band2,每个波段也使用先进先出策略,band0优先级最高,band2最低,如果band0中有数据包,系统就不会去处理band1的数据包,直到band0的数据包处理完。同理band1和band2。数据包会根据 TOS 字段值放入不同波段。
pfifo_fast -
sfb(Stochastic Fairness Queueing)
根据 tcp/udp 流量计算 hash 值,然后分配到不同队列。然后轮循发送每个队列的数据包,所以是随机公平队列。
sfb -
tfb(Token Bucket Filter)
基于令牌桶原理,做的限速。
tfb
有类别排队规则
-
HTB(Hierarchy Token Bucket)
层级令牌桶
层级的也很好理解,根 root 有一个流控限速,子节点又可以对这个流量进行细分,叶子节点又是不同的规则。dpvs 使用 HTB
- CBQ(Class Based Queueing)
TC小结
这块还是比较复杂的,常用的就是 pfifo_fast 和 HTB 感兴趣可以参考官方说明
dpvs tc 初始化及调用
在 main 函数调用 tc_init
初始化,主要注册几个规则和分类器
tc_register_qsch(&pfifo_sch_ops);
tc_register_qsch(&bfifo_sch_ops);
tc_register_qsch(&pfifo_fast_ops);
tc_register_qsch(&tbf_sch_ops);
/* classifier */
tc_register_cls(&match_cls_ops);
在网卡初始化时,会调用 tc_init_dev
初始化 tc
int tc_init_dev(struct netif_port *dev)
{
int hash, size;
struct netif_tc *tc = netif_tc(dev);
memset(tc, 0, sizeof(*tc));
rte_rwlock_init(&tc->lock);
rte_rwlock_write_lock(&tc->lock);
tc->dev = dev;
tc->tc_mbuf_pool = tc_mbuf_pools[dev->socket];
/* egress "root" Qsch, which handle is 0, parent is TC_H_ROOT. */
tc->qsch = qsch_create_dflt(dev, default_qsch_ops, TC_H_ROOT);
if (!tc->qsch) {
rte_rwlock_write_unlock(&tc->lock);
tc_destroy_dev(dev);
return EDPVS_NOMEM;
}
tc->qsch_cnt = 1;
tc->qsch_ingress = NULL;
tc->qsch_hash_size = tc_qsch_hash_size;
size = sizeof(struct hlist_head) * tc->qsch_hash_size;
tc->qsch_hash = rte_malloc(NULL, size, RTE_CACHE_LINE_SIZE);
if (!tc->qsch_hash) {
rte_rwlock_write_unlock(&tc->lock);
tc_destroy_dev(dev);
return EDPVS_NOMEM;
}
for (hash = 0; hash < tc->qsch_hash_size; hash++)
INIT_HLIST_HEAD(&tc->qsch_hash[hash]);
rte_rwlock_write_unlock(&tc->lock);
return EDPVS_OK;
}
-
qsch_create_dflt
创建默认的 qsch,根是 TC_H_ROOT - 分配 qsch_hash 哈希桶,就是个二维数组,每个元素又是链表
从源码可以看到,默认的就是 pfifo_fast. 这里还有小细节就是 RTE_CACHE_LINE_SIZE, cpu cache 对齐
static struct Qsch_ops *default_qsch_ops = &pfifo_fast_ops;
创建默认策略 qsch_create_dflt
默认使用 pfifo_fast_ops 策略,那么看一下如何初始化的
struct Qsch *qsch_create_dflt(struct netif_port *dev, struct Qsch_ops *ops,
tc_handle_t parent)
{
int err;
struct Qsch *sch;
assert(dev && ops);
tc_qsch_ops_get(ops);
sch = sch_alloc(&dev->tc, ops);
if (!sch) {
tc_qsch_ops_put(ops);
return NULL;
}
sch->parent = parent;
if (ops->init && (err = ops->init(sch, NULL)) != EDPVS_OK) {
tc_qsch_ops_put(ops);
qsch_destroy(sch);
return NULL;
}
return sch;
}
- 调用
sch_alloc
分配 Qsch 结构体,这里还有细节 - 如果 ops 有 init 函数指针,那么执行
static inline struct Qsch *sch_alloc(struct netif_tc *tc, struct Qsch_ops *ops)
{
struct Qsch *sch;
unsigned int size = TC_ALIGN(sizeof(*sch)) + ops->priv_size;
lcoreid_t cid;
sch = rte_zmalloc(NULL, size, RTE_CACHE_LINE_SIZE);
if (!sch)
return NULL;
for (cid = 0; cid < NELEMS(sch->q); cid++)
tc_mbuf_head_init(&sch->q[cid]);
INIT_LIST_HEAD(&sch->cls_list);
INIT_HLIST_NODE(&sch->hlist);
sch->tc = tc;
sch->ops = ops;
rte_atomic32_set(&sch->refcnt, 1);
return sch;
}
- 可以看到 rte_zmalloc 分配 Qsch 结构体时,还带上了 ops->priv_size 大小的内存区域,这块是给不同规则自己使用的
- 初始化 mbuf 队列
static int pfifo_fast_init(struct Qsch *sch, const void *arg)
{
int band;
lcoreid_t cid;
struct pfifo_fast_priv *priv = qsch_priv(sch);
for (cid = 0; cid < NELEMS(priv->q); cid++) {
for (band = 0; band < PFIFO_FAST_BANDS; band++) {
tc_mbuf_head_init(band2list_cpu(priv, band, cid));
}
}
/* FIXME: txq_desc_nb is not set when alloc device.
* we can move tc_init_dev to dev start phase but not
* all dev will be start now, netif need be modified. */
#if 0
sch->limit = qsch_dev(sch)->txq_desc_nb;
#else
sch->limit = 128;
#endif
return EDPVS_OK;
}
struct pfifo_fast_priv {
uint32_t bitmap[RTE_MAX_LCORE];
struct tc_mbuf_head q[RTE_MAX_LCORE][PFIFO_FAST_BANDS];
#define this_bitmap bitmap[rte_lcore_id()]
#define this_pff_q q[rte_lcore_id()]
};
再回头看 pfifo_fast 的初始化函数和 pfifo_fast_priv 结构体
-
qsch_priv
获取私有内存区域,这块区域就是上面额外分配的 ops->priv_size 大小 - 初始化队列,每个队列有三个波段,即对应的 band0, band1, band2. 具体使用后面就能看到,sch->limit 写死的 128
tc 数据出口
在发送数据包 netif_xmit
时会调用 tc 控制函数 tc_handle_egress
struct rte_mbuf *tc_handle_egress(struct netif_tc *tc,
struct rte_mbuf *mbuf, int *ret)
{
int err = EDPVS_OK;
struct Qsch *sch, *child_sch = NULL;
struct tc_cls *cls;
struct tc_cls_result cls_res;
const int max_reclassify_loop = 8;
int limit = 0;
assert(tc && mbuf && ret);
/* start from egress root qsch */
sch = tc->qsch;
if (unlikely(!sch)) {
*ret = EDPVS_OK;
return mbuf;
}
qsch_get(sch);
/*
* classify the traffic first.
* support classify for child schedulers only.
* it no classifier matchs, than use current scheduler.
*/
again:
list_for_each_entry(cls, &sch->cls_list, list) {
if (unlikely(mbuf->packet_type != cls->pkt_type &&
cls->pkt_type != htons(ETH_P_ALL)))
continue;
err = cls->ops->classify(cls, mbuf, &cls_res);
switch (err) {
case TC_ACT_OK:
break;
case TC_ACT_SHOT:
goto drop;
default:
continue;
}
if (unlikely(cls_res.drop))
goto drop;
child_sch = qsch_lookup(sch->tc, cls_res.sch_id);
if (unlikely(!child_sch)) {
RTE_LOG(WARNING, TC, "%s: target Qsch not exist.\n",
__func__);
continue;
}
if (unlikely(child_sch->parent != sch->handle)) {
RTE_LOG(WARNING, TC, "%s: classified to non-children scheduler\n",
__func__);
qsch_put(child_sch);
continue;
}
/* pass the packet to child scheduler */
qsch_put(sch);
sch = child_sch;
if (unlikely(limit++ >= max_reclassify_loop)) {
RTE_LOG(DEBUG, TC, "%s: exceed reclassify max loop.\n",
__func__);
goto drop;
}
/* classify again for new selected Qsch */
goto again;
}
/* this scheduler has no queue (for classify only) ? */
if (unlikely(!sch->ops->enqueue))
goto out; /* no need to set @ret */
/* mbuf is always consumed (queued or dropped) */
err = sch->ops->enqueue(sch, mbuf);
mbuf = NULL;
*ret = err;
/* try dequeue and xmit */
qsch_do_sched(sch);
out:
qsch_put(sch);
return mbuf;
drop:
*ret = qsch_drop(sch, mbuf);
qsch_put(sch);
return NULL;
}
为了分析起来不那么复杂,先不考滤层级关系,忽略 again 循环。那么函数完成两个操作:
- enqueue 函数指针,将 mbuf 数据包入队,实际调用
pfifo_fast_enqueue
- qsch_do_sched 尝试将己有的数据包从队列取出,发送到网卡。
tc 数据出口 enqueue 入队
static int pfifo_fast_enqueue(struct Qsch *sch, struct rte_mbuf *mbuf)
{
int band, err;
uint8_t prio = 0;
struct pfifo_fast_priv *priv;
struct tc_mbuf_head *qh;
/* sch->limit is same as dev->txq_desc_nb */
if (unlikely(sch->this_q.qlen >= sch->limit)) {
#if defined(CONFIG_TC_DEBUG)
RTE_LOG(WARNING, TC, "%s: queue is full.\n", __func__);
#endif
return qsch_drop(sch, mbuf);
}
if (unlikely(mbuf->udata64 > 0 && mbuf->udata64 <= TC_PRIO_MAX &&
mbuf->packet_type == ETH_P_IP))
prio = (uint8_t)mbuf->udata64;
band = prio2band[prio];
priv = qsch_priv(sch);
qh = band2list(priv, band);
err = __qsch_enqueue_tail(sch, mbuf, qh);
if (err == EDPVS_OK) {
priv->this_bitmap |= (1 << band);
sch->this_q.qlen++;
sch->this_qstats.qlen++;
}
return err;
}
- 首先判断是否超限 sch->this_q.qlen >= sch->limit, 如果超了直接丢弃
- prio 获取 TOS 字段值,这是一个四位的数,根据这个数来选择数据包落到哪个波段。prio2band 是值到波段的映射,具体值可以参考源码,一共 16 个
- __qsch_enqueue_tail 将 mbuf 添加到对应波段队列
- 将对应波段 map 置位,this_bitmap,表示这个波段有数据了
tc 数据出口 qsch_do_sched 尝试发送数据
void qsch_do_sched(struct Qsch *sch)
{
int quota = dev_tx_weight;
int npkt;
while (quota > 0 && sch_dequeue_xmit(sch, &npkt))
quota -= npkt;
return;
}
quota 是发送配额,代码写死 64 个,也就是说最多一次发送 64 个 mbuf
static inline int sch_dequeue_xmit(struct Qsch *sch, int *npkt)
{
struct rte_mbuf *mbuf;
*npkt = 1; /* TODO: bulk dequeue */
mbuf = sch->ops->dequeue(sch);
if (unlikely(!mbuf))
return 0;
netif_hard_xmit(mbuf, netif_port_get(mbuf->port));
return sch_qlen(sch);
}
调用 dequeue 回调函数,然后将数据发送到网卡,这里回调 pfifo_fast_dequeue
static struct rte_mbuf *pfifo_fast_dequeue(struct Qsch *sch)
{
struct pfifo_fast_priv *priv = qsch_priv(sch);
int band = bitmap2band[priv->this_bitmap];
struct tc_mbuf_head *qh;
struct rte_mbuf *mbuf;
if (unlikely(band < 0))
return NULL;
qh = band2list(priv, band);
mbuf = __qsch_dequeue_head(sch, qh);
if (mbuf) {
sch->this_q.qlen--;
sch->this_qstats.qlen--;
}
if (likely(qh->qlen == 0))
priv->this_bitmap &= ~(1 << band);
return mbuf;
}
this_bitmap 上文说过,如果对应波段有数据,那么对应位会置为1,一共 3 个波段,所以这个值范围是从 0 ~ 7,那么根据优先级原理,波段 0 发送完才能发送 波段 1,然后才是波段 2
static const int bitmap2band[] = {-1, 0, 1, 0, 2, 0, 1, 0};
所以这个 bitmap2band 数组实现的很巧妙。出队后如果 qh->qlen 长度为 0 了,那么再将 this_bitmap 的波段位置空。
pfifo_fast 小结
struct pfifo_fast_priv {
uint32_t bitmap[RTE_MAX_LCORE];
struct tc_mbuf_head q[RTE_MAX_LCORE][PFIFO_FAST_BANDS];
#define this_bitmap bitmap[rte_lcore_id()]
#define this_pff_q q[rte_lcore_id()]
};
这是最简单,也是系统默认的流控规则。这里有一点要注意,dpdk 将工作锁在了每个核上,尽量不和其它核交互,所以队列 q 是每个核一个,每个队列有三个波段。
其它规则实现
- [p|b]fifo 也不分什么波段,就是队列,但是入队出队有区别。pfifo 看数据包个数bfifo 看数据包大小。入队源码,唯一区别就是 if 判断。
- TBF 看了一下代码,标准的令牌桶算法实现。rate 速率,还有一个 peek 代表最大 burst 突发速率
层级控制的实现
刚才看到的都是单个规则实现,其实 tc 还有种层级控制,HTB 就是层级令牌桶的实现。就像一棵树一样,在 tc_handle_egress
有一段 again 代码,递归根据分类器去查找,最大查找深度是 max_reclassify_loop. 在 tc_init
时得知,当前只有一个 match_cls_ops
分类器。
again:
list_for_each_entry(cls, &sch->cls_list, list) {
if (unlikely(mbuf->packet_type != cls->pkt_type &&
cls->pkt_type != htons(ETH_P_ALL)))
continue;
err = cls->ops->classify(cls, mbuf, &cls_res);
switch (err) {
case TC_ACT_OK:
break;
case TC_ACT_SHOT:
goto drop;
default:
continue;
}
if (unlikely(cls_res.drop))
goto drop;
child_sch = qsch_lookup(sch->tc, cls_res.sch_id);
......
/* pass the packet to child scheduler */
qsch_put(sch);
sch = child_sch;
if (unlikely(limit++ >= max_reclassify_loop)) {
RTE_LOG(DEBUG, TC, "%s: exceed reclassify max loop.\n",
__func__);
goto drop;
}
/* classify again for new selected Qsch */
goto again;
}
首先如果存在分类器,根据 classify 回调去分类,然后查找子规则 child_sch,直到匹配成功,返回 TC_ACT_OK
层级控制 classify
查看源码得知,classify 真正调用 match_classify
static int match_classify(struct tc_cls *cls, struct rte_mbuf *mbuf,
struct tc_cls_result *result)
{
struct match_cls_priv *priv = tc_cls_priv(cls);
struct dp_vs_match *m = &priv->match;
struct ether_hdr *eh = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
struct iphdr *iph = NULL;
struct tcphdr *th;
struct udphdr *uh;
int offset = sizeof(*eh);
__be16 pkt_type = eh->ether_type;
__be16 sport, dport;
struct netif_port *idev, *odev;
struct vlan_ethhdr *veh;
int err = TC_ACT_RECLASSIFY; /* by default */
idev = netif_port_get_by_name(m->iifname);
odev = netif_port_get_by_name(m->oifname);
sport = dport = 0;
/* support IPv4 and 802.1q/IPv4 */
l2parse:
switch (ntohs(pkt_type)) {
case ETH_P_IP:
if (mbuf_may_pull(mbuf, offset + sizeof(struct iphdr)) != 0) {
err = TC_ACT_SHOT;
goto done;
}
iph = rte_pktmbuf_mtod_offset(mbuf, struct iphdr *, offset);
/* check if source/dest IP in range */
if (m->srange.max_addr.in.s_addr != htonl(INADDR_ANY)) {
if (ntohl(iph->saddr) < ntohl(m->srange.min_addr.in.s_addr) ||
ntohl(iph->saddr) > ntohl(m->srange.max_addr.in.s_addr))
goto done;
}
if (m->drange.max_addr.in.s_addr != htonl(INADDR_ANY)) {
if (ntohl(iph->daddr) < ntohl(m->drange.min_addr.in.s_addr) ||
ntohl(iph->daddr) > ntohl(m->drange.max_addr.in.s_addr))
goto done;
}
offset += (iph->ihl << 2);
break;
case ETH_P_8021Q:
veh = (struct vlan_ethhdr *)eh;
pkt_type = veh->h_vlan_encapsulated_proto;
offset += VLAN_HLEN;
goto l2parse;
default:
goto done;
}
switch (iph->protocol) {
case IPPROTO_TCP:
if (mbuf_may_pull(mbuf, offset + sizeof(struct tcphdr)) != 0) {
err = TC_ACT_SHOT;
goto done;
}
th = rte_pktmbuf_mtod_offset(mbuf, struct tcphdr *, offset);
sport = th->source;
dport = th->dest;
break;
case IPPROTO_UDP:
if (mbuf_may_pull(mbuf, offset + sizeof(struct udphdr)) != 0) {
err = TC_ACT_SHOT;
goto done;
}
uh = rte_pktmbuf_mtod_offset(mbuf, struct udphdr *, offset);
sport = uh->source;
dport = uh->dest;
break;
default: /* priv->proto is not assigned */
goto match;
}
/* check if source/dest port in range */
if (m->srange.max_port) {
if (ntohs(sport) < ntohs(m->srange.min_port) ||
ntohs(sport) > ntohs(m->srange.max_port))
goto done;
}
if (m->drange.max_port) {
if (ntohs(dport) < ntohs(m->drange.min_port) ||
ntohs(dport) > ntohs(m->drange.max_port))
goto done;
}
match:
/* all matchs */
*result = priv->result;
err = TC_ACT_OK;
done:
return err;
}
- 根据 dp_vs_match,获取数据包进入的网卡,和要出去的网卡
- 根据 dp_vs_match,判断数据包的 sport, dport 是否在范围之内
- 判断 pkt_type 网络包类型,当前仅支持 ipv4, ETH_P_8021Q (马上要支持 ipv6)
- 获取 tcp 或是 udp 的源目的端口用于 debug
层级控制 查找qsch_lookup
child_sch = qsch_lookup(sch->tc, cls_res.sch_id);
根据 cls_res 匹配结果,查找子规则
struct Qsch *qsch_lookup_noref(const struct netif_tc *tc, tc_handle_t handle)
{
int hash;
struct Qsch *sch;
assert(tc->qsch_hash && tc->qsch_hash_size);
if (likely(tc->qsch && tc->qsch->handle == handle))
return tc->qsch;
hash = sch_hash(handle, tc->qsch_hash_size);
hlist_for_each_entry(sch, &tc->qsch_hash[hash], hlist) {
if (likely(sch->handle == handle))
return sch;
}
if (tc->qsch_ingress && tc->qsch_ingress->handle == handle)
return tc->qsch_ingress;
return NULL;
}
可以看到,从 qsch_hash 就是个哈希桶,找到后返回。最终处理和单个规则执行一样。
总结
这块比较复杂,暂时也不深入了,以后有机会看看内核 tc 的实现
网友评论