!2190 修复行存压缩表 truncate xlog日志记录 opt属性问题

Merge pull request !2190 from Fat-ray/master
This commit is contained in:
opengauss-bot
2022-09-28 09:56:36 +00:00
committed by Gitee
10 changed files with 85 additions and 46 deletions

View File

@ -759,15 +759,24 @@ void PartitionTruncate(Relation parent, Partition part, BlockNumber nblocks)
if (RelationNeedsWAL(parent)) {
XLogRecPtr lsn;
xl_smgr_truncate xlrec;
xl_smgr_truncate_compress xlrec;
uint8 info = XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE;
int redoSize;
xlrec.blkno = nblocks;
RelFileNodeRelCopy(xlrec.rnode, part->pd_node);
if (rel->rd_node.opt != 0) {
xlrec.pageCompressOpts = rel->rd_node.opt;
info |= XLR_REL_COMPRESS;
redoSize = sizeof(xl_smgr_truncate_compress);
} else {
redoSize = sizeof(xl_smgr_truncate);
}
xlrec.xlrec.blkno = nblocks;
RelFileNodeRelCopy(xlrec.xlrec.rnode, part->pd_node);
XLogBeginInsert();
XLogRegisterData((char*)&xlrec, sizeof(xlrec));
XLogRegisterData((char*)&xlrec, redoSize);
lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE, part->pd_node.bucketNode);
lsn = XLogInsert(RM_SMGR_ID, info, part->pd_node.bucketNode);
/*
* Flush, because otherwise the truncation of the main relation might
@ -1286,6 +1295,7 @@ void smgrApplyXLogTruncateRelation(XLogReaderState* record)
RelFileNodeBackend rbnode;
RelFileNodeCopy(rbnode.node, xlrec->rnode, (int2)XLogRecGetBucketId(record));
rbnode.node.opt = GetTruncateXlogFileNodeOpt(record);
rbnode.backend = InvalidBackendId;
smgrclosenode(rbnode);

View File

@ -408,61 +408,67 @@ static void ReadBinaryFileBlocksFirstCall(PG_FUNCTION_ARGS, int32 startBlockNum,
(void)MemoryContextSwitchTo(mctx);
}
static Relation CompressAddressGetRelation(Oid relid)
static RelFileNode GetCompressRelFileNode(Oid oid)
{
HeapTuple tuple;
Form_pg_partition pgPartitionForm;
RelFileNode rnode;
rnode.spcNode = InvalidOid;
rnode.dbNode = InvalidOid;
rnode.relNode = InvalidOid;
rnode.bucketNode = InvalidBktId;
rnode.opt = 0;
// try to get origin oid
Relation relation = try_relation_open(relid, AccessShareLock);
Relation relation = try_relation_open(oid, AccessShareLock);
if (RelationIsValid(relation)) {
return relation;
rnode = relation->rd_node;
relation_close(relation, AccessShareLock);
return rnode;
}
// try to open parrent oid
Oid parentoid = partid_get_parentid(relid);
if (!OidIsValid(parentoid)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("the specified relid is invalid.")));
tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple)) {
return rnode;
}
relation = try_relation_open(parentoid, AccessShareLock);
if (RelationIsValid(relation)) {
return relation;
pgPartitionForm = (Form_pg_partition)GETSTRUCT(tuple);
switch (pgPartitionForm->parttype) {
case PARTTYPE_PARTITIONED_RELATION:
case PARTTYPE_VALUE_PARTITIONED_RELATION:
case PARTTYPE_SUBPARTITIONED_RELATION:
rnode.spcNode = ConvertToRelfilenodeTblspcOid(pgPartitionForm->reltablespace);
if (pgPartitionForm->relfilenode) {
rnode.relNode = pgPartitionForm->relfilenode;
} else {
rnode.relNode = RelationMapOidToFilenode(oid, false);
}
if (rnode.spcNode == GLOBALTABLESPACE_OID) {
rnode.dbNode = InvalidOid;
} else {
rnode.dbNode = u_sess->proc_cxt.MyDatabaseId;
}
rnode.bucketNode = InvalidBktId;
break;
default:
break;
}
// try to open parrent oid
parentoid = partid_get_parentid(parentoid);
if (!OidIsValid(parentoid)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("the specified relid is invalid.")));
}
relation = try_relation_open(parentoid, AccessShareLock);
if (RelationIsValid(relation)) {
return relation;
}
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("the specified relid is invalid.")));
return relation;
ReleaseSysCache(tuple);
return rnode;
}
static CompressAddrState* CompressAddressInit(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
Oid old = PG_GETARG_OID(0);
int64 segmentNo = PG_GETARG_INT64(1);
RelFileNode relFileNode;
Relation relation = CompressAddressGetRelation(relid);
relation_close(relation, AccessShareLock);
relFileNode = relation->rd_node;
relFileNode.relNode = relid; // specified the reloid to construct the path
relFileNode = GetCompressRelFileNode(old);
if (!OidIsValid(relFileNode.relNode)) {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("can not find table ")));
}
if (!IS_COMPRESSED_RNODE(relFileNode, MAIN_FORKNUM)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("the specified file(relation) is non-compression relation.")));
}
char *path = relpathbackend(relFileNode, InvalidBackendId, MAIN_FORKNUM);
char pcaFilePath[MAXPGPATH];

View File

@ -40,6 +40,7 @@ XLogRecParseState *smgr_xlog_relnode_parse_to_block(XLogReaderState *record, uin
int ddltype;
bool colmrel = false;
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
uint2 opt;
if (info == XLOG_SMGR_CREATE) {
xl_smgr_create *xlrec = (xl_smgr_create *)XLogRecGetData(record);
@ -47,11 +48,13 @@ XLogRecParseState *smgr_xlog_relnode_parse_to_block(XLogReaderState *record, uin
forknum = xlrec->forkNum;
ddltype = BLOCK_DDL_CREATE_RELNODE;
colmrel = IsValidColForkNum(xlrec->forkNum);
opt = compress ? ((xl_smgr_create_compress*)(void *)XLogRecGetData(record))->pageCompressOpts : 0;
} else {
xl_smgr_truncate *xlrec = (xl_smgr_truncate *)XLogRecGetData(record);
rnode = &(xlrec->rnode);
blkno = xlrec->blkno;
ddltype = BLOCK_DDL_TRUNCATE_RELNODE;
opt = compress ? ((xl_smgr_truncate_compress*)(void *)XLogRecGetData(record))->pageCompressOpts : 0;
}
(*blocknum)++;
@ -59,6 +62,7 @@ XLogRecParseState *smgr_xlog_relnode_parse_to_block(XLogReaderState *record, uin
RelFileNode tmp_node;
RelFileNodeCopy(tmp_node, *rnode, (int2)XLogRecGetBucketId(record));
tmp_node.opt = opt;
RelFileNodeForkNum filenode = RelFileNodeForkNumFill(&tmp_node, InvalidBackendId, forknum, blkno);
XLogRecSetBlockCommonState(record, BLOCK_DATA_DDL_TYPE, filenode, recordstatehead);

View File

@ -41,7 +41,7 @@ void smgr_desc(StringInfo buf, XLogReaderState *record)
xl_smgr_create *xlrec = (xl_smgr_create *)rec;
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, XLogRecGetBucketId(record));
rnode.opt = GetCreateXlogFileNodeOpt(record);
char *path = relpathperm(rnode, xlrec->forkNum);
appendStringInfo(buf, "file create: %s", path);
@ -55,7 +55,7 @@ void smgr_desc(StringInfo buf, XLogReaderState *record)
xl_smgr_truncate *xlrec = (xl_smgr_truncate *)rec;
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, XLogRecGetBucketId(record));
rnode.opt = GetTruncateXlogFileNodeOpt(record);
char *path = relpathperm(rnode, MAIN_FORKNUM);
appendStringInfo(buf, "file truncate: %s to %u blocks", path, xlrec->blkno);

View File

@ -1228,6 +1228,7 @@ static void TrackRelStorageCreate(XLogReaderState *record)
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, (int2)XLogRecGetBucketId(record));
rnode.opt = GetCreateXlogFileNodeOpt(record);
/* Logic relfilenode create is ignored */
if (IsSegmentFileNode(rnode)) {
@ -1242,6 +1243,7 @@ static void TrackRelStorageTruncate(XLogReaderState *record)
BlockNumber mainTruncBlkNo, fsmTruncBlkNo, vmTruncBlkNo;
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, (int2)XLogRecGetBucketId(record));
rnode.opt = GetTruncateXlogFileNodeOpt(record);
/* Logic relfilenode create is ignored */
if (IsSegmentFileNode(rnode)) {

View File

@ -1086,11 +1086,13 @@ static bool DispatchSmgrRecord(XLogReaderState *record, List *expectedTLIs, Time
xl_smgr_create *xlrec = (xl_smgr_create *)XLogRecGetData(record);
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, XLogRecGetBucketId(record));
rnode.opt = GetCreateXlogFileNodeOpt(record);
DispatchToOnePageWorker(record, rnode, expectedTLIs);
} else if (IsSmgrTruncate(record)) {
xl_smgr_truncate *xlrec = (xl_smgr_truncate *)XLogRecGetData(record);
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, XLogRecGetBucketId(record));
rnode.opt = GetTruncateXlogFileNodeOpt(record);
uint32 id = GetSlotId(rnode, 0, 0, GetBatchCount());
AddSlotToPLSet(id);

View File

@ -977,6 +977,7 @@ static bool DispatchSmgrRecord(XLogReaderState *record, List *expectedTLIs, Time
xl_smgr_create *xlrec = (xl_smgr_create *)XLogRecGetData(record);
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, XLogRecGetBucketId(record));
rnode.opt = GetCreateXlogFileNodeOpt(record);
DispatchToOnePageWorker(record, rnode, expectedTLIs);
} else {
@ -996,6 +997,7 @@ static bool DispatchSmgrRecord(XLogReaderState *record, List *expectedTLIs, Time
xl_smgr_truncate *xlrec = (xl_smgr_truncate *)XLogRecGetData(record);
RelFileNode rnode;
RelFileNodeCopy(rnode, xlrec->rnode, XLogRecGetBucketId(record));
rnode.opt = GetTruncateXlogFileNodeOpt(record);
id = GetWorkerId(rnode, 0, 0);
AddWorkerToSet(id);
} else {

View File

@ -179,5 +179,17 @@ extern void CfsShrinkRedo(XLogReaderState *record);
extern void CfsShrinkDesc(StringInfo buf, XLogReaderState *record);
extern const char* CfsShrinkTypeName(uint8 subtype);
static inline uint2 GetCreateXlogFileNodeOpt(const XLogReaderState *record)
{
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
return compress ? ((xl_smgr_create_compress*)(void *)XLogRecGetData(record))->pageCompressOpts : 0;
}
static inline uint2 GetTruncateXlogFileNodeOpt(const XLogReaderState *record)
{
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
return compress ? ((xl_smgr_truncate_compress*)(void *)XLogRecGetData(record))->pageCompressOpts : 0;
}
#endif /* STORAGE_XLOG_H */

View File

@ -70,13 +70,14 @@ BlockNumber PageCompression::GetSegmentNo() const
size_t PageCompression::ReadCompressedBuffer(BlockNumber blockNum, char *buffer, size_t bufferLen, bool zeroAlign)
{
CfsExtentHeader *header = GetStruct(blockNum);
BlockNumber globalBlockNumber = (this->segmentNo * CFS_LOGIC_BLOCKS_PER_FILE) + blockNum;
CfsReadStruct cfsReadStruct {
.fd = this->fd,
.header = header,
.extentCount = blockNum / CFS_LOGIC_BLOCKS_PER_EXTENT
};
size_t actualSize = CfsReadCompressedPage(buffer, bufferLen,
blockNum % CFS_LOGIC_BLOCKS_PER_EXTENT, &cfsReadStruct, InvalidBlockNumber);
blockNum % CFS_LOGIC_BLOCKS_PER_EXTENT, &cfsReadStruct, globalBlockNumber);
/* valid check */
if (actualSize == COMPRESS_FSEEK_ERROR) {
return 0;

View File

@ -267,9 +267,9 @@ ERROR: the specified file(relation) is non-compression relation.
select * from compress_statistic_info(compress_func_findpath('row_noncompression_test_tbl1_kkk'), 1);
ERROR: the specified file(relation) is non-compression relation.
select * from compress_address_header(compress_func_findoid('row_noncompression_test_tbl1_kkk'), 0);
ERROR: the specified file(relation) is non-compression relation.
ERROR: compress File: can not find file
select * from compress_address_details(compress_func_findoid('row_noncompression_test_tbl1_kkk'), 0);
ERROR: the specified file(relation) is non-compression relation.
ERROR: compress File: can not find file
drop table if exists row_noncompression_test_tbl1_kkk cascade;
--compression table
drop table if exists row_compression_test_tbl1_kkk cascade;