[CDCService] fix CDCService stuck when fetching log in sys tenant

obdev 2023-03-07 07:41:22 +00:00 committed by ob-robot
parent 7f06100d28
commit 37196ce6f0
4 changed files with 27 additions and 14 deletions

View File

@@ -391,7 +391,7 @@ int ObCdcFetcher::set_fetch_mode_before_fetch_log_(const ObLSID &ls_id,
       ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "LSNotExistInPalf");
       ret = OB_SUCCESS;
     } else {
-      LOG_WARN("logstream in sys tenant doesn't exist, unexpected", KR(ret));
+      LOG_WARN("logstream in sys tenant doesn't exist, unexpected", KR(ret), K(ls_id));
     }
   } else if (FetchMode::FETCHMODE_ARCHIVE == ctx.get_fetch_mode()) {
     int64_t end_ts_ns = OB_INVALID_TIMESTAMP;
@@ -468,6 +468,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
   // always reset remote_iter when need_init_iter is true
   // always set need_init_inter=true when switch fetch_mode
   bool need_init_iter = true;
+  bool log_exist_in_palf = true;
   int64_t retry_count = 0;
   const bool fetch_archive_only = ObCdcRpcTestFlag::is_fetch_archive_only(fetch_flag);
   // test switch fetch mode requires that the fetch mode should be FETCHMODE_ARCHIVE at first, and then
@@ -500,7 +501,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
     int64_t start_fetch_ts = ObTimeUtility::current_time();
     bool fetch_log_succ = false;
     const int64_t MAX_RETRY_COUNT = 3;
-    if (is_time_up_(fetched_log_count, end_tstamp)) { // time up, stop fetching logs globally
+    if (is_time_up_(scan_round_count, end_tstamp)) { // time up, stop fetching logs globally
       frt.stop("TimeUP");
       LOG_INFO("fetch log quit in time", K(end_tstamp), K(frt), K(fetched_log_count));
     } // time up
@@ -514,13 +515,19 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
          ret = OB_SUCCESS;
        } else if (OB_ERR_OUT_OF_LOWER_BOUND == ret) {
          // switch to fetchmode_archive, when in FETCHMODE_ONLINE, remote_iter is not inited
-         need_init_iter = true;
-         ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "PalfOutOfLowerBound");
-         ret = OB_SUCCESS;
+         if (OB_SYS_TENANT_ID != tenant_id_) {
+           need_init_iter = true;
+           log_exist_in_palf = false;
+           ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "PalfOutOfLowerBound");
+           ret = OB_SUCCESS;
+         } else {
+           LOG_INFO("log in sys tenant may be recycled", KR(ret), K(ls_id), K(resp));
+         }
        } else {
          LOG_WARN("fetching log in palf failed", KR(ret));
        }
      } else {
+       log_exist_in_palf = true;
        need_init_iter = false;
        fetch_log_succ = true;
      } // fetch log succ
@@ -535,10 +542,14 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
        remote_iter.update_source_cb();
        remote_iter.reset();
        if (ls_exist_in_palf) {
-         // switch to palf, reset remote_iter
-         need_init_iter = true;
-         ctx.set_fetch_mode(FetchMode::FETCHMODE_ONLINE, "ArchiveIterEnd");
-         ret = OB_SUCCESS;
+         if (log_exist_in_palf) {
+           // switch to palf, reset remote_iter
+           need_init_iter = true;
+           ctx.set_fetch_mode(FetchMode::FETCHMODE_ONLINE, "ArchiveIterEnd");
+           ret = OB_SUCCESS;
+         } else {
+           ret = OB_ERR_OUT_OF_LOWER_BOUND;
+         }
        } else {
          // exit
          reach_max_lsn = true;
@@ -559,6 +570,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
        }
      }
    } else { // OB_SUCCESS
+     log_exist_in_palf = true;
      need_init_iter = false;
      fetch_log_succ = true;
    } // fetch log succ
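
Taken together, the ls_fetch_log_ changes above stop the fetcher from bouncing between PALF and the archive when the log simply is not available: on OB_ERR_OUT_OF_LOWER_BOUND the sys tenant no longer switches to FETCHMODE_ARCHIVE (per the diff there is nothing to fall back to, the log "may be recycled"), and the new log_exist_in_palf flag makes an exhausted archive iterator surface OB_ERR_OUT_OF_LOWER_BOUND instead of switching back to PALF. A minimal standalone sketch of that control flow, with stand-in types, stub fetch functions and error-code values that are illustrative only, not the real ObCdcFetcher/OceanBase definitions:

// Illustrative sketch of the fetch-mode decision introduced by this commit.
// FetchMode, Ret, fetch_from_palf and fetch_from_archive are stand-ins.
#include <cstdint>

enum class FetchMode { ONLINE, ARCHIVE };
enum Ret { OB_SUCCESS = 0, OB_ERR_OUT_OF_LOWER_BOUND = -1, OB_ITER_END = -2 };

constexpr uint64_t OB_SYS_TENANT_ID = 1;

// Stubs standing in for one fetch attempt against PALF / the remote archive.
Ret fetch_from_palf() { return OB_ERR_OUT_OF_LOWER_BOUND; }
Ret fetch_from_archive() { return OB_ITER_END; }

int fetch_once(const uint64_t tenant_id, FetchMode &mode, bool &log_exist_in_palf)
{
  int ret = OB_SUCCESS;
  if (FetchMode::ONLINE == mode) {
    if (OB_ERR_OUT_OF_LOWER_BOUND == (ret = fetch_from_palf())) {
      if (OB_SYS_TENANT_ID != tenant_id) {
        // user tenant: the log may still be in the archive, switch mode and retry
        log_exist_in_palf = false;
        mode = FetchMode::ARCHIVE;
        ret = OB_SUCCESS;
      }
      // sys tenant: keep the error; the log may have been recycled and there is
      // no archive to fall back to, so the caller must not keep retrying
    }
  } else { // FetchMode::ARCHIVE
    if (OB_ITER_END == (ret = fetch_from_archive())) {
      if (log_exist_in_palf) {
        // archive exhausted but newer logs live in PALF, switch back
        mode = FetchMode::ONLINE;
        ret = OB_SUCCESS;
      } else {
        // neither PALF nor the archive has the log: report it instead of looping
        ret = OB_ERR_OUT_OF_LOWER_BOUND;
      }
    }
  }
  return ret;
}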

View File

@@ -128,14 +128,14 @@ private:
                          ClientLSCtx &ctx);
   int init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id);
   // Check whether has reached time limit
-  inline bool is_time_up_(const int64_t log_count, const int64_t end_tstamp)
+  inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp)
   {
     // every batch of logs, check whether has timed out
-    static const int64_t CHECK_TIMEOUT_LOG_COUNT = 100;
-    static const int64_t CHECK_TIMEOUT_LOG_INDEX = 10;
+    static const int64_t CHECK_TIMEOUT_SCAN_ROUND = 100;
+    static const int64_t CHECK_TIMEOUT_SCAN_INDEX = 10;
     bool bool_ret = false;
-    if (((log_count % CHECK_TIMEOUT_LOG_COUNT) == CHECK_TIMEOUT_LOG_INDEX)) {
+    if (((scan_round % CHECK_TIMEOUT_SCAN_ROUND) == CHECK_TIMEOUT_SCAN_INDEX)) {
       int64_t cur_time = ObTimeUtility::current_time();
       bool_ret = cur_time > end_tstamp;
     }
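
The rename from log_count to scan_round matters because the modulo check only fires when the counter actually advances: keyed to the fetched-log count, the timeout was never evaluated while the fetcher made no progress, which is exactly the stuck scenario; keyed to a per-iteration round counter it is reached roughly once every 100 rounds regardless of progress. A small sketch of the assumed call pattern, where a hypothetical driver loop increments scan_round_count once per pass (only the identifiers visible in the diff are real):

// Illustrative driver loop, not the real ls_fetch_log_: the round counter advances
// on every pass, so the timeout check cannot be starved by a lack of fetched logs.
#include <cstdint>

inline bool is_time_up(const int64_t scan_round, const int64_t end_tstamp, const int64_t now_us)
{
  static const int64_t CHECK_TIMEOUT_SCAN_ROUND = 100;
  static const int64_t CHECK_TIMEOUT_SCAN_INDEX = 10;
  bool time_up = false;
  if ((scan_round % CHECK_TIMEOUT_SCAN_ROUND) == CHECK_TIMEOUT_SCAN_INDEX) {
    time_up = now_us > end_tstamp;
  }
  return time_up;
}

void fetch_loop(const int64_t end_tstamp, int64_t (*now_us)())
{
  int64_t scan_round_count = 0;
  bool stop = false;
  while (!stop) {
    ++scan_round_count;  // advances even when no log is fetched in this round
    if (is_time_up(scan_round_count, end_tstamp, now_us())) {
      stop = true;       // keyed to a fetched-log count, this branch could starve
    }
    // ... one fetch attempt per round would go here ...
  }
}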

View File

@@ -51,7 +51,7 @@
     int64_t max_rpc_proc_time = \
         ATOMIC_LOAD(&ObLogRpc::g_rpc_process_handler_time_upper_limit); \
     proxy.set_server((SVR)); \
-    if (OB_FAIL(proxy.by(tenant_id).trace_time(true).timeout((TIMEOUT))\
+    if (OB_FAIL(proxy.by(tenant_id).group_id(share::OBCG_CDCSERVICE).trace_time(true).timeout((TIMEOUT))\
         .max_process_handler_time(static_cast<int32_t>(max_rpc_proc_time))\
         .RPC((REQ), (ARG)))) { \
       LOG_ERROR("rpc fail: " #RPC, "tenant_id", tenant_id, "svr", (SVR), "rpc_ret", ret, \

View File

@@ -13,4 +13,5 @@ CGID_DEF(OBCG_DETECT_RS, 9)
 CGID_DEF(OBCG_LOC_CACHE, 10)
 CGID_DEF(OBCG_SQL_NIO, 11)
 CGID_DEF(OBCG_MYSQL_LOGIN, 12)
+CGID_DEF(OBCG_CDCSERVICE, 13)
 CGID_DEF(OBCG_LQ, 100)
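
The last two files work as a pair: the RPC macro now tags CDC log-fetch requests with group_id(share::OBCG_CDCSERVICE), and this list registers that id, presumably so those RPCs are queued and processed in their own consumer group rather than competing with requests in the default group. A rough sketch of the X-macro pattern such CGID_DEF lists are typically consumed with; the enum name, namespace and inclusion style are assumptions, not the verbatim OceanBase header:

// Assumed expansion pattern for a CGID_DEF group list: the including header defines
// CGID_DEF, expands the list once into an enum, and that is how an identifier like
// share::OBCG_CDCSERVICE becomes usable by the group_id() call in the RPC macro above.
#include <cstdint>

namespace share {
enum ObCgId : uint64_t {
#define CGID_DEF(name, id) name = (id),
  CGID_DEF(OBCG_MYSQL_LOGIN, 12)
  CGID_DEF(OBCG_CDCSERVICE, 13)   // new group for CDCService log-fetch RPCs
  CGID_DEF(OBCG_LQ, 100)
#undef CGID_DEF
};
} // namespace share

static_assert(share::OBCG_CDCSERVICE == 13, "CDC consumer group id registered in this commit");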