fix standby tenant ls balance checking in rs job
This commit is contained in:
@ -112,7 +112,6 @@ static const char* job_status_str_array[JOB_STATUS_MAX] = {
|
||||
"SUCCESS",
|
||||
"FAILED",
|
||||
"SKIP_CHECKING_LS_STATUS",
|
||||
"CANCELED"
|
||||
};
|
||||
|
||||
ObRsJobStatus ObRsJobTableOperator::get_job_status(const common::ObString &job_status_str)
|
||||
@ -416,12 +415,6 @@ int ObRsJobTableOperator::complete_job(int64_t job_id, int result_code, common::
|
||||
} else if (OB_FAIL(pairs.add_column("job_status", job_status_str_array[JOB_STATUS_SKIP_CHECKING_LS_STATUS]))) {
|
||||
LOG_WARN("failed to add column", K(ret));
|
||||
}
|
||||
} else if (OB_CANCELED == result_code) {
|
||||
if (OB_FAIL(pairs.add_column("result_code", result_code))) {
|
||||
LOG_WARN("failed to add column", K(ret));
|
||||
} else if (OB_FAIL(pairs.add_column("job_status", job_status_str_array[JOB_STATUS_CANCELED]))) {
|
||||
LOG_WARN("failed to add column", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(pairs.add_column("result_code", result_code))) {
|
||||
LOG_WARN("failed to add column", K(ret));
|
||||
|
||||
@ -103,7 +103,6 @@ enum ObRsJobStatus
|
||||
JOB_STATUS_SUCCESS,
|
||||
JOB_STATUS_FAILED,
|
||||
JOB_STATUS_SKIP_CHECKING_LS_STATUS,
|
||||
JOB_STATUS_CANCELED,
|
||||
JOB_STATUS_MAX
|
||||
};
|
||||
|
||||
|
||||
@ -108,7 +108,6 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te
|
||||
ObArray<uint64_t> pool_ids;
|
||||
bool in_shrinking = true;
|
||||
bool is_finished = true;
|
||||
int check_ret = OB_NEED_WAIT;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret));
|
||||
@ -128,11 +127,14 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te
|
||||
} else if (OB_FAIL(unit_mgr_->check_pool_in_shrinking(pool_ids.at(0), in_shrinking))) {
|
||||
LOG_WARN("failed to check resource pool in shrink", KR(ret), K(pool_ids));
|
||||
} else if (!in_shrinking) {
|
||||
// check if there exists expand task
|
||||
// if exists, then check whether this task can be committed and commit it if ls is balanced
|
||||
// check if there exists ALTER_RESOURCE_TENANT_UNIT_NUM rs job
|
||||
// if exists
|
||||
// not in_shrinking means that the rs job created by a EXPAND task
|
||||
// or a SHRINK task which has cleared deleting units in __all_unit table
|
||||
// check whether this task can be committed (i.e. ls is balanced)
|
||||
// if not exists, return OB_SUCCESS
|
||||
if (OB_FAIL(check_and_commit_expand_resource_pool_(tenant_id))) {
|
||||
LOG_WARN("fail to execute check_and_commit_expand_resource_pool_", KR(ret), K(tenant_id));
|
||||
if (OB_FAIL(check_and_commit_rs_job_(tenant_id))) {
|
||||
LOG_WARN("fail to execute check_and_commit_rs_job_", KR(ret), K(tenant_id));
|
||||
}
|
||||
} else {
|
||||
//check shrink finish
|
||||
@ -151,30 +153,14 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te
|
||||
LOG_WARN("failed to extract units server and ids", KR(ret), K(units));
|
||||
}
|
||||
}//end for get all unit group, units, server
|
||||
// find the corresponding rs job at first, then check if we can complete it
|
||||
// if we only find the rs job at the committing period,
|
||||
// we do not know whether the job has been changed during checking process
|
||||
// e.g. job 1 is the rs job before checking,
|
||||
// right after checking, job 2 is created and job 1 is canceled by job 2,
|
||||
// then committing process will find job 2 and complete job 2 immediately,
|
||||
// which means, job 2 is completed without checking.
|
||||
if (OB_SUCC(ret) && OB_TMP_FAIL(unit_mgr_->find_alter_resource_tenant_unit_num_rs_job(tenant_id, job_id, *sql_proxy_))) {
|
||||
// even if there is no corresponding rs job, we should do complete job (e.g. remove deleting unit in __all_unit table)
|
||||
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
|
||||
FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] there exists unit_num changing without corresponding rs job",
|
||||
KR(ret), K(tmp_ret), K(tenant_id));
|
||||
} else {
|
||||
LOG_WARN("fail to execute find_alter_resource_tenant_unit_num_rs_job", KR(ret), KR(tmp_ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(check_shrink_resource_pool_finished_by_ls_(tenant_id,
|
||||
servers, unit_ids, unit_group_ids, is_finished, check_ret))) {
|
||||
servers, unit_ids, unit_group_ids, is_finished))) {
|
||||
LOG_WARN("failed to check shrink by ls", KR(ret), K(servers), K(unit_ids), K(unit_group_ids));
|
||||
}
|
||||
if (OB_SUCC(ret) && is_finished) {
|
||||
// commit finish of the tenant
|
||||
if (OB_FAIL(commit_tenant_shrink_resource_pool_(tenant_id, job_id, check_ret))) { // shrink
|
||||
LOG_WARN("failed to shrink tenant resource pool", KR(ret), K(tenant_id), K(job_id), K(check_ret));
|
||||
if (OB_FAIL(commit_tenant_shrink_resource_pool_(tenant_id))) { // shrink
|
||||
LOG_WARN("failed to shrink tenant resource pool", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -211,12 +197,10 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_ls
|
||||
const ObIArray<common::ObAddr> &servers,
|
||||
const ObIArray<uint64_t> &unit_ids,
|
||||
const ObIArray<uint64_t> &unit_group_ids,
|
||||
bool &is_finished,
|
||||
int &check_ret)
|
||||
bool &is_finished)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_finished = true;
|
||||
check_ret = OB_NEED_WAIT;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret));
|
||||
@ -230,10 +214,6 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_ls
|
||||
} else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(unit_mgr_) || OB_ISNULL(lst_operator_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ptr is null", KR(ret), KP(sql_proxy_), KP(unit_mgr_), KP(lst_operator_));
|
||||
} else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) {
|
||||
LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id));
|
||||
} else if (OB_NEED_WAIT == check_ret) {
|
||||
is_finished = false;
|
||||
} else {
|
||||
// check ls meta table for shrinking
|
||||
// to make sure that all the ls in deleting units have been migrated to some other normal units
|
||||
@ -276,10 +256,7 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_ls
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObShrinkExpandResourcePoolChecker::commit_tenant_shrink_resource_pool_(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t job_id,
|
||||
const int check_ret)
|
||||
int ObShrinkExpandResourcePoolChecker::commit_tenant_shrink_resource_pool_(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(BEFORE_FINISH_UNIT_NUM);
|
||||
@ -294,25 +271,21 @@ int ObShrinkExpandResourcePoolChecker::commit_tenant_shrink_resource_pool_(
|
||||
} else if (OB_ISNULL(unit_mgr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ptr is null", KR(ret), KP(unit_mgr_));
|
||||
} else if (OB_FAIL(unit_mgr_->commit_shrink_tenant_resource_pool(tenant_id, job_id, check_ret))) {
|
||||
LOG_WARN("fail to shrink resource pool", KR(ret), K(tenant_id), K(job_id), K(check_ret));
|
||||
} else if (OB_FAIL(unit_mgr_->commit_shrink_tenant_resource_pool(tenant_id))) {
|
||||
LOG_WARN("fail to shrink resource pool", KR(ret), K(tenant_id));
|
||||
} else {} // no more to do
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObShrinkExpandResourcePoolChecker::check_and_commit_expand_resource_pool_(const uint64_t tenant_id)
|
||||
int ObShrinkExpandResourcePoolChecker::check_and_commit_rs_job_(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t check_job_id = 0;
|
||||
int check_ret = OB_NEED_WAIT;
|
||||
if (OB_ISNULL(sql_proxy_)) {
|
||||
if (OB_ISNULL(sql_proxy_) || OB_ISNULL(unit_mgr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_));
|
||||
} else if (OB_FAIL(RS_JOB_FIND(
|
||||
ALTER_RESOURCE_TENANT_UNIT_NUM,
|
||||
check_job_id,
|
||||
*sql_proxy_,
|
||||
"tenant_id", tenant_id))) {
|
||||
LOG_WARN("sql_proxy_ or unit_mgr_ is null", KR(ret), KP(sql_proxy_), KP(unit_mgr_));
|
||||
} else if (OB_FAIL(unit_mgr_->find_alter_resource_tenant_unit_num_rs_job(tenant_id, check_job_id, *sql_proxy_))) {
|
||||
// find the corresponding rs job at first, then check if we can complete it
|
||||
// if we only find the rs job at the committing period,
|
||||
// we do not know whether the job has been changed during checking process
|
||||
@ -332,12 +305,12 @@ int ObShrinkExpandResourcePoolChecker::check_and_commit_expand_resource_pool_(co
|
||||
if (OB_FAIL(check_stop())) {
|
||||
LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret));
|
||||
} else if (OB_SUCC(RS_JOB_COMPLETE(check_job_id, check_ret, *sql_proxy_))) {
|
||||
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job EXPAND UNIT_NUM",
|
||||
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job",
|
||||
KR(ret), K(tenant_id), K(check_job_id), K(check_ret));
|
||||
} else {
|
||||
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(check_job_id), K(check_ret));
|
||||
if (OB_EAGAIN == ret) {
|
||||
FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job EXPAND UNIT_NUM might has "
|
||||
FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job might has "
|
||||
"been already completed due to a new job or deleted in table manually",
|
||||
KR(ret), K(tenant_id), K(check_job_id), K(check_ret));
|
||||
ret = OB_SUCCESS; // no need to return the error code
|
||||
|
||||
@ -72,13 +72,9 @@ private:
|
||||
const ObIArray<common::ObAddr> &servers,
|
||||
const ObIArray<uint64_t> &unit_ids,
|
||||
const ObIArray<uint64_t> &unit_group_ids,
|
||||
bool &is_finished,
|
||||
int &check_ret);
|
||||
int commit_tenant_shrink_resource_pool_(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t job_id,
|
||||
const int check_ret);
|
||||
int check_and_commit_expand_resource_pool_(const uint64_t tenant_id);
|
||||
bool &is_finished);
|
||||
int commit_tenant_shrink_resource_pool_(const uint64_t tenant_id);
|
||||
int check_and_commit_rs_job_(const uint64_t tenant_id);
|
||||
private:
|
||||
const volatile bool &is_stop_;
|
||||
common::ObMySQLProxy *sql_proxy_;
|
||||
|
||||
@ -297,16 +297,15 @@ int ObTenantBalanceService::is_ls_balance_finished(const uint64_t &tenant_id, bo
|
||||
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
|
||||
} else if (ObAllTenantInfoProxy::is_primary_tenant(GCTX.sql_proxy_, tenant_id, is_primary)) {
|
||||
LOG_WARN("fail to execute is_primary_tenant", KR(ret), K(tenant_id));
|
||||
} else if (is_primary) {
|
||||
} else if (is_primary && ObShareUtil::is_tenant_enable_transfer(tenant_id)) {
|
||||
if (OB_FAIL(is_primary_tenant_ls_balance_finished_(tenant_id, is_finished))) {
|
||||
LOG_WARN("fail to execute is_primary_tenant_ls_balance_finished_", KR(ret), K(tenant_id));
|
||||
}
|
||||
} else {
|
||||
// standby & restore
|
||||
is_finished = true;
|
||||
// if (OB_FAIL(is_standby_tenant_ls_balance_finished_(tenant_id, is_finished))) {
|
||||
// LOG_WARN("fail to execute is_standby_tenant_ls_balance_finished_", KR(ret), K(tenant_id));
|
||||
// }
|
||||
// standby & restore & primary tenant and enable_transfer=false
|
||||
if (OB_FAIL(is_standby_tenant_ls_balance_finished_(tenant_id, is_finished))) {
|
||||
LOG_WARN("fail to execute is_standby_tenant_ls_balance_finished_", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
LOG_TRACE("check whether the tenant has balanced ls", K(ret), K(tenant_id), K(is_primary), K(is_finished));
|
||||
return ret;
|
||||
|
||||
@ -4198,10 +4198,7 @@ int ObUnitManager::get_all_unit_infos_by_tenant(const uint64_t tenant_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObUnitManager::commit_shrink_tenant_resource_pool(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t job_id,
|
||||
const int check_ret)
|
||||
int ObUnitManager::commit_shrink_tenant_resource_pool(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpinWLockGuard guard(lock_);
|
||||
@ -4222,8 +4219,6 @@ int ObUnitManager::commit_shrink_tenant_resource_pool(
|
||||
LOG_WARN("pool ptr is null", KR(ret), KP(pools));
|
||||
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
|
||||
LOG_WARN("start transaction failed", KR(ret));
|
||||
} else if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, job_id, check_ret, trans))) {
|
||||
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret));
|
||||
} else if (OB_FAIL(commit_shrink_resource_pool_in_trans_(*pools, trans, resource_units))) {
|
||||
LOG_WARN("failed to shrink in trans", KR(ret), KPC(pools));
|
||||
}
|
||||
|
||||
@ -232,10 +232,7 @@ public:
|
||||
common::ObIArray<share::ObUnitInfo> &unit_infos) const;
|
||||
virtual int get_deleting_units_of_pool(const uint64_t resource_pool_id,
|
||||
common::ObIArray<share::ObUnit> &units) const;
|
||||
virtual int commit_shrink_tenant_resource_pool(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t job_id,
|
||||
const int check_ret);
|
||||
virtual int commit_shrink_tenant_resource_pool(const uint64_t tenant_id);
|
||||
virtual int get_all_unit_infos_by_tenant(const uint64_t tenant_id,
|
||||
common::ObIArray<share::ObUnitInfo> &unit_infos);
|
||||
virtual int get_unit_infos(const common::ObIArray<share::ObResourcePoolName> &pools,
|
||||
|
||||
Reference in New Issue
Block a user