suricata 3.1 源码分析17 (流管理2)

时间:2021-09-17 09:21:35

TmThreadsManagement

static void *TmThreadsManagement(void *td)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);

ThreadVars *tv = (ThreadVars *)td;
//流管理中tv = tv_flowmgr (上文提到的)

TmSlot *s = (TmSlot *)tv->tm_slots;
//流管理中 s = tmm_modules[TMM_FLOWMANAGER]

TmEcode r = TM_ECODE_OK;

BUG_ON(s == NULL);

/* Set the thread name */
if (SCSetThreadName(tv->name) < 0) {
SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
}

if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
//流管理时此处不会进入

/* Drop the capabilities for this thread */
SCDropCaps(tv);
//什么都不做

SCLogDebug("%s starting", tv->name);

if (s->SlotThreadInit != NULL) {
void *slot_data = NULL;
r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
//流管理时调用FlowManagerThreadInit,s->slot_initdat和slot_data均为空,在函数内初始化。

if (r != TM_ECODE_OK) {
EngineKill();

TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit((void *) -1);
//FlowManagerThreadInit调用失败时,设置tv_flowmgr的flag为“THV_CLOSED | THV_RUNNING_DONE”,然后结束线程。

return NULL;
}
(void)SC_ATOMIC_SET(s->slot_data, slot_data);
//设置s->slot_data为slot_data。
}
memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
memset(&s->slot_post_pq, 0, sizeof(PacketQueue));

StatsSetupPrivate(tv);
//这个函数比较难懂,首先调用StatsGetAllCountersArray,将(tv)->perf_private_ctx中的stats的counter指向所有的(tv)->perf_public_ctx,我理解这里是会记到stats.log中的统计内容的列表,针对不同的tv会有所不同,在流管理中为如下几个统计:“flow_mgr.closed_pruned”“flow_mgr.new_pruned”“flow_mgr.est_pruned”“flow.spare”“flow.emerg_mode_entered”“flow.emerg_mode_over”“flow.tcp_reuse”。然后调用StatsThreadRegister将每个线程的stats信息存储到stats_ctx->sts中。

TmThreadsSetFlag(tv, THV_INIT_DONE);
//设置tv的flag为THV_INIT_DONE

r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
//重点在这里,在流管理中s->Management指向的是FlowManager函数,这个在下章分析。

/* handle error */
if (r == TM_ECODE_FAILED) {
TmThreadsSetFlag(tv, THV_FAILED);
//失败设置对应的flag
}

if (TmThreadsCheckFlag(tv, THV_KILL)) {
StatsSyncCounters(tv);
//同步tv的统计,其中调用StatsUpdateCounterArray将(tv)->perf_private_ctx的值传给(tv)->perf_public_ctx。
}

TmThreadsSetFlag(tv, THV_RUNNING_DONE);
//设置对应flag
TmThreadWaitForFlag(tv, THV_DEINIT);
//等待flag被置为THV_DEINIT,后面就是释放资源等操作,我没有详细看。

if (s->SlotThreadExitPrintStats != NULL) {
s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
}

if (s->SlotThreadDeinit != NULL) {
r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
return NULL;
}
}

TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
}