Enhance tuple locking: handle the ROW_MARK_NOKEYEXCLUSIVE and ROW_MARK_KEYSHARE row-mark types (FOR [KEY] UPDATE/SHARE), thread the requested lock mode through EvalPlanQual and tableam_tuple_lock_updated instead of hard-coding LockTupleExclusive, and fetch relfrozenxid via getRelationRelxids/getPartitionRelxids.
This commit is contained in:
@ -317,7 +317,7 @@ void standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||
switch (queryDesc->operation) {
|
||||
case CMD_SELECT:
|
||||
/*
|
||||
* SELECT FOR UPDATE/SHARE and modifying CTEs need to mark tuples
|
||||
* SELECT FOR [KEY] UPDATE/SHARE and modifying CTEs need to mark tuples
|
||||
*/
|
||||
if (queryDesc->plannedstmt->rowMarks != NIL || queryDesc->plannedstmt->hasModifyingCTE) {
|
||||
estate->es_output_cid = GetCurrentCommandId(true);
|
||||
@ -1241,7 +1241,7 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
}
|
||||
|
||||
/*
|
||||
* Similarly, we have to lock relations selected FOR UPDATE/FOR SHARE
|
||||
* Similarly, we have to lock relations selected FOR [KEY] UPDATE/SHARE
|
||||
* before we initialize the plan tree, else we'd be risking lock upgrades.
|
||||
* While we are at it, build the ExecRowMark list.
|
||||
*/
|
||||
@ -1264,7 +1264,9 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
*/
|
||||
switch (rc->markType) {
|
||||
case ROW_MARK_EXCLUSIVE:
|
||||
case ROW_MARK_NOKEYEXCLUSIVE:
|
||||
case ROW_MARK_SHARE:
|
||||
case ROW_MARK_KEYSHARE:
|
||||
if (IS_PGXC_COORDINATOR || u_sess->pgxc_cxt.PGXCNodeId < 0 ||
|
||||
bms_is_member(u_sess->pgxc_cxt.PGXCNodeId, rc->bms_nodeids)) {
|
||||
relid = getrelid(rc->rti, rangeTable);
|
||||
@ -1979,7 +1981,7 @@ static void ExecEndPlan(PlanState *planstate, EState *estate)
|
||||
}
|
||||
|
||||
/*
|
||||
* close any relations selected FOR UPDATE/FOR SHARE, again keeping locks
|
||||
* close any relations selected FOR [KEY] UPDATE/SHARE, again keeping locks
|
||||
*/
|
||||
foreach (l, estate->es_rowMarks) {
|
||||
ExecRowMark *erm = (ExecRowMark *)lfirst(l);
|
||||
@ -2773,6 +2775,7 @@ TupleTableSlot *EvalPlanQualUHeap(EState *estate, EPQState *epqstate, Relation r
|
||||
* epqstate - state for EvalPlanQual rechecking
|
||||
* relation - table containing tuple
|
||||
* rti - rangetable index of table containing tuple
|
||||
* lockmode - requested tuple lock mode
|
||||
* *tid - t_ctid from the outdated tuple (ie, next updated version)
|
||||
* priorXmax - t_xmax from the outdated tuple
|
||||
*
|
||||
@ -2781,9 +2784,12 @@ TupleTableSlot *EvalPlanQualUHeap(EState *estate, EPQState *epqstate, Relation r
|
||||
*
|
||||
* Returns a slot containing the new candidate update/delete tuple, or
|
||||
* NULL if we determine we shouldn't process the row.
|
||||
*
|
||||
* Note: properly, lockmode should be declared as enum LockTupleMode,
|
||||
* but we use "int" to avoid having to include heapam.h in executor.h.
|
||||
*/
|
||||
TupleTableSlot *EvalPlanQual(EState *estate, EPQState *epqstate, Relation relation, Index rti, ItemPointer tid,
|
||||
TransactionId priorXmax, bool partRowMoveUpdate)
|
||||
TupleTableSlot *EvalPlanQual(EState *estate, EPQState *epqstate, Relation relation, Index rti, int lockmode,
|
||||
ItemPointer tid, TransactionId priorXmax, bool partRowMoveUpdate)
|
||||
{
|
||||
TupleTableSlot *slot = NULL;
|
||||
Tuple copyTuple;
|
||||
@ -2793,7 +2799,7 @@ TupleTableSlot *EvalPlanQual(EState *estate, EPQState *epqstate, Relation relati
|
||||
/*
|
||||
* Get and lock the updated version of the row; if fail, return NULL.
|
||||
*/
|
||||
copyTuple = tableam_tuple_lock_updated(estate->es_output_cid, relation, LockTupleExclusive, tid, priorXmax,
|
||||
copyTuple = tableam_tuple_lock_updated(estate->es_output_cid, relation, lockmode, tid, priorXmax,
|
||||
estate->es_snapshot);
|
||||
|
||||
if (copyTuple == NULL) {
|
||||
@ -3073,7 +3079,7 @@ HeapTuple heap_lock_updated(CommandId cid, Relation relation, int lockmode, Item
|
||||
/* updated, so look at the updated row */
|
||||
tuple.t_self = tuple.t_data->t_ctid;
|
||||
/* updated row should have xmin matching this xmax */
|
||||
priorXmax = HeapTupleGetRawXmax(&tuple);
|
||||
priorXmax = HeapTupleGetUpdateXid(&tuple);
|
||||
ReleaseBuffer(buffer);
|
||||
/* loop back to fetch next in chain */
|
||||
}
|
||||
|
||||
@ -835,7 +835,8 @@ BitmapHeapScanState* ExecInitBitmapHeapScan(BitmapHeapScan* node, EState* estate
|
||||
* occured after taking the snapshot. Skip for explain only commands.
|
||||
*/
|
||||
if (isUstoreRel && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) {
|
||||
TransactionId relfrozenxid64 = getPartitionRelfrozenxid(partitiontrel);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getPartitionRelxids(partitiontrel, &relfrozenxid64);
|
||||
if (TransactionIdPrecedes(FirstNormalTransactionId, scanSnap->xmax) &&
|
||||
!TransactionIdIsCurrentTransactionId(relfrozenxid64) &&
|
||||
TransactionIdPrecedes(scanSnap->xmax, relfrozenxid64)) {
|
||||
@ -859,7 +860,8 @@ BitmapHeapScanState* ExecInitBitmapHeapScan(BitmapHeapScan* node, EState* estate
|
||||
* occured after taking the snapshot. Skip for explain only commands.
|
||||
*/
|
||||
if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) {
|
||||
TransactionId relfrozenxid64 = getRelationRelfrozenxid(currentRelation);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getRelationRelxids(currentRelation, &relfrozenxid64);
|
||||
if (TransactionIdPrecedes(FirstNormalTransactionId, scanSnap->xmax) &&
|
||||
!TransactionIdIsCurrentTransactionId(relfrozenxid64) &&
|
||||
TransactionIdPrecedes(scanSnap->xmax, relfrozenxid64)) {
|
||||
|
||||
@ -711,7 +711,8 @@ IndexOnlyScanState* ExecInitIndexOnlyScan(IndexOnlyScan* node, EState* estate, i
|
||||
* occured after taking the snapshot.
|
||||
*/
|
||||
if (RelationIsUstoreFormat(indexstate->ss.ss_currentPartition)) {
|
||||
TransactionId relfrozenxid64 = getPartitionRelfrozenxid(indexstate->ss.ss_currentPartition);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getPartitionRelxids(indexstate->ss.ss_currentPartition, &relfrozenxid64);
|
||||
if (TransactionIdPrecedes(FirstNormalTransactionId, scanSnap->xmax) &&
|
||||
!TransactionIdIsCurrentTransactionId(relfrozenxid64) &&
|
||||
TransactionIdPrecedes(scanSnap->xmax, relfrozenxid64)) {
|
||||
@ -735,7 +736,8 @@ IndexOnlyScanState* ExecInitIndexOnlyScan(IndexOnlyScan* node, EState* estate, i
|
||||
* occured after taking the snapshot.
|
||||
*/
|
||||
if (RelationIsUstoreFormat(currentRelation)) {
|
||||
TransactionId relfrozenxid64 = getRelationRelfrozenxid(currentRelation);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getRelationRelxids(currentRelation, &relfrozenxid64);
|
||||
if (TransactionIdPrecedes(FirstNormalTransactionId, scanSnap->xmax) &&
|
||||
!TransactionIdIsCurrentTransactionId(relfrozenxid64) &&
|
||||
TransactionIdPrecedes(scanSnap->xmax, relfrozenxid64)) {
|
||||
|
||||
@ -708,7 +708,8 @@ IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags)
|
||||
* occured after taking the snapshot.
|
||||
*/
|
||||
if (RelationIsUstoreFormat(index_state->ss.ss_currentPartition)) {
|
||||
TransactionId relfrozenxid64 = getPartitionRelfrozenxid(index_state->ss.ss_currentPartition);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getPartitionRelxids(index_state->ss.ss_currentPartition, &relfrozenxid64);
|
||||
if (TransactionIdPrecedes(FirstNormalTransactionId, scanSnap->xmax) &&
|
||||
!TransactionIdIsCurrentTransactionId(relfrozenxid64) &&
|
||||
TransactionIdPrecedes(scanSnap->xmax, relfrozenxid64)) {
|
||||
@ -733,7 +734,8 @@ IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags)
|
||||
* occured after taking the snapshot.
|
||||
*/
|
||||
if (RelationIsUstoreFormat(current_relation)) {
|
||||
TransactionId relfrozenxid64 = getRelationRelfrozenxid(current_relation);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getRelationRelxids(current_relation, &relfrozenxid64);
|
||||
if (TransactionIdPrecedes(FirstNormalTransactionId, scanSnap->xmax) &&
|
||||
!TransactionIdIsCurrentTransactionId(relfrozenxid64) &&
|
||||
TransactionIdPrecedes(scanSnap->xmax, relfrozenxid64)) {
|
||||
|
||||
@ -181,15 +181,34 @@ lnext:
|
||||
searchHBucketFakeRelation(estate->esfRelations, estate->es_query_cxt, target_rel, bucket_id, bucket_rel);
|
||||
}
|
||||
/* okay, try to lock the tuple */
|
||||
if (erm->markType == ROW_MARK_EXCLUSIVE)
|
||||
lock_mode = LockTupleExclusive;
|
||||
else
|
||||
lock_mode = LockTupleShared;
|
||||
switch (erm->markType) {
|
||||
case ROW_MARK_EXCLUSIVE:
|
||||
lock_mode = LockTupleExclusive;
|
||||
break;
|
||||
case ROW_MARK_NOKEYEXCLUSIVE:
|
||||
lock_mode = LockTupleNoKeyExclusive;
|
||||
break;
|
||||
case ROW_MARK_SHARE:
|
||||
lock_mode = LockTupleShared;
|
||||
break;
|
||||
case ROW_MARK_KEYSHARE:
|
||||
lock_mode = LockTupleKeyShare;
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unsupported rowmark type");
|
||||
lock_mode = LockTupleNoKeyExclusive; /* keep compiler quiet */
|
||||
break;
|
||||
}
|
||||
|
||||
/* Need to merge the ustore logic with AM logic */
|
||||
test = tableam_tuple_lock(bucket_rel, &tuple, &buffer,
|
||||
estate->es_output_cid, lock_mode, erm->noWait, &tmfd,
|
||||
#ifdef ENABLE_MULTIPLE_NODES
|
||||
false, false, false, estate->es_snapshot, NULL, true);
|
||||
#else
|
||||
false, true, false, estate->es_snapshot, NULL, true);
|
||||
#endif
|
||||
|
||||
ReleaseBuffer(buffer);
|
||||
|
||||
switch (test) {
|
||||
@ -262,8 +281,6 @@ lnext:
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
|
||||
/* Tuple was deleted, so don't return it */
|
||||
Assert(ItemPointerEquals(&tmfd.ctid, &tuple.t_self));
|
||||
|
||||
if (rowMovement) {
|
||||
/*
|
||||
* the may be a row movement update action which delete tuple from original
|
||||
|
||||
@ -1315,7 +1315,7 @@ ldelete:
|
||||
errmsg("concurrent update under Stream mode is not yet supported")));
|
||||
}
|
||||
TupleTableSlot *epqslot = EvalPlanQual(estate, epqstate, fake_relation,
|
||||
result_rel_info->ri_RangeTableIndex, &tmfd.ctid, tmfd.xmax, false);
|
||||
result_rel_info->ri_RangeTableIndex, LockTupleExclusive, &tmfd.ctid, tmfd.xmax, false);
|
||||
if (!TupIsNull(epqslot)) {
|
||||
*tupleid = tmfd.ctid;
|
||||
goto ldelete;
|
||||
@ -1652,6 +1652,7 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
|
||||
tuple = tableam_tslot_get_tuple_from_slot(result_relation_desc, slot);
|
||||
} else {
|
||||
bool update_indexes = false;
|
||||
LockTupleMode lockmode;
|
||||
|
||||
/*
|
||||
* Compute stored generated columns
|
||||
@ -1721,7 +1722,8 @@ lreplace:
|
||||
/* add para 2 for heap_update */
|
||||
result = tableam_tuple_update(fake_relation, parent_relation, tupleid, tuple, estate->es_output_cid,
|
||||
estate->es_crosscheck_snapshot, estate->es_snapshot, true, // wait for commit
|
||||
&oldslot, &tmfd, &update_indexes, &modifiedIdxAttrs, allow_update_self, allowInplaceUpdate);
|
||||
&oldslot, &tmfd, &update_indexes, &modifiedIdxAttrs, allow_update_self,
|
||||
allowInplaceUpdate, &lockmode);
|
||||
switch (result) {
|
||||
case TM_SelfUpdated:
|
||||
case TM_SelfModified:
|
||||
@ -1794,7 +1796,7 @@ lreplace:
|
||||
}
|
||||
|
||||
TupleTableSlot *epq_slot = EvalPlanQual(estate, epqstate, fake_relation,
|
||||
result_rel_info->ri_RangeTableIndex, &tmfd.ctid, tmfd.xmax, false);
|
||||
result_rel_info->ri_RangeTableIndex, lockmode, &tmfd.ctid, tmfd.xmax, false);
|
||||
if (!TupIsNull(epq_slot)) {
|
||||
*tupleid = tmfd.ctid;
|
||||
|
||||
@ -1977,7 +1979,8 @@ lreplace:
|
||||
&update_indexes,
|
||||
&modifiedIdxAttrs,
|
||||
allow_update_self,
|
||||
allowInplaceUpdate);
|
||||
allowInplaceUpdate,
|
||||
&lockmode);
|
||||
switch (result) {
|
||||
case TM_SelfUpdated:
|
||||
case TM_SelfModified:
|
||||
@ -2036,7 +2039,7 @@ lreplace:
|
||||
}
|
||||
|
||||
TupleTableSlot *epq_slot = EvalPlanQual(estate, epqstate, fake_relation,
|
||||
result_rel_info->ri_RangeTableIndex, &tmfd.ctid, tmfd.xmax,
|
||||
result_rel_info->ri_RangeTableIndex, lockmode, &tmfd.ctid, tmfd.xmax,
|
||||
result_relation_desc->rd_rel->relrowmovement);
|
||||
|
||||
if (!TupIsNull(epq_slot)) {
|
||||
@ -2220,6 +2223,7 @@ ldelete:
|
||||
epqstate,
|
||||
old_fake_relation,
|
||||
result_rel_info->ri_RangeTableIndex,
|
||||
LockTupleExclusive,
|
||||
&tmfd.ctid,
|
||||
tmfd.xmax,
|
||||
result_relation_desc->rd_rel->relrowmovement);
|
||||
|
||||
@ -423,7 +423,8 @@ void InitScanRelation(SeqScanState* node, EState* estate, int eflags)
|
||||
|
||||
if (!node->isPartTbl) {
|
||||
/* add qual for redis */
|
||||
TransactionId relfrozenxid64 = getRelationRelfrozenxid(current_relation);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getRelationRelxids(current_relation, &relfrozenxid64);
|
||||
current_scan_desc = BeginScanRelation(node, current_relation, relfrozenxid64, eflags);
|
||||
} else {
|
||||
plan = (SeqScan*)node->ps.plan;
|
||||
@ -491,7 +492,8 @@ void InitScanRelation(SeqScanState* node, EState* estate, int eflags)
|
||||
node->ss_currentPartition = current_part_rel;
|
||||
|
||||
/* add qual for redis */
|
||||
TransactionId relfrozenxid64 = getPartitionRelfrozenxid(current_part_rel);
|
||||
TransactionId relfrozenxid64 = InvalidTransactionId;
|
||||
getPartitionRelxids(current_part_rel, &relfrozenxid64);
|
||||
current_scan_desc = BeginScanRelation(node, current_part_rel, relfrozenxid64, eflags);
|
||||
} else {
|
||||
node->ss_currentPartition = NULL;
|
||||
|
||||
Reference in New Issue
Block a user