美文网首页
suricata的线程和模块功能

suricata的线程和模块功能

作者: 梁帆 | 来源:发表于2021-07-28 16:05 被阅读0次

一、线程、槽和模块

1.定义

suricata中tv、slot和tm的关系必须要搞清楚。

tv: ThreadVars类型,线程

slot: TmSlot类型,

tm: TmModule类型,模块

在源码中找到这三者的定义。

在threadvars.h中,有ThreadVars定义:

/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
    pthread_t t;
    /** function pointer to the function that runs the packet pipeline for
     *  this thread. It is passed directly to pthread_create(), hence the
     *  void pointers in and out. */
    void *(*tm_func)(void *);

    char name[16];
    char *printable_name;
    char *thread_group_name;

    uint8_t thread_setup_flags;

    /** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
    uint8_t type;

    uint16_t cpu_affinity; /** cpu or core number to set affinity to */
    int thread_priority; /** priority (real time) for this thread. Look at threads.h */


    /** TmModule::flags for each module part of this thread */
    uint8_t tmm_flags;

    uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
                            TmModules resgitered under this thread */
    uint8_t inq_id;
    uint8_t outq_id;

    /** local id */
    int id;

    /** incoming queue and handler */
    Tmq *inq;
    struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);

    SC_ATOMIC_DECLARE(uint32_t, flags);

    /** list of of TmSlot objects together forming the packet pipeline. */
    struct TmSlot_ *tm_slots;

    /** pointer to the flowworker in the pipeline. Used as starting point
     *  for injected packets. Can be NULL if the flowworker is not part
     *  of this thread. */
    struct TmSlot_ *tm_flowworker;

    /** outgoing queue and handler */
    Tmq *outq;
    void *outctx;
    void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);

    /** queue for decoders to temporarily store extra packets they
     *  generate. */
    PacketQueueNoLock decode_pq;

    /** Stream packet queue for flow time out injection. Either a pointer to the
     *  workers input queue or to stream_pq_local */
    struct PacketQueue_ *stream_pq;
    struct PacketQueue_ *stream_pq_local;

    /* counters */

    /** private counter store: counter updates modify this */
    StatsPrivateThreadContext perf_private_ctx;

    /** pointer to the next thread */
    struct ThreadVars_ *next;

    /** public counter store: counter syncs update this */
    StatsPublicThreadContext perf_public_ctx;

    /* mutex and condition used by management threads */

    SCCtrlMutex *ctrl_mutex;
    SCCtrlCondT *ctrl_cond;

    struct FlowQueue_ *flow_queue;

} ThreadVars;

在tm-thread.h中,有TmSlot的定义:

typedef struct TmSlot_ {
    /* function pointers */
    union {
        TmSlotFunc SlotFunc;
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
        TmEcode (*Management)(ThreadVars *, void *);
    };
    /** linked list of slots, used when a pipeline has multiple slots
     *  in a single thread. */
    struct TmSlot_ *slot_next;

    SC_ATOMIC_DECLARE(void *, slot_data);

    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);

    /* data storage */
    const void *slot_initdata;
    /* store the thread module id */
    int tm_id;

} TmSlot;

在tm-modules.h中,有TmModule的定义:

typedef struct TmModule_ {
    const char *name;

    /** thread handling */
    TmEcode (*ThreadInit)(ThreadVars *, const void *, void **);
    void (*ThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*ThreadDeinit)(ThreadVars *, void *);

    /** the packet processing function */
    TmEcode (*Func)(ThreadVars *, Packet *, void *);

    TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);

    /** terminates the capture loop in PktAcqLoop */
    TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);

    TmEcode (*Management)(ThreadVars *, void *);

    /** global Init/DeInit */
    TmEcode (*Init)(void);
    TmEcode (*DeInit)(void);
#ifdef UNITTESTS
    void (*RegisterTests)(void);
#endif
    uint8_t cap_flags;   /**< Flags to indicate the capability requierment of
                             the given TmModule */
    /* Other flags used by the module */
    uint8_t flags;
    
} TmModule;

三者的关系如下图所示:

img

每个线程都包含一个slot的链表,每个slot结点都悬挂着不同的模块,程序执行时会遍历slot链表,按照加入顺序执行模块。

2.main函数中的模式、模块、线程和槽

(1)注册运行模式

SuricataMain函数中开头就有函数InitGlobal,该函数体内调用了一个RunModeRegisterRunModes函数,我们找到这个函数的定义:

/**
 * \brief Register all runmodes in the engine.
 */
void RunModeRegisterRunModes(void)
{
    memset(runmodes, 0, sizeof(runmodes));

    RunModeIdsPcapRegister();       // IDS+pcap
    RunModeFilePcapRegister();      // File+pcap
    RunModeIdsPfringRegister();     // IDS+pfring
    RunModeIpsNFQRegister();        // IPS+nfq, 以下同理
    RunModeIpsIPFWRegister();
    RunModeErfFileRegister();
    RunModeErfDagRegister();
    RunModeNapatechRegister();
    RunModeIdsAFPRegister();
    RunModeIdsNetmapRegister();
    RunModeIdsNflogRegister();
    RunModeUnixSocketRegister();
    RunModeIpsWinDivertRegister();
#ifdef UNITTESTS
    UtRunModeRegister();
#endif
    return;
}

其中每一种运行模式调用RunModeRegisterNewRunMode注册各自的Custom mode。例如

RunModeFilePcapRegister函数:

void RunModeFilePcapRegister(void)
{
    RunModeRegisterNewRunMode(RUNMODE_PCAP_FILE, "single",
                              "Single threaded pcap file mode",
                              RunModeFilePcapSingle);
    RunModeRegisterNewRunMode(RUNMODE_PCAP_FILE, "autofp",
                              "Multi threaded pcap file mode.  Packets from "
                              "each flow are assigned to a single detect thread, "
                              "unlike \"pcap-file-auto\" where packets from "
                              "the same flow can be processed by any detect "
                              "thread",
                              RunModeFilePcapAutoFp);

    return;
}

RunModeRegisterNewRunMode函数会将回调函数(RunModeFilePcapSingle、RunModeFilePcapAutoFp)添加到runmodes全局数组中。

RunModes类型的全局数组==runmodes==保存运行模式,存储结构如下图:

img

注:这张图应该比较老了,现在只有single、autofp和workers三种自定义模式。

RunMode和RunModes结构体的定义如下:

/**
 * \brief Holds description for a runmode.
 */
typedef struct RunMode_ {
    /* the runmode type */
    enum RunModes runmode;
    const char *name;           // 自定义模式,如single, autofp, workers
    const char *description;
    /* runmode function */
    int (*RunModeFunc)(void);
} RunMode;

typedef struct RunModes_ {
    int cnt;
    RunMode *runmodes;
} RunModes;

同时也可以看到,suricata有single, autofp和workers这几种不同的自定义模式。这三种模式,在下文会细说。

(2)注册模块

在suricata.c中找到RegisterAllModules的定义,如下所示:

void RegisterAllModules(void)
{
    // zero all module storage
    memset(tmm_modules, 0, TMM_SIZE * sizeof(TmModule));

    /* commanders */
    TmModuleUnixManagerRegister();
    /* managers */
    TmModuleFlowManagerRegister();
    TmModuleFlowRecyclerRegister();
    TmModuleBypassedFlowManagerRegister();
    /* nfq */
    TmModuleReceiveNFQRegister();
    TmModuleVerdictNFQRegister();
    TmModuleDecodeNFQRegister();
    /* ipfw */
    TmModuleReceiveIPFWRegister();
    TmModuleVerdictIPFWRegister();
    TmModuleDecodeIPFWRegister();
    /* pcap live */
    TmModuleReceivePcapRegister();
    TmModuleDecodePcapRegister();
    /* pcap file */
    TmModuleReceivePcapFileRegister();
    TmModuleDecodePcapFileRegister();
    /* af-packet */
    TmModuleReceiveAFPRegister();
    TmModuleDecodeAFPRegister();
    /* netmap */
    TmModuleReceiveNetmapRegister();
    TmModuleDecodeNetmapRegister();
    /* pfring */
    TmModuleReceivePfringRegister();
    TmModuleDecodePfringRegister();
    /* dag file */
    TmModuleReceiveErfFileRegister();
    TmModuleDecodeErfFileRegister();
    /* dag live */
    TmModuleReceiveErfDagRegister();
    TmModuleDecodeErfDagRegister();
    /* napatech */
    TmModuleNapatechStreamRegister();
    TmModuleNapatechDecodeRegister();

    /* flow worker */
    TmModuleFlowWorkerRegister();
    /* respond-reject */
    TmModuleRespondRejectRegister();

    /* log api */
    TmModuleLoggerRegister();
    TmModuleStatsLoggerRegister();

    TmModuleDebugList();
    /* nflog */
    TmModuleReceiveNFLOGRegister();
    TmModuleDecodeNFLOGRegister();

    /* windivert */
    TmModuleReceiveWinDivertRegister();
    TmModuleVerdictWinDivertRegister();
    TmModuleDecodeWinDivertRegister();
}

选择其中一个注册函数打开:

/**
 * \brief Registration Function for ReceivePcap.
 */
void TmModuleReceivePcapRegister (void)
{
    tmm_modules[TMM_RECEIVEPCAP].name = "ReceivePcap";
    tmm_modules[TMM_RECEIVEPCAP].ThreadInit = ReceivePcapThreadInit;
    tmm_modules[TMM_RECEIVEPCAP].PktAcqLoop = ReceivePcapLoop;
    tmm_modules[TMM_RECEIVEPCAP].PktAcqBreakLoop = ReceivePcapBreakLoop;
    tmm_modules[TMM_RECEIVEPCAP].ThreadExitPrintStats = ReceivePcapThreadExitStats;
    tmm_modules[TMM_RECEIVEPCAP].cap_flags = SC_CAP_NET_RAW;
    tmm_modules[TMM_RECEIVEPCAP].flags = TM_FLAG_RECEIVE_TM;
#ifdef UNITTESTS
    tmm_modules[TMM_RECEIVEPCAP].RegisterTests = SourcePcapRegisterTests;
#endif
}

可以发现模块被保存到了tmm_modules数组中,是模块类型TmModule的数组。

存储结构如下图所示:

img

(3)模块初始化

模块初始化的函数是TmModuleRunInit,定义如下:

void TmModuleRunInit(void)
{
    TmModule *t;
    uint16_t i;

    for (i = 0; i < TMM_SIZE; i++) {
        t = &tmm_modules[i];

        if (t->name == NULL)
            continue;

        if (t->Init == NULL)
            continue;

        t->Init();
    }
}

该函数执行了模块全局初始化函数Init()。

(4)运行模式调度

RunModeDispatch函数的功能主要如下:

从配置中读取运行模式;

获得该运行模式中默认的Custom mode(如single, auto, worker等);

执行Custom mode中设置的回调函数(如RunModeFilePcapSingle)。

(5)运行模式回调函数

/**
 * \brief Single thread version of the Pcap file processing.
 */
int RunModeFilePcapSingle(void)
{
    const char *file = NULL;
    char tname[TM_THREAD_NAME_MAX];

    if (ConfGet("pcap-file.file", &file) == 0) {
        FatalError(SC_ERR_FATAL, "Failed retrieving pcap-file from Conf");
    }

    RunModeInitialize();
    TimeModeSetOffline();

    PcapFileGlobalInit();

    snprintf(tname, sizeof(tname), "%s#01", thread_name_single);

    /* create the threads */
    ThreadVars *tv = TmThreadCreatePacketHandler(tname,
                                                 "packetpool", "packetpool",
                                                 "packetpool", "packetpool",
                                                 "pktacqloop");
    if (tv == NULL) {
        FatalError(SC_ERR_FATAL, "threading setup failed");
    }

    TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
    if (tm_module == NULL) {
        FatalError(SC_ERR_FATAL, "TmModuleGetByName failed for ReceivePcap");
    }
    TmSlotSetFuncAppend(tv, tm_module, file);

    tm_module = TmModuleGetByName("DecodePcapFile");
    if (tm_module == NULL) {
        FatalError(SC_ERR_FATAL, "TmModuleGetByName DecodePcap failed");
    }
    TmSlotSetFuncAppend(tv, tm_module, NULL);

    tm_module = TmModuleGetByName("FlowWorker");
    if (tm_module == NULL) {
        FatalError(SC_ERR_FATAL, "TmModuleGetByName for FlowWorker failed");
    }
    TmSlotSetFuncAppend(tv, tm_module, NULL);

    TmThreadSetCPU(tv, WORKER_CPU_SET);

    if (TmThreadSpawn(tv) != TM_ECODE_OK) {
        FatalError(SC_ERR_FATAL, "TmThreadSpawn failed");
    }
    return 0;
}

这个回调函数主要做了这些事:

通用模块初始化: RunModeInitialize函数;

创建线程tv句柄: TmThreadCreatePacketHandler函数;

从模块数组tmm_modules中通过给定名字获得模块: TmModuleGetByName函数;

插入槽slot: TmSlotSetFuncAppend函数;

真正创建线程: TmThreadSpawn函数。

执行顺序

  • 运行模式注册,设置回调函数
  • 所有模块注册,设置模块相关函数
  • 所有模块初始化
  • 从配置获取运行模式类型,执行回调函数
  • 创建线程
  • 根据模块名称从全局数组tmm_modules中得到模块指针
  • 插入线程槽slot

二、模块功能解析

1.三种自定义运行模式

首先,三种自定义模式的运行流程图如下:

三种运行模式(新).png

single(ids)

单工作线程完成所有工作。首个模块完成抓包,其他模块依次处理,没有后续队列。

workers(ids)

根据监听网卡数量和每个网卡可启用的并行抓包线程数量确定工作线程数量。每个工作线程与single模式单线程工作流程一样,互不影响。

autofp(ids)

两种数据包处理线程,分别是收包线程和检测线程。收包线程和检测线程间通过PacketQueue传递数据包进行处理,每个检测线程对应一个队列,多个检测线程时需要为数据包选择队列以确保同一个流的数据包按顺序传递给同一个检测线程。

其中的autofp模式是最复杂的,也是默认的自定义模式。如下所示:

autofp模式.jpg

2.模块

(1)简介

1、Receive模块:

收集数据包,封装成Packet对象后,将其传递给Decode线程模块。(源码标题的source模块runmode模块)

2、Decode模块:

对Packet按协议4层模型(数据链路层、网络层、传输层、应用层)进行解码,获取协议和负荷信息,解码完成后将 Packet传递给FlowWorker线程模块。该模块主要是进行packet解码,不处理应用层。应用层由专门的应用层解码模块处理。(源码标题的decode模块)

3、FlowWorker模块:

对packets进行分配flow(flow模块),Tcp回话管理、Tcp重组(stream-tcp模块),应用层数据解析处理(app-layer模块),Detect规则检测(detect模块)。

4、Verdict模块:

根据Detect模块检测的结果,对drop标记的包进行丢弃处理。(stream-tcp模块)

5、RespondReject模块:

根据Detect模块检测的结果,对reject标记的包需要向双端发送reset包。(respond-reject模块)

6、logs模块

将处理结果记录在日志中。(output模块)

(2)线程模块间的数据传递

同线程内的模块之间主要是以参数的形式进行数据传递,不同线程之间以共享队列的方式进行数据传递。

每个线程由ThreaVars结构体来抽象,ThreadVars对象指定线程输入数据队列inq和输出数据队列outq。

这些队列在多个线程之间进行共享,一个线程的输出队列可能是另一个线程的输入队列。

[图片上传失败...(image-3ca5b5-1627459445094)]

绿色线条表示数据的走向。从输入队列获取数据包packet,经过slot函数(模块函数)的处理后,再将加工后的packet放到输出队列中。

3.autofp模式研究

img autofp模式.png autofp模式.jpg

以上三张图都是autofp模式下的运行流程图。

(1)RX thread

从上面"autofp"模式中可以看出RX thread所处的位置和包含的功能模块。它主要用于收集packets并对其进行解码,将处理后的packets放到pickup queue中,以供下个模块使用。

在这里插入图片描述

RX thread的作用体现为线程函数TmThreadsSlotPktAcqLoop,主要执行的任务为:

1.给线程设置一个有意义的名称;

2.将线程绑定到指定CPU核心;

3.创建Packet对象池,用于快速存放网络数据包数据;

4.初始化与该线程关联的线程模块;

5.调用数据包收集模块,启动数据包收集,依次调用各模块处理packet,然后将packet放到输出队列中;

6.如果接收到退出信号,则中止数据包收集,销毁Packet对象池,调用线程模块的退出清理函数;

7.退出线程的执行。

(2)W thread

从上面"autofp"模式中可以看出W thread所处的位置和包含的功能模块。它主要由FlowWorker模块和一些log模块组成, 主要完成数据检测和特定格式特定数据的日志输出。

在这里插入图片描述

W thread的作用体现为线程函数==TmThreadsSlotVar==,主要执行的任务为:

1.给线程设置一个有意义的名称;

2.将线程绑定到指定CPU核心;

3.创建空Packet对象池;

4.初始化与该线程关联的线程模块;

5.从输入队列中获取packet,传递给线程模块依次处理后,放到输出队列中;

6.如果接收到退出信号,则中止数据包收集,销毁Packet对象池,调用线程模块的退出清理函数;

7.退出线程的执行。

(3)TX thread

TX thread的功能就是根据Detect模块检测的结果,对Verdict模块和RespondReject模块对标记的包进行相应的处理。

(4)autofp模式下数据包的传递

autofp数据包的传递.png

蓝色线条表示packet的传递路径。

相关文章

网友评论

      本文标题:suricata的线程和模块功能

      本文链接:https://www.haomeiwen.com/subject/rjvzmltx.html