修复hashmap不足情况下,实时构建可能卡住的问题
This commit is contained in:
@ -1210,8 +1210,6 @@ void PageManagerProcLsnForwarder(RedoItem *lsnForwarder)
|
||||
|
||||
PageManagerAddRedoItemToSegWorkers(lsnForwarder);
|
||||
PageManagerAddRedoItemToHashMapManager(lsnForwarder);
|
||||
PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId];
|
||||
|
||||
PageManagerPruneIfRealtimeBuildFailover();
|
||||
/* wait hashmapmng prune and segworker distribute segrecord to hashmap */
|
||||
uint32 refCount;
|
||||
@ -3415,6 +3413,7 @@ void HashMapManagerMain()
|
||||
SPSCBlockingQueuePop(g_dispatcher->segQueue);
|
||||
}
|
||||
|
||||
PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId];
|
||||
/**
|
||||
* step2: prune idle hashmap
|
||||
*
|
||||
@ -3425,7 +3424,7 @@ void HashMapManagerMain()
|
||||
// the head of redoItem hashmap linked list
|
||||
ondemand_htab_ctrl_t *nextHtabCtrl = g_instance.comm_cxt.predo_cxt.redoItemHashCtrl[g_redoWorker->slotId];
|
||||
// the tail of redoItem hashmap linked list
|
||||
ondemand_htab_ctrl_t *targetHtabCtrl = g_dispatcher->pageLines[g_redoWorker->slotId].managerThd->redoItemHashCtrl;
|
||||
ondemand_htab_ctrl_t *targetHtabCtrl = myRedoLine->managerThd->redoItemHashCtrl;
|
||||
// the processing redoItem hashmap
|
||||
ondemand_htab_ctrl_t *procHtabCtrl = nextHtabCtrl;
|
||||
while (nextHtabCtrl != targetHtabCtrl) {
|
||||
@ -3452,6 +3451,9 @@ void HashMapManagerMain()
|
||||
}
|
||||
CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]);
|
||||
|
||||
// step4: release for global xlog memory
|
||||
OndemandGlobalXLogMemReleaseIfNeed(&myRedoLine->batchThd->parseManager.memctl);
|
||||
|
||||
RedoInterruptCallBack();
|
||||
ADD_ABNORMAL_POSITION(12);
|
||||
pg_usleep(500000L); /* 500 ms */
|
||||
|
||||
@ -121,44 +121,75 @@ static RedoMemSlot *OndemandGlobalXLogMemAlloc()
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl)
|
||||
static RedoMemSlot *GetTailSlot(RedoMemManager *memctl, Buffer headSlotBuffer)
|
||||
{
|
||||
RedoMemSlot *tailSlot = &memctl->memslot[headSlotBuffer - 1];
|
||||
while (tailSlot->freeNext != InvalidBuffer) {
|
||||
tailSlot = &memctl->memslot[tailSlot->freeNext - 1];
|
||||
}
|
||||
return tailSlot;
|
||||
}
|
||||
|
||||
static void InsertBatchXLogMemToSlot(RedoMemManager *memctl, Buffer *targetSlot, Buffer headSlotBuffer)
|
||||
{
|
||||
RedoMemSlot *tailSlot = GetTailSlot(memctl, headSlotBuffer);
|
||||
Buffer oldFirst = AtomicReadBuffer(targetSlot);
|
||||
pg_memory_barrier();
|
||||
do {
|
||||
AtomicWriteBuffer(&tailSlot->freeNext, oldFirst);
|
||||
} while (!AtomicCompareExchangeBuffer(targetSlot, &oldFirst, headSlotBuffer));
|
||||
}
|
||||
|
||||
// used in hashmap manager, global firstreleaseslot should not be InvalidBuffer in healthy condition
|
||||
void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl)
|
||||
{
|
||||
RedoMemManager *glbmemctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl;
|
||||
if (AtomicReadBuffer(&glbmemctl->firstreleaseslot) == InvalidBuffer) {
|
||||
Buffer firstreleaseslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer);
|
||||
Buffer invalidbuffer = InvalidBuffer;
|
||||
if (!AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &invalidbuffer, firstreleaseslot)) {
|
||||
AtomicWriteBuffer(&memctl->firstreleaseslot, firstreleaseslot);
|
||||
Buffer firstReleaseSlotBuffer = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer);
|
||||
if (firstReleaseSlotBuffer == InvalidBuffer) {
|
||||
// set pipeline firstfreeslot to pipeline firstreleaseslot, for next loop to global firstreleaseslot
|
||||
Buffer firstFreeSlotBuffer = AtomicExchangeBuffer(&memctl->firstfreeslot, InvalidBuffer);
|
||||
if (firstFreeSlotBuffer != InvalidBuffer) {
|
||||
InsertBatchXLogMemToSlot(memctl, &memctl->firstreleaseslot, firstFreeSlotBuffer);
|
||||
}
|
||||
} else {
|
||||
// set pipeline firstreleaseslot for global firstreleaseslot
|
||||
Buffer invalidBuffer = InvalidBuffer;
|
||||
if (!AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &invalidBuffer, firstReleaseSlotBuffer)) {
|
||||
// exchange failed, give back
|
||||
InsertBatchXLogMemToSlot(memctl, &memctl->firstreleaseslot, firstReleaseSlotBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl)
|
||||
{
|
||||
RedoMemSlot *nextfreeslot = NULL;
|
||||
RedoMemSlot *nextFreeSlot = NULL;
|
||||
do {
|
||||
if (memctl->firstfreeslot == InvalidBuffer) {
|
||||
memctl->firstfreeslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer);
|
||||
if (AtomicReadBuffer(&memctl->firstfreeslot) == InvalidBuffer) {
|
||||
AtomicWriteBuffer(&memctl->firstfreeslot, AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer));
|
||||
pg_read_barrier();
|
||||
}
|
||||
|
||||
if (memctl->firstfreeslot != InvalidBuffer) {
|
||||
nextfreeslot = &(memctl->memslot[memctl->firstfreeslot - 1]);
|
||||
memctl->firstfreeslot = nextfreeslot->freeNext;
|
||||
nextfreeslot->freeNext = InvalidBuffer;
|
||||
Buffer firstFreeSlotBuffer = AtomicExchangeBuffer(&memctl->firstfreeslot, InvalidBuffer);
|
||||
if (firstFreeSlotBuffer != InvalidBuffer) {
|
||||
nextFreeSlot = &(memctl->memslot[firstFreeSlotBuffer - 1]);
|
||||
AtomicWriteBuffer(&memctl->firstfreeslot, nextFreeSlot->freeNext);
|
||||
nextFreeSlot->freeNext = InvalidBuffer;
|
||||
}
|
||||
|
||||
if (nextfreeslot == NULL) {
|
||||
nextfreeslot = OndemandGlobalXLogMemAlloc();
|
||||
if (nextFreeSlot == NULL) {
|
||||
nextFreeSlot = OndemandGlobalXLogMemAlloc();
|
||||
}
|
||||
|
||||
if (memctl->doInterrupt != NULL) {
|
||||
memctl->doInterrupt();
|
||||
}
|
||||
} while (nextfreeslot == NULL);
|
||||
} while (nextFreeSlot == NULL);
|
||||
|
||||
pg_atomic_fetch_add_u32(&memctl->usedblknum, 1);
|
||||
return nextfreeslot;
|
||||
return nextFreeSlot;
|
||||
}
|
||||
|
||||
void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid)
|
||||
@ -181,8 +212,6 @@ void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid)
|
||||
AtomicWriteBuffer(&bufferslot->freeNext, oldFirst);
|
||||
} while (!AtomicCompareExchangeBuffer(&releasememctl->firstreleaseslot, &oldFirst, bufferid));
|
||||
pg_atomic_fetch_sub_u32(&memctl->usedblknum, 1);
|
||||
|
||||
OndemandGlobalXLogMemReleaseIfNeed(memctl);
|
||||
}
|
||||
|
||||
|
||||
@ -261,6 +290,7 @@ XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanag
|
||||
void OndemandXLogParseBufferRelease(XLogRecParseState *recordstate)
|
||||
{
|
||||
if (recordstate->distributeStatus == XLOG_SKIP_DISTRIBUTE) {
|
||||
Assert(!SS_ONDEMAND_REALTIME_BUILD_NORMAL);
|
||||
// alloc in pageRedoWorker or backends
|
||||
pfree(recordstate);
|
||||
return;
|
||||
@ -335,7 +365,8 @@ long RedoRelationForOndemandExtremeRTO(Relation relation) {
|
||||
* @param dbId the dbNode of database
|
||||
* @return the redoEntry num of target database
|
||||
*/
|
||||
long RedoDatabaseForOndemandExtremeRTO(Oid dbNode) {
|
||||
long RedoDatabaseForOndemandExtremeRTO(Oid dbNode)
|
||||
{
|
||||
long entryNum = 0;
|
||||
Assert(OidIsValid(dbNode));
|
||||
|
||||
|
||||
@ -54,5 +54,6 @@ void OnDemandUpdateRealtimeBuildPrunePtr();
|
||||
void OnDemandNotifyHashMapPruneIfNeed();
|
||||
XLogRecParseType GetCurrentXLogRecParseType(XLogRecParseState *preState);
|
||||
bool IsRecParseStateHaveChildState(XLogRecParseState *checkState);
|
||||
void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl);
|
||||
|
||||
#endif /* ONDEMAND_EXTREME_RTO_REDO_UTILS_H */
|
||||
Reference in New Issue
Block a user