[CP] [CdcService] Retry when get iter_end from remote_log_iter when locating lsn
This commit is contained in:
		@ -350,7 +350,8 @@ int ObCdcFetcher::fetch_log_in_archive_(
 | 
				
			|||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  // always reserve 4K for archive header
 | 
					  // always reserve 4K for archive header
 | 
				
			||||||
  const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024;
 | 
					  const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024;
 | 
				
			||||||
  if (OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) {
 | 
					  const int64_t MAX_RETRY_COUNT = 4;
 | 
				
			||||||
 | 
					  if (OB_ISNULL(ctx.get_source()) && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) {
 | 
				
			||||||
    LOG_WARN("init archive source failed", K(ctx), K(ls_id));
 | 
					    LOG_WARN("init archive source failed", K(ctx), K(ls_id));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    const char *buf = NULL;
 | 
					    const char *buf = NULL;
 | 
				
			||||||
@ -358,14 +359,20 @@ int ObCdcFetcher::fetch_log_in_archive_(
 | 
				
			|||||||
    share::SCN pre_scn;
 | 
					    share::SCN pre_scn;
 | 
				
			||||||
    if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) {
 | 
					    if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) {
 | 
				
			||||||
      LOG_WARN("convert progress to scn failed", KR(ret), K(ctx));
 | 
					      LOG_WARN("convert progress to scn failed", KR(ret), K(ctx));
 | 
				
			||||||
    } else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn,
 | 
					    } else  {
 | 
				
			||||||
                                                          start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_,
 | 
					      int64_t retry_count = 0;
 | 
				
			||||||
                                                          log_ext_handler_, SINGLE_READ_SIZE))) {
 | 
					      do {
 | 
				
			||||||
 | 
					        if (! remote_iter.is_init() && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn,
 | 
				
			||||||
 | 
					            start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_, SINGLE_READ_SIZE))) {
 | 
				
			||||||
          LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id));
 | 
					          LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id));
 | 
				
			||||||
        } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) {
 | 
					        } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) {
 | 
				
			||||||
          // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned.
 | 
					          // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned.
 | 
				
			||||||
          if (OB_ITER_END != ret) {
 | 
					          if (OB_ITER_END != ret) {
 | 
				
			||||||
            LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id));
 | 
					            LOG_WARN("iterate remote log failed", KR(ret), K(need_init_iter), K(ls_id));
 | 
				
			||||||
 | 
					          } else {
 | 
				
			||||||
 | 
					            remote_iter.update_source_cb();
 | 
				
			||||||
 | 
					            remote_iter.reset();
 | 
				
			||||||
 | 
					            LOG_INFO("get iter end from remote_iter, retry", K(retry_count), K(MAX_RETRY_COUNT));
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        } else if (start_lsn != lsn) {
 | 
					        } else if (start_lsn != lsn) {
 | 
				
			||||||
          // to keep consistency with the ret code of palf
 | 
					          // to keep consistency with the ret code of palf
 | 
				
			||||||
@ -375,6 +382,8 @@ int ObCdcFetcher::fetch_log_in_archive_(
 | 
				
			|||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					      } while (OB_ITER_END == ret && ++retry_count < MAX_RETRY_COUNT);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -993,34 +1002,6 @@ int ObCdcFetcher::prefill_resp_with_log_entry_(const ObLSID &ls_id,
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// called when source in ctx is null
 | 
					 | 
				
			||||||
int ObCdcFetcher::init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id) {
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
  ObRemoteLogParent *source = ctx.get_source();
 | 
					 | 
				
			||||||
  if (OB_NOT_NULL(source)) {
 | 
					 | 
				
			||||||
    LOG_WARN("archive source is not null, no need to init");
 | 
					 | 
				
			||||||
  } else if (OB_ISNULL(source = logservice::ObResSrcAlloctor::alloc(ObLogRestoreSourceType::LOCATION, ls_id))) {
 | 
					 | 
				
			||||||
    ret = OB_ERR_UNEXPECTED;
 | 
					 | 
				
			||||||
    LOG_WARN("alloc RemoteLocationParent failed", KR(ret), K(tenant_id_), K(ls_id));
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    share::ObBackupDest archive_dest;
 | 
					 | 
				
			||||||
    if (OB_FAIL(ObCdcService::get_backup_dest(ls_id, archive_dest))) {
 | 
					 | 
				
			||||||
      LOG_WARN("get backupdest from archivedestinfo failed", KR(ret), K(ls_id));
 | 
					 | 
				
			||||||
    } else if (OB_FAIL(static_cast<ObRemoteLocationParent*>(source)->set(archive_dest, SCN::max_scn()))) {
 | 
					 | 
				
			||||||
      LOG_WARN("source set archive dest info failed", KR(ret), K(archive_dest));
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      ctx.set_source(source);
 | 
					 | 
				
			||||||
      LOG_INFO("init archive source succ", K(ctx), K(ls_id));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if (OB_FAIL(ret)) {
 | 
					 | 
				
			||||||
      logservice::ObResSrcAlloctor::free(source);
 | 
					 | 
				
			||||||
      source = nullptr;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id,
 | 
					int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id,
 | 
				
			||||||
    ClientLSCtx &ctx,
 | 
					    ClientLSCtx &ctx,
 | 
				
			||||||
    palf::PalfHandleGuard &palf_handle_guard,
 | 
					    palf::PalfHandleGuard &palf_handle_guard,
 | 
				
			||||||
@ -1037,7 +1018,7 @@ int ObCdcFetcher::prepare_berfore_fetch_missing_(const ObLSID &ls_id,
 | 
				
			|||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) {
 | 
					    if (OB_SUCC(ret) && OB_ISNULL(ctx.get_source()) && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) {
 | 
				
			||||||
      if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) {
 | 
					      if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) {
 | 
				
			||||||
        ret = OB_SUCCESS;
 | 
					        ret = OB_SUCCESS;
 | 
				
			||||||
        archive_is_on = false;
 | 
					        archive_is_on = false;
 | 
				
			||||||
 | 
				
			|||||||
@ -135,7 +135,6 @@ private:
 | 
				
			|||||||
      LogEntryType &log_group_entry,
 | 
					      LogEntryType &log_group_entry,
 | 
				
			||||||
      LSN &lsn,
 | 
					      LSN &lsn,
 | 
				
			||||||
      ClientLSCtx &ctx);
 | 
					      ClientLSCtx &ctx);
 | 
				
			||||||
  int init_archive_source_(ClientLSCtx &ctx, ObLSID ls_id);
 | 
					 | 
				
			||||||
  // Check whether has reached time limit
 | 
					  // Check whether has reached time limit
 | 
				
			||||||
  inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp)
 | 
					  inline bool is_time_up_(const int64_t scan_round, const int64_t end_tstamp)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
 | 
				
			|||||||
@ -77,6 +77,7 @@ int ObCdcService::get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDes
 | 
				
			|||||||
ObCdcService::ObCdcService()
 | 
					ObCdcService::ObCdcService()
 | 
				
			||||||
  : is_inited_(false),
 | 
					  : is_inited_(false),
 | 
				
			||||||
    stop_flag_(true),
 | 
					    stop_flag_(true),
 | 
				
			||||||
 | 
					    tenant_id_(OB_INVALID_TENANT_ID),
 | 
				
			||||||
    locator_(),
 | 
					    locator_(),
 | 
				
			||||||
    fetcher_(),
 | 
					    fetcher_(),
 | 
				
			||||||
    tg_id_(-1),
 | 
					    tg_id_(-1),
 | 
				
			||||||
@ -114,6 +115,7 @@ int ObCdcService::init(const uint64_t tenant_id,
 | 
				
			|||||||
  } else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
 | 
					  } else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
 | 
				
			||||||
    EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
 | 
					    EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
 | 
					    tenant_id_ = tenant_id;
 | 
				
			||||||
    is_inited_ = true;
 | 
					    is_inited_ = true;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -190,7 +192,7 @@ int ObCdcService::start()
 | 
				
			|||||||
    EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
 | 
					    EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
 | 
				
			||||||
  } else if (OB_FAIL(log_ext_handler_.start(0))) {
 | 
					  } else if (OB_FAIL(log_ext_handler_.start(0))) {
 | 
				
			||||||
    EXTLOG_LOG(WARN, "log ext handler start failed", K(ret));
 | 
					    EXTLOG_LOG(WARN, "log ext handler start failed", K(ret));
 | 
				
			||||||
  } else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) {
 | 
					  } else if (OB_FAIL(start_tenant_tg_(tenant_id_))) {
 | 
				
			||||||
    EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret));
 | 
					    EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    stop_flag_ = false;
 | 
					    stop_flag_ = false;
 | 
				
			||||||
@ -202,13 +204,13 @@ int ObCdcService::start()
 | 
				
			|||||||
void ObCdcService::stop()
 | 
					void ObCdcService::stop()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  ATOMIC_STORE(&stop_flag_, true);
 | 
					  ATOMIC_STORE(&stop_flag_, true);
 | 
				
			||||||
  stop_tenant_tg_(MTL_ID());
 | 
					  stop_tenant_tg_(tenant_id_);
 | 
				
			||||||
  log_ext_handler_.stop();
 | 
					  log_ext_handler_.stop();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void ObCdcService::wait()
 | 
					void ObCdcService::wait()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  wait_tenant_tg_(MTL_ID());
 | 
					  wait_tenant_tg_(tenant_id_);
 | 
				
			||||||
  log_ext_handler_.wait();
 | 
					  log_ext_handler_.wait();
 | 
				
			||||||
  // do nothing
 | 
					  // do nothing
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -217,13 +219,14 @@ void ObCdcService::destroy()
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
  is_inited_ = false;
 | 
					  is_inited_ = false;
 | 
				
			||||||
  stop_flag_ = true;
 | 
					  stop_flag_ = true;
 | 
				
			||||||
  destroy_tenant_tg_(MTL_ID());
 | 
					  destroy_tenant_tg_(tenant_id_);
 | 
				
			||||||
  fetcher_.destroy();
 | 
					  fetcher_.destroy();
 | 
				
			||||||
  locator_.destroy();
 | 
					  locator_.destroy();
 | 
				
			||||||
  dest_info_.reset();
 | 
					  dest_info_.reset();
 | 
				
			||||||
  large_buffer_pool_.destroy();
 | 
					  large_buffer_pool_.destroy();
 | 
				
			||||||
  ls_ctx_map_.destroy();
 | 
					  ls_ctx_map_.destroy();
 | 
				
			||||||
  log_ext_handler_.destroy();
 | 
					  log_ext_handler_.destroy();
 | 
				
			||||||
 | 
					  tenant_id_ = OB_INVALID_TENANT_ID;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req,
 | 
					int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req,
 | 
				
			||||||
@ -312,6 +315,35 @@ int ObCdcService::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req,
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCdcService::init_archive_source(const ObLSID &ls_id,
 | 
				
			||||||
 | 
					    ClientLSCtx &ctx)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  logservice::ObRemoteLogParent *source = ctx.get_source();
 | 
				
			||||||
 | 
					  if (OB_NOT_NULL(source)) {
 | 
				
			||||||
 | 
					    EXTLOG_LOG(WARN, "archive source is not null, no need to init");
 | 
				
			||||||
 | 
					  } else if (OB_ISNULL(source = logservice::ObResSrcAlloctor::alloc(ObLogRestoreSourceType::LOCATION, ls_id))) {
 | 
				
			||||||
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					    EXTLOG_LOG(WARN, "alloc RemoteLocationParent failed", KR(ret), K(ls_id));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    share::ObBackupDest archive_dest;
 | 
				
			||||||
 | 
					    if (OB_FAIL(get_backup_dest(ls_id, archive_dest))) {
 | 
				
			||||||
 | 
					      EXTLOG_LOG(WARN, "get backupdest from archivedestinfo failed", KR(ret), K(ls_id));
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(static_cast<logservice::ObRemoteLocationParent*>(source)->set(archive_dest, SCN::max_scn()))) {
 | 
				
			||||||
 | 
					      EXTLOG_LOG(WARN, "source set archive dest info failed", KR(ret), K(archive_dest));
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      ctx.set_source(source);
 | 
				
			||||||
 | 
					      EXTLOG_LOG(WARN, "init archive source succ", K(ctx), K(ls_id));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (OB_FAIL(ret)) {
 | 
				
			||||||
 | 
					      logservice::ObResSrcAlloctor::free(source);
 | 
				
			||||||
 | 
					      source = nullptr;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObCdcService::query_tenant_archive_info_()
 | 
					int ObCdcService::query_tenant_archive_info_()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
				
			|||||||
@ -72,6 +72,7 @@ class ObCdcService: public lib::TGRunnable
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  static int get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDest &backup_dest);
 | 
					  static int get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDest &backup_dest);
 | 
				
			||||||
 | 
					  static int init_archive_source(const ObLSID &ls_id, ClientLSCtx &ctx);
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  ObCdcService();
 | 
					  ObCdcService();
 | 
				
			||||||
  ~ObCdcService();
 | 
					  ~ObCdcService();
 | 
				
			||||||
@ -138,6 +139,8 @@ private:
 | 
				
			|||||||
private:
 | 
					private:
 | 
				
			||||||
  bool is_inited_;
 | 
					  bool is_inited_;
 | 
				
			||||||
  volatile bool stop_flag_ CACHE_ALIGNED;
 | 
					  volatile bool stop_flag_ CACHE_ALIGNED;
 | 
				
			||||||
 | 
					  uint64_t tenant_id_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  ObCdcStartLsnLocator locator_;
 | 
					  ObCdcStartLsnLocator locator_;
 | 
				
			||||||
  ObCdcFetcher fetcher_;
 | 
					  ObCdcFetcher fetcher_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -231,39 +231,34 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only,
 | 
				
			|||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      result_ts_ns = start_ts_ns;
 | 
					      result_ts_ns = start_ts_ns;
 | 
				
			||||||
      // for RemoteLogIterator::init
 | 
					      // for RemoteLogIterator::init
 | 
				
			||||||
      logservice::GetSourceFunc get_source_func = [&](const ObLSID& ls_id, logservice::ObRemoteSourceGuard &guard) ->int {
 | 
					      ClientLSCtx ctx;
 | 
				
			||||||
        int ret = OB_SUCCESS;
 | 
					      ObCdcGetSourceFunctor get_source_func(ctx);
 | 
				
			||||||
        logservice::ObRemoteLogParent *source = logservice::ObResSrcAlloctor::alloc(share::ObLogRestoreSourceType::LOCATION, ls_id);
 | 
					      ObCdcUpdateSourceFunctor update_source_func(ctx);
 | 
				
			||||||
        logservice::ObRemoteLocationParent *location_source = static_cast<logservice::ObRemoteLocationParent*>(source);
 | 
					      logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func, update_source_func);
 | 
				
			||||||
        if (OB_ISNULL(location_source)) {
 | 
					      constexpr int64_t MAX_RETRY_COUNT = 4;
 | 
				
			||||||
          ret = OB_ERR_UNEXPECTED;
 | 
					 | 
				
			||||||
          LOG_WARN("source allocated is null", KR(ret), K(ls_id));
 | 
					 | 
				
			||||||
        } else if (OB_FAIL(location_source->set(backup_dest, SCN::max_scn()))) {
 | 
					 | 
				
			||||||
          LOG_WARN("set backup dest failed", KR(ret), K(backup_dest), K(ls_id));
 | 
					 | 
				
			||||||
        } else if (OB_FAIL(guard.set_source(location_source))) {
 | 
					 | 
				
			||||||
          LOG_WARN("remote source guard set source failed", KR(ret), K(ls_id));
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if (OB_FAIL(ret) && OB_NOT_NULL(location_source)) {
 | 
					 | 
				
			||||||
          logservice::ObResSrcAlloctor::free(location_source);
 | 
					 | 
				
			||||||
          location_source = nullptr;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        return ret;
 | 
					 | 
				
			||||||
      };
 | 
					 | 
				
			||||||
      logservice::ObRemoteLogGroupEntryIterator remote_group_iter(get_source_func);
 | 
					 | 
				
			||||||
      // for RemoteLogIterator::next
 | 
					      // for RemoteLogIterator::next
 | 
				
			||||||
      int64_t next_buf_size = 0;
 | 
					      int64_t next_buf_size = 0;
 | 
				
			||||||
      const char *next_buf = NULL;
 | 
					      const char *next_buf = NULL;
 | 
				
			||||||
      LSN lsn;
 | 
					      LSN lsn;
 | 
				
			||||||
 | 
					      int64_t retry_time = 0;
 | 
				
			||||||
      if (OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn,
 | 
					      do {
 | 
				
			||||||
 | 
					        if (nullptr == ctx.get_source() && OB_FAIL(ObCdcService::init_archive_source(ls_id, ctx))) {
 | 
				
			||||||
 | 
					          LOG_WARN("failed to init archive source", K(ctx), K(ls_id));
 | 
				
			||||||
 | 
					        } else if (! remote_group_iter.is_init() && OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn,
 | 
				
			||||||
                result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) {
 | 
					                result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) {
 | 
				
			||||||
          LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_));
 | 
					          LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_));
 | 
				
			||||||
        } else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) {
 | 
					        } else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) {
 | 
				
			||||||
 | 
					          if (OB_ITER_END != ret) {
 | 
				
			||||||
            LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_));
 | 
					            LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_));
 | 
				
			||||||
 | 
					          } else {
 | 
				
			||||||
 | 
					            remote_group_iter.update_source_cb();
 | 
				
			||||||
 | 
					            remote_group_iter.reset();
 | 
				
			||||||
 | 
					            LOG_INFO("get iter end from remote_group_iter, retry", K(result_lsn), K(retry_time), K(MAX_RETRY_COUNT));
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
          result_ts_ns = log_group_entry.get_scn().get_val_for_logservice();
 | 
					          result_ts_ns = log_group_entry.get_scn().get_val_for_logservice();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					      } while (OB_ITER_END == ret && ++retry_time < MAX_RETRY_COUNT);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  // Unconditional setting ret code
 | 
					  // Unconditional setting ret code
 | 
				
			||||||
 | 
				
			|||||||
@ -120,7 +120,7 @@ public:
 | 
				
			|||||||
  void set_progress(int64_t progress) { client_progress_ = progress; }
 | 
					  void set_progress(int64_t progress) { client_progress_ = progress; }
 | 
				
			||||||
  int64_t get_progress() const { return client_progress_; }
 | 
					  int64_t get_progress() const { return client_progress_; }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  TO_STRING_KV(K_(source),
 | 
					  TO_STRING_KV(KP_(source),
 | 
				
			||||||
               K_(fetch_mode),
 | 
					               K_(fetch_mode),
 | 
				
			||||||
               K_(last_touch_ts),
 | 
					               K_(last_touch_ts),
 | 
				
			||||||
               K_(client_progress))
 | 
					               K_(client_progress))
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user