add openGauss 3.1.0 feature code

This commit is contained in:
yanghao
2022-09-03 16:22:35 +08:00
parent 801d945a3d
commit b919f404e8
2759 changed files with 521358 additions and 366321 deletions

View File

@ -38,7 +38,6 @@
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/dfs/dfs_insert.h"
#include "access/xact.h"
#include "access/tableam.h"
#include "catalog/heap.h"
@ -83,7 +82,6 @@
#include "utils/portal.h"
#include "utils/snapmgr.h"
#include "vecexecutor/vecmergeinto.h"
#include "access/dfs/dfs_insert.h"
#include "access/heapam.h"
#include "access/ustore/knl_uheap.h"
#include "access/ustore/knl_whitebox_test.h"
@ -289,7 +287,7 @@ static void ExecCheckTIDVisible(Relation targetrel, EState* estate, Relat
errmsg("failed to fetch conflicting tuple for DUPLICATE KEY UPDATE")));
}
tableam_tuple_check_visible(targetrel, estate->es_snapshot, &tuple, buffer);
tableam_tuple_check_visible(targetrel, estate->es_snapshot, tuple, buffer);
tableam_tops_destroy_tuple(targetrel, tuple);
ReleaseBuffer(buffer);
}
@ -471,7 +469,6 @@ bool ExecComputeStoredUpdateExpr(ResultRelInfo *resultRelInfo, EState *estate, T
bool *nulls;
bool *replaces;
Tuple newtuple;
errno_t rc = EOK;
FmgrInfo eqproc;
Oid opfuncoid = InvalidOid;
bool match = false;
@ -501,7 +498,7 @@ bool ExecComputeStoredUpdateExpr(ResultRelInfo *resultRelInfo, EState *estate, T
updated_colnum_resno = attnum + FirstLowInvalidHeapAttributeNumber;
temp_id = attnum;
for (int32 i = 0; i < (int32)natts; i++) {
if (updated_colnum_resno == (i + 1)) {
if (updated_colnum_resno == (uint32)(i + 1)) {
if (slot->tts_isnull[i] && oldnulls[i]) {
match = true;
} else if (slot->tts_isnull[i] && !oldnulls[i]) {
@ -550,7 +547,7 @@ bool ExecComputeStoredUpdateExpr(ResultRelInfo *resultRelInfo, EState *estate, T
if (GetUpdateExprCol(tupdesc, i) && resultRelInfo->ri_UpdatedExprs[i]) {
ExprContext *econtext;
Datum val;
bool isnull;
bool isnull = false;
econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
@ -722,7 +719,13 @@ static inline void ReleaseResourcesForUpsertGPI(bool isgpi, Relation parentRel,
bool CheckPartitionOidForSpecifiedPartition(RangeTblEntry *rte, Oid partitionid, bool canIgnore = false)
{
int level = canIgnore ? WARNING : ERROR;
if (rte->isContainPartition && rte->partitionOid != partitionid) {
if (rte->isContainPartition) {
ListCell *cell = NULL;
foreach(cell, rte->partitionOidList) {
if (lfirst_oid(cell) == partitionid) {
return true;
}
}
ereport(level, (errcode(ERRCODE_NO_DATA_FOUND),
(errmsg("inserted partition key does not map to the table partition"), errdetail("N/A."),
errcause("The value is incorrect."), erraction("Use the correct value."))));
@ -735,7 +738,15 @@ bool CheckSubpartitionOidForSpecifiedSubpartition(RangeTblEntry *rte, Oid partit
bool canIgnore = false)
{
int level = canIgnore ? WARNING : ERROR;
if (rte->isContainSubPartition && (rte->partitionOid != partitionid || rte->subpartitionOid != subPartitionId)) {
if (rte->isContainSubPartition) {
ListCell *partCell = NULL;
ListCell *subpartCell = NULL;
forboth(partCell, rte->partitionOidList, subpartCell, rte->subpartitionOidList) {
if (lfirst_oid(partCell) == partitionid &&
(lfirst_oid(subpartCell) == subPartitionId || !OidIsValid(lfirst_oid(subpartCell)))) {
return true;
}
}
ereport(level, (errcode(ERRCODE_NO_DATA_FOUND),
(errmsg("inserted subpartition key does not map to the table subpartition"), errdetail("N/A."),
errcause("The value is incorrect."), erraction("Use the correct value."))));
@ -744,7 +755,7 @@ bool CheckSubpartitionOidForSpecifiedSubpartition(RangeTblEntry *rte, Oid partit
return true;
}
static void ReportErrorForSpecifiedPartitionOfUpsert(char *partition, char *table)
static void ReportErrorForSpecifiedPartitionOfUpsert(const char *partition, const char *table)
{
ereport(ERROR,
(errcode(ERRCODE_NO_DATA_FOUND),
@ -759,25 +770,43 @@ static void ReportErrorForSpecifiedPartitionOfUpsert(char *partition, char *tabl
static void CheckPartitionOidForUpsertSpecifiedPartition(RangeTblEntry *rte, Relation resultRelationDesc,
Oid targetPartOid)
{
if (RelationIsSubPartitioned(resultRelationDesc)) {
Oid parentOid = partid_get_parentid(targetPartOid);
if (rte->isContainPartition && rte->partitionOid != parentOid) {
char *partition = "partition";
char *table = "subpartition";
ReportErrorForSpecifiedPartitionOfUpsert(partition, table);
}
if (rte->isContainSubPartition && (rte->partitionOid != parentOid || rte->subpartitionOid != targetPartOid)) {
char *partition = "subpartition";
char *table = "subpartition";
ReportErrorForSpecifiedPartitionOfUpsert(partition, table);
}
Oid partitionOid;
Oid subpartitionOid;
ListCell *partCell = NULL;
ListCell *subpartCell = NULL;
const char *table = NULL;
const char *partition = NULL;
if (rte->isContainSubPartition) {
partition = (rte->subpartitionOidList->length == 1) ? "subpartition" : "partition";
} else if (rte->isContainPartition) {
partition = "partition";
} else {
if (rte->isContainPartition && rte->partitionOid != targetPartOid) {
char *partition = "partition";
char *table = "partition";
ReportErrorForSpecifiedPartitionOfUpsert(partition, table);
/* partition is not specified */
return;
}
if (RelationIsSubPartitioned(resultRelationDesc)) {
table = "subpartition";
partitionOid = partid_get_parentid(targetPartOid);
subpartitionOid = targetPartOid;
} else {
table = "partition";
partitionOid = targetPartOid;
subpartitionOid = InvalidOid;
}
forboth(partCell, rte->partitionOidList, subpartCell, rte->subpartitionOidList) {
if (!OidIsValid(lfirst_oid(subpartCell))) {
if (lfirst_oid(partCell) == partitionOid) {
return;
}
} else if (lfirst_oid(subpartCell) == subpartitionOid && lfirst_oid(partCell) == partitionOid) {
return;
}
}
/* targetPartOid is not in specified partition list */
ReportErrorForSpecifiedPartitionOfUpsert(partition, table);
}
static void ConstraintsForExecUpsert(Relation resultRelationDesc)
@ -835,13 +864,16 @@ static Oid ExecUpsert(ModifyTableState* state, TupleTableSlot* slot, TupleTableS
RangeTblEntry *rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, estate);
if (RelationIsPartitioned(resultRelationDesc)) {
partitionid = heapTupleGetPartitionId(resultRelationDesc, tuple);
searchFakeReationForPartitionOid(estate->esfRelations,
bool res = trySearchFakeReationForPartitionOid(&estate->esfRelations,
estate->es_query_cxt,
resultRelationDesc,
partitionid,
heaprel,
partition,
&heaprel,
&partition,
RowExclusiveLock);
if (!res) {
return InvalidOid;
}
CheckPartitionOidForSpecifiedPartition(rte, partitionid);
if (RelationIsSubPartitioned(resultRelationDesc)) {
@ -940,6 +972,21 @@ static Oid ExecUpsert(ModifyTableState* state, TupleTableSlot* slot, TupleTableS
* then abort the tuple and try to find the conflict tuple again
*/
if (specConflict) {
/* delete index tuples and mark them as dead */
ExecIndexTuplesState exec_index_tuples_state;
exec_index_tuples_state.estate = estate;
exec_index_tuples_state.targetPartRel = heaprel;
exec_index_tuples_state.p = partition;
exec_index_tuples_state.conflict = NULL;
exec_index_tuples_state.rollbackIndex = true;
UpsertAction oldAction = state->mt_upsert->us_action;
state->mt_upsert->us_action = UPSERT_NONE;
tableam_tops_exec_delete_index_tuples(slot, heaprel, state,
&item, exec_index_tuples_state, NULL);
state->mt_upsert->us_action = oldAction;
/* rollback heap/uheap tuple */
tableam_tuple_abort_speculative(targetrel, tuple);
list_free(recheckIndexes);
@ -1123,16 +1170,17 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
}
if (result_relation_desc->rd_att->constr) {
TupleTableSlot *tmp_slot = state->mt_insert_constr_slot == NULL ? slot : state->mt_insert_constr_slot;
if (!ExecConstraints(result_rel_info, tmp_slot, estate)) {
if (!ExecConstraints(result_rel_info, tmp_slot, estate, true)) {
if (u_sess->utils_cxt.sql_ignore_strategy_val == SQL_OVERWRITE_NULL) {
tuple = ReplaceTupleNullCol(RelationGetDescr(result_relation_desc), tmp_slot);
/* Double check constraints in case that new val in column with not null constraints
* violated check constraints */
ExecConstraints(result_rel_info, tmp_slot, estate);
ExecConstraints(result_rel_info, tmp_slot, estate, true);
} else {
return NULL;
}
}
tuple = ExecAutoIncrement(result_relation_desc, estate, slot, tuple);
}
#ifdef PGXC
@ -1257,15 +1305,6 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
if (RelationIsCUFormat(result_relation_desc)) {
HeapInsertCStore(result_relation_desc, estate->es_result_relation_info, (HeapTuple)tuple,
0);
} else if (RelationIsPAXFormat(result_relation_desc)) {
/* here the insert including both none-partitioned and value-partitioned relations */
DfsInsertInter* insert = CreateDfsInsert(result_relation_desc, false);
insert->BeginBatchInsert(TUPLE_SORT, estate->es_result_relation_info);
insert->TupleInsert(slot->tts_values, slot->tts_isnull, 0);
insert->SetEndFlag();
insert->TupleInsert(NULL, NULL, 0);
insert->Destroy();
delete insert;
} else {
target_rel = result_relation_desc;
if (bucket_id != InvalidBktId) {
@ -1306,8 +1345,11 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
return NULL;
}
searchFakeReationForPartitionOid(estate->esfRelations, estate->es_query_cxt,
result_relation_desc, partition_id, heap_rel, partition, RowExclusiveLock);
bool res = trySearchFakeReationForPartitionOid(&estate->esfRelations, estate->es_query_cxt,
result_relation_desc, partition_id, &heap_rel, &partition, RowExclusiveLock);
if (!res) {
return NULL;
}
if (RelationIsColStore(result_relation_desc)) {
HeapInsertCStore(heap_rel, estate->es_result_relation_info, (HeapTuple)tuple, 0);
#ifdef ENABLE_MULTIPLE_NODES
@ -1360,9 +1402,11 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
return NULL;
}
searchFakeReationForPartitionOid(estate->esfRelations, estate->es_query_cxt,
result_relation_desc, partitionId, partRel, part,
RowExclusiveLock);
bool res = trySearchFakeReationForPartitionOid(&estate->esfRelations, estate->es_query_cxt,
result_relation_desc, partitionId, &partRel, &part, RowExclusiveLock);
if (!res) {
return NULL;
}
/* get subpartititon oid for insert the record */
subPartitionId = heapTupleGetPartitionId(partRel, tuple, false, estate->es_plannedstmt->hasIgnore);
@ -1375,8 +1419,12 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
return NULL;
}
searchFakeReationForPartitionOid(estate->esfRelations, estate->es_query_cxt, partRel,
subPartitionId, subPartRel, subPart, RowExclusiveLock);
res = trySearchFakeReationForPartitionOid(&estate->esfRelations, estate->es_query_cxt,
partRel, subPartitionId, &subPartRel, &subPart, RowExclusiveLock);
if (!res) {
partitionClose(result_relation_desc, part, RowExclusiveLock);
return NULL;
}
partition_id = subPartitionId;
heap_rel = subPartRel;
@ -1413,7 +1461,6 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
RelationGetRelationName(result_relation_desc)))));
} break;
}
/*
* insert index entries for tuple
*/
@ -1683,6 +1730,7 @@ ldelete:
isPartitionedRelation(result_relation_desc->rd_rel) ? part_relation : NULL;
exec_index_tuples_state.p = isPartitionedRelation(result_relation_desc->rd_rel) ? partition : NULL;
exec_index_tuples_state.conflict = NULL;
exec_index_tuples_state.rollbackIndex = false;
tableam_tops_exec_delete_index_tuples(oldslot, fake_relation, node,
tupleid, exec_index_tuples_state, modifiedIdxAttrs);
if (oldslot) {
@ -2073,14 +2121,6 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
}
}
/*
* Compute stored generated columns
*/
if (result_relation_desc->rd_att->constr && result_relation_desc->rd_att->constr->has_generated_stored) {
ExecComputeStoredGenerated(result_rel_info, estate, slot, tuple, CMD_UPDATE);
tuple = slot->tts_tuple;
}
/*
* Check the constraints of the tuple
*
@ -2092,6 +2132,15 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
*/
Assert(RELATION_HAS_BUCKET(result_relation_desc) == (bucketid != InvalidBktId));
lreplace:
/*
* Compute stored generated columns
*/
if (result_relation_desc->rd_att->constr && result_relation_desc->rd_att->constr->has_generated_stored) {
ExecComputeStoredGenerated(result_rel_info, estate, slot, tuple, CMD_UPDATE);
tuple = slot->tts_tuple;
}
if (result_relation_desc->rd_att->constr) {
TupleTableSlot *tmp_slot = node->mt_update_constr_slot == NULL ? slot : node->mt_update_constr_slot;
if (!ExecConstraints(result_rel_info, tmp_slot, estate)) {
@ -2305,9 +2354,9 @@ lreplace:
exec_index_tuples_state.targetPartRel = NULL;
exec_index_tuples_state.p = NULL;
exec_index_tuples_state.conflict = NULL;
recheck_indexes = tableam_tops_exec_update_index_tuples(
slot, oldslot, fake_relation, node, tuple, tupleid, exec_index_tuples_state, bucketid,
modifiedIdxAttrs);
exec_index_tuples_state.rollbackIndex = false;
recheck_indexes = tableam_tops_exec_update_index_tuples(slot, oldslot, fake_relation,
node, tuple, tupleid, exec_index_tuples_state, bucketid, modifiedIdxAttrs);
}
if (oldslot) {
@ -2586,9 +2635,9 @@ lreplace:
exec_index_tuples_state.targetPartRel = fake_part_rel;
exec_index_tuples_state.p = partition;
exec_index_tuples_state.conflict = NULL;
recheck_indexes = tableam_tops_exec_update_index_tuples(
slot, oldslot, fake_relation, node, tuple, tupleid, exec_index_tuples_state, bucketid,
modifiedIdxAttrs);
exec_index_tuples_state.rollbackIndex = false;
recheck_indexes = tableam_tops_exec_update_index_tuples(slot, oldslot, fake_relation,
node, tuple, tupleid, exec_index_tuples_state, bucketid, modifiedIdxAttrs);
}
if (oldslot) {
@ -2710,6 +2759,7 @@ ldelete:
exec_index_tuples_state.targetPartRel = old_fake_relation;
exec_index_tuples_state.p = old_partition;
exec_index_tuples_state.conflict = NULL;
exec_index_tuples_state.rollbackIndex = false;
tableam_tops_exec_delete_index_tuples(oldslot, old_fake_relation, node,
tupleid, exec_index_tuples_state, modifiedIdxAttrs);
if (oldslot) {
@ -2830,7 +2880,32 @@ ldelete:
/* insert the new tuple */
{
Partition insert_partition = NULL;
Relation fake_insert_relation = NULL;
if (need_create_file) {
new_partId = AddNewIntervalPartition(result_relation_desc, tuple);
}
bool res = trySearchFakeReationForPartitionOid(&estate->esfRelations,
estate->es_query_cxt,
result_relation_desc,
new_partId,
&fake_part_rel,
&insert_partition,
RowExclusiveLock);
if (!res) {
return NULL;
}
fake_insert_relation = fake_part_rel;
if (bucketid != InvalidBktId) {
searchHBucketFakeRelation(estate->esfRelations,
estate->es_query_cxt,
fake_insert_relation,
bucketid,
fake_insert_relation);
}
if (result_relation_desc->rd_isblockchain) {
MemoryContext old_context = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
tuple = set_user_tuple_hash((HeapTuple)tuple, fake_insert_relation);
@ -2930,18 +3005,26 @@ ldelete:
*/
static void fireBSTriggers(ModifyTableState* node)
{
ResultRelInfo* resultRelInfo = node->resultRelInfo;
int resultRelationNum = node->mt_ResultTupleSlots ? list_length(node->mt_ResultTupleSlots) : 1;
switch (node->operation) {
case CMD_INSERT:
ExecBSInsertTriggers(node->ps.state, node->resultRelInfo);
ExecBSInsertTriggers(node->ps.state, resultRelInfo);
if (node->mt_upsert->us_action == UPSERT_UPDATE) {
ExecBSUpdateTriggers(node->ps.state, node->resultRelInfo);
ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
}
break;
case CMD_UPDATE:
ExecBSUpdateTriggers(node->ps.state, node->resultRelInfo);
for (int i = 0; i < resultRelationNum; i++) {
ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
resultRelInfo++;
}
break;
case CMD_DELETE:
ExecBSDeleteTriggers(node->ps.state, node->resultRelInfo);
for (int i = 0; i < resultRelationNum; i++) {
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
resultRelInfo++;
}
break;
case CMD_MERGE:
break;
@ -2960,18 +3043,26 @@ static void fireBSTriggers(ModifyTableState* node)
*/
static void fireASTriggers(ModifyTableState* node)
{
ResultRelInfo* resultRelInfo = node->resultRelInfo;
int resultRelationNum = node->mt_ResultTupleSlots ? list_length(node->mt_ResultTupleSlots) : 1;
switch (node->operation) {
case CMD_INSERT:
if (node->mt_upsert->us_action == UPSERT_UPDATE) {
ExecASUpdateTriggers(node->ps.state, node->resultRelInfo);
ExecASUpdateTriggers(node->ps.state, resultRelInfo);
}
ExecASInsertTriggers(node->ps.state, node->resultRelInfo);
ExecASInsertTriggers(node->ps.state, resultRelInfo);
break;
case CMD_UPDATE:
ExecASUpdateTriggers(node->ps.state, node->resultRelInfo);
for (int i = 0; i < resultRelationNum; i++) {
ExecASUpdateTriggers(node->ps.state, resultRelInfo);
resultRelInfo++;
}
break;
case CMD_DELETE:
ExecASDeleteTriggers(node->ps.state, node->resultRelInfo);
for (int i = 0; i < resultRelationNum; i++) {
ExecASDeleteTriggers(node->ps.state, resultRelInfo);
resultRelInfo++;
}
break;
case CMD_MERGE:
break;
@ -3029,6 +3120,17 @@ uint64 GetDeleteLimitCount(ExprContext* econtext, PlanState* scan, Limit *limitP
return (uint64)iCount;
}
static TupleTableSlot* FetchPlanSlot(ModifyTableState* node, PlanState* subPlanState)
{
EState* estate = node->ps.state;
if (estate->result_rel_index > 0) {
return ExecProject(node->mt_ProjInfos[estate->result_rel_index], NULL);
} else {
return ExecProcNode(subPlanState);
}
}
/* ----------------------------------------------------------------
* ExecModifyTable
*
@ -3056,8 +3158,6 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
ItemPointer tuple_id = NULL;
ItemPointerData tuple_ctid;
HeapTupleHeader old_tuple = NULL;
AttrNumber part_oid_num = InvalidAttrNumber;
AttrNumber bucket_Id_num = InvalidAttrNumber;
Oid old_partition_oid = InvalidOid;
bool part_key_updated = ((ModifyTable*)node->ps.plan)->partKeyUpdated;
TupleTableSlot* (*ExecInsert)(
@ -3068,7 +3168,7 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
bool is_first_modified = true;
int2 bucketid = InvalidBktId;
List *partition_list = NIL;
int resultRelationNum = node->mt_ResultTupleSlots ? list_length(node->mt_ResultTupleSlots) : 1;
/*
* This should NOT get called during EvalPlanQual; we should have passed a
* subplan tree to EvalPlanQual, instead. Use a runtime test not just
@ -3105,7 +3205,7 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
WHITEBOX_TEST_STUB("ExecModifyTable_Begin", WhiteboxDefaultErrorEmit);
/* Preload local variables */
result_rel_info = node->resultRelInfo + node->mt_whichplan;
result_rel_info = node->resultRelInfo + estate->result_rel_index;
subPlanState = node->mt_plans[node->mt_whichplan];
#ifdef PGXC
/* Initialize remote plan state */
@ -3114,10 +3214,6 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
update_remote_rel_state = node->mt_update_remoterels[node->mt_whichplan];
delete_remote_rel_state = node->mt_delete_remoterels[node->mt_whichplan];
#endif
junk_filter = result_rel_info->ri_junkFilter;
part_oid_num = result_rel_info->ri_partOidAttNum;
bucket_Id_num = result_rel_info->ri_bucketIdAttNum;
/*
* es_result_relation_info must point to the currently active result
* relation while we are within this ModifyTable node. Even though
@ -3171,24 +3267,27 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
subPlanState->state->es_skip_early_free = true;
subPlanState->state->es_skip_early_deinit_consumer = true;
/*
* Fetch rows from subplan(s), and execute the required table modification
* for each row.
*/
for (;;) {
result_rel_info = node->resultRelInfo + estate->result_rel_index;
estate->es_result_relation_info = result_rel_info;
junk_filter = result_rel_info->ri_junkFilter;
if (estate->deleteLimitCount != 0 && estate->es_processed == estate->deleteLimitCount) {
break;
}
/*
* Reset the per-output-tuple exprcontext. This is needed because
* Reset the per-output-tuple exprcontext. This is needed because
* triggers expect to use that context as workspace. It's a bit ugly
* to do this below the top level of the plan, however. We might need
* to rethink this later.
*/
ResetPerTupleExprContext(estate);
t_thrd.xact_cxt.ActiveLobRelid = result_rel_info->ri_RelationDesc->rd_id;
plan_slot = ExecProcNode(subPlanState);
plan_slot = FetchPlanSlot(node, subPlanState);
t_thrd.xact_cxt.ActiveLobRelid = InvalidOid;
if (TupIsNull(plan_slot)) {
record_first_time();
@ -3199,8 +3298,8 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
/* advance to next subplan if any */
node->mt_whichplan++;
Assert(estate->result_rel_index == 0);
if (node->mt_whichplan < node->mt_nplans) {
result_rel_info++;
subPlanState = node->mt_plans[node->mt_whichplan];
#ifdef PGXC
/* Move to next remote plan */
@ -3265,14 +3364,15 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
relkind = result_rel_info->ri_RelationDesc->rd_rel->relkind;
if (relkind == RELKIND_RELATION || RELKIND_IS_SEQUENCE(relkind)) {
datum = ExecGetJunkAttribute(slot, junk_filter->jf_junkAttNo, &isNull);
/* shouldn't ever get a null result... */
/* when mutil result relations doing left join, maybe exists NULL ctid. */
if (isNull) {
ereport(ERROR,
(errmodule(MOD_EXECUTOR),
(errcode(ERRCODE_NULL_JUNK_ATTRIBUTE),
errmsg("ctid is NULL when do operation %d, junk attribute number is %d",
operation,
junk_filter->jf_junkAttNo))));
if (estate->result_rel_index == resultRelationNum - 1) {
estate->result_rel_index = 0;
} else {
estate->result_rel_index++;
}
continue;
}
tuple_id = (ItemPointer)DatumGetPointer(datum);
@ -3283,7 +3383,7 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
Datum tableOiddatum;
bool tableOidisnull = false;
tableOiddatum = ExecGetJunkAttribute(slot, part_oid_num, &tableOidisnull);
tableOiddatum = ExecGetJunkAttribute(slot, result_rel_info->ri_partOidAttNum, &tableOidisnull);
if (tableOidisnull) {
ereport(ERROR,
@ -3299,7 +3399,8 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
Datum bucketIddatum;
bool bucketIdisnull = false;
bucketIddatum = ExecGetJunkAttribute(slot, bucket_Id_num, &bucketIdisnull);
bucketIddatum = ExecGetJunkAttribute(slot, result_rel_info->ri_bucketIdAttNum,
&bucketIdisnull);
if (bucketIdisnull) {
ereport(ERROR,
@ -3395,6 +3496,12 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
record_first_time();
if (estate->result_rel_index == resultRelationNum - 1) {
estate->result_rel_index = 0;
} else {
estate->result_rel_index++;
}
/*
* If we got a RETURNING result, return it to caller. We'll continue
* the work on next call.
@ -3431,6 +3538,42 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
return NULL;
}
static void InitMultipleModify(ModifyTableState* node, PlanState* subnode, uint32 resultRelationNum)
{
bool hasoid = false;
ListCell* l1 = NULL;
ListCell* l2 = NULL;
List* targetlists = ((ModifyTable*)(node->ps.plan))->targetlists;
EState* estate = node->ps.state;
if (resultRelationNum == 1) {
return;
}
node->mt_ProjInfos = (ProjectionInfo**)palloc0((uint32)resultRelationNum * sizeof(ProjectionInfo*));
node->targetlists = (List*)ExecInitExpr((Expr*)targetlists, (PlanState*)node);
int i = 0;
forboth (l1, node->targetlists, l2, targetlists) {
List* targetExprList = (List*)lfirst(l1);
List* targetList = (List*)lfirst(l2);
if (!ExecContextForcesOids((PlanState*)node, &hasoid)) {
hasoid = false;
}
estate->es_result_relation_info++;
TupleTableSlot *slot = ExecAllocTableSlot(&estate->es_tupleTable);
TupleDesc tupDesc = ExecTypeFromTL(targetList, hasoid, false);
ExecSetSlotDescriptor(slot, tupDesc);
node->mt_ResultTupleSlots = lappend(node->mt_ResultTupleSlots, slot);
/* Use the targetlist of the subplan to build mt_ProjInfos. It will be use for FetchMultipleModifySlot. */
ProjectionInfo* projInfo = ExecBuildProjectionInfo(targetExprList, subnode->ps_ExprContext, slot, NULL);
node->mt_ProjInfos[i] = projInfo;
i++;
}
}
/* ----------------------------------------------------------------
* ExecInitModifyTable
* ----------------------------------------------------------------
@ -3450,6 +3593,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
#ifdef PGXC
PlanState* saved_remote_rel_info = NULL;
#endif
int resultRelationNum = list_length((List*)linitial(node->resultRelations));
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@ -3553,32 +3697,12 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
/*
* Verify result relation is a valid target for the current operation
*/
CheckValidResultRel(result_rel_info->ri_RelationDesc, operation);
/*
* If there are indices on the result relation, open them and save
* descriptors in the result relation info, so that we can add new
* index entries for the tuples we add/update. We need not do this
* for a DELETE, however, since deletion doesn't affect indexes. Also,
* inside an EvalPlanQual operation, the indexes might be open
* already, since we share the resultrel state with the original
* query.
*/
if (result_rel_info->ri_RelationDesc->rd_rel->relhasindex &&
result_rel_info->ri_IndexRelationDescs == NULL) {
#ifdef ENABLE_MOT
if (result_rel_info->ri_FdwRoutine == NULL || result_rel_info->ri_FdwRoutine->GetFdwType == NULL ||
result_rel_info->ri_FdwRoutine->GetFdwType() != MOT_ORC) {
#endif
ExecOpenIndices(result_rel_info, (node->upsertAction != UPSERT_NONE ||
(estate->es_plannedstmt && estate->es_plannedstmt->hasIgnore)));
#ifdef ENABLE_MOT
}
#endif
for (int ri = 0; ri < resultRelationNum; ri++) {
CheckValidResultRel(result_rel_info->ri_RelationDesc, operation);
result_rel_info++;
}
result_rel_info = mt_state->resultRelInfo;
init_gtt_storage(operation, result_rel_info);
/* Now init the plan for this result rel */
estate->es_result_relation_info = result_rel_info;
if (sub_plan->type == T_Limit && operation == CMD_DELETE && IsLimitDML((Limit*)sub_plan)) {
@ -3592,6 +3716,36 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
} else {
mt_state->mt_plans[i] = ExecInitNode(sub_plan, estate, eflags);
}
InitMultipleModify(mt_state, mt_state->mt_plans[i], (uint32)resultRelationNum);
estate->es_result_relation_info = result_rel_info;
/*
* If there are indices on the result relation, open them and save
* descriptors in the result relation info, so that we can add new
* index entries for the tuples we add/update. We need not do this
* for a DELETE, however, since deletion doesn't affect indexes. Also,
* inside an EvalPlanQual operation, the indexes might be open
* already, since we share the resultrel state with the original
* query.
*/
for (int ri = 0; ri < resultRelationNum; ri++) {
if (result_rel_info->ri_RelationDesc->rd_rel->relhasindex &&
result_rel_info->ri_IndexRelationDescs == NULL) {
#ifdef ENABLE_MOT
if (result_rel_info->ri_FdwRoutine == NULL || result_rel_info->ri_FdwRoutine->GetFdwType == NULL ||
result_rel_info->ri_FdwRoutine->GetFdwType() != MOT_ORC) {
#endif
if (RelationIsUstoreFormat(result_rel_info->ri_RelationDesc) || operation != CMD_DELETE) {
ExecOpenIndices(result_rel_info, (node->upsertAction != UPSERT_NONE ||
(estate->es_plannedstmt && estate->es_plannedstmt->hasIgnore)));
}
#ifdef ENABLE_MOT
}
#endif
}
result_rel_info++;
}
result_rel_info = mt_state->resultRelInfo;
init_gtt_storage(operation, result_rel_info);
if (operation == CMD_MERGE && RelationInClusterResizing(estate->es_result_relation_info->ri_RelationDesc)) {
ereport(ERROR,
@ -3625,7 +3779,6 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
#endif
}
result_rel_info++;
i++;
}
@ -3695,7 +3848,6 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
rliststate = (List*)ExecInitExpr((Expr*)rlist, &mt_state->ps);
result_rel_info->ri_projectReturning =
ExecBuildProjectionInfo(rliststate, econtext, slot, result_rel_info->ri_RelationDesc->rd_att);
result_rel_info++;
}
} else {
/*
@ -3860,88 +4012,22 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
if (junk_filter_needed) {
result_rel_info = mt_state->resultRelInfo;
for (i = 0; i < nplans; i++) {
JunkFilter* j = NULL;
sub_plan = mt_state->mt_plans[i]->plan;
if (operation == CMD_INSERT || operation == CMD_UPDATE) {
CheckPlanOutput(sub_plan, result_rel_info->ri_RelationDesc);
}
j = ExecInitJunkFilter(sub_plan->targetlist,
result_rel_info->ri_RelationDesc->rd_att->tdhasoid,
ExecInitExtraTupleSlot(estate, result_rel_info->ri_RelationDesc->rd_tam_type));
if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) {
/* For UPDATE/DELETE, find the appropriate junk attr now */
char relkind;
relkind = result_rel_info->ri_RelationDesc->rd_rel->relkind;
if (relkind == RELKIND_RELATION) {
j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid");
if (!AttributeNumberIsValid(j->jf_junkAttNo)) {
ereport(ERROR,
(errmodule(MOD_EXECUTOR),
(errcode(ERRCODE_INVALID_ATTRIBUTE), errmsg("could not find junk ctid column"))));
}
/* if the table is partitioned table ,give a paritionOidJunkOid junk */
if (RELATION_IS_PARTITIONED(result_rel_info->ri_RelationDesc) ||
RelationIsCUFormat(result_rel_info->ri_RelationDesc)) {
AttrNumber tableOidAttNum = ExecFindJunkAttribute(j, "tableoid");
if (!AttributeNumberIsValid(tableOidAttNum)) {
ereport(ERROR,
(errmodule(MOD_EXECUTOR),
(errcode(ERRCODE_INVALID_ATTRIBUTE),
errmsg("could not find junk tableoid column for partition table."))));
}
result_rel_info->ri_partOidAttNum = tableOidAttNum;
j->jf_xc_part_id = result_rel_info->ri_partOidAttNum;
}
if (RELATION_HAS_BUCKET(result_rel_info->ri_RelationDesc)) {
AttrNumber bucketIdAttNum = ExecFindJunkAttribute(j, "tablebucketid");
if (!AttributeNumberIsValid(bucketIdAttNum)) {
ereport(ERROR,
(errmodule(MOD_EXECUTOR),
(errcode(ERRCODE_INVALID_ATTRIBUTE),
errmsg("could not find junk bucketid column for bucketed table."))));
}
result_rel_info->ri_bucketIdAttNum = bucketIdAttNum;
j->jf_xc_bucket_id = result_rel_info->ri_bucketIdAttNum;
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR && RelationGetLocInfo(result_rel_info->ri_RelationDesc)) {
/*
* We may or may not need these attributes depending upon
* the exact kind of trigger. We defer the check; instead throw
* error only at the point when we need but don't find one.
*/
j->jf_xc_node_id = ExecFindJunkAttribute(j, "xc_node_id");
j->jf_xc_wholerow = ExecFindJunkAttribute(j, "wholerow");
j->jf_primary_keys = ExecFindJunkPrimaryKeys(sub_plan->targetlist);
} else if (IS_PGXC_DATANODE && !IS_SINGLE_NODE) {
j->jf_xc_node_id = ExecFindJunkAttribute(j, "xc_node_id");
}
#endif
} else if (relkind == RELKIND_FOREIGN_TABLE || relkind == RELKIND_STREAM) {
/* FDW must fetch any junk attrs it wants */
} else {
j->jf_junkAttNo = ExecFindJunkAttribute(j, "wholerow");
if (!AttributeNumberIsValid(j->jf_junkAttNo)) {
ereport(ERROR,
(errmodule(MOD_EXECUTOR),
(errcode(ERRCODE_INVALID_ATTRIBUTE),
errmsg("could not find junk wholerow column"))));
if (resultRelationNum > 1) {
foreach (l, node->targetlists) {
List* targetlist = (List*)lfirst(l);
if (operation == CMD_UPDATE) {
ExecCheckPlanOutput(result_rel_info->ri_RelationDesc, targetlist);
}
ExecInitJunkAttr(estate, operation, targetlist, result_rel_info);
result_rel_info++;
}
} else {
if (operation == CMD_UPDATE) {
CheckPlanOutput(sub_plan, result_rel_info->ri_RelationDesc);
}
ExecInitJunkAttr(estate, operation, sub_plan->targetlist, result_rel_info);
}
result_rel_info->ri_junkFilter = j;
result_rel_info++;
}
} else {
if (operation == CMD_INSERT) {
@ -3976,6 +4062,14 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
return mt_state;
}
static void ExecClearMultipleModifyTuple(List* slots)
{
foreach_cell (l, slots) {
TupleTableSlot* slot = (TupleTableSlot*)lfirst(l);
(void)ExecClearTuple(slot);
}
}
/* ----------------------------------------------------------------
* ExecEndModifyTable
*
@ -4032,6 +4126,7 @@ void ExecEndModifyTable(ModifyTableState* node)
* clean out the tuple table
*/
(void)ExecClearTuple(node->ps.ps_ResultTupleSlot);
ExecClearMultipleModifyTuple(node->mt_ResultTupleSlots);
/*
* Terminate EPQ execution if active