fix wait issue for vacuum record redo
This commit is contained in:
@ -566,6 +566,7 @@ static void wait_valid_snapshot(XLogReaderState *record)
|
||||
xl_heap_clean* xlrec = NULL;
|
||||
uint64 blockcnt = 0;
|
||||
XLogRecPtr cur_transed_lsn = InvalidXLogRecPtr;
|
||||
XLogRecPtr txn_trying_lsn = InvalidXLogRecPtr;
|
||||
|
||||
if(rm_id != RM_HEAP2_ID || info != XLOG_HEAP2_CLEAN)
|
||||
return;
|
||||
@ -580,16 +581,18 @@ static void wait_valid_snapshot(XLogReaderState *record)
|
||||
!in_full_sync_dispatch()) {
|
||||
if(cur_transed_lsn == InvalidXLogRecPtr)
|
||||
cur_transed_lsn = getTransedTxnLsn(g_dispatcher->txnWorker);
|
||||
txn_trying_lsn = getTryingTxnLsn(g_dispatcher->txnWorker);
|
||||
/*
|
||||
* Normaly, it need wait for startup thread handle xact wal records, but there be a case
|
||||
* that if a very old xid commit and no new xact comes then xlrec->latestRemovedXid >
|
||||
* t_thrd.xact_cxt.ShmemVariableCache->standbyXmin all the time.
|
||||
*
|
||||
* So if all xact record before current vacuum record finished, then avoid wait.
|
||||
* And if startup go fast then here on lsn, it can avoid wait too.
|
||||
*/
|
||||
if (cur_transed_lsn <= GetXLogReplayRecPtr(NULL))
|
||||
if (cur_transed_lsn <= GetXLogReplayRecPtr(NULL) || txn_trying_lsn >= record->EndRecPtr)
|
||||
return;
|
||||
pg_usleep(10);
|
||||
|
||||
blockcnt++;
|
||||
if ((blockcnt & OUTPUT_WAIT_COUNT) == OUTPUT_WAIT_COUNT) {
|
||||
XLogRecPtr LatestReplayedRecPtr = GetXLogReplayRecPtr(NULL);
|
||||
|
@ -61,8 +61,9 @@ struct TxnRedoWorker {
|
||||
RedoItem *pendingTail; /* The tail of the RedoItem list. */
|
||||
RedoItem *procHead;
|
||||
RedoItem *procTail;
|
||||
XLogRecPtr dispatched_txn_lsn;
|
||||
XLogRecPtr transed_txn_lsn;
|
||||
XLogRecPtr dispatched_txn_lsn; /* Max lsn dispatched to txn worker*/
|
||||
XLogRecPtr transed_txn_lsn; /* Max lsn transfer to txn worker list*/
|
||||
XLogRecPtr txn_trying_lsn; /* EndPtr of trying record on txn worker*/
|
||||
};
|
||||
|
||||
XLogRecPtr getTransedTxnLsn(TxnRedoWorker *worker)
|
||||
@ -70,6 +71,11 @@ XLogRecPtr getTransedTxnLsn(TxnRedoWorker *worker)
|
||||
return (XLogRecPtr)pg_atomic_read_u64((volatile uint64*)&worker->transed_txn_lsn);
|
||||
}
|
||||
|
||||
XLogRecPtr getTryingTxnLsn(TxnRedoWorker *worker)
|
||||
{
|
||||
return (XLogRecPtr)pg_atomic_read_u64((volatile uint64*)&worker->txn_trying_lsn);
|
||||
}
|
||||
|
||||
TxnRedoWorker *StartTxnRedoWorker()
|
||||
{
|
||||
TxnRedoWorker *worker = (TxnRedoWorker *)palloc(sizeof(TxnRedoWorker));
|
||||
@ -267,6 +273,7 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll)
|
||||
XLogReaderState *record = &item->record;
|
||||
XLogRecPtr lrEnd;
|
||||
|
||||
pg_atomic_write_u64(&worker->txn_trying_lsn, record->EndRecPtr);
|
||||
if (forceAll) {
|
||||
GetRedoStartTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]);
|
||||
XLogRecPtr lrRead; /* lastReplayedReadPtr */
|
||||
|
@ -39,5 +39,6 @@ void MoveTxnItemToApplyQueue(TxnRedoWorker* worker);
|
||||
void DumpTxnWorker(TxnRedoWorker* txnWorker);
|
||||
bool IsTxnWorkerIdle(TxnRedoWorker* worker);
|
||||
XLogRecPtr getTransedTxnLsn(TxnRedoWorker *worker);
|
||||
XLogRecPtr getTryingTxnLsn(TxnRedoWorker *worker);
|
||||
}
|
||||
#endif
|
||||
|
Reference in New Issue
Block a user