add config enable_transfer
This commit is contained in:
@ -255,8 +255,8 @@ int ObService::register_self()
|
||||
LOG_WARN("register self failed", KR(ret));
|
||||
} else if (!lease_state_mgr_.is_valid_heartbeat()) {
|
||||
ret = OB_ERROR;
|
||||
LOG_ERROR("can't renew lease", KR(ret),
|
||||
"heartbeat_expire_time", lease_state_mgr_.get_heartbeat_expire_time());
|
||||
LOG_ERROR("can't renew lease, the time difference between local and RS may be more than 2s",
|
||||
KR(ret), "heartbeat_expire_time", lease_state_mgr_.get_heartbeat_expire_time());
|
||||
} else {
|
||||
in_register_process_ = false;
|
||||
service_started_ = true;
|
||||
|
||||
@ -525,7 +525,7 @@ int ObBalanceTaskExecuteService::cancel_other_init_task_(
|
||||
} else {
|
||||
//set task status to failed
|
||||
if (OB_FAIL(comment.assign_fmt("Canceled due to parent task %ld was canceled",
|
||||
other_task.get_balance_task_id().id()))) {
|
||||
task.get_balance_task_id().id()))) {
|
||||
LOG_WARN("failed to assign fmt", KR(tmp_ret), K(task), K(other_task));
|
||||
} else if (OB_FAIL(try_update_task_comment_(other_task, comment, trans))) {
|
||||
LOG_WARN("failed to update task comment", KR(tmp_ret), KR(ret), K(task), K(comment));
|
||||
|
||||
@ -93,6 +93,9 @@ void ObCommonLSService::do_work()
|
||||
LOG_WARN("failed to create ls", KR(ret), KR(tmp_ret), K(user_tenant_schema));
|
||||
}
|
||||
if (OB_SUCC(ret) && !user_tenant_schema.is_dropping()) {
|
||||
if (OB_TMP_FAIL(ObBalanceLSPrimaryZone::try_adjust_user_ls_primary_zone(user_tenant_schema))) {
|
||||
LOG_WARN("failed to adjust user tenant primary zone", KR(ret), KR(tmp_ret), K(user_tenant_schema));
|
||||
}
|
||||
if (OB_TMP_FAIL(try_modify_ls_unit_group_(user_tenant_schema))) {
|
||||
LOG_WARN("failed to modify ls unit group", KR(ret), KR(tmp_ret), K(user_tenant_schema));
|
||||
}
|
||||
|
||||
@ -1063,8 +1063,6 @@ int ObRecoveryLSService::do_standby_balance_()
|
||||
LOG_WARN("sql can't null", K(ret), K(proxy_));
|
||||
} else if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
|
||||
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(ObBalanceLSPrimaryZone::try_adjust_user_ls_primary_zone(tenant_schema))) {
|
||||
LOG_WARN("failed to adjust user ls primary zone", KR(ret), K(tenant_schema));
|
||||
} else {
|
||||
ObTenantLSInfo tenant_info(proxy_, &tenant_schema, tenant_id_);
|
||||
if (OB_FAIL(ObLSServiceHelper::balance_ls_group(tenant_info))) {
|
||||
|
||||
@ -67,28 +67,9 @@ void ObTenantBalanceService::destroy()
|
||||
inited_ = false;
|
||||
}
|
||||
|
||||
int ObTenantBalanceService::balance_primary_zone_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret));
|
||||
} else if (OB_ISNULL(GCTX.sql_proxy_)
|
||||
|| OB_ISNULL(GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_), KP(GCTX.schema_service_));
|
||||
} else {
|
||||
ObTenantSchema tenant_schema_copy;
|
||||
if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema_copy))) {
|
||||
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(ObBalanceLSPrimaryZone::try_adjust_user_ls_primary_zone(tenant_schema_copy))) {
|
||||
LOG_WARN("failed to adjust user tenant primary zone", KR(ret),
|
||||
K(tenant_id_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// enable_balance = true, enable_transfer = true: balance with LS dynamic change
|
||||
// enable_balance = true, enable_transfer = false: balance without LS dynamic change
|
||||
// enable_balance = false, enable_transfer does not take effect: do not balance
|
||||
void ObTenantBalanceService::do_work()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -115,18 +96,27 @@ void ObTenantBalanceService::do_work()
|
||||
} else if (0 == job_cnt && ObShareUtil::is_tenant_enable_rebalance(tenant_id_)) {
|
||||
//check ls status is match with __all_ls
|
||||
//TODO
|
||||
if (ObShareUtil::is_tenant_enable_transfer(tenant_id_)) {
|
||||
if (OB_FAIL(gather_ls_status_stat_())) {
|
||||
LOG_WARN("failed to gather ls status", KR(ret));
|
||||
} else if (OB_FAIL(ls_balance_(job_cnt))) {
|
||||
LOG_WARN("failed to do ls balance", KR(ret));
|
||||
} else if (0 == job_cnt) {
|
||||
//balance primary zone
|
||||
if (OB_FAIL(balance_primary_zone_())) {
|
||||
LOG_WARN("failed to balance primary zone", KR(ret));
|
||||
} else if (OB_FAIL(try_do_partition_balance_(last_partition_balance_time))) {
|
||||
if (OB_FAIL(try_do_partition_balance_(last_partition_balance_time))) {
|
||||
LOG_WARN("try do partition balance failed", KR(ret), K(last_partition_balance_time));
|
||||
}
|
||||
}
|
||||
} else { // disable transfer
|
||||
ObTenantSchema tenant_schema_copy;
|
||||
if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema_copy))) {
|
||||
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
|
||||
} else {
|
||||
ObTenantLSInfo tenant_info(GCTX.sql_proxy_, &tenant_schema_copy, tenant_id_);
|
||||
if (OB_FAIL(ObLSServiceHelper::balance_ls_group(tenant_info))) {
|
||||
LOG_WARN("failed to balance ls group", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// separate statistic to avoid affecting balance jobs
|
||||
@ -144,7 +134,8 @@ void ObTenantBalanceService::do_work()
|
||||
ISTAT("finish one round", KR(ret), KR(tmp_ret), K_(tenant_id), K(job_cnt),
|
||||
K(primary_zone_num_), K(unit_group_array_),
|
||||
K(ls_array_), K(idle_time_us), K(last_partition_balance_time), K(last_statistic_bg_stat_time),
|
||||
"enable_rebalance", ObShareUtil::is_tenant_enable_rebalance(tenant_id_));
|
||||
"enable_rebalance", ObShareUtil::is_tenant_enable_rebalance(tenant_id_),
|
||||
"enable_transfer", ObShareUtil::is_tenant_enable_transfer(tenant_id_));
|
||||
reset();
|
||||
idle(idle_time_us);
|
||||
}// end while
|
||||
@ -465,12 +456,14 @@ int ObTenantBalanceService::check_ls_job_need_cancel_(const share::ObBalanceJob
|
||||
} else if (OB_UNLIKELY(!job.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("job is invalid", KR(ret), K(job));
|
||||
} else if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id_)) {
|
||||
} else if (!ObShareUtil::is_tenant_enable_transfer(tenant_id_)) {
|
||||
need_cancel = true;
|
||||
if (OB_TMP_FAIL(comment.assign_fmt("Canceled due to tenant balance is disabled"))) {
|
||||
if (OB_TMP_FAIL(comment.assign_fmt("Canceled due to tenant balance or transfer is disabled"))) {
|
||||
LOG_WARN("failed to assign fmt", KR(tmp_ret), K(job));
|
||||
}
|
||||
ISTAT("tenant balance is disabled, need cancel current job", K(job), K(comment));
|
||||
ISTAT("tenant balance or transfer is disabled, need cancel current job", K(job), K(comment),
|
||||
"enable_balance", ObShareUtil::is_tenant_enable_transfer(tenant_id_),
|
||||
"enable_transfer", ObShareUtil::is_tenant_enable_transfer(tenant_id_));
|
||||
} else if (job.get_primary_zone_num() != primary_zone_num_) {
|
||||
need_cancel = true;
|
||||
if (OB_TMP_FAIL(comment.assign_fmt("Canceled due to primary zone num change from %ld to %ld",
|
||||
|
||||
@ -98,7 +98,6 @@ private:
|
||||
ObArray<share::ObBalanceTask> &tasks);
|
||||
int construct_dependency_of_each_task_(ObArray<share::ObBalanceTask> &tasks);
|
||||
int lock_and_check_balance_job_(common::ObMySQLTransaction &trans, const uint64_t tenant_id);
|
||||
int balance_primary_zone_();
|
||||
int try_update_job_comment_(const share::ObBalanceJob &job, const common::ObSqlString &comment);
|
||||
int try_do_partition_balance_(int64_t &last_partition_balance_time);
|
||||
int try_statistic_balance_group_status_(int64_t &last_statistic_bg_stat_time);
|
||||
|
||||
@ -333,6 +333,23 @@ bool ObShareUtil::is_tenant_enable_rebalance(const uint64_t tenant_id)
|
||||
return bret;
|
||||
}
|
||||
|
||||
bool ObShareUtil::is_tenant_enable_transfer(const uint64_t tenant_id)
|
||||
{
|
||||
bool bret = false;
|
||||
if (is_valid_tenant_id(tenant_id)) {
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||
if (OB_UNLIKELY(!tenant_config.is_valid())) {
|
||||
LOG_WARN_RET(OB_ERR_UNEXPECTED, "tenant config is invalid", K(tenant_id));
|
||||
} else if (!tenant_config->enable_rebalance) {
|
||||
// if enable_rebalance is disabled, transfer is not allowed
|
||||
bret = false;
|
||||
} else {
|
||||
bret = tenant_config->enable_transfer;
|
||||
}
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN);
|
||||
int ObShareUtil::wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls)
|
||||
{
|
||||
|
||||
@ -87,6 +87,7 @@ public:
|
||||
static int wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls);
|
||||
|
||||
static bool is_tenant_enable_rebalance(const uint64_t tenant_id);
|
||||
static bool is_tenant_enable_transfer(const uint64_t tenant_id);
|
||||
};
|
||||
}//end namespace share
|
||||
}//end namespace oceanbase
|
||||
|
||||
@ -614,6 +614,11 @@ DEF_BOOL(enable_rebalance, OB_TENANT_PARAMETER, "True",
|
||||
"specifies whether the tenant load-balancing is turned on. "
|
||||
"Value: True:turned on False: turned off",
|
||||
ObParameterAttr(Section::LOAD_BALANCE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_BOOL(enable_transfer, OB_TENANT_PARAMETER, "True",
|
||||
"controls whether transfers are allowed in the tenant. "
|
||||
"This config does not take effect when enable_rebalance is disabled. "
|
||||
"Value: True:turned on False:turned off",
|
||||
ObParameterAttr(Section::LOAD_BALANCE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_TIME(balancer_idle_time, OB_TENANT_PARAMETER, "10s", "[10s,]",
|
||||
"the time interval between the schedules of the tenant load-balancing task. "
|
||||
"Range: [10s, +∞)",
|
||||
|
||||
@ -92,6 +92,7 @@ enable_syslog_wf
|
||||
enable_sys_table_ddl
|
||||
enable_sys_unit_standalone
|
||||
enable_tcp_keepalive
|
||||
enable_transfer
|
||||
enable_upgrade_mode
|
||||
enable_user_defined_rewrite_rules
|
||||
external_kms_info
|
||||
|
||||
Reference in New Issue
Block a user