[CP] [CdcService] Fix obcdc locate start_lsn fail
This commit is contained in:
parent
3f08f92f54
commit
b55e7ef35c
@ -236,14 +236,16 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only,
|
||||
ObCdcGetSourceFunctor get_source_func(ctx, version);
|
||||
ObCdcUpdateSourceFunctor update_source_func(ctx, version);
|
||||
logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func, update_source_func);
|
||||
constexpr int64_t MAX_RETRY_COUNT = 4;
|
||||
constexpr int64_t MAX_RETRY_COUNT = 5;
|
||||
// for RemoteLogIterator::next
|
||||
int64_t next_buf_size = 0;
|
||||
const char *next_buf = NULL;
|
||||
LSN lsn;
|
||||
int64_t retry_time = 0;
|
||||
bool iterate_log_fail = false;
|
||||
do {
|
||||
const bool iter_inited = remote_group_iter.is_init();
|
||||
iterate_log_fail = false;
|
||||
if (OB_FAIL(host_->init_archive_source_if_needed(ls_id, ctx))) {
|
||||
LOG_WARN("failed to init archive source", K(ctx), K(ls_id));
|
||||
} else if (! iter_inited && OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn,
|
||||
@ -252,17 +254,14 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only,
|
||||
} else if (! iter_inited && OB_FAIL(remote_group_iter.set_io_context(palf::LogIOContext(tenant_id_, ls_id.id(), palf::LogIOUser::CDC)))) {
|
||||
LOG_WARN("set_io_context failed", KR(ret), K(ls_id), K(tenant_id_));
|
||||
} else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_));
|
||||
} else {
|
||||
remote_group_iter.update_source_cb();
|
||||
remote_group_iter.reset();
|
||||
LOG_INFO("get iter end from remote_group_iter, retry", K(result_lsn), K(retry_time), K(MAX_RETRY_COUNT));
|
||||
}
|
||||
iterate_log_fail = true;
|
||||
remote_group_iter.update_source_cb();
|
||||
remote_group_iter.reset();
|
||||
LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_));
|
||||
} else {
|
||||
result_ts_ns = log_group_entry.get_scn().get_val_for_logservice();
|
||||
}
|
||||
} while (OB_ITER_END == ret && ++retry_time < MAX_RETRY_COUNT);
|
||||
} while (iterate_log_fail && ++retry_time < MAX_RETRY_COUNT);
|
||||
}
|
||||
}
|
||||
// Unconditional setting ret code
|
||||
|
@ -721,7 +721,7 @@ int ObCDCMissLogHandler::fetch_miss_log_direct_(
|
||||
if (get_timestamp() > time_upper_limit) {
|
||||
is_timeout = true;
|
||||
} else if (OB_FAIL(entry_iter.init(tenant_id, ls_id, cur_scn, missing_lsn,
|
||||
LSN(palf::LOG_MAX_LSN_VAL), buffer_pool, log_ext_handler))) {
|
||||
LSN(palf::LOG_MAX_LSN_VAL), buffer_pool, log_ext_handler, archive::ARCHIVE_FILE_DATA_BUF_SIZE))) {
|
||||
LOG_WARN("remote entry iter init failed", KR(ret));
|
||||
} else if (OB_FAIL(entry_iter.next(log_entry, lsn, buf, buf_size))) {
|
||||
retry_on_err =true;
|
||||
|
@ -274,7 +274,7 @@ int LSFetchCtx::init_remote_iter()
|
||||
} else if (OB_FAIL(get_log_ext_handler(log_ext_handler))) {
|
||||
LOG_ERROR("get log ext handler failed", KR(ret));
|
||||
} else if (OB_FAIL(remote_iter_.init(tenant_id, ls_id, start_scn, start_lsn,
|
||||
LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler))) {
|
||||
LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler, archive::ARCHIVE_FILE_DATA_BUF_SIZE))) {
|
||||
LOG_ERROR("remote iter init failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn));
|
||||
} else if (OB_FAIL(remote_iter_.set_io_context(palf::LogIOContext(palf::LogIOUser::CDC)))) {
|
||||
LOG_ERROR("remote iter set_io_context failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn));
|
||||
|
@ -268,7 +268,7 @@ int LSFetchCtx::init_remote_iter()
|
||||
} else if (OB_FAIL(get_log_ext_handler(log_ext_handler))) {
|
||||
LOG_ERROR("get log external handler failed", KR(ret));
|
||||
} else if (OB_FAIL(remote_iter_.init(tenant_id, ls_id, start_scn, start_lsn,
|
||||
LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler))) {
|
||||
LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler, archive::ARCHIVE_FILE_DATA_BUF_SIZE))) {
|
||||
LOG_ERROR("remote iter init failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn));
|
||||
} else if (OB_FAIL(remote_iter_.set_io_context(palf::LogIOContext(palf::LogIOUser::CDC)))) {
|
||||
LOG_ERROR("remote iter set_io_context failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn));
|
||||
|
Loading…
x
Reference in New Issue
Block a user