new rs job related to tenant LOCALITY, PRIMARY ZONE, UNIT_NUM

This commit is contained in:
linqiucen
2023-08-30 12:14:02 +00:00
committed by ob-robot
parent f2c4960e71
commit 185d056d52
31 changed files with 1429 additions and 481 deletions

View File

@ -1468,6 +1468,67 @@ int ObUnitManager::determine_alter_resource_tenant_unit_num_type(
return ret;
}
int ObUnitManager::register_alter_resource_tenant_unit_num_rs_job(
const uint64_t tenant_id,
const int64_t new_unit_num,
const AlterUnitNumType alter_unit_num_type,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
uint64_t tenant_data_version = 0;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|| !(AUN_SHRINK == alter_unit_num_type || AUN_EXPAND == alter_unit_num_type))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(alter_unit_num_type));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) {
LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
} else if (tenant_data_version < DATA_VERSION_4_2_1_0) {
if (AUN_EXPAND == alter_unit_num_type) {
// skip, no rs job for expand task in version 4.1
// we do not need to rollback rs job, since in 4.1, there is no inprogress rs job at this step
} else if (OB_FAIL(register_shrink_tenant_pool_unit_num_rs_job(tenant_id, new_unit_num, trans))) {
LOG_WARN("fail to execute register_shrink_tenant_pool_unit_num_rs_job", KR(ret),
K(tenant_id), K(new_unit_num));
}
} else { // tenant_data_version >= DATA_VERSION_4_2_1_0
// step 1: cancel rs job if exists
int64_t job_id = 0;
if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id));
(void) print_user_error_(tenant_id);
} else if(OB_FAIL(cancel_alter_resource_tenant_unit_num_rs_job(tenant_id, trans))) {
LOG_WARN("fail to execute cancel_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id));
} else {
ret = create_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, job_id, trans);
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] create a new rs job", "type",
AUN_EXPAND == alter_unit_num_type ? "EXPAND UNIT_NUM" : "SHRINK UNIT_NUM",
KR(ret), K(tenant_id), K(job_id), K(alter_unit_num_type));
}
}
return ret;
}
int ObUnitManager::find_alter_resource_tenant_unit_num_rs_job(
const uint64_t tenant_id,
int64_t &job_id,
common::ObISQLClient &sql_proxy)
{
int ret = OB_SUCCESS;
ret = RS_JOB_FIND(
ALTER_RESOURCE_TENANT_UNIT_NUM,
job_id,
sql_proxy,
"tenant_id", tenant_id);
if (OB_ENTRY_NOT_EXIST == ret) {
ret = RS_JOB_FIND(
SHRINK_RESOURCE_TENANT_UNIT_NUM, // only created in version < 4.2
job_id,
sql_proxy,
"tenant_id", tenant_id);
}
return ret;
}
int ObUnitManager::register_shrink_tenant_pool_unit_num_rs_job(
const uint64_t tenant_id,
const int64_t new_unit_num,
@ -1475,44 +1536,136 @@ int ObUnitManager::register_shrink_tenant_pool_unit_num_rs_job(
{
int ret = OB_SUCCESS;
int64_t pos = 0;
const int64_t extra_info_len = common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH;
char extra_info[common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH] = {0};
if (OB_SUCCESS != (ret = databuff_printf(extra_info, extra_info_len, pos,
"new_unit_num: %ld", new_unit_num))) {
if (OB_SIZE_OVERFLOW == ret) {
LOG_WARN("format to buff size overflow", K(ret));
} else {
LOG_WARN("format new unit num failed", K(ret));
}
}
if (OB_SUCC(ret)) {
int64_t job_id = RS_JOB_CREATE(SHRINK_RESOURCE_TENANT_UNIT_NUM, trans,
"tenant_id", tenant_id,
"extra_info", extra_info);
if (job_id < 1) {
ret = OB_SQL_OPT_ERROR;
LOG_WARN("insert into all_rootservice_job failed", K(ret));
}
int64_t job_id = 0;
if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id));
(void) print_user_error_(tenant_id);
} else {
ret = create_alter_resource_tenant_unit_num_rs_job(
tenant_id,
new_unit_num,
job_id,
trans,
JOB_TYPE_SHRINK_RESOURCE_TENANT_UNIT_NUM);
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] create a new rs job in Version < 4.2",
KR(ret), K(tenant_id), K(job_id));
}
return ret ;
}
int ObUnitManager::rollback_shrink_tenant_pool_unit_num_rs_job(
const uint64_t tenant_id,
common::ObMySQLTransaction &trans)
void ObUnitManager::print_user_error_(const uint64_t tenant_id)
{
ObRsJobInfo job_info;
int ret = RS_JOB_FIND(job_info, trans,
"job_type", "SHRINK_RESOURCE_TENANT_UNIT_NUM",
"job_status", "INPROGRESS",
"tenant_id", tenant_id);
if (OB_SUCC(ret) && job_info.job_id_ > 0) {
if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, -1, trans))) { // Roll back, this shrink failed
LOG_WARN("update all_rootservice_job failed", K(ret), K(job_info));
const int64_t ERR_MSG_LEN = 256;
char err_msg[ERR_MSG_LEN];
int ret = OB_SUCCESS;
int64_t pos = 0;
if (OB_FAIL(databuff_printf(err_msg, ERR_MSG_LEN, pos,
"Tenant (%lu) 'enable_rebalance' is disabled, alter tenant unit num", tenant_id))) {
if (OB_SIZE_OVERFLOW == ret) {
LOG_WARN("format to buff size overflow", KR(ret));
} else {
LOG_WARN("format new unit num failed", KR(ret));
}
} else {
LOG_WARN("failed to find rs job", K(ret), "tenant_id", tenant_id);
LOG_USER_ERROR(OB_OP_NOT_ALLOW, err_msg);
}
}
int ObUnitManager::cancel_alter_resource_tenant_unit_num_rs_job(
const uint64_t tenant_id,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
int64_t job_id = 0;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else if (OB_FAIL(find_alter_resource_tenant_unit_num_rs_job(tenant_id, job_id, trans))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] there is no rs job in table",KR(ret), K(tenant_id));
} else {
LOG_WARN("fail to execute find_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id));
}
} else {
ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans);
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] cancel an inprogress rs job", KR(ret),
K(tenant_id), K(job_id));
}
return ret;
}
int ObUnitManager::create_alter_resource_tenant_unit_num_rs_job(
const uint64_t tenant_id,
const int64_t new_unit_num,
int64_t &job_id,
common::ObMySQLTransaction &trans,
ObRsJobType job_type)
{
int ret = OB_SUCCESS;
job_id = 0;
const int64_t extra_info_len = common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH;
HEAP_VAR(char[extra_info_len], extra_info) {
memset(extra_info, 0, extra_info_len);
int64_t pos = 0;
share::schema::ObSchemaGetterGuard schema_guard;
const ObSimpleTenantSchema *tenant_schema;
if (OB_FAIL(databuff_printf(extra_info, extra_info_len, pos, "new_unit_num: %ld", new_unit_num))) {
if (OB_SIZE_OVERFLOW == ret) {
LOG_WARN("format to buff size overflow", K(ret));
} else {
LOG_WARN("format new unit num failed", K(ret));
}
} else if (OB_ISNULL(schema_service_)) {
ret = OB_NOT_INIT;
LOG_WARN("schema service is null", KR(ret), KP(schema_service_));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
LOG_WARN("fail to get tenant info", K(ret), K(tenant_id));
} else if (OB_ISNULL(tenant_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant schema is null", KR(ret), KP(tenant_schema));
} else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
job_id,
job_type,
trans,
"tenant_id", tenant_id,
"tenant_name", tenant_schema->get_tenant_name(),
"extra_info", extra_info))) {
LOG_WARN("fail to create rs job", KR(ret), K(tenant_id), K(job_type));
}
}
return ret;
}
int ObUnitManager::rollback_alter_resource_tenant_unit_num_rs_job(
const uint64_t tenant_id,
const int64_t new_unit_num,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
int64_t job_id = 0;
uint64_t tenant_data_version = 0;
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) {
LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
} else if (tenant_data_version < DATA_VERSION_4_2_1_0) {
// in v4.1, it's allowed, since this will not track the number of logstreams
if (OB_FAIL(cancel_alter_resource_tenant_unit_num_rs_job(tenant_id, trans))) {
LOG_WARN("fail to execute cancel_alter_resource_tenant_unit_num_rs_job in v4.1", KR(ret), K(tenant_id));
}
} else if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id));
(void) print_user_error_(tenant_id);
} else if (OB_FAIL(cancel_alter_resource_tenant_unit_num_rs_job(tenant_id, trans))) {
LOG_WARN("fail to execute cancel_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id));
} else if (OB_FAIL(create_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, job_id, trans))) {
LOG_WARN("fail to execute create_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id), K(new_unit_num));
}
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] rollback a SHRINK UNIT_NUM rs job",
KR(ret), K(tenant_id), K(job_id), K(new_unit_num));
return ret;
}
@ -1521,31 +1674,55 @@ int ObUnitManager::try_complete_shrink_tenant_pool_unit_num_rs_job(
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, trans))) {
// this function is called by ObDDLService::drop_resource_pool_pre
const int check_ret = OB_SUCCESS; // no need to check ls status
int64_t job_id = 0;
if (OB_FAIL(find_alter_resource_tenant_unit_num_rs_job(tenant_id, job_id, trans))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("complete_shrink_tenant_pool_unit_num_rs_job failed", KR(ret), "tenant_id" K(tenant_id));
LOG_WARN("fail to execute find_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id));
}
} else {} // do nothing
} else {
ret = complete_shrink_tenant_pool_unit_num_rs_job_(
tenant_id, job_id, check_ret, trans);
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job DROP_TENANT_FORCE",
KR(ret), K(tenant_id), K(job_id));
}
return ret;
}
int ObUnitManager::complete_shrink_tenant_pool_unit_num_rs_job_(
const uint64_t tenant_id,
const int64_t job_id,
const int check_ret,
common::ObMySQLTransaction &trans)
{
ObRsJobInfo job_info;
int ret = RS_JOB_FIND(job_info, trans,
"job_type", "SHRINK_RESOURCE_TENANT_UNIT_NUM",
"job_status", "INPROGRESS",
"tenant_id", tenant_id);
if (OB_SUCC(ret) && job_info.job_id_ > 0) {
if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, 0, trans))) { // job success
LOG_WARN("update all_rootservice_job failed", K(ret), K(job_info));
}
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) || job_id < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id), K(job_id));
} else if (OB_SUCC(RS_JOB_COMPLETE(job_id, check_ret, trans))) {
FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogressing rs job SHRINK UNIT_NUM",
KR(ret), K(tenant_id), K(job_id), K(check_ret));
} else {
LOG_WARN("failed to find rs job", K(ret), "tenant_id" K(tenant_id));
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret));
if (OB_EAGAIN == ret) {
int64_t curr_job_id = 0;
if (OB_FAIL(find_alter_resource_tenant_unit_num_rs_job(tenant_id, curr_job_id, trans))) {
LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret));
if (OB_ENTRY_NOT_EXIST == ret) {
FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job SHRINK UNIT_NUM might has "
"been already deleted in table manually",
KR(ret), K(tenant_id), K(job_id), K(check_ret));
ret = OB_SUCCESS;
}
} else {
ret = OB_EAGAIN;
FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] a non-checked rs job SHRINK UNIT_NUM cannot be committed",
KR(ret), K(job_id), K(curr_job_id));
}
}
}
return ret ;
}
@ -1625,6 +1802,8 @@ int ObUnitManager::expand_tenant_pools_unit_num_(
LOG_WARN("fail to generate new unit group id array", KR(ret), K(pools), K(new_unit_num));
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
LOG_WARN("fail to start transaction", KR(ret));
} else if (OB_FAIL(register_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, AUN_EXPAND, trans))) {
LOG_WARN("fail to register shrink tenant pool unit num rs job", KR(ret), K(tenant_id));
} else {
share::ObResourcePool new_pool;
for (int64_t i = 0; OB_SUCC(ret) && i < pools.count(); ++i) {
@ -1757,7 +1936,7 @@ int ObUnitManager::get_to_be_deleted_unit_group(
}
} else {
ret = OB_OP_NOT_ALLOW;
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "delete unit group which does not belong to this tenant");
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "delete unit group which is not belong to this tenant");
}
}
} else {
@ -1837,8 +2016,7 @@ int ObUnitManager::shrink_tenant_pools_unit_num(
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
LOG_WARN("fail to start transaction", KR(ret));
} else {
if (OB_FAIL(register_shrink_tenant_pool_unit_num_rs_job(
tenant_id, new_unit_num, trans))) {
if (OB_FAIL(register_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, AUN_SHRINK, trans))) {
LOG_WARN("fail to register shrink tenant pool unit num rs job", KR(ret), K(tenant_id));
} else {
share::ObResourcePool new_pool;
@ -1882,11 +2060,13 @@ int ObUnitManager::shrink_tenant_pools_unit_num(
}
}
// however, we need to end this transaction
const bool commit = (OB_SUCCESS == ret);
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(commit))) {
LOG_WARN("trans end failed", K(tmp_ret), K(commit));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
if (trans.is_started()) {
const bool commit = (OB_SUCCESS == ret);
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(commit))) {
LOG_WARN("trans end failed", K(tmp_ret), K(commit));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
}
// modify in memory pool/unit info
for (int64_t i = 0; OB_SUCC(ret) && i < pools.count(); ++i) {
@ -1932,8 +2112,8 @@ int ObUnitManager::rollback_tenant_shrink_pools_unit_num(
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
LOG_WARN("start transaction failed", K(ret));
} else {
if (OB_FAIL(rollback_shrink_tenant_pool_unit_num_rs_job(tenant_id, trans))) {
LOG_WARN("rollback rs_job failed ", K(ret));
if (OB_FAIL(rollback_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, trans))) {
LOG_WARN("rollback rs_job failed ", KR(ret), K(new_unit_num));
} else {
share::ObResourcePool new_pool;
for (int64_t i = 0; OB_SUCC(ret) && i < pools.count(); ++i) {
@ -2057,14 +2237,9 @@ int ObUnitManager::alter_resource_tenant(
tenant_id, *pools, new_unit_num))) {
LOG_WARN("fail to rollback shrink pool unit num", K(ret), K(new_unit_num));
}
} else if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id));
char err_msg[DEFAULT_BUF_LENGTH];
(void)snprintf(err_msg, sizeof(err_msg),
"Tenant (%lu) 'enable_rebalance' is disabled, alter tenant unit num", tenant_id);
LOG_USER_ERROR(OB_OP_NOT_ALLOW, err_msg);
} else if (AUN_EXPAND == alter_unit_num_type) {
// in 4.1, if enable_rebalance is false, this op can be executed successfully
// in 4.2, it will return OB_OP_NOT_ALLOW
if (delete_unit_group_id_array.count() > 0) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "expand pool unit num combined with deleting unit");
@ -2074,6 +2249,7 @@ int ObUnitManager::alter_resource_tenant(
KPC(pools));
}
} else if (AUN_SHRINK == alter_unit_num_type) {
// both 4.1 and 4.2 do not allow this op when enable_rebalance is false.
if (OB_FAIL(shrink_tenant_pools_unit_num(
tenant_id, *pools, new_unit_num, delete_unit_group_id_array))) {
LOG_WARN("fail to shrink pool unit num", K(ret), K(new_unit_num));
@ -4023,7 +4199,9 @@ int ObUnitManager::get_all_unit_infos_by_tenant(const uint64_t tenant_id,
}
int ObUnitManager::commit_shrink_tenant_resource_pool(
const uint64_t tenant_id)
const uint64_t tenant_id,
const int64_t job_id,
const int check_ret)
{
int ret = OB_SUCCESS;
SpinWLockGuard guard(lock_);
@ -4044,8 +4222,8 @@ int ObUnitManager::commit_shrink_tenant_resource_pool(
LOG_WARN("pool ptr is null", KR(ret), KP(pools));
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
LOG_WARN("start transaction failed", KR(ret));
} else if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, trans))) {
LOG_WARN("do rs_job failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, job_id, check_ret, trans))) {
LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret));
} else if (OB_FAIL(commit_shrink_resource_pool_in_trans_(*pools, trans, resource_units))) {
LOG_WARN("failed to shrink in trans", KR(ret), KPC(pools));
}
@ -6394,15 +6572,14 @@ int ObUnitManager::complete_migrate_unit_rs_job_in_pool(
} else if (unit->is_manual_migrate()) {
char ip_buf[common::MAX_IP_ADDR_LENGTH];
(void)unit->server_.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH);
ObRsJobInfo job_info;
int tmp_ret = RS_JOB_FIND(job_info, *proxy_, "job_type", "MIGRATE_UNIT",
"job_status", "INPROGRESS",
int64_t job_id = 0;
int tmp_ret = RS_JOB_FIND(MIGRATE_UNIT, job_id, *proxy_,
"unit_id", unit->unit_id_,
"svr_ip", ip_buf,
"svr_port", unit->server_.get_port());
if (OB_SUCCESS == tmp_ret && job_info.job_id_ > 0) {
if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, result_ret, trans))) {
LOG_WARN("all_rootservice_job update failed", K(ret), K(result_ret), K(job_info));
if (OB_SUCCESS == tmp_ret && job_id > 0) {
if (OB_FAIL(RS_JOB_COMPLETE(job_id, result_ret, trans))) {
LOG_WARN("all_rootservice_job update failed", K(ret), K(result_ret), K(job_id));
}
}
}
@ -8304,11 +8481,17 @@ int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const
} else if (is_manual) {
char ip_buf[common::MAX_IP_ADDR_LENGTH];
(void)dst.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH);
const int64_t job_id = RS_JOB_CREATE(MIGRATE_UNIT, trans, "unit_id", unit_id,
"svr_ip", ip_buf, "svr_port", dst.get_port(), "tenant_id", pool->tenant_id_);
if (job_id < 1) {
ret = OB_SQL_OPT_ERROR;
LOG_WARN("insert into all_rootservice_job failed ", K(ret));
int64_t job_id = 0;
if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
job_id,
ObRsJobType::JOB_TYPE_MIGRATE_UNIT,
trans,
"unit_id", unit_id,
"svr_ip", ip_buf,
"svr_port", dst.get_port(),
"tenant_id", pool->tenant_id_))) {
LOG_WARN("fail to create rs job MIGRATE_UNIT", KR(ret), "tenant_id", pool->tenant_id_,
K(unit_id));
} else if (!granted) {
if (OB_FAIL(RS_JOB_COMPLETE(job_id, OB_SUCCESS, trans))) {
LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id));
@ -8535,15 +8718,14 @@ int ObUnitManager::end_migrate_unit(const uint64_t unit_id, const EndMigrateOp e
// complete the job if exists
char ip_buf[common::MAX_IP_ADDR_LENGTH];
(void)unit_server.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH);
ObRsJobInfo job_info;
tmp_ret = RS_JOB_FIND(job_info, trans, "job_type", "MIGRATE_UNIT",
"job_status", "INPROGRESS", "unit_id", unit_id,
int64_t job_id = 0;
tmp_ret = RS_JOB_FIND(MIGRATE_UNIT, job_id, trans, "unit_id", unit_id,
"svr_ip", ip_buf, "svr_port", unit_server.get_port());
if (OB_SUCCESS == tmp_ret && job_info.job_id_ > 0) {
if (OB_SUCCESS == tmp_ret && job_id > 0) {
tmp_ret = (end_migrate_op == COMMIT) ? OB_SUCCESS :
(end_migrate_op == REVERSE ? OB_CANCELED : OB_TIMEOUT);
if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, tmp_ret, trans))) {
LOG_WARN("all_rootservice_job update failed", K(ret), K(job_info));
if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) {
LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id));
}
} else {
//Can not find the situation, only the user manually opened will write rs_job