[OBCDC][LogFetcher][CdcService] adapt for archive_lag_target
This commit is contained in:
committed by
ob-robot
parent
1568215ec4
commit
f49cb4aaeb
@ -398,7 +398,7 @@ int ObCdcFetcher::set_fetch_mode_before_fetch_log_(const ObLSID &ls_id,
|
|||||||
// set the switch interval to 10s in test switch fetch mode, the unit of measurement of log_ts(scn) is nano second.
|
// set the switch interval to 10s in test switch fetch mode, the unit of measurement of log_ts(scn) is nano second.
|
||||||
const int64_t SECOND_NS = 1000L * 1000 * 1000;
|
const int64_t SECOND_NS = 1000L * 1000 * 1000;
|
||||||
SCN end_scn;
|
SCN end_scn;
|
||||||
const int64_t SWITCH_INTERVAL = test_switch_fetch_mode ? 10L * SECOND_NS : 60L * SECOND_NS;
|
const int64_t SWITCH_INTERVAL = test_switch_fetch_mode ? 10L * SECOND_NS : 60 * SECOND_NS; //default 60s
|
||||||
if (OB_FAIL(palf_guard.get_end_scn(end_scn))) {
|
if (OB_FAIL(palf_guard.get_end_scn(end_scn))) {
|
||||||
LOG_WARN("get palf end ts failed", KR(ret));
|
LOG_WARN("get palf end ts failed", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -553,7 +553,9 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// exit
|
// exit
|
||||||
|
resp.set_feedback_type(obrpc::ObCdcLSFetchLogResp::ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF);
|
||||||
reach_max_lsn = true;
|
reach_max_lsn = true;
|
||||||
|
LOG_INFO("reach max lsn in archive but ls not exists in this server, need switch server", K(ls_id));
|
||||||
}
|
}
|
||||||
} else if (OB_NEED_RETRY == ret) {
|
} else if (OB_NEED_RETRY == ret) {
|
||||||
frt.stop("ArchiveNeedRetry");
|
frt.stop("ArchiveNeedRetry");
|
||||||
|
|||||||
@ -352,6 +352,8 @@ public:
|
|||||||
LAGGED_FOLLOWER = 0, // lagged follower
|
LAGGED_FOLLOWER = 0, // lagged follower
|
||||||
LOG_NOT_IN_THIS_SERVER = 1, // this server does not server this log
|
LOG_NOT_IN_THIS_SERVER = 1, // this server does not server this log
|
||||||
LS_OFFLINED = 2, // LS offlined
|
LS_OFFLINED = 2, // LS offlined
|
||||||
|
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 3, // Reach Max LSN in archive log but cannot switch
|
||||||
|
// to palf because ls not exists in current server
|
||||||
};
|
};
|
||||||
public:
|
public:
|
||||||
ObCdcLSFetchLogResp() { reset(); }
|
ObCdcLSFetchLogResp() { reset(); }
|
||||||
|
|||||||
@ -1272,7 +1272,9 @@ bool FetchStream::need_add_into_blacklist_(const KickOutReason reason)
|
|||||||
{
|
{
|
||||||
bool bool_ret = false;
|
bool bool_ret = false;
|
||||||
|
|
||||||
if ((NEED_SWITCH_SERVER == reason) || (DISCARDED == reason)) {
|
if ((NEED_SWITCH_SERVER == reason) ||
|
||||||
|
(DISCARDED == reason) ||
|
||||||
|
(ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF == reason)) {
|
||||||
bool_ret = false;
|
bool_ret = false;
|
||||||
} else {
|
} else {
|
||||||
bool_ret = true;
|
bool_ret = true;
|
||||||
@ -1983,6 +1985,10 @@ FetchStream::KickOutReason FetchStream::get_feedback_reason_(const Feedback &fee
|
|||||||
reason = LS_OFFLINED;
|
reason = LS_OFFLINED;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case ObCdcLSFetchLogResp::ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF:
|
||||||
|
reason = ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF;
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
reason = NONE;
|
reason = NONE;
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -151,6 +151,9 @@ private:
|
|||||||
|
|
||||||
NEED_SWITCH_SERVER = 10, // There is a higher priority server that actively switch
|
NEED_SWITCH_SERVER = 10, // There is a higher priority server that actively switch
|
||||||
DISCARDED = 11, // Partition is discard
|
DISCARDED = 11, // Partition is discard
|
||||||
|
|
||||||
|
// Feedback
|
||||||
|
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 12, //same as ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF
|
||||||
};
|
};
|
||||||
static const char *print_kick_out_reason_(const KickOutReason reason);
|
static const char *print_kick_out_reason_(const KickOutReason reason);
|
||||||
// Determine if the server needs to be blacklisted,
|
// Determine if the server needs to be blacklisted,
|
||||||
|
|||||||
@ -42,6 +42,9 @@ enum KickOutReason
|
|||||||
|
|
||||||
NEED_SWITCH_SERVER = 10, // There is a higher priority server that actively switch
|
NEED_SWITCH_SERVER = 10, // There is a higher priority server that actively switch
|
||||||
DISCARDED = 11, // Partition is discard
|
DISCARDED = 11, // Partition is discard
|
||||||
|
|
||||||
|
// Feedback
|
||||||
|
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 12, //same as ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF
|
||||||
};
|
};
|
||||||
const char *print_switch_reason(const KickOutReason reason);
|
const char *print_switch_reason(const KickOutReason reason);
|
||||||
|
|
||||||
|
|||||||
@ -1301,7 +1301,9 @@ bool FetchStream::need_add_into_blacklist_(const KickOutReason reason)
|
|||||||
{
|
{
|
||||||
bool bool_ret = false;
|
bool bool_ret = false;
|
||||||
|
|
||||||
if ((NEED_SWITCH_SERVER == reason) || (DISCARDED == reason)) {
|
if ((NEED_SWITCH_SERVER == reason) ||
|
||||||
|
(DISCARDED == reason) ||
|
||||||
|
(ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF == reason)) {
|
||||||
bool_ret = false;
|
bool_ret = false;
|
||||||
} else {
|
} else {
|
||||||
bool_ret = true;
|
bool_ret = true;
|
||||||
@ -1470,6 +1472,10 @@ KickOutReason FetchStream::get_feedback_reason_(const Feedback &feedback) const
|
|||||||
reason = LS_OFFLINED;
|
reason = LS_OFFLINED;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case ObCdcLSFetchLogResp::ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF:
|
||||||
|
reason = ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF;
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
reason = NONE;
|
reason = NONE;
|
||||||
break;
|
break;
|
||||||
|
|||||||
Reference in New Issue
Block a user