[CdcService] Remove MISUSED ls_id
This commit is contained in:
@ -315,7 +315,8 @@ int ObCdcFetcher::do_fetch_log_(const ObCdcLSFetchLogReq &req,
|
||||
// can't use iter.is_inited to replace need_init_iter, because they have different semantics.
|
||||
// don't block any error code here, let the caller handle the errcode for generality
|
||||
template <class LogEntryType>
|
||||
int ObCdcFetcher::fetch_log_in_palf_(PalfIterator<DiskIteratorStorage, LogEntryType> &iter,
|
||||
int ObCdcFetcher::fetch_log_in_palf_(const ObLSID &ls_id,
|
||||
PalfIterator<DiskIteratorStorage, LogEntryType> &iter,
|
||||
PalfHandleGuard &palf_guard,
|
||||
const LSN &start_lsn,
|
||||
const bool need_init_iter,
|
||||
@ -420,7 +421,7 @@ int ObCdcFetcher::set_fetch_mode_before_fetch_log_(const ObLSID &ls_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCdcFetcher::get_replayable_point_scn_(SCN &replayable_point_scn)
|
||||
int ObCdcFetcher::get_replayable_point_scn_(const ObLSID &ls_id, SCN &replayable_point_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLogService *log_service = MTL(ObLogService*);
|
||||
@ -482,7 +483,7 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
|
||||
SCN replayable_point_scn;
|
||||
// find out whether logstream exists in palf, if it exists try switch mode to online when
|
||||
// the gap between progress in ctx and the latest log progress is less than 1 min
|
||||
if (OB_FAIL(get_replayable_point_scn_(replayable_point_scn))) {
|
||||
if (OB_FAIL(get_replayable_point_scn_(ls_id, replayable_point_scn))) {
|
||||
LOG_WARN("get replayable point scn failed", KR(ret), K(ls_id));
|
||||
} else if (OB_FAIL(set_fetch_mode_before_fetch_log_(ls_id, test_switch_fetch_mode,
|
||||
ls_exist_in_palf, palf_guard, ctx))) {
|
||||
@ -512,8 +513,9 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
|
||||
LOG_INFO("fetch log quit in time", K(end_tstamp), K(frt), K(fetched_log_count));
|
||||
} // time up
|
||||
else if (FetchMode::FETCHMODE_ONLINE == fetch_mode) {
|
||||
if (OB_FAIL(fetch_log_in_palf_(palf_iter, palf_guard, resp.get_next_req_lsn(),
|
||||
need_init_iter, replayable_point_scn, log_group_entry, lsn))) {
|
||||
if (OB_FAIL(fetch_log_in_palf_(ls_id, palf_iter, palf_guard,
|
||||
resp.get_next_req_lsn(), need_init_iter, replayable_point_scn,
|
||||
log_group_entry, lsn))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
reach_max_lsn = true;
|
||||
} else if (OB_ALLOCATE_MEMORY_FAILED == ret) {
|
||||
@ -837,7 +839,7 @@ int ObCdcFetcher::do_fetch_missing_log_(const obrpc::ObCdcLSFetchMissLogReq &req
|
||||
resp.set_next_miss_lsn(miss_log_array[0].miss_lsn_);
|
||||
resp.set_ls_id(ls_id);
|
||||
SCN replayable_point_scn;
|
||||
if (OB_FAIL(get_replayable_point_scn_(replayable_point_scn))) {
|
||||
if (OB_FAIL(get_replayable_point_scn_(ls_id, replayable_point_scn))) {
|
||||
LOG_WARN("get replayable point scn failed", KR(ret), K(ls_id));
|
||||
} else if (OB_FAIL(prepare_berfore_fetch_missing_(ls_id, ctx, palf_guard, ls_exist_in_palf, archive_is_on))) {
|
||||
LOG_WARN("failed to prepare before fetching missing log", KR(ret), K(ls_id), K(tenant_id_));
|
||||
@ -862,8 +864,9 @@ int ObCdcFetcher::do_fetch_missing_log_(const obrpc::ObCdcLSFetchMissLogReq &req
|
||||
} else {
|
||||
// first, try to fetch logs in palf
|
||||
if (!fetch_archive_only && ls_exist_in_palf) {
|
||||
if (OB_FAIL(fetch_log_in_palf_(palf_iter, palf_guard, missing_lsn,
|
||||
need_init_iter, replayable_point_scn, log_entry, lsn))) {
|
||||
if (OB_FAIL(fetch_log_in_palf_(ls_id, palf_iter, palf_guard,
|
||||
missing_lsn, need_init_iter, replayable_point_scn,
|
||||
log_entry, lsn))) {
|
||||
if (OB_ERR_OUT_OF_LOWER_BOUND == ret) {
|
||||
// block OB_ERR_OUT_OF_LOWER_BOUND
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -94,7 +94,7 @@ private:
|
||||
int64_t &fetched_log_count,
|
||||
ClientLSCtx &ctx,
|
||||
ObCdcFetchLogTimeStats &fetch_time_stat);
|
||||
int get_replayable_point_scn_(SCN &replayable_point_scn);
|
||||
int get_replayable_point_scn_(const ObLSID &ls_id, SCN &replayable_point_scn);
|
||||
FetchMode get_fetch_mode_when_fetching_log_(const ClientLSCtx &ctx,
|
||||
const bool fetch_archive_only);
|
||||
// template method is not defined here for tidiness, should be defined and instantiated in the same file
|
||||
@ -105,7 +105,8 @@ private:
|
||||
// return OB_ALLOCATE_MEMORY_FAILED when allocate memory failed during fetching log
|
||||
// return OB_ITER_END when no more log could be iterated
|
||||
template <class LogEntryType>
|
||||
int fetch_log_in_palf_(palf::PalfIterator<palf::DiskIteratorStorage, LogEntryType> &iter,
|
||||
int fetch_log_in_palf_(const ObLSID &ls_id,
|
||||
palf::PalfIterator<palf::DiskIteratorStorage, LogEntryType> &iter,
|
||||
palf::PalfHandleGuard &palf_guard,
|
||||
const LSN &start_lsn,
|
||||
const bool need_init_iter,
|
||||
|
Reference in New Issue
Block a user