diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp index a4aa050ff..923f7238d 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp @@ -443,11 +443,11 @@ void HandleStartupInterruptsForExtremeRto() static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum) { - g_ondemandXLogParseMemFullValue = maxParseBufNum * ONDEMAND_FORCE_PRUNE_RATIO; + g_ondemandXLogParseMemFullValue = maxParseBufNum * ONDEMAND_HASHMAP_FORCE_PRUNE_RATIO; g_ondemandXLogParseMemCancelPauseVaule = maxParseBufNum * ONDEMAND_DISTRIBUTE_CANCEL_RATIO; g_ondemandXLogParseMemCancelPauseVaulePerPipeline = (maxParseBufNum - g_ondemandXLogParseMemFullValue) / get_batch_redo_num(); - g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_FORCE_PRUNE_RATIO; + g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_RECORD_QUEUE_FORCE_PRUNE_RATIO; } /* Run from the dispatcher thread.
*/ @@ -474,8 +474,7 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) rc = memcpy_s(g_dispatcher->restoreControlFile, (size_t)sizeof(ControlFileData), &restoreControlFile, (size_t)sizeof(ControlFileData)); securec_check(rc, "", ""); } - g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE * - ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker + g_dispatcher->maxItemNum = 3 * REALTIME_BUILD_RECORD_QUEUE_SIZE; // 3: TrxnQueue, SegQueue, Hashmap(reuse) uint32 maxParseBufNum = (uint32)((uint64)g_instance.attr.attr_storage.dms_attr.ondemand_recovery_mem_size * 1024 / (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc) + sizeof(RedoMemSlot))); XLogParseBufferInitFunc(&(g_dispatcher->parseManager), maxParseBufNum, &recordRefOperate, RedoInterruptCallBack); @@ -1842,7 +1841,7 @@ void CopyDataFromOldReader(XLogReaderState *newReaderState, const XLogReaderStat errno_t rc = EOK; if ((newReaderState->readRecordBuf == NULL) || (oldReaderState->readRecordBufSize > newReaderState->readRecordBufSize)) { - if (!allocate_recordbuf(newReaderState, oldReaderState->readRecordBufSize)) { + if (!ondemand_allocate_recordbuf(newReaderState, oldReaderState->readRecordBufSize)) { ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp index d7eb38b47..a3fb16add 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp @@ -1775,7 +1775,11 @@ static void TrxnManagerPruneAndDistributeIfRealtimeBuildFailover() static void TrxnManagerPruneIfQueueFullInRealtimeBuild() { - while (SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL && SS_ONDEMAND_REALTIME_BUILD_NORMAL) { + /* + * we call OndemandTrxnQueueFullInRealtimeBuild() here instead of
SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL, because + * OndemandCtrlWorker may not get pause status immediately + */ + while (OndemandTrxnQueueFullInRealtimeBuild() && SS_ONDEMAND_REALTIME_BUILD_NORMAL) { TrxnManagerProcHashMapPrune(); RedoInterruptCallBack(); } @@ -1842,7 +1846,6 @@ bool TrxnManagerDistributeItemsBeforeEnd(RedoItem *item) TestXLogReaderProbe(UTEST_EVENT_RTO_TRXNMGR_DISTRIBUTE_ITEMS, __FUNCTION__, &item->record); #endif - TrxnManagerPruneIfQueueFullInRealtimeBuild(); TrxnManagerAddTrxnRecord(item, syncRecord); CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_5]); } @@ -1898,6 +1901,7 @@ void TrxnManagerMain() } } CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]); + TrxnManagerPruneIfQueueFullInRealtimeBuild(); TrxnManagerPruneAndDistributeIfRealtimeBuildFailover(); if (!SPSCBlockingQueueIsEmpty(g_redoWorker->queue)) { GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_1]); diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp index 200996195..2e977086f 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp @@ -394,6 +394,38 @@ err: return -1; } +/* + * Copied from allocate_recordbuf; returns false if the allocation fails. + * + * In ondemand realtime build we need to keep readRecordBuf for segQueue and + * trxnQueue, so the size is rounded up in smaller (512) steps to save memory.
+ */ +bool ondemand_allocate_recordbuf(XLogReaderState *state, uint32 reclength) +{ + uint32 newSize = reclength; + const uint32 recordBufferAllocStep = 512; + + if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) { + newSize += recordBufferAllocStep - (newSize % recordBufferAllocStep); + } else { + newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ); + } + newSize = Max(newSize, recordBufferAllocStep); + + if (state->readRecordBuf != NULL) { + pfree(state->readRecordBuf); + state->readRecordBuf = NULL; + } + state->readRecordBuf = (char *)palloc_extended(newSize, MCXT_ALLOC_NO_OOM); + if (state->readRecordBuf == NULL) { + state->readRecordBufSize = 0; + return false; + } + + state->readRecordBufSize = newSize; + return true; +} + XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg, char* xlogPath) { XLogRecord *record = NULL; @@ -521,7 +553,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * /* * Enlarge readRecordBuf as needed. */ - if (total_len > state->readRecordBufSize && !allocate_recordbuf(state, total_len)) { + if (total_len > state->readRecordBufSize && !ondemand_allocate_recordbuf(state, total_len)) { /* We treat this as a "bogus data" condition */ report_invalid_record(state, "record length %u at %X/%X too long", total_len, (uint32)(RecPtr >> 32), (uint32)RecPtr); diff --git a/src/include/access/ondemand_extreme_rto/page_redo.h b/src/include/access/ondemand_extreme_rto/page_redo.h index 25585c5a9..0689cf939 100644 --- a/src/include/access/ondemand_extreme_rto/page_redo.h +++ b/src/include/access/ondemand_extreme_rto/page_redo.h @@ -42,7 +42,8 @@ namespace ondemand_extreme_rto { #define ONDEMAND_DISTRIBUTE_CANCEL_RATIO 0.5 -#define ONDEMAND_FORCE_PRUNE_RATIO 0.99 +#define ONDEMAND_HASHMAP_FORCE_PRUNE_RATIO 0.99 +#define ONDEMAND_RECORD_QUEUE_FORCE_PRUNE_RATIO 0.95 #define ONDEMAND_HASHTAB_SWITCH_LIMIT 100000 #define SEG_PROC_PIPELINE_SLOT 0 #define ONDEMAND_LOG_PAUSE_STATUS_TIME 30 @@ -52,7 +53,7 @@
namespace ondemand_extreme_rto { #define ONDEMAND_HASHMAP_ENTRY_NEED_REDO 2 static const uint32 PAGE_WORK_QUEUE_SIZE = 65536; -static const uint32 REALTIME_BUILD_RECORD_QUEUE_SIZE = 4194304; +static const uint32 REALTIME_BUILD_RECORD_QUEUE_SIZE = 2097152; static const uint32 ONDEMAND_EXTREME_RTO_ALIGN_LEN = 16; /* need 128-bit aligned */ static const uint32 MAX_REMOTE_READ_INFO_NUM = 100; diff --git a/src/include/access/ondemand_extreme_rto/xlog_read.h b/src/include/access/ondemand_extreme_rto/xlog_read.h index 25b1f016e..b640b3c60 100644 --- a/src/include/access/ondemand_extreme_rto/xlog_read.h +++ b/src/include/access/ondemand_extreme_rto/xlog_read.h @@ -32,7 +32,7 @@ namespace ondemand_extreme_rto { XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader); XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode); XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg, char* xlogPath); - +bool ondemand_allocate_recordbuf(XLogReaderState *state, uint32 reclength); } // namespace ondemand_extreme_rto typedef struct XLogFileId {