[OBCDC] Fix OBCDC exits when fetching missing log failed

This commit is contained in:
zxlzxlzxlzxlzxl
2023-05-24 12:41:37 +00:00
committed by ob-robot
parent b3b42a8de6
commit a434fb6e6d
4 changed files with 39 additions and 14 deletions

View File

@ -409,6 +409,9 @@ public:
// enable test_mode_switch_fetch_mode to test whether cdc service can fetch log correctly when switching fetch mode // enable test_mode_switch_fetch_mode to test whether cdc service can fetch log correctly when switching fetch mode
T_DEF_BOOL(test_mode_switch_fetch_mode, OB_CLUSTER_PARAMETER, 0, "0:disabled 1:enabled"); T_DEF_BOOL(test_mode_switch_fetch_mode, OB_CLUSTER_PARAMETER, 0, "0:disabled 1:enabled");
// simulate fetch missing error when fetching missing log for the first time
T_DEF_BOOL(test_fetch_missing_errsim, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled");
// Whether check tenant status for each schema request with tenant_id under test mode, default disabled // Whether check tenant status for each schema request with tenant_id under test mode, default disabled
T_DEF_BOOL(test_mode_force_check_tenant_status, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled"); T_DEF_BOOL(test_mode_force_check_tenant_status, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled");

View File

@ -296,6 +296,7 @@ if (! mem_storage_.is_inited()) {
void LSFetchCtx::reset_memory_storage() void LSFetchCtx::reset_memory_storage()
{ {
mem_storage_.destroy(); mem_storage_.destroy();
LOG_DEBUG("reset memory storage", KPC(this));
} }
int LSFetchCtx::get_next_group_entry(palf::LogGroupEntry &group_entry, palf::LSN &lsn) int LSFetchCtx::get_next_group_entry(palf::LogGroupEntry &group_entry, palf::LSN &lsn)

View File

@ -961,10 +961,9 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
if (OB_TMP_FAIL(set_(kick_out_info, tls_id, KickOutReason::FETCH_LOG_FAIL_IN_DIRECT_MODE))) { if (OB_TMP_FAIL(set_(kick_out_info, tls_id, KickOutReason::FETCH_LOG_FAIL_IN_DIRECT_MODE))) {
LOG_WARN("set kickout info failed", KR(tmp_ret), K(kick_out_info), K(tls_id)); LOG_WARN("set kickout info failed", KR(tmp_ret), K(kick_out_info), K(tls_id));
} }
} else {
// ret equals OB_ITER_END
} }
// retry on fetch remote log failure anyway // retry on fetch remote log failure anyway
// for all scenario above, no need to fetch log and need to reset remote iterator.
need_fetch_log = false; need_fetch_log = false;
ls_fetch_ctx_->reset_remote_iter(); ls_fetch_ctx_->reset_remote_iter();
ret = OB_SUCCESS; ret = OB_SUCCESS;
@ -978,6 +977,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
} else if (OB_NEED_RETRY == ret) { } else if (OB_NEED_RETRY == ret) {
LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id)); LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id));
need_fetch_log = false; need_fetch_log = false;
// reset remote iter to fetch log that match the next_lsn in progress next round,
// otherwise incorrect log may be fetched.
ls_fetch_ctx_->reset_remote_iter();
// reset memory storage to prevent the remain logentry in mem_storage from
// disruppting the iteration of log group entry.
ls_fetch_ctx_->reset_memory_storage();
ret = OB_SUCCESS; ret = OB_SUCCESS;
} }
} else if (OB_FAIL(ls_fetch_ctx_->update_progress(log_group_entry, lsn))) { } else if (OB_FAIL(ls_fetch_ctx_->update_progress(log_group_entry, lsn))) {
@ -1015,12 +1020,6 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
} }
} }
// rewrite ret code when ret equals OB_NEED_RETRY.
// TODO: IS THIS A REDUNDANT CODE FRAGMENT?
if (OB_NEED_RETRY == ret) {
ret = OB_SUCCESS;
}
// when exit from loop, there could still be some fetch tasks to be synchronized // when exit from loop, there could still be some fetch tasks to be synchronized
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
int64_t flush_time = 0; int64_t flush_time = 0;
@ -1032,6 +1031,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
read_log_time, fetch_remote_time, flush_time, tsi); read_log_time, fetch_remote_time, flush_time, tsi);
} }
} }
// rewrite ret code when ret equals OB_NEED_RETRY.
if (OB_NEED_RETRY == ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (kick_out_info.need_kick_out()) { if (kick_out_info.need_kick_out()) {
if (OB_FAIL(kick_out_task_(kick_out_info))) { if (OB_FAIL(kick_out_task_(kick_out_info))) {
@ -1560,7 +1565,13 @@ int FetchStream::fetch_miss_log_(
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ClientFetchingMode fetching_mode = ls_fetch_ctx.get_fetching_mode(); const ClientFetchingMode fetching_mode = ls_fetch_ctx.get_fetching_mode();
if (! is_fetching_mode_valid(fetching_mode)) {
static bool fetch_missing_fail = true;
if ((1 == TCONF.test_fetch_missing_errsim) && fetch_missing_fail) {
fetch_missing_fail = false;
ret = OB_NEED_RETRY;
LOG_ERROR("errsim fetch missing log fail", K(ls_fetch_ctx));
} else if (! is_fetching_mode_valid(fetching_mode)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetching mode is not valid", KR(ret), K(fetching_mode)); LOG_ERROR("fetching mode is not valid", KR(ret), K(fetching_mode));
} else if (is_integrated_fetching_mode(fetching_mode)) { } else if (is_integrated_fetching_mode(fetching_mode)) {
@ -1579,6 +1590,8 @@ int FetchStream::fetch_miss_log_(
// mock FetchLogSRpc here // mock FetchLogSRpc here
if (OB_FAIL(fetch_miss_log_direct_(miss_log_array, timeout, fetch_srpc, ls_fetch_ctx))) { if (OB_FAIL(fetch_miss_log_direct_(miss_log_array, timeout, fetch_srpc, ls_fetch_ctx))) {
LOG_ERROR("fetch missing log direct failed", KR(ret), K(ls_fetch_ctx), K(miss_log_array)); LOG_ERROR("fetch missing log direct failed", KR(ret), K(ls_fetch_ctx), K(miss_log_array));
// rewrite ret code to make sure that cdc wouldn't exit because fetch_missing_log_direct_ failed.
ret = OB_NEED_RETRY;
} }
} else { } else {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;

View File

@ -936,6 +936,7 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
} }
} }
// retry on fetch remote log failure anyway // retry on fetch remote log failure anyway
// for all scenario above, no need to fetch log and need to reset remote iterator.
need_fetch_log = false; need_fetch_log = false;
ls_fetch_ctx_->reset_remote_iter(); ls_fetch_ctx_->reset_remote_iter();
ret = OB_SUCCESS; ret = OB_SUCCESS;
@ -948,6 +949,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
K(lsn), K(kick_out_info), KPC(ls_fetch_ctx_)); K(lsn), K(kick_out_info), KPC(ls_fetch_ctx_));
} else if (OB_NEED_RETRY == ret) { } else if (OB_NEED_RETRY == ret) {
LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id)); LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id));
// reset remote iter to fetch log that match the next_lsn in progress next round,
// otherwise incorrect log may be fetched.
ls_fetch_ctx_->reset_remote_iter();
// reset memory storage to prevent the remain logentry in mem_storage from
// disruppting the iteration of log group entry.
ls_fetch_ctx_->reset_memory_storage();
need_fetch_log = false; need_fetch_log = false;
ret = OB_SUCCESS; ret = OB_SUCCESS;
} }
@ -986,11 +993,6 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
} }
} }
// rewrite ret code when ret equals OB_NEED_RETRY.
if (OB_NEED_RETRY == ret) {
ret = OB_SUCCESS;
}
// when exit from loop, there could still be some fetch tasks to be synchronized // when exit from loop, there could still be some fetch tasks to be synchronized
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
int64_t flush_time = 0; int64_t flush_time = 0;
@ -1002,6 +1004,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
read_log_time, fetch_remote_time, flush_time, tsi); read_log_time, fetch_remote_time, flush_time, tsi);
} }
} }
// rewrite ret code when ret equals OB_NEED_RETRY.
if (OB_NEED_RETRY == ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (kick_out_info.need_kick_out()) { if (kick_out_info.need_kick_out()) {
if (OB_FAIL(kick_out_task_(kick_out_info))) { if (OB_FAIL(kick_out_task_(kick_out_info))) {