From a434fb6e6dfdb9e08e908c45c7fc4f6943eadb21 Mon Sep 17 00:00:00 2001 From: zxlzxlzxlzxlzxl Date: Wed, 24 May 2023 12:41:37 +0000 Subject: [PATCH] [OBCDC] Fix OBCDC exits when fetching missing log failed --- src/logservice/libobcdc/src/ob_log_config.h | 3 ++ .../libobcdc/src/ob_log_ls_fetch_ctx.cpp | 1 + .../libobcdc/src/ob_log_ls_fetch_stream.cpp | 31 +++++++++++++------ .../logfetcher/ob_log_ls_fetch_stream.cpp | 18 ++++++++--- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index f2e7bdc596..8258d02489 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -409,6 +409,9 @@ public: // enable test_mode_switch_fetch_mode to test whether cdc service can fetch log correctly when switching fetch mode T_DEF_BOOL(test_mode_switch_fetch_mode, OB_CLUSTER_PARAMETER, 0, "0:disabled 1:enabled"); + // simulate fetch missing error when fetching missing log for the first time + T_DEF_BOOL(test_fetch_missing_errsim, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); + // Whether check tenant status for each schema request with tenant_id under test mode, default disabled T_DEF_BOOL(test_mode_force_check_tenant_status, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp index 0c30e46eac..f32d027091 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp @@ -296,6 +296,7 @@ if (! 
mem_storage_.is_inited()) { void LSFetchCtx::reset_memory_storage() { mem_storage_.destroy(); + LOG_DEBUG("reset memory storage", KPC(this)); } int LSFetchCtx::get_next_group_entry(palf::LogGroupEntry &group_entry, palf::LSN &lsn) diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp index facf398f07..dae0b87bc3 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp @@ -961,10 +961,9 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) if (OB_TMP_FAIL(set_(kick_out_info, tls_id, KickOutReason::FETCH_LOG_FAIL_IN_DIRECT_MODE))) { LOG_WARN("set kickout info failed", KR(tmp_ret), K(kick_out_info), K(tls_id)); } - } else { - // ret equals OB_ITER_END } // retry on fetch remote log failure anyway + // for all scenarios above, no need to fetch log and need to reset remote iterator. need_fetch_log = false; ls_fetch_ctx_->reset_remote_iter(); ret = OB_SUCCESS; @@ -978,6 +977,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) } else if (OB_NEED_RETRY == ret) { LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id)); need_fetch_log = false; + // reset remote iter to fetch the log that matches the next_lsn in progress in the next round, + // otherwise incorrect log may be fetched. + ls_fetch_ctx_->reset_remote_iter(); + // reset memory storage to prevent the remaining log entries in mem_storage from + // disrupting the iteration of log group entry. + ls_fetch_ctx_->reset_memory_storage(); ret = OB_SUCCESS; } } else if (OB_FAIL(ls_fetch_ctx_->update_progress(log_group_entry, lsn))) { @@ -1015,12 +1020,6 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) } } - // rewrite ret code when ret equals OB_NEED_RETRY. - // TODO: IS THIS A REDUNDANT CODE FRAGMENT? 
- if (OB_NEED_RETRY == ret) { - ret = OB_SUCCESS; - } - // when exit from loop, there could still be some fetch tasks to be synchronized if (OB_SUCC(ret)) { int64_t flush_time = 0; @@ -1032,6 +1031,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) read_log_time, fetch_remote_time, flush_time, tsi); } } + + // rewrite ret code when ret equals OB_NEED_RETRY. + if (OB_NEED_RETRY == ret) { + ret = OB_SUCCESS; + } + if (OB_SUCC(ret)) { if (kick_out_info.need_kick_out()) { if (OB_FAIL(kick_out_task_(kick_out_info))) { @@ -1560,7 +1565,13 @@ int FetchStream::fetch_miss_log_( { int ret = OB_SUCCESS; const ClientFetchingMode fetching_mode = ls_fetch_ctx.get_fetching_mode(); - if (! is_fetching_mode_valid(fetching_mode)) { + + static bool fetch_missing_fail = true; + if ((1 == TCONF.test_fetch_missing_errsim) && fetch_missing_fail) { + fetch_missing_fail = false; + ret = OB_NEED_RETRY; + LOG_ERROR("errsim fetch missing log fail", K(ls_fetch_ctx)); + } else if (! is_fetching_mode_valid(fetching_mode)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("fetching mode is not valid", KR(ret), K(fetching_mode)); } else if (is_integrated_fetching_mode(fetching_mode)) { @@ -1579,6 +1590,8 @@ int FetchStream::fetch_miss_log_( // mock FetchLogSRpc here if (OB_FAIL(fetch_miss_log_direct_(miss_log_array, timeout, fetch_srpc, ls_fetch_ctx))) { LOG_ERROR("fetch missing log direct failed", KR(ret), K(ls_fetch_ctx), K(miss_log_array)); + // rewrite ret code to make sure that cdc wouldn't exit because fetch_miss_log_direct_ failed. 
+ ret = OB_NEED_RETRY; } } else { ret = OB_ERR_UNEXPECTED; diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp index c4a6d78aaa..c0c312da14 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp @@ -936,6 +936,7 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) } } // retry on fetch remote log failure anyway + // for all scenarios above, no need to fetch log and need to reset remote iterator. need_fetch_log = false; ls_fetch_ctx_->reset_remote_iter(); ret = OB_SUCCESS; @@ -948,6 +949,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) K(lsn), K(kick_out_info), KPC(ls_fetch_ctx_)); } else if (OB_NEED_RETRY == ret) { LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id)); + // reset remote iter to fetch the log that matches the next_lsn in progress in the next round, + // otherwise incorrect log may be fetched. + ls_fetch_ctx_->reset_remote_iter(); + // reset memory storage to prevent the remaining log entries in mem_storage from + // disrupting the iteration of log group entry. + ls_fetch_ctx_->reset_memory_storage(); need_fetch_log = false; ret = OB_SUCCESS; } @@ -986,11 +993,6 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) } } - // rewrite ret code when ret equals OB_NEED_RETRY. - if (OB_NEED_RETRY == ret) { - ret = OB_SUCCESS; - } - // when exit from loop, there could still be some fetch tasks to be synchronized if (OB_SUCC(ret)) { int64_t flush_time = 0; @@ -1002,6 +1004,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag) read_log_time, fetch_remote_time, flush_time, tsi); } } + + // rewrite ret code when ret equals OB_NEED_RETRY. + if (OB_NEED_RETRY == ret) { + ret = OB_SUCCESS; + } + if (OB_SUCC(ret)) { if (kick_out_info.need_kick_out()) { if (OB_FAIL(kick_out_task_(kick_out_info))) {