[OBCDC] fix a bug when fetch missing log time out
This commit is contained in:
@ -839,6 +839,7 @@ int ObCdcFetcher::do_fetch_missing_log_(const obrpc::ObCdcLSFetchMissLogReq &req
|
||||
resp.set_next_miss_lsn(missing_lsn);
|
||||
int64_t start_fetch_ts = ObTimeUtility::current_time();
|
||||
bool log_fetched_in_palf = false;
|
||||
bool log_fetched_in_archive = false;
|
||||
|
||||
if (is_time_up_(fetched_log_count, end_tstamp)) { // time up, stop fetching logs globally
|
||||
frt.stop("TimeUP");
|
||||
@ -863,32 +864,35 @@ int ObCdcFetcher::do_fetch_missing_log_(const obrpc::ObCdcLSFetchMissLogReq &req
|
||||
if (OB_FAIL(fetch_log_in_archive_(ls_id, remote_iter, missing_lsn,
|
||||
need_init_iter, log_entry, lsn, ctx))) {
|
||||
LOG_WARN("fetch missng log in archive failed", KR(ret), K(missing_lsn));
|
||||
} else {
|
||||
log_fetched_in_archive = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_UNLIKELY(missing_lsn != lsn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("do_fetch_missing_log missing_lsn not match", KR(ret), K(tenant_id_), K(ls_id),
|
||||
K(missing_lsn), K(lsn));
|
||||
} else {
|
||||
resp.inc_log_fetch_time(ObTimeUtility::current_time() - start_fetch_ts);
|
||||
check_next_entry_(lsn, log_entry, resp, frt);
|
||||
|
||||
if (frt.is_stopped()) {
|
||||
// Stop fetching log
|
||||
} else if (OB_FAIL(prefill_resp_with_log_entry_(ls_id, lsn, log_entry, resp))) {
|
||||
if (OB_BUF_NOT_ENOUGH == ret) {
|
||||
handle_when_buffer_full_(frt); // stop
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("prefill_resp_with_log_entry fail", KR(ret), K(frt), K(end_tstamp), K(resp));
|
||||
}
|
||||
if (OB_SUCC(ret) && (log_fetched_in_palf || log_fetched_in_archive)) {
|
||||
if (OB_UNLIKELY(missing_lsn != lsn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("do_fetch_missing_log missing_lsn not match", KR(ret), K(tenant_id_), K(ls_id),
|
||||
K(missing_lsn), K(lsn));
|
||||
} else {
|
||||
// log fetched successfully
|
||||
fetched_log_count++;
|
||||
resp.inc_log_fetch_time(ObTimeUtility::current_time() - start_fetch_ts);
|
||||
check_next_entry_(lsn, log_entry, resp, frt);
|
||||
|
||||
LOG_TRACE("LS fetch a missing log", K(tenant_id_), K(ls_id), K(fetched_log_count), K(frt));
|
||||
if (frt.is_stopped()) {
|
||||
// Stop fetching log
|
||||
} else if (OB_FAIL(prefill_resp_with_log_entry_(ls_id, lsn, log_entry, resp))) {
|
||||
if (OB_BUF_NOT_ENOUGH == ret) {
|
||||
handle_when_buffer_full_(frt); // stop
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("prefill_resp_with_log_entry fail", KR(ret), K(frt), K(end_tstamp), K(resp));
|
||||
}
|
||||
} else {
|
||||
// log fetched successfully
|
||||
fetched_log_count++;
|
||||
|
||||
LOG_TRACE("LS fetch a missing log", K(tenant_id_), K(ls_id), K(fetched_log_count), K(frt));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user