[master] modify log_restore_concurrency default value & set archive worker count

This commit is contained in:
taoshuning 2023-07-13 15:18:20 +00:00 committed by ob-robot
parent dff6229dbd
commit 4eb4ed3d5d
8 changed files with 46 additions and 15 deletions

View File

@ -114,13 +114,18 @@ int ObLogRestoreArchiveDriver::check_need_schedule_(ObLS &ls,
int64_t &task_count)
{
int ret = OB_SUCCESS;
int64_t concurrency = 0;
ObRemoteFetchContext context;
ObLogRestoreHandler *restore_handler = NULL;
bool need_delay = false;
need_schedule = false;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
const int64_t concurrency = std::min(MAX_LS_FETCH_LOG_TASK_CONCURRENCY,
tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L);
const int64_t log_restore_concurrency = tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L;
if (0 == log_restore_concurrency) {
concurrency = std::min(get_restore_concurrency_by_max_cpu(), MAX_LS_FETCH_LOG_TASK_CONCURRENCY);
} else {
concurrency = std::min(log_restore_concurrency, MAX_LS_FETCH_LOG_TASK_CONCURRENCY);
}
if (OB_ISNULL(restore_handler = ls.get_log_restore_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("get restore_handler failed", K(ret), "id", ls.get_ls_id());

View File

@ -18,6 +18,7 @@
#include "lib/string/ob_string.h"
#include "lib/utility/ob_macro_utils.h"
#include "logservice/palf/log_define.h"
#include "share/rc/ob_tenant_base.h"
#include <cstdint>
namespace oceanbase
@ -78,5 +79,11 @@ bool ObLogRestoreSourceTenant::is_valid() const
&& !user_passwd_.is_empty()
&& ip_list_.count() > 0;
}
int64_t get_restore_concurrency_by_max_cpu()
{
return static_cast<int64_t>(MTL_CPU_COUNT() + 7) / 8;
}
} // namespace logservice
} // namespace oceanbase

View File

@ -81,6 +81,8 @@ struct ObLogRestoreSourceTenant final
K_(is_oracle),
K_(ip_list));
};
int64_t get_restore_concurrency_by_max_cpu();
} // namespace logservice
} // namespace oceanbase

View File

@ -14,9 +14,11 @@
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/oblog/ob_log_module.h"
#include "logservice/restoreservice/ob_log_restore_define.h"
#include "ob_remote_fetch_log_worker.h"
#include "ob_log_restore_allocator.h"
#include "observer/omt/ob_tenant_config_mgr.h"
#include "share/rc/ob_tenant_base.h"
#include <cstdint>
namespace oceanbase
@ -65,19 +67,31 @@ void ObLogRestoreScheduler::destroy()
worker_ = NULL;
}
int ObLogRestoreScheduler::schedule()
int ObLogRestoreScheduler::schedule(const share::ObLogRestoreSourceType &source_type)
{
(void)modify_thread_count_();
(void)modify_thread_count_(source_type);
(void)purge_cached_buffer_();
return OB_SUCCESS;
}
int ObLogRestoreScheduler::modify_thread_count_()
int ObLogRestoreScheduler::modify_thread_count_(const share::ObLogRestoreSourceType &source_type)
{
int ret = OB_SUCCESS;
int64_t restore_concurrency = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
const int64_t restore_concurrency =
const int64_t log_restore_concurrency =
tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L;
// primary tenant or log restore source not location type, restore_concurrency is 1
// parameter log_restore_concurrency is default zero, set restore_concurrency = max_cpu / 8, rounded up
// parameter log_restore_concurrency not zero, set restore_concurrency = log_restore_concurrency
if (MTL_GET_TENANT_ROLE() == share::ObTenantRole::PRIMARY_TENANT
|| !share::is_location_log_source_type(source_type)) {
restore_concurrency = 1;
} else if (0 == log_restore_concurrency) {
restore_concurrency = get_restore_concurrency_by_max_cpu();
} else {
restore_concurrency = log_restore_concurrency;
}
if (OB_FAIL(worker_->modify_thread_count(std::max(1L, restore_concurrency)))) {
CLOG_LOG(WARN, "modify worker thread failed", K(ret));
}

View File

@ -14,6 +14,7 @@
#define OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_SCHEDULER_H_
#include "lib/utility/ob_macro_utils.h"
#include "share/restore/ob_log_restore_source.h" // ObLogRestoreSourceType
#include <cstdint>
namespace oceanbase
@ -30,10 +31,10 @@ public:
int init(const uint64_t tenant_id, ObLogRestoreAllocator *allocator, ObRemoteFetchWorker *worker);
void destroy();
int schedule();
int schedule(const share::ObLogRestoreSourceType &source_type);
private:
int modify_thread_count_();
int modify_thread_count_(const share::ObLogRestoreSourceType &source_type);
int purge_cached_buffer_();
private:

View File

@ -196,7 +196,7 @@ void ObLogRestoreService::do_thread_task_()
clean_resource_();
}
schedule_resource_();
schedule_resource_(source.type_);
report_error_();
last_normal_work_ts_ = common::ObTimeUtility::fast_current_time();
}
@ -226,9 +226,9 @@ void ObLogRestoreService::clean_resource_()
(void)fetch_log_impl_.clean_resource();
}
void ObLogRestoreService::schedule_resource_()
void ObLogRestoreService::schedule_resource_(const share::ObLogRestoreSourceType &source_type)
{
(void)scheduler_.schedule();
(void)scheduler_.schedule(source_type);
}
void ObLogRestoreService::report_error_()

View File

@ -76,7 +76,7 @@ private:
void update_restore_quota_();
int update_upstream_(share::ObLogRestoreSourceItem &source, bool &source_exist);
void schedule_fetch_log_(share::ObLogRestoreSourceItem &source);
void schedule_resource_();
void schedule_resource_(const share::ObLogRestoreSourceType &source_type);
void clean_resource_();
void report_error_();
void update_restore_upper_limit_();

View File

@ -519,9 +519,11 @@ DEF_STR_WITH_CHECKER(log_transport_compress_func, OB_TENANT_PARAMETER, "lz4_1.0"
// "control if enable log archive",
// ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(log_restore_concurrency, OB_TENANT_PARAMETER, "1", "[1, 100]",
"log restore concurrency, for both restore tenant and standby tenant. "
"Range: [1, 100] in integer",
DEF_INT(log_restore_concurrency, OB_TENANT_PARAMETER, "0", "[0, 100]",
"log restore concurrency, for both the restore tenant and standby tenant. "
"If the value is default 0, the database will automatically calculate the number of restore worker threads "
"based on the tenant specification, which is tenant max_cpu; otherwise set the the worker count equals to the value."
"Range: [0, 100] in integer",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(log_archive_concurrency, OB_TENANT_PARAMETER, "0", "[0, 100]",