From d0292eae1ad3dc1a17b194b5d88f04556cb8fa5d Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 2 Mar 2023 16:26:11 +0000 Subject: [PATCH] [CdcService] fix observer core when rpc concurrently access the ClientLSCtx --- deps/oblib/src/lib/stat/ob_latch_define.h | 1 + deps/oblib/src/lib/wait_event/ob_wait_event.h | 3 +- .../src/lib/wait_event/ob_wait_event_desc.md | 4 ++ src/logservice/cdcservice/ob_cdc_fetcher.cpp | 62 +++++++++++-------- .../cdcservice/ob_cdc_rpc_processor.cpp | 2 +- src/logservice/cdcservice/ob_cdc_service.cpp | 2 +- src/logservice/cdcservice/ob_cdc_struct.cpp | 10 +-- src/logservice/cdcservice/ob_cdc_struct.h | 17 ++--- 8 files changed, 54 insertions(+), 47 deletions(-) diff --git a/deps/oblib/src/lib/stat/ob_latch_define.h b/deps/oblib/src/lib/stat/ob_latch_define.h index 89210fc749..cda095d187 100644 --- a/deps/oblib/src/lib/stat/ob_latch_define.h +++ b/deps/oblib/src/lib/stat/ob_latch_define.h @@ -307,6 +307,7 @@ LATCH_DEF(DDL_EXECUTE_LOCK, 294, "ddl execute lock", LATCH_FIFO, 2000, 0, DDL_EX LATCH_DEF(TENANT_IO_CONFIG_LOCK, 295, "tenant io config lock", LATCH_FIFO, 2000, 0, TENANT_IO_CONFIG_WAIT, "tenant io config lock") LATCH_DEF(SQL_WF_PARTICIPATOR_COND_LOCK, 296, "window function participator lock", LATCH_FIFO, 2000, 0, SQL_WF_PARTICIPATOR_COND_WAIT, "window function participator lock") LATCH_DEF(ARB_SERVER_CONFIG_LOCK, 297, "arbserver config lock", LATCH_FIFO, 2000, 0, ARB_SERVER_CONFIG_WAIT, "arbserver config lock") +LATCH_DEF(CDC_SERVICE_LS_CTX_LOCK, 298, "cdcservice clientlsctx lock", LATCH_FIFO, 2000, 0, CDC_SERVICE_LS_CTX_LOCK_WAIT, "cdcservice clientlsctx lock") LATCH_DEF(LATCH_END, 99999, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end") #endif diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index a04620c6ea..718ba3aac5 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -364,7 +364,7 @@ 
WAIT_EVENT_DEF(DDL_EXECUTE_LOCK_WAIT, 16056, "ddl execute lock wait", "", "", "" //replication group WAIT_EVENT_DEF(RG_TRANSFER_LOCK_WAIT, 17000, "transfer lock wait", "src_rg", "dst_rg", "transfer_pkey", CONCURRENCY, "transfer lock wait", false) -// liboblog +// libobcdc WAIT_EVENT_DEF(OBCDC_PART_MGR_SCHEMA_VERSION_WAIT, 18000, "oblog part mgr schema version wait", "", "", "", CONCURRENCY, "oblog part mgr schema version wait", true) WAIT_EVENT_DEF(OBCDC_PROGRESS_RECYCLE_LOCK_WAIT, 18001, "latch: obcdc progress recycle lock wait", "", "", "", CONCURRENCY, "latch: obcdc progress recycle lock wait", true) WAIT_EVENT_DEF(OBCDC_METAINFO_LOCK_WAIT, 18002, "latch: obcdc metainfo lock wait", "", "", "", CONCURRENCY, "latch: obcdc metainfo lock wait", true) @@ -375,6 +375,7 @@ WAIT_EVENT_DEF(OBCDC_TIMEZONE_GETTER_LOCK_WAIT, 18006, "latch: obcdc timezone ge WAIT_EVENT_DEF(OBCDC_FETCHLOG_ARPC_LOCK_WAIT, 18007, "latch: obcdc fetchlog arpc lock wait", "", "", "", CONCURRENCY, "latch: obcdc fetchlog arpc lock wait", true) WAIT_EVENT_DEF(OBCDC_FETCHSTREAM_CONTAINER_LOCK_WAIT, 18008, "latch: obcdc fetchstream container lock wait", "", "", "", CONCURRENCY, "latch: obcdc fetchstream container lock wait", true) WAIT_EVENT_DEF(EXT_SVR_BLACKLIST_LOCK_WAIT, 18009, "latch: external server blacklist lock wait", "", "", "", CONCURRENCY, "latch: external server blacklist lock wait", true) +WAIT_EVENT_DEF(CDC_SERVICE_LS_CTX_LOCK_WAIT, 18010, "latch: cdcservice clientlsctx lock wait", "", "", "", CONCURRENCY, "latch: cdcservice clientlsctx lock wait", true) // palf WAIT_EVENT_DEF(PALF_SW_SUBMIT_INFO_WAIT, 19000, "palf sw last submit log info lock wait", "", "", "", CONCURRENCY, "PALF_SW_SUBMIT_INFO_WAIT", true) diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md b/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md index 188105e21d..68017533a5 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md +++ b/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md @@ -311,3 
+311,7 @@ Updating or getting max apply scn in apply status should be mutually exclusive. ## spinlock: gc handler lock wait The read and write operation on configs in gc handler should be mutually exclusive. + +## latch: cdcservice clientlsctx lock wait + +The read and write operation on source(RemoteLogParent) in ClientLSCtx should be mutually exclusive. diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp index 4bc27c4743..60afded1bf 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp +++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp @@ -159,6 +159,8 @@ int ObCdcFetcher::fetch_log(const ObCdcLSFetchLogReq &req, EVENT_ADD(CLOG_EXTLOG_FETCH_LOG_COUNT, fetch_log_count); } + LOG_INFO("fetch_log done", K(req), K(resp)); + resp.set_err(ret); return ret; } @@ -222,6 +224,8 @@ int ObCdcFetcher::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req, EVENT_ADD(CLOG_EXTLOG_FETCH_LOG_COUNT, fetch_log_count); } + LOG_INFO("fetch_missing_log done", K(req), K(resp)); + resp.set_err(ret); return ret; } @@ -360,7 +364,13 @@ int ObCdcFetcher::fetch_log_in_archive_( if (OB_ITER_END != ret) { LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id)); } - } else { } + } else if (start_lsn != lsn) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("remote iterator returned unexpected log entry lsn", K(start_lsn), K(lsn), K(log_entry), K(ls_id), + K(remote_iter)); + } else { + + } } return ret; } @@ -451,8 +461,12 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id, PalfGroupBufferIterator palf_iter; PalfHandleGuard palf_guard; // use cached remote_iter - ObRemoteLogGroupEntryIterator &remote_iter = ctx.get_remote_iter(); + ObCdcGetSourceFunctor get_source_func(ctx); + ObCdcUpdateSourceFunctor update_source_func(ctx); + ObRemoteLogGroupEntryIterator remote_iter(get_source_func, update_source_func); bool ls_exist_in_palf = true; + // always reset remote_iter when need_init_iter is true + // always set 
need_init_iter=true when switch fetch_mode bool need_init_iter = true; int64_t retry_count = 0; const bool fetch_archive_only = ObCdcRpcTestFlag::is_fetch_archive_only(fetch_flag); @@ -499,6 +513,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id, need_init_iter = false; ret = OB_SUCCESS; } else if (OB_ERR_OUT_OF_LOWER_BOUND == ret) { + // switch to fetchmode_archive, when in FETCHMODE_ONLINE, remote_iter is not inited need_init_iter = true; ctx.set_fetch_mode(FetchMode::FETCHMODE_ARCHIVE, "PalfOutOfLowerBound"); ret = OB_SUCCESS; @@ -511,27 +526,16 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id, } // fetch log succ } // fetch palf log else if (FetchMode::FETCHMODE_ARCHIVE == fetch_mode) { - // trust the iter_next_lsn_ in ctx, binding the iter_next_lsn_ in ctx with the iter_ in ctx, - // so some of the defensive conditional statements could be omitted - // 1. set iter_next_lsn_ to LOG_INVALID_LSN_VAL when initializing a ctx. - // 2. when a log is fetched in archive, update the iter_next_lsn_ - // 3. so if iter_next_lsn_ is valid, the iter_ must be initialized. - // 4. finally, we only check the lsn continuity here - if (ctx.get_iter_next_lsn() == resp.get_next_req_lsn()) { - need_init_iter = false; - } else { - need_init_iter = true; - remote_iter.update_source_cb(); - remote_iter.reset(); - } if (OB_FAIL(fetch_log_in_archive_(ls_id, remote_iter, resp.get_next_req_lsn(), need_init_iter, log_group_entry, lsn, ctx))) { if (OB_ITER_END == ret) { // when fetch to the end, the iter become invalid even if the new log is archived later, - // so just reset iter and iter_next_lsn - ctx.set_iter_next_lsn(LSN(LOG_INVALID_LSN_VAL)); + // cdcservice would continue to fetch log in palf or return result to cdc-connector, + // reset remote_iter in either condition. 
+ remote_iter.update_source_cb(); + remote_iter.reset(); if (ls_exist_in_palf) { - // switch to palf + // switch to palf, reset remote_iter need_init_iter = true; ctx.set_fetch_mode(FetchMode::FETCHMODE_ONLINE, "ArchiveIterEnd"); ret = OB_SUCCESS; @@ -539,22 +543,22 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id, // exit reach_max_lsn = true; } - } else if (OB_ENTRY_NOT_EXIST == ret) { + } else if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) { // archive is not on ret = OB_ERR_OUT_OF_LOWER_BOUND; } else { + // other error code, retry because various error code would be returned, retry could fix some problem + // TODO: process the error code with clear semantic LOG_WARN("fetching log in archive failed", KR(ret), K(remote_iter), K(ls_id), K(resp)); + remote_iter.reset(); if (retry_count < MAX_RETRY_COUNT) { LOG_TRACE("retry on fetching remote log failure", KR(ret), K(retry_count), K(ctx)); retry_count++; - ctx.set_iter_next_lsn(LSN(LOG_INVALID_LSN_VAL)); need_init_iter = true; ret = OB_SUCCESS; } } - } else { - LSN remote_iter_next_lsn = lsn + log_group_entry.get_serialize_size(); - ctx.set_iter_next_lsn(remote_iter_next_lsn); + } else { // OB_SUCCESS need_init_iter = false; fetch_log_succ = true; } // fetch log succ @@ -564,12 +568,14 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id, LOG_WARN("fetch mode is invalid", KR(ret), K(fetch_mode)); } // unexpected branch + // inc log fetch time in any condition + resp.inc_log_fetch_time(ObTimeUtility::current_time() - start_fetch_ts); + // retry on OB_ITER_END (when fetching logs in archive), OB_ALLOCATE_MEMORY_FAILED and // OB_ERR_OUT_OF_LOWER_BOUND (when fetching logs in palf), thus some return codes are blocked and the // return code is unable to be used for determine whether logEntry is successfully fetched. 
// update the resp/frt/ctx when the logentry is successfully fetched if (OB_SUCC(ret) && fetch_log_succ) { - resp.inc_log_fetch_time(ObTimeUtility::current_time() - start_fetch_ts); check_next_group_entry_(lsn, log_group_entry, fetched_log_count, resp, frt, reach_upper_limit, ctx); resp.set_progress(ctx.get_progress()); if (frt.is_stopped()) { @@ -590,6 +596,12 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id, } } // while + // update source back when remote_iter is valid, needn't reset remote iter, + // because it won't be used afterwards + if (remote_iter.is_init()) { + remote_iter.update_source_cb(); + } + if (OB_SUCCESS == ret) { // do nothing if (ls_exist_in_palf && reach_max_lsn) { @@ -976,7 +988,7 @@ int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id, } if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) { - if (OB_ENTRY_NOT_EXIST == ret) { + if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) { ret = OB_SUCCESS; archive_is_on = false; } diff --git a/src/logservice/cdcservice/ob_cdc_rpc_processor.cpp b/src/logservice/cdcservice/ob_cdc_rpc_processor.cpp index d0f7c0dcfc..285ce10d8e 100644 --- a/src/logservice/cdcservice/ob_cdc_rpc_processor.cpp +++ b/src/logservice/cdcservice/ob_cdc_rpc_processor.cpp @@ -33,7 +33,7 @@ int __get_cdc_service(uint64_t tenant_id, cdc::ObCdcService *&cdc_service) ret = OB_ERR_UNEXPECTED; EXTLOG_LOG(WARN, "cdc_service is NULL", KR(ret), K(tenant_id)); } else { - EXTLOG_LOG(INFO, "__get_cdc_service", K(cdc_service), KP(log_service), K(tenant_id), K(MTL_ID())); + // get CdcService succ } return ret; diff --git a/src/logservice/cdcservice/ob_cdc_service.cpp b/src/logservice/cdcservice/ob_cdc_service.cpp index cfe96ca913..f7214c2119 100644 --- a/src/logservice/cdcservice/ob_cdc_service.cpp +++ b/src/logservice/cdcservice/ob_cdc_service.cpp @@ -33,7 +33,7 @@ int ObCdcService::get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDes EXTLOG_LOG(WARN, "cdc service is null, 
unexpected", KR(ret)); } else if (FALSE_IT(archive_dest = cdc_service->get_archive_dest_info())) { } else if (archive_dest.empty()) { - ret = OB_ENTRY_NOT_EXIST; + ret = OB_ALREADY_IN_NOARCHIVE_MODE; EXTLOG_LOG(WARN, "archivelog is off yet", KR(ret), K(MTL_ID())); } else if (OB_FAIL(backup_dest.set(archive_dest.at(0).second))) { EXTLOG_LOG(WARN, "failed to set backup dest info", KR(ret), K(archive_dest)); diff --git a/src/logservice/cdcservice/ob_cdc_struct.cpp b/src/logservice/cdcservice/ob_cdc_struct.cpp index 0bdfbdb40f..59918b9deb 100644 --- a/src/logservice/cdcservice/ob_cdc_struct.cpp +++ b/src/logservice/cdcservice/ob_cdc_struct.cpp @@ -85,11 +85,8 @@ void ClientLSKey::reset() ClientLSCtx::ClientLSCtx() : is_inited_(false), + source_lock_(ObLatchIds::CDC_SERVICE_LS_CTX_LOCK), source_(NULL), - get_source_func_(*this), - update_source_func_(*this), - iter_(get_source_func_, update_source_func_), - iter_next_lsn_(palf::LOG_INVALID_LSN_VAL), fetch_mode_(FetchMode::FETCHMODE_UNKNOWN), last_touch_ts_(OB_INVALID_TIMESTAMP), client_progress_(OB_INVALID_TIMESTAMP) @@ -105,7 +102,6 @@ int ClientLSCtx::init(int64_t client_progress) { int ret = OB_SUCCESS; if (OB_INVALID_TIMESTAMP != client_progress) { - iter_next_lsn_ = palf::LSN(palf::LOG_INVALID_LSN_VAL); is_inited_ = true; set_progress(client_progress); set_fetch_mode(FetchMode::FETCHMODE_ONLINE, "ClientLSCtxInit"); @@ -124,8 +120,6 @@ void ClientLSCtx::reset() logservice::ObResSrcAlloctor::free(source_); source_ = NULL; } - iter_.reset(); - iter_next_lsn_ = palf::LSN(palf::LOG_INVALID_LSN_VAL); fetch_mode_ = FetchMode::FETCHMODE_UNKNOWN; last_touch_ts_ = OB_INVALID_TIMESTAMP; client_progress_ = OB_INVALID_TIMESTAMP; @@ -143,6 +137,7 @@ void ClientLSCtx::set_source(logservice::ObRemoteLogParent *source) ////////////////////////////////////////////////////////////////////////// int ObCdcGetSourceFunctor::operator()(const share::ObLSID &id, logservice::ObRemoteSourceGuard &guard) { int ret = OB_SUCCESS; + 
ObSpinLockGuard ctx_source_guard(ctx_.source_lock_); logservice::ObRemoteLogParent *ctx_source = ctx_.get_source(); if (OB_ISNULL(ctx_source)) { ret = OB_ERR_UNEXPECTED; @@ -164,6 +159,7 @@ int ObCdcGetSourceFunctor::operator()(const share::ObLSID &id, logservice::ObRem int ObCdcUpdateSourceFunctor::operator()(const share::ObLSID &id, logservice::ObRemoteLogParent *source) { int ret = OB_SUCCESS; UNUSED(id); + ObSpinLockGuard ctx_source_guard(ctx_.source_lock_); logservice::ObRemoteLogParent *ctx_source = ctx_.get_source(); if (OB_ISNULL(source)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/logservice/cdcservice/ob_cdc_struct.h b/src/logservice/cdcservice/ob_cdc_struct.h index 2d5a661699..5b8b11a279 100644 --- a/src/logservice/cdcservice/ob_cdc_struct.h +++ b/src/logservice/cdcservice/ob_cdc_struct.h @@ -97,12 +97,6 @@ public: void set_source(logservice::ObRemoteLogParent *source); logservice::ObRemoteLogParent *get_source() { return source_; } - logservice::ObRemoteLogGroupEntryIterator &get_remote_iter() { - return iter_; - } - void set_iter_next_lsn(const palf::LSN &lsn) { iter_next_lsn_ = lsn; } - const palf::LSN &get_iter_next_lsn() const { return iter_next_lsn_; } - void set_fetch_mode(FetchMode mode, const char *reason) { FetchMode from = fetch_mode_, to = mode; fetch_mode_ = mode; @@ -117,18 +111,17 @@ public: int64_t get_progress() const { return client_progress_; } TO_STRING_KV(K_(source), - K_(iter), - K_(iter_next_lsn), K_(fetch_mode), K_(last_touch_ts), K_(client_progress)) + +friend class ObCdcGetSourceFunctor; +friend class ObCdcUpdateSourceFunctor; + private: bool is_inited_; + ObSpinLock source_lock_; logservice::ObRemoteLogParent *source_; - ObCdcGetSourceFunctor get_source_func_; - ObCdcUpdateSourceFunctor update_source_func_; - logservice::ObRemoteLogGroupEntryIterator iter_; - palf::LSN iter_next_lsn_; FetchMode fetch_mode_; int64_t last_touch_ts_; int64_t client_progress_;