881 lines
33 KiB
C++
881 lines
33 KiB
C++
/* -------------------------------------------------------------------------
 *
 * execReplication.cpp
 *    miscellaneous executor routines for logical replication
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/gausskernel/runtime/executor/execReplication.cpp
 *
 * -------------------------------------------------------------------------
 */
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/relscan.h"
|
|
#include "access/transam.h"
|
|
#include "access/tableam.h"
|
|
#include "access/xact.h"
|
|
#include "commands/trigger.h"
|
|
#include "commands/cluster.h"
|
|
#include "catalog/pg_partition_fn.h"
|
|
#include "catalog/pg_publication.h"
|
|
#include "executor/executor.h"
|
|
#include "executor/node/nodeModifyTable.h"
|
|
#include "nodes/nodeFuncs.h"
|
|
#include "parser/parse_relation.h"
|
|
#include "parser/parsetree.h"
|
|
#include "storage/buf/bufmgr.h"
|
|
#include "storage/lmgr.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/datum.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/snapmgr.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/typcache.h"
|
|
|
|
/* Forward declarations of the single-relation lookup helpers defined further down in this file. */
static bool RelationFindReplTupleByIndex(EState *estate, Relation rel, Relation idxrel, LockTupleMode lockmode,
    TupleTableSlot *searchslot, TupleTableSlot *outslot, FakeRelationPartition *fakeRelPart);

static bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot,
    TupleTableSlot *outslot, FakeRelationPartition *fakeRelPart);
|
|
|
|
/*
|
|
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
|
|
* is setup to match 'rel' (*NOT* idxrel!).
|
|
*
|
|
* Returns whether any column contains NULLs.
|
|
*
|
|
* This is not generic routine, it expects the idxrel to be replication
|
|
* identity of a rel and meet all limitations associated with that.
|
|
*/
|
|
static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
|
|
{
|
|
int attoff;
|
|
bool isnull;
|
|
Datum indclassDatum;
|
|
oidvector *opclass;
|
|
int2vector *indkey = &idxrel->rd_index->indkey;
|
|
bool hasnulls = false;
|
|
|
|
indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, Anum_pg_index_indclass, &isnull);
|
|
Assert(!isnull);
|
|
opclass = (oidvector *)DatumGetPointer(indclassDatum);
|
|
|
|
/* Build scankey for every attribute in the index. */
|
|
for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++) {
|
|
Oid op;
|
|
Oid opfamily;
|
|
RegProcedure regop;
|
|
int pkattno = attoff + 1;
|
|
int mainattno = indkey->values[attoff];
|
|
Oid optype = get_opclass_input_type(opclass->values[attoff]);
|
|
if (mainattno > searchslot->tts_tupleDescriptor->natts) {
|
|
ereport(ERROR, (errcode(ERRCODE_INVALID_ATTRIBUTE),
|
|
errmsg("index key attribute number %d exceeds number of columns %d",
|
|
mainattno, searchslot->tts_tupleDescriptor->natts)));
|
|
}
|
|
|
|
/*
|
|
* Load the operator info. We need this to get the equality operator
|
|
* function for the scan key.
|
|
*/
|
|
opfamily = get_opclass_family(opclass->values[attoff]);
|
|
|
|
op = get_opfamily_member(opfamily, optype, optype, BTEqualStrategyNumber);
|
|
if (!OidIsValid(op))
|
|
elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", BTEqualStrategyNumber, optype, optype, opfamily);
|
|
|
|
regop = get_opcode(op);
|
|
|
|
/* Initialize the scankey. */
|
|
ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, searchslot->tts_values[mainattno - 1]);
|
|
skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
|
|
|
|
/* Check for null value. */
|
|
if (searchslot->tts_isnull[mainattno - 1]) {
|
|
hasnulls = true;
|
|
skey[attoff].sk_flags |= SK_ISNULL;
|
|
}
|
|
}
|
|
|
|
return hasnulls;
|
|
}
|
|
|
|
/* Check tableam_tuple_lock result, and return if need to retry */
|
|
static bool inline CheckTupleLockRes(TM_Result res)
|
|
{
|
|
switch (res) {
|
|
case TM_Ok:
|
|
break;
|
|
case TM_Updated:
|
|
case TM_Deleted:
|
|
/* XXX: Improve handling here */
|
|
ereport(LOG, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
|
errmsg("concurrent update or delete, retrying")));
|
|
return true;
|
|
case TM_Invisible:
|
|
elog(ERROR, "attempted to lock invisible tuple");
|
|
break;
|
|
default:
|
|
elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
|
|
break;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/* Check heap modify result */
|
|
static void inline CheckTupleModifyRes(TM_Result res)
|
|
{
|
|
switch (res) {
|
|
case TM_SelfModified:
|
|
/* Tuple was already updated in current command? */
|
|
ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("tuple already updated by self")));
|
|
break;
|
|
case TM_Ok:
|
|
break;
|
|
case TM_Updated:
|
|
case TM_Deleted:
|
|
ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("tuple concurrently updated")));
|
|
break;
|
|
default:
|
|
ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("unrecognized tuple status: %u", res)));
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
 * Return the list of leaf partitions of 'rel', acquired with 'lockmode':
 * the sub-partition list for a sub-partitioned relation, the partition list
 * otherwise.
 */
static inline List* GetPartitionList(Relation rel, LOCKMODE lockmode)
{
    return RelationIsSubPartitioned(rel) ? RelationGetSubPartitionList(rel, lockmode)
                                         : relationGetPartitionList(rel, lockmode);
}
|
|
|
|
/*
 * Look for the tuple described by 'searchslot' in a partitioned relation by
 * probing the local (non-global) index 'idxrel' one partition at a time.
 *
 * On a hit the function returns true with fakeRelInfo->partRel/part/partOid
 * describing the owning partition, needRleaseDummyRel set, and
 * fakeRelInfo->partList still held; the caller has to release both.  On a
 * miss everything opened here is released and false is returned.
 */
static bool PartitionFindReplTupleByIndex(EState *estate, Relation rel, Relation idxrel, LockTupleMode lockmode,
    TupleTableSlot *searchslot, TupleTableSlot *outslot, FakeRelationPartition *fakeRelInfo)
{
    /* a global partition index is handled by the caller against the parent relation */
    Assert(!RelationIsGlobalIndex(idxrel));

    const bool heapIsSubPartitioned = RelationIsSubPartitioned(rel);
    const bool indexIsSubPartitioned = RelationIsSubPartitioned(idxrel);

    fakeRelInfo->partList = GetPartitionList(rel, RowExclusiveLock);

    ListCell *lc = NULL;
    foreach (lc, fakeRelInfo->partList) {
        Partition heapPart = (Partition)lfirst(lc);
        Relation heapPartRel = heapIsSubPartitioned ? SubPartitionGetRelation(rel, heapPart, NoLock)
                                                    : partitionGetRelation(rel, heapPart);

        /* open the index partition that belongs to this heap partition */
        Oid idxPartOid = getPartitionIndexOid(RelationGetRelid(idxrel), heapPart->pd_id);
        Partition idxPart = partitionOpen(idxrel, idxPartOid, RowExclusiveLock);
        Relation idxPartRel = indexIsSubPartitioned ? SubPartitionGetRelation(idxrel, idxPart, NoLock)
                                                    : partitionGetRelation(idxrel, idxPart);

        fakeRelInfo->partRel = heapPartRel;
        fakeRelInfo->part = heapPart;
        fakeRelInfo->partOid = heapPart->pd_id;

        bool hit = RelationFindReplTupleByIndex(estate, rel, idxPartRel, lockmode, searchslot, outslot, fakeRelInfo);
        if (!hit) {
            /* nothing in this partition: drop its dummy heap relation before moving on */
            releaseDummyRelation(&fakeRelInfo->partRel);
        }

        /* the index partition is no longer needed in either case */
        partitionClose(idxrel, idxPart, NoLock);
        releaseDummyRelation(&idxPartRel);

        if (hit) {
            /* the heap partition is still needed afterwards; the caller releases partRel */
            fakeRelInfo->needRleaseDummyRel = true;
            return true;
        }
    }

    /* the tuple is in none of the partitions: give the partition list back */
    releasePartitionList(rel, &fakeRelInfo->partList, NoLock);
    return false;
}
|
|
|
|
/*
 * Look for the tuple described by 'searchslot' in a partitioned relation by
 * sequentially scanning one partition after another.
 *
 * On a hit the function returns true with fakeRelInfo->partRel/part/partOid
 * describing the owning partition, needRleaseDummyRel set, and
 * fakeRelInfo->partList still held; the caller has to release both.  On a
 * miss everything opened here is released and false is returned.
 */
static bool PartitionFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
    TupleTableSlot *searchslot, TupleTableSlot *outslot, FakeRelationPartition *fakeRelInfo)
{
    const bool heapIsSubPartitioned = RelationIsSubPartitioned(rel);

    fakeRelInfo->partList = GetPartitionList(rel, RowExclusiveLock);

    ListCell *lc = NULL;
    foreach (lc, fakeRelInfo->partList) {
        Partition heapPart = (Partition)lfirst(lc);

        fakeRelInfo->partRel = heapIsSubPartitioned ? SubPartitionGetRelation(rel, heapPart, NoLock)
                                                    : partitionGetRelation(rel, heapPart);
        fakeRelInfo->part = heapPart;
        fakeRelInfo->partOid = heapPart->pd_id;

        if (!RelationFindReplTupleSeq(rel, lockmode, searchslot, outslot, fakeRelInfo)) {
            /* not in this partition: drop its dummy relation and try the next one */
            releaseDummyRelation(&fakeRelInfo->partRel);
            continue;
        }

        /* found it; the caller releases partRel */
        fakeRelInfo->needRleaseDummyRel = true;
        return true;
    }

    /* the tuple is in none of the partitions: give the partition list back */
    releasePartitionList(rel, &fakeRelInfo->partList, NoLock);
    return false;
}
|
|
|
|
/*
|
|
* Search the relation 'rel' for tuple using the index or seq scan.
|
|
*
|
|
* If a matching tuple is found, lock it with lockmode, fill the slot with its
|
|
* contents, and return true. Return false otherwise.
|
|
*
|
|
* Caller should check and release fakeRelInfo->partList and fakeRelInfo->partRel
|
|
*/
|
|
bool RelationFindReplTuple(EState *estate, Relation rel, Oid idxoid, LockTupleMode lockmode,
|
|
TupleTableSlot *searchslot, TupleTableSlot *outslot, FakeRelationPartition *fakeRelInfo)
|
|
{
|
|
int rc;
|
|
bool found = false;
|
|
Relation idxrel = NULL;
|
|
|
|
/* clear fake rel info */
|
|
rc = memset_s(fakeRelInfo, sizeof(FakeRelationPartition), 0, sizeof(FakeRelationPartition));
|
|
securec_check(rc, "", "");
|
|
|
|
if (OidIsValid(idxoid)) {
|
|
idxrel = index_open(idxoid, RowExclusiveLock);
|
|
}
|
|
|
|
/* for non partitioned table, or partitioned table with GPI, use parent heap and index to do the scan */
|
|
if (RelationIsNonpartitioned(rel) || (idxrel != NULL && RelationIsGlobalIndex(idxrel))) {
|
|
if (idxrel != NULL) {
|
|
found = RelationFindReplTupleByIndex(estate, rel, idxrel, lockmode, searchslot, outslot, fakeRelInfo);
|
|
index_close(idxrel, NoLock);
|
|
return found;
|
|
} else {
|
|
return RelationFindReplTupleSeq(rel, lockmode, searchslot, outslot, fakeRelInfo);
|
|
}
|
|
}
|
|
|
|
/* scan with partition */
|
|
if (idxrel != NULL) {
|
|
found = PartitionFindReplTupleByIndex(estate, rel, idxrel, lockmode, searchslot, outslot, fakeRelInfo);
|
|
index_close(idxrel, NoLock);
|
|
return found;
|
|
} else {
|
|
return PartitionFindReplTupleSeq(rel, lockmode, searchslot, outslot, fakeRelInfo);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Search the relation 'rel' for tuple using the index.
|
|
*
|
|
* If a matching tuple is found, lock it with lockmode, fill the slot with its
|
|
* contents, and return true. Return false otherwise.
|
|
*/
|
|
static bool RelationFindReplTupleByIndex(EState *estate, Relation rel, Relation idxrel, LockTupleMode lockmode,
|
|
TupleTableSlot *searchslot, TupleTableSlot *outslot, FakeRelationPartition *fakeRelPart)
|
|
{
|
|
HeapTuple scantuple;
|
|
ScanKeyData skey[INDEX_MAX_KEYS];
|
|
IndexScanDesc scan;
|
|
SnapshotData snap;
|
|
TransactionId xwait;
|
|
Relation targetRel = NULL;
|
|
bool found;
|
|
int rc;
|
|
bool isGpi = RelationIsGlobalIndex(idxrel);
|
|
/*
|
|
* For GPI and non-partition table, use parent heap relation to search the tuple,
|
|
* otherwise use partition relation
|
|
*/
|
|
if (isGpi || RelationIsNonpartitioned(rel)) {
|
|
targetRel = rel;
|
|
} else {
|
|
targetRel = fakeRelPart->partRel;
|
|
}
|
|
Assert(targetRel != NULL);
|
|
/* Start an index scan. */
|
|
InitDirtySnapshot(snap);
|
|
scan = scan_handler_idx_beginscan(targetRel, idxrel, &snap,
|
|
IndexRelationGetNumberOfKeyAttributes(idxrel), 0);
|
|
/* refer to check_violation, we need to set isUpsert if we want to use dirty snapshot in UStore */
|
|
scan->isUpsert = true;
|
|
|
|
/* Build scan key. */
|
|
build_replindex_scan_key(skey, targetRel, idxrel, searchslot);
|
|
|
|
while (true) {
|
|
found = false;
|
|
scan_handler_idx_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
|
|
|
|
/* Try to find the tuple */
|
|
if (RelationIsUstoreFormat(targetRel)) {
|
|
found = IndexGetnextSlot(scan, ForwardScanDirection, outslot);
|
|
} else {
|
|
if ((scantuple = scan_handler_idx_getnext(scan, ForwardScanDirection)) != NULL) {
|
|
found = true;
|
|
ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
|
|
}
|
|
}
|
|
if (found) {
|
|
/* Found tuple, try to lock it in the lockmode. */
|
|
xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax;
|
|
/*
|
|
* If the tuple is locked, wait for locking transaction to finish
|
|
* and retry.
|
|
*/
|
|
if (TransactionIdIsValid(xwait)) {
|
|
XactLockTableWait(xwait);
|
|
continue;
|
|
}
|
|
|
|
Buffer buf;
|
|
TM_FailureData hufd;
|
|
TM_Result res;
|
|
Tuple locktup;
|
|
HeapTupleData heaplocktup;
|
|
UHeapTupleData UHeaplocktup;
|
|
struct {
|
|
UHeapDiskTupleData hdr;
|
|
char data[MaxPossibleUHeapTupleSize];
|
|
} tbuf;
|
|
ItemPointer tid = tableam_tops_get_t_self(targetRel, outslot->tts_tuple);
|
|
|
|
if (RelationIsUstoreFormat(targetRel)) {
|
|
/* materialize the slot, so we can visit it after the scan is end */
|
|
outslot->tts_tuple = UHeapMaterialize(outslot);
|
|
ItemPointerCopy(tid, &UHeaplocktup.ctid);
|
|
rc = memset_s(&tbuf, sizeof(tbuf), 0, sizeof(tbuf));
|
|
securec_check(rc, "\0", "\0");
|
|
UHeaplocktup.disk_tuple = &tbuf.hdr;
|
|
locktup = &UHeaplocktup;
|
|
} else {
|
|
/* materialize the slot, so we can visit it after the scan is end */
|
|
outslot->tts_tuple = ExecMaterializeSlot(outslot);
|
|
ItemPointerCopy(tid, &heaplocktup.t_self);
|
|
locktup = &heaplocktup;
|
|
}
|
|
|
|
/* Get the target tuple's partition for GPI */
|
|
if (isGpi) {
|
|
GetFakeRelAndPart(estate, rel, outslot, fakeRelPart);
|
|
targetRel = fakeRelPart->partRel;
|
|
}
|
|
|
|
PushActiveSnapshot(GetLatestSnapshot());
|
|
res = tableam_tuple_lock(targetRel,
|
|
locktup, &buf, GetCurrentCommandId(false), lockmode, LockWaitBlock, &hufd,
|
|
false, false, /* don't follow updates */
|
|
false, /* eval */
|
|
GetLatestSnapshot(), tid, /* ItemPointer */
|
|
false); /* is select for update */
|
|
/* the tuple slot already has the buffer pinned */
|
|
ReleaseBuffer(buf);
|
|
PopActiveSnapshot();
|
|
|
|
if (CheckTupleLockRes(res)) {
|
|
/* lock tuple failed, try again */
|
|
continue;
|
|
}
|
|
}
|
|
/* we are done */
|
|
break;
|
|
}
|
|
|
|
scan_handler_idx_endscan(scan);
|
|
return found;
|
|
}
|
|
|
|
/*
|
|
* Compare the tuple and slot and check if they have equal values.
|
|
*/
|
|
static bool tuple_equals_slot(TupleDesc desc, const Tuple tup, TupleTableSlot *slot, TypeCacheEntry **eq)
|
|
{
|
|
Datum values[MaxTupleAttributeNumber];
|
|
bool isnull[MaxTupleAttributeNumber];
|
|
int attrnum;
|
|
Form_pg_attribute att;
|
|
|
|
tableam_tops_deform_tuple(tup, desc, values, isnull);
|
|
|
|
/* Check equality of the attributes. */
|
|
for (attrnum = 0; attrnum < desc->natts; attrnum++) {
|
|
TypeCacheEntry *typentry;
|
|
/* skip generate column */
|
|
if (GetGeneratedCol(desc, attrnum)) {
|
|
continue;
|
|
}
|
|
/*
|
|
* If one value is NULL and other is not, then they are certainly not
|
|
* equal
|
|
*/
|
|
if (isnull[attrnum] != slot->tts_isnull[attrnum])
|
|
return false;
|
|
|
|
/*
|
|
* If both are NULL, they can be considered equal.
|
|
*/
|
|
if (isnull[attrnum])
|
|
continue;
|
|
|
|
att = desc->attrs[attrnum];
|
|
typentry = eq[attrnum];
|
|
if (typentry == NULL) {
|
|
typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
|
|
if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) {
|
|
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
|
|
errmsg("could not identify an equality operator for type %s", format_type_be(att->atttypid))));
|
|
}
|
|
eq[attrnum] = typentry;
|
|
}
|
|
|
|
if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo, att->attcollation, values[attrnum],
|
|
slot->tts_values[attrnum]))) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Search the relation 'rel' for tuple using the sequential scan.
|
|
*
|
|
* If a matching tuple is found, lock it with lockmode, fill the slot with its
|
|
* contents, and return true. Return false otherwise.
|
|
*
|
|
* Note that this stops on the first matching tuple.
|
|
*
|
|
* This can obviously be quite slow on tables that have more than few rows.
|
|
*/
|
|
static bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot,
|
|
TupleTableSlot *outslot, FakeRelationPartition *fakeRelPart)
|
|
{
|
|
Tuple scantuple;
|
|
TableScanDesc scan;
|
|
SnapshotData snap;
|
|
TypeCacheEntry **eq;
|
|
TransactionId xwait;
|
|
bool found;
|
|
int rc;
|
|
Relation targetRel = fakeRelPart->partRel == NULL ? rel : fakeRelPart->partRel;
|
|
TupleDesc desc = RelationGetDescr(rel);
|
|
bool retry = false;
|
|
|
|
Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
|
|
eq = (TypeCacheEntry **)palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
|
|
|
|
/* Start a heap scan. */
|
|
InitDirtySnapshot(snap);
|
|
scan = scan_handler_tbl_beginscan(targetRel, &snap, 0, NULL, NULL);
|
|
|
|
while (true) {
|
|
retry = false;
|
|
found = false;
|
|
scan_handler_tbl_rescan(scan, NULL, targetRel);
|
|
|
|
/* Try to find the tuple */
|
|
while ((scantuple = scan_handler_tbl_getnext(scan, ForwardScanDirection, targetRel)) != NULL) {
|
|
if (!tuple_equals_slot(desc, scantuple, searchslot, eq)) {
|
|
continue;
|
|
}
|
|
|
|
found = true;
|
|
ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
|
|
|
|
xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax;
|
|
/*
|
|
* If the tuple is locked, wait for locking transaction to finish
|
|
* and retry.
|
|
*/
|
|
if (TransactionIdIsValid(xwait)) {
|
|
/* retry */
|
|
retry = true;
|
|
XactLockTableWait(xwait);
|
|
}
|
|
break;
|
|
}
|
|
|
|
if (retry) {
|
|
continue;
|
|
}
|
|
if (found) {
|
|
/* Found tuple, try to lock it in the lockmode. */
|
|
Buffer buf;
|
|
TM_FailureData hufd;
|
|
TM_Result res;
|
|
Tuple locktup = NULL;
|
|
HeapTupleData heaplocktup;
|
|
UHeapTupleData UHeaplocktup;
|
|
struct {
|
|
UHeapDiskTupleData hdr;
|
|
char data[MaxPossibleUHeapTupleSize];
|
|
} tbuf;
|
|
ItemPointer tid = tableam_tops_get_t_self(rel, outslot->tts_tuple);
|
|
|
|
if (RelationIsUstoreFormat(targetRel)) {
|
|
/* materialize the slot, so we can visit it after the scan is end */
|
|
outslot->tts_tuple = UHeapMaterialize(outslot);
|
|
ItemPointerCopy(tid, &UHeaplocktup.ctid);
|
|
rc = memset_s(&tbuf, sizeof(tbuf), 0, sizeof(tbuf));
|
|
securec_check(rc, "\0", "\0");
|
|
UHeaplocktup.disk_tuple = &tbuf.hdr;
|
|
locktup = &UHeaplocktup;
|
|
} else {
|
|
/* materialize the slot, so we can visit it after the scan is end */
|
|
outslot->tts_tuple = ExecMaterializeSlot(outslot);
|
|
ItemPointerCopy(tid, &heaplocktup.t_self);
|
|
locktup = &heaplocktup;
|
|
}
|
|
|
|
PushActiveSnapshot(GetLatestSnapshot());
|
|
res = tableam_tuple_lock(targetRel, locktup, &buf, GetCurrentCommandId(false),
|
|
lockmode, LockWaitBlock, &hufd, false,
|
|
false, /* don't follow updates */
|
|
false, /* eval */
|
|
GetLatestSnapshot(), tid, /* ItemPointer */
|
|
false); /* is select for update */
|
|
|
|
/* the tuple slot already has the buffer pinned */
|
|
ReleaseBuffer(buf);
|
|
PopActiveSnapshot();
|
|
|
|
if (CheckTupleLockRes(res)) {
|
|
/* lock tuple failed, try again */
|
|
continue;
|
|
}
|
|
}
|
|
/* we are done */
|
|
break;
|
|
}
|
|
|
|
scan_handler_tbl_endscan(scan);
|
|
|
|
return found;
|
|
}
|
|
|
|
/*
|
|
* Insert tuple represented in the slot to the relation, update the indexes,
|
|
* and execute any constraints and per-row triggers.
|
|
*
|
|
* Caller is responsible for opening the indexes.
|
|
*/
|
|
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot, FakeRelationPartition *relAndPart)
|
|
{
|
|
Tuple tuple;
|
|
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
|
Relation rel = resultRelInfo->ri_RelationDesc;
|
|
Relation targetRel = relAndPart->partRel == NULL ? rel : relAndPart->partRel;
|
|
|
|
/* For now we support only tables. */
|
|
Assert(rel->rd_rel->relkind == RELKIND_RELATION);
|
|
|
|
CheckCmdReplicaIdentity(rel, CMD_INSERT);
|
|
|
|
/* BEFORE ROW INSERT Triggers */
|
|
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row) {
|
|
slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
|
|
if (slot == NULL) {
|
|
/* "do nothing" */
|
|
return;
|
|
}
|
|
}
|
|
|
|
List *recheckIndexes = NIL;
|
|
/* Materialize slot into a tuple that we can scribble upon. */
|
|
tuple = tableam_tslot_get_tuple_from_slot(rel, slot);
|
|
tableam_tops_update_tuple_with_oid(targetRel, tuple, slot);
|
|
|
|
/* Compute stored generated columns */
|
|
if (rel->rd_att->constr && rel->rd_att->constr->has_generated_stored) {
|
|
ExecComputeStoredGenerated(resultRelInfo, estate, slot, tuple, CMD_INSERT);
|
|
tuple = slot->tts_tuple;
|
|
}
|
|
|
|
/* Check the constraints of the tuple */
|
|
if (rel->rd_att->constr) {
|
|
ExecConstraints(resultRelInfo, slot, estate, true);
|
|
tuple = ExecAutoIncrement(rel, estate, slot, tuple);
|
|
}
|
|
|
|
/* OK, store the tuple and create index entries for it */
|
|
(void)tableam_tuple_insert(targetRel, tuple, GetCurrentCommandId(true), 0, NULL);
|
|
if (resultRelInfo->ri_NumIndices > 0) {
|
|
ItemPointer pTSelf = tableam_tops_get_t_self(rel, tuple);
|
|
recheckIndexes =
|
|
ExecInsertIndexTuples(slot, pTSelf, estate, targetRel, relAndPart->part, InvalidBktId, NULL, NULL);
|
|
}
|
|
/* AFTER ROW INSERT Triggers */
|
|
ExecARInsertTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, (HeapTuple)tuple, recheckIndexes);
|
|
|
|
list_free_ext(recheckIndexes);
|
|
}
|
|
|
|
/*
|
|
* Find the searchslot tuple and update it with data in the slot,
|
|
* update the indexes, and execute any constraints and per-row triggers.
|
|
*
|
|
* Caller is responsible for opening the indexes.
|
|
*/
|
|
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot,
|
|
FakeRelationPartition *relAndPart)
|
|
{
|
|
bool allowInplaceUpdate = true;
|
|
Tuple tuple = NULL;
|
|
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
|
Relation rel = resultRelInfo->ri_RelationDesc;
|
|
ItemPointer searchSlotTid = tableam_tops_get_t_self(rel, searchslot->tts_tuple);
|
|
|
|
/* For now we support only tables. */
|
|
Assert(rel->rd_rel->relkind == RELKIND_RELATION);
|
|
|
|
CheckCmdReplicaIdentity(rel, CMD_UPDATE);
|
|
|
|
if ((resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_after_row) ||
|
|
resultRelInfo->ri_RelationDesc->rd_mlogoid) {
|
|
allowInplaceUpdate = false;
|
|
}
|
|
|
|
/* BEFORE ROW UPDATE Triggers */
|
|
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) {
|
|
slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, relAndPart->partOid, InvalidBktId, NULL,
|
|
searchSlotTid, slot);
|
|
if (slot == NULL) {
|
|
/* "do nothing" */
|
|
return;
|
|
}
|
|
}
|
|
|
|
/* Materialize slot into a tuple that we can scribble upon. */
|
|
tuple = tableam_tslot_get_tuple_from_slot(rel, slot);
|
|
List *recheckIndexes = NIL;
|
|
Bitmapset *modifiedIdxAttrs = NULL;
|
|
TupleTableSlot *oldslot = NULL;
|
|
bool updateIndexes = false;
|
|
bool rowMovement = false;
|
|
TM_FailureData tmfd;
|
|
FakeRelationPartition newTupleInfo;
|
|
TM_Result res;
|
|
Relation targetRelation = relAndPart->partRel == NULL ? rel : relAndPart->partRel;
|
|
Relation parentRelation = relAndPart->partRel == NULL ? NULL : rel;
|
|
|
|
/* Compute stored generated columns */
|
|
if (rel->rd_att->constr && rel->rd_att->constr->has_generated_stored) {
|
|
ExecComputeStoredGenerated(resultRelInfo, estate, slot, tuple, CMD_UPDATE);
|
|
}
|
|
|
|
/* Check the constraints of the tuple */
|
|
if (rel->rd_att->constr) {
|
|
ExecConstraints(resultRelInfo, slot, estate);
|
|
}
|
|
|
|
/* check whether there is a row movement for partition table */
|
|
GetFakeRelAndPart(estate, rel, slot, &newTupleInfo);
|
|
if (newTupleInfo.partOid != InvalidOid && newTupleInfo.partOid != relAndPart->partOid) {
|
|
if (!rel->rd_rel->relrowmovement) {
|
|
ereport(ERROR, (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_S_R_E_MODIFYING_SQL_DATA_NOT_PERMITTED),
|
|
errmsg("fail to update partitioned table \"%s\"", RelationGetRelationName(rel)),
|
|
errdetail("disable row movement"))));
|
|
}
|
|
rowMovement = true;
|
|
}
|
|
|
|
tuple = slot->tts_tuple;
|
|
CommandId cid = GetCurrentCommandId(true);
|
|
/* OK, update the tuple and index entries for it */
|
|
if (!rowMovement) {
|
|
res = tableam_tuple_update(targetRelation, parentRelation, searchSlotTid, tuple, cid,
|
|
InvalidSnapshot, estate->es_snapshot, true, &oldslot, &tmfd, &updateIndexes, &modifiedIdxAttrs,
|
|
false, allowInplaceUpdate);
|
|
CheckTupleModifyRes(res);
|
|
if (updateIndexes && resultRelInfo->ri_NumIndices > 0) {
|
|
ExecIndexTuplesState exec_index_tuples_state;
|
|
exec_index_tuples_state.estate = estate;
|
|
exec_index_tuples_state.targetPartRel = RELATION_IS_PARTITIONED(rel) ? targetRelation : NULL;
|
|
exec_index_tuples_state.p = relAndPart->part;
|
|
exec_index_tuples_state.conflict = NULL;
|
|
exec_index_tuples_state.rollbackIndex = false;
|
|
recheckIndexes = tableam_tops_exec_update_index_tuples(slot, oldslot, targetRelation, NULL, tuple,
|
|
searchSlotTid, exec_index_tuples_state, InvalidBktId, modifiedIdxAttrs);
|
|
}
|
|
} else {
|
|
/* rowMovement, delete origin tuple and insert new */
|
|
Assert(relAndPart->partRel != NULL);
|
|
Assert(newTupleInfo.partRel != NULL);
|
|
res = tableam_tuple_delete(relAndPart->partRel, searchSlotTid, cid, InvalidSnapshot,
|
|
estate->es_snapshot, true, &oldslot, &tmfd);
|
|
CheckTupleModifyRes(res);
|
|
|
|
ExecIndexTuplesState exec_index_tuples_state;
|
|
exec_index_tuples_state.estate = estate;
|
|
exec_index_tuples_state.targetPartRel = relAndPart->partRel;
|
|
exec_index_tuples_state.p = relAndPart->part;
|
|
exec_index_tuples_state.conflict = NULL;
|
|
exec_index_tuples_state.rollbackIndex = false;
|
|
tableam_tops_exec_delete_index_tuples(oldslot, relAndPart->partRel, NULL, searchSlotTid,
|
|
exec_index_tuples_state, modifiedIdxAttrs);
|
|
|
|
/* Insert new tuple */
|
|
(void)tableam_tuple_insert(newTupleInfo.partRel, tuple, cid, 0, NULL);
|
|
if (resultRelInfo->ri_NumIndices > 0) {
|
|
ItemPointer pTSelf = tableam_tops_get_t_self(rel, tuple);
|
|
recheckIndexes = ExecInsertIndexTuples(slot, pTSelf, estate, newTupleInfo.partRel,
|
|
newTupleInfo.part, InvalidBktId, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
if (oldslot) {
|
|
ExecDropSingleTupleTableSlot(oldslot);
|
|
}
|
|
|
|
/* AFTER ROW UPDATE Triggers */
|
|
ExecARUpdateTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, relAndPart->partOid,
|
|
searchSlotTid, (HeapTuple)tuple, NULL, recheckIndexes);
|
|
|
|
list_free(recheckIndexes);
|
|
}
|
|
|
|
/*
|
|
* Find the searchslot tuple and delete it, and execute any constraints
|
|
* and per-row triggers.
|
|
*
|
|
* Caller is responsible for opening the indexes.
|
|
*/
|
|
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot,
|
|
FakeRelationPartition *relAndPart)
|
|
{
|
|
bool skip_tuple = false;
|
|
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
|
Relation rel = resultRelInfo->ri_RelationDesc;
|
|
ItemPointer tid = tableam_tops_get_t_self(rel, searchslot->tts_tuple);
|
|
|
|
/* For now we support only tables. */
|
|
Assert(rel->rd_rel->relkind == RELKIND_RELATION);
|
|
|
|
CheckCmdReplicaIdentity(rel, CMD_DELETE);
|
|
|
|
/* BEFORE ROW INSERT Triggers */
|
|
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_before_row) {
|
|
skip_tuple =
|
|
!ExecBRDeleteTriggers(estate, epqstate, resultRelInfo, relAndPart->partOid, InvalidBktId, NULL, tid);
|
|
if (skip_tuple) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
TupleTableSlot *oldslot = NULL;
|
|
Relation targetRel = relAndPart->partRel == NULL ? rel : relAndPart->partRel;
|
|
TM_FailureData tmfd;
|
|
|
|
/* OK, delete the tuple */
|
|
TM_Result res = tableam_tuple_delete(targetRel, tid, GetCurrentCommandId(true), InvalidSnapshot,
|
|
estate->es_snapshot, true, &oldslot, &tmfd);
|
|
CheckTupleModifyRes(res);
|
|
|
|
Bitmapset *modifiedIdxAttrs = NULL;
|
|
ExecIndexTuplesState exec_index_tuples_state;
|
|
exec_index_tuples_state.estate = estate;
|
|
exec_index_tuples_state.targetPartRel = RELATION_IS_PARTITIONED(rel) ? relAndPart->partRel : NULL;
|
|
exec_index_tuples_state.p = relAndPart->part;
|
|
exec_index_tuples_state.conflict = NULL;
|
|
exec_index_tuples_state.rollbackIndex = false;
|
|
tableam_tops_exec_delete_index_tuples(oldslot, targetRel, NULL, tid, exec_index_tuples_state, modifiedIdxAttrs);
|
|
if (oldslot) {
|
|
ExecDropSingleTupleTableSlot(oldslot);
|
|
}
|
|
|
|
/* AFTER ROW DELETE Triggers */
|
|
ExecARDeleteTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, NULL, tid);
|
|
}
|
|
|
|
/*
|
|
* Check if command can be executed with current replica identity.
|
|
*/
|
|
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
|
|
{
|
|
PublicationActions *pubactions;
|
|
|
|
/* We only need to do checks for UPDATE and DELETE. */
|
|
if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
|
|
return;
|
|
|
|
/* If relation has replica identity we are always good. */
|
|
Assert(rel->relreplident == RelationGetRelReplident(rel));
|
|
if (rel->relreplident == REPLICA_IDENTITY_FULL || OidIsValid(RelationGetReplicaIndex(rel)))
|
|
return;
|
|
|
|
/*
|
|
* This is either UPDATE OR DELETE and there is no replica identity.
|
|
*
|
|
* Check if the table publishes UPDATES or DELETES.
|
|
*/
|
|
pubactions = GetRelationPublicationActions(rel);
|
|
if (cmd == CMD_UPDATE && pubactions->pubupdate) {
|
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
|
|
RelationGetRelationName(rel)),
|
|
errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
|
|
} else if (cmd == CMD_DELETE && pubactions->pubdelete) {
|
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
|
|
RelationGetRelationName(rel)),
|
|
errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
|
|
}
|
|
pfree(pubactions);
|
|
}
|
|
|
|
/*
 * Determine which partition (or sub-partition) of 'rel' the tuple in 'slot'
 * belongs to and describe it in 'relAndPart'.
 *
 * partRel, part and partOid of 'relAndPart' are always reset first.  They
 * stay empty for non-partitioned and value-partitioned relations.  For a
 * partitioned relation they describe the target partition, for a
 * sub-partitioned relation the target sub-partition.  The fake relations are
 * obtained through estate->esfRelations with RowExclusiveLock.  An
 * unrecognized partition type raises an ERROR.
 */
void GetFakeRelAndPart(EState *estate, Relation rel, TupleTableSlot *slot, FakeRelationPartition *relAndPart)
{
    relAndPart->partRel = NULL;
    relAndPart->part = NULL;
    relAndPart->partOid = InvalidOid;

    if (RelationIsNonpartitioned(rel)) {
        return;
    }

    Relation partRelation = NULL;
    Partition partition = NULL;
    Tuple tuple = tableam_tslot_get_tuple_from_slot(rel, slot);

    /* sort out the partition types that need no lookup, and the invalid ones */
    switch (rel->rd_rel->parttype) {
        case PARTTYPE_NON_PARTITIONED_RELATION:
        case PARTTYPE_VALUE_PARTITIONED_RELATION:
            return;
        case PARTTYPE_PARTITIONED_RELATION:
        case PARTTYPE_SUBPARTITIONED_RELATION:
            break;
        default:
            ereport(ERROR, (errmodule(MOD_EXECUTOR),
                (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("Unrecognized parttype as \"%c\" for relation \"%s\"",
                    rel->rd_rel->parttype, RelationGetRelationName(rel)))));
            return;
    }

    /* first level: the partition of 'rel' that the tuple routes to */
    Oid partitionOid = heapTupleGetPartitionId(rel, tuple);
    searchFakeReationForPartitionOid(estate->esfRelations, estate->es_query_cxt, rel, partitionOid,
        partRelation, partition, RowExclusiveLock);

    if (rel->rd_rel->parttype == PARTTYPE_PARTITIONED_RELATION) {
        relAndPart->partRel = partRelation;
        relAndPart->part = partition;
        relAndPart->partOid = partitionOid;
        return;
    }

    /* second level: route the tuple again inside the partition found above */
    Relation subPartRel = NULL;
    Partition subPart = NULL;
    Oid subPartOid = heapTupleGetPartitionId(partRelation, tuple);
    searchFakeReationForPartitionOid(estate->esfRelations, estate->es_query_cxt, partRelation, subPartOid,
        subPartRel, subPart, RowExclusiveLock);

    relAndPart->partRel = subPartRel;
    relAndPart->part = subPart;
    relAndPart->partOid = subPartOid;
}
|