diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index a10fadb78..e6a111acc 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -412,6 +412,9 @@ static ModifyTable* _copyModifyTable(const ModifyTable* from) COPY_NODE_FIELD(targetlists); COPY_NODE_FIELD(withCheckOptionLists); +#ifdef USE_SPQ + COPY_NODE_FIELD(isSplitUpdates); +#endif return newnode; } @@ -7576,6 +7579,31 @@ static Sequence *_copySequence(const Sequence *from) return newnode; } + +static DMLActionExpr *_copyDMLActionExpr(const DMLActionExpr *from) +{ + DMLActionExpr *newnode = makeNode(DMLActionExpr); + + return newnode; +} + +static SplitUpdate *_copySplitUpdate(const SplitUpdate *from) +{ + SplitUpdate *newnode = makeNode(SplitUpdate); + + /* + * copy node superclass fields + */ + CopyPlanFields((Plan *) from, (Plan *) newnode); + + COPY_SCALAR_FIELD(actionColIdx); + COPY_SCALAR_FIELD(tupleoidColIdx); + COPY_NODE_FIELD(insertColIdx); + COPY_NODE_FIELD(deleteColIdx); + + return newnode; +} + #endif static CondInfo *_copyCondInfo(const CondInfo *from) { @@ -9051,7 +9079,13 @@ void* copyObject(const void* from) #ifdef USE_SPQ case T_Motion: retval = _copyMotion((Motion*)from); - break;; + break; + case T_DMLActionExpr: + retval = _copyDMLActionExpr((DMLActionExpr*)from); + break; + case T_SplitUpdate: + retval = _copySplitUpdate((SplitUpdate*)from); + break; #endif default: ereport(ERROR, diff --git a/src/common/backend/nodes/nodeFuncs.cpp b/src/common/backend/nodes/nodeFuncs.cpp index fc3d82fd0..553048de2 100644 --- a/src/common/backend/nodes/nodeFuncs.cpp +++ b/src/common/backend/nodes/nodeFuncs.cpp @@ -243,6 +243,11 @@ Oid exprType(const Node* expr) case T_UserSetElem: type = userSetElemTypeCollInfo(expr, exprType); break; +#ifdef USE_SPQ + case T_DMLActionExpr: + type = INT4OID; + break; +#endif default: ereport(ERROR, (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("unrecognized node type: %d", (int)nodeTag(expr)))); @@ -967,6 +972,11 @@ Oid exprCollation(const Node* expr) case T_UserSetElem: coll = userSetElemTypeCollInfo(expr, exprCollation); break; +#ifdef USE_SPQ + case T_DMLActionExpr: + coll = InvalidOid; + break; +#endif default: ereport( ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("unrecognized node type: %d", (int)nodeTag(expr)))); @@ -1703,6 +1713,9 @@ bool expression_tree_walker(Node* node, bool (*walker)(), void* context) case T_Rownum: case T_UserVar: case T_SetVariableExpr: +#ifdef USE_SPQ + case T_DMLActionExpr: +#endif /* primitive node types with no expression subnodes */ break; case T_WithCheckOption: @@ -2792,6 +2805,14 @@ Node* expression_tree_mutator(Node* node, Node* (*mutator)(Node*, void*), void* MUTATE(newnode->val, use->val, Expr*); return (Node*)newnode; } break; +#ifdef USE_SPQ + case T_DMLActionExpr: { + DMLActionExpr *action_expr = (DMLActionExpr *) node; + DMLActionExpr *newnode = NULL; + FLATCOPY(newnode, action_expr, DMLActionExpr, isCopy); + return (Node *)newnode; + } break; +#endif default: ereport(ERROR, (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("unrecognized node type: %d", (int)nodeTag(node)))); diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp index 73b83343c..fc6954776 100755 --- a/src/common/backend/nodes/outfuncs.cpp +++ b/src/common/backend/nodes/outfuncs.cpp @@ -944,6 +944,11 @@ static void _outModifyTable(StringInfo str, ModifyTable* node) if (t_thrd.proc->workingVersionNum >= SUPPORT_VIEW_AUTO_UPDATABLE) { WRITE_NODE_FIELD(withCheckOptionLists); } +#ifdef USE_SPQ + if (t_thrd.proc->workingVersionNum >= SPQ_VERSION_NUM) { + WRITE_NODE_FIELD(isSplitUpdates); + } +#endif } static void _outUpsertClause(StringInfo str, const UpsertClause* node) @@ -1486,6 +1491,23 @@ static void _outSpqBitmapHeapScan(StringInfo str, SpqBitmapHeapScan* node) WRITE_NODE_TYPE("SPQBITMAPHEAPSCAN"); _outBitmapHeapScanInfo(str, &node->scan); } + +static void _outDMLActionExpr(StringInfo str, const DMLActionExpr *node) +{ + WRITE_NODE_TYPE("DMLACTIONEXPR"); +} + +static void _outSplitUpdate(StringInfo str, const SplitUpdate *node) +{ + WRITE_NODE_TYPE("SPLITUPDATE"); + + WRITE_INT_FIELD(actionColIdx); + WRITE_INT_FIELD(tupleoidColIdx); + WRITE_NODE_FIELD(insertColIdx); + WRITE_NODE_FIELD(deleteColIdx); + + _outPlanInfo(str, (Plan *) node); +} #endif static void _outCStoreIndexCtidScan(StringInfo str, CStoreIndexCtidScan* node) @@ -6307,6 +6329,12 @@ static void _outNode(StringInfo str, const void* obj) case T_SpqBitmapHeapScan: _outSpqBitmapHeapScan(str, (SpqBitmapHeapScan*)obj); break; + case T_DMLActionExpr: + _outDMLActionExpr(str, (DMLActionExpr*)obj); + break; + case T_SplitUpdate: + _outSplitUpdate(str, (SplitUpdate*)obj); + break; #endif #ifdef PGXC case T_RemoteQuery: diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp index 0d8e58e71..196b6612e 100755 --- a/src/common/backend/nodes/readfuncs.cpp +++ b/src/common/backend/nodes/readfuncs.cpp @@ -4346,7 +4346,16 @@ static ModifyTable* _readModifyTable(ModifyTable* local_node) IF_EXIST(targetlists) { READ_NODE_FIELD(targetlists); } - + if (t_thrd.proc->workingVersionNum >= SUPPORT_VIEW_AUTO_UPDATABLE) { + READ_NODE_FIELD(withCheckOptionLists); + } +#ifdef USE_SPQ + if (t_thrd.proc->workingVersionNum >= SPQ_VERSION_NUM) { + IF_EXIST(isSplitUpdates) { + READ_NODE_FIELD(isSplitUpdates); + } + } +#endif READ_DONE(); } @@ -4839,6 +4848,27 @@ static Sequence* _readSequence(void) READ_END(); } + +static DMLActionExpr * _readDMLActionExpr(void) +{ + READ_LOCALS_NO_FIELDS(DMLActionExpr); + + READ_END(); +} + +static SplitUpdate * _readSplitUpdate(void) +{ + READ_LOCALS(SplitUpdate); + + READ_INT_FIELD(actionColIdx); + READ_INT_FIELD(tupleoidColIdx); + READ_NODE_FIELD(insertColIdx); + READ_NODE_FIELD(deleteColIdx); + + _readPlan(&local_node->plan); + + READ_END(); +} #endif static SetOp* _readSetOp(SetOp* local_node) @@ -6520,6 +6550,10 @@ Node* parseNodeString(void) return_value = _readSpqIndexOnlyScan(); } else if (MATCH("SPQBITMAPHEAPSCAN", 17)) { return_value = _readSpqBitmapHeapScan(); + } else if (MATCH("DMLACTIONEXPR", 13)) { + return_value = _readDMLActionExpr(); + } else if (MATCH("SPLITUPDATE", 11)) { + return_value = _readSplitUpdate(); #endif } else if (MATCH("BITMAPHEAPSCAN", 14)) { return_value = _readBitmapHeapScan(NULL); diff --git a/src/common/backend/utils/adt/ruleutils.cpp b/src/common/backend/utils/adt/ruleutils.cpp index bfa0779b3..d15055e19 100644 --- a/src/common/backend/utils/adt/ruleutils.cpp +++ b/src/common/backend/utils/adt/ruleutils.cpp @@ -10572,6 +10572,11 @@ static void get_rule_expr(Node* node, deparse_context* context, bool showimplici appendStringInfo(buf, "(%d)", pkey->length); } break; +#ifdef USE_SPQ + case T_DMLActionExpr: + appendStringInfo(buf, "DMLAction"); + break; +#endif default: if (context->qrw_phase) appendStringInfo(buf, "", (int)nodeTag(node)); diff --git a/src/gausskernel/optimizer/util/clauses.cpp b/src/gausskernel/optimizer/util/clauses.cpp index dd2e0bb1f..b9073a7f4 100644 --- a/src/gausskernel/optimizer/util/clauses.cpp +++ b/src/gausskernel/optimizer/util/clauses.cpp @@ -3409,6 +3409,13 @@ Node* eval_const_expressions_mutator(Node* node, eval_const_expressions_context* break; } + if (IS_SPQ_COORDINATOR) { + /* recurse into query structure if requested */ + if (IsA(node, Query) && context->recurse_queries) { + return (Node *)query_tree_mutator((Query *)node, (Node * (*)(Node *, void *)) eval_const_expressions_mutator, (void *)context, 0); + } + } + ELOG_FIELD_NAME_START(IsA(node, TargetEntry) ? ((TargetEntry*)node)->resname : NULL); /* diff --git a/src/gausskernel/optimizer/util/optcommon.cpp b/src/gausskernel/optimizer/util/optcommon.cpp index f2b4018d2..f5d96f95b 100755 --- a/src/gausskernel/optimizer/util/optcommon.cpp +++ b/src/gausskernel/optimizer/util/optcommon.cpp @@ -177,6 +177,9 @@ void GetPlanNodePlainText( else *pname = *sname = *pt_options = "Spq Bitmap Heap Scan"; break; + case T_SplitUpdate: + *pname = *sname = *pt_operation = "Split"; + break; #endif case T_CStoreScan: *pt_operation = "TABLE ACCESS"; diff --git a/src/gausskernel/optimizer/util/planmem_walker.cpp b/src/gausskernel/optimizer/util/planmem_walker.cpp index 72c998635..245f3382f 100644 --- a/src/gausskernel/optimizer/util/planmem_walker.cpp +++ b/src/gausskernel/optimizer/util/planmem_walker.cpp @@ -485,6 +485,7 @@ bool plan_tree_walker(Node* node, MethodWalker walker, void* context) break; + case T_SplitUpdate: case T_AssertOp: if (walk_plan_node_fields((Plan *) node, walker, context)) return true; diff --git a/src/gausskernel/process/stream/execStream.cpp b/src/gausskernel/process/stream/execStream.cpp index 40cdc3b9b..ae0e0633b 100755 --- a/src/gausskernel/process/stream/execStream.cpp +++ b/src/gausskernel/process/stream/execStream.cpp @@ -172,6 +172,9 @@ const char* GetStreamTypeRedistribute(Stream* node) case REMOTE_ROUNDROBIN: stream_tag = "ROUNDROBIN"; break; + case REMOTE_DML_WRITE_NODE: + stream_tag = "DML REDISTRIBUTE"; + break; #endif default: { diff --git a/src/gausskernel/process/stream/streamProducer.cpp b/src/gausskernel/process/stream/streamProducer.cpp index fafda55eb..d8fc3060a 100755 --- a/src/gausskernel/process/stream/streamProducer.cpp +++ b/src/gausskernel/process/stream/streamProducer.cpp @@ -2143,6 +2143,12 @@ void StreamProducer::SetDest(bool is_vec_plan) else m_dest = DestTupleRoundRobin; break; +#ifdef USE_SPQ + case REMOTE_DML_WRITE_NODE: + if (!is_vec_plan) + m_dest = DestTupleDML; + break; +#endif case PARALLEL_NONE: case REMOTE_DISTRIBUTE: case REMOTE_SPLIT_DISTRIBUTE: @@ -2339,6 +2345,24 @@ void StreamProducer::roundRobinStream(VectorBatch* batch) { roundRobinBatch(batch); } + +#ifdef USE_SPQ +void StreamProducer::dmlStream(TupleTableSlot* tuple, DestReceiver* self) +{ + assembleStreamMessage(tuple, self, &m_tupleBuffer); + + int write_node_index = u_sess->attr.attr_spq.spq_wr_node_index; + sendByteStream(write_node_index + m_roundRobinIdx * m_plan->num_nodes); + + m_roundRobinIdx++; + /* only send to write node. */ + int write_dop = m_connNum / m_plan->num_nodes; + m_roundRobinIdx = m_roundRobinIdx % write_dop; + + /* reset tuple buffer. */ + resetStringInfo(&m_tupleBuffer); +} +#endif template void StreamProducer::roundRobinBatch(VectorBatch* batch) @@ -2811,4 +2835,4 @@ uint2 GetTargetConsumerNodeIdx(ExecBoundary* enBoundary, Const** distValues, int void StreamProducer::setEcontext(ExprContext* econtext) { m_econtext = econtext; -} \ No newline at end of file +} diff --git a/src/gausskernel/process/tcop/dest.cpp b/src/gausskernel/process/tcop/dest.cpp index efd2f6223..9d1794d13 100644 --- a/src/gausskernel/process/tcop/dest.cpp +++ b/src/gausskernel/process/tcop/dest.cpp @@ -156,6 +156,7 @@ DestReceiver* CreateDestReceiver(CommandDest dest) #ifdef USE_SPQ case DestTupleRoundRobin: case DestBatchRoundRobin: + case DestTupleDML: #endif case DestBatchBroadCast: case DestBatchLocalBroadCast: diff --git a/src/gausskernel/runtime/executor/Makefile b/src/gausskernel/runtime/executor/Makefile index 75189777c..474227e2b 100644 --- a/src/gausskernel/runtime/executor/Makefile +++ b/src/gausskernel/runtime/executor/Makefile @@ -49,7 +49,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execJunk.o execMain.o \ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o \ nodePartIterator.o nodeStub.o execClusterResize.o lightProxy.o execMerge.o \ nodeExtensible.o route.o nodeTrainModel.o db4ai_common.o spiDbesql.o \ - nodeProjectSet.o nodeSortGroup.o nodeAssertOp.o nodeSequence.o \ + nodeProjectSet.o nodeSortGroup.o nodeAssertOp.o nodeSequence.o nodeSplitUpdate.o \ nodeShareInputScan.o nodeSpqSeqscan.o nodeSpqIndexscan.o nodeSpqIndexonlyscan.o nodeSpqBitmapHeapscan.o override CPPFLAGS += -D__STDC_FORMAT_MACROS diff --git a/src/gausskernel/runtime/executor/execMain.cpp b/src/gausskernel/runtime/executor/execMain.cpp index 8e6875997..8349263c5 100755 --- a/src/gausskernel/runtime/executor/execMain.cpp +++ b/src/gausskernel/runtime/executor/execMain.cpp @@ -1841,6 +1841,9 @@ void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc resultRelInfo->ri_projectReturning = NULL; resultRelInfo->ri_mergeTargetRTI = 0; resultRelInfo->ri_mergeState = (MergeState *)palloc0(sizeof(MergeState)); +#ifdef USE_SPQ + resultRelInfo->ri_actionAttno = InvalidAttrNumber; +#endif } /* diff --git a/src/gausskernel/runtime/executor/execProcnode.cpp b/src/gausskernel/runtime/executor/execProcnode.cpp index 390a7533c..fc5be0b57 100755 --- a/src/gausskernel/runtime/executor/execProcnode.cpp +++ b/src/gausskernel/runtime/executor/execProcnode.cpp @@ -168,6 +168,7 @@ #include "executor/node/nodeSpqIndexscan.h" #include "executor/node/nodeSpqIndexonlyscan.h" #include "executor/node/nodeSpqBitmapHeapscan.h" +#include "executor/node/nodeSplitUpdate.h" #endif #define NODENAMELEN 64 static TupleTableSlot *ExecProcNodeFirst(PlanState *node); @@ -320,6 +321,8 @@ PlanState* ExecInitNodeByType(Plan* node, EState* estate, int eflags) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("spqbitmapheapscan hook init_spqbitmapheapscan_hook uninited."))); } + case T_SplitUpdate: + return (PlanState *)ExecInitSplitUpdate((SplitUpdate *)node, estate, eflags); #endif case T_IndexScan: return (PlanState*)ExecInitIndexScan((IndexScan*)node, estate, eflags); @@ -1121,15 +1124,19 @@ static void ExecEndNodeByType(PlanState* node) break; case T_AssertOpState: - ExecEndAssertOp((AssertOpState *) node); + ExecEndAssertOp((AssertOpState *)node); break; case T_ShareInputScanState: - ExecEndShareInputScan((ShareInputScanState *) node); + ExecEndShareInputScan((ShareInputScanState *)node); break; case T_SequenceState: - ExecEndSequence((SequenceState *) node); + ExecEndSequence((SequenceState *)node); + break; + + case T_SplitUpdateState: + ExecEndSplitUpdate((SplitUpdateState *)node); break; #endif case T_CStoreScanState: diff --git a/src/gausskernel/runtime/executor/nodeModifyTable.cpp b/src/gausskernel/runtime/executor/nodeModifyTable.cpp index a0f697420..4682df02e 100644 --- a/src/gausskernel/runtime/executor/nodeModifyTable.cpp +++ b/src/gausskernel/runtime/executor/nodeModifyTable.cpp @@ -1588,8 +1588,11 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple #ifdef ENABLE_MULTIPLE_NODES state->operation != CMD_MERGE && state->mt_upsert->us_action == UPSERT_NONE && #endif - !useHeapMultiInsert) - ExecARInsertTriggers(estate, result_rel_info, partition_id, bucket_id, (HeapTuple)tuple, recheck_indexes); + !useHeapMultiInsert) { + if (!state->mt_isSplitUpdates) { + ExecARInsertTriggers(estate, result_rel_info, partition_id, bucket_id, (HeapTuple)tuple, recheck_indexes); + } + } /* try to insert tuple into mlog-table. */ if (target_rel != NULL && target_rel->rd_mlogoid != InvalidOid) { @@ -1790,6 +1793,11 @@ ldelete: errmsg("tuple to be updated was already modified by an operation triggered by the current command"), errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); + if (node->mt_isSplitUpdates) { + ereport(ERROR, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("multiple updates to a row by the same query is not allowed"))); + } return NULL; case TM_Ok: { @@ -1919,7 +1927,9 @@ end:; } #ifdef PGXC - ExecARDeleteTriggers(estate, result_rel_info, deletePartitionOid, bucketid, oldtuple, tupleid); + if (!node->mt_isSplitUpdates) { + ExecARDeleteTriggers(estate, result_rel_info, deletePartitionOid, bucketid, oldtuple, tupleid); + } #else /* AFTER ROW DELETE Triggers */ ExecARDeleteTriggers(estate, result_rel_info, deletePartitionOid, tupleid); @@ -3410,6 +3420,10 @@ static TupleTableSlot* ExecModifyTable(PlanState* state) List *partition_list = NIL; int resultRelationNum = node->mt_ResultTupleSlots ? list_length(node->mt_ResultTupleSlots): node->mt_nplans; +#ifdef USE_SPQ + AttrNumber action_attno = InvalidAttrNumber; + int action = -1; +#endif CHECK_FOR_INTERRUPTS(); @@ -3451,6 +3465,9 @@ static TupleTableSlot* ExecModifyTable(PlanState* state) /* Preload local variables */ result_rel_info = node->resultRelInfo + estate->result_rel_index; subPlanState = node->mt_plans[node->mt_whichplan]; +#ifdef USE_SPQ + action_attno = result_rel_info->ri_actionAttno; +#endif #ifdef ENABLE_MULTIPLE_NODES /* Initialize remote plan state */ remote_rel_state = node->mt_remoterels[node->mt_whichplan]; @@ -3519,6 +3536,9 @@ static TupleTableSlot* ExecModifyTable(PlanState* state) result_rel_info = node->resultRelInfo + estate->result_rel_index; estate->es_result_relation_info = result_rel_info; junk_filter = result_rel_info->ri_junkFilter; +#ifdef USE_SPQ + action_attno = estate->es_result_relation_info->ri_actionAttno; +#endif if (!node->isinherit) { partExprKeyStr = node->partExprKeyStrArray[estate->result_rel_index]; } @@ -3706,6 +3726,14 @@ static TupleTableSlot* ExecModifyTable(PlanState* state) old_tuple = DatumGetHeapTupleHeader(datum); } + if (IS_SPQ_RUNNING && AttributeNumberIsValid(action_attno)) { + datum = ExecGetJunkAttribute(slot, action_attno, &isNull); + /* shouldn't ever get a null result... */ + if (isNull) { + ereport(ERROR, (errmsg("action_attno is NULL"))); + } + action = DatumGetInt32(datum); + } } /* @@ -3731,20 +3759,28 @@ static TupleTableSlot* ExecModifyTable(PlanState* state) slot = ExecReplace(estate, node, slot, plan_slot, bucketid, hi_options, partition_list, partExprKeyStr); } break; - case CMD_UPDATE: { - slot = ExecUpdate(tuple_id, - old_partition_oid, - bucketid, - old_tuple, - slot, - plan_slot, - &node->mt_epqstate, - node, - node->canSetTag, - part_key_updated, - NULL, - partExprKeyStr); - } break; + case CMD_UPDATE: + if (!IS_SPQ_RUNNING) { + slot = ExecUpdate(tuple_id, + old_partition_oid, + bucketid, + old_tuple, + slot, + plan_slot, + &node->mt_epqstate, + node, + node->canSetTag, + part_key_updated, + NULL, + partExprKeyStr); + } else if (DML_INSERT == action) { + slot = ExecInsertT(node, slot, plan_slot, estate, node->canSetTag, + hi_options, &partition_list, partExprKeyStr); + } else { /* DML_DELETE */ + slot = ExecDelete(tuple_id, old_partition_oid, bucketid, old_tuple, plan_slot, + &node->mt_epqstate, node, false); + } + break; case CMD_DELETE: slot = ExecDelete( tuple_id, old_partition_oid, bucketid, old_tuple, plan_slot, &node->mt_epqstate, node, node->canSetTag); @@ -3954,6 +3990,20 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl mt_state->fireBSTriggers = true; + mt_state->mt_isSplitUpdates = NULL; + if (IS_SPQ_RUNNING) { + if (node->isSplitUpdates) { + if (list_length(node->isSplitUpdates) != nplans) { + ereport(ERROR, (errmsg("ModifyTable node is missing is-split-update information"))); + } + mt_state->mt_isSplitUpdates = (bool *)palloc0(nplans * sizeof(bool)); + i = 0; + foreach(l, node->isSplitUpdates) { + mt_state->mt_isSplitUpdates[i++] = (bool)lfirst_int(l); + } + } + } + /* * call ExecInitNode on each of the plans to be executed and save the * results into the array "mt_plans". This is also a convenient place to @@ -4388,6 +4438,22 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl } ExecInitJunkAttr(estate, operation, sub_plan->targetlist, result_rel_info); } + if (IS_SPQ_RUNNING && operation == CMD_UPDATE) { + char relkind = result_rel_info->ri_RelationDesc->rd_rel->relkind; + if (relkind == RELKIND_RELATION || + relkind == RELKIND_MATVIEW || + relkind == PARTTYPE_PARTITIONED_RELATION) { + if (mt_state->mt_isSplitUpdates && mt_state->mt_isSplitUpdates[i]) { + JunkFilter* j = result_rel_info->ri_junkFilter; + result_rel_info->ri_actionAttno = ExecFindJunkAttribute(j, "DMLAction"); + if (!AttributeNumberIsValid(result_rel_info->ri_actionAttno)) { + ereport(ERROR, (errmsg("could not find junk action column"))); + } + } else { + ereport(ERROR, (errmsg("parallel update could not find mt_isSplitUpdates"))); + } + } + } } } } else { diff --git a/src/gausskernel/runtime/executor/nodeShareInputScan.cpp b/src/gausskernel/runtime/executor/nodeShareInputScan.cpp index fdaa2d401..e4370a1c4 100644 --- a/src/gausskernel/runtime/executor/nodeShareInputScan.cpp +++ b/src/gausskernel/runtime/executor/nodeShareInputScan.cpp @@ -502,7 +502,8 @@ void ShareInputShmemInit(void) if (!found || t_thrd.shemem_ptr_cxt.shareinput_Xslice_hash == nullptr) { HASHCTL info; - + errno_t rc = memset_s(&info, sizeof(info), 0, sizeof(info)); + securec_check(rc, "\0", "\0"); info.keysize = sizeof(shareinput_tag); info.entrysize = sizeof(shareinput_Xslice_state); diff --git a/src/gausskernel/runtime/executor/nodeSplitUpdate.cpp b/src/gausskernel/runtime/executor/nodeSplitUpdate.cpp new file mode 100644 index 000000000..81245c6c0 --- /dev/null +++ b/src/gausskernel/runtime/executor/nodeSplitUpdate.cpp @@ -0,0 +1,207 @@ +/*------------------------------------------------------------------------- + * + * nodeSplitUpdate.cpp + * Implementation of nodeSplitUpdate. + * + * Portions Copyright (c) 2012, EMC Corp. + * Portions Copyright (c) 2012-2022 VMware, Inc. or its affiliates. + * Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd. + * + * IDENTIFICATION + * src/gausskernel/runtime/executor/nodeSplitUpdate.cpp + * + *------------------------------------------------------------------------- + */ +#ifdef USE_SPQ +#include "postgres.h" + +#include "miscadmin.h" +#include "executor/node/nodeSplitUpdate.h" +#include "executor/executor.h" +#include "commands/tablecmds.h" +#include "executor/instrument.h" +#include "utils/memutils.h" + + +/* Splits an update tuple into a DELETE/INSERT tuples. */ +static void SplitTupleTableSlot(TupleTableSlot *slot, List *targetList, SplitUpdate *plannode, + SplitUpdateState *node, Datum *values, bool *nulls); + +/* Memory used by node */ +#define SPLITUPDATE_MEM 1 + +/* Split TupleTableSlot into a DELETE and INSERT TupleTableSlot */ +static void SplitTupleTableSlot(TupleTableSlot *slot, List *targetList, SplitUpdate *plannode, SplitUpdateState *node, + Datum *values, bool *nulls) +{ + ListCell *element; + Datum *delete_values; + bool *delete_nulls; + Datum *insert_values; + bool *insert_nulls; + ListCell *deleteAtt = list_head(plannode->deleteColIdx); + ListCell *insertAtt = list_head(plannode->insertColIdx); + + heap_slot_getallattrs(slot); + delete_values = node->deleteTuple->tts_values; + delete_nulls = node->deleteTuple->tts_isnull; + insert_values = node->insertTuple->tts_values; + insert_nulls = node->insertTuple->tts_isnull; + + /* Iterate through new TargetList and match old and new values. The action is also added in this containsTuple. */ + foreach (element, targetList) { + TargetEntry *tle = (TargetEntry*)lfirst(element); + AttrNumber attno = tle->resno; + + if (IsA(tle->expr, DMLActionExpr)) { + /* Set the corresponding action to the new tuples. */ + delete_values[attno - 1] = Int32GetDatum((int)DML_DELETE); + delete_nulls[attno - 1] = false; + + insert_values[attno - 1] = Int32GetDatum((int)DML_INSERT); + insert_nulls[attno -1 ] = false; + } else if (attno <= list_length(plannode->insertColIdx)) { + /* Old and new values */ + int deleteAttNo = lfirst_int(deleteAtt); + int insertAttNo = lfirst_int(insertAtt); + + if (deleteAttNo == -1) { + delete_values[attno - 1] = (Datum) 0; + delete_nulls[attno - 1] = true; + } else { + delete_values[attno - 1] = values[deleteAttNo - 1]; + delete_nulls[attno - 1] = nulls[deleteAttNo - 1]; + } + + insert_values[attno - 1] = values[insertAttNo - 1]; + insert_nulls[attno - 1] = nulls[insertAttNo - 1]; + + deleteAtt = lnext(deleteAtt); + insertAtt = lnext(insertAtt); + } else { + if (IsA(tle->expr, Var)) { + Var *var = (Var *) tle->expr; + + Assert(var->varno == OUTER_VAR); + + delete_values[attno - 1] = values[var->varattno - 1]; + delete_nulls[attno - 1] = nulls[var->varattno - 1]; + + insert_values[attno - 1] = values[var->varattno - 1]; + insert_nulls[attno - 1] = nulls[var->varattno - 1]; + + Assert(var->vartype == TupleDescAttr(slot->tts_tupleDescriptor, var->varattno - 1)->atttypid); + } + /* `Resjunk' values */ + } + } +} + +/** + * Splits every TupleTableSlot into two TupleTableSlots: DELETE and INSERT. + */ +static TupleTableSlot *ExecSplitUpdate(PlanState *pstate) +{ + SplitUpdateState *node = castNode(SplitUpdateState, pstate); + PlanState *outerNode = outerPlanState(node); + SplitUpdate *plannode = (SplitUpdate *) node->ps.plan; + + TupleTableSlot *slot = NULL; + TupleTableSlot *result = NULL; + + Assert(outerNode != NULL); + + /* Returns INSERT TupleTableSlot. */ + if (!node->processInsert) { + result = node->insertTuple; + + node->processInsert = true; + } else { + Datum *values; + bool *nulls; + /* Creates both TupleTableSlots. Returns DELETE TupleTableSlots.*/ + /* Get the Delete tuple */ + slot = ExecProcNode(outerNode); + + if (TupIsNull(slot)) { + return NULL; + } + + /* `Split' update into delete and insert */ + heap_slot_getallattrs(slot); + values = slot->tts_values; + nulls = slot->tts_isnull; + + ExecStoreAllNullTuple(node->deleteTuple); + ExecStoreAllNullTuple(node->insertTuple); + + SplitTupleTableSlot(slot, plannode->plan.targetlist, plannode, node, values, nulls); + + result = node->deleteTuple; + node->processInsert = false; + + } + + return result; +} + +/* + * Init SplitUpdate Node. A memory context is created to hold Split Tuples. + * + */ +SplitUpdateState* ExecInitSplitUpdate(SplitUpdate *node, EState *estate, int eflags) +{ + SplitUpdateState *splitupdatestate; + Plan *outerPlan; + bool has_oids; + TupleDesc tupDesc; + + /* Check for unsupported flags */ + Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK | EXEC_FLAG_REWIND))); + + splitupdatestate = makeNode(SplitUpdateState); + splitupdatestate->ps.plan = (Plan *)node; + splitupdatestate->ps.state = estate; + splitupdatestate->ps.ExecProcNode = ExecSplitUpdate; + splitupdatestate->processInsert = true; + + /* + * then initialize outer plan + */ + outerPlan = outerPlan(node); + outerPlanState(splitupdatestate) = ExecInitNode(outerPlan, estate, eflags); + + ExecAssignExprContext(estate, &splitupdatestate->ps); + + ExecInitResultTupleSlot(estate, &splitupdatestate->ps); + + if (!ExecContextForcesOids((PlanState*) splitupdatestate, &has_oids)) { + has_oids = false; + } + + tupDesc = ExecTypeFromTL(node->plan.targetlist, has_oids); + splitupdatestate->insertTuple = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(splitupdatestate->insertTuple, tupDesc); + splitupdatestate->deleteTuple = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(splitupdatestate->deleteTuple, tupDesc); + + /* + * DML nodes do not project. + */ + ExecAssignResultTypeFromTL(&splitupdatestate->ps); + splitupdatestate->ps.ps_ProjInfo = NULL; + + return splitupdatestate; +} + +/* Release Resources Requested by SplitUpdate node. */ +void ExecEndSplitUpdate(SplitUpdateState *node) +{ + ExecFreeExprContext(&node->ps); + ExecClearTuple(node->ps.ps_ResultTupleSlot); + ExecClearTuple(node->insertTuple); + ExecClearTuple(node->deleteTuple); + ExecEndNode(outerPlanState(node)); +} + +#endif diff --git a/src/gausskernel/storage/access/common/printtup.cpp b/src/gausskernel/storage/access/common/printtup.cpp index 26c1a3934..4eac715fd 100644 --- a/src/gausskernel/storage/access/common/printtup.cpp +++ b/src/gausskernel/storage/access/common/printtup.cpp @@ -68,6 +68,7 @@ static void printLocalRoundRobinBatch(VectorBatch *batch, DestReceiver *self); #ifdef USE_SPQ static void printRoundRobinTuple(TupleTableSlot *tuple, DestReceiver *self); static void printRoundRobinBatch(VectorBatch *batch, DestReceiver *self); +static void printDMLTuple(TupleTableSlot *tuple, DestReceiver *self); #endif static void printHybridBatch(VectorBatch *batch, DestReceiver *self); static void finalizeLocalStream(DestReceiver *self); @@ -122,6 +123,9 @@ DestReceiver *createStreamDestReceiver(CommandDest dest) self->pub.receiveSlot = printHybridTuple; break; #ifdef USE_SPQ + case DestTupleDML: + self->pub.receiveSlot = printDMLTuple; + break; case DestTupleRoundRobin: self->pub.receiveSlot = printRoundRobinTuple; break; @@ -258,6 +262,19 @@ static void printRoundRobinTuple(TupleTableSlot *tuple, DestReceiver *self) streamReceiver *rec = (streamReceiver *)self; rec->arg->roundRobinStream(tuple, self); } + +/* + * @Description: Send a tuple to write node + * + * @param[IN] tuple: tuple to send. + * @param[IN] dest: dest receiver. + * @return void + */ +static void printDMLTuple(TupleTableSlot *tuple, DestReceiver *self) +{ + streamReceiver *rec = (streamReceiver *)self; + rec->arg->dmlStream(tuple, self); +} #endif /* * @Description: Send a tuple in hybrid ways, some data with special values diff --git a/src/include/distributelayer/streamProducer.h b/src/include/distributelayer/streamProducer.h index 85c105452..de2babd33 100644 --- a/src/include/distributelayer/streamProducer.h +++ b/src/include/distributelayer/streamProducer.h @@ -109,6 +109,7 @@ public: #ifdef USE_SPQ /* Send batch with Roundrobin. */ void roundRobinStream(VectorBatch* batch); + void dmlStream(TupleTableSlot* tuple, DestReceiver* self); #endif /* Local roundrobin the tuple through memory. */ diff --git a/src/include/executor/node/nodeSplitUpdate.h b/src/include/executor/node/nodeSplitUpdate.h new file mode 100644 index 000000000..b6a61126a --- /dev/null +++ b/src/include/executor/node/nodeSplitUpdate.h @@ -0,0 +1,26 @@ +/*------------------------------------------------------------------------- + * + * nodeSplitUpdate.h + * Prototypes for nodeSplitUpdate. + * + * Portions Copyright (c) 2012, EMC Corp. + * Portions Copyright (c) 2012-2022 VMware, Inc. or its affiliates. + * Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd. + * + * IDENTIFICATION + * src/include/executor/node/nodeSplitUpdate.h + * + *------------------------------------------------------------------------- + */ + +#ifndef OPENGAUSS_SERVER_NODESPLITUPDATE_H +#define OPENGAUSS_SERVER_NODESPLITUPDATE_H + +#ifdef USE_SPQ +#include "nodes/execnodes.h" + +extern SplitUpdateState* ExecInitSplitUpdate(SplitUpdate *node, EState *estate, int eflags); +extern void ExecEndSplitUpdate(SplitUpdateState *node); +#endif /* USE_SPQ */ + +#endif //OPENGAUSS_SERVER_NODESPLITUPDATE_H diff --git a/src/include/knl/knl_guc/knl_session_attr_spq.h b/src/include/knl/knl_guc/knl_session_attr_spq.h index 225875226..fbebbcf6d 100644 --- a/src/include/knl/knl_guc/knl_session_attr_spq.h +++ b/src/include/knl/knl_guc/knl_session_attr_spq.h @@ -69,6 +69,17 @@ typedef struct knl_session_attr_spq { bool spq_optimizer_print_optimization_stats; bool spq_optimizer_print_optimization_cost; + + /* Optimizer Parallel DML */ + bool spq_enable_insert_select; + int spq_insert_dop_num; + bool spq_enable_insert_from_tableless; + bool spq_enable_insert_order_sensitive; + bool spq_enable_delete; + int spq_delete_dop_num; + bool spq_enable_update; + int spq_update_dop_num; + int spq_wr_node_index; /* array of xforms disable flags */ #define OPTIMIZER_XFORMS_COUNT 400 /* number of transformation rules */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 2b61f8bb6..ed3906472 100755 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -590,6 +590,9 @@ typedef struct ResultRelInfo { List* ri_WithCheckOptionExprs; ProjectionInfo* ri_updateWhere; /* list of ON CONFLICT DO UPDATE exprs (qual)*/ +#ifdef USE_SPQ + AttrNumber ri_actionAttno; /* is this an INSERT or DELETE ? */ +#endif } ResultRelInfo; /* bloom filter controller */ @@ -1539,6 +1542,9 @@ typedef struct ModifyTableState { List* mt_ResultTupleSlots; /* for multiple modifying, ResultTupleSlot list for build mt_ProjInfos. */ ProjectionInfo** mt_ProjInfos; /* for multiple modifying, projectInfo list array for each result relation. */ char** partExprKeyStrArray; /* for multiple modifying, partition expr key */ +#ifdef USE_SPQ + bool* mt_isSplitUpdates; /* per-subplan flag to indicate if it's a split update */ +#endif } ModifyTableState; typedef struct CopyFromManagerData* CopyFromManager; @@ -1880,6 +1886,21 @@ typedef struct SequenceState { */ bool initState; } SequenceState; + +/* + * ExecNode for Split. + * This operator contains a Plannode in PlanState. + * The Plannode contains indexes to the ctid, insert, delete, resjunk columns + * needed for adding the action (Insert/Delete). + * A MemoryContext and TupleTableSlot are maintained to keep the INSERT + * tuple until requested. + */ +typedef struct SplitUpdateState { + PlanState ps; + bool processInsert; /* flag that specifies the operator's next action. */ + TupleTableSlot *insertTuple; /* tuple to Insert */ + TupleTableSlot *deleteTuple; /* tuple to Delete */ +} SplitUpdateState; #endif /* * These structs store information about index quals that don't have simple diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index dad691b01..0d5a655ef 100755 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -184,6 +184,7 @@ typedef enum NodeTag { T_AssertOpState, T_ShareInputScanState, T_SequenceState, + T_SplitUpdateState, #endif T_IndexScanState, T_IndexOnlyScanState, @@ -284,6 +285,9 @@ typedef enum NodeTag { T_PseudoTargetEntry, T_PrefixKey, T_SetVariableExpr, +#ifdef USE_SPQ + T_DMLActionExpr, +#endif /* * TAGS FOR EXPRESSION STATE NODES (execnodes.h) diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index d5da7fb38..3dd1cc596 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -474,6 +474,9 @@ typedef struct ModifyTable { OpMemInfo mem_info; /* Memory info for modify node */ List* targetlists; /* For multi-relation modifying */ List* withCheckOptionLists; /* per-target-table WCO lists */ +#ifdef USE_SPQ + List *isSplitUpdates; +#endif } ModifyTable; /* ---------------- @@ -1832,6 +1835,11 @@ typedef struct AssertOp { int errcode; /* SQL error code */ List *errmessage; /* error message */ } AssertOp; + +typedef enum DMLAction { + DML_DELETE, + DML_INSERT +} DMLAction; #endif /* USE_SPQ */ #endif /* PLANNODES_H */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 9cd4ed042..e46b92f6f 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -1483,6 +1483,17 @@ typedef struct UpsertExpr { Node* upsertWhere; /* Qualifiers for upsert's update clause to check */ } UpsertExpr; +#ifdef USE_SPQ +/* + * DMLActionExpr + * + * Represents the expression which introduces the action in a SplitUpdate statement + */ +typedef struct DMLActionExpr { + Expr xpr; +} DMLActionExpr; +#endif + /* * DB4AI */ diff --git a/src/include/optimizer/stream_cost.h b/src/include/optimizer/stream_cost.h index f91b4d0a6..4316d40fc 100644 --- a/src/include/optimizer/stream_cost.h +++ b/src/include/optimizer/stream_cost.h @@ -43,6 +43,7 @@ typedef enum { REMOTE_HYBRID, /* Hybrid send data. */ #ifdef USE_SPQ REMOTE_ROUNDROBIN, + REMOTE_DML_WRITE_NODE, /* DML only send datas to write node. */ #endif LOCAL_DISTRIBUTE, /* Distribute data to all threads at local node. */ LOCAL_BROADCAST, /* Broadcast data to all threads at local node. */ diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index afc8440ce..b9dec2061 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -105,6 +105,7 @@ typedef enum { #ifdef USE_SPQ DestTupleRoundRobin, DestBatchRoundRobin, + DestTupleDML, #endif DestBatchBroadCast, /* results send to consumer thread in a broadcast way */