UPSERT code

This commit is contained in:
gentle_hu
2020-08-22 14:01:44 +08:00
committed by gentle_hu
parent ca5f135273
commit e936f40df7
60 changed files with 1836 additions and 328 deletions

View File

@ -248,6 +248,277 @@ static TupleTableSlot* ExecProcessReturning(
return ExecProject(projectReturning, NULL);
}
/*
 * ExecCheckHeapTupleVisible
 *     Raise a serialization failure when the given tuple does not satisfy
 *     the executor's snapshot (estate->es_snapshot).
 *
 * The visibility test is only performed when IsolationUsesXactSnapshot()
 * holds; otherwise the function does nothing.
 *
 * estate: executor state supplying es_snapshot
 * tuple:  tuple to test
 * buffer: buffer passed through to HeapTupleSatisfiesVisibility()
 */
static void ExecCheckHeapTupleVisible(EState* estate, HeapTuple tuple, Buffer buffer)
{
    if (IsolationUsesXactSnapshot() && !HeapTupleSatisfiesVisibility(tuple, estate->es_snapshot, buffer)) {
        ereport(ERROR,
            (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                errmsg("could not serialize access due to concurrent update")));
    }
}
/*
 * ExecCheckTIDVisible
 *     Fetch the tuple identified by tid from rel and verify that it is
 *     visible to the executor's snapshot.
 *
 * Nothing is done unless IsolationUsesXactSnapshot() holds. The tuple is
 * fetched with SnapshotAny; a failed fetch is reported as an error. The
 * buffer obtained from heap_fetch() is released before returning.
 *
 * estate: executor state handed to ExecCheckHeapTupleVisible()
 * rel:    relation holding the conflicting tuple
 * tid:    item pointer of the conflicting tuple
 */
static void ExecCheckTIDVisible(EState* estate, Relation rel, ItemPointer tid)
{
    HeapTupleData conflictTuple;
    Buffer conflictBuf;

    /* check isolation level to tell if tuple visibility check is needed */
    if (IsolationUsesXactSnapshot()) {
        conflictTuple.t_self = *tid;

        bool fetched = heap_fetch(rel, SnapshotAny, &conflictTuple, &conflictBuf, false, NULL);
        if (!fetched) {
            ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                errmsg("failed to fetch conflicting tuple for DUPLICATE KEY UPDATE")));
        }

        ExecCheckHeapTupleVisible(estate, &conflictTuple, conflictBuf);
        ReleaseBuffer(conflictBuf);
    }
}
/*
 * ExecConflictUpdate
 *     Run the UPDATE part of INSERT ... ON DUPLICATE KEY UPDATE against the
 *     existing tuple that conflicts with the proposed insertion.
 *
 * The tuple at conflictTid is locked with heap_lock_tuple(LockTupleExclusive),
 * stored in the upsert state's us_existing slot, and the SET projection
 * (resultRelInfo->ri_updateProj) is evaluated with the existing tuple as the
 * scan tuple and excludedSlot as the inner tuple. The projected row
 * (us_updateproj) is then handed to ExecUpdate().
 *
 * mtstate:         ModifyTable state; supplies the expression context, the
 *                  upsert state and the EPQ state
 * resultRelInfo:   result relation info holding ri_updateProj
 * conflictTid:     item pointer of the conflicting tuple
 * planSlot:        passed through to ExecUpdate()
 * excludedSlot:    the row proposed for insertion (EXCLUDED)
 * estate:          executor state
 * targetRel:       relation that holds the conflicting tuple
 * oldPartitionOid: passed through to ExecUpdate()
 * bucketid:        passed through to ExecUpdate()
 * canSetTag:       passed through to ExecUpdate()
 * returning:       out; receives the result of ExecUpdate()
 *
 * Returns true once ExecUpdate() has been called. Returns false when the lock
 * attempt reports HeapTupleUpdated and the isolation level does not use a
 * transaction snapshot; the caller is expected to retry from the start.
 */
static bool ExecConflictUpdate(ModifyTableState* mtstate, ResultRelInfo* resultRelInfo, ItemPointer conflictTid,
    TupleTableSlot* planSlot, TupleTableSlot* excludedSlot, EState* estate, Relation targetRel,
    Oid oldPartitionOid, int2 bucketid, bool canSetTag, TupleTableSlot** returning)
{
    ExprContext* econtext = mtstate->ps.ps_ExprContext;
    Relation relation = targetRel;
    UpsertState* upsertState = mtstate->mt_upsert;
    HeapTupleData tuple;
    HTSU_Result test;
    Buffer buffer;
    ItemPointerData update_ctid;
    TransactionId update_xmax;

    tuple.t_self = *conflictTid;
    test = heap_lock_tuple(relation, &tuple, &buffer,
        &update_ctid, &update_xmax,
        estate->es_output_cid, LockTupleExclusive, false);
checktest:
    switch (test) {
        case HeapTupleMayBeUpdated:
            /* success */
            break;
        case HeapTupleSelfCreated:
            /*
             * This can occur when a just inserted tuple is updated again in
             * the same command. E.g. because multiple rows with the same
             * conflicting key values are inserted using STREAM:
             * INSERT INTO t VALUES(1),(1) ON DUPLICATE KEY UPDATE ...
             *
             * This is somewhat similar to the ExecUpdate()
             * HeapTupleSelfUpdated case. We do not want to proceed because
             * it would lead to the same row being updated a second time in
             * some unspecified order, and in contrast to plain UPDATEs
             * there's no historical behavior to break.
             *
             * It is the user's responsibility to prevent this situation from
             * occurring. These problems are why SQL-2003 similarly specifies
             * that for SQL MERGE, an exception must be raised in the event of
             * an attempt to update the same row twice.
             *
             * However, in order to be compatible with SQL, we have to break the
             * rule and update the same row which is created within the command.
             */
            ReleaseBuffer(buffer);
#ifdef ENABLE_MULTIPLE_NODES
            if (!(u_sess->attr.attr_sql.sql_compatibility & DB_CMPT_C)) {
                /*
                 * Adjacent string literals are concatenated verbatim, so the
                 * first piece of the hint must end with a space.
                 */
                ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                    errmsg("ON DUPLICATE KEY UPDATE command cannot affect row a second time"),
                    errhint("Ensure that no rows proposed for insertion within "
                            "the same command have duplicate constrained values.")));
            }
#endif
            /* Lock again, this time with the extra trailing flag set to true. */
            test = heap_lock_tuple(relation, &tuple, &buffer, &update_ctid, &update_xmax,
                estate->es_output_cid, LockTupleExclusive, false, true);
            Assert(test != HeapTupleSelfCreated);
            goto checktest;
            break;
        case HeapTupleSelfUpdated:
            ReleaseBuffer(buffer);
            /*
             * This state should never be reached. As a dirty snapshot is used
             * to find conflicting tuples, speculative insertion wouldn't have
             * seen this row to conflict with.
             */
            ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                errmsg("unexpected self-updated tuple")));
            break;
        case HeapTupleUpdated:
            ReleaseBuffer(buffer);
            if (IsolationUsesXactSnapshot()) {
                ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                    errmsg("could not serialize access due to concurrent update")));
            }
            /*
             * Tell caller to try again from the very start.
             * It does not make sense to use the usual EvalPlanQual() style
             * loop here, as the new version of the row might not conflict
             * anymore, or the conflicting tuple has actually been deleted.
             */
            return false;
        case HeapTupleBeingUpdated:
            ReleaseBuffer(buffer);
            ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                errmsg("unexpected concurrent update tuple")));
            break;
        default:
            ReleaseBuffer(buffer);
            elog(ERROR, "unrecognized heap_lock_tuple status: %u", test);
            break;
    }

    /*
     * Success, the tuple is locked.
     *
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous cycle.
     */
    ResetExprContext(econtext);

    /*
     * NOTE: we rely on ExecUpdate() to do the MVCC snapshot check, thus the
     * projection is done here although the final ExecUpdate might fail.
     */
    ExecCheckHeapTupleVisible(estate, &tuple, buffer);

    /* Store target's existing tuple in the state's dedicated slot */
    ExecStoreTuple(&tuple, upsertState->us_existing, buffer, false);

    /*
     * Make tuple and any needed join variables available to ExecQual and
     * ExecProject. The EXCLUDED tuple is installed in ecxt_innertuple, while
     * the target's existing tuple is installed in the scantuple. EXCLUDED has
     * been made to reference INNER_VAR in setrefs.c, but there is no other redirection.
     */
    econtext->ecxt_scantuple = upsertState->us_existing;
    econtext->ecxt_innertuple = excludedSlot;
    econtext->ecxt_outertuple = NULL;

    /* Evaluate the SET list; the result slot (us_updateproj) feeds ExecUpdate(). */
    ExecProject(resultRelInfo->ri_updateProj, NULL);

    *returning = ExecUpdate(conflictTid, oldPartitionOid, bucketid, NULL,
        upsertState->us_updateproj, planSlot, &mtstate->mt_epqstate,
        mtstate, canSetTag, false);

    /* Drop the buffer obtained from heap_lock_tuple(). */
    ReleaseBuffer(buffer);
    return true;
}
/*
 * ExecUpsert
 *     Insert a tuple for INSERT ... ON DUPLICATE KEY UPDATE / NOTHING, or act
 *     on the existing tuple when the proposed row conflicts with it.
 *
 * The index constraints are checked first. On a conflict, either the UPDATE
 * part is executed (UPSERT_UPDATE) or the insertion is skipped
 * (UPSERT_NOTHING); in both cases *updated is set to true and InvalidOid is
 * returned. Without a conflict the tuple and its index entries are inserted;
 * if index insertion then reports a conflict, the heap tuple is aborted with
 * heap_abort_speculative() and the whole sequence is retried.
 *
 * state:     ModifyTable state; supplies the upsert state
 * slot:      slot holding the row proposed for insertion
 * planSlot:  passed through to ExecConflictUpdate()
 * estate:    executor state
 * canSetTag: passed through to ExecConflictUpdate()
 * tuple:     heap tuple form of the proposed row
 * returning: out; set only on the UPSERT_UPDATE conflict path
 * updated:   out; true when no new tuple was inserted because of a conflict
 *
 * Returns the value of heap_insert() when a tuple was inserted, InvalidOid
 * otherwise.
 */
static Oid ExecUpsert(ModifyTableState* state, TupleTableSlot* slot, TupleTableSlot* planSlot, EState* estate,
    bool canSetTag, HeapTuple tuple, TupleTableSlot** returning, bool* updated)
{
    Oid newid = InvalidOid;
    bool specConflict;
    List* recheckIndexes = NIL;
    ResultRelInfo* resultRelInfo = NULL;
    Relation resultRelationDesc = NULL;
    Relation heaprel = NULL;      /* actual relation to upsert index */
    Relation targetrel = NULL;    /* actual relation to upsert tuple */
    Oid partitionid = InvalidOid; /* partition oid for partitioned table */
    Partition partition = NULL;   /* partition info for partition table */
    int2 bucketid = InvalidBktId; /* bucket id for bucket hash table */
    ItemPointerData conflictTid;
    UpsertState* upsertState = state->mt_upsert;

    *updated = false;

    /*
     * get information on the (current) result relation
     */
    resultRelInfo = estate->es_result_relation_info;
    resultRelationDesc = resultRelInfo->ri_RelationDesc;
    heaprel = resultRelationDesc;

    if (unlikely(RelationIsCUFormat(resultRelationDesc))) {
        ereport(ERROR,
            (errmodule(MOD_EXECUTOR),
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("ON DUPLICATE KEY UPDATE is not supported on column orientated table"))));
    }

    if (unlikely(RelationIsPAXFormat(resultRelationDesc))) {
        ereport(ERROR,
            (errmodule(MOD_EXECUTOR),
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("ON DUPLICATE KEY UPDATE is not supported on DFS table"))));
    }

    if (RelationIsPartitioned(resultRelationDesc)) {
        partitionid = heapTupleGetPartitionId(resultRelationDesc, tuple);
        /* presumably fills in heaprel and partition for partitionid - TODO confirm */
        searchFakeReationForPartitionOid(estate->esfRelations,
            estate->es_query_cxt,
            resultRelationDesc,
            partitionid,
            heaprel,
            partition,
            RowExclusiveLock);
    }

    targetrel = heaprel;
    if (RELATION_OWN_BUCKET(resultRelationDesc)) {
        bucketid = computeTupleBucketId(resultRelationDesc, tuple);
        if (unlikely(bucketid != InvalidBktId)) {
            /* presumably fills in targetrel for bucketid - TODO confirm */
            searchHBucketFakeRelation(estate->esfRelations, estate->es_query_cxt, heaprel, bucketid, targetrel);
        }
    }

vlock:
    specConflict = false;
    if (!ExecCheckIndexConstraints(slot, estate, targetrel, partition, bucketid, &conflictTid)) {
        /* committed conflict tuple found */
        if (upsertState->us_action == UPSERT_UPDATE) {
            /*
             * In case of DUPLICATE KEY UPDATE, execute the UPDATE part.
             * Be prepared to retry if the UPDATE fails because
             * of another concurrent UPDATE/DELETE to the conflict tuple.
             */
            *returning = NULL;
            if (ExecConflictUpdate(state, resultRelInfo, &conflictTid, planSlot, slot, estate, targetrel, partitionid,
                bucketid, canSetTag, returning)) {
                InstrCountFiltered2(&state->ps, 1);
                *updated = true;
                return InvalidOid;
            } else {
                goto vlock;
            }
        } else {
            /*
             * In case of DUPLICATE KEY UPDATE NOTHING, do nothing.
             * However, verify that the tuple is visible to the
             * executor's MVCC snapshot at higher isolation levels.
             */
            Assert(upsertState->us_action == UPSERT_NOTHING);
            ExecCheckTIDVisible(estate, targetrel, &conflictTid);
            InstrCountFiltered2(&state->ps, 1);
            /* no row was changed, but the caller must skip its insert handling */
            *updated = true;
            return InvalidOid;
        }
    }

    /* insert the tuple */
    newid = heap_insert(targetrel, tuple, estate->es_output_cid, 0, NULL);

    /* insert index entries for tuple */
    recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate, heaprel,
        partition, bucketid, &specConflict);

    /*
     * Another transaction committed its index insertion before us:
     * abort the tuple and try to find the conflict tuple again.
     */
    if (specConflict) {
        heap_abort_speculative(targetrel, tuple);
        list_free(recheckIndexes);
        goto vlock;
    }

    /*
     * The recheck list is not handed back to the caller, so release it here
     * just like on the conflict path above instead of leaving it allocated.
     * NOTE(review): the list never reaches the caller's AFTER ROW trigger
     * call - confirm that dropping it is intended.
     */
    list_free(recheckIndexes);

    return newid;
}
/* ----------------------------------------------------------------
* ExecInsert
*
@ -307,9 +578,12 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
if (result_relation_desc->rd_rel->relhasoids)
HeapTupleSetOid(tuple, InvalidOid);
/* BEFORE ROW INSERT Triggers */
if (state->operation != CMD_MERGE && result_rel_info->ri_TrigDesc &&
result_rel_info->ri_TrigDesc->trig_insert_before_row) {
/* BEFORE ROW INSERT Triggers
* Note: We fire BEFORE ROW TRIGGERS for every attempted insertion except
* for a MERGE or INSERT ... ON DUPLICATE KEY UPDATE statement.
*/
if (state->operation != CMD_MERGE &&
result_rel_info->ri_TrigDesc && result_rel_info->ri_TrigDesc->trig_insert_before_row) {
slot = ExecBRInsertTriggers(estate, result_rel_info, slot);
if (slot == NULL) /* "do nothing" */
return NULL;
@ -318,9 +592,12 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
tuple = ExecMaterializeSlot(slot);
}
/* INSTEAD OF ROW INSERT Triggers */
if (state->operation != CMD_MERGE && result_rel_info->ri_TrigDesc &&
result_rel_info->ri_TrigDesc->trig_insert_instead_row) {
/* INSTEAD OF ROW INSERT Triggers
* Note: We fire INSTEAD OF ROW TRIGGERS for every attempted insertion except
* for a MERGE or INSERT ... ON DUPLICATE KEY UPDATE statement.
*/
if (state->operation != CMD_MERGE &&
result_rel_info->ri_TrigDesc && result_rel_info->ri_TrigDesc->trig_insert_instead_row) {
slot = ExecIRInsertTriggers(estate, result_rel_info, slot);
if (slot == NULL) /* "do nothing" */
return NULL;
@ -449,6 +726,14 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
}
ExecDropSingleTupleTableSlot(tmp_slot);
} else if (state->mt_upsert->us_action != UPSERT_NONE && result_rel_info->ri_NumIndices > 0) {
TupleTableSlot* returning = NULL;
bool updated = false;
new_id = InvalidOid;
new_id = ExecUpsert(state, slot, planSlot, estate, canSetTag, tuple, &returning, &updated);
if (updated) {
return returning;
}
} else {
/*
* insert the tuple
@ -524,7 +809,7 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
estate,
RELATION_IS_PARTITIONED(result_relation_desc) ? heap_rel : NULL,
RELATION_IS_PARTITIONED(result_relation_desc) ? partition : NULL,
bucket_id);
bucket_id, NULL);
}
}
@ -539,7 +824,10 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
setLastTid(&(tuple->t_self));
}
/* AFTER ROW INSERT Triggers */
/* AFTER ROW INSERT Triggers
* Note: We fire AFTER ROW TRIGGERS for every attempted insertion except
* for a MERGE or INSERT ... ON DUPLICATE KEY UPDATE statement.
*/
if (state->operation != CMD_MERGE && !useHeapMultiInsert)
ExecARInsertTriggers(estate, result_rel_info, partition_id, bucket_id, tuple, recheck_indexes);
@ -938,6 +1226,8 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
#ifdef PGXC
RemoteQueryState* result_remote_rel = NULL;
#endif
bool allow_update_self = (node->mt_upsert != NULL &&
node->mt_upsert->us_action != UPSERT_NONE) ? true : false;
/*
* abort the operation if not running transactions
@ -1049,7 +1339,6 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
return NULL;
}
/* FDW might have changed tuple */
tuple = ExecMaterializeSlot(slot);
} else {
@ -1102,7 +1391,8 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
&update_xmax,
estate->es_output_cid,
estate->es_crosscheck_snapshot,
true /* wait for commit */);
true /* wait for commit */,
allow_update_self);
switch (result) {
case HeapTupleSelfUpdated:
/* can not update one row more than once for merge into */
@ -1212,7 +1502,8 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
* If it's a HOT update, we mustn't insert new index entries.
*/
if (result_rel_info->ri_NumIndices > 0 && !HeapTupleIsHeapOnly(tuple))
recheck_indexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate, NULL, NULL, bucketid);
recheck_indexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate,
NULL, NULL, bucketid, NULL);
} else {
/* for partitioned table */
bool row_movement = false;
@ -1300,7 +1591,8 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
&update_xmax,
estate->es_output_cid,
estate->es_crosscheck_snapshot,
true /* wait for commit */);
true /* wait for commit */,
allow_update_self);
switch (result) {
case HeapTupleSelfUpdated:
/* can not update one row more than once for merge into */
@ -1403,7 +1695,7 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
* delete index entries for tuple
*/
recheck_indexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate,
fake_part_rel, partition, bucketid);
fake_part_rel, partition, bucketid, NULL);
}
} else {
/* row movement */
@ -1432,7 +1724,8 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
&update_xmax,
estate->es_output_cid,
estate->es_crosscheck_snapshot,
true /* wait for commit */);
true /* wait for commit */,
allow_update_self);
switch (result) {
case HeapTupleSelfUpdated:
/* can not update one row more than once for merge into */
@ -1541,7 +1834,7 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
if (result_rel_info->ri_NumIndices > 0) {
recheck_indexes = ExecInsertIndexTuples(
slot, &(tuple->t_self), estate, fake_part_rel, insert_partition, bucketid);
slot, &(tuple->t_self), estate, fake_part_rel, insert_partition, bucketid, NULL);
}
}
}
@ -1602,6 +1895,9 @@ static void fireBSTriggers(ModifyTableState* node)
switch (node->operation) {
case CMD_INSERT:
ExecBSInsertTriggers(node->ps.state, node->resultRelInfo);
if (node->mt_upsert->us_action == UPSERT_UPDATE) {
ExecBSUpdateTriggers(node->ps.state, node->resultRelInfo);
}
break;
case CMD_UPDATE:
ExecBSUpdateTriggers(node->ps.state, node->resultRelInfo);
@ -1629,6 +1925,9 @@ static void fireASTriggers(ModifyTableState* node)
switch (node->operation) {
case CMD_INSERT:
ExecASInsertTriggers(node->ps.state, node->resultRelInfo);
if (node->mt_upsert->us_action == UPSERT_UPDATE) {
ExecASUpdateTriggers(node->ps.state, node->resultRelInfo);
}
break;
case CMD_UPDATE:
ExecASUpdateTriggers(node->ps.state, node->resultRelInfo);
@ -1755,7 +2054,7 @@ TupleTableSlot* ExecModifyTable(ModifyTableState* node)
#endif
if (operation == CMD_INSERT) {
if (node->ps.type == T_ModifyTableState ||
if (node->ps.type == T_ModifyTableState || node->mt_upsert->us_action != UPSERT_NONE ||
(result_rel_info->ri_TrigDesc != NULL && (result_rel_info->ri_TrigDesc->trig_insert_before_row ||
result_rel_info->ri_TrigDesc->trig_insert_instead_row)))
ExecInsert = ExecInsertT<false>;
@ -2048,6 +2347,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
ResultRelInfo* result_rel_info = NULL;
TupleDesc tup_desc = NULL;
Plan* sub_plan = NULL;
UpsertState* upsertState = NULL;
ListCell* l = NULL;
int i;
#ifdef PGXC
@ -2110,6 +2410,13 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
mt_state->mt_arowmarks = (List**)palloc0(sizeof(List*) * nplans);
mt_state->mt_nplans = nplans;
upsertState = (UpsertState*)palloc0(sizeof(UpsertState));
upsertState->us_action = node->upsertAction;
upsertState->us_existing = NULL;
upsertState->us_excludedtlist = NIL;
upsertState->us_updateproj = NULL;
mt_state->mt_upsert = upsertState;
/* set up epqstate with dummy sub_plan data for the moment */
EvalPlanQualInit(&mt_state->mt_epqstate, estate, NULL, NIL, node->epqParam);
mt_state->fireBSTriggers = true;
@ -2160,7 +2467,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
result_rel_info->ri_IndexRelationDescs == NULL) {
if (result_rel_info->ri_FdwRoutine == NULL || result_rel_info->ri_FdwRoutine->GetFdwType == NULL ||
result_rel_info->ri_FdwRoutine->GetFdwType() != MOT_ORC)
ExecOpenIndices(result_rel_info);
ExecOpenIndices(result_rel_info, node->upsertAction != UPSERT_NONE);
}
init_gtt_storage(operation, result_rel_info);
/* Now init the plan for this result rel */
@ -2281,6 +2588,43 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
mt_state->ps.ps_ExprContext = NULL;
}
/*
* If needed, Initialize target list, projection and qual for DUPLICATE KEY UPDATE
*/
result_rel_info = mt_state->resultRelInfo;
if (node->upsertAction == UPSERT_UPDATE) {
ExprContext* econtext;
ExprState* setexpr;
TupleDesc tupDesc;
/* insert may only have one plan, inheritance is not expanded */
Assert(nplans = 1);
/* already exists if created by RETURNING processing above */
if (mt_state->ps.ps_ExprContext == NULL) {
ExecAssignExprContext(estate, &mt_state->ps);
}
econtext = mt_state->ps.ps_ExprContext;
/* initialize slot for the existing tuple */
upsertState->us_existing = ExecInitExtraTupleSlot(mt_state->ps.state);
ExecSetSlotDescriptor(upsertState->us_existing, result_rel_info->ri_RelationDesc->rd_att);
upsertState->us_excludedtlist = node->exclRelTlist;
/* create target slot for UPDATE SET projection */
tupDesc = ExecTypeFromTL((List*)node->updateTlist, result_rel_info->ri_RelationDesc->rd_rel->relhasoids);
upsertState->us_updateproj = ExecInitExtraTupleSlot(mt_state->ps.state);
ExecSetSlotDescriptor(upsertState->us_updateproj, tupDesc);
/* build UPDATE SET expression and projection state */
setexpr = ExecInitExpr((Expr*)node->updateTlist, &mt_state->ps);
result_rel_info->ri_updateProj =
ExecBuildProjectionInfo((List*)setexpr, econtext,
upsertState->us_updateproj, result_rel_info->ri_RelationDesc->rd_att);
}
/*
* If we have any secondary relations in an UPDATE or DELETE, they need to
* be treated like non-locked relations in SELECT FOR UPDATE, ie, the