[Net Standby] Modify standby fetch log upper limit refresh interval and threshold
This commit is contained in:
@ -485,7 +485,12 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us)
|
||||
|
||||
global_upper_limit = progress_controller_->get_global_upper_limit();
|
||||
if (OB_INVALID_TIMESTAMP != global_upper_limit) {
|
||||
upper_limit_us = std::min(upper_limit_us, global_upper_limit);
|
||||
int64_t log_progress = ls_fetch_ctx_->get_progress();
|
||||
if (log_progress < global_upper_limit) {
|
||||
upper_limit_us = INT64_MAX - 1;
|
||||
} else {
|
||||
upper_limit_us = std::min(upper_limit_us, global_upper_limit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -216,7 +216,6 @@ int ObLogRestoreNetDriver::scan_ls(const share::ObLogRestoreSourceType &type)
|
||||
}
|
||||
}
|
||||
delete_fetcher_if_needed_with_lock_();
|
||||
set_restore_log_upper_limit_();
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -531,7 +530,7 @@ int ObLogRestoreNetDriver::get_ls_count_in_fetcher_(int64_t &count)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogRestoreNetDriver::set_restore_log_upper_limit_()
|
||||
int ObLogRestoreNetDriver::set_restore_log_upper_limit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::SCN upper_limit_scn;
|
||||
|
@ -79,7 +79,7 @@ public:
|
||||
void clean_resource();
|
||||
|
||||
// set the max scn can be restored
|
||||
int set_restore_log_upper_limit_();
|
||||
int set_restore_log_upper_limit();
|
||||
|
||||
private:
|
||||
// TODO LogFetcher如何区分LogRestoreSource变化了, 比如从cluster 1的tenant A, 变为了cluster 2的tenant B
|
||||
|
@ -33,6 +33,8 @@ using namespace oceanbase::storage;
|
||||
using namespace oceanbase::palf;
|
||||
ObLogRestoreService::ObLogRestoreService() :
|
||||
inited_(false),
|
||||
last_normal_work_ts_(OB_INVALID_TIMESTAMP),
|
||||
last_update_restore_upper_limit_ts_(OB_INVALID_TIMESTAMP),
|
||||
ls_svr_(NULL),
|
||||
proxy_(),
|
||||
location_adaptor_(),
|
||||
@ -107,6 +109,8 @@ void ObLogRestoreService::destroy()
|
||||
allocator_.destroy();
|
||||
scheduler_.destroy();
|
||||
ls_svr_ = NULL;
|
||||
last_normal_work_ts_ = OB_INVALID_TIMESTAMP;
|
||||
last_update_restore_upper_limit_ts_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
int ObLogRestoreService::start()
|
||||
@ -153,15 +157,16 @@ void ObLogRestoreService::run1()
|
||||
lib::set_thread_name("LogRessvr");
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
|
||||
const int64_t THREAD_RUN_INTERVAL = 1 * 1000 * 1000L;
|
||||
if (OB_UNLIKELY(! inited_)) {
|
||||
LOG_ERROR_RET(OB_NOT_INIT, "ObLogRestoreService not init", "tenant_id", MTL_ID());
|
||||
} else {
|
||||
while (! has_set_stop()) {
|
||||
int64_t begin_stamp = ObTimeUtility::current_time();
|
||||
const bool is_primary = MTL_GET_TENANT_ROLE() == share::ObTenantRole::PRIMARY_TENANT;
|
||||
const int64_t thread_interval = is_primary ? PRIMARY_THREAD_RUN_INTERVAL : STANDBY_THREAD_RUN_INTERVAL;
|
||||
do_thread_task_();
|
||||
int64_t end_tstamp = ObTimeUtility::current_time();
|
||||
int64_t wait_interval = THREAD_RUN_INTERVAL - (end_tstamp - begin_stamp);
|
||||
int64_t end_tstamp = ObTimeUtility::fast_current_time();
|
||||
int64_t wait_interval = thread_interval - (end_tstamp - begin_stamp);
|
||||
if (wait_interval > 0) {
|
||||
cond_.timedwait(wait_interval);
|
||||
}
|
||||
@ -174,24 +179,28 @@ void ObLogRestoreService::do_thread_task_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_user_tenant(MTL_ID())) {
|
||||
share::ObLogRestoreSourceItem source;
|
||||
bool source_exist = false;
|
||||
if (need_schedule_()) {
|
||||
share::ObLogRestoreSourceItem source;
|
||||
bool source_exist = false;
|
||||
|
||||
update_restore_quota_();
|
||||
update_restore_quota_();
|
||||
|
||||
if (OB_FAIL(update_upstream_(source, source_exist))) {
|
||||
LOG_WARN("update_upstream_ failed");
|
||||
} else if (source_exist) {
|
||||
// log restore source exist, do schedule
|
||||
// source_exist means tenant_role is standby or restore and log_restore_source exists
|
||||
schedule_fetch_log_(source);
|
||||
} else {
|
||||
// tenant_role not match or log_restore_source not exist
|
||||
clean_resource_();
|
||||
if (OB_FAIL(update_upstream_(source, source_exist))) {
|
||||
LOG_WARN("update_upstream_ failed");
|
||||
} else if (source_exist) {
|
||||
// log restore source exist, do schedule
|
||||
// source_exist means tenant_role is standby or restore and log_restore_source exists
|
||||
schedule_fetch_log_(source);
|
||||
} else {
|
||||
// tenant_role not match or log_restore_source not exist
|
||||
clean_resource_();
|
||||
}
|
||||
|
||||
schedule_resource_();
|
||||
report_error_();
|
||||
last_normal_work_ts_ = common::ObTimeUtility::fast_current_time();
|
||||
}
|
||||
|
||||
schedule_resource_();
|
||||
report_error_();
|
||||
update_restore_upper_limit_();
|
||||
}
|
||||
}
|
||||
|
||||
@ -225,5 +234,14 @@ void ObLogRestoreService::report_error_()
|
||||
(void)error_reporter_.report_error();
|
||||
}
|
||||
|
||||
void ObLogRestoreService::update_restore_upper_limit_()
|
||||
{
|
||||
fetch_log_impl_.update_restore_upper_limit();
|
||||
}
|
||||
|
||||
bool ObLogRestoreService::need_schedule_() const
|
||||
{
|
||||
return common::ObTimeUtility::fast_current_time() - last_normal_work_ts_ > SCHEDULE_INTERVAL;
|
||||
}
|
||||
} // namespace logservice
|
||||
} // namespace oceanbase
|
||||
|
@ -48,6 +48,10 @@ using oceanbase::storage::ObLSService;
|
||||
// provide the ability to fetch log from remote cluster and backups
|
||||
class ObLogRestoreService : public share::ObThreadPool
|
||||
{
|
||||
const int64_t SCHEDULE_INTERVAL = 1000 * 1000L; // 1s
|
||||
const int64_t UPDATE_RESTORE_UPPER_LIMIT_INTERVAL = 100 * 1000L; // 100ms
|
||||
const int64_t PRIMARY_THREAD_RUN_INTERVAL = 1000 * 1000L; // 1s
|
||||
const int64_t STANDBY_THREAD_RUN_INTERVAL = 100 * 1000L; // 100ms
|
||||
public:
|
||||
ObLogRestoreService();
|
||||
~ObLogRestoreService();
|
||||
@ -75,9 +79,13 @@ private:
|
||||
void schedule_resource_();
|
||||
void clean_resource_();
|
||||
void report_error_();
|
||||
void update_restore_upper_limit_();
|
||||
bool need_schedule_() const;
|
||||
|
||||
private:
|
||||
bool inited_;
|
||||
int64_t last_normal_work_ts_;
|
||||
int64_t last_update_restore_upper_limit_ts_;
|
||||
ObLSService *ls_svr_;
|
||||
ObLogResSvrRpc proxy_;
|
||||
ObLogRestoreController restore_controller_;
|
||||
|
@ -98,5 +98,9 @@ void ObRemoteFetchLogImpl::clean_resource()
|
||||
net_driver_->clean_resource();
|
||||
}
|
||||
|
||||
void ObRemoteFetchLogImpl::update_restore_upper_limit()
|
||||
{
|
||||
(void)net_driver_->set_restore_log_upper_limit();
|
||||
}
|
||||
} // namespace logservice
|
||||
} // namespace oceanbase
|
||||
|
@ -38,6 +38,7 @@ public:
|
||||
void destroy();
|
||||
int do_schedule(const share::ObLogRestoreSourceItem &source);
|
||||
void clean_resource();
|
||||
void update_restore_upper_limit();
|
||||
|
||||
private:
|
||||
bool inited_;
|
||||
|
@ -2134,7 +2134,6 @@ void ObPrimaryLSService::do_work()
|
||||
int64_t idle_time_us = 100 * 1000L;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
while (!has_set_stop()) {
|
||||
idle_time_us = 1000 * 1000L;
|
||||
{
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
|
Reference in New Issue
Block a user