From 37196ce6f0a9b6a18c5a301aa835563ee1b1dd0d Mon Sep 17 00:00:00 2001
From: obdev
Date: Tue, 7 Mar 2023 07:41:22 +0000
Subject: [PATCH] [CDCService] fix CDCService stuck when fetching log in sys
 tenant

---
 src/logservice/cdcservice/ob_cdc_fetcher.cpp | 30 ++++++++++++++------
 src/logservice/cdcservice/ob_cdc_fetcher.h   |  8 +++---
 src/logservice/libobcdc/src/ob_log_rpc.cpp   |  2 +-
 src/share/resource_manager/ob_group_list.h   |  1 +
 4 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp
index 60afded1b..ff1b12bf7 100644
--- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp
+++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp
@@ -391,7 +391,7 @@ int ObCdcFetcher::set_fetch_mode_before_fetch_log_(const ObLSID &ls_id,
       ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "LSNotExistInPalf");
       ret = OB_SUCCESS;
     } else {
-      LOG_WARN("logstream in sys tenant doesn't exist, unexpected", KR(ret));
+      LOG_WARN("logstream in sys tenant doesn't exist, unexpected", KR(ret), K(ls_id));
     }
   } else if (FetchMode::FETCHMODE_ARCHIVE == ctx.get_fetch_mode()) {
     int64_t end_ts_ns = OB_INVALID_TIMESTAMP;
@@ -468,6 +468,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
   // always reset remote_iter when need_init_iter is true
   // always set need_init_inter=true when switch fetch_mode
   bool need_init_iter = true;
+  bool log_exist_in_palf = true;
   int64_t retry_count = 0;
   const bool fetch_archive_only = ObCdcRpcTestFlag::is_fetch_archive_only(fetch_flag);
   // test switch fetch mode requires that the fetch mode should be FETCHMODE_ARCHIVE at first, and then
@@ -500,7 +501,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
     int64_t start_fetch_ts = ObTimeUtility::current_time();
     bool fetch_log_succ = false;
     const int64_t MAX_RETRY_COUNT = 3;
-    if (is_time_up_(fetched_log_count, end_tstamp)) { // time up, stop fetching logs globally
+    if (is_time_up_(scan_round_count, end_tstamp)) { // time up, stop fetching logs globally
      frt.stop("TimeUP");
      LOG_INFO("fetch log quit in time", K(end_tstamp), K(frt), K(fetched_log_count));
    } // time up
@@ -514,13 +515,19 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
           ret = OB_SUCCESS;
         } else if (OB_ERR_OUT_OF_LOWER_BOUND == ret) {
           // switch to fetchmode_archive, when in FETCHMODE_ONLINE, remote_iter is not inited
-          need_init_iter = true;
-          ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "PalfOutOfLowerBound");
-          ret = OB_SUCCESS;
+          if (OB_SYS_TENANT_ID != tenant_id_) {
+            need_init_iter = true;
+            log_exist_in_palf = false;
+            ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "PalfOutOfLowerBound");
+            ret = OB_SUCCESS;
+          } else {
+            LOG_INFO("log in sys tenant may be recycled", KR(ret), K(ls_id), K(resp));
+          }
         } else {
           LOG_WARN("fetching log in palf failed", KR(ret));
         }
       } else {
+        log_exist_in_palf = true;
         need_init_iter = false;
         fetch_log_succ = true;
       } // fetch log succ
@@ -535,10 +542,14 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
           remote_iter.update_source_cb();
           remote_iter.reset();
           if (ls_exist_in_palf) {
-            // switch to palf, reset remote_iter
-            need_init_iter = true;
-            ctx.set_fetch_mode(FetchMode::FETCHMODE_ONLINE, "ArchiveIterEnd");
-            ret = OB_SUCCESS;
+            if (log_exist_in_palf) {
+              // switch to palf, reset remote_iter
+              need_init_iter = true;
+              ctx.set_fetch_mode(FetchMode::FETCHMODE_ONLINE, "ArchiveIterEnd");
+              ret = OB_SUCCESS;
+            } else {
+              ret = OB_ERR_OUT_OF_LOWER_BOUND;
+            }
           } else {
             // exit
             reach_max_lsn = true;
@@ -559,6 +570,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
         }
       }
     } else { // OB_SUCCESS
+      log_exist_in_palf = true;
       need_init_iter = false;
       fetch_log_succ = true;
     } // fetch log succ
diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.h b/src/logservice/cdcservice/ob_cdc_fetcher.h
index 7e27a29e4..7fbad7d18 100644
--- a/src/logservice/cdcservice/ob_cdc_fetcher.h
+++ b/src/logservice/cdcservice/ob_cdc_fetcher.h
@@ -128,14 +128,14 @@ private:
       ClientLSCtx &ctx);
   int init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id);
   // Check whether has reached time limit
-  inline bool is_time_up_(const int64_t log_count, const int64_t end_tstamp)
+  inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp)
   {
     // every batch of logs, check whether has timed out
-    static const int64_t CHECK_TIMEOUT_LOG_COUNT = 100;
-    static const int64_t CHECK_TIMEOUT_LOG_INDEX = 10;
+    static const int64_t CHECK_TIMEOUT_SCAN_ROUND = 100;
+    static const int64_t CHECK_TIMEOUT_SCAN_INDEX = 10;
     bool bool_ret = false;
 
-    if (((log_count % CHECK_TIMEOUT_LOG_COUNT) == CHECK_TIMEOUT_LOG_INDEX)) {
+    if (((scan_round % CHECK_TIMEOUT_SCAN_ROUND) == CHECK_TIMEOUT_SCAN_INDEX)) {
       int64_t cur_time = ObTimeUtility::current_time();
       bool_ret = cur_time > end_tstamp;
     }
diff --git a/src/logservice/libobcdc/src/ob_log_rpc.cpp b/src/logservice/libobcdc/src/ob_log_rpc.cpp
index 3bf61d2be..936f559de 100644
--- a/src/logservice/libobcdc/src/ob_log_rpc.cpp
+++ b/src/logservice/libobcdc/src/ob_log_rpc.cpp
@@ -51,7 +51,7 @@
     int64_t max_rpc_proc_time = \
         ATOMIC_LOAD(&ObLogRpc::g_rpc_process_handler_time_upper_limit); \
     proxy.set_server((SVR)); \
-    if (OB_FAIL(proxy.by(tenant_id).trace_time(true).timeout((TIMEOUT))\
+    if (OB_FAIL(proxy.by(tenant_id).group_id(share::OBCG_CDCSERVICE).trace_time(true).timeout((TIMEOUT))\
        .max_process_handler_time(static_cast<int32_t>(max_rpc_proc_time))\
        .RPC((REQ), (ARG)))) { \
      LOG_ERROR("rpc fail: " #RPC, "tenant_id", tenant_id, "svr", (SVR), "rpc_ret", ret, \
diff --git a/src/share/resource_manager/ob_group_list.h b/src/share/resource_manager/ob_group_list.h
index 43e628de1..d178a4993 100644
--- a/src/share/resource_manager/ob_group_list.h
+++ b/src/share/resource_manager/ob_group_list.h
@@ -13,4 +13,5 @@ CGID_DEF(OBCG_DETECT_RS, 9)
 CGID_DEF(OBCG_LOC_CACHE, 10)
 CGID_DEF(OBCG_SQL_NIO, 11)
 CGID_DEF(OBCG_MYSQL_LOGIN, 12)
+CGID_DEF(OBCG_CDCSERVICE, 13)
 CGID_DEF(OBCG_LQ, 100)
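
For readers following the diff, the sketch below is not part of the patch: it is a minimal standalone model of the branch the fix adds in ls_fetch_log_. All names in it (decide_on_palf_lower_bound, kErrOutOfLowerBound, FetchMode here) are simplified stand-ins, not OceanBase APIs. The idea, as the patch indicates, is that when palf reports the requested position is below its lower bound, only non-sys tenants may fall back to archive fetching; for the sys tenant the lower-bound error is surfaced instead of switching to an archive source that cannot serve it, which is what previously left the fetcher stuck.

```cpp
// Minimal illustrative sketch (not OceanBase code) of the fallback decision
// introduced by this patch for OB_ERR_OUT_OF_LOWER_BOUND from palf.
#include <cstdint>
#include <iostream>

enum class FetchMode { ONLINE, ARCHIVE };

constexpr int kSuccess = 0;
constexpr int kErrOutOfLowerBound = -1;  // placeholder, not the real error code

// Called when reading from palf fails because the requested LSN is below
// palf's lower bound (older logs were recycled).
int decide_on_palf_lower_bound(const uint64_t tenant_id,
                               FetchMode &mode,
                               bool &log_exist_in_palf,
                               bool &need_init_iter)
{
  constexpr uint64_t kSysTenantId = 1;  // OB_SYS_TENANT_ID is 1 in OceanBase
  if (kSysTenantId != tenant_id) {
    // User tenant: the log may still exist in the archive, so switch fetch
    // mode and remember that palf no longer holds it.
    need_init_iter = true;
    log_exist_in_palf = false;
    mode = FetchMode::ARCHIVE;
    return kSuccess;
  }
  // Sys tenant: no archive fallback is available, so report the error
  // instead of bouncing between palf and archive indefinitely.
  return kErrOutOfLowerBound;
}

int main()
{
  FetchMode mode = FetchMode::ONLINE;
  bool log_exist_in_palf = true;
  bool need_init_iter = false;
  // Sys tenant (id 1): the error is surfaced and the mode stays ONLINE.
  std::cout << decide_on_palf_lower_bound(1, mode, log_exist_in_palf, need_init_iter) << '\n';
  // User tenant (e.g. id 1002): falls back to archive fetching.
  std::cout << decide_on_palf_lower_bound(1002, mode, log_exist_in_palf, need_init_iter) << '\n';
  return 0;
}
```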