【OpenGauss源码学习 —— (VecSortAgg)】-SortAggRunner::SortAggRunner 函数

时间:2024-10-18 08:26:37

  SortAggRunner::SortAggRunnerSortAggRunner 类的构造函数用于初始化排序聚合Sort Aggregation)操作的相关成员变量和资源SortAggRunner 继承自 BaseAggRunner,用于处理向量化的排序聚合逻辑,特别是在处理复杂聚合查询(如涉及 GROUP BYOLAP 函数)时。

/*
 * @Description: Sort agg constructed function.
 * @描述: 构造SortAggRunner函数,用于处理聚合操作的排序。
 */
SortAggRunner::SortAggRunner(VecAggState* runtime)
    : BaseAggRunner(runtime, false),  // 调用基类BaseAggRunner的构造函数,传入runtime和false(表示不使用streaming aggregation)。
      m_FreeMem(false),  // 初始化m_FreeMem为false,表示是否释放内存。
      m_ApFun(false),  // 初始化m_ApFun为false,表示是否使用OLAP函数。
      m_noData(true),  // 初始化m_noData为true,表示当前没有数据。
      m_batchSortIn(NULL),  // 初始化m_batchSortIn为NULL,表示输入批处理为空。
      m_batchSortOut(NULL),  // 初始化m_batchSortOut为NULL,表示输出批处理为空。
      m_SortBatch(NULL),  // 初始化m_SortBatch为NULL,表示排序批处理对象为空。
      m_sortSource(NULL)  // 初始化m_sortSource为NULL,表示排序源为空。
{
    VecAgg* node = NULL;  // 声明一个VecAgg指针node并初始化为NULL。
    errno_t rc;

    node = (VecAgg*)(runtime->ss.ps.plan);  // 从runtime的计划状态中获取VecAgg节点。

    init_phase();  // 调用init_phase()函数初始化聚合的阶段信息。

    /* OLAP function */
    if (node->groupingSets) {  // 如果节点包含分组集(grouping sets),说明需要特殊处理。
        m_ApFun = true;  // 将m_ApFun标记为true,表示当前使用的是OLAP函数。
        init_indexForApFun();  // 初始化与OLAP函数相关的索引。

        if (m_runtime->numphases > 1) {  // 如果存在多个阶段(numphases > 1),需要保存每个阶段的返回结果。
            /* Store return results of phase sort. */
            m_SortBatch = New(CurrentMemoryContext)
                VectorBatch(CurrentMemoryContext, m_runtime->ss.ss_ScanTupleSlot->tts_tupleDescriptor);  // 分配内存并初始化m_SortBatch,用于保存阶段排序的结果批处理。
        }

        build_batch();  // 构建批处理,准备进行数据聚合操作。

        m_cellSize = offsetof(hashCell, m_val) + m_cols * sizeof(hashVal);  // 计算哈希单元的大小,用于存储聚合键值对。
    }

    m_groupState = GET_MATCH_KEY;  // 设置分组状态为GET_MATCH_KEY,表示正在寻找匹配的键值。

    /* Init sort state for distinct operate. */
    int64 workmem = SET_NODEMEM(node->plan.operatorMemKB[0], node->plan.dop);  // 计算用于排序操作的工作内存。
    int64 maxmem = (node->plan.operatorMaxMem > 0) ? SET_NODEMEM(node->plan.operatorMaxMem, node->plan.dop) : 0;  // 如果有最大内存限制,则设置最大内存,否则为0。
    initialize_sortstate(workmem, maxmem, node->plan.plan_node_id, SET_DOP(node->plan.dop));  // 初始化排序状态,传入工作内存、最大内存、节点ID和并行度(dop)。

    int numset = Max(m_runtime->maxsets, 1);  // 计算最大集合数,至少为1。

    m_sortGrps = (GroupintAtomContainer*)palloc0(numset * sizeof(GroupintAtomContainer));  // 分配并初始化用于存储分组信息的结构体数组。
    for (int i = 0; i < numset; i++) {  // 遍历每个集合进行初始化。
        rc = memset_s(
            m_sortGrps[i].sortGrp, 2 * BatchMaxSize * sizeof(hashCell*), 0, 2 * BatchMaxSize * sizeof(hashCell*));  // 将排序分组数组清零,确保没有旧数据残留。
        securec_check(rc, "\0", "\0");  // 检查memset操作是否成功。

        m_sortGrps[i].sortGrpIdx = 0;  // 初始化分组索引为0。
        m_sortGrps[i].sortCurrentBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext);  // 为当前排序分组分配新的缓冲区。
        m_sortGrps[i].sortCurrentBuf->Init();  // 初始化当前分组的缓冲区。
        m_sortGrps[i].sortBckBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext);  // 为备份缓冲区分配新的内存。
        m_sortGrps[i].sortBckBuf->Init();  // 初始化备份缓冲区。
    }

    m_runState = AGG_FETCH;  // 设置运行状态为AGG_FETCH,表示当前阶段为获取数据。
    m_prepareState = GET_SOURCE;  // 设置准备状态为GET_SOURCE,表示准备从数据源获取数据。
    BindingFp();  // 绑定所需的函数指针或回调函数。
}

示例

  假设我们有一个SQL查询,涉及到对大数据集的分组聚合,并且需要对聚合结果进行排序。这个查询可能涉及某种 OLAP (Online Analytical Processing) 函数,如GROUPING SETS,或对某些列执行DISTINCT操作。这时,SortAggRunner类的构造函数会负责初始化排序聚合的执行环境。

示例SQL查询:

SELECT department, COUNT(employee_id)
FROM employees
GROUP BY department
GROUPING SETS ((department), ())
ORDER BY department;

  在上述查询中,要求根据department进行聚合计算员工数量,并对结果进行排序。为了实现这个功能,数据库引擎需要在聚合阶段(GROUP BYGROUPING SETS)中进行排序。这时候,SortAggRunner类的构造函数将会被调用,确保为后续的排序聚合以及分组操作分配和初始化内存。

  1. 首先,构造函数会确定是否使用OLAP函数(通过GROUPING SETS)。
  2. 接着,会初始化用于存储中间结果的批处理(m_SortBatch),并分配内存。
  3. 它还会初始化不同的状态,如当前处理阶段的状态m_runState)以及排序状态initialize_sortstate())。
  4. 分配与分组和排序相关的内存结构m_sortGrps),并为每个集合(numset)准备独立的缓冲区,用来存储中间分组结果