【共享存储】(Shared Storage) flush the buffer if needed, for buffers carrying the need-flush tag
This commit is contained in:
@ -30,6 +30,7 @@
|
|||||||
#include "ddes/dms/ss_dms_bufmgr.h"
|
#include "ddes/dms/ss_dms_bufmgr.h"
|
||||||
#include "securec_check.h"
|
#include "securec_check.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
#include "access/double_write.h"
|
||||||
|
|
||||||
void InitDmsBufCtrl(void)
|
void InitDmsBufCtrl(void)
|
||||||
{
|
{
|
||||||
@ -765,3 +766,98 @@ bool DmsCheckBufAccessible()
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Try to flush a normal (heap) buffer on behalf of DMS flush-copy.
 * Logic mirrors the eviction path in BufferAlloc.
 *
 * Returns true when the page was flushed (and scheduled for writeback),
 * false when this backend may not flush dirty pages, the content lock
 * could not be taken, or double-write has no free space.
 */
bool SSTryFlushBuffer(BufferDesc *buf)
{
    //copy from BufferAlloc
    if (!backend_can_flush_dirty_page()) {
        return false;
    }

    /* Shared content lock is enough to read the page image for flushing. */
    if (!LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) {
        return false;
    }

    bool use_single_dw = dw_enabled() &&
        pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0;
    if (use_single_dw) {
        /* Double-write path: reserve a single-flush slot before FlushBuffer. */
        if (!free_space_enough(buf->buf_id)) {
            LWLockRelease(buf->content_lock);
            return false;
        }
        uint32 pos = first_version_dw_single_flush(buf);
        t_thrd.proc->dw_pos = pos;
        FlushBuffer(buf, NULL);
        g_instance.dw_single_cxt.single_flush_state[pos] = true;
        t_thrd.proc->dw_pos = -1;
    } else {
        FlushBuffer(buf, NULL);
    }

    LWLockRelease(buf->content_lock);
    ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
    return true;
}
|
||||||
|
|
||||||
|
/*
 * Try to flush a segment-store buffer on behalf of DMS flush-copy.
 * Logic mirrors the eviction path in SegBufferAlloc.
 *
 * Returns true when the segment page was flushed and scheduled for
 * writeback; false when flushing is not allowed for this backend or
 * the content lock could not be acquired without waiting.
 */
bool SSTrySegFlushBuffer(BufferDesc* buf)
{
    //copy from SegBufferAlloc
    if (!backend_can_flush_dirty_page()) {
        return false;
    }

    if (!LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) {
        return false;
    }

    /* Segment buffer IDs are offset by one for FlushOneSegmentBuffer. */
    FlushOneSegmentBuffer(buf->buf_id + 1);
    LWLockRelease(buf->content_lock);
    ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
    return true;
}
|
||||||
|
|
||||||
|
/** true :1)this buffer dms think need flush, and flush success
|
||||||
|
* 2) no need flush
|
||||||
|
* false: this flush dms think need flush, but cannot flush
|
||||||
|
*/
|
||||||
|
bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc)
|
||||||
|
{
|
||||||
|
if (!ENABLE_DMS) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsInitdb) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||||
|
if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
|
||||||
|
// wait dw_init finish
|
||||||
|
while (!g_instance.dms_cxt.dw_init) {
|
||||||
|
pg_usleep(1000L);
|
||||||
|
}
|
||||||
|
|
||||||
|
XLogRecPtr pagelsn = BufferGetLSN(buf_desc);
|
||||||
|
if (!SS_IN_REFORM) {
|
||||||
|
ereport(PANIC,
|
||||||
|
(errmsg("[SS] this buffer should not exist with BUF_DIRTY_NEED_FLUSH but not in reform, "
|
||||||
|
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u",
|
||||||
|
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||||
|
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
|
||||||
|
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno,
|
||||||
|
buf_desc->extra->seg_blockno)));
|
||||||
|
}
|
||||||
|
bool in_flush_copy = SS_IN_FLUSHCOPY;
|
||||||
|
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
|
||||||
|
ereport(LOG,
|
||||||
|
(errmsg("[SS flush copy] ready to flush buffer with need flush, "
|
||||||
|
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
|
||||||
|
"is in flush_copy:%d, in recovery:%d",
|
||||||
|
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||||
|
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
|
||||||
|
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno,
|
||||||
|
in_flush_copy, in_recovery)));
|
||||||
|
if (IsSegmentBufferID(buf_desc->buf_id)) {
|
||||||
|
return SSTrySegFlushBuffer(buf_desc);
|
||||||
|
} else {
|
||||||
|
return SSTryFlushBuffer(buf_desc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
@ -1274,6 +1274,11 @@ static int CBFlushCopy(void *db_handle, char *pageid)
|
|||||||
smgrcloseall();
|
smgrcloseall();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// only 1) primary restart 2) failover need flush_copy
|
||||||
|
if (SS_REFORM_REFORMER && g_instance.dms_cxt.dms_status == DMS_STATUS_IN && !SS_STANDBY_FAILOVER) {
|
||||||
|
return GS_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
BufferTag* tag = (BufferTag*)pageid;
|
BufferTag* tag = (BufferTag*)pageid;
|
||||||
Buffer buffer;
|
Buffer buffer;
|
||||||
SegSpace *spc = NULL;
|
SegSpace *spc = NULL;
|
||||||
@ -1295,7 +1300,7 @@ static int CBFlushCopy(void *db_handle, char *pageid)
|
|||||||
ErrorData* edata = CopyErrorData();
|
ErrorData* edata = CopyErrorData();
|
||||||
FlushErrorState();
|
FlushErrorState();
|
||||||
FreeErrorData(edata);
|
FreeErrorData(edata);
|
||||||
ereport(PANIC, (errmsg("[SS Flush Copy] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
|
ereport(PANIC, (errmsg("[SS flush copy] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
|
||||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||||
tag->forkNum, tag->blockNum)));
|
tag->forkNum, tag->blockNum)));
|
||||||
}
|
}
|
||||||
@ -1408,6 +1413,7 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char
|
|||||||
if (ss_reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) {
|
if (ss_reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) {
|
||||||
g_instance.dms_cxt.SSRecoveryInfo.in_failover = true;
|
g_instance.dms_cxt.SSRecoveryInfo.in_failover = true;
|
||||||
if (role == DMS_ROLE_REFORMER) {
|
if (role == DMS_ROLE_REFORMER) {
|
||||||
|
g_instance.dms_cxt.dw_init = false;
|
||||||
// variable set order: SharedRecoveryInProgress -> failover_triggered -> dms_role
|
// variable set order: SharedRecoveryInProgress -> failover_triggered -> dms_role
|
||||||
volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
|
volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
|
||||||
SpinLockAcquire(&xlogctl->info_lck);
|
SpinLockAcquire(&xlogctl->info_lck);
|
||||||
|
@ -327,7 +327,6 @@ void ss_failover_dw_init()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
ckpt_shutdown_pagewriter();
|
ckpt_shutdown_pagewriter();
|
||||||
g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = false;
|
|
||||||
ss_failover_dw_init_internal();
|
ss_failover_dw_init_internal();
|
||||||
g_instance.dms_cxt.dw_init = true;
|
g_instance.dms_cxt.dw_init = true;
|
||||||
}
|
}
|
||||||
@ -346,5 +345,6 @@ void ss_switchover_promoting_dw_init()
|
|||||||
dw_exit(false);
|
dw_exit(false);
|
||||||
dw_ext_init();
|
dw_ext_init();
|
||||||
dw_init();
|
dw_init();
|
||||||
|
g_instance.dms_cxt.dw_init = true;
|
||||||
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover] dw init finished")));
|
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover] dw init finished")));
|
||||||
}
|
}
|
@ -2746,7 +2746,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
|
|||||||
/* Pin the buffer and then release the buffer spinlock */
|
/* Pin the buffer and then release the buffer spinlock */
|
||||||
PinBuffer_Locked(buf);
|
PinBuffer_Locked(buf);
|
||||||
|
|
||||||
if (!SSPageCheckIfCanEliminate(buf)) {
|
if (!SSHelpFlushBufferIfNeed(buf)) {
|
||||||
// for dms this page cannot eliminate, get another one
|
// for dms this page cannot eliminate, get another one
|
||||||
UnpinBuffer(buf, true);
|
UnpinBuffer(buf, true);
|
||||||
continue;
|
continue;
|
||||||
@ -6350,6 +6350,7 @@ void CheckIOState(volatile void *buf_desc)
|
|||||||
bool StartBufferIO(BufferDesc *buf, bool for_input)
|
bool StartBufferIO(BufferDesc *buf, bool for_input)
|
||||||
{
|
{
|
||||||
uint32 buf_state;
|
uint32 buf_state;
|
||||||
|
bool dms_need_flush = false; // used in dms
|
||||||
|
|
||||||
Assert(!t_thrd.storage_cxt.InProgressBuf);
|
Assert(!t_thrd.storage_cxt.InProgressBuf);
|
||||||
|
|
||||||
@ -6390,8 +6391,15 @@ bool StartBufferIO(BufferDesc *buf, bool for_input)
|
|||||||
WaitIO(buf);
|
WaitIO(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ENABLE_DMS) {
|
||||||
|
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||||
|
if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
|
||||||
|
dms_need_flush = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Once we get here, there is definitely no I/O active on this buffer */
|
/* Once we get here, there is definitely no I/O active on this buffer */
|
||||||
if (for_input ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) {
|
if (for_input ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY) && !dms_need_flush) {
|
||||||
/* someone else already did the I/O */
|
/* someone else already did the I/O */
|
||||||
UnlockBufHdr(buf, buf_state);
|
UnlockBufHdr(buf, buf_state);
|
||||||
LWLockRelease(buf->io_in_progress_lock);
|
LWLockRelease(buf->io_in_progress_lock);
|
||||||
@ -6483,6 +6491,23 @@ static void TerminateBufferIO_common(BufferDesc *buf, bool clear_dirty, uint32 s
|
|||||||
if (ENABLE_INCRE_CKPT) {
|
if (ENABLE_INCRE_CKPT) {
|
||||||
if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf->extra->rec_lsn))) {
|
if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf->extra->rec_lsn))) {
|
||||||
remove_dirty_page_from_queue(buf);
|
remove_dirty_page_from_queue(buf);
|
||||||
|
} else if (ENABLE_DMS) {
|
||||||
|
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||||
|
if (!(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
|
||||||
|
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||||
|
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
||||||
|
}
|
||||||
|
buf_ctrl->state &= ~BUF_DIRTY_NEED_FLUSH;
|
||||||
|
XLogRecPtr pagelsn = BufferGetLSN(buf);
|
||||||
|
bool in_flush_copy = SS_IN_FLUSHCOPY;
|
||||||
|
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
|
||||||
|
ereport(LOG,
|
||||||
|
(errmsg("[SS flush copy] finish flush buffer with need flush, "
|
||||||
|
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
|
||||||
|
"is in flush_copy:%d, in recovery:%d",
|
||||||
|
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.rnode.bucketNode,
|
||||||
|
buf->tag.forkNum, buf->tag.blockNum, (unsigned long long)pagelsn,
|
||||||
|
(unsigned int)buf->extra->seg_fileno, buf->extra->seg_blockno, in_flush_copy, in_recovery)));
|
||||||
} else {
|
} else {
|
||||||
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||||
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
||||||
|
@ -75,6 +75,7 @@ void AbortSegBufferIO(void)
|
|||||||
static bool SegStartBufferIO(BufferDesc *buf, bool forInput)
|
static bool SegStartBufferIO(BufferDesc *buf, bool forInput)
|
||||||
{
|
{
|
||||||
uint32 buf_state;
|
uint32 buf_state;
|
||||||
|
bool dms_need_flush = false; // used in dms
|
||||||
|
|
||||||
SegmentCheck(!InProgressBuf);
|
SegmentCheck(!InProgressBuf);
|
||||||
|
|
||||||
@ -98,7 +99,14 @@ static bool SegStartBufferIO(BufferDesc *buf, bool forInput)
|
|||||||
WaitIO(buf);
|
WaitIO(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) {
|
if (ENABLE_DMS) {
|
||||||
|
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||||
|
if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
|
||||||
|
dms_need_flush = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY) && !dms_need_flush) {
|
||||||
/* IO finished */
|
/* IO finished */
|
||||||
UnlockBufHdr(buf, buf_state);
|
UnlockBufHdr(buf, buf_state);
|
||||||
LWLockRelease(buf->io_in_progress_lock);
|
LWLockRelease(buf->io_in_progress_lock);
|
||||||
@ -128,6 +136,23 @@ void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bit
|
|||||||
if (ENABLE_INCRE_CKPT) {
|
if (ENABLE_INCRE_CKPT) {
|
||||||
if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf->extra->rec_lsn))) {
|
if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf->extra->rec_lsn))) {
|
||||||
remove_dirty_page_from_queue(buf);
|
remove_dirty_page_from_queue(buf);
|
||||||
|
} else if (ENABLE_DMS) {
|
||||||
|
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||||
|
if (!(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
|
||||||
|
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||||
|
(errmsg("buffer is dirty but not in dirty page queue in SegTerminateBufferIO"))));
|
||||||
|
}
|
||||||
|
buf_ctrl->state &= ~BUF_DIRTY_NEED_FLUSH;
|
||||||
|
XLogRecPtr pagelsn = BufferGetLSN(buf);
|
||||||
|
bool in_flush_copy = SS_IN_FLUSHCOPY;
|
||||||
|
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
|
||||||
|
ereport(LOG,
|
||||||
|
(errmsg("[SS flush copy] finish seg flush buffer with need flush, "
|
||||||
|
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
|
||||||
|
"is in flush_copy:%d, in recovery:%d",
|
||||||
|
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.rnode.bucketNode,
|
||||||
|
buf->tag.forkNum, buf->tag.blockNum, (unsigned long long)pagelsn,
|
||||||
|
(unsigned int)buf->extra->seg_fileno, buf->extra->seg_blockno, in_flush_copy, in_recovery)));
|
||||||
} else {
|
} else {
|
||||||
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||||
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
||||||
@ -704,7 +729,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
|
|||||||
|
|
||||||
SegPinBufferLocked(buf, &new_tag);
|
SegPinBufferLocked(buf, &new_tag);
|
||||||
|
|
||||||
if (!SSPageCheckIfCanEliminate(buf)) {
|
if (!SSHelpFlushBufferIfNeed(buf)) {
|
||||||
SegUnpinBuffer(buf);
|
SegUnpinBuffer(buf);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -77,5 +77,6 @@ void BufValidateDrc(BufferDesc *buf_desc);
|
|||||||
bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc);
|
bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc);
|
||||||
bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer);
|
bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer);
|
||||||
bool DmsCheckBufAccessible();
|
bool DmsCheckBufAccessible();
|
||||||
|
bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
Reference in New Issue
Block a user