fix rs obtests
This commit is contained in:
parent
8dc7d2981d
commit
403d9c483b
@ -19,6 +19,7 @@
|
||||
#include "rootserver/ob_disaster_recovery_worker.h" // for ObDRWorker LocalityMap
|
||||
#include "rootserver/ob_disaster_recovery_info.h" // for DRLSInfo
|
||||
#include "share/schema/ob_schema_mgr.h" // for SimpleTenantSchema
|
||||
#include "rootserver/ob_root_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -80,6 +81,7 @@ int ObAlterLocalityFinishChecker::check()
|
||||
ObArray<const ObSimpleTenantSchema *> tenant_schemas;
|
||||
LOG_INFO("start to check alter locality finish");
|
||||
//STEP 0: previous check
|
||||
int64_t rs_job_id = 0;
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObAlterLocalityFinishChecker not init", KR(ret));
|
||||
@ -103,6 +105,7 @@ int ObAlterLocalityFinishChecker::check()
|
||||
DEBUG_SYNC(BEFORE_CHECK_LOCALITY);
|
||||
bool alter_locality_finish = false;
|
||||
bool meta_alter_locality_finish = false;
|
||||
bool is_previous_locality_empty = true;
|
||||
uint64_t tenant_id = OB_INVALID_TENANT_ID;
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
if (OB_ISNULL(tenant_schemas.at(i)) || OB_ISNULL(GCTX.sql_proxy_)) {
|
||||
@ -116,16 +119,25 @@ int ObAlterLocalityFinishChecker::check()
|
||||
// shall never be here
|
||||
} else if (is_meta_tenant(tenant_id)) {
|
||||
continue;
|
||||
} else if (tenant_schemas.at(i)->get_previous_locality_str().empty()) {
|
||||
// two possibilities
|
||||
// 1. we cannot find any inprogress ALTER_TENANT_LOCALITY in __all_rootservice_job
|
||||
// 2. there is an inprogress rs job ALTER_TENANT_LOCALITY, all tasks related to disaster recovery
|
||||
// (e.g. add missing ls replicas, remove redundant ls replicas) have been finished
|
||||
// the last thing to do is that check whether ls are balanced and complete the rs job if yes
|
||||
if (OB_FAIL(ObRootUtils::check_and_commit_rs_job(tenant_id, ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY))) {
|
||||
LOG_WARN("fail to check and commit rs job", KR(ret));
|
||||
} else if (OB_TMP_FAIL(find_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_))) {
|
||||
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
|
||||
tmp_ret = OB_SUCCESS;
|
||||
rs_job_id = 0;
|
||||
} else {
|
||||
LOG_WARN("fail to find rs job", KR(ret), KR(tmp_ret), K(tenant_id), K(rs_job_id));
|
||||
}
|
||||
} else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match(
|
||||
}
|
||||
if (OB_FAIL(ret) || OB_SUCCESS != tmp_ret) {
|
||||
} else if (OB_TMP_FAIL(check_tenant_previous_locality_(tenant_id, is_previous_locality_empty))) {
|
||||
LOG_WARN("fail to execute check_tenant_previous_locality_", KR(ret), K(tenant_id));
|
||||
} else if (is_previous_locality_empty) {
|
||||
if (0 != rs_job_id && OB_TMP_FAIL(ObRootUtils::check_ls_balance_and_commit_rs_job(
|
||||
tenant_id,
|
||||
rs_job_id,
|
||||
ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY))) {
|
||||
LOG_WARN("fail to execute check_ls_balance_and_commit_rs_job", KR(ret), K(tenant_id), K(rs_job_id));
|
||||
}
|
||||
} else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match(
|
||||
tenant_id,
|
||||
*unit_mgr_,
|
||||
*zone_mgr_,
|
||||
@ -189,5 +201,30 @@ int ObAlterLocalityFinishChecker::check_stop() const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAlterLocalityFinishChecker::check_tenant_previous_locality_(
|
||||
const uint64_t tenant_id,
|
||||
bool &is_previous_locality_empty)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSchemaGetterGuard tenant_schema_guard;
|
||||
const ObTenantSchema *tenant_schema = NULL;
|
||||
ObRootService *root_service = GCTX.root_service_;
|
||||
is_previous_locality_empty= true;
|
||||
if (OB_ISNULL(root_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("root_service is null", KR(ret), KP(root_service));
|
||||
} else if (OB_FAIL(root_service->get_ddl_service().get_tenant_schema_guard_with_version_in_inner_table(OB_SYS_TENANT_ID, tenant_schema_guard))) {
|
||||
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tenant_schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
|
||||
LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id));
|
||||
} else if (OB_ISNULL(tenant_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant_schema is null", KR(ret), KP(tenant_schema));
|
||||
} else {
|
||||
is_previous_locality_empty = tenant_schema->get_previous_locality_str().empty();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end namespace rootserver
|
||||
} // end namespace oceanbase
|
||||
|
@ -68,6 +68,7 @@ public:
|
||||
private:
|
||||
//check whether this checker is stopped
|
||||
virtual int check_stop() const override;
|
||||
// Reports through is_previous_locality_empty whether the previous-locality
// string of tenant_id's schema is empty. The output is preset to true and is
// only overwritten when the schema lookup succeeds.
virtual int check_tenant_previous_locality_(const uint64_t tenant_id, bool &is_previous_locality_empty);
|
||||
private:
|
||||
bool inited_;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service_;
|
||||
|
@ -2272,43 +2272,34 @@ int ObRootUtils::is_first_priority_primary_zone_changed(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootUtils::check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type)
|
||||
int ObRootUtils::check_ls_balance_and_commit_rs_job(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t rs_job_id,
|
||||
const ObRsJobType rs_job_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rs_job_id = 0;
|
||||
int check_ret = OB_NEED_WAIT;
|
||||
const char* rs_job_type_str = NULL;
|
||||
if (OB_ISNULL(GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
|
||||
} else if (OB_UNLIKELY(!ObRsJobTableOperator::is_valid_job_type(rs_job_type)
|
||||
|| NULL == (rs_job_type_str = ObRsJobTableOperator::get_job_type_str(rs_job_type)))) {
|
||||
|| NULL == (rs_job_type_str = ObRsJobTableOperator::get_job_type_str(rs_job_type))
|
||||
|| rs_job_id < 1)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid job type", KR(ret), K(rs_job_type), KP(rs_job_type_str));
|
||||
} else if (OB_FAIL(find_rs_job(tenant_id, rs_job_type, rs_job_id))) {
|
||||
// find the corresponding rs job at first, then check if we can complete it
|
||||
// if we only find the rs job at the committing period,
|
||||
// we do not know whether the job has been changed during checking process
|
||||
// e.g. job 1 is the rs job before checking,
|
||||
// right after checking, job 2 is created and job 1 is canceled by job 2,
|
||||
// then committing process will find job 2 and complete job 2 immediately,
|
||||
// which means, job 2 is completed without checking.
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_type), K(rs_job_type_str));
|
||||
}
|
||||
LOG_WARN("invalid job type", KR(ret), K(rs_job_type), KP(rs_job_type_str), K(rs_job_id));
|
||||
} else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) {
|
||||
LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id));
|
||||
} else if (OB_NEED_WAIT != check_ret) {
|
||||
DEBUG_SYNC(BEFORE_FINISH_UNIT_NUM);
|
||||
if (OB_SUCC(RS_JOB_COMPLETE(rs_job_id, check_ret, *GCTX.sql_proxy_))) {
|
||||
FLOG_INFO("[CHECK_AND_COMMIT_RS_JOB NOTICE] complete an inprogress rs job",
|
||||
FLOG_INFO("[COMMIT_RS_JOB NOTICE] complete an inprogress rs job",
|
||||
KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), K(rs_job_type), K(rs_job_type_str));
|
||||
} else {
|
||||
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(rs_job_id), K(check_ret),
|
||||
K(rs_job_type), K(rs_job_type_str));
|
||||
if (OB_EAGAIN == ret) {
|
||||
FLOG_WARN("[CHECK_AND_COMMIT_RS_JOB NOTICE] the specified rs job might has "
|
||||
FLOG_WARN("[COMMIT_RS_JOB NOTICE] the specified rs job might has "
|
||||
"been already completed due to a new job or deleted in table manually",
|
||||
KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), K(rs_job_type), K(rs_job_type_str));
|
||||
ret = OB_SUCCESS; // no need to return the error code
|
||||
@ -2317,32 +2308,6 @@ int ObRootUtils::check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJob
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObRootUtils::find_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type, int64_t &rs_job_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
rs_job_id = 0;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ObRsJobTableOperator::is_valid_job_type(rs_job_type))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(rs_job_type));
|
||||
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
|
||||
} else {
|
||||
switch (rs_job_type) {
|
||||
case ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY:
|
||||
ret = ObAlterLocalityFinishChecker::find_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_);
|
||||
break;
|
||||
case ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM:
|
||||
ret = ObUnitManager::find_alter_resource_tenant_unit_num_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_);
|
||||
break;
|
||||
default: ret = OB_NOT_SUPPORTED;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_type));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
///////////////////////////////
|
||||
|
||||
|
@ -651,8 +651,10 @@ public:
|
||||
ObIArray<ObZone> &orig_first_primary_zone,
|
||||
ObIArray<ObZone> &new_first_primary_zone,
|
||||
bool &is_changed);
|
||||
static int check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type);
|
||||
static int find_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type, int64_t &rs_job_id);
|
||||
// Checks the tenant's LS balance and, once the check no longer reports
// OB_NEED_WAIT, completes the rs job identified by rs_job_id.
// The visible definition rejects an invalid job type or rs_job_id < 1 with
// OB_INVALID_ARGUMENT.
// NOTE(review): the definition appears here as interleaved old/new diff
// lines — confirm the details against the final source file.
static int check_ls_balance_and_commit_rs_job(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t rs_job_id,
|
||||
const ObRsJobType rs_job_type);
|
||||
|
||||
template<class T>
|
||||
static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr,
|
||||
|
@ -108,6 +108,7 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te
|
||||
ObArray<uint64_t> pool_ids;
|
||||
bool in_shrinking = true;
|
||||
bool is_finished = true;
|
||||
int64_t rs_job_id = 0;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret));
|
||||
@ -124,17 +125,27 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te
|
||||
} else if (OB_UNLIKELY(0 == pool_ids.count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get tenant resource pool", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(unit_mgr_->check_pool_in_shrinking(pool_ids.at(0), in_shrinking))) {
|
||||
} else if (OB_FAIL(unit_mgr_->find_alter_resource_tenant_unit_num_rs_job(tenant_id, rs_job_id, *sql_proxy_))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
rs_job_id = 0;
|
||||
} else {
|
||||
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_id));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(unit_mgr_->check_pool_in_shrinking(pool_ids.at(0), in_shrinking))) {
|
||||
LOG_WARN("failed to check resource pool in shrink", KR(ret), K(pool_ids));
|
||||
} else if (!in_shrinking) {
|
||||
// check if there exists ALTER_RESOURCE_TENANT_UNIT_NUM rs job
|
||||
// if exists
|
||||
// not in_shrinking means that the rs job created by a EXPAND task
|
||||
// or a SHRINK task which has cleared deleting units in __all_unit table
|
||||
// check whether this task can be committed (i.e. ls is balanced)
|
||||
// if not exists, return OB_SUCCESS
|
||||
if (OB_FAIL(ObRootUtils::check_and_commit_rs_job(tenant_id, ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM))) {
|
||||
LOG_WARN("fail to execute check_and_commit_rs_job", KR(ret), K(tenant_id));
|
||||
if (0 != rs_job_id
|
||||
&& OB_FAIL(ObRootUtils::check_ls_balance_and_commit_rs_job(
|
||||
tenant_id,
|
||||
rs_job_id,
|
||||
ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM))) {
|
||||
LOG_WARN("fail to execute check_ls_balance_and_commit_rs_job", KR(ret), K(tenant_id));
|
||||
}
|
||||
} else {
|
||||
//check shrink finish
|
||||
|
Loading…
x
Reference in New Issue
Block a user