Limit archive memory usage based on send_task_status
This commit is contained in:
@ -452,6 +452,8 @@ int ObArchiveFetcher::check_need_delay_(const ObLSID &id,
|
|||||||
SCN fetch_scn;
|
SCN fetch_scn;
|
||||||
need_delay = false;
|
need_delay = false;
|
||||||
int64_t send_task_count = 0;
|
int64_t send_task_count = 0;
|
||||||
|
int64_t ls_archive_task_count = 0;
|
||||||
|
int64_t send_task_status_count = 0;
|
||||||
|
|
||||||
if (FALSE_IT(check_capacity_enough_(commit_lsn, cur_lsn, end_lsn, data_enough, data_full))) {
|
if (FALSE_IT(check_capacity_enough_(commit_lsn, cur_lsn, end_lsn, data_enough, data_full))) {
|
||||||
} else if (data_full) {
|
} else if (data_full) {
|
||||||
@ -473,6 +475,14 @@ int ObArchiveFetcher::check_need_delay_(const ObLSID &id,
|
|||||||
} else if (true == (need_delay = ! check_scn_enough_(fetch_scn, commit_scn))) {
|
} else if (true == (need_delay = ! check_scn_enough_(fetch_scn, commit_scn))) {
|
||||||
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id),
|
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id),
|
||||||
K(station), K(fetch_scn), K(commit_scn));
|
K(station), K(fetch_scn), K(commit_scn));
|
||||||
|
} else {
|
||||||
|
ls_archive_task_count = ls_mgr_->get_ls_task_count();
|
||||||
|
send_task_status_count = archive_sender_->get_send_task_status_count();
|
||||||
|
if (ls_archive_task_count < send_task_status_count) {
|
||||||
|
need_delay = true;
|
||||||
|
ARCHIVE_LOG(TRACE, "archive_sender_ task status count more than ls archive task count, just wait",
|
||||||
|
K(ls_archive_task_count), K(send_task_status_count), K(need_delay));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -117,7 +117,6 @@ private:
|
|||||||
int get_entry_(LogEntryType &entry, LSN &lsn, const char *&buf, int64_t &buf_size);
|
int get_entry_(LogEntryType &entry, LSN &lsn, const char *&buf, int64_t &buf_size);
|
||||||
void update_data_gen_max_lsn_();
|
void update_data_gen_max_lsn_();
|
||||||
void mark_source_error_(const int ret_code);
|
void mark_source_error_(const int ret_code);
|
||||||
bool is_retry_ret_(const bool ret_code) const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool inited_;
|
bool inited_;
|
||||||
|
|||||||
@ -334,10 +334,3 @@ void ObRemoteLogIterator<LogEntryType>::mark_source_error_(const int ret_code)
|
|||||||
source->mark_error(*ObCurTraceId::get_trace_id(), ret_code);
|
source->mark_error(*ObCurTraceId::get_trace_id(), ret_code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class LogEntryType>
|
|
||||||
bool ObRemoteLogIterator<LogEntryType>::is_retry_ret_(const bool ret_code) const
|
|
||||||
{
|
|
||||||
return OB_ALLOCATE_MEMORY_FAILED == ret_code
|
|
||||||
|| is_io_error(ret_code);
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user