From aabbf9b9fb86f98c0e87079456ed6a57f8b9ec16 Mon Sep 17 00:00:00 2001 From: zxlzxlzxlzxlzxl Date: Mon, 5 Feb 2024 02:51:48 +0000 Subject: [PATCH] [CP] [CdcService] Retry when get iter_end from remote_log_iter when locating lsn --- src/logservice/cdcservice/ob_cdc_fetcher.cpp | 71 +++++++------------ src/logservice/cdcservice/ob_cdc_fetcher.h | 1 - src/logservice/cdcservice/ob_cdc_service.cpp | 40 +++++++++-- src/logservice/cdcservice/ob_cdc_service.h | 3 + .../cdcservice/ob_cdc_start_lsn_locator.cpp | 53 +++++++------- src/logservice/cdcservice/ob_cdc_struct.h | 2 +- 6 files changed, 90 insertions(+), 80 deletions(-) diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp index f956ee93d4..abc9921692 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp +++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp @@ -350,7 +350,8 @@ int ObCdcFetcher::fetch_log_in_archive_( int ret = OB_SUCCESS; // always reserve 4K for archive header const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024; - if (OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) { + const int64_t MAX_RETRY_COUNT = 4; + if (OB_ISNULL(ctx.get_source()) && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) { LOG_WARN("init archive source failed", K(ctx), K(ls_id)); } else { const char *buf = NULL; @@ -358,22 +359,30 @@ int ObCdcFetcher::fetch_log_in_archive_( share::SCN pre_scn; if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) { LOG_WARN("convert progress to scn failed", KR(ret), K(ctx)); - } else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn, - start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, - log_ext_handler_, SINGLE_READ_SIZE))) { - LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id)); - } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) { - // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned. - if (OB_ITER_END != ret) { - LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id)); - } - } else if (start_lsn != lsn) { - // to keep consistency with the ret code of palf - ret = OB_INVALID_DATA; - LOG_WARN("remote iterator returned unexpected log entry lsn", K(start_lsn), K(lsn), K(log_entry), K(ls_id), - K(remote_iter)); - } else { + } else { + int64_t retry_count = 0; + do { + if (! remote_iter.is_init() && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn, + start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_, SINGLE_READ_SIZE))) { + LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id)); + } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) { + // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned. + if (OB_ITER_END != ret) { + LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id)); + } else { + remote_iter.update_source_cb(); + remote_iter.reset(); + LOG_INFO("get iter end from remote_iter, retry", K(retry_count), K(MAX_RETRY_COUNT)); + } + } else if (start_lsn != lsn) { + // to keep consistency with the ret code of palf + ret = OB_INVALID_DATA; + LOG_WARN("remote iterator returned unexpected log entry lsn", K(start_lsn), K(lsn), K(log_entry), K(ls_id), + K(remote_iter)); + } else { + } + } while (OB_ITER_END == ret && ++retry_count < MAX_RETRY_COUNT); } } return ret; @@ -993,34 +1002,6 @@ int ObCdcFetcher::prefill_resp_with_log_entry_(const ObLSID &ls_id, return ret; } -// called when source in ctx is null -int ObCdcFetcher::init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id) { - int ret = OB_SUCCESS; - ObRemoteLogParent *source = ctx.get_source(); - if (OB_NOT_NULL(source)) { - LOG_WARN("archive source is not null, no need to init"); - } else if (OB_ISNULL(source = logservice::ObResSrcAlloctor::alloc(ObLogRestoreSourceType::LOCATION, ls_id))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("alloc RemoteLocationParent failed", KR(ret), K(tenant_id_), K(ls_id)); - } else { - share::ObBackupDest archive_dest; - if (OB_FAIL(ObCdcService::get_backup_dest(ls_id, archive_dest))) { - LOG_WARN("get backupdest from archivedestinfo failed", KR(ret), K(ls_id)); - } else if (OB_FAIL(static_cast(source)->set(archive_dest, SCN::max_scn()))) { - LOG_WARN("source set archive dest info failed", KR(ret), K(archive_dest)); - } else { - ctx.set_source(source); - LOG_INFO("init archive source succ", K(ctx), K(ls_id)); - } - - if (OB_FAIL(ret)) { - logservice::ObResSrcAlloctor::free(source); - source = nullptr; - } - } - return ret; -} - int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id, ClientLSCtx &ctx, palf::PalfHandleGuard &palf_handle_guard, @@ -1037,7 +1018,7 @@ int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id, } } - if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) { + if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) { if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) { ret = OB_SUCCESS; archive_is_on = false; diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.h b/src/logservice/cdcservice/ob_cdc_fetcher.h index ce1e2ada5b..f234326a26 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.h +++ b/src/logservice/cdcservice/ob_cdc_fetcher.h @@ -135,7 +135,6 @@ private: LogEntryType &log_group_entry, LSN &lsn, ClientLSCtx &ctx); - int init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id); // Check whether has reached time limit inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp) { diff --git a/src/logservice/cdcservice/ob_cdc_service.cpp b/src/logservice/cdcservice/ob_cdc_service.cpp index 1ab998f290..7a9a387c16 100644 --- a/src/logservice/cdcservice/ob_cdc_service.cpp +++ b/src/logservice/cdcservice/ob_cdc_service.cpp @@ -77,6 +77,7 @@ int ObCdcService::get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDes ObCdcService::ObCdcService() : is_inited_(false), stop_flag_(true), + tenant_id_(OB_INVALID_TENANT_ID), locator_(), fetcher_(), tg_id_(-1), @@ -114,6 +115,7 @@ int ObCdcService::init(const uint64_t tenant_id, } else if (OB_FAIL(create_tenant_tg_(tenant_id))) { EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id)); } else { + tenant_id_ = tenant_id; is_inited_ = true; } @@ -190,7 +192,7 @@ int ObCdcService::start() EXTLOG_LOG(WARN, "ObCdcService not init", K(ret)); } else if (OB_FAIL(log_ext_handler_.start(0))) { EXTLOG_LOG(WARN, "log ext handler start failed", K(ret)); - } else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) { + } else if (OB_FAIL(start_tenant_tg_(tenant_id_))) { EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret)); } else { stop_flag_ = false; @@ -202,13 +204,13 @@ int ObCdcService::start() void ObCdcService::stop() { ATOMIC_STORE(&stop_flag_, true); - stop_tenant_tg_(MTL_ID()); + stop_tenant_tg_(tenant_id_); log_ext_handler_.stop(); } void ObCdcService::wait() { - wait_tenant_tg_(MTL_ID()); + wait_tenant_tg_(tenant_id_); log_ext_handler_.wait(); // do nothing } @@ -217,13 +219,14 @@ void ObCdcService::destroy() { is_inited_ = false; stop_flag_ = true; - destroy_tenant_tg_(MTL_ID()); + destroy_tenant_tg_(tenant_id_); fetcher_.destroy(); locator_.destroy(); dest_info_.reset(); large_buffer_pool_.destroy(); ls_ctx_map_.destroy(); log_ext_handler_.destroy(); + tenant_id_ = OB_INVALID_TENANT_ID; } int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req, @@ -312,6 +315,35 @@ int ObCdcService::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req, return ret; } +int ObCdcService::init_archive_source(const ObLSID &ls_id, + ClientLSCtx &ctx) +{ + int ret = OB_SUCCESS; + logservice::ObRemoteLogParent *source = ctx.get_source(); + if (OB_NOT_NULL(source)) { + EXTLOG_LOG(WARN, "archive source is not null, no need to init"); + } else if (OB_ISNULL(source = logservice::ObResSrcAlloctor::alloc(ObLogRestoreSourceType::LOCATION, ls_id))) { + ret = OB_ERR_UNEXPECTED; + EXTLOG_LOG(WARN, "alloc RemoteLocationParent failed", KR(ret), K(ls_id)); + } else { + share::ObBackupDest archive_dest; + if (OB_FAIL(get_backup_dest(ls_id, archive_dest))) { + EXTLOG_LOG(WARN, "get backupdest from archivedestinfo failed", KR(ret), K(ls_id)); + } else if (OB_FAIL(static_cast(source)->set(archive_dest, SCN::max_scn()))) { + EXTLOG_LOG(WARN, "source set archive dest info failed", KR(ret), K(archive_dest)); + } else { + ctx.set_source(source); + EXTLOG_LOG(WARN, "init archive source succ", K(ctx), K(ls_id)); + } + + if (OB_FAIL(ret)) { + logservice::ObResSrcAlloctor::free(source); + source = nullptr; + } + } + return ret; +} + int ObCdcService::query_tenant_archive_info_() { int ret = OB_SUCCESS; diff --git a/src/logservice/cdcservice/ob_cdc_service.h b/src/logservice/cdcservice/ob_cdc_service.h index 22dcb0a1dc..f043688e1d 100644 --- a/src/logservice/cdcservice/ob_cdc_service.h +++ b/src/logservice/cdcservice/ob_cdc_service.h @@ -72,6 +72,7 @@ class ObCdcService: public lib::TGRunnable { public: static int get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDest &backup_dest); + static int init_archive_source(const ObLSID &ls_id, ClientLSCtx &ctx); public: ObCdcService(); ~ObCdcService(); @@ -138,6 +139,8 @@ private: private: bool is_inited_; volatile bool stop_flag_ CACHE_ALIGNED; + uint64_t tenant_id_; + ObCdcStartLsnLocator locator_; ObCdcFetcher fetcher_; diff --git a/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp b/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp index 2b0c87e262..41b359dc9f 100644 --- a/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp +++ b/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp @@ -231,39 +231,34 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only, } else { result_ts_ns = start_ts_ns; // for RemoteLogIterator::init - logservice::GetSourceFunc get_source_func = [&](const ObLSID& ls_id, logservice::ObRemoteSourceGuard &guard) ->int { - int ret = OB_SUCCESS; - logservice::ObRemoteLogParent *source = logservice::ObResSrcAlloctor::alloc(share::ObLogRestoreSourceType::LOCATION, ls_id); - logservice::ObRemoteLocationParent *location_source = static_cast(source); - if (OB_ISNULL(location_source)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("source allocated is null", KR(ret), K(ls_id)); - } else if (OB_FAIL(location_source->set(backup_dest, SCN::max_scn()))) { - LOG_WARN("set backup dest failed", KR(ret), K(backup_dest), K(ls_id)); - } else if (OB_FAIL(guard.set_source(location_source))) { - LOG_WARN("remote source guard set source failed", KR(ret), K(ls_id)); - } - - if (OB_FAIL(ret) && OB_NOT_NULL(location_source)) { - logservice::ObResSrcAlloctor::free(location_source); - location_source = nullptr; - } - return ret; - }; - logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func); + ClientLSCtx ctx; + ObCdcGetSourceFunctor get_source_func(ctx); + ObCdcUpdateSourceFunctor update_source_func(ctx); + logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func, update_source_func); + constexpr int64_t MAX_RETRY_COUNT = 4; // for RemoteLogIterator::next int64_t next_buf_size = 0; const char *next_buf = NULL; LSN lsn; - - if (OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn, - result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) { - LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_)); - } else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) { - LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_)); - } else { - result_ts_ns = log_group_entry.get_scn().get_val_for_logservice(); - } + int64_t retry_time = 0; + do { + if (nullptr == ctx.get_source() && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) { + LOG_WARN("failed to init archive source", K(ctx), K(ls_id)); + } else if (! remote_group_iter.is_init() && OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn, + result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) { + LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_)); + } else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) { + if (OB_ITER_END != ret) { + LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_)); + } else { + remote_group_iter.update_source_cb(); + remote_group_iter.reset(); + LOG_INFO("get iter end from remote_group_iter, retry", K(result_lsn), K(retry_time), K(MAX_RETRY_COUNT)); + } + } else { + result_ts_ns = log_group_entry.get_scn().get_val_for_logservice(); + } + } while (OB_ITER_END == ret && ++retry_time < MAX_RETRY_COUNT); } } // Unconditional setting ret code diff --git a/src/logservice/cdcservice/ob_cdc_struct.h b/src/logservice/cdcservice/ob_cdc_struct.h index e50d87516d..d0d09321a2 100644 --- a/src/logservice/cdcservice/ob_cdc_struct.h +++ b/src/logservice/cdcservice/ob_cdc_struct.h @@ -120,7 +120,7 @@ public: void set_progress(int64_t progress) { client_progress_ = progress; } int64_t get_progress() const { return client_progress_; } - TO_STRING_KV(K_(source), + TO_STRING_KV(KP_(source), K_(fetch_mode), K_(last_touch_ts), K_(client_progress))