Fix bug:priority's log_ts may fallback.
This commit is contained in:
@ -81,16 +81,17 @@ int PriorityV1::compare(const AbstractPriority &rhs, int &result, ObStringHolder
|
|||||||
|
|
||||||
int PriorityV1::get_log_ts_(const share::ObLSID &ls_id, int64_t &log_ts)
|
int PriorityV1::get_log_ts_(const share::ObLSID &ls_id, int64_t &log_ts)
|
||||||
{
|
{
|
||||||
|
LC_TIME_GUARD(100_ms);
|
||||||
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(ls_id), K(*this)
|
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(ls_id), K(*this)
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
palf::PalfHandleGuard palf_handle_guard;
|
palf::PalfHandleGuard palf_handle_guard;
|
||||||
palf::AccessMode access_mode = palf::AccessMode::INVALID_ACCESS_MODE;
|
// palf::AccessMode access_mode = palf::AccessMode::INVALID_ACCESS_MODE;
|
||||||
if (OB_ISNULL(MTL(ObLogService*))) {
|
if (OB_ISNULL(MTL(ObLogService*))) {
|
||||||
COORDINATOR_LOG_(ERROR, "ObLogService is nullptr");
|
COORDINATOR_LOG_(ERROR, "ObLogService is nullptr");
|
||||||
} else if (OB_FAIL(MTL(ObLogService*)->open_palf(ls_id, palf_handle_guard))) {
|
} else if (CLICK_FAIL(MTL(ObLogService*)->open_palf(ls_id, palf_handle_guard))) {
|
||||||
COORDINATOR_LOG_(WARN, "open_palf failed");
|
COORDINATOR_LOG_(WARN, "open_palf failed");
|
||||||
} else if (OB_FAIL(palf_handle_guard.get_palf_handle()->get_access_mode(access_mode))) {
|
// } else if (CLICK_FAIL(palf_handle_guard.get_palf_handle()->get_access_mode(access_mode))) {
|
||||||
COORDINATOR_LOG_(WARN, "get_access_mode failed");
|
// COORDINATOR_LOG_(WARN, "get_access_mode failed");
|
||||||
// } else if (palf::AccessMode::APPEND != access_mode) {
|
// } else if (palf::AccessMode::APPEND != access_mode) {
|
||||||
// // Set log_ts to 0 when current access mode is not APPEND.
|
// // Set log_ts to 0 when current access mode is not APPEND.
|
||||||
// log_ts = 0;
|
// log_ts = 0;
|
||||||
@ -98,12 +99,12 @@ int PriorityV1::get_log_ts_(const share::ObLSID &ls_id, int64_t &log_ts)
|
|||||||
common::ObRole role;
|
common::ObRole role;
|
||||||
int64_t unused_pid = -1;
|
int64_t unused_pid = -1;
|
||||||
int64_t min_unreplay_log_ts_ns = OB_INVALID_TIMESTAMP;
|
int64_t min_unreplay_log_ts_ns = OB_INVALID_TIMESTAMP;
|
||||||
if (OB_FAIL(palf_handle_guard.get_role(role, unused_pid))) {
|
if (CLICK_FAIL(palf_handle_guard.get_role(role, unused_pid))) {
|
||||||
COORDINATOR_LOG_(WARN, "get_role failed");
|
COORDINATOR_LOG_(WARN, "get_role failed");
|
||||||
} else if (OB_FAIL(palf_handle_guard.get_max_ts_ns(log_ts))) {
|
} else if (CLICK_FAIL(palf_handle_guard.get_max_ts_ns(log_ts))) {
|
||||||
COORDINATOR_LOG_(WARN, "get_max_ts_ns failed");
|
COORDINATOR_LOG_(WARN, "get_max_ts_ns failed");
|
||||||
} else if (FOLLOWER == role) {
|
} else if (FOLLOWER == role) {
|
||||||
if (OB_FAIL(MTL(ObLogService*)->get_log_replay_service()->get_min_unreplayed_log_ts_ns(ls_id, min_unreplay_log_ts_ns))) {
|
if (CLICK_FAIL(MTL(ObLogService*)->get_log_replay_service()->get_min_unreplayed_log_ts_ns(ls_id, min_unreplay_log_ts_ns))) {
|
||||||
COORDINATOR_LOG_(WARN, "failed to get_min_unreplayed_log_ts_ns");
|
COORDINATOR_LOG_(WARN, "failed to get_min_unreplayed_log_ts_ns");
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
} else if (min_unreplay_log_ts_ns - 1 < log_ts) {
|
} else if (min_unreplay_log_ts_ns - 1 < log_ts) {
|
||||||
@ -111,6 +112,12 @@ int PriorityV1::get_log_ts_(const share::ObLSID &ls_id, int64_t &log_ts)
|
|||||||
log_ts = min_unreplay_log_ts_ns - 1;
|
log_ts = min_unreplay_log_ts_ns - 1;
|
||||||
} else {}
|
} else {}
|
||||||
} else {}
|
} else {}
|
||||||
|
// log_ts may fallback because palf's role may be different with apply_service.
|
||||||
|
// So we need check it here to keep inc update semantic.
|
||||||
|
if (log_ts < log_ts_) {
|
||||||
|
COORDINATOR_LOG_(TRACE, "new log_ts is smaller than current, no need update", K(role), K(min_unreplay_log_ts_ns), K(log_ts));
|
||||||
|
log_ts = log_ts_;
|
||||||
|
}
|
||||||
COORDINATOR_LOG_(TRACE, "get_log_ts_ finished", K(role), K(min_unreplay_log_ts_ns), K(log_ts));
|
COORDINATOR_LOG_(TRACE, "get_log_ts_ finished", K(role), K(min_unreplay_log_ts_ns), K(log_ts));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -119,7 +126,7 @@ int PriorityV1::get_log_ts_(const share::ObLSID &ls_id, int64_t &log_ts)
|
|||||||
|
|
||||||
int PriorityV1::refresh_(const share::ObLSID &ls_id)
|
int PriorityV1::refresh_(const share::ObLSID &ls_id)
|
||||||
{
|
{
|
||||||
LC_TIME_GUARD(1_s);
|
LC_TIME_GUARD(100_ms);
|
||||||
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(*this)
|
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(*this)
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObLeaderCoordinator* coordinator = MTL(ObLeaderCoordinator*);
|
ObLeaderCoordinator* coordinator = MTL(ObLeaderCoordinator*);
|
||||||
|
|||||||
Reference in New Issue
Block a user