diff --git a/src/common/backend/utils/cache/relcache.cpp b/src/common/backend/utils/cache/relcache.cpp index 88bdf127c..14eab549e 100755 --- a/src/common/backend/utils/cache/relcache.cpp +++ b/src/common/backend/utils/cache/relcache.cpp @@ -8718,12 +8718,20 @@ char RelationGetRelReplident(Relation r) bool IsRelationReplidentKey(Relation r, int attno) { - if (RelationGetRelReplident(r) == REPLICA_IDENTITY_FULL) + /* system column is not replica identify key. */ + if (attno <= 0) { + return false; + } + + /* any user attribute is replica identity key for FULL */ + if (r->relreplident == REPLICA_IDENTITY_FULL) { return true; + } Oid replidindex = RelationGetReplicaIndex(r); - if (!OidIsValid(replidindex)) + if (!OidIsValid(replidindex)) { return true; + } Relation idx_rel = RelationIdGetRelation(replidindex); diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index e249ae664..4a8ee49b4 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -316,7 +316,6 @@ static void knl_g_parallel_decode_init(knl_g_parallel_decode_context* pdecode_cx pdecode_cxt->totalNum = 0; pdecode_cxt->edata = NULL; SpinLockInit(&(pdecode_cxt->rwlock)); - SpinLockInit(&(pdecode_cxt->destroy_lock)); } static void knl_g_cache_init(knl_g_cache_context* cache_cxt) diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index adbe842dc..3befcfc34 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -1149,11 +1149,6 @@ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* only interested in our database */ Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) return; @@ -1195,11 +1190,6 @@ static void AreaDecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { @@ -1239,11 +1229,6 @@ static void DecodeUInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size tuplelen = 0; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &targetNode, NULL, NULL); if (targetNode.dbNode != ctx->slot->data.database) { return; @@ -1284,11 +1269,6 @@ static void AreaDecodeUInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf Size tuplelen = 0; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &targetNode, NULL, NULL); /* output plugin doesn't look for this origin, no need to queue */ @@ -1341,11 +1321,6 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalen_new = 0; char *data_new = XLogRecGetBlockData(r, 0, &datalen_new); Size tuplelen_new = datalen_new - SizeOfHeapHeader; - if (tuplelen_new == 0 && !AllocSizeIsValid(tuplelen_new)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_new))); - return; - } Size datalen_old = 0; /* adapt 64 xid, if this tuple is the first tuple of a new page */ @@ -1359,11 +1334,6 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } datalen_old -= hasCSN ? sizeof(CommitSeqNo) : 0; Size tuplelen_old = datalen_old - SizeOfHeapHeader; - if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen_old))); - return; - } /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { @@ -1415,11 +1385,6 @@ static void AreaDecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalen_new = 0; char *data_new = XLogRecGetBlockData(r, 0, &datalen_new); Size tuplelen_new = datalen_new - SizeOfHeapHeader; - if (tuplelen_new == 0 && !AllocSizeIsValid(tuplelen_new)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen_new))); - return; - } Size datalen_old = 0; /* adapt 64 xid, if this tuple is the first tuple of a new page */ @@ -1433,11 +1398,6 @@ static void AreaDecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } datalen_old -= hasCSN ? sizeof(CommitSeqNo) : 0; Size tuplelen_old = datalen_old - SizeOfHeapHeader; - if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_old))); - return; - } /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { @@ -1564,12 +1524,6 @@ static void DecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - if (datalenNew == 0 && !AllocSizeIsValid(datalenNew)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalenNew))); - return; - } - Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - (hasCSN ? sizeof(CommitSeqNo) : 0); char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0); uint32 toastLen = 0; @@ -1659,11 +1613,6 @@ static void AreaDecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - if (datalenNew == 0 && !AllocSizeIsValid(datalenNew)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalenNew))); - return; - } ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UUPDATE; @@ -1735,11 +1684,6 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size datalen = XLogRecGetDataLen(r) - heapDeleteSize; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", datalen))); - return; - } ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; change->origin_id = XLogRecGetOrigin(r); @@ -1788,11 +1732,6 @@ static void AreaDecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size datalen = XLogRecGetDataLen(r) - heapDeleteSize; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("datalen is invalid(%lu), tuplelen, don't decode it", datalen))); - return; - } ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; change->origin_id = XLogRecGetOrigin(r); @@ -1923,12 +1862,6 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf SizeOfXLUndoHeader + addLen); addLen += metaLen; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalen))); - return; - } - ReorderBufferChange* change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UDELETE; change->origin_id = XLogRecGetOrigin(r); diff --git a/src/gausskernel/storage/replication/logical/logical_parse.cpp b/src/gausskernel/storage/replication/logical/logical_parse.cpp index 2c297c1c2..cc3745183 100644 --- a/src/gausskernel/storage/replication/logical/logical_parse.cpp +++ b/src/gausskernel/storage/replication/logical/logical_parse.cpp @@ -545,11 +545,6 @@ void ParseInsertXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); int slotId = worker->slotId; - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), - errmsg("ParseInsertXlog tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) { return; @@ -613,11 +608,6 @@ void ParseUInsert(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); int slotId = worker->slotId; - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), - errmsg("ParseUinsert tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) { @@ -692,11 +682,6 @@ void ParseUpdateXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, data_new = XLogRecGetBlockData(r, 0, &datalen_new); tuplelen_new = datalen_new - SizeOfHeapHeader; - if (tuplelen_new == 0 && !AllocSizeIsValid(tuplelen_new)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_new))); - return; - } /* adapt 64 xid, if this tuple is the first tuple of a new page */ is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0; @@ -708,11 +693,6 @@ void ParseUpdateXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(CommitSeqNo); } tuplelen_old = datalen_old - SizeOfHeapHeader; - if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_old))); - return; - } /* output plugin doesn't look for this origin, no need to queue */ if (ParallelFilterByOrigin(ctx, XLogRecGetOrigin(r))) @@ -775,12 +755,6 @@ void ParseUUpdate(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - if (datalenNew == 0 && !AllocSizeIsValid(datalenNew)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalenNew))); - return; - } - Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - sizeof(CommitSeqNo); char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + sizeof(CommitSeqNo); uint32 toastLen = 0; @@ -887,11 +861,6 @@ void ParseDeleteXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, } datalen = XLogRecGetDataLen(r) - heapDeleteSize; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", datalen))); - return; - } change = ParallelReorderBufferGetChange(ctx->reorder, slotId); change->action = PARALLEL_REORDER_BUFFER_CHANGE_DELETE; @@ -1045,7 +1014,11 @@ void ParseMultiInsert(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf */ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { HeapTupleHeader header; - xlhdr = (xl_multi_insert_tuple *)SHORTALIGN(data); + if ((data - tupledata) % ALIGNOF_SHORT == 0) { + xlhdr = (xl_multi_insert_tuple *)data; + } else { + xlhdr = (xl_multi_insert_tuple *)(data + ALIGNOF_SHORT - (data - tupledata) % ALIGNOF_SHORT); + } data = ((char *)xlhdr) + SizeOfMultiInsertTuple; datalen = xlhdr->datalen; if (datalen != 0 && AllocSizeIsValid((uint)datalen)) { diff --git a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp index c82e13aba..56f38335d 100755 --- a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp +++ b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp @@ -88,8 +88,8 @@ static void LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran str_lsn_temp = (char *)palloc0(str_lsn_len); rc = sprintf_s(str_lsn_temp, str_lsn_len, "%X/%X", uint32(lsn >> 32), uint32(lsn)); securec_check_ss(rc, "", ""); - values[0] = CStringGetTextDatum(str_lsn_temp); - values[1] = TransactionIdGetDatum(xid); + values[ARR_0] = CStringGetTextDatum(str_lsn_temp); + values[ARR_1] = TransactionIdGetDatum(xid); /* * Assert ctx->out is in database encoding when we're writing textual @@ -100,9 +100,12 @@ static void LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran } /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ - values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, (size_t)(uint)(ctx->out->len))); + values[ARR_2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, (size_t)(uint)(ctx->out->len))); tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + pfree(DatumGetPointer(values[ARR_0])); + pfree(DatumGetPointer(values[ARR_2])); + pfree(str_lsn_temp); p->returned_rows++; } diff --git a/src/gausskernel/storage/replication/logical/parallel_decode.cpp b/src/gausskernel/storage/replication/logical/parallel_decode.cpp index 519f2a321..2246bafbe 100644 --- a/src/gausskernel/storage/replication/logical/parallel_decode.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode.cpp @@ -972,41 +972,43 @@ ParallelStatusData *GetParallelDecodeStatus(uint32 *num) knl_g_parallel_decode_context *pDecodeCxt = &g_instance.comm_cxt.pdecode_cxt[i]; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal) { + FreeStringInfo(&readQueueLen); + FreeStringInfo(&decodeQueueLen); continue; } for (int j = 0; j < result[id].parallelDecodeNum; j++) { - SpinLockAcquire(&pDecodeCxt->destroy_lock); + SpinLockAcquire(&pDecodeCxt->rwlock); ParallelDecodeWorker *worker = g_Logicaldispatcher[i].decodeWorkers[j]; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal || worker == NULL) { escape = true; - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); break; } LogicalQueue *readQueue = worker->changeQueue; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal || readQueue == NULL) { escape = true; - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); break; } uint32 rmask = readQueue->mask; uint32 readHead = pg_atomic_read_u32(&readQueue->writeHead); uint32 readTail = pg_atomic_read_u32(&readQueue->readTail); uint32 readCnt = COUNT(readHead, readTail, rmask); - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); appendStringInfo(&readQueueLen, "queue%d: %u", j, readCnt); - SpinLockAcquire(&pDecodeCxt->destroy_lock); + SpinLockAcquire(&pDecodeCxt->rwlock); LogicalQueue *decodeQueue = worker->LogicalLogQueue; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal || decodeQueue == NULL) { escape = true; - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); break; } uint32 dmask = decodeQueue->mask; uint32 decodeHead = pg_atomic_read_u32(&decodeQueue->writeHead); uint32 decodeTail = pg_atomic_read_u32(&decodeQueue->readTail); uint32 decodeCnt = COUNT(decodeHead, decodeTail, dmask); - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); appendStringInfo(&decodeQueueLen, "queue%d: %u", j, decodeCnt); if (j < result[id].parallelDecodeNum - 1) { diff --git a/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp b/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp index 4a1f9b02f..301f9e908 100644 --- a/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp @@ -198,23 +198,23 @@ static void SetDecodeWorkerThreadState(int slotId, int workId, int state) void ReleaseParallelDecodeResource(int slotId) { knl_g_parallel_decode_context *pDecodeCxt = &g_instance.comm_cxt.pdecode_cxt[slotId]; - - SpinLockAcquire(&pDecodeCxt->destroy_lock); - if (pDecodeCxt->parallelDecodeCtx != NULL) { - MemoryContextDelete(pDecodeCxt->parallelDecodeCtx); - pDecodeCxt->parallelDecodeCtx = NULL; - } - if (pDecodeCxt->logicalLogCtx != NULL) { - MemoryContextDelete(pDecodeCxt->logicalLogCtx); - pDecodeCxt->logicalLogCtx = NULL; - } - SpinLockRelease(&pDecodeCxt->destroy_lock); + MemoryContext decode_cxt = pDecodeCxt->parallelDecodeCtx; + MemoryContext llog_cxt = pDecodeCxt->logicalLogCtx; SpinLockAcquire(&pDecodeCxt->rwlock); + pDecodeCxt->parallelDecodeCtx = NULL; + pDecodeCxt->logicalLogCtx = NULL; g_Logicaldispatcher[slotId].active = false; g_Logicaldispatcher[slotId].abnormal = false; SpinLockRelease(&pDecodeCxt->rwlock); + if (decode_cxt != NULL) { + MemoryContextDelete(decode_cxt); + } + if (llog_cxt != NULL) { + MemoryContextDelete(llog_cxt); + } + ereport(LOG, (errmsg("g_Logicaldispatcher[%d].active = false", slotId))); } @@ -651,25 +651,14 @@ int GetLogicalDispatcher() const int maxReaderNum = 20; int maxDispatcherNum = Min(g_instance.attr.attr_storage.max_replication_slots, maxReaderNum); LWLockAcquire(ParallelDecodeLock, LW_EXCLUSIVE); - MemoryContext ctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeDispatcher", - ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); - MemoryContext logctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeLog", - ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); - knl_g_parallel_decode_context *gDecodeCxt = g_instance.comm_cxt.pdecode_cxt; - for (int i = 0; i < maxDispatcherNum; i++) { if (g_Logicaldispatcher[i].active == false) { slotId = i; - errno_t rc = memset_s(&g_Logicaldispatcher[slotId], sizeof(LogicalDispatcher), 0, - sizeof(LogicalDispatcher)); + errno_t rc = + memset_s(&g_Logicaldispatcher[slotId], sizeof(LogicalDispatcher), 0, sizeof(LogicalDispatcher)); securec_check(rc, "", ""); InitLogicalDispatcher(&g_Logicaldispatcher[slotId]); g_Logicaldispatcher[i].active = true; - - ereport(LOG, (errmsg("g_Logicaldispatcher[%d].active = true", slotId))); - g_Logicaldispatcher[i].abnormal = false; - gDecodeCxt[i].parallelDecodeCtx = ctx; - gDecodeCxt[i].logicalLogCtx = logctx; break; } } @@ -677,6 +666,17 @@ int GetLogicalDispatcher() if(slotId == -1) { return slotId; } + ereport(LOG, (errmsg("g_Logicaldispatcher[%d].active = true", slotId))); + + MemoryContext ctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeDispatcher", + ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); + MemoryContext logctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeLog", + ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); + knl_g_parallel_decode_context *gDecodeCxt = g_instance.comm_cxt.pdecode_cxt; + + g_Logicaldispatcher[slotId].abnormal = false; + gDecodeCxt[slotId].parallelDecodeCtx = ctx; + gDecodeCxt[slotId].logicalLogCtx = logctx; SpinLockAcquire(&(gDecodeCxt[slotId].rwlock)); int state = gDecodeCxt[slotId].state; @@ -720,56 +720,51 @@ bool CheckWhiteList(const List *whiteList, const char *schema, const char *table */ static bool ParseSchemaAndTableName(List *tableList, List **tableWhiteList) { - ListCell *lc = NULL; - char *str = NULL; - char *startPos = NULL; + ListCell *table_cell = NULL; char *curPos = NULL; - size_t len = 0; chosenTable *cTable = NULL; - bool anySchema = false; - bool anyTable = false; - errno_t rc = 0; - foreach(lc, tableList) { - str = (char*)lfirst(lc); - cTable = (chosenTable *)palloc(sizeof(chosenTable)); + foreach(table_cell, tableList) { + bool anySchema = false; + bool anyTable = false; + char *head = (char*)lfirst(table_cell); + cTable = (chosenTable *)palloc0(sizeof(chosenTable)); - if (*str == '*' && *(str + 1) == '.') { + if (*head == '*' && *(head + 1) == '.') { cTable->schema = NULL; anySchema = true; } - startPos = str; - curPos = str; + curPos = head; while (*curPos != '\0' && *curPos != '.') { curPos++; } - len = (size_t)(curPos - startPos); + size_t schema_len = (size_t)(curPos - head); if (*curPos == '\0') { pfree(cTable); return false; } else { if (!anySchema) { - cTable->schema = (char *)palloc0((len + 1) * sizeof(char)); - errno_t rc = strncpy_s(cTable->schema, len + 1, startPos, len); + cTable->schema = (char *)palloc0((schema_len + 1) * sizeof(char)); + errno_t rc = strncpy_s(cTable->schema, schema_len + 1, head, schema_len); securec_check(rc, "", ""); } curPos++; - startPos = curPos; + head = curPos; - if (*startPos == '*' && *(startPos + 1) == '\0') { + if (*head == '*' && *(head + 1) == '\0') { cTable->table = NULL; anyTable = true; } while (*curPos != '\0') { curPos++; } - len = (size_t)(curPos - startPos); + size_t table_len = (size_t)(curPos - head); if (!anyTable) { - cTable->table = (char *)palloc((len + 1) * sizeof(char)); - rc = strncpy_s(cTable->table, len + 1, startPos, len); + cTable->table = (char *)palloc0((table_len + 1) * sizeof(char)); + errno_t rc = strncpy_s(cTable->table, table_len + 1, head, table_len); securec_check(rc, "", ""); } } @@ -779,40 +774,44 @@ static bool ParseSchemaAndTableName(List *tableList, List **tableWhiteList) } /* - * Parse a rawstring to a list of table names. + * Skip leading spaces. + */ +inline void SkipSpaceForString(char **str) +{ + while (isspace(**str)) { + (*str)++; + } +} + +/* + * Parse a raw string to a list of table names. */ bool ParseStringToWhiteList(char *tableString, List **tableWhiteList) { char *curPos = tableString; - bool finished = false; - List *tableList = NIL; - while (isspace(*curPos)) { - curPos++; - } + SkipSpaceForString(&curPos); if (*curPos == '\0') { return true; } + bool finished = false; + List *tableList = NIL; do { char* tmpName = curPos; while (*curPos != '\0' && *curPos != ',' && !isspace(*curPos)) { curPos++; } - char *tmpEnd = curPos; if (tmpName == curPos) { list_free_deep(tableList); return false; } - while (isspace(*curPos)) { - curPos++; - } + char *tmpEnd = curPos; + SkipSpaceForString(&curPos); if (*curPos == '\0') { finished = true; } else if (*curPos == ',') { curPos++; - while (isspace(*curPos)) { - curPos++; - } + SkipSpaceForString(&curPos); } else { list_free_deep(tableList); return false; @@ -822,13 +821,9 @@ bool ParseStringToWhiteList(char *tableString, List **tableWhiteList) tableList = lappend(tableList, tableName); } while (!finished); - if (!ParseSchemaAndTableName(tableList, tableWhiteList)) { - list_free_deep(tableList); - return false; - } - + bool parseSuccess = ParseSchemaAndTableName(tableList, tableWhiteList); list_free_deep(tableList); - return true; + return parseSuccess; } /* @@ -1256,6 +1251,7 @@ void LogicalReadRecordMain(ParallelDecodeReaderWorker *worker) PG_TRY(); { + int retries = 0; /* make sure that our requirements are still fulfilled */ CheckLogicalDecodingRequirements(u_sess->proc_cxt.MyDatabaseId); @@ -1308,15 +1304,21 @@ void LogicalReadRecordMain(ParallelDecodeReaderWorker *worker) ProcessConfigFile(PGC_SIGHUP); } char *errm = NULL; + const uint32 upperLen = 32; XLogRecord *record = XLogReadRecord(ctx->reader, startptr, &errm); if (errm != NULL) { - const uint32 upperLen = 32; - ereport(LOG, (errmsg("Stop parsing any XLog Record at %X/%X, sleep 1 second: %s.", - (uint32)(ctx->reader->EndRecPtr >> upperLen), (uint32)ctx->reader->EndRecPtr, errm))); - + retries++; + if (retries >= XLOG_STREAM_READREC_MAXTRY) { + ereport(ERROR, (errmsg("Stop parsing any XLog Record at %X/%X after %d attempts: %s.", + (uint32)(ctx->reader->EndRecPtr >> upperLen), (uint32)ctx->reader->EndRecPtr, retries, errm))); + } const long sleepTime = 1000000L; pg_usleep(sleepTime); continue; + } else if (retries != 0) { + ereport(LOG, (errmsg("Reread XLog Record after %d retries at %X/%X.", retries, + (uint32)(ctx->reader->EndRecPtr >> upperLen), (uint32)ctx->reader->EndRecPtr))); + retries = 0; } startptr = InvalidXLogRecPtr; diff --git a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp index 18785e93f..ac096a89c 100644 --- a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp @@ -75,9 +75,6 @@ static Size ParallelReorderBufferRestoreChanges(ParallelReorderBuffer *rb, Paral XLogSegNo *segno, int slotId); static void ParallelReorderBufferRestoreChange(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, char *data, int slotId); -void ParallelReorderBufferCleanupTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, - XLogRecPtr lsn = InvalidXLogRecPtr); - /* Parallel decoding batch sending unit length is set to 1MB. */ static const int g_batch_unit_length = 1 * 1024 * 1024; diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index 5119b11c4..39b53dc1a 100755 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -37,7 +37,6 @@ #define AllSlotInUse(a, b) ((a) == (b)) extern void *internal_load_library(const char *libname); -extern bool PMstateIsRun(void); static void redo_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_content = NULL); #ifndef ENABLE_LITE_MODE static XLogRecPtr create_physical_replication_slot_for_backup(const char* slot_name, bool is_dummy, char* extra); @@ -49,7 +48,7 @@ static void slot_advance(const char* slotname, XLogRecPtr &moveto, NameData &dat void log_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_content) { - if (!u_sess->attr.attr_sql.enable_slot_log || !PMstateIsRun()) { + if (!u_sess->attr.attr_sql.enable_slot_log || RecoveryInProgress()) { return; } @@ -83,7 +82,7 @@ void log_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo, char* extra_content) { - if ((!u_sess->attr.attr_sql.enable_slot_log && t_thrd.role != ARCH) || !PMstateIsRun()) { + if ((!u_sess->attr.attr_sql.enable_slot_log && t_thrd.role != ARCH) || RecoveryInProgress()) { return; } @@ -119,7 +118,7 @@ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo, char* extra void log_slot_drop(const char *name) { - if (!u_sess->attr.attr_sql.enable_slot_log || !PMstateIsRun()) + if (!u_sess->attr.attr_sql.enable_slot_log || RecoveryInProgress()) return; XLogRecPtr Ptr; ReplicationSlotPersistentData xlrec; @@ -153,7 +152,7 @@ void LogCheckSlot() LogicalPersistentData *LogicalSlot = NULL; size = GetAllLogicalSlot(LogicalSlot); - if (!u_sess->attr.attr_sql.enable_slot_log || !PMstateIsRun()) + if (!u_sess->attr.attr_sql.enable_slot_log || RecoveryInProgress()) return; START_CRIT_SECTION(); diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index 8f7cf00d2..c9aa5edd2 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -710,7 +710,6 @@ typedef struct knl_g_parallel_decode_context { MemoryContext logicalLogCtx; int state; slock_t rwlock; - slock_t destroy_lock; /* redo worker destroy lock */ char* dbUser; char* dbName; int totalNum; diff --git a/src/include/replication/parallel_reorderbuffer.h b/src/include/replication/parallel_reorderbuffer.h index 07a4cc1d3..8312db6ff 100644 --- a/src/include/replication/parallel_reorderbuffer.h +++ b/src/include/replication/parallel_reorderbuffer.h @@ -394,6 +394,8 @@ extern void WalSndPrepareWriteHelper(StringInfo out, XLogRecPtr lsn, Transaction extern void ParallelReorderBufferUpdateMemory(ParallelReorderBuffer *rb, logicalLog *change, int slotId, bool add); extern void CheckNewTupleMissingToastChunk(ParallelReorderBufferChange *change, bool isHeap); extern void ParallelReorderBufferChildAssignment(ParallelReorderBuffer *prb, logicalLog *logChange); +extern void ParallelReorderBufferCleanupTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, + XLogRecPtr lsn = InvalidXLogRecPtr); const uint32 max_decode_cache_num = 100000; #endif