!3501 opfusion支持 insert ... select

Merge pull request !3501 from wuchenglin/master_insert_sub_0530
This commit is contained in:
opengauss_bot
2023-06-19 15:13:44 +00:00
committed by Gitee
18 changed files with 674 additions and 48 deletions

View File

@ -116,7 +116,7 @@ THR_LOCAL bool is_syncup_producer = false;
void InitPlan(QueryDesc *queryDesc, int eflags);
static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
static void ExecPostprocessPlan(EState *estate);
static void ExecEndPlan(PlanState *planstate, EState *estate);
void ExecEndPlan(PlanState *planstate, EState *estate);
static void ExecCollectMaterialForSubplan(EState *estate);
#ifdef ENABLE_MOT
static void ExecutePlan(EState *estate, PlanState *planstate, CmdType operation, bool sendTuples, long numberTuples,
@ -2021,7 +2021,7 @@ static void ExecPostprocessPlan(EState *estate)
* tuple tables must be cleared or dropped to ensure pins are released.
* ----------------------------------------------------------------
*/
static void ExecEndPlan(PlanState *planstate, EState *estate)
void ExecEndPlan(PlanState *planstate, EState *estate)
{
ResultRelInfo *resultRelInfo = NULL;
int i;

View File

@ -186,8 +186,6 @@ void Start_Prefetch(TableScanDesc scan, SeqScanAccessor* p_accessor, ScanDirecti
p_accessor->sa_last_prefbf = last;
}
static TupleTableSlot* SeqNext(SeqScanState* node);
static void ExecInitNextPartitionForSeqScan(SeqScanState* node);
template<TableAmType type, bool hashBucket, bool pushdown>
@ -249,7 +247,7 @@ void seq_scan_getnext(TableScanDesc scan, TupleTableSlot* slot, ScanDirection d
* This is a workhorse for ExecSeqScan
* ----------------------------------------------------------------
*/
static TupleTableSlot* SeqNext(SeqScanState* node)
TupleTableSlot* SeqNext(SeqScanState* node)
{
TableScanDesc scanDesc;
EState* estate = NULL;

View File

@ -30,8 +30,18 @@
#include "commands/matview.h"
#include "commands/sequence.h"
#include "executor/node/nodeModifyTable.h"
#include "executor/node/nodeSeqscan.h"
#include "parser/parse_coerce.h"
extern void ExecEndPlan(PlanState* planstate, EState* estate);
inline bool check_attisnull_exist(ResultRelInfo* ResultRelInfo, TupleTableSlot* slot);
static List* ExecInsertIndexTuplesOpfusion(TupleTableSlot* slot, ItemPointer tupleid, EState* estate,
Relation targetPartRel, Partition p, int2 bucketId, bool* conflict,
Bitmapset* modifiedIdxAttrs);
static TupleTableSlot* insert_real(ModifyTableState* state, TupleTableSlot* slot, EState* estate, bool canSetTag);
void InsertFusion::InitBaseParam(List* targetList) {
ListCell* lc = NULL;
int i = 0;
@ -492,3 +502,511 @@ bool InsertFusion::ResetReuseFusion(MemoryContext context, CachedPlanSource* psr
return true;
}
void InsertSubFusion::InitGlobals()
{
m_c_global = (InsertSubFusionGlobalVariable*)palloc0(sizeof(InsertSubFusionGlobalVariable));
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
SeqScan* seqscan = (SeqScan*)linitial(node->plans);
List* targetList = seqscan->plan.targetlist;
m_c_local.m_ss_plan = (SeqScan*)copyObject(seqscan);
m_c_local.m_plan = (Plan*)copyObject(node);
Relation rel = heap_open(m_global->m_reloid, AccessShareLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertSubFusion::ExecInsert;
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
heap_close(rel, AccessShareLock);
/* init param func const */
m_c_global->m_targetVarLoc = (VarLoc*)palloc0(m_global->m_natts * sizeof(VarLoc));
m_c_global->m_varNum = 0;
ListCell* lc = NULL;
int i = 0;
TargetEntry* res = NULL;
Expr* expr = NULL;
foreach (lc, targetList) {
res = (TargetEntry*)lfirst(lc);
expr = res->expr;
Assert(
IsA(expr, Var) || IsA(expr, RelabelType)
);
while (IsA(expr, RelabelType)) {
expr = ((RelabelType*)expr)->arg;
}
if (IsA(expr, Var)) {
Var* var = (Var*)expr;
m_c_global->m_targetVarLoc[m_c_global->m_varNum].varNo = var->varattno;
m_c_global->m_targetVarLoc[m_c_global->m_varNum++].scanKeyIndx = i;
}
i++;
}
m_c_global->m_targetConstNum = i;
}
void InsertSubFusion::InitLocals(ParamListInfo params)
{
m_c_local.m_estate = CreateExecutorStateForOpfusion(m_local.m_localContext, m_local.m_tmpContext);
m_c_local.m_estate->es_range_table = m_global->m_planstmt->rtable;
m_c_local.m_estate->es_plannedstmt = m_global->m_planstmt;
m_local.m_reslot = MakeSingleTupleTableSlot(m_global->m_tupDesc);
if (m_global->m_table_type == TAM_USTORE) {
m_local.m_reslot->tts_tam_ops = TableAmUstore;
}
m_local.m_values = (Datum*)palloc0(m_global->m_natts * sizeof(Datum));
m_local.m_isnull = (bool*)palloc0(m_global->m_natts * sizeof(bool));
m_c_local.m_curVarValue = (Datum*)palloc0(m_global->m_natts * sizeof(Datum));
m_c_local.m_curVarIsnull = (bool*)palloc0(m_global->m_natts * sizeof(bool));
initParams(params);
m_local.m_receiver = NULL;
m_local.m_isInsideRec = true;
m_local.m_optype = INSERT_SUB_FUSION;
}
InsertSubFusion::InsertSubFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
: OpFusion(context, psrc, plantree_list)
{
MemoryContext old_context = NULL;
if (!IsGlobal()) {
old_context = MemoryContextSwitchTo(m_global->m_context);
InitGlobals();
MemoryContextSwitchTo(old_context);
} else {
m_c_global = ((InsertSubFusion*)(psrc->opFusionObj))->m_c_global;
}
old_context = MemoryContextSwitchTo(m_local.m_localContext);
InitLocals(params);
MemoryContextSwitchTo(old_context);
}
/*
* ExecInsertIndexTuplesOpfusion
*
* This routine takes care of inserting index tuples
* into all the relations indexing the result relation
* when a heap tuple is inserted into the result relation.
* It is similar to ExecInsertIndexTuples.
*/
static List* ExecInsertIndexTuplesOpfusion(TupleTableSlot* slot, ItemPointer tupleid, EState* estate,
Relation targetPartRel, Partition p, int2 bucketId, bool* conflict,
Bitmapset* modifiedIdxAttrs)
{
List* result = NIL;
ResultRelInfo* resultRelInfo = NULL;
int i;
int numIndices;
RelationPtr relationDescs;
IndexInfo** indexInfoArray;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
Relation actualheap;
bool containGPI;
List* partitionIndexOidList = NIL;
/*
* Get information from the result relation info structure.
*/
resultRelInfo = estate->es_result_relation_info;
numIndices = resultRelInfo->ri_NumIndices;
relationDescs = resultRelInfo->ri_IndexRelationDescs;
indexInfoArray = resultRelInfo->ri_IndexRelationInfo;
actualheap = resultRelInfo->ri_RelationDesc;
containGPI = resultRelInfo->ri_ContainGPI;
/*
* for each index, form and insert the index tuple
*/
for (i = 0; i < numIndices; i++) {
Relation indexRelation = relationDescs[i];
IndexInfo* indexInfo = NULL;
IndexUniqueCheck checkUnique;
bool satisfiesConstraint = false;
Relation actualindex = NULL;
if (indexRelation == NULL) {
continue;
}
indexInfo = indexInfoArray[i];
/* If the index is marked as read-only, ignore it */
if (!indexInfo->ii_ReadyForInserts) {
continue;
}
actualindex = indexRelation;
/*
* FormIndexDatum fills in its values and isnull parameters with the
* appropriate values for the column(s) of the index.
*/
FormIndexDatum(indexInfo, slot, estate, values, isnull);
/*
* The index AM does the actual insertion, plus uniqueness checking.
*
* For an immediate-mode unique index, we just tell the index AM to
* throw error if not unique.
*
* For a deferrable unique index, we tell the index AM to just detect
* possible non-uniqueness, and we add the index OID to the result
* list if further checking is needed.
*/
if (!indexRelation->rd_index->indisunique) {
checkUnique = UNIQUE_CHECK_NO;
} else if (conflict != NULL) {
checkUnique = UNIQUE_CHECK_PARTIAL;
} else if (indexRelation->rd_index->indimmediate) {
checkUnique = UNIQUE_CHECK_YES;
} else {
checkUnique = UNIQUE_CHECK_PARTIAL;
}
satisfiesConstraint = index_insert(actualindex, /* index relation */
values, /* array of index Datums */
isnull, /* null flags */
tupleid, /* tid of heap tuple */
actualheap, /* heap relation */
checkUnique); /* type of uniqueness check to do */
/*
* If the index has an associated exclusion constraint, check that.
* This is simpler than the process for uniqueness checks since we
* always insert first and then check. If the constraint is deferred,
* we check now anyway, but don't throw error on violation; instead
* we'll queue a recheck event.
*
* An index for an exclusion constraint can't also be UNIQUE (not an
* essential property, we just don't allow it in the grammar), so no
* need to preserve the prior state of satisfiesConstraint.
*/
if (indexInfo->ii_ExclusionOps != NULL) {
bool errorOK = !actualindex->rd_index->indimmediate;
satisfiesConstraint = check_exclusion_constraint(
actualheap, actualindex, indexInfo, tupleid, values, isnull, estate, false, errorOK);
}
if ((checkUnique == UNIQUE_CHECK_PARTIAL || indexInfo->ii_ExclusionOps != NULL) && !satisfiesConstraint) {
/*
* The tuple potentially violates the uniqueness or exclusion
* constraint, so make a note of the index so that we can re-check
* it later. Speculative inserters are told if there was a
* speculative conflict, since that always requires a restart.
*/
result = lappend_oid(result, RelationGetRelid(indexRelation));
if (conflict != NULL) {
*conflict = true;
}
}
}
list_free_ext(partitionIndexOidList);
return result;
}
/*
* check attribute isnull constrain
*
* if we find a isnull attribute, return false, otherwise return true.
*/
bool check_attisnull_exist(ResultRelInfo* ResultRelInfo, TupleTableSlot* slot)
{
Relation rel = ResultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
TupleConstr* constr = tupdesc->constr;
if (constr->has_not_null) {
int natts = tupdesc->natts;
int attrChk;
for (attrChk = 1; attrChk <= natts; attrChk++) {
Form_pg_attribute att = TupleDescAttr(tupdesc, attrChk -1);
if (att->attnotnull && tableam_tslot_attisnull(slot, attrChk)) {
return false;
}
}
}
if (constr->num_check == 0) {
return true;
}
return false;
}
/*
* insert_real
*
* execute insert a slot to target relation
* state: ModifyTableState state of current plan
* slot: slot be inserted
* estate: execute state
* canSetTag: do we set the command tag/es_processed?
*/
TupleTableSlot* insert_real(ModifyTableState* state, TupleTableSlot* slot, EState* estate, bool canSetTag)
{
Tuple tuple = NULL;
ResultRelInfo* result_rel_info = NULL;
Relation result_relation_desc;
Oid new_id = InvalidOid;
List* recheck_indexes = NIL;
Partition partition = NULL;
Relation target_rel = NULL;
ItemPointer pTSelf = NULL;
bool rel_isblockchain = false;
int2 bucket_id = InvalidBktId;
#ifdef ENABLE_MULTIPLE_NDOES
RemoteQueryState* result_remote_rel = NULL;
#endif
/*
* get information on the (current) result relation
*/
result_rel_info = estate->es_result_relation_info;
result_relation_desc = result_rel_info->ri_RelationDesc;
rel_isblockchain = result_relation_desc->rd_isblockchain;
/*
* get the heap tuple out of the tuple table slot, making sure we have a
* writable copy
*/
tuple = tableam_tslot_get_tuple_from_slot(result_rel_info->ri_RelationDesc, slot);
#ifdef ENABLE_MULTIPLE_NDOES
result_remote_rel = (RemoteQueryState*)estate->es_result_remoterel;
#endif
/*
* If the result relation has OIDs, force the tuple's OID to zero so that
* heap_insert will assign a fresh OID. Usually the OID already will be
* zero at this point, but there are corner cases where the plan tree can
* return a tuple extracted literally from some table with the same
* rowtype.
*
* XXX if we ever wanted to allow users to assign their own OIDs to new
* rows, this'd be the place to do it. For the moment, we make a point of
* doing this before calling triggers, so that a user-supplied trigger
* could hack the OID if desired.
*/
/*
* Check the constraints of the tuple
*/
if (result_relation_desc->rd_att->constr) {
TupleTableSlot *tmp_slot = state->mt_insert_constr_slot == NULL ? slot : state->mt_insert_constr_slot;
if (likely(check_attisnull_exist(result_rel_info, tmp_slot))) {
/* do nothing */
} else if (!ExecConstraints(result_rel_info, tmp_slot, estate, true)) {
if (u_sess->utils_cxt.sql_ignore_strategy_val == SQL_OVERWRITE_NULL) {
tuple = ReplaceTupleNullCol(RelationGetDescr(result_relation_desc), tmp_slot);
/*
* Double check constraints in case that new val in column with not null constraints
* violated check constraints
*/
ExecConstraints(result_rel_info, tmp_slot, estate, true);
} else {
return NULL;
}
}
}
/*
* insert the tuple
* Note: heap_insert returns the tid (location) of the new tuple in
* the t_self field.
*/
new_id = InvalidOid;
bool isgpi = false;
ConflictInfoData conflictInfo;
Oid conflictPartOid = InvalidOid;
int2 conflictBucketid = InvalidBktId;
target_rel = result_rel_info->ri_RelationDesc;
// check unique constraints first if SQL has keyword IGNORE
if (estate->es_plannedstmt && estate->es_plannedstmt->hasIgnore &&
!ExecCheckIndexConstraints(slot, estate, target_rel, partition, &isgpi,
bucket_id, &conflictInfo, &conflictPartOid,
&conflictBucketid)) {
ereport(WARNING,
(errmsg("duplicate key value violates unique constraint in table \"%s\"",
RelationGetRelationName(target_rel))));
return NULL;
}
new_id = tableam_tuple_insert(target_rel, tuple, estate->es_output_cid, 0, NULL);
/* insert index entries for tuple */
if (result_rel_info->ri_NumIndices > 0) {
pTSelf = tableam_tops_get_t_self(target_rel, tuple);
recheck_indexes = ExecInsertIndexTuplesOpfusion(slot, pTSelf, estate, NULL, NULL,
InvalidBktId, NULL, NULL);
list_free_ext(recheck_indexes);
}
if (canSetTag) {
(estate->es_processed)++;
estate->es_lastoid = new_id;
}
return NULL;
}
unsigned long InsertSubFusion::ExecInsert(Relation rel, ResultRelInfo* result_rel_info)
{
/*******************
* step 1: prepare *
*******************/
Relation bucket_rel = NULL;
Partition part = NULL;
Relation partRel = NULL;
/* indicates whether it is the first time to insert, delete, update or not. */
bool is_first_modified = true;
init_gtt_storage(CMD_INSERT, result_rel_info);
ModifyTableState* node = m_c_local.m_mt_state;
EState* estate = m_c_local.m_estate;
estate->es_output_cid = GetCurrentCommandId(true);
int resultRelationNum = node->mt_ResultTupleSlots ? list_length(node->mt_ResultTupleSlots) : 1;
/************************
* step 2: begin insert *
************************/
SeqScanState* subPlanState = (SeqScanState*)m_c_local.m_sub_ps;
TupleTableSlot* last_slot = NULL;
Tuple tuple = NULL;
for (;;) {
result_rel_info = node->resultRelInfo + estate->result_rel_index;
estate->es_result_relation_info = result_rel_info;
ResetPerTupleExprContext(estate);
TupleTableSlot* slot = SeqNext(subPlanState);
if (TupIsNull(slot)) {
record_first_time();
break;
}
(void)insert_real(node, slot, estate, node->canSetTag);
last_slot = slot;
record_first_time();
if (estate->result_rel_index == resultRelationNum -1) {
estate->result_rel_index = 0;
} else {
estate->result_rel_index++;
}
}
uint64 nprocessed = estate->es_processed;
/****************
* step 3: done *
****************/
ExecReleaseResource(tuple, m_local.m_reslot, result_rel_info, m_c_local.m_estate, bucket_rel, rel, part, partRel);
return nprocessed;
}
void InsertSubFusion::InitPlan()
{
m_c_local.m_ps = ExecInitNode(m_c_local.m_plan, m_c_local.m_estate, 0);
ModifyTableState* node = castNode(ModifyTableState, m_c_local.m_ps);
m_c_local.m_mt_state = node;
}
bool InsertSubFusion::execute(long max_rows, char* completionTag)
{
bool success = false;
errno_t errorno = EOK;
/*******************
* step 1: prepare *
*******************/
ResultRelInfo* saved_result_rel_info = NULL;
ResultRelInfo* result_rel_info = NULL;
PlanState* subPlanState = NULL;
#ifdef ENABLE_MULTIPLE_NODES
PlanState* remote_rel_state = NULL;
PlanState* insert_remote_rel_state = NULL;
PlanState* update_remote_rel_state = NULL;
PlanState* delete_remote_rel_state = NULL;
PlanState* saved_result_remote_rel = NULL;
#endif
Relation rel = heap_open(m_global->m_reloid, RowExclusiveLock);
result_rel_info = makeNode(ResultRelInfo);
InitResultRelInfo(result_rel_info, rel, 1, 0);
m_c_local.m_estate->es_snapshot = GetActiveSnapshot();
m_c_local.m_estate->es_result_relation_info = result_rel_info;
m_c_local.m_estate->es_result_relations = result_rel_info;
InitPlan();
ModifyTableState* node = m_c_local.m_mt_state;
EState* estate = m_c_local.m_estate;
subPlanState = node->mt_plans[0];
#ifdef ENABLE_MULTIPLE_NODES
/* Initialize remote plan state */
remote_rel_state = node->mt_remoterels[0];
insert_remote_rel_state = node->mt_insert_remoterels[0];
update_remote_rel_state = node->mt_update_remoterels[0];
delete_remote_rel_state = node->mt_delete_remoterels[0];
#endif
/*
* es_result_relation_info must point to the currently active result
* relation while we are within this ModifyTable node. Even though
* ModifyTable nodes can't be nested statically, they can be nested
* dynamically (since our subplan could include a reference to a modifying
* CTE). So we have to save and restore the caller's value.
*/
saved_result_rel_info = estate->es_result_relation_info;
#ifdef ENABLE_MULTIPLE_NODES
saved_result_remote_rel = estate->es_result_remoterel;
#endif
#ifdef ENABLE_MULTIPLE_NODES
estate->es_result_remoterel = remote_rel_state;
estate->es_result_insert_remoterel = insert_remote_rel_state;
estate->es_result_update_remoterel = update_remote_rel_state;
estate->es_result_delete_remoterel = delete_remote_rel_state;
#endif
estate->es_plannedstmt = m_global->m_planstmt;
m_c_local.m_sub_ps = castNode(SeqScanState, subPlanState);
/************************
* step 2: begin insert *
************************/
unsigned long nprocessed = (this->*(m_global->m_exec_func_ptr))(rel, result_rel_info);
heap_close(rel, RowExclusiveLock);
/****************
* step 3: done *
****************/
success = true;
m_local.m_isCompleted = true;
if (m_local.m_ledger_hash_exist && !IsConnFromApp()) {
errorno = snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1,
"INSERT 0 %ld %lu\0", nprocessed, m_local.m_ledger_relhash);
} else {
errorno =
snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1, "INSERT 0 %ld", nprocessed);
}
securec_check_ss(errorno, "\0", "\0");
/* do some extra hand release, in case of resources leak */
ExecEndPlan(m_c_local.m_ps, m_c_local.m_estate);
FreeExecutorStateForOpfusion(m_c_local.m_estate);
u_sess->statement_cxt.current_row_count = nprocessed;
u_sess->statement_cxt.last_row_count = u_sess->statement_cxt.current_row_count;
return success;
}

View File

@ -71,6 +71,10 @@ const char* getBypassReason(FusionType result)
return "Bypass executed through delete fusion";
}
case INSERT_SUB_FUSION: {
return "Bypass executed through insert sub fusion";
}
case AGG_INDEX_FUSION: {
return "Bypass executed through agg fusion";
}
@ -281,6 +285,11 @@ const char* getBypassReason(FusionType result)
break;
}
case NOBYPASS_INSERT_SUB_FUSION_NOT_SUPPORT_PARTITION_BYPASS: {
return "Bypass not executed because insert sub fusion not support partition table bypass.";
break;
}
case NOBYPASS_GPC_NOT_SUPPORT_PARTITION_BYPASS: {
return "Bypass not executed because GPC not support partition table bypass.";
break;
@ -921,13 +930,19 @@ FusionType checkBaseResult(Plan* top_plan)
{
FusionType result = INSERT_FUSION;
ModifyTable *node = (ModifyTable *)top_plan;
if (!IsA(linitial(node->plans), BaseResult)) {
return NOBYPASS_NO_SIMPLE_INSERT;
}
BaseResult *base = (BaseResult *)linitial(node->plans);
if (base->plan.lefttree != NULL || base->plan.initPlan != NIL || base->resconstantqual != NULL) {
if (IsA(linitial(node->plans), SeqScan) && list_length(node->plans) == 1) {
result = INSERT_SUB_FUSION;
/* may be we need do some extra check here like BaseResult? */
} else if (IsA(linitial(node->plans), BaseResult)) {
BaseResult *base = (BaseResult *)linitial(node->plans);
if (base->plan.lefttree != NULL || base->plan.initPlan != NIL || base->resconstantqual != NULL) {
return NOBYPASS_NO_SIMPLE_INSERT;
}
} else {
return NOBYPASS_NO_SIMPLE_INSERT;
}
if (node->upsertAction != UPSERT_NONE) {
return NOBYPASS_UPSERT_NOT_SUPPORT;
}
@ -959,12 +974,12 @@ FusionType getInsertFusionType(List *stmt_list, ParamListInfo params)
#endif
/* check subquery num */
FusionType ttype = checkBaseResult(top_plan);
if (ttype > BYPASS_OK) {
return ttype;
/* we also check SeqScan here, so ftype may be INSERT_SUB_FUSION */
ftype = checkBaseResult(top_plan);
if (ftype > BYPASS_OK) {
return ftype;
}
ModifyTable *node = (ModifyTable *)top_plan;
BaseResult *base = (BaseResult *)linitial(node->plans);
/* check relation */
Index res_rel_idx = linitial_int((List*)linitial(plannedstmt->resultRelations));
@ -997,6 +1012,10 @@ FusionType getInsertFusionType(List *stmt_list, ParamListInfo params)
heap_close(rel, NoLock);
return NOBYPASS_PARTITION_BYPASS_NOT_OPEN;
}
if (RELATION_IS_PARTITIONED(rel) && ftype == INSERT_SUB_FUSION) {
heap_close(rel, AccessShareLock);
return NOBYPASS_PARTITION_BYPASS_NOT_OPEN;
}
if (RELATION_IS_PARTITIONED(rel) && ENABLE_GPC) {
heap_close(rel, NoLock);
return NOBYPASS_GPC_NOT_SUPPORT_PARTITION_BYPASS;
@ -1010,7 +1029,12 @@ FusionType getInsertFusionType(List *stmt_list, ParamListInfo params)
* check targetlist
* maybe expr type is FuncExpr because of type conversion.
*/
List *targetlist = base->plan.targetlist;
List *targetlist = NIL;
if (IsA(linitial(node->plans), SeqScan)) {
targetlist = ((SeqScan*)linitial(node->plans))->plan.targetlist;
} else {
targetlist = ((BaseResult*)linitial(node->plans))->plan.targetlist;
}
checkTargetlist(targetlist, &ftype);
return ftype;
}

View File

@ -300,6 +300,7 @@ void opfusion_ledger_ExecutorEnd(FusionType fusiontype, Oid relid, const char *q
case INSERT_FUSION:
case UPDATE_FUSION:
case DELETE_FUSION:
case INSERT_SUB_FUSION:
if (is_ledger_usertable(relid)) {
ledger_gchain_append(relid, query, relhash);
}

View File

@ -16,6 +16,7 @@
#include "nodes/execnodes.h"
extern TupleTableSlot* SeqNext(SeqScanState* node);
extern SeqScanState* ExecInitSeqScan(SeqScan* node, EState* estate, int eflags);
extern void ExecEndSeqScan(SeqScanState* node);
extern void ExecSeqMarkPos(SeqScanState* node);

View File

@ -72,4 +72,53 @@ private:
InsertFusionLocaleVariable m_c_local;
};
class InsertSubFusion : public OpFusion {
public:
InsertSubFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params);
~InsertSubFusion(){};
bool execute(long max_rows, char* completionTag);
void InitLocals(ParamListInfo params);
void InitGlobals();
void InitPlan();
private:
unsigned long ExecInsert(Relation rel, ResultRelInfo* resultRelInfo);
struct VarLoc {
int varNo;
int scanKeyIndx;
};
struct InsertSubFusionGlobalVariable {
int m_targetParamNum;
int m_targetConstNum;
ConstLoc* m_targetConstLoc;
int m_varNum;
VarLoc* m_targetVarLoc;
};
InsertSubFusionGlobalVariable* m_c_global;
struct InsertSubFusionLocaleVariable {
EState* m_estate; /* Top estate*/
Datum* m_curVarValue;
bool* m_curVarIsnull;
Plan* m_plan;
SeqScan* m_ss_plan;
PlanState* m_ps;
SeqScanState* m_sub_ps;
ModifyTableState* m_mt_state;
};
InsertSubFusionLocaleVariable m_c_local;
};
#endif /* SRC_INCLUDE_OPFUSION_OPFUSION_INSERT_H_ */

View File

@ -50,6 +50,7 @@ enum FusionType {
INSERT_FUSION,
UPDATE_FUSION,
DELETE_FUSION,
INSERT_SUB_FUSION,
AGG_INDEX_FUSION,
SORT_INDEX_FUSION,
@ -114,6 +115,7 @@ enum FusionType {
NOBYPASS_VERSION_SCAN_PLAN,
NOBYPASS_PARTITION_TYPE_NOT_SUPPORT,
NOBYPASS_REPLACE_NOT_SUPPORT,
NOBYPASS_INSERT_SUB_FUSION_NOT_SUPPORT_PARTITION_BYPASS,
NOBYPASS_GPC_NOT_SUPPORT_PARTITION_BYPASS
};

View File

@ -507,10 +507,11 @@ alter table test_drop_column_1 drop column c;
explain (verbose true, costs false) insert into test_drop_column_1 select * from test_drop_column_2;
QUERY PLAN
---------------------------------------------------------------------------
[Bypass]
Insert on public.test_drop_column_1
-> Seq Scan on public.test_drop_column_2
Output: test_drop_column_2.a, test_drop_column_2.b, NULL::integer
(3 rows)
(4 rows)
insert into test_drop_column_1 select * from test_drop_column_2;
explain (verbose true, costs false) insert into test_drop_column_1 select * from test_drop_column_2 order by 2;

View File

@ -20,50 +20,55 @@ analyze distribute_source_hash_02;
explain (verbose on, costs off) insert into distribute_target_hash_01 select * from distribute_source_hash_01;
QUERY PLAN
----------------------------------------------------------------------------------------------------------
[Bypass]
Insert on distribute_dml.distribute_target_hash_01
-> Seq Scan on distribute_dml.distribute_source_hash_01
Output: distribute_source_hash_01.c1, distribute_source_hash_01.c2, distribute_source_hash_01.c3
(3 rows)
(4 rows)
insert into distribute_target_hash_01 select * from distribute_source_hash_01;
-- hash source doesn't match target distribute keys
explain (verbose on, costs off) insert into distribute_target_hash_02 select * from distribute_source_hash_01;
QUERY PLAN
----------------------------------------------------------------------------------------------------------
[Bypass]
Insert on distribute_dml.distribute_target_hash_02
-> Seq Scan on distribute_dml.distribute_source_hash_01
Output: distribute_source_hash_01.c1, distribute_source_hash_01.c2, distribute_source_hash_01.c3
(3 rows)
(4 rows)
insert into distribute_target_hash_02 select * from distribute_source_hash_01;
-- replicate target, hashed source
explain (verbose on, costs off) insert into distribute_target_replication_01 select * from distribute_source_hash_01;
QUERY PLAN
----------------------------------------------------------------------------------------------------------
[Bypass]
Insert on distribute_dml.distribute_target_replication_01
-> Seq Scan on distribute_dml.distribute_source_hash_01
Output: distribute_source_hash_01.c1, distribute_source_hash_01.c2, distribute_source_hash_01.c3
(3 rows)
(4 rows)
insert into distribute_target_replication_01 select * from distribute_source_hash_01;
-- replicate source, hashed target
explain (verbose on, costs off) insert into distribute_target_hash_01 select * from distribute_source_replication_01;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
[Bypass]
Insert on distribute_dml.distribute_target_hash_01
-> Seq Scan on distribute_dml.distribute_source_replication_01
Output: distribute_source_replication_01.c1, distribute_source_replication_01.c2, distribute_source_replication_01.c3
(3 rows)
(4 rows)
insert into distribute_target_hash_02 select * from distribute_source_replication_01;
-- replicate source, replicate target
explain (verbose on, costs off) insert into distribute_target_replication_01 select * from distribute_source_replication_01;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
[Bypass]
Insert on distribute_dml.distribute_target_replication_01
-> Seq Scan on distribute_dml.distribute_source_replication_01
Output: distribute_source_replication_01.c1, distribute_source_replication_01.c2, distribute_source_replication_01.c3
(3 rows)
(4 rows)
insert into distribute_target_replication_01 select * from distribute_source_replication_01;
-- join source
@ -387,10 +392,11 @@ insert into stream_UI_diskey1 select generate_series(1,100) from src;
explain (verbose on, costs off) insert into stream_UI_diskey1 select b1 + 100 from stream_UI_diskey1;
QUERY PLAN
-------------------------------------------------------------
[Bypass]
Insert on distribute_dml.stream_ui_diskey1
-> Seq Scan on distribute_dml.stream_ui_diskey1
Output: (distribute_dml.stream_ui_diskey1.b1 + 100)
(3 rows)
(4 rows)
insert into stream_UI_diskey1 select b1 + 100 from stream_UI_diskey1;
select count(*) from stream_UI_diskey1 where b1 = 100;
@ -417,51 +423,57 @@ create table insert_tb2 (a int, b int, c int);
explain (costs off, verbose on) insert into insert_tb1 select * from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.a, insert_tb2.b, insert_tb2.c
(3 rows)
(4 rows)
--Need redistribute
explain (costs off, verbose on) insert into insert_tb1 (a, c) select a, c from insert_tb2;
QUERY PLAN
-----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.a, NULL::integer, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb1 (b, c) select a, c from insert_tb2;
QUERY PLAN
-----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: NULL::integer, insert_tb2.a, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb1 (a, b, c) select b, a, c from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.b, insert_tb2.a, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb1 (b, a, c) select b, a, c from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.a, insert_tb2.b, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb1 (b, a, c) select a, b, c from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.b, insert_tb2.a, insert_tb2.c
(3 rows)
(4 rows)
drop table insert_tb1;
drop table insert_tb2;
@ -470,10 +482,11 @@ create table insert_tb2 (a int, b int, c int);
explain (costs off, verbose on) insert into insert_tb1 select * from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.a, insert_tb2.b, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) delete from insert_tb1 where (a, b ,c) in (select * from insert_tb2);
QUERY PLAN
@ -546,34 +559,38 @@ create table insert_tb2 (a int, b int, c int);
explain (costs off, verbose on) insert into insert_tb1 select * from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.a, insert_tb2.b, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb1 (a, b, c) select b, a, c from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.b, insert_tb2.a, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb1 (b, a, c) select b, a, c from insert_tb2;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb1
-> Seq Scan on distribute_dml.insert_tb2
Output: insert_tb2.a, insert_tb2.b, insert_tb2.c
(3 rows)
(4 rows)
explain (costs off, verbose on) insert into insert_tb2 select * from insert_tb1;
QUERY PLAN
----------------------------------------------------------
[Bypass]
Insert on distribute_dml.insert_tb2
-> Seq Scan on distribute_dml.insert_tb1
Output: insert_tb1.a, insert_tb1.b, insert_tb1.c
(3 rows)
(4 rows)
drop table insert_tb1;
drop table insert_tb2;
@ -729,10 +746,11 @@ explain (verbose on, costs off)
insert into pg_description select * from tmp_description;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
[Bypass]
Insert on pg_catalog.pg_description
-> Seq Scan on distribute_dml.tmp_description
Output: tmp_description.objoid, tmp_description.classoid, tmp_description.objsubid, tmp_description.description
(3 rows)
(4 rows)
drop table tmp_description;
--data distribute skew functions and view

View File

@ -4094,10 +4094,11 @@ insert into openGauss9999999_t1 values (1,1),(20,20);
explain analyze create table openGauss9999999_t2 as select * from openGauss9999999_t1;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
[Bypass]
--? Insert on opengauss9999999_t2 (cost=.* rows=.* width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on opengauss9999999_t1 (cost=.* rows=.* width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .* ms
(3 rows)
(4 rows)
drop table openGauss9999999_t2;
drop table openGauss9999999_t1;

View File

@ -37,18 +37,20 @@ CONTEXT: PL/pgSQL function test_explain_param(integer) line 2 at SQL statement
explain (costs off, analyze on) insert into explain_table_01 select * from explain_table_01;
--?.*
--?.*
[Bypass]
--? Insert on explain_table_01 (actual time=.* rows=5 loops=1)
--? -> Seq Scan on explain_table_01 (actual time=[\d+.\d+\,\d+\.\d+]\.\.[\d+\.\d+\,\d+\.\d+] rows=5 loops=1)
--? Total runtime:.*
(3 rows)
(4 rows)
explain (costs off, analyze on) insert into explain_table_01 select * from explain_table_02;
--?.*
--?.*
[Bypass]
--? Insert on explain_table_01 (actual time=.* rows=4 loops=1)
--? -> Seq Scan on explain_table_02 (actual time=[\d+.\d+\,\d+\.\d+]\.\.[\d+\.\d+\,\d+\.\d+] rows=4 loops=1)
--? Total runtime:.*
(3 rows)
(4 rows)
explain (costs off, analyze on) insert into explain_table_03 select * from explain_table_03;
--?.*

View File

@ -29,10 +29,11 @@ create table aa_t2 (id int, num int);
explain analyze insert into aa_t2 select * from aa_t1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
[Bypass]
--? Insert on aa_t2 (cost=.* rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on aa_t1 (cost=.* rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .* ms
(3 rows)
(4 rows)
select relname, reltuples, relpages from pg_class where relname='aa_t1' or relname='aa_t2' order by relname;
relname | reltuples | relpages
@ -45,10 +46,11 @@ select relname, reltuples, relpages from pg_class where relname='aa_t1' or relna
explain analyze insert into aa_t2 select * from aa_t1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
[Bypass]
--? Insert on aa_t2 (cost=.* rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on aa_t1 (cost=.* rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .* ms
(3 rows)
(4 rows)
select relname, reltuples, relpages from pg_class where relname='aa_t1' or relname='aa_t2' order by relname;
relname | reltuples | relpages
@ -62,10 +64,11 @@ select relname, reltuples, relpages from pg_class where relname='aa_t1' or relna
explain analyze create table aa_t3 as select * from aa_t2;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
[Bypass]
--? Insert on aa_t3 (cost=.* rows=2149 width=8) (actual time=.* rows=4 loops=1)
--? -> Seq Scan on aa_t2 (cost=.* rows=2149 width=8) (actual time=.* rows=4 loops=1)
--? Total runtime: .* ms
(3 rows)
(4 rows)
select relname, reltuples, relpages from pg_class where relname='aa_t1' or relname='aa_t2' or relname='aa_t3' order by relname;
relname | reltuples | relpages
@ -303,10 +306,11 @@ start transaction;
explain analyze create table aa_t3 as select * from aa_t2;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
[Bypass]
--? Insert on aa_t3 (cost=.* rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on aa_t2 (cost=.* rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .* ms
(3 rows)
(4 rows)
-- aa_t3 no
explain analyze select num, count(*) from aa_t3 group by num;

View File

@ -4092,10 +4092,11 @@ insert into TESTTABLE_t1 values (1,1),(20,20);
explain analyze create table TESTTABLE_t2 as select * from TESTTABLE_t1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
[Bypass]
--?Insert on testtable_t2 (cost=0.00..31.49 rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on testtable_t1 (cost=0.00..31.49 rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .* ms
(3 rows)
(4 rows)
drop table TESTTABLE_t2;
drop table TESTTABLE_t1;

View File

@ -21,10 +21,11 @@ analyze t2;
:EXP insert into t1 select c1, c2 from t2 where c2 = 1;
QUERY PLAN
--------------------------
[Bypass]
Insert on t1
-> Seq Scan on t2
Filter: (c2 = 1)
(3 rows)
(4 rows)
--- Scan
:EXP insert /*+ indexscan(t2) */ into t1 select c1, c2 from t2 where c2 = 1;
@ -90,10 +91,11 @@ set plan_cache_mode = force_custom_plan;
:EXP execute insert_g(2);
QUERY PLAN
---------------------------
[Bypass]
Insert on t1
-> Seq Scan on t2
Filter: (c2 = $1)
(3 rows)
(4 rows)
prepare insert_c as insert /*+ use_cplan */ into t1 select c1, c2 from t2 where c2 = $1;
set plan_cache_mode = force_generic_plan;

View File

@ -315,12 +315,13 @@ begin;
explain (verbose on, analyze on) create table t4 as select * from t1 where t1.a = 1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
[Bypass]
--? Insert on schema_plan_hint.t4 (cost=0.00..1.01 rows=1 width=12) (actual time=.* rows=1 loops=1)
--? -> Seq Scan on schema_plan_hint.t1 (cost=0.00..1.01 rows=1 width=12) (actual time=.* rows=1 loops=1)
Output: t1.a, t1.b, t1.c
Filter: (t1.a = 1)
--?.*
(5 rows)
(6 rows)
rollback;
--- with hint

View File

@ -104,10 +104,11 @@ ERROR: EXPLAIN SELECT INTO requires ANALYZE
EXPLAIN ANALYZE SELECT * INTO select_into_tbl FROM tmp_table;
--?.*
--?.*
[Bypass]
--? Insert on select_into_tbl (cost=0.00..31.49 rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on tmp_table (cost=0.00..31.49 rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .*
(3 rows)
(4 rows)
SELECT count(*) FROM select_into_tbl;
count
@ -121,10 +122,11 @@ ERROR: EXPLAIN CREATE TABLE AS SELECT requires ANALYZE
EXPLAIN ANALYZE CREATE TABLE ctas_tbl AS SELECT * FROM tmp_table;
--?.*
--?.*
[Bypass]
--? Insert on ctas_tbl (cost=0.00..31.49 rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? -> Seq Scan on tmp_table (cost=0.00..31.49 rows=2149 width=8) (actual time=.* rows=2 loops=1)
--? Total runtime: .*
(3 rows)
(4 rows)
SELECT count(*) FROM ctas_tbl;
count

View File

@ -76,10 +76,11 @@ explain (costs on, verbose on) select * from (select name from regex_1 union sel
explain (costs on, verbose on) insert into regex_1 (select * from regex_2);
QUERY PLAN
------------------------------------------------------------------------
[Bypass]
--? Insert on public.regex_1 \(cost=[0-9]*\.[0-9]*\.\.[0-9]*\.[0-9]* rows=[0-9]* width=[0-9]*\)
--? -> Seq Scan on public.regex_2 \(cost=[0-9]*\.[0-9]*\.\.[0-9]*\.[0-9]* rows=[0-9]* width=[0-9]*\)
Output: regex_2.id, regex_2.name, regex_2.value
(3 rows)
(4 rows)
set enable_hashjoin=on;
set enable_nestloop=off;