!6227 修复toast表索引类型问题

Merge pull request !6227 from lyannaa/master905
This commit is contained in:
opengauss_bot
2024-09-05 13:42:58 +00:00
committed by Gitee
8 changed files with 28 additions and 119 deletions

View File

@ -135,10 +135,10 @@ static bool create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Da
Oid toast_relid;
Oid toast_typid = InvalidOid;
Oid namespaceid;
Oid index_am_oid = BTREE_AM_OID;
char toast_relname[NAMEDATALEN];
char toast_idxname[NAMEDATALEN];
IndexInfo* indexInfo = NULL;
Oid index_am_oid = BTREE_AM_OID;
Oid collationObjectId[2];
Oid classObjectId[2];
int16 coloptions[2];

View File

@ -985,25 +985,21 @@ Datum transformRelOptions(Datum oldOptions, List *defList, const char *namspace,
}
}
if (namspace != NULL && pg_strcasecmp(namspace, "toast") == 0 && toastStorageType != NULL) {
const char *actualStorageType = NULL;
if (storageType == NULL) {
actualStorageType = u_sess->attr.attr_sql.enable_default_ustore_table ? "ustore" : "astore";
} else {
actualStorageType = storageType;
}
if (pg_strcasecmp(actualStorageType, "astore") == 0 || pg_strcasecmp(actualStorageType, "ustore") == 0) {
if (pg_strcasecmp(actualStorageType, toastStorageType) != 0) {
/* we did not specify a storage type for toast, so use the same storage type as its parent */
if (namspace != NULL && pg_strcasecmp(namspace, "toast") == 0) {
if (toastStorageType != NULL) {
const char *parentStorageType = (storageType == NULL)
? (u_sess->attr.attr_sql.enable_default_ustore_table ? "ustore" : "astore")
: storageType;
if (parentStorageType != toastStorageType) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("toast cannot be set for %s with storage_type=%s", actualStorageType, toastStorageType)));
errmsg("parent storage type is %s but toast storage type is %s, toast should use the "
"same storage type as its parent",
parentStorageType, toastStorageType)));
}
}
}
/* we did not specify a storage type for toast, so use the same storage type as its parent */
if (namspace != NULL && pg_strcasecmp(namspace, "toast") == 0 && !toastStorageTypeSet) {
if (storageType != NULL) {
if (!toastStorageTypeSet && storageType != NULL) {
Size len = VARHDRSZ + strlen("storage_type") + 1 + strlen(storageType);
/* +1 leaves room for sprintf's trailing null */
text *t = (text *)palloc(len + 1);

View File

@ -2644,6 +2644,9 @@ bool heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, bool
bool result = false;
Buffer buffer;
HeapTupleData heap_tuple;
if (RelationIsUstoreFormat(relation)) {
ereport(ERROR, (errmsg("heap_hot_search relation type is ustore!")));
}
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);

View File

@ -2584,7 +2584,7 @@ static bool CheckItemIsAlive(ItemPointer tid, Relation relation, Snapshot snapsh
bool* all_dead, CUDescScan* cudescScan)
{
if (!RelationIsCUFormat(relation)) {
return TableIndexFetchTupleCheck(relation, tid, snapshot, all_dead);
return heap_hot_search(tid, relation, snapshot, all_dead);
} else {
return cudescScan->CheckItemIsAlive(tid);
}

View File

@ -1533,6 +1533,7 @@ void UHeap2XlogFreezeOperatorPage(RedoBufferInfo *buffer, void *recorddata, void
offsets++;
}
}
PageSetLSN(page, buffer->lsn);
}
void UHeap2XlogExtendTDSlotsOperatorPage(RedoBufferInfo *buffer, void *recorddata)

View File

@ -46,22 +46,20 @@ static Datum UHeapToastSaveDatum(Relation rel, Datum value, struct varlena *olde
static Datum UHeapToastCompressDatum(Datum value);
static bool UHeapToastIdValueIdExists(Oid toastrelid, Oid valueid, int2 bucketid);
static bool UHeapToastRelValueidExists(Relation toastrel, Oid valueid);
static Oid UHeapGetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn,
bool *inconsistent);
static Oid UHeapGetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn);
static Datum UHeapToastCompressDatum(Datum value)
{
return toast_compress_datum(value);
}
Oid UHeapGetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn, bool *inconsistent)
Oid UHeapGetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
{
Oid newOid;
SysScanDesc scan;
ScanKeyData key;
bool collides = false;
Assert(RelationIsUstoreFormat(relation) || RelationIsToast(relation));
Assert(inconsistent != NULL);
TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), false, relation->rd_tam_ops);
/* Generate new OIDs until we find one not in the table */
do {
@ -72,40 +70,12 @@ Oid UHeapGetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn
* chunk_id for toast datum to prevent wrap around.
*/
newOid = GetNewObjectId(IsToastNamespace(RelationGetNamespace(relation)));
*inconsistent = false;
ScanKeyInit(&key, oidcolumn, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(newOid));
/* see notes above about using SnapshotAny */
scan = systable_beginscan(relation, indexId, true, SnapshotAny, ATTR_FIRST, &key);
while (UHeapSysIndexGetnextSlot(scan, ForwardScanDirection, slot)) {
bool isnull = false;
UHeapTuple ttup = ExecGetUHeapTupleFromSlot(slot);
Oid chunk_id = DatumGetObjectId(UHeapFastGetAttr(ttup, ATTR_FIRST, RelationGetDescr(relation), &isnull));
Assert(!isnull);
if (chunk_id == newOid) {
collides = true;
break;
} else {
*inconsistent = true;
if (scan->iscan != NULL && (!scan->iscan->xactStartedInRecovery)) {
scan->iscan->kill_prior_tuple = true;
BTScanOpaque so = (BTScanOpaque)scan->iscan->opaque;
if (so != NULL) {
BTScanPosItem indexItem = so->currPos.items[so->currPos.itemIndex];
OffsetNumber indexOffset = indexItem.indexOffset;
ItemPointerData heapTid = indexItem.heapTid;
ereport(LOG, (errcode(ERRCODE_UNEXPECTED_CHUNK_VALUE),
errmsg("found toast chunk %u is not scan toast value %u of toast relation %u, will skip."
"toast index tuple at offset %hu with ctid (%u, %u) is marked dead.",
chunk_id, newOid, relation->rd_node.relNode, indexOffset,
ItemPointerGetBlockNumber(&heapTid), ItemPointerGetOffsetNumber(&heapTid)),
errcause("found toast chunk is not scan toast value."),
erraction("Check the toast chunk.")));
}
}
}
}
collides = UHeapSysIndexGetnextSlot(scan, ForwardScanDirection, slot);
systable_endscan(scan);
} while (collides);
ExecDropSingleTupleTableSlot(slot);
@ -695,7 +665,6 @@ static Datum UHeapToastSaveDatum(Relation rel, Datum value, struct varlena *olde
Pointer dval = DatumGetPointer(value);
errno_t rc;
int2 bucketid = InvalidBktId;
bool inconsistent = false;
Assert(!VARATT_IS_EXTERNAL(value));
rc = memset_s(&chunkData, sizeof(chunkData), 0, sizeof(chunkData));
securec_check(rc, "", "");
@ -767,7 +736,7 @@ static Datum UHeapToastSaveDatum(Relation rel, Datum value, struct varlena *olde
*/
if (!OidIsValid(rel->rd_toastoid)) {
/* normal case: just choose an unused OID */
toastPointer.va_valueid = UHeapGetNewOidWithIndex(toastrel, RelationGetRelid(toastidx), (AttrNumber)1, &inconsistent);
toastPointer.va_valueid = UHeapGetNewOidWithIndex(toastrel, RelationGetRelid(toastidx), (AttrNumber)1);
} else {
/* rewrite case: check to see if value was in old toast table */
toastPointer.va_valueid = InvalidOid;
@ -812,7 +781,7 @@ static Datum UHeapToastSaveDatum(Relation rel, Datum value, struct varlena *olde
* old or new toast table
*/
do {
toastPointer.va_valueid = UHeapGetNewOidWithIndex(toastrel, RelationGetRelid(toastidx), (AttrNumber)1, &inconsistent);
toastPointer.va_valueid = UHeapGetNewOidWithIndex(toastrel, RelationGetRelid(toastidx), (AttrNumber)1);
} while (UHeapToastIdValueIdExists(rel->rd_toastoid, toastPointer.va_valueid, bucketid));
}
}
@ -855,7 +824,7 @@ static Datum UHeapToastSaveDatum(Relation rel, Datum value, struct varlena *olde
* the TOAST table, since we don't bother to update anything else.
*/
(void)index_insert(toastidx, tValues, tIsnull, &(toasttup->ctid), toastrel,
(toastidx->rd_index->indisunique && !inconsistent) ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
toastidx->rd_index->indisunique ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
/*
* Free memory
@ -913,8 +882,6 @@ static void UHeapToastDeleteDatum(Relation rel, Datum value, int options)
SysScanDesc toastscan;
UHeapTuple toasttup;
int2 bucketid;
bool found = false;
bool isnull = false;
if (!VARATT_IS_EXTERNAL_ONDISK_B(attr))
return;
@ -958,14 +925,6 @@ static void UHeapToastDeleteDatum(Relation rel, Datum value, int options)
* Have a chunk, delete it
*/
toasttup = ExecGetUHeapTupleFromSlot(slot);
Oid chunk_id = DatumGetObjectId(UHeapFastGetAttr(toasttup, ATTR_FIRST, RelationGetDescr(toastrel), &isnull));
Assert(!isnull);
if (chunk_id != toastPointer.va_valueid) {
ereport(LOG, (errmsg("Delete toast chunk %u is not scan toast chunk %u of toast relation is %u, will skip",
chunk_id, toastPointer.va_valueid, toastPointer.va_toastrelid)));
continue;
}
found = true;
SimpleUHeapDelete(toastrel, &toasttup->ctid, SnapshotToast);
Datum values[INDEX_MAX_KEYS];
@ -979,11 +938,6 @@ static void UHeapToastDeleteDatum(Relation rel, Datum value, int options)
index_delete(toastidx, values, isnulls, &toasttup->ctid, false);
}
if (!found) {
ereport(LOG, (errmsg("Toast chunk %u of toast relation is %u delete 0 rows", toastPointer.va_valueid,
toastPointer.va_toastrelid)));
}
/*
* End scan and close relations
*/
@ -1052,31 +1006,6 @@ struct varlena *UHeapInternalToastFetchDatum(struct varatt_external toastPointer
* Have a chunk, extract the sequence number and the data
*/
ttup = ExecGetUHeapTupleFromSlot(slot);
Oid chunk_id = DatumGetObjectId(UHeapFastGetAttr(ttup, ATTR_FIRST, toastTupDesc, &isnull));
Assert(!isnull);
if (chunk_id != toastPointer.va_valueid) {
if (toastscan->iscan != NULL && (!toastscan->iscan->xactStartedInRecovery)) {
toastscan->iscan->kill_prior_tuple = true;
BTScanOpaque so = (BTScanOpaque)toastscan->iscan->opaque;
if (so != NULL) {
BTScanPosItem indexItem = so->currPos.items[so->currPos.itemIndex];
OffsetNumber indexOffset = indexItem.indexOffset;
ItemPointerData heapTid = indexItem.heapTid;
ereport(LOG, (errcode(ERRCODE_UNEXPECTED_CHUNK_VALUE),
errmsg("UHeapInternalToastFetchDatum found toast chunk %u is not scan toast chunk %u of "
"toast relation %u toast size detail (%d, %d), will skip."
"toast index tuple at offset %hu with ctid (%u, %u) is marked dead,"
"toast tuple ctid is (%u, %u).",
chunk_id, toastPointer.va_valueid, toastPointer.va_toastrelid,
toastPointer.va_rawsize, toastPointer.va_extsize, indexOffset,
ItemPointerGetBlockNumber(&heapTid), ItemPointerGetOffsetNumber(&heapTid),
ItemPointerGetBlockNumber(&(ttup->ctid)), ItemPointerGetOffsetNumber(&(ttup->ctid))),
errcause("found toast chunk is not scan toast value."),
erraction("Check the toast chunk.")));
}
}
continue;
}
residx = DatumGetInt32(UHeapFastGetAttr(ttup, ATTR_SECOND, toastTupDesc, &isnull));
Assert(!isnull);
chunk = DatumGetPointer(UHeapFastGetAttr(ttup, ATTR_THIRD, toastTupDesc, &isnull));
@ -1263,13 +1192,6 @@ struct varlena *UHeapInternalToastFetchDatumSlice(struct varatt_external toastPo
* Have a chunk, extract the sequence number and the data
*/
ttup = ExecGetUHeapTupleFromSlot(slot);
Oid chunk_id = DatumGetObjectId(UHeapFastGetAttr(ttup, ATTR_FIRST, toastTupDesc, &isnull));
Assert(!isnull);
if (chunk_id != toastPointer.va_valueid) {
ereport(LOG, (errmsg("UHeapInternalToastFetchDatumSlice find toast chunk %u is not scan toast chunk %u of "
"toast relation %u, will skip", chunk_id, toastPointer.va_valueid, toastPointer.va_toastrelid)));
continue;
}
residx = DatumGetInt32(UHeapFastGetAttr(ttup, CHUNK_ID_ATTR, toastTupDesc, &isnull));
Assert(!isnull);
chunk = DatumGetPointer(UHeapFastGetAttr(ttup, CHUNK_DATA_ATTR, toastTupDesc, &isnull));
@ -1380,20 +1302,7 @@ static bool UHeapToastRelValueidExists(Relation toastrel, Oid valueid)
* Is there any such chunk?
*/
toastscan = systable_beginscan(toastrel, toastrel->rd_rel->reltoastidxid, true, SnapshotAny, 1, &toastkey);
while (UHeapSysIndexGetnextSlot(toastscan, ForwardScanDirection, slot)) {
bool isnull = false;
UHeapTuple ttup = ExecGetUHeapTupleFromSlot(slot);
Oid chunk_id = DatumGetObjectId(UHeapFastGetAttr(ttup, ATTR_FIRST, RelationGetDescr(toastrel), &isnull));
Assert(!isnull);
if (chunk_id == valueid) {
result = true;
break;
}
else {
ereport(LOG, (errmsg("UHeapToastRelValueidExists find toast chunk %u is not scan toast chunk %u of toast "
"relation %u, will skip", chunk_id, valueid, toastrel->rd_id)));
}
}
result = UHeapSysIndexGetnextSlot(toastscan, ForwardScanDirection, slot);
systable_endscan(toastscan);
ExecDropSingleTupleTableSlot(slot);

View File

@ -196,7 +196,7 @@ static const struct {
"%s, PageHeaderInfo: pd_lsn:%X/%X, pd_checksum:%u, pd_flags:%u, " \
"pd_lower:%u, pd_upper:%u, pd_special:%u, pd_pagesize_version:%u, pd_prune_xid:%u", \
_msg, pageHeader->pd_lsn.xlogid, \
pageHeader->pd_lsn.xlogid << XLOG_UHEAP_LSN_HIGH_OFF + pageHeader->pd_lsn.xrecoff, \
((uint64)pageHeader->pd_lsn.xlogid << XLOG_UHEAP_LSN_HIGH_OFF) + pageHeader->pd_lsn.xrecoff, \
pageHeader->pd_checksum, pageHeader->pd_flags, pageHeader->pd_lower, pageHeader->pd_upper, \
pageHeader->pd_special, pageHeader->pd_pagesize_version, pageHeader->pd_prune_xid); \
} while (0)

View File

@ -222,9 +222,9 @@
"%s, UPageHeaderInfo: pd_lsn:%X/%X, pd_checksum:%u, " \
"pd_flags:%u, pd_lower:%u, " \
"pd_upper:%u, pd_special:%u, pd_pagesize_version:%u, potential_freespace:%u, td_count:%u, " \
"pd_prune_xid:%lu, pd_xid_base:%lu, pd_multi_base:%lu" _msg, \
pageHeader->pd_lsn.xlogid, \
pageHeader->pd_lsn.xlogid << XLOG_UHEAP_LSN_HIGH_OFF + pageHeader->pd_lsn.xrecoff, \
"pd_prune_xid:%lu, pd_xid_base:%lu, pd_multi_base:%lu", \
_msg, pageHeader->pd_lsn.xlogid, \
((uint64)pageHeader->pd_lsn.xlogid << XLOG_UHEAP_LSN_HIGH_OFF) + pageHeader->pd_lsn.xrecoff, \
pageHeader->pd_checksum, pageHeader->pd_flags, pageHeader->pd_lower, pageHeader->pd_upper, \
pageHeader->pd_special, pageHeader->pd_pagesize_version, pageHeader->potential_freespace, \
pageHeader->td_count, pageHeader->pd_prune_xid, pageHeader->pd_xid_base, pageHeader->pd_multi_base); \