@ -62,13 +62,7 @@ void InitDmsContext(dms_context_t *dmsContext)
|
||||
dmsContext->inst_id = (unsigned int)SS_MY_INST_ID;
|
||||
dmsContext->sess_id = (unsigned int)(t_thrd.proc ? t_thrd.proc->logictid : t_thrd.myLogicTid + TotalProcs);
|
||||
dmsContext->db_handle = t_thrd.proc;
|
||||
if (AmDmsReformProcProcess()) {
|
||||
dmsContext->sess_type = DMS_SESSION_REFORM;
|
||||
} else if (AmPageRedoProcess() || AmStartupProcess()) {
|
||||
dmsContext->sess_type = DMS_SESSION_RECOVER;
|
||||
} else {
|
||||
dmsContext->sess_type = DMS_SESSION_NORMAL;
|
||||
}
|
||||
dmsContext->sess_type = DMSGetProcType4RequestPage();
|
||||
dmsContext->is_try = 0;
|
||||
}
|
||||
|
||||
@ -406,7 +400,7 @@ Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode)
|
||||
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool* with_io)
|
||||
{
|
||||
BufferDesc *buf_desc = GetBufferDescriptor(buffer - 1);
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
@ -415,7 +409,16 @@ Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode)
|
||||
return buffer;
|
||||
}
|
||||
|
||||
if (!DmsCheckBufAccessible()) {
|
||||
*with_io = false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!DmsStartBufferIO(buf_desc, mode)) {
|
||||
if (!DmsCheckBufAccessible()) {
|
||||
*with_io = false;
|
||||
return 0;
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@ -425,7 +428,7 @@ Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode)
|
||||
return TerminateReadSegPage(buf_desc, read_mode);
|
||||
}
|
||||
|
||||
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode)
|
||||
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool* with_io)
|
||||
{
|
||||
BufferDesc *buf_desc = GetBufferDescriptor(buffer - 1);
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
@ -443,7 +446,16 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode)
|
||||
pblk.lsn = buf_ctrl->pblk_lsn;
|
||||
}
|
||||
|
||||
if (!DmsCheckBufAccessible()) {
|
||||
*with_io = false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!DmsStartBufferIO(buf_desc, mode)) {
|
||||
if (!DmsCheckBufAccessible()) {
|
||||
*with_io = false;
|
||||
return 0;
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@ -673,7 +685,7 @@ void CheckPageNeedSkipInRecovery(Buffer buf)
|
||||
Assert(!skip);
|
||||
}
|
||||
|
||||
unsigned int DMSGetProcType4RequestPage()
|
||||
dms_session_e DMSGetProcType4RequestPage()
|
||||
{
|
||||
// proc type used in DMS request page
|
||||
if (AmDmsReformProcProcess() || AmPageRedoProcess() || AmStartupProcess()) {
|
||||
@ -745,3 +757,11 @@ bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer)
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool DmsCheckBufAccessible()
|
||||
{
|
||||
if (dms_drc_accessible((uint8)DRC_RES_PAGE_TYPE) || DMSGetProcType4RequestPage() == DMS_SESSION_RECOVER) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -5891,20 +5891,24 @@ retry:
|
||||
read_mode = RBM_ZERO_AND_LOCK;
|
||||
GetDmsBufCtrl(buffer - 1)->state &= ~BUF_READ_MODE_ZERO_LOCK;
|
||||
}
|
||||
bool with_io_in_progress = true;
|
||||
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
tmp_buffer = DmsReadSegPage(buffer, lock_mode, read_mode);
|
||||
tmp_buffer = DmsReadSegPage(buffer, lock_mode, read_mode, &with_io_in_progress);
|
||||
} else {
|
||||
tmp_buffer = DmsReadPage(buffer, lock_mode, read_mode);
|
||||
tmp_buffer = DmsReadPage(buffer, lock_mode, read_mode, &with_io_in_progress);
|
||||
}
|
||||
|
||||
if (tmp_buffer == 0) {
|
||||
/* failed to request newest page, release related locks, and retry */
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
SegTerminateBufferIO((BufferDesc *)buf, false, 0);
|
||||
} else {
|
||||
TerminateBufferIO(buf, false, 0);
|
||||
if (with_io_in_progress) {
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
SegTerminateBufferIO((BufferDesc *)buf, false, 0);
|
||||
} else {
|
||||
TerminateBufferIO(buf, false, 0);
|
||||
}
|
||||
}
|
||||
|
||||
LWLockRelease(buf->content_lock);
|
||||
|
||||
pg_usleep(5000L);
|
||||
@ -5947,18 +5951,21 @@ bool TryLockBuffer(Buffer buffer, int mode, bool must_wait)
|
||||
if (ENABLE_DMS && ret) {
|
||||
LWLockMode lock_mode = (mode == BUFFER_LOCK_SHARE) ? LW_SHARED : LW_EXCLUSIVE;
|
||||
Buffer tmp_buffer;
|
||||
bool with_io_in_progress = true;
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
tmp_buffer = DmsReadSegPage(buffer, lock_mode, RBM_NORMAL);
|
||||
tmp_buffer = DmsReadSegPage(buffer, lock_mode, RBM_NORMAL, &with_io_in_progress);
|
||||
} else {
|
||||
tmp_buffer = DmsReadPage(buffer, lock_mode, RBM_NORMAL);
|
||||
tmp_buffer = DmsReadPage(buffer, lock_mode, RBM_NORMAL, &with_io_in_progress);
|
||||
}
|
||||
|
||||
if (tmp_buffer == 0) {
|
||||
/* failed to request newest page, release related locks, and retry */
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
SegTerminateBufferIO((BufferDesc *)buf, false, 0);
|
||||
} else {
|
||||
TerminateBufferIO(buf, false, 0);
|
||||
if (with_io_in_progress) {
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
SegTerminateBufferIO((BufferDesc *)buf, false, 0);
|
||||
} else {
|
||||
TerminateBufferIO(buf, false, 0);
|
||||
}
|
||||
}
|
||||
LWLockRelease(buf->content_lock);
|
||||
ret = false;
|
||||
@ -5989,18 +5996,21 @@ retry:
|
||||
|
||||
if (ENABLE_DMS && ret) {
|
||||
Buffer tmp_buffer;
|
||||
bool with_io_in_progress = true;
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
tmp_buffer = DmsReadSegPage(buffer, LW_EXCLUSIVE, RBM_NORMAL);
|
||||
tmp_buffer = DmsReadSegPage(buffer, LW_EXCLUSIVE, RBM_NORMAL, &with_io_in_progress);
|
||||
} else {
|
||||
tmp_buffer = DmsReadPage(buffer, LW_EXCLUSIVE, RBM_NORMAL);
|
||||
tmp_buffer = DmsReadPage(buffer, LW_EXCLUSIVE, RBM_NORMAL, &with_io_in_progress);
|
||||
}
|
||||
|
||||
/* failed to request newest page, release related locks, and retry */
|
||||
if (tmp_buffer == 0) {
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
SegTerminateBufferIO((BufferDesc *)buf, false, 0);
|
||||
} else {
|
||||
TerminateBufferIO(buf, false, 0);
|
||||
if (with_io_in_progress) {
|
||||
if (IsSegmentBufferID(buf->buf_id)) {
|
||||
SegTerminateBufferIO((BufferDesc *)buf, false, 0);
|
||||
} else {
|
||||
TerminateBufferIO(buf, false, 0);
|
||||
}
|
||||
}
|
||||
LWLockRelease(buf->content_lock);
|
||||
|
||||
|
||||
@ -56,8 +56,8 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode);
|
||||
void ClearReadHint(int buf_id, bool buf_deleted = false);
|
||||
Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
|
||||
Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc = NULL);
|
||||
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode);
|
||||
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode);
|
||||
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
|
||||
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
|
||||
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id);
|
||||
int32 CheckBuf4Rebuild(BufferDesc* buf_desc);
|
||||
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
|
||||
@ -72,9 +72,10 @@ void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag);
|
||||
void CheckPageNeedSkipInRecovery(Buffer buf);
|
||||
void SmgrNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
|
||||
void SegNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, SegSpace *spc);
|
||||
unsigned int DMSGetProcType4RequestPage();
|
||||
dms_session_e DMSGetProcType4RequestPage();
|
||||
void BufValidateDrc(BufferDesc *buf_desc);
|
||||
bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc);
|
||||
bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer);
|
||||
bool DmsCheckBufAccessible();
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user