REPLACE INTO feature for M gram mode

commit all files for replace into
This commit is contained in:
zongwei
2022-09-08 01:37:47 -04:00
parent b4215eeb05
commit bdd6a6d933
22 changed files with 1009 additions and 23 deletions

View File

@ -107,6 +107,7 @@ extern void FlushInsertSelectBulk(
extern void FlushErrorInfo(Relation rel, EState* estate, ErrorCacheEntry* cache);
extern void HeapInsertCStore(Relation relation, ResultRelInfo* resultRelInfo, HeapTuple tup, int option);
extern void HeapDeleteCStore(Relation relation, ItemPointer tid, Oid tableOid, Snapshot snapshot);
extern Oid pg_get_serial_sequence_oid(text* tablename, text* columnname);
#ifdef ENABLE_MULTIPLE_NODES
extern void HeapInsertTsStore(Relation relation, ResultRelInfo* resultRelInfo, HeapTuple tup, int option);
#endif /* ENABLE_MULTIPLE_NODES */
@ -1465,13 +1466,35 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
* insert index entries for tuple
*/
pTSelf = tableam_tops_get_t_self(result_relation_desc, tuple);
if (result_rel_info->ri_NumIndices > 0 && !RelationIsColStore(result_relation_desc))
recheck_indexes = ExecInsertIndexTuples(slot,
if (result_rel_info->ri_NumIndices > 0 && !RelationIsColStore(result_relation_desc)) {
if (state->isReplace) {
bool specConflict = false;
recheck_indexes = ExecInsertIndexTuples(slot,
pTSelf,
estate,
RELATION_IS_PARTITIONED(result_relation_desc) ? heap_rel : NULL,
RELATION_IS_PARTITIONED(result_relation_desc) ? partition : NULL,
bucket_id, NULL, NULL);
bucket_id, &specConflict, NULL);
if (specConflict) {
state->isConflict = true;
tableam_tuple_abort_speculative(target_rel, tuple);
list_free(recheck_indexes);
return slot;
} else {
state->isConflict = false;
}
} else {
recheck_indexes = ExecInsertIndexTuples(slot,
pTSelf,
estate,
RELATION_IS_PARTITIONED(result_relation_desc) ? heap_rel : NULL,
RELATION_IS_PARTITIONED(result_relation_desc) ? partition : NULL,
bucket_id, NULL, NULL);
}
} else {
state->isConflict = false;
}
}
}
if (pTSelf == NULL) {
@ -1594,7 +1617,7 @@ TupleTableSlot* ExecDelete(ItemPointer tupleid, Oid deletePartitionOid, int2 buc
oldtuple,
#endif
tupleid);
if (!dodelete) /* "do nothing" */
if (!dodelete && !node->isReplace) /* "do nothing" */
return NULL;
}
@ -3120,6 +3143,127 @@ uint64 GetDeleteLimitCount(ExprContext* econtext, PlanState* scan, Limit *limitP
return (uint64)iCount;
}
static TupleTableSlot* ExecReplace(EState* estate, ModifyTableState* node, TupleTableSlot* slot, TupleTableSlot* plan_slot, int2 bucketid, int hi_options, List* partition_list)
{
Form_pg_attribute att_tup;
Oid seqOid = InvalidOid;
Relation rel = estate->es_result_relation_info->ri_RelationDesc;
TupleTableSlot* (*ExecInsert)(
ModifyTableState* state, TupleTableSlot*, TupleTableSlot*, EState*, bool, int, List**) = NULL;
ExecInsert = ExecInsertT<false>;
/* REPLACE INTO not support sequence, check columns to report ERROR */
for (int attrno=1; attrno<=rel->rd_att->natts; ++attrno) {
att_tup = rel->rd_att->attrs[attrno-1];
errno_t rc;
char *nspname = get_namespace_name(rel->rd_rel->relnamespace);
char tableName[NAMEDATALEN*2+2] = {0};
text *tbname;
text *attname;
rc = memset_s(tableName, NAMEDATALEN*2, 0, NAMEDATALEN*2);
securec_check(rc, "\0", "\0");
if (nspname != NULL) {
const char *sym = ".";
int nspnameLen = strlen(nspname);
int symLen = strlen(sym);
int tbnameLen = strlen(rel->rd_rel->relname.data);
rc = memcpy_s(&tableName[0], NAMEDATALEN*2+2, nspname, nspnameLen);
securec_check(rc, "\0","\0");
rc = memcpy_s(&tableName[nspnameLen], NAMEDATALEN*2+2, sym, symLen);
securec_check(rc, "\0","\0");
rc = memcpy_s(&tableName[nspnameLen+symLen], NAMEDATALEN*2+2, rel->rd_rel->relname.data, tbnameLen);
securec_check(rc, "\0","\0");
tableName[nspnameLen+symLen+tbnameLen] = 0;
tbname = cstring_to_text(&tableName[0]);
} else {
tbname = cstring_to_text(rel->rd_rel->relname.data);
}
attname = cstring_to_text(att_tup->attname.data);
seqOid = pg_get_serial_sequence_oid(tbname, attname);
if (OidIsValid(seqOid))
elog(ERROR, "REPLACE can not work on sequence!");
}
/* set flag to start loop */
node->isConflict = true;
/* we assume there is conflict by default, try to delete the conflict tuple then insert new one */
while (node->isConflict) {
ConflictInfoData conflictInfo;
Oid conflictPartOid = InvalidOid;
int2 conflictBucketid = InvalidBktId;
bool isgpi = false;
Oid partitionid = InvalidOid;
Oid subPartitionId = InvalidOid;
Relation targetrel = NULL;
Relation heaprel = NULL;
Relation subPartRel = NULL;
Partition partition = NULL;
Partition subPart = NULL;
Tuple tuple = NULL;
ResultRelInfo* resultRelInfo = NULL;
slot = plan_slot;
resultRelInfo = estate->es_result_relation_info;
targetrel = resultRelInfo->ri_RelationDesc;
heaprel = targetrel;
tuple = tableam_tslot_get_tuple_from_slot(targetrel, slot);
if (RelationIsPartitioned(targetrel)) {
partitionid = heapTupleGetPartitionId(targetrel, tuple);
searchFakeReationForPartitionOid(estate->esfRelations,
estate->es_query_cxt,
targetrel,
partitionid,
heaprel,
partition,
RowExclusiveLock);
if (RelationIsSubPartitioned(targetrel)) {
subPartitionId = heapTupleGetPartitionId(heaprel, tuple);
searchFakeReationForPartitionOid(estate->esfRelations,
estate->es_query_cxt,
heaprel,
subPartitionId,
subPartRel,
subPart,
RowExclusiveLock);
partitionid = subPartitionId;
heaprel = subPartRel;
partition = subPart;
}
}
targetrel = heaprel;
if (!ExecCheckIndexConstraints(plan_slot, estate, targetrel,
partition, &isgpi, bucketid, &conflictInfo,
&conflictPartOid, &conflictBucketid)) {
ExecDelete(&(&conflictInfo)->conflictTid, conflictPartOid,
conflictBucketid, NULL, plan_slot, &node->mt_epqstate,
node, node->canSetTag);
InstrCountFiltered2(&node->ps, 1);
} else {
slot = ExecInsert(node, slot, plan_slot, estate, node->canSetTag, hi_options, &partition_list);
}
}
}
static TupleTableSlot* FetchPlanSlot(ModifyTableState* node, PlanState* subPlanState)
{
EState* estate = node->ps.state;
if (estate->result_rel_index > 0) {
return ExecProject(node->mt_ProjInfos[estate->result_rel_index], NULL);
} else {
return ExecProcNode(subPlanState);
}
}
/* ----------------------------------------------------------------
* ExecModifyTable
*
@ -3224,7 +3368,7 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
#endif
if (operation == CMD_INSERT) {
if (node->ps.type == T_ModifyTableState || node->mt_upsert->us_action != UPSERT_NONE ||
if (node->ps.type == T_ModifyTableState || node->mt_upsert->us_action != UPSERT_NONE || node->isReplace ||
(result_rel_info->ri_TrigDesc != NULL && (result_rel_info->ri_TrigDesc->trig_insert_before_row ||
result_rel_info->ri_TrigDesc->trig_insert_instead_row)))
ExecInsert = ExecInsertT<false>;
@ -3456,7 +3600,11 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
#endif
switch (operation) {
case CMD_INSERT:
slot = ExecInsert(node, slot, plan_slot, estate, node->canSetTag, hi_options, &partition_list);
if (!node->isReplace) {
slot = ExecInsert(node, slot, plan_slot, estate, node->canSetTag, hi_options, &partition_list);
} else {
slot = ExecReplace(estate, node, slot, plan_slot, bucketid, hi_options, partition_list);
}
break;
case CMD_UPDATE:
slot = ExecUpdate(tuple_id,
@ -3650,6 +3798,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
upsertState->us_updateproj = NULL;
upsertState->us_updateWhere = NIL;
mt_state->mt_upsert = upsertState;
mt_state->isReplace = node->isReplace;
mt_state->fireBSTriggers = true;
@ -3722,7 +3871,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
result_rel_info->ri_FdwRoutine->GetFdwType() != MOT_ORC) {
#endif
if (RelationIsUstoreFormat(result_rel_info->ri_RelationDesc) || operation != CMD_DELETE) {
ExecOpenIndices(result_rel_info, (node->upsertAction != UPSERT_NONE ||
ExecOpenIndices(result_rel_info, (node->upsertAction != UPSERT_NONE || node->isReplace ||
(estate->es_plannedstmt && estate->es_plannedstmt->hasIgnore)));
}
#ifdef ENABLE_MOT
@ -3854,7 +4003,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
* If needed, Initialize target list, projection and qual for DUPLICATE KEY UPDATE
*/
result_rel_info = mt_state->resultRelInfo;
if (node->upsertAction == UPSERT_UPDATE) {
if (node->upsertAction == UPSERT_UPDATE ||node->isReplace) {
ExprContext* econtext = NULL;
ExprState* setexpr = NULL;
TupleDesc tupDesc;