SortAggRunner::SortAggRunner 是 SortAggRunner 类的构造函数,用于初始化排序聚合(Sort Aggregation)操作的相关成员变量和资源。SortAggRunner 继承自 BaseAggRunner,用于处理向量化的排序聚合逻辑,特别是在处理复杂聚合查询(如涉及 GROUP BY 和 OLAP 函数)时。
/*
* @Description: Sort agg constructed function.
* @描述: 构造SortAggRunner函数,用于处理聚合操作的排序。
*/
SortAggRunner::SortAggRunner(VecAggState* runtime)
: BaseAggRunner(runtime, false), // 调用基类BaseAggRunner的构造函数,传入runtime和false(表示不使用streaming aggregation)。
m_FreeMem(false), // 初始化m_FreeMem为false,表示是否释放内存。
m_ApFun(false), // 初始化m_ApFun为false,表示是否使用OLAP函数。
m_noData(true), // 初始化m_noData为true,表示当前没有数据。
m_batchSortIn(NULL), // 初始化m_batchSortIn为NULL,表示输入批处理为空。
m_batchSortOut(NULL), // 初始化m_batchSortOut为NULL,表示输出批处理为空。
m_SortBatch(NULL), // 初始化m_SortBatch为NULL,表示排序批处理对象为空。
m_sortSource(NULL) // 初始化m_sortSource为NULL,表示排序源为空。
{
VecAgg* node = NULL; // 声明一个VecAgg指针node并初始化为NULL。
errno_t rc;
node = (VecAgg*)(runtime->ss.ps.plan); // 从runtime的计划状态中获取VecAgg节点。
init_phase(); // 调用init_phase()函数初始化聚合的阶段信息。
/* OLAP function */
if (node->groupingSets) { // 如果节点包含分组集(grouping sets),说明需要特殊处理。
m_ApFun = true; // 将m_ApFun标记为true,表示当前使用的是OLAP函数。
init_indexForApFun(); // 初始化与OLAP函数相关的索引。
if (m_runtime->numphases > 1) { // 如果存在多个阶段(numphases > 1),需要保存每个阶段的返回结果。
/* Store return results of phase sort. */
m_SortBatch = New(CurrentMemoryContext)
VectorBatch(CurrentMemoryContext, m_runtime->ss.ss_ScanTupleSlot->tts_tupleDescriptor); // 分配内存并初始化m_SortBatch,用于保存阶段排序的结果批处理。
}
build_batch(); // 构建批处理,准备进行数据聚合操作。
m_cellSize = offsetof(hashCell, m_val) + m_cols * sizeof(hashVal); // 计算哈希单元的大小,用于存储聚合键值对。
}
m_groupState = GET_MATCH_KEY; // 设置分组状态为GET_MATCH_KEY,表示正在寻找匹配的键值。
/* Init sort state for distinct operate. */
int64 workmem = SET_NODEMEM(node->plan.operatorMemKB[0], node->plan.dop); // 计算用于排序操作的工作内存。
int64 maxmem = (node->plan.operatorMaxMem > 0) ? SET_NODEMEM(node->plan.operatorMaxMem, node->plan.dop) : 0; // 如果有最大内存限制,则设置最大内存,否则为0。
initialize_sortstate(workmem, maxmem, node->plan.plan_node_id, SET_DOP(node->plan.dop)); // 初始化排序状态,传入工作内存、最大内存、节点ID和并行度(dop)。
int numset = Max(m_runtime->maxsets, 1); // 计算最大集合数,至少为1。
m_sortGrps = (GroupintAtomContainer*)palloc0(numset * sizeof(GroupintAtomContainer)); // 分配并初始化用于存储分组信息的结构体数组。
for (int i = 0; i < numset; i++) { // 遍历每个集合进行初始化。
rc = memset_s(
m_sortGrps[i].sortGrp, 2 * BatchMaxSize * sizeof(hashCell*), 0, 2 * BatchMaxSize * sizeof(hashCell*)); // 将排序分组数组清零,确保没有旧数据残留。
securec_check(rc, "\0", "\0"); // 检查memset操作是否成功。
m_sortGrps[i].sortGrpIdx = 0; // 初始化分组索引为0。
m_sortGrps[i].sortCurrentBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext); // 为当前排序分组分配新的缓冲区。
m_sortGrps[i].sortCurrentBuf->Init(); // 初始化当前分组的缓冲区。
m_sortGrps[i].sortBckBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext); // 为备份缓冲区分配新的内存。
m_sortGrps[i].sortBckBuf->Init(); // 初始化备份缓冲区。
}
m_runState = AGG_FETCH; // 设置运行状态为AGG_FETCH,表示当前阶段为获取数据。
m_prepareState = GET_SOURCE; // 设置准备状态为GET_SOURCE,表示准备从数据源获取数据。
BindingFp(); // 绑定所需的函数指针或回调函数。
}
示例
假设我们有一个SQL查询,涉及到对大数据集的分组聚合,并且需要对聚合结果进行排序。这个查询可能涉及某种 OLAP (Online Analytical Processing) 函数,如GROUPING SETS,或对某些列执行DISTINCT操作。这时,SortAggRunner类的构造函数会负责初始化排序聚合的执行环境。
示例SQL查询:
SELECT department, COUNT(employee_id)
FROM employees
GROUP BY department
GROUPING SETS ((department), ())
ORDER BY department;
在上述查询中,要求根据department进行聚合,计算员工数量,并对结果进行排序。为了实现这个功能,数据库引擎需要在聚合阶段(GROUP BY和GROUPING SETS)中进行排序。这时候,SortAggRunner类的构造函数将会被调用,确保为后续的排序、聚合以及分组操作分配和初始化内存。
- 首先,构造函数会确定是否使用OLAP函数(通过GROUPING SETS)。
- 接着,会初始化用于存储中间结果的批处理(m_SortBatch),并分配内存。
- 它还会初始化不同的状态,如当前处理阶段的状态(m_runState)以及排序状态(initialize_sortstate())。
- 分配与分组和排序相关的内存结构(m_sortGrps),并为每个集合(numset)准备独立的缓冲区,用来存储中间分组结果。