[ease of use] user can specify restore concurrency in restore sql with option

This commit is contained in:
oceanoverflow
2023-02-09 18:21:38 +00:00
committed by ob-robot
parent 7f1eeb45ee
commit 0797855767
7 changed files with 93 additions and 2 deletions

View File

@ -311,6 +311,8 @@ int ObRestoreService::restore_tenant(const ObPhysicalRestoreJob &job_info)
} else if (OB_FAIL(restore_op.update_restore_option(
job_id, "tenant_id", new_tenant_id))) {
LOG_WARN("update restore option", K(ret), K(new_tenant_id), K(job_id), K(tenant_id_));
} else if (OB_FAIL(may_update_restore_concurrency_(new_tenant_id, job_info))) {
LOG_WARN("failed to update restore concurrency", K(ret), K(new_tenant_id), K(job_info));
} else {
idle_time_us_ = 1;// wakeup immediately
}
@ -660,6 +662,8 @@ int ObRestoreService::tenant_restore_finish(const ObPhysicalRestoreJob &job_info
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("restore scheduler stopped", K(ret));
} else if (OB_FAIL(reset_restore_concurrency_(job_info.get_tenant_id(), job_info))) {
LOG_WARN("failed to reset restore concurrency", K(ret), K(job_info));
} else if (OB_FAIL(try_get_tenant_restore_history_(job_info, history_info))) {
LOG_WARN("failed to get user tenant restory info", KR(ret), K(job_info));
} else if (share::PHYSICAL_RESTORE_SUCCESS == job_info.get_status()) {
@ -1393,5 +1397,61 @@ int ObRestoreService::reset_schema_status_(const uint64_t tenant_id)
return ret;
}
int ObRestoreService::may_update_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
{
int ret = OB_SUCCESS;
const int64_t concurrency = job_info.get_concurrency();
const ObString &tenant_name = job_info.get_tenant_name();
int64_t ha_high_thread_score = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(new_tenant_id));
if (tenant_config.is_valid()) {
ha_high_thread_score = tenant_config->ha_high_thread_score;
}
if (!job_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(job_info));
} else if (0 == concurrency || 0 != ha_high_thread_score) {
LOG_INFO("do nothing", K(concurrency), K(ha_high_thread_score));
} else if (OB_FAIL(update_restore_concurrency_(tenant_name, new_tenant_id, concurrency))) {
LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
}
return ret;
}
int ObRestoreService::reset_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
{
int ret = OB_SUCCESS;
const int64_t concurrency = 0;
const ObString &tenant_name = job_info.get_tenant_name();
if (!job_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(job_info));
} else if (OB_FAIL(update_restore_concurrency_(tenant_name, new_tenant_id, concurrency))) {
LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
}
return ret;
}
int ObRestoreService::update_restore_concurrency_(const common::ObString &tenant_name,
const uint64_t tenant_id, const int64_t concurrency)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affected_rows = 0;
if (OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null", K(ret));
} else if (OB_FAIL(sql.append_fmt(
"ALTER SYSTEM SET ha_high_thread_score = %ld TENANT = '%.*s'",
concurrency, tenant_name.length(), tenant_name.ptr()))) {
LOG_WARN("failed to append fmt", K(ret), K(tenant_name));
} else if (OB_FAIL(sql_proxy_->write(sql.ptr(), affected_rows))) {
LOG_WARN("failed to write sql", K(ret), K(sql));
} else {
LOG_INFO("update restore concurrency", K(tenant_name), K(concurrency), K(sql));
}
return ret;
}
} // end namespace rootserver
} // end namespace oceanbase

View File

@ -135,6 +135,11 @@ private:
share::ObHisRestoreJobPersistInfo &history_info);
int check_tenant_can_restore_(const uint64_t tenant_id);
int reset_schema_status_(const uint64_t tenant_id);
int may_update_restore_concurrency_(const uint64_t new_tenant_id,
const share::ObPhysicalRestoreJob &job_info);
int reset_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info);
int update_restore_concurrency_(const common::ObString &tenant_name, const uint64_t tenant_id,
const int64_t restore_concurrency);
private:
bool inited_;
share::schema::ObMultiVersionSchemaService *schema_service_;

View File

@ -255,6 +255,7 @@ DEF_TO_STRING(ObPhysicalRestoreJob)
K_(compatible),
K_(kms_info),
K_(kms_encrypt),
K_(concurrency),
K_(passwd_array),
K_(multi_restore_path_list),
K_(white_list)
@ -282,6 +283,7 @@ int ObPhysicalRestoreJob::assign(const ObPhysicalRestoreJob &other)
compat_mode_ = other.compat_mode_;
compatible_ = other.compatible_;
kms_encrypt_ = other.kms_encrypt_;
concurrency_ = other.concurrency_;
if (FAILEDx(deep_copy_ob_string(allocator_, other.comment_, comment_))) {
LOG_WARN("failed to copy string", KR(ret), K(other));
@ -344,6 +346,7 @@ void ObPhysicalRestoreJob::reset()
compatible_ = 0;
kms_info_.reset();
kms_encrypt_ = false;
concurrency_ = 0;
passwd_array_.reset();

View File

@ -180,6 +180,7 @@ public:
Property_declare_ObString(kms_info)
Property_declare_int(bool, kms_encrypt)
Property_declare_ObString(passwd_array)
Property_declare_int(int64_t, concurrency)
private:
//job_id and tenant_id in __all_restore_job primary_key

View File

@ -259,6 +259,7 @@ int ObPhysicalRestoreTableOperator::fill_dml_splicer(
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, compat_mode);
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, compatible);
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, passwd_array);
ADD_COLUMN_MACRO_IN_TABLE_OPERATOR(job_info, concurrency);
// source_cluster_version
if (OB_SUCC(ret)) {
@ -483,6 +484,7 @@ int ObPhysicalRestoreTableOperator::retrieve_restore_option(
RETRIEVE_STR_VALUE(passwd_array, job);
RETRIEVE_INT_VALUE(compatible, job);
RETRIEVE_STR_VALUE(kms_info, job);
RETRIEVE_INT_VALUE(concurrency, job);
if (OB_SUCC(ret)) {
if (name == "kms_encrypt") {

View File

@ -31,6 +31,7 @@ ObPhysicalRestoreOptionParser::ExtraArgsCb::Action ObPhysicalRestoreOptionParser
{"locality", ObPhysicalRestoreOptionParser::ExtraArgsCb::set_locality, false},
{"primary_zone", ObPhysicalRestoreOptionParser::ExtraArgsCb::set_primary_zone, false},
{"kms_encrypt", ObPhysicalRestoreOptionParser::ExtraArgsCb::set_kms_encrypt, false},
{"concurrency", ObPhysicalRestoreOptionParser::ExtraArgsCb::set_concurrency, false},
};
ObPhysicalRestoreOptionParser::ExtraArgsCb::ExtraArgsCb(ObPhysicalRestoreJob &job)
@ -181,7 +182,25 @@ int ObPhysicalRestoreOptionParser::ExtraArgsCb::set_kms_encrypt(
return ret;
}
int ObPhysicalRestoreOptionParser::ExtraArgsCb::set_concurrency(
ObPhysicalRestoreJob &job,
const char *val)
{
int ret = OB_SUCCESS;
int64_t concurrency = 0;
if (OB_ISNULL(val)) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(ob_atoll(val, concurrency))) {
LOG_WARN("failed to atoll", K(ret), K(val));
} else if (concurrency < 0 || concurrency > 100) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("concurrency is not valid", K(ret), K(concurrency));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "restore concurrency");
} else {
job.set_concurrency(concurrency);
}
return ret;
}
int ObPhysicalRestoreUriParser::parse(
const common::ObString &multi_uri,

View File

@ -45,6 +45,7 @@ public:
static int set_locality(ObPhysicalRestoreJob &job, const char *val);
static int set_primary_zone(ObPhysicalRestoreJob &job, const char *val);
static int set_kms_encrypt(ObPhysicalRestoreJob &job, const char *val);
static int set_concurrency(ObPhysicalRestoreJob &job, const char *val);
private:
ObPhysicalRestoreJob &job_;
struct Action {
@ -52,7 +53,7 @@ public:
Setter setter;
bool required;
};
const static int ACTION_CNT = 4;
const static int ACTION_CNT = 5;
static Action actions_[ACTION_CNT];
bool is_set_[ACTION_CNT];
};