handler and revalidator threads in OVS

Author: 分享放大价值 | Published 2021-02-28 11:30

    After a bridge is added to OVS, the ovs-vswitchd process automatically spawns a number of handler and revalidator threads, as shown below:

    root@master:~# ovs-vsctl add-br br1
    root@master:~# ovs-vsctl show
    ed1aefeb-6dbe-4634-bda8-bf97dac313e5
        Bridge "br1"
            Port "br1"
                Interface "br1"
                    type: internal
    
    root@master:~# ps -ef | grep ovs
    root     11443     1  0 22:33 ?        00:00:00 ovs-vswitchd unix:/usr/local/var/run/openvswitch/db.sock
    root@master:~# top -H -p 11443
    top - 22:40:15 up 1 day,  9:46,  1 user,  load average: 0.20, 0.19, 0.13
    Threads:   6 total,   0 running,   6 sleeping,   0 stopped,   0 zombie
    %Cpu(s):  1.7 us,  1.7 sy,  0.0 ni, 96.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
    MiB Mem :   7961.7 total,   4198.7 free,    427.4 used,   3335.5 buff/cache
    MiB Swap:      0.0 total,      0.0 free,      0.0 used.   7247.0 avail Mem
    
      PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
    11443 root      20   0  374132   4324   3396 S   0.0   0.1   0:00.14 ovs-vswitchd
    11737 root      20   0  374132   4324   3396 S   0.0   0.1   0:00.00 urcu4
    17118 root      20   0  374132   4324   3396 S   0.0   0.1   0:00.00 handler24
    17119 root      20   0  374132   4324   3396 S   0.0   0.1   0:00.00 handler23
    17120 root      20   0  374132   4324   3396 S   0.0   0.1   0:00.00 revalidator25
    17121 root      20   0  374132   4324   3396 S   0.0   0.1   0:00.00 revalidator22
    

    Number of handler and revalidator threads

    The thread counts are determined by the code below:
    a. If thread counts are configured explicitly, the configured values are used. They are set with the following commands and take effect immediately:
    ovs-vsctl --no-wait set Open_vSwitch . other_config:n-handler-threads=4
    ovs-vsctl --no-wait set Open_vSwitch . other_config:n-revalidator-threads=4

    b. If not configured, the counts are derived from the number of CPU cores (with a floor of 2): n_revalidators is the core count divided by 4, plus 1, and n_handlers is the core count minus n_revalidators.
    The test environment above has four CPUs, so there are two revalidator threads and two handler threads.

    static void
    bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg)
        /* Abridged: pass the configured counts, or 0 if unset. */
        ofproto_set_threads(
            smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0),
            smap_get_int(&ovs_cfg->other_config, "n-revalidator-threads", 0));
    
    void
    ofproto_set_threads(int n_handlers_, int n_revalidators_)
    {
        int threads = MAX(count_cpu_cores(), 2);
    
        n_revalidators = MAX(n_revalidators_, 0);
        n_handlers = MAX(n_handlers_, 0);
    
        /* Not configured: one revalidator per four cores plus one, or the
         * cores left over after the configured handler count. */
        if (!n_revalidators) {
            n_revalidators = n_handlers
                ? MAX(threads - (int) n_handlers, 1)
                : threads / 4 + 1;
        }
    
        /* Not configured: the remaining cores become handlers. */
        if (!n_handlers) {
            n_handlers = MAX(threads - (int) n_revalidators, 1);
        }
    }
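
    The effective thread counts and the upcall statistics can be checked at runtime. A minimal sketch; ps -T lists the threads by name, and the exact upcall/show output format varies between OVS versions (fields shown here are illustrative):

    root@master:~# ovs-vsctl --no-wait set Open_vSwitch . other_config:n-handler-threads=1
    root@master:~# ps -T -p $(pidof ovs-vswitchd) | grep -c handler
    1
    root@master:~# ovs-appctl upcall/show
    system@ovs-system:
      flows         : (current 0) (avg 0) (max 0) (limit 200000)
      dump duration : 1ms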
    

    What the threads do

    handler thread
    OVS has three flow tables: microflow, megaflow, and openflow. The openflow table is configured by the user or by a controller; the other two are populated on demand by traffic. The first packet of a flow misses in the microflow and megaflow tables and takes the slow path: it is matched against the openflow table (a NORMAL rule turns this into MAC learning and FDB-based forwarding), and the resulting forwarding decision is then installed into the microflow and megaflow tables. Subsequent packets of the flow are forwarded by those tables alone, which is the fast path. This slow-path processing is exactly what the handler threads do.
    Two points are worth noting. First, in ordinary (kernel-datapath) OVS, the microflow and megaflow tables live in the openvswitch.ko kernel module, whereas in OVS-DPDK they live in the userspace ovs-vswitchd process. In OVS-DPDK mode the openvswitch.ko module can still be loaded, so both modes can coexist and bridges of either type can be created.
    Second, handler threads only process upcall messages from the kernel datapath. Under OVS-DPDK the handler threads still exist, but they stay blocked and effectively do nothing; the slow path there is executed directly by the packet-receiving threads, e.g. the PMD threads.
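
    Which datapath type a bridge uses (and therefore where its microflow/megaflow tables live) can be checked as follows; a small sketch with abridged output, where an empty or "system" datapath_type means the kernel datapath and "netdev" means the userspace datapath:

    root@master:~# ovs-vsctl get bridge br1 datapath_type
    ""
    root@master:~# ovs-appctl dpif/show
    system@ovs-system: hit:0 missed:0
        br1:
            br1 65534/1: (internal)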

    revalidator thread
    Consider the following cases:

    a. A megaflow entry has been installed via a handler thread (viewable with ovs-appctl dpctl/dump-flows). If no traffic matches it for a while, it is deleted after 10 s.
    b. A megaflow entry has been installed, and its corresponding openflow rule is deleted within those 10 s; the megaflow entry must be removed immediately as well.
    c. A megaflow entry has been installed, and the action of its corresponding openflow rule is modified within those 10 s; the megaflow entry's action must change accordingly.
    

    This is what the revalidator threads do (together with other cases not listed here). In short, a revalidator thread handles idle timeout of megaflow entries, reacts to openflow table changes, and periodically fetches datapath flow statistics, both to determine whether an entry is still in use and to answer statistics queries (ovs-ofctl dump-flows br1). Scenario (a) can be observed directly, as sketched below.
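
    A sketch of scenario (a), assuming br1 has an address and a reachable peer at 192.168.1.100 (example values):

    root@master:~# ping -c 3 192.168.1.100        # traffic makes a handler thread install a megaflow entry
    root@master:~# ovs-appctl dpctl/dump-flows    # while traffic flows, the entry is listed (note its "used" age)
    root@master:~# sleep 11
    root@master:~# ovs-appctl dpctl/dump-flows    # idle longer than 10s: the revalidators removed it, dump is empty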

    The maximum number of megaflow entries (default ofproto_flow_limit, 200,000) and the flow idle timeout (default ofproto_max_idle, 10 s) can be changed with the following two commands (max-idle is in milliseconds):
    ovs-vsctl set Open_vSwitch . other_config:flow-limit=2
    ovs-vsctl set Open_vSwitch . other_config:max-idle=1000000
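
    Both values live in the Open_vSwitch table's other_config column and can be read back to confirm:

    root@master:~# ovs-vsctl get Open_vSwitch . other_config
    {flow-limit=2, max-idle=1000000}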

    The revalidator threads are active in both kernel-datapath OVS and OVS-DPDK modes.

    Source code analysis

    handler thread
    The handler thread's main loop is shown below; it keeps running as long as exit_latch is not set.

    static void *
    udpif_upcall_handler(void *arg)
    {
        struct handler *handler = arg;
        struct udpif *udpif = handler->udpif;
    
        while (!latch_is_set(&handler->udpif->exit_latch)) {
            //recv_upcalls() receives upcall messages. Only the kernel datapath
            //provides a recv function (dpif_netlink_recv), which is why the
            //handler threads do nothing for the DPDK datapath.
            //A non-zero return value means upcalls were processed; to avoid
            //losing subsequent upcalls the thread must not block, so
            //poll_immediate_wake() sets timeout_when to the minimum value
            //LLONG_MIN, poll_block()'s poll() call returns immediately, and
            //recv_upcalls() runs again.
            //A return value of 0 means there were no upcalls, so
            //dpif_recv_wait() registers a wait for the next one.
            if (recv_upcalls(handler)) {
                poll_immediate_wake();
            } else {
                //Wait for upcall messages.
                dpif_recv_wait(udpif->dpif, handler->handler_id);
                //Wait for the exit notification.
                latch_wait(&udpif->exit_latch);
            }
            //Block in poll() until a registered event fires or the timeout
            //expires.
            poll_block();
        }
        }
    
        return NULL;
    }
    

    The main processing function is recv_upcalls:

    static size_t
    recv_upcalls(struct handler *handler)
    {
        struct udpif *udpif = handler->udpif;
        uint64_t recv_stubs[UPCALL_MAX_BATCH][512 / 8];
        struct ofpbuf recv_bufs[UPCALL_MAX_BATCH];
        struct dpif_upcall dupcalls[UPCALL_MAX_BATCH];
        struct upcall upcalls[UPCALL_MAX_BATCH];
        struct flow flows[UPCALL_MAX_BATCH];
        size_t n_upcalls, i;
    
        n_upcalls = 0;
        //Process at most UPCALL_MAX_BATCH upcall messages per invocation.
        while (n_upcalls < UPCALL_MAX_BATCH) {
            struct ofpbuf *recv_buf = &recv_bufs[n_upcalls];
            struct dpif_upcall *dupcall = &dupcalls[n_upcalls];
            struct upcall *upcall = &upcalls[n_upcalls];
            struct flow *flow = &flows[n_upcalls];
            unsigned int mru;
            int error;
    
            ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],
                            sizeof recv_stubs[n_upcalls]);
            //Receive one upcall message from the datapath into dupcall.
            if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {
                ofpbuf_uninit(recv_buf);
                break;
            }
            //Convert the key in the upcall message into a struct flow.
            if (odp_flow_key_to_flow(dupcall->key, dupcall->key_len, flow)
                == ODP_FIT_ERROR) {
                goto free_dupcall;
            }
    
            if (dupcall->mru) {
                mru = nl_attr_get_u16(dupcall->mru);
            } else {
                mru = 0;
            }
    
            error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
                                   dupcall->type, dupcall->userdata, flow, mru,
                                   &dupcall->ufid, PMD_ID_NULL);
            if (error) {
                if (error == ENODEV) {
                    /* Received packet on datapath port for which we couldn't
                     * associate an ofproto.  This can happen if a port is removed
                     * while traffic is being received.  Print a rate-limited
                     * message in case it happens frequently. */
                    dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
                                  dupcall->key_len, NULL, 0, NULL, 0,
                                  &dupcall->ufid, PMD_ID_NULL, NULL);
                    VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
                                 "port %"PRIu32, flow->in_port.odp_port);
                }
                goto free_dupcall;
            }
    
            upcall->key = dupcall->key;
            upcall->key_len = dupcall->key_len;
            upcall->ufid = &dupcall->ufid;
    
            upcall->out_tun_key = dupcall->out_tun_key;
            upcall->actions = dupcall->actions;
    
            pkt_metadata_from_flow(&dupcall->packet.md, flow);
            flow_extract(&dupcall->packet, flow);
            //Start processing: look up the openflow tables based on flow.
            error = process_upcall(udpif, upcall,
                                   &upcall->odp_actions, &upcall->wc);
            if (error) {
                goto cleanup;
            }
    
            n_upcalls++;
            continue;
    
    cleanup:
            upcall_uninit(upcall);
    free_dupcall:
            dp_packet_uninit(&dupcall->packet);
            ofpbuf_uninit(recv_buf);
        }
    
        if (n_upcalls) {
            //If the openflow lookup succeeded, install the result into the
            //datapath; subsequent packets are forwarded by the datapath
            //flow table directly.
            handle_upcalls(handler->udpif, upcalls, n_upcalls);
            for (i = 0; i < n_upcalls; i++) {
                dp_packet_uninit(&dupcalls[i].packet);
                ofpbuf_uninit(&recv_bufs[i]);
                upcall_uninit(&upcalls[i]);
            }
        }
    
        return n_upcalls;
    }
    

    There are two types of upcall message: DPIF_UC_MISS and DPIF_UC_ACTION. The former means the datapath flow lookup missed and the packet must take the slow path through the openflow tables; the latter means a flow's action explicitly sends the packet up to userspace. Only the DPIF_UC_MISS case is examined here.

    static int
    process_upcall(struct udpif *udpif, struct upcall *upcall,
                   struct ofpbuf *odp_actions, struct flow_wildcards *wc)
    {
        const struct nlattr *userdata = upcall->userdata;
        const struct dp_packet *packet = upcall->packet;
        const struct flow *flow = upcall->flow;
        size_t actions_len = 0;
        enum upcall_type upcall_type = classify_upcall(upcall->type, userdata);
        //The upcall type here is MISS_UPCALL.
        switch (upcall_type) {
        case MISS_UPCALL:
            upcall_xlate(udpif, upcall, odp_actions, wc);
            return 0;
        ...
        }
    }
    
    static void
    upcall_xlate(struct udpif *udpif, struct upcall *upcall,
                 struct ofpbuf *odp_actions, struct flow_wildcards *wc)
    {
        struct dpif_flow_stats stats;
        struct xlate_in xin;
    
        stats.n_packets = 1;
        stats.n_bytes = dp_packet_size(upcall->packet);
        stats.used = time_msec();
        stats.tcp_flags = ntohs(upcall->flow->tcp_flags);
    
        xlate_in_init(&xin, upcall->ofproto,
                      ofproto_dpif_get_tables_version(upcall->ofproto),
                      upcall->flow, upcall->in_port, NULL,
                      stats.tcp_flags, upcall->packet, wc, odp_actions);
    
        if (upcall->type == DPIF_UC_MISS) {
            xin.resubmit_stats = &stats;
    
            if (xin.frozen_state) {
                /* We may install a datapath flow only if we get a reference to the
                 * recirculation context (otherwise we could have recirculation
                 * upcalls using recirculation ID for which no context can be
                 * found).  We may still execute the flow's actions even if we
                 * don't install the flow. */
                upcall->recirc = recirc_id_node_from_state(xin.frozen_state);
                upcall->have_recirc_ref = recirc_id_node_try_ref_rcu(upcall->recirc);
            }
        } else {
            /* For non-miss upcalls, we are either executing actions (one of which
             * is an userspace action) for an upcall, in which case the stats have
             * already been taken care of, or there's a flow in the datapath which
             * this packet was accounted to.  Presumably the revalidators will deal
             * with pushing its stats eventually. */
        }
    
        upcall->dump_seq = seq_read(udpif->dump_seq);
        upcall->reval_seq = seq_read(udpif->reval_seq);
        //Key call: translate the flow through the openflow tables.
        xlate_actions(&xin, &upcall->xout);
        if (wc) {
            /* Convert the input port wildcard from OFP to ODP format. There's no
             * real way to do this for arbitrary bitmasks since the numbering spaces
             * aren't the same. However, flow translation always exact matches the
             * whole thing, so we can do the same here. */
            WC_MASK_FIELD(wc, in_port.odp_port);
        }
    
        upcall->xout_initialized = true;
    
        if (!upcall->xout.slow) {
            ofpbuf_use_const(&upcall->put_actions,
                             odp_actions->data, odp_actions->size);
        } else {
            uint32_t smid = upcall->ofproto->up.slowpath_meter_id;
            uint32_t cmid = upcall->ofproto->up.controller_meter_id;
            /* upcall->put_actions already initialized by upcall_receive(). */
            compose_slow_path(udpif, &upcall->xout, upcall->flow,
                              upcall->flow->in_port.odp_port,
                              &upcall->put_actions, smid, cmid);
        }
    
        /* This function is also called for slow-pathed flows.  As we are only
         * going to create new datapath flows for actual datapath misses, there is
         * no point in creating a ukey otherwise. */
        if (upcall->type == DPIF_UC_MISS) {
            upcall->ukey = ukey_create_from_upcall(upcall, wc);
        }
    }
    
    static void
    handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
                   size_t n_upcalls)
    {
        //ops can hold twice the number of upcalls: for each upcall there may
        //be a DPIF_OP_FLOW_PUT op to install the flow and a DPIF_OP_EXECUTE
        //op to execute the actions on the upcalled packet itself.
        struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
        struct ukey_op ops[UPCALL_MAX_BATCH * 2];
        size_t n_ops, n_opsp, i;
    
        /* Handle the packets individually in order of arrival.
         *
         *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, SLOW_BFD, and SLOW_LLDP,
         *     translation is what processes received packets for these
         *     protocols.
         *
         *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
         *     controller.
         *
         *   - For SLOW_ACTION, translation executes the actions directly.
         *
         * The loop fills 'ops' with an array of operations to execute in the
         * datapath. */
        n_ops = 0;
        for (i = 0; i < n_upcalls; i++) {
            struct upcall *upcall = &upcalls[i];
            const struct dp_packet *packet = upcall->packet;
            struct ukey_op *op;
            //should_install_flow() decides whether to install the flow into
            //the datapath, e.g. checks whether flow_limit would be exceeded.
            if (should_install_flow(udpif, upcall)) {
                struct udpif_key *ukey = upcall->ukey;
    
                if (ukey_install(udpif, ukey)) {
                    upcall->ukey_persists = true;
                    put_op_init(&ops[n_ops++], ukey, DPIF_FP_CREATE);
                }
            }
    
            if (upcall->odp_actions.size) {
                op = &ops[n_ops++];
                op->ukey = NULL;
                op->dop.type = DPIF_OP_EXECUTE;
                op->dop.u.execute.packet = CONST_CAST(struct dp_packet *, packet);
                op->dop.u.execute.flow = upcall->flow;
                odp_key_to_dp_packet(upcall->key, upcall->key_len,
                                     op->dop.u.execute.packet);
                op->dop.u.execute.actions = upcall->odp_actions.data;
                op->dop.u.execute.actions_len = upcall->odp_actions.size;
                op->dop.u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
                op->dop.u.execute.probe = false;
                op->dop.u.execute.mtu = upcall->mru;
            }
        }
    
        /* Execute batch. */
        n_opsp = 0;
        for (i = 0; i < n_ops; i++) {
            opsp[n_opsp++] = &ops[i].dop;
        }
        //Talk to the datapath: install the flows and execute the packets.
        dpif_operate(udpif->dpif, opsp, n_opsp);
        //Update ukey state; ukeys are later used by the revalidator threads.
        for (i = 0; i < n_ops; i++) {
            struct udpif_key *ukey = ops[i].ukey;
    
            if (ukey) {
                ovs_mutex_lock(&ukey->mutex);
                if (ops[i].dop.error) {
                    transition_ukey(ukey, UKEY_EVICTED);
                } else if (ukey->state < UKEY_OPERATIONAL) {
                    transition_ukey(ukey, UKEY_OPERATIONAL);
                }
                ovs_mutex_unlock(&ukey->mutex);
            }
        }
    }
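
    The translation that xlate_actions performs (and the datapath actions that handle_upcalls would install) can be reproduced from the command line with ofproto/trace, which runs the same lookup for a hand-written flow; a sketch with an example flow, output abridged:

    root@master:~# ovs-appctl ofproto/trace br1 "in_port=br1,icmp"
    ...
    Datapath actions: drop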
    

    revalidator thread
    A revalidator thread only wakes up from its poll() block in the following cases:
    a. need_revalidate is set: the bridge configuration or the flow tables changed, and the flows must be recomputed.
    b. pause_latch is set: the thread must pause temporarily.
    c. exit_latch is set: the thread must exit, e.g. some threads have to go away after the configured thread count is reduced.
    d. The poll timeout expired; it is at most 500 ms.

    static void *
    udpif_revalidator(void *arg)
    {
        /* Used by all revalidators. */
        struct revalidator *revalidator = arg;
        struct udpif *udpif = revalidator->udpif;
        //The first revalidator thread acts as the leader.
        bool leader = revalidator == &udpif->revalidators[0];
    
        /* Used only by the leader. */
        long long int start_time = 0;
        uint64_t last_reval_seq = 0;
        size_t n_flows = 0;
    
        revalidator->id = ovsthread_id_self();
        for (;;) {
            if (leader) {
                uint64_t reval_seq;
    
                recirc_run(); /* Recirculation cleanup. */
                //Read reval_seq. It is only incremented after need_revalidate
                //becomes true, which is what wakes the revalidator threads.
                reval_seq = seq_read(udpif->reval_seq);
                last_reval_seq = reval_seq;
                //Get the number of flows in the datapath.
                n_flows = udpif_get_n_flows(udpif);
                udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
                udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
    
                /* Only the leader checks the pause latch to prevent a race where
                 * some threads think it's false and proceed to block on
                 * reval_barrier and others think it's true and block indefinitely
                 * on the pause_barrier */
                //dp_purge_cb sets pause_latch while it purges all ukeys,
                //pausing the revalidator threads.
                udpif->pause = latch_is_set(&udpif->pause_latch);
    
                /* Only the leader checks the exit latch to prevent a race where
                 * some threads think it's true and exit and others think it's
                 * false and block indefinitely on the reval_barrier */
                //When revalidator threads are no longer needed,
                //udpif_stop_threads sets exit_latch to stop this thread.
                udpif->reval_exit = latch_is_set(&udpif->exit_latch);
    
                start_time = time_msec();
                if (!udpif->reval_exit) {
                    bool terse_dump;
                    //If not exiting, create a dump operation for dumping the
                    //flows in the datapath.
                    terse_dump = udpif_use_ufid(udpif);
                    udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump,
                                                        NULL);
                }
            }
    
            /* Wait for the leader to start the flow dump. */
            //All revalidator threads must reach this barrier before any of
            //them proceeds.
            ovs_barrier_block(&udpif->reval_barrier);
            if (udpif->pause) {
                revalidator_pause(revalidator);
            }
            //Break out of the loop if the threads are being stopped.
            if (udpif->reval_exit) {
                break;
            }
            //Key function, analyzed separately below.
            revalidate(revalidator);
    
            /* Wait for all flows to have been dumped before we garbage collect. */
            ovs_barrier_block(&udpif->reval_barrier);
            //Remove ukeys that are in the UKEY_EVICTED state.
            revalidator_sweep(revalidator);
    
            /* Wait for all revalidators to finish garbage collection. */
            ovs_barrier_block(&udpif->reval_barrier);
    
            if (leader) {
                unsigned int flow_limit;
                long long int duration;
    
                atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
    
                dpif_flow_dump_destroy(udpif->dump);
                seq_change(udpif->dump_seq);
    
                duration = MAX(time_msec() - start_time, 1);
                udpif->dump_duration = duration;
                //Under heavy flow load the limit flow_limit is adapted:
                //if the dump took more than 2000 ms, divide flow_limit by the
                //duration measured in seconds;
                //if it took between 1300 ms and 2000 ms, reduce flow_limit to
                //three quarters of its current value;
                //if it took less than 1000 ms, more than 2000 flows exist, and
                //flow_limit < n_flows * 1000 / duration, raise flow_limit by 1000.
                if (duration > 2000) {
                    flow_limit /= duration / 1000;
                } else if (duration > 1300) {
                    flow_limit = flow_limit * 3 / 4;
                } else if (duration < 1000 && n_flows > 2000
                           && flow_limit < n_flows * 1000 / duration) {
                    flow_limit += 1000;
                }
                //Clamp flow_limit to at most ofproto_flow_limit (200,000 by
                //default) and at least 1000.
                flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
                //Store flow_limit; handler threads check it before installing
                //new flows.
                atomic_store_relaxed(&udpif->flow_limit, flow_limit);
    
                if (duration > 2000) {
                    VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
                              duration);
                }
                //Arm the poll timeout: at most 500 ms after start_time.
                poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
                //Register reval_seq with the poll loop, waiting for seq_change.
                seq_wait(udpif->reval_seq, last_reval_seq);
                //Register exit_latch with the poll loop.
                latch_wait(&udpif->exit_latch);
                //Register pause_latch with the poll loop.
                latch_wait(&udpif->pause_latch);
                //Block in poll(); continue as soon as a registered event
                //fires, or otherwise after the timeout expires.
                poll_block();
    
                if (!latch_is_set(&udpif->pause_latch) &&
                    !latch_is_set(&udpif->exit_latch)) {
                    long long int now = time_msec();
                    /* Block again if we are woken up within 5ms of the last start
                     * time. */
                    start_time += 5;
    
                    if (now < start_time) {
                        poll_timer_wait_until(start_time);
                        latch_wait(&udpif->exit_latch);
                        latch_wait(&udpif->pause_latch);
                        poll_block();
                    }
                }
            }
        }
    
        return NULL;
    }
    
    static void
    revalidate(struct revalidator *revalidator)
    {
        uint64_t odp_actions_stub[1024 / 8];
        struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
    
        struct udpif *udpif = revalidator->udpif;
        struct dpif_flow_dump_thread *dump_thread;
        uint64_t dump_seq, reval_seq;
        unsigned int flow_limit;
    
        dump_seq = seq_read(udpif->dump_seq);
        reval_seq = seq_read(udpif->reval_seq);
        atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
        dump_thread = dpif_flow_dump_thread_create(udpif->dump);
        for (;;) {
            struct ukey_op ops[REVALIDATE_MAX_BATCH];
            int n_ops = 0;
    
            struct dpif_flow flows[REVALIDATE_MAX_BATCH];
            const struct dpif_flow *f;
            int n_dumped;
    
            long long int max_idle;
            long long int now;
            size_t n_dp_flows;
            bool kill_them_all;
            //Dump a batch of flow information from the datapath.
            n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
            if (!n_dumped) {
                break;
            }
    
            now = time_msec();
    
            /* In normal operation we want to keep flows around until they have
             * been idle for 'ofproto_max_idle' milliseconds.  However:
             *
             *     - If the number of datapath flows climbs above 'flow_limit',
             *       drop that down to 100 ms to try to bring the flows down to
             *       the limit.
             *
             *     - If the number of datapath flows climbs above twice
             *       'flow_limit', delete all the datapath flows as an emergency
             *       measure.  (We reassess this condition for the next batch of
             *       datapath flows, so we will recover before all the flows are
             *       gone.) */
            //Fetch the number of datapath flows again.
            n_dp_flows = udpif_get_n_flows(udpif);
            //If the current flow count exceeds twice flow_limit, delete every
            //dumped flow.
            kill_them_all = n_dp_flows > flow_limit * 2;
            //max_idle is the idle timeout: a flow unused for max_idle is
            //deleted. It defaults to 10 s (ofproto_max_idle); if the flow
            //count exceeds flow_limit it drops to 100 ms so that flows are
            //reclaimed faster.
            max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
    
            for (f = flows; f < &flows[n_dumped]; f++) {
                long long int used = f->stats.used;
                struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
                enum reval_result result;
                struct udpif_key *ukey;
                bool already_dumped;
                int error;
                //Failing to acquire the ukey means another revalidator thread
                //is processing this flow; move on to the next one.
                if (ukey_acquire(udpif, f, &ukey, &error)) {
                    if (error == EBUSY) {
                        /* Another thread is processing this flow, so don't bother
                         * processing it.*/
                        COVERAGE_INC(upcall_ukey_contention);
                    } else {
                        log_unexpected_flow(f, error);
                        if (error != ENOENT) {
                            delete_op_init__(udpif, &ops[n_ops++], f);
                        }
                    }
                    continue;
                }
                //An equal dump_seq means this flow was already handled during
                //this dump round, or its ukey was just created; skip it.
                already_dumped = ukey->dump_seq == dump_seq;
                if (already_dumped) {
                    /* The flow has already been handled during this flow dump
                     * operation. Skip it. */
                    if (ukey->xcache) {
                        COVERAGE_INC(dumped_duplicate_flow);
                    } else {
                        COVERAGE_INC(dumped_new_flow);
                    }
                    ovs_mutex_unlock(&ukey->mutex);
                    continue;
                }
    
                if (ukey->state <= UKEY_OPERATIONAL) {
                    /* The flow is now confirmed to be in the datapath. */
                    transition_ukey(ukey, UKEY_OPERATIONAL);
                } else {
                    VLOG_INFO("Unexpected ukey transition from state %d "
                              "(last transitioned from thread %u at %s)",
                              ukey->state, ukey->state_thread, ukey->state_where);
                    ovs_mutex_unlock(&ukey->mutex);
                    continue;
                }
    
                if (!used) {
                    used = ukey->created;
                }
                //used is the last time the flow was hit; a flow idle for
                //longer than max_idle is deleted. E.g. with a single installed
                //flow and the default max_idle of 10 s, the flow is removed if
                //it carries no traffic for 10 s.
                if (kill_them_all || (used && used < now - max_idle)) {
                    result = UKEY_DELETE;
                } else {
                    //Revalidate the ukey: e.g. if the openflow tables changed
                    //within those 10 s, check whether the megaflow entry must
                    //change, such as when the same flow's action was modified.
                    //udpif->reval_seq differing from ukey->reval_seq indicates
                    //that the openflow tables changed.
                    result = revalidate_ukey(udpif, ukey, &f->stats, &odp_actions,
                                             reval_seq, &recircs);
                }
                ukey->dump_seq = dump_seq;
                //If the result is not KEEP, initialize an op (delete/modify)
                //according to the result.
                if (result != UKEY_KEEP) {
                    /* Takes ownership of 'recircs'. */
                    reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
                                  &odp_actions);
                }
                ovs_mutex_unlock(&ukey->mutex);
            }
            //Talk to the datapath to delete or modify the flows.
            if (n_ops) {
                /* Push datapath ops but defer ukey deletion to 'sweep' phase. */
                push_dp_ops(udpif, ops, n_ops);
            }
            ovsrcu_quiesce();
        }
        dpif_flow_dump_thread_destroy(dump_thread);
        ofpbuf_uninit(&odp_actions);
    }
    

    revalidate_ukey verifies whether the openflow tables have changed: it re-runs flow translation using the flow information dumped from the datapath, and compares the result with the actions currently installed in the datapath. A mismatch means the datapath flow must be modified or deleted.

    /* Verifies that the datapath actions of 'ukey' are still correct, and pushes
     * 'stats' for it.
     *
     * Returns a recommended action for 'ukey', options include:
     *      UKEY_DELETE The ukey should be deleted.
     *      UKEY_KEEP   The ukey is fine as is.
     *      UKEY_MODIFY The ukey's actions should be changed but is otherwise
     *                  fine.  Callers should change the actions to those found
     *                  in the caller supplied 'odp_actions' buffer.  The
     *                  recirculation references can be found in 'recircs' and
     *                  must be handled by the caller.
     *
     * If the result is UKEY_MODIFY, then references to all recirc_ids used by the
     * new flow will be held within 'recircs' (which may be none).
     *
     * The caller is responsible for both initializing 'recircs' prior this call,
     * and ensuring any references are eventually freed.
     */
    static enum reval_result
    revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
                    const struct dpif_flow_stats *stats,
                    struct ofpbuf *odp_actions, uint64_t reval_seq,
                    struct recirc_refs *recircs)
        OVS_REQUIRES(ukey->mutex)
    {
        //The key check: reval_seq is bumped whenever the flow tables change,
        //so a mismatch here sets need_revalidate.
        bool need_revalidate = ukey->reval_seq != reval_seq;
        enum reval_result result = UKEY_DELETE;
        struct dpif_flow_stats push;
    
        ofpbuf_clear(odp_actions);
    
        push.used = stats->used;
        push.tcp_flags = stats->tcp_flags;
        push.n_packets = (stats->n_packets > ukey->stats.n_packets
                          ? stats->n_packets - ukey->stats.n_packets
                          : 0);
        push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
                        ? stats->n_bytes - ukey->stats.n_bytes
                        : 0);
        //need_revalidate true means the openflow tables changed; verify
        //whether the mask/actions must change.
        if (need_revalidate) {
            if (should_revalidate(udpif, push.n_packets, ukey->stats.used)) {
                if (!ukey->xcache) {
                    ukey->xcache = xlate_cache_new();
                } else {
                    xlate_cache_clear(ukey->xcache);
                }
                result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
                                           odp_actions, recircs, ukey->xcache);
            } /* else delete; too expensive to revalidate */
        } else if (!push.n_packets || ukey->xcache
                   || !populate_xcache(udpif, ukey, push.tcp_flags)) {
            result = UKEY_KEEP;
        }
    
        /* Stats for deleted flows will be attributed upon flow deletion. Skip. */
        if (result != UKEY_DELETE) {
            xlate_push_stats(ukey->xcache, &push);
            ukey->stats = *stats;
            ukey->reval_seq = reval_seq;
        }
    
        return result;
    }
    

    After the flow tables are modified, ofproto->backer->need_revalidate is set to REV_FLOW_TABLE, so that type_run can increment reval_seq and wake the revalidator threads to update the datapath flows:

    handle_flow_mod__
        ofproto_bump_tables_version(ofproto);
            ++ofproto->tables_version;
            //set_tables_version
            ofproto->ofproto_class->set_tables_version(ofproto, ofproto->tables_version);
                struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
    
                /* Use memory_order_release to signify that any prior memory accesses can
                 * not be reordered to happen after this atomic store.  This makes sure the
                 * new version is properly set up when the readers can read this 'version'
                 * value. */
                atomic_store_explicit(&ofproto->tables_version, version,
                                      memory_order_release);
                /* 'need_revalidate' can be reordered to happen before the atomic_store
                 * above, but it does not matter as this variable is not accessed by other
                 * threads. */
                ofproto->backer->need_revalidate = REV_FLOW_TABLE;
    
    static int
    type_run(const char *type)
        if (backer->need_revalidate) {
            udpif_revalidate(backer->udpif);
            //Increment the seq and wake any threads waiting on it.
                seq_change(udpif->reval_seq);
        }
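
    This chain can be observed from the shell. A sketch, assuming br1 carries live traffic and using example match fields:

    root@master:~# ovs-ofctl add-flow br1 "priority=100,ip,actions=normal"
    root@master:~# ovs-appctl dpctl/dump-flows    # megaflow entries follow the rule's actions
    root@master:~# ovs-ofctl mod-flows br1 "ip,actions=drop"
    root@master:~# ovs-appctl dpctl/dump-flows    # reval_seq was bumped; the revalidators rewrote
                                                  # the matching entries instead of waiting for
                                                  # them to expire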
    
