解决事务日志队列满导致实时构建卡住的问题;调整实时构建下xlog-record内存申请步长

This commit is contained in:
chendong76
2024-09-16 11:16:58 +08:00
parent 184c1cb677
commit edd95fc5bd
5 changed files with 47 additions and 11 deletions

View File

@ -443,11 +443,11 @@ void HandleStartupInterruptsForExtremeRto()
static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum)
{
g_ondemandXLogParseMemFullValue = maxParseBufNum * ONDEMAND_FORCE_PRUNE_RATIO;
g_ondemandXLogParseMemFullValue = maxParseBufNum * ONDEMAND_HASHMAP_FORCE_PRUNE_RATIO;
g_ondemandXLogParseMemCancelPauseVaule = maxParseBufNum * ONDEMAND_DISTRIBUTE_CANCEL_RATIO;
g_ondemandXLogParseMemCancelPauseVaulePerPipeline =
(maxParseBufNum - g_ondemandXLogParseMemFullValue) / get_batch_redo_num();
g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_FORCE_PRUNE_RATIO;
g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_RECORD_QUEUE_FORCE_PRUNE_RATIO;
}
/* Run from the dispatcher thread. */
@ -474,8 +474,7 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen)
rc = memcpy_s(g_dispatcher->restoreControlFile, (size_t)sizeof(ControlFileData), &restoreControlFile, (size_t)sizeof(ControlFileData));
securec_check(rc, "", "");
}
g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE *
ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker
g_dispatcher->maxItemNum = 3 * REALTIME_BUILD_RECORD_QUEUE_SIZE; // 3: TrxnQueue, SegQueue, Hashmap(reuse)
uint32 maxParseBufNum = (uint32)((uint64)g_instance.attr.attr_storage.dms_attr.ondemand_recovery_mem_size *
1024 / (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc) + sizeof(RedoMemSlot)));
XLogParseBufferInitFunc(&(g_dispatcher->parseManager), maxParseBufNum, &recordRefOperate, RedoInterruptCallBack);
@ -1842,7 +1841,7 @@ void CopyDataFromOldReader(XLogReaderState *newReaderState, const XLogReaderStat
errno_t rc = EOK;
if ((newReaderState->readRecordBuf == NULL) ||
(oldReaderState->readRecordBufSize > newReaderState->readRecordBufSize)) {
if (!allocate_recordbuf(newReaderState, oldReaderState->readRecordBufSize)) {
if (!ondemand_allocate_recordbuf(newReaderState, oldReaderState->readRecordBufSize)) {
ereport(PANIC,
(errmodule(MOD_REDO),
errcode(ERRCODE_LOG),

View File

@ -1775,7 +1775,11 @@ static void TrxnManagerPruneAndDistributeIfRealtimeBuildFailover()
static void TrxnManagerPruneIfQueueFullInRealtimeBuild()
{
while (SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL && SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
/*
* we used OndemandTrxnQueueFullInRealtimeBuild instead of SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL, because
* OndemandCtrlWorker may not get pause status immediately
*/
while (OndemandTrxnQueueFullInRealtimeBuild() && SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
TrxnManagerProcHashMapPrune();
RedoInterruptCallBack();
}
@ -1842,7 +1846,6 @@ bool TrxnManagerDistributeItemsBeforeEnd(RedoItem *item)
TestXLogReaderProbe(UTEST_EVENT_RTO_TRXNMGR_DISTRIBUTE_ITEMS,
__FUNCTION__, &item->record);
#endif
TrxnManagerPruneIfQueueFullInRealtimeBuild();
TrxnManagerAddTrxnRecord(item, syncRecord);
CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_5]);
}
@ -1898,6 +1901,7 @@ void TrxnManagerMain()
}
}
CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]);
TrxnManagerPruneIfQueueFullInRealtimeBuild();
TrxnManagerPruneAndDistributeIfRealtimeBuildFailover();
if (!SPSCBlockingQueueIsEmpty(g_redoWorker->queue)) {
GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_1]);

View File

@ -394,6 +394,38 @@ err:
return -1;
}
/*
* copy from allocate_recordbuf
*
* In ondemand realtime build, we need save readRecordBuf for segQueue and
* trxnQueue, so we allocate smaller (512) for save memory.
*/
bool ondemand_allocate_recordbuf(XLogReaderState *state, uint32 reclength)
{
uint32 newSize = reclength;
const uint32 recordBufferAllocStep = 512;
if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
newSize += recordBufferAllocStep - (newSize % recordBufferAllocStep);
} else {
newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
}
newSize = Max(newSize, recordBufferAllocStep);
if (state->readRecordBuf != NULL) {
pfree(state->readRecordBuf);
state->readRecordBuf = NULL;
}
state->readRecordBuf = (char *)palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
if (state->readRecordBuf == NULL) {
state->readRecordBufSize = 0;
return false;
}
state->readRecordBufSize = newSize;
return true;
}
XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg, char* xlogPath)
{
XLogRecord *record = NULL;
@ -521,7 +553,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
/*
* Enlarge readRecordBuf as needed.
*/
if (total_len > state->readRecordBufSize && !allocate_recordbuf(state, total_len)) {
if (total_len > state->readRecordBufSize && !ondemand_allocate_recordbuf(state, total_len)) {
/* We treat this as a "bogus data" condition */
report_invalid_record(state, "record length %u at %X/%X too long", total_len, (uint32)(RecPtr >> 32),
(uint32)RecPtr);

View File

@ -42,7 +42,8 @@
namespace ondemand_extreme_rto {
#define ONDEMAND_DISTRIBUTE_CANCEL_RATIO 0.5
#define ONDEMAND_FORCE_PRUNE_RATIO 0.99
#define ONDEMAND_HASHMAP_FORCE_PRUNE_RATIO 0.99
#define ONDEMAND_RECORD_QUEUE_FORCE_PRUNE_RATIO 0.95
#define ONDEMAND_HASHTAB_SWITCH_LIMIT 100000
#define SEG_PROC_PIPELINE_SLOT 0
#define ONDEMAND_LOG_PAUSE_STATUS_TIME 30
@ -52,7 +53,7 @@ namespace ondemand_extreme_rto {
#define ONDEMAND_HASHMAP_ENTRY_NEED_REDO 2
static const uint32 PAGE_WORK_QUEUE_SIZE = 65536;
static const uint32 REALTIME_BUILD_RECORD_QUEUE_SIZE = 4194304;
static const uint32 REALTIME_BUILD_RECORD_QUEUE_SIZE = 2097152;
static const uint32 ONDEMAND_EXTREME_RTO_ALIGN_LEN = 16; /* need 128-bit aligned */
static const uint32 MAX_REMOTE_READ_INFO_NUM = 100;

View File

@ -32,7 +32,7 @@ namespace ondemand_extreme_rto {
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg, char* xlogPath);
bool ondemand_allocate_recordbuf(XLogReaderState *state, uint32 reclength);
} // namespace ondemand_extreme_rto
typedef struct XLogFileId {