/* ------------------------------------------------------------------------- * * nodeMergeAppend.cpp * routines to handle MergeAppend nodes. * * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd. * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * src/gausskernel/runtime/executor/nodeMergeAppend.cpp * * ------------------------------------------------------------------------- * * INTERFACE ROUTINES * ExecInitMergeAppend - initialize the MergeAppend node * ExecMergeAppend - retrieve the next tuple from the node * ExecEndMergeAppend - shut down the MergeAppend node * ExecReScanMergeAppend - rescan the MergeAppend node * * NOTES * A MergeAppend node contains a list of one or more subplans. * These are each expected to deliver tuples that are sorted according * to a common sort key. The MergeAppend node merges these streams * to produce output sorted the same way. * * MergeAppend nodes don't make use of their left and right * subtrees, rather they maintain a list of subplans so * a typical MergeAppend node looks like this in the plan tree: * * ... * / * MergeAppend---+------+------+--- nil * / \ | | | * nil nil ... ... ... * subplans */ #include "postgres.h" #include "knl/knl_variable.h" #include "access/tableam.h" #include "executor/exec/execdebug.h" #include "executor/node/nodeMergeAppend.h" /* * It gets quite confusing having a heap array (indexed by integers) which * contains integers which index into the slots array. These typedefs try to * clear it up, but they're only documentation. 
*/ typedef int SlotNumber; typedef int HeapPosition; static TupleTableSlot* ExecMergeAppend(PlanState* state); static void heap_insert_slot(MergeAppendState* node, SlotNumber new_slot); static void heap_siftup_slot(MergeAppendState* node); static int32 heap_compare_slots(MergeAppendState* node, SlotNumber slot1, SlotNumber slot2); /* ---------------------------------------------------------------- * ExecInitMergeAppend * * Begin all of the subscans of the MergeAppend node. * ---------------------------------------------------------------- */ MergeAppendState* ExecInitMergeAppend(MergeAppend* node, EState* estate, int eflags) { MergeAppendState* merge_state = makeNode(MergeAppendState); PlanState** merge_plan_states; int nplans; int i; ListCell* lc = NULL; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); /* * Set up empty vector of subplan states */ nplans = list_length(node->mergeplans); merge_plan_states = (PlanState**)palloc0(nplans * sizeof(PlanState*)); /* * create new MergeAppendState for our node */ merge_state->ps.plan = (Plan*)node; merge_state->ps.state = estate; merge_state->mergeplans = merge_plan_states; merge_state->ms_nplans = nplans; merge_state->ps.ExecProcNode = ExecMergeAppend; merge_state->ms_slots = (TupleTableSlot**)palloc0(sizeof(TupleTableSlot*) * nplans); merge_state->ms_heap = (int*)palloc0(sizeof(int) * nplans); /* * Miscellaneous initialization * * MergeAppend plans don't have expression contexts because they never * call ExecQual or ExecProject. */ /* * MergeAppend nodes do have Result slots, which hold pointers to tuples, * so we have to initialize them. */ ExecInitResultTupleSlot(estate, &merge_state->ps); /* * call ExecInitNode on each of the plans to be executed and save the * results into the array "merge_plans". 
*/ i = 0; foreach (lc, node->mergeplans) { Plan* initNode = (Plan*)lfirst(lc); merge_plan_states[i] = ExecInitNode(initNode, estate, eflags); i++; } /* * initialize output tuple type * src/gausskernel/runtime/executor/nodeMergeAppend.cpp * set to default value HEAP */ ExecAssignResultTypeFromTL(&merge_state->ps); merge_state->ps.ps_ProjInfo = NULL; /* * initialize sort-key information */ merge_state->ms_nkeys = node->numCols; merge_state->ms_sortkeys = (SortSupportData*)palloc0(sizeof(SortSupportData) * node->numCols); for (i = 0; i < node->numCols; i++) { SortSupport sortKey = merge_state->ms_sortkeys + i; sortKey->ssup_cxt = CurrentMemoryContext; sortKey->ssup_collation = node->collations[i]; sortKey->ssup_nulls_first = node->nullsFirst[i]; sortKey->ssup_attno = node->sortColIdx[i]; /* * It isn't feasible to perform abbreviated key conversion, since * tuples are pulled into merge_state's binary heap as needed. It would * likely be counter-productive to convert tuples into an abbreviated * representation as they're pulled up, so opt out of that additional * optimization entirely. */ sortKey->abbreviate = false; PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); } /* * initialize to show we have not run the subplans yet */ merge_state->ms_heap_size = 0; merge_state->ms_initialized = false; merge_state->ms_last_slot = -1; return merge_state; } /* ---------------------------------------------------------------- * ExecMergeAppend * * Handles iteration over multiple subplans. * ---------------------------------------------------------------- */ static TupleTableSlot* ExecMergeAppend(PlanState* state) { MergeAppendState* node = castNode(MergeAppendState, state); TupleTableSlot* result = NULL; SlotNumber i; CHECK_FOR_INTERRUPTS(); if (!node->ms_initialized) { /* * First time through: pull the first tuple from each subplan, and set * up the heap. 
*/ for (i = 0; i < node->ms_nplans; i++) { node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) heap_insert_slot(node, i); } node->ms_initialized = true; } else { /* * Otherwise, pull the next tuple from whichever subplan we returned * from last time, and insert it into the heap. (We could simplify * the logic a bit by doing this before returning from the prior call, * but it's better to not pull tuples until necessary.) */ i = node->ms_last_slot; node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) heap_insert_slot(node, i); } if (node->ms_heap_size > 0) { /* Return the topmost heap node, and sift up the remaining nodes */ i = node->ms_heap[0]; result = node->ms_slots[i]; node->ms_last_slot = i; heap_siftup_slot(node); } else { /* All the subplans are exhausted, and so is the heap */ result = ExecClearTuple(node->ps.ps_ResultTupleSlot); } return result; } /* * Insert a new slot into the heap. The slot must contain a valid tuple. */ static void heap_insert_slot(MergeAppendState* node, SlotNumber new_slot) { SlotNumber* heap = node->ms_heap; HeapPosition j; Assert(!TupIsNull(node->ms_slots[new_slot])); j = node->ms_heap_size++; /* j is where the "hole" is */ while (j > 0) { int i = (j - 1) / 2; if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0) break; heap[j] = heap[i]; j = i; } heap[j] = new_slot; } /* * Delete the heap top (the slot in heap[0]), and sift up. 
*/
static void heap_siftup_slot(MergeAppendState* node)
{
    SlotNumber* entries = node->ms_heap;
    HeapPosition hole = 0; /* position currently lacking an entry */
    HeapPosition last;
    HeapPosition child;
    SlotNumber displaced;

    node->ms_heap_size--;
    if (node->ms_heap_size <= 0) {
        return;
    }

    /* The former last entry has to be re-placed inside the shrunken heap. */
    last = node->ms_heap_size;
    displaced = entries[last];

    for (child = 2 * hole + 1; child < last; child = 2 * hole + 1) {
        HeapPosition sibling = child + 1;

        /* Pick whichever child compares smaller. */
        if (sibling < last && heap_compare_slots(node, entries[child], entries[sibling]) > 0) {
            child = sibling;
        }

        /* Stop once the displaced entry does not compare greater than it. */
        if (heap_compare_slots(node, displaced, entries[child]) <= 0) {
            break;
        }

        entries[hole] = entries[child];
        hole = child;
    }

    entries[hole] = displaced;
}

/*
 * Compare the tuples in the two given slots.
 *
 * Sort keys are examined in order; the result of the first key for which
 * ApplySortComparator returns nonzero is returned.  Returns 0 when every
 * key compares equal.
 */
static int32 heap_compare_slots(MergeAppendState* node, SlotNumber slot1, SlotNumber slot2)
{
    TupleTableSlot* left = node->ms_slots[slot1];
    TupleTableSlot* right = node->ms_slots[slot2];

    Assert(left != NULL);
    Assert(right != NULL);

    Assert(!TupIsNull(left));
    Assert(!TupIsNull(right));

    int key_idx = 0;

    while (key_idx < node->ms_nkeys) {
        SortSupport key = node->ms_sortkeys + key_idx;
        AttrNumber column = key->ssup_attno;
        bool left_is_null = false;
        bool right_is_null = false;
        Datum left_value = tableam_tslot_getattr(left, column, &left_is_null);
        Datum right_value = tableam_tslot_getattr(right, column, &right_is_null);
        int order = ApplySortComparator(left_value, left_is_null, right_value, right_is_null, key);

        if (order != 0) {
            return order;
        }
        key_idx++;
    }

    return 0;
}

/* ----------------------------------------------------------------
 *		ExecEndMergeAppend
 *
 *		Shuts down the subscans of the MergeAppend node.
 *
 *		Returns nothing of interest.
* ----------------------------------------------------------------
*/
void ExecEndMergeAppend(MergeAppendState* node)
{
    /* Capture the subplan array and its length from the node. */
    PlanState** subplans = node->mergeplans;
    int subplan_count = node->ms_nplans;
    int idx = 0;

    /* Shut down each of the subscans in array order. */
    while (idx < subplan_count) {
        ExecEndNode(subplans[idx]);
        idx++;
    }
}

/* ----------------------------------------------------------------
 *		ExecReScanMergeAppend
 *
 *		Prepares the node for a rescan: signals changed parameters to
 *		each subplan, rescans subplans that have no pending parameter
 *		change, and resets the merge state so the next ExecMergeAppend
 *		call rebuilds the heap from scratch.
 * ----------------------------------------------------------------
 */
void ExecReScanMergeAppend(MergeAppendState* node)
{
    int idx;

    for (idx = 0; idx < node->ms_nplans; idx++) {
        PlanState* child = node->mergeplans[idx];

        /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
         */
        if (node->ps.chgParam != NULL) {
            UpdateChangedParamSet(child, node->ps.chgParam);
        }

        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode.
         */
        if (child->chgParam != NULL) {
            continue;
        }
        ExecReScan(child);
    }

    /* Back to the "subplans not yet run" state. */
    node->ms_last_slot = -1;
    node->ms_initialized = false;
    node->ms_heap_size = 0;
}