From 70b2e8f730835cd771c963734950394bdad40719 Mon Sep 17 00:00:00 2001 From: SanmuWangZJU Date: Wed, 25 Dec 2024 02:46:42 +0000 Subject: [PATCH] [OBCDC] Fix use wrong version to create log_stream in cdc --- src/logservice/libobcdc/src/ob_log_config.h | 2 ++ src/logservice/libobcdc/src/ob_log_instance.cpp | 8 ++++++++ src/logservice/libobcdc/src/ob_log_part_trans_task.cpp | 4 ++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index e34b6a94c..015467b75 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -448,6 +448,8 @@ public: // ------------------------------------------------------------------------ // Test mode, used only in obtest and other test tool scenarios T_DEF_BOOL(test_mode_on, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); + // Test mode fail while init + T_DEF_BOOL(test_mode_init_fail, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); // if force fetch archive is on, cdc service will seek archive for all rpc request unconditionally T_DEF_BOOL(test_mode_force_fetch_archive, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index cb68cb021..8971c9f60 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -778,6 +778,7 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns) : TCONF.tb_black_list.str(); const bool enable_direct_load_inc = (1 == TCONF.enable_direct_load_inc); + const bool is_mock_fail_on_init = (TCONF.test_mode_on = 1 && TCONF.test_mode_init_fail != 0); if (OB_UNLIKELY(! is_working_mode_valid(working_mode))) { ret = OB_INVALID_CONFIG; @@ -1056,6 +1057,9 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns) LOG_ERROR("start_tenant_service_ failed", KR(ret)); } + if (is_mock_fail_on_init) { + ret = OB_ERR_UNEXPECTED; + } if (OB_SUCC(ret)) { LOG_INFO("init all components done", KR(ret), K(start_tstamp_ns), K_(sys_start_schema_version), K(max_cached_trans_ctx_count), K_(is_schema_split_mode), K_(enable_filter_sys_tenant)); @@ -1467,6 +1471,10 @@ void ObLogInstance::do_destroy_(const bool force_destroy) ObKVGlobalCache::get_instance().destroy(); ObMemoryDump::get_instance().destroy(); ObClockGenerator::destroy(); + if (OB_LIKELY(!ObTimerService::get_instance().is_stopped())) { + ObTimerService::get_instance().stop(); + ObTimerService::get_instance().wait(); + } ObTimerService::get_instance().destroy(); ObSimpleThreadPoolDynamicMgr::get_instance().destroy(); diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 32ffd8789..78aba693a 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -3519,11 +3519,11 @@ int PartTransTask::commit( } else if (OB_FAIL(ObLogLSOpProcessor::process_ls_op( tls_id_.get_tenant_id(), commit_log_lsn, - commit_log_submit_ts, + trans_commit_version, ls_attr))) { if (OB_ENTRY_NOT_EXIST != ret) { LOG_ERROR("ObLogLSOpProcessor process_ls_op failed", KR(ret), K(tls_id_), K(tx_id), K(commit_log_lsn), - K(commit_log_submit_ts), K(ls_attr)); + K(trans_commit_version), K(commit_log_submit_ts), K(ls_attr)); } else { if (is_data_dict_mode) { // In Data dictionary, it need to fetch the log of the baseline data dict before adding a tenant,