[CP] [CdcService] Retry when get iter_end from remote_log_iter when locating lsn

This commit is contained in:
zxlzxlzxlzxlzxl
2024-02-05 02:51:48 +00:00
committed by ob-robot
parent a88f95d70e
commit aabbf9b9fb
6 changed files with 90 additions and 80 deletions

View File

@ -350,7 +350,8 @@ int ObCdcFetcher::fetch_log_in_archive_(
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
// always reserve 4K for archive header // always reserve 4K for archive header
const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024; const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024;
if (OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) { const int64_t MAX_RETRY_COUNT = 4;
if (OB_ISNULL(ctx.get_source()) && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) {
LOG_WARN("init archive source failed", K(ctx), K(ls_id)); LOG_WARN("init archive source failed", K(ctx), K(ls_id));
} else { } else {
const char *buf = NULL; const char *buf = NULL;
@ -358,22 +359,30 @@ int ObCdcFetcher::fetch_log_in_archive_(
share::SCN pre_scn; share::SCN pre_scn;
if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) { if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) {
LOG_WARN("convert progress to scn failed", KR(ret), K(ctx)); LOG_WARN("convert progress to scn failed", KR(ret), K(ctx));
} else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn, } else {
start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, int64_t retry_count = 0;
log_ext_handler_, SINGLE_READ_SIZE))) { do {
LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id)); if (! remote_iter.is_init() && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn,
} else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) { start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_, SINGLE_READ_SIZE))) {
// expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned. LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id));
if (OB_ITER_END != ret) { } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) {
LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id)); // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned.
} if (OB_ITER_END != ret) {
} else if (start_lsn != lsn) { LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id));
// to keep consistency with the ret code of palf } else {
ret = OB_INVALID_DATA; remote_iter.update_source_cb();
LOG_WARN("remote iterator returned unexpected log entry lsn", K(start_lsn), K(lsn), K(log_entry), K(ls_id), remote_iter.reset();
K(remote_iter)); LOG_INFO("get iter end from remote_iter, retry", K(retry_count), K(MAX_RETRY_COUNT));
} else { }
} else if (start_lsn != lsn) {
// to keep consistency with the ret code of palf
ret = OB_INVALID_DATA;
LOG_WARN("remote iterator returned unexpected log entry lsn", K(start_lsn), K(lsn), K(log_entry), K(ls_id),
K(remote_iter));
} else {
}
} while (OB_ITER_END == ret && ++retry_count < MAX_RETRY_COUNT);
} }
} }
return ret; return ret;
@ -993,34 +1002,6 @@ int ObCdcFetcher::prefill_resp_with_log_entry_(const ObLSID &ls_id,
return ret; return ret;
} }
// called when source in ctx is null
int ObCdcFetcher::init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id) {
int ret = OB_SUCCESS;
ObRemoteLogParent *source = ctx.get_source();
if (OB_NOT_NULL(source)) {
LOG_WARN("archive source is not null, no need to init");
} else if (OB_ISNULL(source = logservice::ObResSrcAlloctor::alloc(ObLogRestoreSourceType::LOCATION, ls_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("alloc RemoteLocationParent failed", KR(ret), K(tenant_id_), K(ls_id));
} else {
share::ObBackupDest archive_dest;
if (OB_FAIL(ObCdcService::get_backup_dest(ls_id, archive_dest))) {
LOG_WARN("get backupdest from archivedestinfo failed", KR(ret), K(ls_id));
} else if (OB_FAIL(static_cast<ObRemoteLocationParent*>(source)->set(archive_dest, SCN::max_scn()))) {
LOG_WARN("source set archive dest info failed", KR(ret), K(archive_dest));
} else {
ctx.set_source(source);
LOG_INFO("init archive source succ", K(ctx), K(ls_id));
}
if (OB_FAIL(ret)) {
logservice::ObResSrcAlloctor::free(source);
source = nullptr;
}
}
return ret;
}
int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id, int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id,
ClientLSCtx &ctx, ClientLSCtx &ctx,
palf::PalfHandleGuard &palf_handle_guard, palf::PalfHandleGuard &palf_handle_guard,
@ -1037,7 +1018,7 @@ int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id,
} }
} }
if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) { if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) {
if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) { if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) {
ret = OB_SUCCESS; ret = OB_SUCCESS;
archive_is_on = false; archive_is_on = false;

View File

@ -135,7 +135,6 @@ private:
LogEntryType &log_group_entry, LogEntryType &log_group_entry,
LSN &lsn, LSN &lsn,
ClientLSCtx &ctx); ClientLSCtx &ctx);
int init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id);
// Check whether has reached time limit // Check whether has reached time limit
inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp) inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp)
{ {

View File

@ -77,6 +77,7 @@ int ObCdcService::get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDes
ObCdcService::ObCdcService() ObCdcService::ObCdcService()
: is_inited_(false), : is_inited_(false),
stop_flag_(true), stop_flag_(true),
tenant_id_(OB_INVALID_TENANT_ID),
locator_(), locator_(),
fetcher_(), fetcher_(),
tg_id_(-1), tg_id_(-1),
@ -114,6 +115,7 @@ int ObCdcService::init(const uint64_t tenant_id,
} else if (OB_FAIL(create_tenant_tg_(tenant_id))) { } else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id)); EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
} else { } else {
tenant_id_ = tenant_id;
is_inited_ = true; is_inited_ = true;
} }
@ -190,7 +192,7 @@ int ObCdcService::start()
EXTLOG_LOG(WARN, "ObCdcService not init", K(ret)); EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
} else if (OB_FAIL(log_ext_handler_.start(0))) { } else if (OB_FAIL(log_ext_handler_.start(0))) {
EXTLOG_LOG(WARN, "log ext handler start failed", K(ret)); EXTLOG_LOG(WARN, "log ext handler start failed", K(ret));
} else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) { } else if (OB_FAIL(start_tenant_tg_(tenant_id_))) {
EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret)); EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret));
} else { } else {
stop_flag_ = false; stop_flag_ = false;
@ -202,13 +204,13 @@ int ObCdcService::start()
void ObCdcService::stop() void ObCdcService::stop()
{ {
ATOMIC_STORE(&stop_flag_, true); ATOMIC_STORE(&stop_flag_, true);
stop_tenant_tg_(MTL_ID()); stop_tenant_tg_(tenant_id_);
log_ext_handler_.stop(); log_ext_handler_.stop();
} }
void ObCdcService::wait() void ObCdcService::wait()
{ {
wait_tenant_tg_(MTL_ID()); wait_tenant_tg_(tenant_id_);
log_ext_handler_.wait(); log_ext_handler_.wait();
// do nothing // do nothing
} }
@ -217,13 +219,14 @@ void ObCdcService::destroy()
{ {
is_inited_ = false; is_inited_ = false;
stop_flag_ = true; stop_flag_ = true;
destroy_tenant_tg_(MTL_ID()); destroy_tenant_tg_(tenant_id_);
fetcher_.destroy(); fetcher_.destroy();
locator_.destroy(); locator_.destroy();
dest_info_.reset(); dest_info_.reset();
large_buffer_pool_.destroy(); large_buffer_pool_.destroy();
ls_ctx_map_.destroy(); ls_ctx_map_.destroy();
log_ext_handler_.destroy(); log_ext_handler_.destroy();
tenant_id_ = OB_INVALID_TENANT_ID;
} }
int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req, int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req,
@ -312,6 +315,35 @@ int ObCdcService::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req,
return ret; return ret;
} }
int ObCdcService::init_archive_source(const ObLSID &ls_id,
ClientLSCtx &ctx)
{
int ret = OB_SUCCESS;
logservice::ObRemoteLogParent *source = ctx.get_source();
if (OB_NOT_NULL(source)) {
EXTLOG_LOG(WARN, "archive source is not null, no need to init");
} else if (OB_ISNULL(source = logservice::ObResSrcAlloctor::alloc(ObLogRestoreSourceType::LOCATION, ls_id))) {
ret = OB_ERR_UNEXPECTED;
EXTLOG_LOG(WARN, "alloc RemoteLocationParent failed", KR(ret), K(ls_id));
} else {
share::ObBackupDest archive_dest;
if (OB_FAIL(get_backup_dest(ls_id, archive_dest))) {
EXTLOG_LOG(WARN, "get backupdest from archivedestinfo failed", KR(ret), K(ls_id));
} else if (OB_FAIL(static_cast<logservice::ObRemoteLocationParent*>(source)->set(archive_dest, SCN::max_scn()))) {
EXTLOG_LOG(WARN, "source set archive dest info failed", KR(ret), K(archive_dest));
} else {
ctx.set_source(source);
EXTLOG_LOG(WARN, "init archive source succ", K(ctx), K(ls_id));
}
if (OB_FAIL(ret)) {
logservice::ObResSrcAlloctor::free(source);
source = nullptr;
}
}
return ret;
}
int ObCdcService::query_tenant_archive_info_() int ObCdcService::query_tenant_archive_info_()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -72,6 +72,7 @@ class ObCdcService: public lib::TGRunnable
{ {
public: public:
static int get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDest &backup_dest); static int get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDest &backup_dest);
static int init_archive_source(const ObLSID &ls_id, ClientLSCtx &ctx);
public: public:
ObCdcService(); ObCdcService();
~ObCdcService(); ~ObCdcService();
@ -138,6 +139,8 @@ private:
private: private:
bool is_inited_; bool is_inited_;
volatile bool stop_flag_ CACHE_ALIGNED; volatile bool stop_flag_ CACHE_ALIGNED;
uint64_t tenant_id_;
ObCdcStartLsnLocator locator_; ObCdcStartLsnLocator locator_;
ObCdcFetcher fetcher_; ObCdcFetcher fetcher_;

View File

@ -231,39 +231,34 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only,
} else { } else {
result_ts_ns = start_ts_ns; result_ts_ns = start_ts_ns;
// for RemoteLogIterator::init // for RemoteLogIterator::init
logservice::GetSourceFunc get_source_func = [&](const ObLSID& ls_id, logservice::ObRemoteSourceGuard &guard) ->int { ClientLSCtx ctx;
int ret = OB_SUCCESS; ObCdcGetSourceFunctor get_source_func(ctx);
logservice::ObRemoteLogParent *source = logservice::ObResSrcAlloctor::alloc(share::ObLogRestoreSourceType::LOCATION, ls_id); ObCdcUpdateSourceFunctor update_source_func(ctx);
logservice::ObRemoteLocationParent *location_source = static_cast<logservice::ObRemoteLocationParent*>(source); logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func, update_source_func);
if (OB_ISNULL(location_source)) { constexpr int64_t MAX_RETRY_COUNT = 4;
ret = OB_ERR_UNEXPECTED;
LOG_WARN("source allocated is null", KR(ret), K(ls_id));
} else if (OB_FAIL(location_source->set(backup_dest, SCN::max_scn()))) {
LOG_WARN("set backup dest failed", KR(ret), K(backup_dest), K(ls_id));
} else if (OB_FAIL(guard.set_source(location_source))) {
LOG_WARN("remote source guard set source failed", KR(ret), K(ls_id));
}
if (OB_FAIL(ret) && OB_NOT_NULL(location_source)) {
logservice::ObResSrcAlloctor::free(location_source);
location_source = nullptr;
}
return ret;
};
logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func);
// for RemoteLogIterator::next // for RemoteLogIterator::next
int64_t next_buf_size = 0; int64_t next_buf_size = 0;
const char *next_buf = NULL; const char *next_buf = NULL;
LSN lsn; LSN lsn;
int64_t retry_time = 0;
if (OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn, do {
result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) { if (nullptr == ctx.get_source() && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) {
LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_)); LOG_WARN("failed to init archive source", K(ctx), K(ls_id));
} else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) { } else if (! remote_group_iter.is_init() && OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn,
LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_)); result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) {
} else { LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_));
result_ts_ns = log_group_entry.get_scn().get_val_for_logservice(); } else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) {
} if (OB_ITER_END != ret) {
LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_));
} else {
remote_group_iter.update_source_cb();
remote_group_iter.reset();
LOG_INFO("get iter end from remote_group_iter, retry", K(result_lsn), K(retry_time), K(MAX_RETRY_COUNT));
}
} else {
result_ts_ns = log_group_entry.get_scn().get_val_for_logservice();
}
} while (OB_ITER_END == ret && ++retry_time < MAX_RETRY_COUNT);
} }
} }
// Unconditional setting ret code // Unconditional setting ret code

View File

@ -120,7 +120,7 @@ public:
void set_progress(int64_t progress) { client_progress_ = progress; } void set_progress(int64_t progress) { client_progress_ = progress; }
int64_t get_progress() const { return client_progress_; } int64_t get_progress() const { return client_progress_; }
TO_STRING_KV(K_(source), TO_STRING_KV(KP_(source),
K_(fetch_mode), K_(fetch_mode),
K_(last_touch_ts), K_(last_touch_ts),
K_(client_progress)) K_(client_progress))