opt alter_tenant_locality rs job

This commit is contained in:
linqiucen 2023-09-14 09:14:08 +00:00 committed by ob-robot
parent 8915b19f61
commit afb0b00db9
10 changed files with 105 additions and 111 deletions

View File

@ -26,7 +26,7 @@ using namespace common;
using namespace share;
namespace rootserver
{
OB_SERIALIZE_MEMBER((ObCommitAlterTenantLocalityArg, ObDDLArg), tenant_id_, rs_job_id_, rs_job_check_ret_);
OB_SERIALIZE_MEMBER((ObCommitAlterTenantLocalityArg, ObDDLArg), tenant_id_);
ObAlterLocalityFinishChecker::ObAlterLocalityFinishChecker(volatile bool &stop)
: inited_(false),
@ -103,9 +103,7 @@ int ObAlterLocalityFinishChecker::check()
DEBUG_SYNC(BEFORE_CHECK_LOCALITY);
bool alter_locality_finish = false;
bool meta_alter_locality_finish = false;
int check_ret = OB_NEED_WAIT;
uint64_t tenant_id = OB_INVALID_TENANT_ID;
int64_t job_id = 0;
ObCurTraceId::init(GCONF.self_addr_);
if (OB_ISNULL(tenant_schemas.at(i)) || OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
@ -116,26 +114,17 @@ int ObAlterLocalityFinishChecker::check()
LOG_WARN("invalid tenant schema", KR(ret), "schema", tenant_schemas.at(i));
} else if (FALSE_IT(tenant_id = tenant_schemas.at(i)->get_tenant_id())) {
// shall never be here
} else if (is_meta_tenant(tenant_id)
|| tenant_schemas.at(i)->get_previous_locality_str().empty()) {
} else if (is_meta_tenant(tenant_id)) {
continue;
} else if (OB_FAIL(find_rs_job(tenant_id, job_id, *GCTX.sql_proxy_))) {
// find the corresponding rs job at first, then check if we can complete it
// if we only find the rs job at the committing period,
// we do not know whether the job has been changed during checking process
// e.g. job 1 is the rs job before checking,
// right after checking, job 2 is created and job 1 is canceled by job 2,
// then committing process will find job 2 and complete job 2 immediately,
// which means, job 2 is completed without checking.
if (OB_ENTRY_NOT_EXIST == ret) {
FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] there exists locality changing without corresponding rs job",
KR(ret), KPC(tenant_schemas.at(i)));
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id));
} else if (tenant_schemas.at(i)->get_previous_locality_str().empty()) {
// two possibilities
// 1. we cannot find any inprogress ALTER_TENANT_LOCALITY in __all_rootservice_job
// 2. there is an inprogress rs job ALTER_TENANT_LOCALITY, all tasks related to disaster recovery
// (e.g. add missing ls replicas, remove redundant ls replicas) have been finished
// the last thing to do is that check whether ls are balanced and complete the rs job if yes
if (OB_FAIL(ObRootUtils::check_and_commit_rs_job(tenant_id, ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY))) {
LOG_WARN("fail to check and commit rs job", KR(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match(
tenant_id,
*unit_mgr_,
@ -150,18 +139,13 @@ int ObAlterLocalityFinishChecker::check()
meta_alter_locality_finish))){
LOG_WARN("fail to check tenant locality match", KR(tmp_ret), "meta_tenant_id",
gen_meta_tenant_id(tenant_id), K(meta_alter_locality_finish));
} else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) {
LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id));
} else if (alter_locality_finish
&& OB_NEED_WAIT != check_ret
&& (meta_alter_locality_finish || is_sys_tenant(tenant_id))) {
DEBUG_SYNC(BEFORE_FINISH_LOCALITY);
const int64_t timeout = GCONF.internal_sql_execute_timeout; // 30s default
rootserver::ObCommitAlterTenantLocalityArg arg;
arg.tenant_id_ = tenant_id;
arg.exec_tenant_id_ = OB_SYS_TENANT_ID;
arg.rs_job_id_ = job_id;
arg.rs_job_check_ret_ = check_ret;
if (OB_FAIL(check_stop())) {
LOG_WARN("ObAlterLocalityFinishChecker stopped", KR(ret));
} else if (OB_SUCCESS != (tmp_ret = common_rpc_proxy_->to(self_).timeout(timeout).commit_alter_tenant_locality(arg))) {
@ -204,5 +188,6 @@ int ObAlterLocalityFinishChecker::check_stop() const
}
return ret;
}
} // end namespace rootserver
} // end namespace oceanbase

View File

@ -41,16 +41,11 @@ struct ObCommitAlterTenantLocalityArg : public obrpc::ObDDLArg
{
OB_UNIS_VERSION(1);
public:
ObCommitAlterTenantLocalityArg() :
tenant_id_(common::OB_INVALID_ID),
rs_job_id_(0),
rs_job_check_ret_(OB_NEED_WAIT) {}
ObCommitAlterTenantLocalityArg() : tenant_id_(common::OB_INVALID_ID) {}
bool is_valid() const { return common::OB_INVALID_ID != tenant_id_;}
TO_STRING_KV(K_(tenant_id), K_(rs_job_id), K_(rs_job_check_ret));
TO_STRING_KV(K_(tenant_id));
uint64_t tenant_id_;
int64_t rs_job_id_;
int rs_job_check_ret_;
};
class ObAlterLocalityFinishChecker : public share::ObCheckStopProvider
@ -66,14 +61,13 @@ public:
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
common::ObMySQLProxy &sql_proxy,
share::ObLSTableOperator &lst_operator);
share::ObLSTableOperator &lst_operator);
int check();
static int find_rs_job(const uint64_t tenant_id, int64_t &job_id, ObISQLClient &sql_proxy);
private:
//check whether this checker is stopped
virtual int check_stop() const override;
private:
bool inited_;
share::schema::ObMultiVersionSchemaService *schema_service_;

View File

@ -23453,29 +23453,6 @@ int ObDDLService::commit_alter_tenant_locality(
LOG_WARN("fail to alter meta tenant", KR(ret));
}
}
int return_code = arg.rs_job_check_ret_;
int64_t job_id = arg.rs_job_id_;
if (OB_FAIL(ret)) {
} else if (OB_SUCC(RS_JOB_COMPLETE(job_id, return_code, trans))) {
FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] complete an inprogress rs job", KR(ret),
K(arg), K(return_code));
} else {
LOG_WARN("fail to complete rs job", KR(ret), K(job_id), K(return_code));
if (OB_EAGAIN == ret) {
int64_t find_job_id = 0;
if (OB_FAIL(ObAlterLocalityFinishChecker::find_rs_job(arg.tenant_id_, find_job_id, trans))) {
LOG_WARN("fail to find rs job", KR(ret), K(arg));
if (OB_ENTRY_NOT_EXIST == ret) {
FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] the specified rs job might has "
"been already deleted in table manually", KR(ret), K(arg), K(return_code));
ret = OB_SUCCESS;
}
} else {
ret = OB_EAGAIN;
FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] a non-checked rs job cannot be committed", KR(ret), K(arg), K(find_job_id));
}
}
}
}
int temp_ret = OB_SUCCESS;
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
@ -24817,16 +24794,22 @@ int ObDDLService::record_tenant_locality_event_history(
if (ALTER_LOCALITY_OP_INVALID == alter_locality_op) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid alter locality op", K(ret), K(alter_locality_op));
} else if (ROLLBACK_ALTER_LOCALITY == alter_locality_op) {
} else {
int64_t job_id = 0;
if (OB_FAIL(ObAlterLocalityFinishChecker::find_rs_job(tenant_schema.get_tenant_id(), job_id, trans))) {
LOG_WARN("failed to find rs job", K(ret), "tenant_id", tenant_schema.get_tenant_id());
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to find rs job", K(ret), "tenant_id", tenant_schema.get_tenant_id());
}
} else {
ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans); // The change task is rolled back, this change failed
ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans);
FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] cancel an old inprogress rs job due to rollback", KR(ret),
"tenant_id", tenant_schema.get_tenant_id(), K(job_id));
}
if (FAILEDx(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, tenant_data_version))) {
}
if (OB_SUCC(ret) && ROLLBACK_ALTER_LOCALITY == alter_locality_op) {
if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, tenant_data_version))) {
LOG_WARN("fail to get sys tenant's min data version", KR(ret));
} else if (tenant_data_version < DATA_VERSION_4_2_1_0) {
job_type = ObRsJobType::JOB_TYPE_ROLLBACK_ALTER_TENANT_LOCALITY;

View File

@ -2272,6 +2272,78 @@ int ObRootUtils::is_first_priority_primary_zone_changed(
return ret;
}
int ObRootUtils::check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type)
{
int ret = OB_SUCCESS;
int64_t rs_job_id = 0;
int check_ret = OB_NEED_WAIT;
const char* rs_job_type_str = NULL;
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_UNLIKELY(!ObRsJobTableOperator::is_valid_job_type(rs_job_type)
|| NULL == (rs_job_type_str = ObRsJobTableOperator::get_job_type_str(rs_job_type)))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid job type", KR(ret), K(rs_job_type), KP(rs_job_type_str));
} else if (OB_FAIL(find_rs_job(tenant_id, rs_job_type, rs_job_id))) {
// find the corresponding rs job at first, then check if we can complete it
// if we only find the rs job at the committing period,
// we do not know whether the job has been changed during checking process
// e.g. job 1 is the rs job before checking,
// right after checking, job 2 is created and job 1 is canceled by job 2,
// then committing process will find job 2 and complete job 2 immediately,
// which means, job 2 is completed without checking.
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_type), K(rs_job_type_str));
}
} else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) {
LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id));
} else if (OB_NEED_WAIT != check_ret) {
if (OB_SUCC(RS_JOB_COMPLETE(rs_job_id, check_ret, *GCTX.sql_proxy_))) {
FLOG_INFO("[CHECK_AND_COMMIT_RS_JOB NOTICE] complete an inprogress rs job",
KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), K(rs_job_type), K(rs_job_type_str));
} else {
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(rs_job_id), K(check_ret),
K(rs_job_type), K(rs_job_type_str));
if (OB_EAGAIN == ret) {
FLOG_WARN("[CHECK_AND_COMMIT_RS_JOB NOTICE] the specified rs job might has "
"been already completed due to a new job or deleted in table manually",
KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), K(rs_job_type), K(rs_job_type_str));
ret = OB_SUCCESS; // no need to return the error code
}
}
}
return ret;
}
int ObRootUtils::find_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type, int64_t &rs_job_id)
{
int ret = OB_SUCCESS;
rs_job_id = 0;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ObRsJobTableOperator::is_valid_job_type(rs_job_type))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(rs_job_type));
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else {
switch (rs_job_type) {
case ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY:
ret = ObAlterLocalityFinishChecker::find_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_);
break;
case ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM:
ret = ObUnitManager::find_alter_resource_tenant_unit_num_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_);
break;
default: ret = OB_NOT_SUPPORTED;
}
if (OB_FAIL(ret)) {
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_type));
}
}
return ret;
}
///////////////////////////////
ObClusterRole ObClusterInfoGetter::get_cluster_role_v2()

View File

@ -651,6 +651,8 @@ public:
ObIArray<ObZone> &orig_first_primary_zone,
ObIArray<ObZone> &new_first_primary_zone,
bool &is_changed);
static int check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type);
static int find_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type, int64_t &rs_job_id);
template<class T>
static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr,

View File

@ -133,8 +133,8 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te
// or a SHRINK task which has cleared deleting units in __all_unit table
// check whether this task can be committed (i.e. ls is balanced)
// if not exists, return OB_SUCCESS
if (OB_FAIL(check_and_commit_rs_job_(tenant_id))) {
LOG_WARN("fail to execute check_and_commit_rs_job_", KR(ret), K(tenant_id));
if (OB_FAIL(ObRootUtils::check_and_commit_rs_job(tenant_id, ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM))) {
LOG_WARN("fail to execute check_and_commit_rs_job", KR(ret), K(tenant_id));
}
} else {
//check shrink finish
@ -276,48 +276,5 @@ int ObShrinkExpandResourcePoolChecker::commit_tenant_shrink_resource_pool_(const
} else {} // no more to do
return ret;
}
int ObShrinkExpandResourcePoolChecker::check_and_commit_rs_job_(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
int64_t check_job_id = 0;
int check_ret = OB_NEED_WAIT;
if (OB_ISNULL(sql_proxy_) || OB_ISNULL(unit_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy_ or unit_mgr_ is null", KR(ret), KP(sql_proxy_), KP(unit_mgr_));
} else if (OB_FAIL(unit_mgr_->find_alter_resource_tenant_unit_num_rs_job(tenant_id, check_job_id, *sql_proxy_))) {
// find the corresponding rs job at first, then check if we can complete it
// if we only find the rs job at the committing period,
// we do not know whether the job has been changed during checking process
// e.g. job 1 is the rs job before checking,
// right after checking, job 2 is created and job 1 is canceled by job 2,
// then committing process will find job 2 and complete job 2 immediately,
// which means, job 2 is completed without checking.
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id));
}
} else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) {
LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id));
} else if (OB_NEED_WAIT != check_ret) {
DEBUG_SYNC(BEFORE_FINISH_UNIT_NUM);
if (OB_FAIL(check_stop())) {
LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret));
} else if (OB_SUCC(RS_JOB_COMPLETE(check_job_id, check_ret, *sql_proxy_))) {
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job",
KR(ret), K(tenant_id), K(check_job_id), K(check_ret));
} else {
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(check_job_id), K(check_ret));
if (OB_EAGAIN == ret) {
FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job might has "
"been already completed due to a new job or deleted in table manually",
KR(ret), K(tenant_id), K(check_job_id), K(check_ret));
ret = OB_SUCCESS; // no need to return the error code
}
}
}
return ret;
}
} // end namespace rootserver
} // end oceanbase

View File

@ -74,7 +74,6 @@ private:
const ObIArray<uint64_t> &unit_group_ids,
bool &is_finished);
int commit_tenant_shrink_resource_pool_(const uint64_t tenant_id);
int check_and_commit_rs_job_(const uint64_t tenant_id);
private:
const volatile bool &is_stop_;
common::ObMySQLProxy *sql_proxy_;

View File

@ -171,7 +171,7 @@ public:
const uint64_t tenant_id,
const int64_t new_unit_num,
const common::ObIArray<uint64_t> &unit_group_id_array);
int find_alter_resource_tenant_unit_num_rs_job(
static int find_alter_resource_tenant_unit_num_rs_job(
const uint64_t tenant_id,
int64_t &job_id,
common::ObISQLClient &sql_proxy);

View File

@ -760,7 +760,7 @@ int ObInnerTableSchema::dba_ob_tenant_jobs_schema(ObTableSchema &table_schema)
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT JOB_ID, JOB_TYPE, JOB_STATUS, RESULT_CODE, PROGRESS, gmt_create AS START_TIME, gmt_modified AS MODIFY_TIME, TENANT_ID, SQL_TEXT, EXTRA_INFO, RS_SVR_IP, RS_SVR_PORT FROM oceanbase.__all_rootservice_job WHERE JOB_TYPE in ( 'ALTER_TENANT_LOCALITY', 'ROLLBACK_ALTER_TENANT_LOCALITY', 'SHRINK_RESOURCE_TENANT_UNIT_NUM' ) )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT JOB_ID, JOB_TYPE, JOB_STATUS, RESULT_CODE, PROGRESS, gmt_create AS START_TIME, gmt_modified AS MODIFY_TIME, TENANT_ID, SQL_TEXT, EXTRA_INFO, RS_SVR_IP, RS_SVR_PORT FROM oceanbase.__all_rootservice_job WHERE JOB_TYPE in ( 'ALTER_TENANT_LOCALITY', 'ROLLBACK_ALTER_TENANT_LOCALITY', 'SHRINK_RESOURCE_TENANT_UNIT_NUM', 'ALTER_TENANT_PRIMARY_ZONE', 'ALTER_RESOURCE_TENANT_UNIT_NUM' ) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}

View File

@ -17467,7 +17467,9 @@ WHERE
JOB_TYPE in (
'ALTER_TENANT_LOCALITY',
'ROLLBACK_ALTER_TENANT_LOCALITY',
'SHRINK_RESOURCE_TENANT_UNIT_NUM'
'SHRINK_RESOURCE_TENANT_UNIT_NUM',
'ALTER_TENANT_PRIMARY_ZONE',
'ALTER_RESOURCE_TENANT_UNIT_NUM'
)
""".replace("\n", " ")
)