同步source code

日期: 12-26
    revision: ee5b054c
This commit is contained in:
dengxuyue
2020-12-28 22:19:21 +08:00
parent b7337ff802
commit 1567043064
6076 changed files with 2376818 additions and 741042 deletions

View File

@ -124,6 +124,7 @@
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/tableam.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
@ -227,7 +228,6 @@ void initialize_phase(AggState* aggstate, int newphase)
sortnode->collations,
sortnode->nullsFirst,
workMem,
NULL,
false,
maxMem,
sortnode->plan.plan_node_id,
@ -297,7 +297,6 @@ static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate,
peraggstate->sortCollations[0],
peraggstate->sortNullsFirst[0],
local_work_mem,
NULL,
false);
} else {
peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc,
@ -307,7 +306,6 @@ static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate,
peraggstate->sortCollations,
peraggstate->sortNullsFirst,
local_work_mem,
NULL,
false,
max_mem,
plan->plan_node_id,
@ -478,8 +476,16 @@ static void advance_transition_function(
fcinfo->argnull[0] = pergroupstate->transValueIsNull;
fcinfo->isnull = false; /* just in case transfn doesn't set it */
Node *origin_fcxt = fcinfo->context;
if (IS_PGXC_DATANODE && peraggstate->is_avg) {
Node *fcontext = (Node *)palloc0(sizeof(Node));
fcontext->type = (NodeTag)(peraggstate->is_avg);
fcinfo->context = fcontext;
}
newVal = FunctionCallInvoke(fcinfo);
aggstate->curperagg = NULL;
fcinfo->context = origin_fcxt;
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
@ -808,7 +814,7 @@ static void process_ordered_aggregate_multi(
* Extract the first numTransInputs as datums to pass to the transfn.
* (This will help execTuplesMatch too, so do it immediately.)
*/
slot_getsomeattrs(slot1, numTransInputs);
tableam_tslot_getsomeattrs(slot1, numTransInputs);
if (numDistinctCols == 0 || !haveOldValue || newAbbrevVal != oldAbbrevVal ||
!execTuplesMatch(
@ -999,8 +1005,9 @@ static void prepare_projection_slot(AggState* aggstate, TupleTableSlot* slot, in
} else if (aggstate->all_grouped_cols) {
ListCell* lc = NULL;
Assert(slot->tts_tupleDescriptor != NULL);
/* all_grouped_cols is arranged in desc order */
slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
tableam_tslot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
foreach (lc, aggstate->all_grouped_cols) {
int attnum = lfirst_int(lc);
@ -1167,7 +1174,7 @@ static void build_hash_table(AggState* aggstate)
* To eliminate duplicates, we build a bitmapset of the needed columns, then
* convert it to an integer list (cheaper to scan at runtime). The list is
* in decreasing order so that the first entry is the largest;
* lookup_hash_entry depends on this to use slot_getsomeattrs correctly.
* lookup_hash_entry depends on this to use table's getsomeattrs correctly.
* Note that the list is preserved over ExecReScanAgg, so we allocate it in
* the per-query context (unlike the hash table itself).
*
@ -1234,6 +1241,9 @@ uint32 ComputeHashValue(TupleHashTable hashtbl)
slot = hashtable->inputslot;
hashfunctions = hashtable->in_hash_funcs;
/* Get the Table Accessor Method*/
Assert(slot != NULL && slot->tts_tupleDescriptor != NULL);
for (i = 0; i < numCols; i++) {
AttrNumber att = keyColIdx[i];
Datum attr;
@ -1242,7 +1252,7 @@ uint32 ComputeHashValue(TupleHashTable hashtbl)
/* rotate hashkey left 1 bit at each step */
hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
attr = slot_getattr(slot, att, &isNull);
attr = tableam_tslot_getattr(slot, att, &isNull);
/* treat nulls as having hash key 0 */
if (!isNull) {
@ -1280,7 +1290,7 @@ static AggHashEntry lookup_hash_entry(AggState* aggstate, TupleTableSlot* inputs
}
/* transfer just the needed columns into hashslot */
slot_getsomeattrs(inputslot, linitial_int(aggstate->hash_needed));
tableam_tslot_getsomeattrs(inputslot, linitial_int(aggstate->hash_needed));
foreach (l, aggstate->hash_needed) {
int varNumber = lfirst_int(l) - 1;
@ -1327,7 +1337,12 @@ static AggHashEntry lookup_hash_entry(AggState* aggstate, TupleTableSlot* inputs
MemoryContextSwitchTo(oldContext);
TempFileControl->filesource->writeTup(tuple, hashvalue & (TempFileControl->filenum - 1));
}
} else if (((Agg *)aggstate->ss.ps.plan)->unique_check) {
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("more than one row returned by a subquery used as an expression")));
}
return entry;
}
@ -1789,7 +1804,7 @@ static void agg_fill_hash_table(AggState* aggstate)
(aggstate->ss.ps.plan)->plan_node_id,
getSessionMemoryUsageMB()));
}
(void)pgstat_report_waitstatus(oldStatus);
pgstat_report_waitstatus(oldStatus);
break;
}
@ -2061,8 +2076,10 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
/*
* Initialize result tuple type and projection info.
* Result tuple slot of Aggregation always contains a virtual tuple,
* Default tableAMtype for this slot is Heap.
*/
ExecAssignResultTypeFromTL(&aggstate->ss.ps);
ExecAssignResultTypeFromTL(&aggstate->ss.ps, TAM_HEAP);
ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
aggstate->ss.ps.ps_TupFromTlist = false;
@ -2268,11 +2285,17 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
#ifdef PGXC
peraggstate->collectfn_oid = collectfn_oid = aggform->aggcollectfn;
peraggstate->is_avg = false;
if (finalfn_oid == 1830) {
peraggstate->is_avg = true;
}
#ifdef ENABLE_MULTIPLE_NODES
/*
* For PGXC final and collection functions are used to combine results at Coordinator,
* disable those for Datanode
*/
if (IS_PGXC_DATANODE && !IS_SINGLE_NODE) {
if (IS_PGXC_DATANODE) {
if (!u_sess->exec_cxt.under_stream_runtime) {
peraggstate->finalfn_oid = finalfn_oid = InvalidOid;
peraggstate->collectfn_oid = collectfn_oid = InvalidOid;
@ -2285,6 +2308,18 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
}
}
}
#else
if (IS_STREAM_PLAN || StreamThreadAmI()) {
if (need_adjust_agg_inner_func_type(peraggstate->aggref)) {
if (!node->is_final && !node->single_node) {
peraggstate->finalfn_oid = finalfn_oid = InvalidOid;
}
if (aggref->aggstage == 0 && !node->is_final && !node->single_node) {
peraggstate->collectfn_oid = collectfn_oid = InvalidOid;
}
}
}
#endif /* ENABLE_MULTIPLE_NODES */
#endif /* PGXC */
/* Check that aggregate owner has permission to call component fns */
{
@ -2718,7 +2753,7 @@ void ExecReScanAgg(AggState* node)
*/
/* Release first tuple of group, if we have made a copy */
if (node->grp_firstTuple != NULL) {
heap_freetuple_ext(node->grp_firstTuple);
tableam_tops_free_tuple(node->grp_firstTuple);
node->grp_firstTuple = NULL;
}
(void)ExecClearTuple(node->ss.ss_ScanTupleSlot);
@ -3072,7 +3107,7 @@ void ExecReSetAgg(AggState* node)
*/
/* Release first tuple of group, if we have made a copy */
if (node->grp_firstTuple != NULL) {
heap_freetuple(node->grp_firstTuple);
tableam_tops_free_tuple(node->grp_firstTuple);
node->grp_firstTuple = NULL;
}
(void)ExecClearTuple(node->ss.ss_ScanTupleSlot);