diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index 7b164bcf29..8d76eb47b5 100755 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -477,7 +477,9 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC ObMySQLTransaction trans; for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) { const ObTxBufferNode &node = source_data.at(i); - if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) { + if (OB_FAIL(try_cancel_clone_job_for_standby_tenant_(node))) { + LOG_WARN("fail to cancel clone job for standby tenant", KR(ret), K(node)); + } else if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) { if (OB_FAIL(process_upgrade_log_(sync_scn, node))) { LOG_WARN("failed to process_upgrade_log_", KR(ret), K(node)); } @@ -494,18 +496,18 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC if (OB_FAIL(process_ls_transfer_task_in_trans_(node, sync_scn, trans))) { LOG_WARN("failed to process ls transfer task", KR(ret), K(node)); } - } else if (OB_FAIL(process_ls_table_in_trans_(node, - sync_scn, trans))) { + } else if (OB_FAIL(process_ls_table_in_trans_(node, sync_scn, trans))) { //TODO ls recovery is too fast for ls manager, so it maybe failed, while change ls status //consider how to retry - LOG_WARN("failed to process ls operator", KR(ret), K(node), - K(sync_scn)); + LOG_WARN("failed to process ls operator", KR(ret), K(node), K(sync_scn)); } }// end for if (OB_FAIL(ret) || !has_operation) { } else if (OB_FAIL(report_sys_ls_recovery_stat_in_trans_(sync_scn, false, trans, "report recovery stat and has multi data source"))) { LOG_WARN("failed to report sys ls recovery stat", KR(ret), K(sync_scn)); + } else if (OB_FAIL(check_standby_tenant_not_in_cloning_(trans))) { + LOG_WARN("fail to check standby tenant in cloning", KR(ret)); } ret = ERRSIM_END_TRANS_ERROR ? 
: ret; END_TRANSACTION(trans) @@ -516,6 +518,81 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC return ret; } +int ObRecoveryLSService::try_cancel_clone_job_for_standby_tenant_( + const transaction::ObTxBufferNode &node) +{ + int ret = OB_SUCCESS; + bool is_conflict_with_clone = true; + if (OB_UNLIKELY(!node.is_valid()) + || OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(node), K_(tenant_id)); + } else if (OB_FAIL(log_type_conflict_with_clone_procedure_(node, is_conflict_with_clone))) { + LOG_WARN("fail to check whether log type is conflict with clone", KR(ret), K(node)); + } else if (is_conflict_with_clone) { + // determine case to check + ObConflictCaseWithClone case_to_check; + if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) { + case_to_check = ObConflictCaseWithClone::STANDBY_UPGRADE; + } else if (ObTxDataSourceType::LS_TABLE == node.get_data_source_type()) { + case_to_check = ObConflictCaseWithClone::STANDBY_MODIFY_LS; + } else if (ObTxDataSourceType::TRANSFER_TASK == node.get_data_source_type()) { + case_to_check = ObConflictCaseWithClone::STANDBY_TRANSFER; + } else { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("unexpected log type", KR(ret), K(node)); + } + // cancel clone job if exists + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObTenantSnapshotUtil::cancel_existed_clone_job_if_need(tenant_id_, case_to_check))) { + LOG_WARN("fail to check tenant whether in cloning ", KR(ret), K_(tenant_id), K(case_to_check)); + } + } else { + // only standby upgrade/transfer/ls_alter conflict with standby clone job + // So if multi source log not these types, just skip, where is no need to cancel clone job + LOG_TRACE("log type is not conflict with clone operation", K(node)); + } + DEBUG_SYNC(AFTER_FIRST_CLONE_CHECK_FOR_STANDBY); + return ret; +} + +int ObRecoveryLSService::check_standby_tenant_not_in_cloning_( + common::ObMySQLTransaction &trans) 
+{ + int ret = OB_SUCCESS; + bool is_cloning = false; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(ObTenantSnapshotUtil::check_standby_tenant_not_in_cloning_procedure( + trans, tenant_id_, is_cloning))) { + LOG_WARN("fail to check standby tenant whether in cloning procedure", KR(ret), K_(tenant_id)); + } else if (is_cloning) { + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("tenant is cloning, can not process balance task", KR(ret), K_(tenant_id), K(is_cloning)); + } + return ret; +} + +int ObRecoveryLSService::log_type_conflict_with_clone_procedure_( + const transaction::ObTxBufferNode &node, + bool &is_conflict) +{ + int ret = OB_SUCCESS; + is_conflict = true; + if (OB_UNLIKELY(!node.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(node)); + } else if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type() + || ObTxDataSourceType::LS_TABLE == node.get_data_source_type() + || ObTxDataSourceType::TRANSFER_TASK == node.get_data_source_type()) { + is_conflict = true; + } else { + is_conflict = false; + } + return ret; +} + int ObRecoveryLSService::process_upgrade_log_( const share::SCN &sync_scn, const ObTxBufferNode &node) { diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index d823dd588e..ac4a94247b 100755 --- a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -148,7 +148,10 @@ private: #ifdef OB_BUILD_LOG_STORAGE_COMPRESS int decompress_log_payload_(const char *in_buf, const int64_t in_buf_len, char *&decompress_buf, int64_t &decompressed_len); #endif - + // cancel clone job if clone job exists + int try_cancel_clone_job_for_standby_tenant_(const transaction::ObTxBufferNode &node); + int check_standby_tenant_not_in_cloning_(common::ObMySQLTransaction &trans); + int log_type_conflict_with_clone_procedure_(const 
transaction::ObTxBufferNode &node, bool &is_conflict); private: bool inited_; uint64_t tenant_id_; diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index f486f48709..a3ad5a3fcd 100755 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -2479,7 +2479,7 @@ int ObRootService::clone_resource_pool(const obrpc::ObCloneResourcePoolArg &arg) } else if (!arg.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument to clone resource pool", KR(ret), K(arg)); - } else if (OB_FAIL(ObShareUtil::check_compat_version_for_clone_tenant( + } else if (OB_FAIL(ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( arg.get_source_tenant_id(), is_compatible))) { LOG_WARN("fail to check compat version", KR(ret), K(arg)); @@ -2779,7 +2779,7 @@ int ObRootService::create_tenant(const ObCreateTenantArg &arg, UInt64 &tenant_id } else if (!tmp_tenant && OB_FAIL(ObResolverUtils::check_not_supported_tenant_name(tenant_name))) { LOG_WARN("unsupported tenant name", KR(ret), K(tenant_name)); } else if (arg.is_clone_tenant() - && OB_FAIL(ObShareUtil::check_compat_version_for_clone_tenant( + && OB_FAIL(ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( arg.source_tenant_id_, compatible_with_clone_tenant))) { LOG_WARN("fail to check compatible version with clone tenant", KR(ret), K(arg)); @@ -5407,10 +5407,10 @@ int ObRootService::clone_tenant(const obrpc::ObCloneTenantArg &arg, } else if (OB_ISNULL(schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected schema_service_", KR(ret), KP(schema_service_)); - } else if (GCTX.is_standby_cluster() || GCONF.in_upgrade_mode()) { + } else if (GCONF.in_upgrade_mode()) { ret = OB_OP_NOT_ALLOW; - LOG_WARN("clone tenant while in standby cluster or in upgrade mode is not allowed", KR(ret)); - LOG_USER_ERROR(OB_OP_NOT_ALLOW, "clone tenant while in standby cluster or in upgrade mode"); + LOG_WARN("clone tenant while in upgrade mode is not allowed", 
KR(ret)); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "clone tenant while in upgrade mode"); } else if (OB_FAIL(ObResolverUtils::check_not_supported_tenant_name(clone_tenant_name))) { LOG_WARN("unsupported clone tenant name", KR(ret), K(clone_tenant_name)); } else { diff --git a/src/rootserver/restore/ob_clone_scheduler.cpp b/src/rootserver/restore/ob_clone_scheduler.cpp index 9c4654f912..0cb6daad33 100644 --- a/src/rootserver/restore/ob_clone_scheduler.cpp +++ b/src/rootserver/restore/ob_clone_scheduler.cpp @@ -693,6 +693,8 @@ int ObCloneScheduler::clone_wait_tenant_restore_finish(const ObCloneJob &job) if (has_set_stop()) { ret = OB_CANCELED; LOG_WARN("clone scheduler stopped", KR(ret)); + } else if (OB_FAIL(check_data_version_before_finish_clone_(job.get_source_tenant_id()))) { + LOG_WARN("fail to check data version before finish snapshot creation", KR(ret), K(job)); } else if (OB_FAIL(rpc_proxy_->timeout(timeout).create_tenant_end(arg))) { need_wait = true; LOG_WARN("fail to create tenant end", KR(ret), K(arg), K(timeout)); @@ -1279,8 +1281,8 @@ int ObCloneScheduler::try_update_job_status_( } if (OB_SUCC(ret)) { work_immediately_ = true; - LOG_INFO("[RESTORE] switch job status", KR(ret), K(job), K(next_status)); - (void)record_rs_event_(job, cur_status, next_status, return_ret); + (void)ObTenantCloneUtil::try_to_record_clone_status_change_rs_event( + job, cur_status, next_status, return_ret, ObCancelCloneJobReason()); } return ret; } @@ -1394,21 +1396,6 @@ ObTenantCloneStatus ObCloneScheduler::get_user_next_status_( return next_status; } -void ObCloneScheduler::record_rs_event_( - const share::ObCloneJob &job, - const share::ObTenantCloneStatus prev_status, - const share::ObTenantCloneStatus cur_status, - const int ret) -{ - const char *prev_status_str = ObTenantCloneStatus::get_clone_status_str(prev_status); - const char *cur_status_str = ObTenantCloneStatus::get_clone_status_str(cur_status); - ROOTSERVICE_EVENT_ADD("clone", "change_clone_status", - "job_id", 
job.get_job_id(), - K(ret), - "prev_clone_status", prev_status_str, - "cur_clone_status", cur_status_str); -} - int ObCloneScheduler::check_sys_tenant_(const uint64_t tenant_id) { int ret = OB_SUCCESS; @@ -1461,9 +1448,9 @@ int ObCloneScheduler::check_meta_tenant_(const uint64_t tenant_id) } else if (OB_ISNULL(GCTX.schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null schema service", KR(ret), KP(GCTX.schema_service_)); - } else if (GCTX.is_standby_cluster() || GCONF.in_upgrade_mode()) { + } else if (GCONF.in_upgrade_mode()) { ret = OB_OP_NOT_ALLOW; - LOG_WARN("clone tenant while in standby cluster or in upgrade mode is not allowed", KR(ret)); + LOG_WARN("clone tenant while in upgrade mode is not allowed", KR(ret)); } else if (OB_FAIL(ObShareUtil::check_compat_version_for_clone_tenant(tenant_id, compatibility_satisfied))) { LOG_WARN("check tenant compatibility failed", KR(ret), K(tenant_id)); } else if (!compatibility_satisfied) { @@ -2027,5 +2014,39 @@ int ObCloneScheduler::get_latest_key_id_( return ret; } +int ObCloneScheduler::check_data_version_before_finish_clone_( + const uint64_t source_tenant_id) +{ + int ret = OB_SUCCESS; + ObTenantCloneTableOperator clone_op; + ObCloneJob clone_job; + uint64_t current_min_cluster_version = GET_MIN_CLUSTER_VERSION(); + uint64_t current_data_version = 0; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == source_tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(source_tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, GCTX.sql_proxy_))) { + LOG_WARN("fail to init clone table operator", KR(ret), K(source_tenant_id)); + } else if (OB_FAIL(clone_op.get_clone_job_by_source_tenant_id( + source_tenant_id, clone_job))) { + LOG_WARN("fail to get job", KR(ret), K(source_tenant_id)); + } else if (clone_job.get_min_cluster_version() != current_min_cluster_version) { 
+ ret = OB_STATE_NOT_MATCH; + LOG_WARN("min cluster version has changed, clone tenant should fail", KR(ret), + K(source_tenant_id), K(clone_job), K(current_min_cluster_version)); + } else if (OB_FAIL(ObTenantSnapshotUtil::check_current_and_target_data_version( + source_tenant_id, current_data_version))) { + LOG_WARN("fail to check and get current data version", KR(ret), K(source_tenant_id)); + } else if (clone_job.get_data_version() != current_data_version) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("data version has changed, clone tenant should fail", KR(ret), + K(source_tenant_id), K(clone_job), K(current_data_version)); + } + return ret; +} + } } diff --git a/src/rootserver/restore/ob_clone_scheduler.h b/src/rootserver/restore/ob_clone_scheduler.h index 514dc8fb04..8198c5a292 100644 --- a/src/rootserver/restore/ob_clone_scheduler.h +++ b/src/rootserver/restore/ob_clone_scheduler.h @@ -85,10 +85,6 @@ private: share::ObTenantCloneStatus get_sys_next_status_in_failed_(const share::ObTenantCloneStatus current_status); share::ObTenantCloneStatus get_user_next_status_(const int return_ret, const share::ObTenantCloneStatus current_status); - void record_rs_event_(const share::ObCloneJob &job, - const share::ObTenantCloneStatus prev_status, - const share::ObTenantCloneStatus cur_status, - const int return_ret); int check_sys_tenant_(const uint64_t tenant_id); int check_meta_tenant_(const uint64_t tenant_id); @@ -131,6 +127,7 @@ private: const ObArray& ls_info_array, const ObArray& ls_snapshot_array, TenantRestoreStatus &tenant_restore_status); + int check_data_version_before_finish_clone_(const uint64_t source_tenant_id); private: static const int32_t MAX_RETRY_CNT = 5; static const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; diff --git a/src/rootserver/restore/ob_tenant_clone_util.cpp b/src/rootserver/restore/ob_tenant_clone_util.cpp index dd501b8316..8ed4c2a692 100644 --- a/src/rootserver/restore/ob_tenant_clone_util.cpp +++ 
b/src/rootserver/restore/ob_tenant_clone_util.cpp @@ -16,6 +16,7 @@ #include "share/tenant_snapshot/ob_tenant_snapshot_table_operator.h" #include "share/restore/ob_tenant_clone_table_operator.h" #include "share/location_cache/ob_location_service.h" +#include "share/ob_global_stat_proxy.h" // for ObGlobalStatProxy #include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h" using namespace oceanbase::rootserver; @@ -85,6 +86,8 @@ int ObTenantCloneUtil::fill_clone_job(const int64_t job_id, clone_job.reset(); ObTenantCloneJobType job_type = ObTenantCloneJobType::CLONE_JOB_MAX_TYPE; common::ObCurTraceId::TraceId trace_id; + uint64_t data_version = 0; + uint64_t min_cluster_version = 0; if (OB_UNLIKELY(job_id < 0 || !arg.is_valid() @@ -93,10 +96,12 @@ int ObTenantCloneUtil::fill_clone_job(const int64_t job_id, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(job_id), K(arg), K(source_tenant_id), K(source_tenant_name), K(snapshot_item)); - } else if (FALSE_IT(job_type = snapshot_item.is_valid() ? - ObTenantCloneJobType::RESTORE : - ObTenantCloneJobType::FORK)) { + } else if (OB_FAIL(construct_data_version_to_record_(source_tenant_id, data_version, min_cluster_version))) { + LOG_WARN("fail to construct data version to record", KR(ret), K(source_tenant_id)); } else { + job_type = snapshot_item.is_valid() ? 
+ ObTenantCloneJobType::RESTORE : + ObTenantCloneJobType::FORK; ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id(); if (nullptr != cur_trace_id) { trace_id = *cur_trace_id; @@ -120,6 +125,8 @@ int ObTenantCloneUtil::fill_clone_job(const int64_t job_id, .status_ = ObTenantCloneStatus(ObTenantCloneStatus::Status::CLONE_SYS_LOCK), .job_type_ = job_type, .ret_code_ = OB_SUCCESS, + .data_version_ = data_version, + .min_cluster_version_ = min_cluster_version, }; if (OB_FAIL(clone_job.init(init_arg))) { LOG_WARN("fail to init clone job", KR(ret), K(init_arg)); @@ -129,6 +136,37 @@ int ObTenantCloneUtil::fill_clone_job(const int64_t job_id, return ret; } +int ObTenantCloneUtil::construct_data_version_to_record_( + const uint64_t tenant_id, + uint64_t &data_version, + uint64_t &min_cluster_version) +{ + int ret = OB_SUCCESS; + data_version = 0; + min_cluster_version = 0; + bool need_to_record_data_version = false; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id) + || OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObShareUtil::check_compat_version_for_clone_standby_tenant( + tenant_id, need_to_record_data_version))) { + LOG_WARN("fail to check whether need to record data version", KR(ret), K(tenant_id)); + } else if (need_to_record_data_version) { + ObGlobalStatProxy proxy(*GCTX.sql_proxy_, tenant_id); + if (OB_FAIL(proxy.get_current_data_version(data_version))) { + LOG_WARN("fail to get current data version", KR(ret), K(tenant_id)); + } else { + min_cluster_version = GET_MIN_CLUSTER_VERSION(); + } + } else { + // data version not promoted, make sure data_version and min_cluster_version are 0 + data_version = 0; + min_cluster_version = 0; + } + return ret; +} + int ObTenantCloneUtil::record_clone_job(common::ObISQLClient &sql_client, const share::ObCloneJob &clone_job) { @@ -523,22 +561,136 @@ int ObTenantCloneUtil::get_clone_job_failed_message(common::ObISQLClient 
&sql_cl return ret; } -//This function is called by the user executing "cancel clone" sql. -int ObTenantCloneUtil::cancel_clone_job(common::ObISQLClient &sql_client, - const ObString &clone_tenant_name, - bool &clone_already_finish) +int ObTenantCloneUtil::inner_cancel_clone_job_( + ObTenantCloneTableOperator &clone_op, + const ObCloneJob &clone_job, + const ObCancelCloneJobReason &reason, + bool &clone_already_finish) { int ret = OB_SUCCESS; clone_already_finish = false; + ObSqlString err_msg; + const ObTenantCloneStatus next_status(ObTenantCloneStatus::Status::CLONE_SYS_CANCELING); + + if (OB_UNLIKELY(!clone_op.is_inited()) + || OB_UNLIKELY(!clone_job.is_valid()) + || OB_UNLIKELY(!reason.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(clone_job), K(reason)); + } else if (clone_job.get_status().is_user_status()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected sys clone job status", KR(ret), K(clone_job)); + } else if (ObTenantCloneStatus::Status::CLONE_SYS_RELEASE_RESOURCE == clone_job.get_status() + || !clone_job.get_status().is_sys_processing_status()) { + clone_already_finish = true; + } else if (OB_FAIL(clone_op.update_job_status(clone_job.get_job_id(), + clone_job.get_status(), /*old_status*/ + next_status))) { + LOG_WARN("fail to update job status", KR(ret), K(clone_job)); + } else if (OB_FAIL(err_msg.append_fmt("clone job has been canceled in %s status %s", + ObTenantCloneStatus::get_clone_status_str(clone_job.get_status()), + reason.get_reason_str()))) { + LOG_WARN("fail to construct error message", KR(ret), K(reason), K(clone_job)); + } else if (OB_FAIL(clone_op.update_job_failed_info(clone_job.get_job_id(), OB_CANCELED, err_msg.string()))) { + LOG_WARN("fail to update job failed info", KR(ret), K(clone_job)); + } + return ret; +} + +void ObTenantCloneUtil::try_to_record_clone_status_change_rs_event( + const ObCloneJob &clone_job, + const share::ObTenantCloneStatus &prev_clone_status, + const 
share::ObTenantCloneStatus &cur_clone_status, + const int ret_code, + const ObCancelCloneJobReason &reason) +{ + int ret = OB_SUCCESS; + ObSqlString execute_result; + if (OB_UNLIKELY(!clone_job.is_valid()) + || OB_UNLIKELY(!prev_clone_status.is_valid()) + || OB_UNLIKELY(!cur_clone_status.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(clone_job), K(prev_clone_status), K(cur_clone_status)); + } else if (OB_FAIL(execute_result.assign_fmt("%s(%d)", common::ob_error_name(ret_code), ret_code))) { + LOG_WARN("fail to build execute result", KR(ret), K(ret_code)); + } else { + ROOTSERVICE_EVENT_ADD("clone", "change_clone_status", + "job_id", clone_job.get_job_id(), + K(execute_result), K(prev_clone_status), K(cur_clone_status), + reason.is_valid() ? "reason" : "", + reason.is_valid() ? reason.get_reason_str() : ""); + } + LOG_INFO("[CLONE] switch job status", KR(ret), K(clone_job), K(prev_clone_status), + K(cur_clone_status), K(ret_code), K(reason)); +} + +int ObTenantCloneUtil::cancel_clone_job_by_source_tenant_id( + common::ObISQLClient &sql_client, + const uint64_t source_tenant_id, + const ObCancelCloneJobReason &reason, + bool &clone_already_finish) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + clone_already_finish = false; ObTenantCloneTableOperator clone_op; ObCloneJob clone_job; ObMySQLTransaction trans; - ObSqlString err_msg; - const ObTenantCloneStatus next_status(ObTenantCloneStatus::Status::CLONE_SYS_CANCELING); + FLOG_INFO("begin to cancel clone job", K(source_tenant_id), K(reason)); - if (OB_UNLIKELY(clone_tenant_name.empty())) { + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == source_tenant_id) + || OB_UNLIKELY(!reason.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(clone_tenant_name)); + LOG_WARN("invalid argument", KR(ret), K(source_tenant_id), K(reason)); + } else if (OB_FAIL(trans.start(&sql_client, OB_SYS_TENANT_ID))) { + LOG_WARN("failed to start trans", KR(ret)); + } else if 
(OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &trans))) { + LOG_WARN("fail init clone op", KR(ret)); + } else if (OB_FAIL(clone_op.get_clone_job_by_source_tenant_id(source_tenant_id, clone_job))) { + if (OB_ENTRY_NOT_EXIST != ret) { + LOG_WARN("fail to get clone job", KR(ret), K(source_tenant_id)); + } else { + ret = OB_SUCCESS; + clone_already_finish = true; + LOG_INFO("clone job has already finished", KR(ret), K(source_tenant_id)); + } + } else if (OB_FAIL(inner_cancel_clone_job_(clone_op, clone_job, reason, clone_already_finish))) { + LOG_WARN("fail to cancel clone job", KR(ret), K(clone_job), K(reason)); + } + + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { + LOG_WARN("trans end failed", "is_commit", (OB_SUCCESS == ret), KR(tmp_ret)); + ret = (OB_SUCC(ret)) ? tmp_ret : ret; + } + } + + if (OB_FAIL(ret)) { + } else { + (void)try_to_record_clone_status_change_rs_event( + clone_job, clone_job.get_status(), ObTenantCloneStatus(ObTenantCloneStatus::Status::CLONE_SYS_CANCELING), ret, reason); + } + return ret; +} + +//This function is called by the user executing "cancel clone" sql. 
+int ObTenantCloneUtil::cancel_clone_job_by_name( + common::ObISQLClient &sql_client, + const ObString &clone_tenant_name, + bool &clone_already_finish, + const ObCancelCloneJobReason &reason) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + clone_already_finish = false; + ObTenantCloneTableOperator clone_op; + ObCloneJob clone_job; + ObMySQLTransaction trans; + + if (OB_UNLIKELY(clone_tenant_name.empty()) || OB_UNLIKELY(!reason.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(clone_tenant_name), K(reason)); } else if (OB_FAIL(trans.start(&sql_client, OB_SYS_TENANT_ID))) { LOG_WARN("failed to start trans", KR(ret)); } else if (OB_FAIL(clone_op.init(OB_SYS_TENANT_ID, &trans))) { @@ -551,20 +703,8 @@ int ObTenantCloneUtil::cancel_clone_job(common::ObISQLClient &sql_client, ret = OB_SUCCESS; clone_already_finish = true; } - } else if (clone_job.get_status().is_user_status()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected sys clone job status", KR(ret), K(clone_job)); - } else if (ObTenantCloneStatus::Status::CLONE_SYS_RELEASE_RESOURCE == clone_job.get_status() - || !clone_job.get_status().is_sys_processing_status()) { - clone_already_finish = true; - } else if (OB_FAIL(clone_op.update_job_status(clone_job.get_job_id(), - clone_job.get_status(), /*old_status*/ - next_status))) { - LOG_WARN("fail to update job status", KR(ret), K(clone_tenant_name), K(clone_job)); - } else if (OB_FAIL(err_msg.append_fmt("clone job has been canceled in %s status", - ObTenantCloneStatus::get_clone_status_str(clone_job.get_status())))) { - } else if (OB_FAIL(clone_op.update_job_failed_info(clone_job.get_job_id(), OB_CANCELED, err_msg.string()))) { - LOG_WARN("fail to update job failed info", KR(ret), K(clone_job)); + } else if (OB_FAIL(inner_cancel_clone_job_(clone_op, clone_job, reason, clone_already_finish))) { + LOG_WARN("fail to cancel clone job", KR(ret), K(clone_job), K(reason)); } if (trans.is_started()) { @@ -575,17 +715,10 @@ int 
ObTenantCloneUtil::cancel_clone_job(common::ObISQLClient &sql_client, } } - if (OB_SUCC(ret)) { - LOG_INFO("[RESTORE] switch job status", KR(ret), K(clone_job), K(next_status)); - - const char *prev_status_str = ObTenantCloneStatus::get_clone_status_str(clone_job.get_status()); - const char *cur_status_str = ObTenantCloneStatus::get_clone_status_str(next_status); - - ROOTSERVICE_EVENT_ADD("clone", "change_clone_status", - "job_id", clone_job.get_job_id(), - K(ret), - "prev_clone_status", prev_status_str, - "cur_clone_status", cur_status_str); + if (OB_FAIL(ret)) { + } else { + (void)try_to_record_clone_status_change_rs_event( + clone_job, clone_job.get_status(), ObTenantCloneStatus(ObTenantCloneStatus::Status::CLONE_SYS_CANCELING), ret, reason); } return ret; } diff --git a/src/rootserver/restore/ob_tenant_clone_util.h b/src/rootserver/restore/ob_tenant_clone_util.h index cd6b73ae5f..7befc850c8 100644 --- a/src/rootserver/restore/ob_tenant_clone_util.h +++ b/src/rootserver/restore/ob_tenant_clone_util.h @@ -14,6 +14,7 @@ #define __OB_RS_TENANT_CLONE_UTIL_H__ #include "lib/mysqlclient/ob_mysql_transaction.h" +#include "share/restore/ob_tenant_clone_table_operator.h" //ObCancelCloneJobReason namespace oceanbase { @@ -72,9 +73,48 @@ public: ObIAllocator &allocator, ObString &err_msg); //attention: This function is called by the user executing "cancel clone" sql. 
- static int cancel_clone_job(common::ObISQLClient &sql_client, - const ObString &clone_tenant_name, - bool &clone_already_finish); + static int cancel_clone_job_by_name( + common::ObISQLClient &sql_client, + const ObString &clone_tenant_name, + bool &clone_already_finish, + const ObCancelCloneJobReason &reason); + // cancel clone job by source tenant id, this will be called by + // standby tenant iterating multi-source log(upgrade, transfer, alter_ls) + // @params[in] sql_client, the client to use + // @params[in] source tenant id, to identify clone job's source tenant id + // @params[in] reason, reason to cancel + // @params[out] clone_already_finish, whether job already finished + static int cancel_clone_job_by_source_tenant_id( + common::ObISQLClient &sql_client, + const uint64_t source_tenant_id, + const ObCancelCloneJobReason &reason, + bool &clone_already_finish); + static void try_to_record_clone_status_change_rs_event( + const ObCloneJob &clone_job, + const share::ObTenantCloneStatus &prev_clone_status, + const share::ObTenantCloneStatus &cur_clone_status, + const int ret_code, + const ObCancelCloneJobReason &reason); +private: + // inner cancel clone job + // @params[in] clone_op, operator to use + // @params[in] clone_job, which job to cancel + // @params[in] reason, the reason to cancel clone job + // @params[out] clone_already_finish, whether clone job already finished + static int inner_cancel_clone_job_( + ObTenantCloneTableOperator &clone_op, + const ObCloneJob &clone_job, + const ObCancelCloneJobReason &reason, + bool &clone_already_finish); + + // construct data version to record + // @params[in] tenant_id, which tenant clone job + // @params[out] data_version, tenant data version + // @params[out] min_cluster_version, min_cluster_version + static int construct_data_version_to_record_( + const uint64_t tenant_id, + uint64_t &data_version, + uint64_t &min_cluster_version); }; diff --git 
a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.cpp b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.cpp index ad03f30f54..c9cbf45fe6 100644 --- a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.cpp +++ b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.cpp @@ -105,7 +105,6 @@ void ObTenantSnapshotScheduler::do_work() idle_time_us_ = DEFAULT_IDLE_TIME; bool compatibility_satisfied = false; bool status_satisfied = false; - ObAllTenantInfo all_tenant_info; const uint64_t meta_tenant_id = MTL_ID(); const uint64_t user_tenant_id = gen_user_tenant_id(MTL_ID()); while (!has_set_stop()) { @@ -122,11 +121,6 @@ void ObTenantSnapshotScheduler::do_work() LOG_WARN("check_tenant_status failed", KR(ret), K(user_tenant_id)); } else if (!status_satisfied) { LOG_INFO("tenant status is not valid", K(user_tenant_id), K(status_satisfied)); - } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(user_tenant_id, GCTX.sql_proxy_, - false/*for_update*/, all_tenant_info))) { - LOG_WARN("failed to load tenant info", KR(ret), K(user_tenant_id)); - } else if (!all_tenant_info.is_primary()) { - LOG_INFO("tenant is not primary tenant", K(user_tenant_id)); } else { //********************************************************************* //First, check whether creation or deletion jobs exist. @@ -839,6 +833,8 @@ int ObTenantSnapshotScheduler::check_create_tenant_snapshot_result_( bool whether_to_commit_trans = false; if (OB_SUCC(ret) && ObTenantSnapStatus::CREATING == tenant_snap_item.get_status()) { // majority snapshot has created successful + SCN snapshot_scn_to_persist; + bool need_persist_scn = true; if (need_wait_minority) { // considering the performance of tenant cloning is affected by the missing of snapshots, // we will wait a small interval as long as possible to make all ls replicas create snapshots successful. 
@@ -848,26 +844,35 @@ int ObTenantSnapshotScheduler::check_create_tenant_snapshot_result_( clog_start_scn = tmp_clog_start_scn; snapshot_scn = tmp_snapshot_scn; - SCN gts_scn; - // TODO: Currently, how to get the maximum scn in ObLSMetaPackage has not yet been solved; - // The end_interval_scn reported by the storage node may be smaller than the actual - // required; We rely on gts when local snapshots are created to determine snapshot scn; - if (OB_FAIL(OB_TS_MGR.get_ts_sync(user_tenant_id, GCONF.rpc_timeout, gts_scn))) { - LOG_WARN("fail to get gts sync", KR(ret), K(user_tenant_id)); - } else if (FALSE_IT(snapshot_scn = MAX(snapshot_scn, gts_scn))) { - } else if (OB_FAIL(table_op.update_tenant_snap_item(tenant_snapshot_id, - ObTenantSnapStatus::CREATING, - ObTenantSnapStatus::DECIDED, - snapshot_scn, - clog_start_scn))) { - LOG_WARN("fail to update snapshot status and interval scn", - KR(ret), K(user_tenant_id), K(tenant_snapshot_id), K(clog_start_scn), K(snapshot_scn)); + ObAllTenantInfo tenant_info; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info( + user_tenant_id, GCTX.sql_proxy_, false/*for update*/, tenant_info))) { + LOG_WARN("fail to get tenant info", K(ret), K(user_tenant_id)); + } else if (OB_FAIL(decide_tenant_snapshot_scn_(table_op, tenant_snapshot_id, tenant_info, snapshot_scn, + snapshot_scn_to_persist, need_persist_scn))) { + LOG_WARN("fail to decide snapshot scn to persist", KR(ret), K(tenant_info), K(snapshot_scn)); + } else if (need_persist_scn && OB_FAIL(table_op.update_tenant_snap_item( + tenant_snapshot_id, + ObTenantSnapStatus::CREATING, + tenant_info.is_standby() + ? 
ObTenantSnapStatus::CREATING/*try next turn for standby tenant*/ + : ObTenantSnapStatus::DECIDED, + snapshot_scn_to_persist, + clog_start_scn))) { + LOG_WARN("fail to update snapshot status and interval scn", KR(ret), K(tenant_info), + K(tenant_snapshot_id), K(clog_start_scn), K(snapshot_scn_to_persist)); + } else if (tenant_info.is_standby() && OB_FAIL(check_standby_gts_exceed_snapshot_scn_( + table_op, tenant_info.get_tenant_id(), tenant_snapshot_id, snapshot_scn_to_persist))) { + LOG_WARN("fail to check standby tenant gts_scn exceed sync_scn", KR(ret), K(tenant_info)); } else { whether_to_commit_trans = true; } } LOG_INFO("results meet the requirement", - KR(ret), K(create_job), K(clog_start_scn), K(snapshot_scn), + KR(ret), K(create_job), K(clog_start_scn), K(snapshot_scn), K(snapshot_scn_to_persist), K(need_wait_minority), K(whether_to_commit_trans)); } @@ -882,6 +887,100 @@ int ObTenantSnapshotScheduler::check_create_tenant_snapshot_result_( return ret; } +int ObTenantSnapshotScheduler::decide_tenant_snapshot_scn_( + ObTenantSnapshotTableOperator &table_op, + const ObTenantSnapshotID &tenant_snapshot_id, + const ObAllTenantInfo &tenant_info, + const SCN &snapshot_scn, + SCN &output_snapshot_scn, + bool &need_persist_scn) +{ + int ret = OB_SUCCESS; + output_snapshot_scn.reset(); + need_persist_scn = true; + + if (OB_UNLIKELY(!tenant_info.is_valid()) + || OB_UNLIKELY(!snapshot_scn.is_valid()) + || OB_UNLIKELY(!tenant_snapshot_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_info), K(snapshot_scn), K(tenant_snapshot_id)); + } else if (tenant_info.is_primary()) { + SCN gts_scn; + // TODO: Currently, how to get the maximum scn in ObLSMetaPackage has not yet been solved; + // The end_interval_scn reported by the storage node may be smaller than the actual + // required; We rely on gts when local snapshots are created to determine snapshot scn; + if (OB_FAIL(OB_TS_MGR.get_ts_sync(tenant_info.get_tenant_id(), 
GCONF.rpc_timeout, gts_scn))) {
+      LOG_WARN("fail to get gts sync", KR(ret), K(tenant_info));
+    } else {
+      output_snapshot_scn = MAX(snapshot_scn, gts_scn);
+    }
+  } else if (tenant_info.is_standby()) {
+    // standby: reuse the snapshot_scn already stored in the inner table if it is valid, otherwise fall back to the tenant's sync_scn
+    ObTenantSnapItem item;
+    if (OB_UNLIKELY(!table_op.is_inited())) {
+      ret = OB_INVALID_ARGUMENT;
+      LOG_WARN("invalid argument", KR(ret), K(tenant_snapshot_id));
+    } else if (OB_FAIL(table_op.get_tenant_snap_item(tenant_snapshot_id, false/*need_lock*/, item))) {
+      LOG_WARN("fail to get tenant snapshot item", KR(ret), K(tenant_snapshot_id));
+    } else if (item.get_snapshot_scn().is_valid()) {
+      // use snapshot scn in __all_tenant_snapshot table
+      output_snapshot_scn = item.get_snapshot_scn();
+      need_persist_scn = false; // snapshot_scn has already persisted, no need persist again
+    } else {
+      output_snapshot_scn = tenant_info.get_sync_scn();
+    }
+  } else {
+    ret = OB_STATE_NOT_MATCH;
+    LOG_WARN("unexpected tenant role", KR(ret), K(tenant_info));
+  }
+  return ret;
+}
+
+int ObTenantSnapshotScheduler::check_standby_gts_exceed_snapshot_scn_(
+    ObTenantSnapshotTableOperator &table_op,
+    const uint64_t &tenant_id,
+    const ObTenantSnapshotID &tenant_snapshot_id,
+    const SCN &snapshot_scn_to_check)
+{
+  int ret = OB_SUCCESS;
+  bool finished = false;
+  SCN gts_scn;
+  if (OB_UNLIKELY(!snapshot_scn_to_check.is_valid())
+      || OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("invalid argument", KR(ret), K(snapshot_scn_to_check), K(tenant_id));
+  } else {
+    const int64_t start_check_time = ObTimeUtility::current_time();
+    const int64_t check_wait_interval = 1 * 1000L * 1000L; // upper bound on the total time spent polling: 1s
+    const int64_t sleep_time = 100 * 1000L; // pause between two gts polls: 100ms
+    while (OB_SUCC(ret) && !finished && ObTimeUtility::current_time() - start_check_time < check_wait_interval) {
+      if (OB_FAIL(OB_TS_MGR.get_ts_sync(tenant_id, GCONF.rpc_timeout, gts_scn))) {
+        LOG_WARN("fail to get gts sync", KR(ret), K(tenant_id));
+      } else if (gts_scn < snapshot_scn_to_check) {
+        // need to
wait + finished = false; + LOG_TRACE("standby tenant gts_scn not exceed snapshot_scn, need to wait", K(tenant_id), + K(snapshot_scn_to_check), K(gts_scn)); + ob_usleep(sleep_time); + } else { + // good, gts_scn for standby tenant has already exceed sync_scn + finished = true; + ObTenantSnapItem item; + LOG_INFO("standby tenant gts_scn exceeded sync_scn", K(tenant_id), K(snapshot_scn_to_check), K(gts_scn)); + if (OB_FAIL(table_op.get_tenant_snap_item(tenant_snapshot_id, false/*need_lock*/, item))) { + LOG_WARN("fail to get tenant snapshot item", KR(ret), K(tenant_snapshot_id)); + } else if (OB_FAIL(table_op.update_tenant_snap_item( + item.get_snapshot_name(), + ObTenantSnapStatus::CREATING/*old_status*/, + ObTenantSnapStatus::DECIDED/*new_status*/))) { + LOG_WARN("fail to update snapshot status", KR(ret), K(tenant_snapshot_id), K(item)); + } + } + } + } + return ret; +} + void ObTenantSnapshotScheduler::check_need_wait_minority_create_snapshot_( const ObCreateSnapshotJob &create_job, bool &need_wait_minority) @@ -1000,7 +1099,6 @@ int ObTenantSnapshotScheduler::finish_create_tenant_snapshot_( int ret = OB_SUCCESS; ObMySQLTransaction trans; ObTenantSnapshotTableOperator table_op; - ObTenantSnapItem item; uint64_t data_version = 0; ObTenantSnapItem global_lock; @@ -1012,17 +1110,10 @@ int ObTenantSnapshotScheduler::finish_create_tenant_snapshot_( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_snapshot_id), K(user_tenant_id), K(clog_start_scn), K(snapshot_scn), KP(sql_proxy_)); - } else if (OB_FAIL(ObTenantSnapshotUtil::check_and_get_data_version(user_tenant_id, data_version))) { - LOG_WARN("fail to check and get data version or tenant is in upgrading procedure", KR(ret), K(user_tenant_id)); } else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(user_tenant_id)))) { LOG_WARN("failed to start trans", KR(ret), K(user_tenant_id)); } else if (OB_FAIL(table_op.init(user_tenant_id, &trans))) { LOG_WARN("failed to init table op", KR(ret), 
K(user_tenant_id)); - } else if (OB_FAIL(table_op.get_tenant_snap_item(tenant_snapshot_id, false, item))) { - LOG_WARN("failed to get item", KR(ret), K(tenant_snapshot_id)); - } else if (item.get_data_version() != data_version) { - ret = OB_VERSION_NOT_MATCH; - LOG_WARN("data version are not match", KR(ret), K(item), K(data_version)); } else if (OB_FAIL(table_op.update_tenant_snap_item(tenant_snapshot_id, ObTenantSnapStatus::DECIDED, ObTenantSnapStatus::NORMAL))) { diff --git a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.h b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.h index f7006ff4e4..3c41d5c918 100644 --- a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.h +++ b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_scheduler.h @@ -101,6 +101,32 @@ private: const ObArray &addr_array); int finish_delete_tenant_snapshot_(const share::ObTenantSnapshotID &tenant_snapshot_id, const uint64_t user_tenant_id); + + // decide snapshot scn for clone tenant snapshot + // @params[in] table_op, operator to use + // @params[in] tenant_snapshot_id, snapshot_id + // @params[in] tenant_info, user tenant info + // @params[in] snapshot_scn, the scn of snapshot + // @params[out] output_snapshot_scn, the scn to persist + // @params[out] need_persist_scn, whether persist scn in table + int decide_tenant_snapshot_scn_( + ObTenantSnapshotTableOperator &table_op, + const ObTenantSnapshotID &tenant_snapshot_id, + const ObAllTenantInfo &tenant_info, + const SCN &snapshot_scn, + SCN &output_snapshot_scn, + bool &need_persist_scn); + + // check standby tenant gts_scn exceed sync_scn + // @params[in] table_op, table operator to use + // @params[in] tenant_id, which tenant + // @params[in] tenant_snapshot_id, which snapshot + // @params[in] snapshot_scn_to_check, snapshot scn + int check_standby_gts_exceed_snapshot_scn_( + ObTenantSnapshotTableOperator &table_op, + const uint64_t &tenant_id, + const ObTenantSnapshotID &tenant_snapshot_id, + const SCN 
&snapshot_scn_to_check); private: static const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; static const int64_t DEFAULT_IDLE_TIME = 60 * 1000 * 1000L; diff --git a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp index 04462d25a4..5edb9961db 100644 --- a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp +++ b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp @@ -16,6 +16,8 @@ #include "rootserver/ob_ls_service_helper.h" //ObTenantLSInfo #include "share/backup/ob_tenant_archive_mgr.h" #include "share/balance/ob_balance_job_table_operator.h" //ObBalanceJob +#include "share/balance/ob_balance_task_table_operator.h" //ObBalanceTaskArray +#include "share/balance/ob_balance_task_helper_operator.h" //ObBalanceTaskHelper #include "share/location_cache/ob_location_service.h" #include "share/ls/ob_ls_operator.h" //ObLSAttrOperator #include "share/ob_global_stat_proxy.h" //ObGlobalStatProxy @@ -37,7 +39,10 @@ static const char* conflict_case_with_clone_strs[] = { "MODIFY_LS", "MODIFY_REPLICA", "MODIFY_TENANT_ROLE_OR_SWITCHOVER_STATUS", - "DELAY_DROP_TENANT" + "DELAY_DROP_TENANT", + "STANDBY_UPGRADE", + "STANDBY_TRANSFER", + "STANDBY_MODIFY_LS" }; const char* ObConflictCaseWithClone::get_case_name_str() const @@ -330,17 +335,6 @@ int ObTenantSnapshotUtil::check_source_tenant_info(const uint64_t tenant_id, } else { LOG_USER_ERROR(OB_OP_NOT_ALLOW, print_str.ptr()); } - } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, GCTX.sql_proxy_, - false/*for_update*/, all_tenant_info))) { - LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id)); - } else if (!all_tenant_info.is_primary()) { - ret = OB_OP_NOT_ALLOW; - LOG_WARN("tenant is not primary tenant", KR(ret), K(tenant_id)); - if (OB_TMP_FAIL(print_str.assign_fmt("source tenant is not primary tenant, %s", get_op_print_str(op)))) { - LOG_WARN("assign failed", KR(tmp_ret), K(get_op_print_str(op))); - } else { - 
LOG_USER_ERROR(OB_OP_NOT_ALLOW, print_str.ptr()); - } } return ret; } @@ -949,6 +943,7 @@ int ObTenantSnapshotUtil::check_and_get_data_version(const uint64_t tenant_id, uint64_t target_data_version = 0; uint64_t compatible_version = 0; ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::UPGRADE); + ObAllTenantInfo all_tenant_info; if (OB_UNLIKELY(!is_user_tenant(tenant_id))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); @@ -959,16 +954,26 @@ int ObTenantSnapshotUtil::check_and_get_data_version(const uint64_t tenant_id, LOG_WARN("fail to get current data version", KR(ret), K(tenant_id)); } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compatible_version))) { LOG_WARN("fail to get min data version", KR(ret), K(tenant_id)); - } else if (OB_FAIL(check_tenant_is_in_upgrading_procedure_(tenant_id, target_data_version))) { - LOG_WARN("fail to check or tenant is in upgrading procedure", KR(ret), K(tenant_id)); - } else if (current_data_version != compatible_version - || current_data_version != target_data_version) { + } else if (current_data_version != compatible_version) { ret = OB_CONFLICT_WITH_CLONE; LOG_WARN("source tenant is in upgrading procedure, can not do clone", KR(ret), K(tenant_id), - K(current_data_version), K(target_data_version), K(compatible_version)); + K(current_data_version), K(compatible_version)); LOG_USER_ERROR(OB_CONFLICT_WITH_CLONE, tenant_id, case_to_check.get_case_name_str(), CLONE_PROCEDURE_STR); - } else { + } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info( + tenant_id, GCTX.sql_proxy_, false/*for_update*/, all_tenant_info))) { + LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id)); + } else if (all_tenant_info.is_standby()) { data_version = current_data_version; + } else if (all_tenant_info.is_primary()) { + if (OB_FAIL(check_tenant_is_in_upgrading_procedure_(tenant_id, target_data_version))) { + LOG_WARN("fail to check or tenant is in upgrading procedure", KR(ret), 
K(tenant_id)); + } else { + data_version = current_data_version; + } + } else { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("can not clone tenant with source tenant neither primary nor standby", + KR(ret), K(tenant_id), K(all_tenant_info)); } return ret; } @@ -1081,13 +1086,8 @@ int ObTenantSnapshotUtil::check_tenant_in_cloning_procedure_in_trans_( int ret = OB_SUCCESS; is_tenant_in_cloning = true; common::ObMySQLTransaction trans; - int64_t user_tenant_id = OB_INVALID_TENANT_ID; int64_t meta_tenant_id = OB_INVALID_TENANT_ID; - ObTenantSnapshotTableOperator tenant_snapshot_table_operator; - ObTenantSnapItem tenant_snapshot_item; - bool lock_line = true; bool tenant_snapshot_table_exist = false; - // is_tenant_in_cloning = false if one of conditions below is satisfied // (1) tenant is not up to version 4.3, __all_tenant_snapshot not exists, clone is not supported // (2) line with snapshot_id = 0 in __all_tenant_snapshot not exists @@ -1104,14 +1104,45 @@ int ObTenantSnapshotUtil::check_tenant_in_cloning_procedure_in_trans_( is_tenant_in_cloning = false; LOG_INFO("tenant snapshot table not exists, tenant is not cloning", K(tenant_id)); } else { - user_tenant_id = gen_user_tenant_id(tenant_id); meta_tenant_id = gen_meta_tenant_id(tenant_id); if (OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret)); } else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) { LOG_WARN("failed to start trans", KR(ret), K(tenant_id), K(meta_tenant_id)); - } else if (OB_FAIL(tenant_snapshot_table_operator.init(user_tenant_id, &trans))) { + } else if (OB_FAIL(inner_check_tenant_in_cloning_procedure_in_trans_( + trans, tenant_id, is_tenant_in_cloning))) { + LOG_WARN("fail to inner check tenant in cloning procedure", KR(ret), K(tenant_id)); + } + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { + LOG_WARN("trans end failed", KR(tmp_ret), KR(ret)); + ret = OB_SUCC(ret) ? 
tmp_ret : ret; + } + } + } + return ret; +} + +int ObTenantSnapshotUtil::inner_check_tenant_in_cloning_procedure_in_trans_( + common::ObMySQLTransaction &trans, + const uint64_t tenant_id, + bool &is_tenant_in_cloning) +{ + int ret = OB_SUCCESS; + is_tenant_in_cloning = true; + int64_t user_tenant_id = OB_INVALID_TENANT_ID; + ObTenantSnapshotTableOperator tenant_snapshot_table_operator; + ObTenantSnapItem tenant_snapshot_item; + bool lock_line = true; + + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); + } else { + user_tenant_id = gen_user_tenant_id(tenant_id); + if (OB_FAIL(tenant_snapshot_table_operator.init(user_tenant_id, &trans))) { LOG_WARN("fail to init tenant snapshot table operator", KR(ret), K(tenant_id), K(user_tenant_id)); // 1. lock __all_tenant_snapshot where snapshot_id = 0 } else if (OB_FAIL(tenant_snapshot_table_operator.get_tenant_snap_item( @@ -1134,16 +1165,9 @@ int ObTenantSnapshotUtil::check_tenant_in_cloning_procedure_in_trans_( // good, there is no ls snapshot exists for this tenant is_tenant_in_cloning = false; LOG_TRACE("snapshot with GLOBAL_STATE_ID is in NORMAL status," - " tenant is not in cloning procedure", K(tenant_id), K(user_tenant_id), K(meta_tenant_id), + " tenant is not in cloning procedure", K(tenant_id), K(user_tenant_id), K(tenant_snapshot_item)); } - if (trans.is_started()) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { - LOG_WARN("trans end failed", KR(tmp_ret), KR(ret)); - ret = OB_SUCC(ret) ? 
tmp_ret : ret; - } - } } return ret; } @@ -1231,6 +1255,85 @@ int ObTenantSnapshotUtil::check_snapshot_table_exists_( return ret; } +int ObTenantSnapshotUtil::cancel_existed_clone_job_if_need( + const uint64_t tenant_id, + const ObConflictCaseWithClone &case_to_check) +{ + int ret = OB_SUCCESS; + bool clone_already_finish = false; // not used + uint64_t meta_tenant_id = OB_INVALID_TENANT_ID; + bool is_cloning = false; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) + || OB_UNLIKELY(!case_to_check.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(case_to_check)); + } else { + common::ObMySQLTransaction trans; + meta_tenant_id = gen_meta_tenant_id(tenant_id); + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) { + LOG_WARN("failed to start trans", KR(ret), K(tenant_id), K(meta_tenant_id)); + } else if (OB_FAIL(check_standby_tenant_not_in_cloning_procedure(trans, tenant_id, is_cloning))) { + LOG_WARN("fail to check standby tenant whether in cloning procedure", KR(ret), K(tenant_id)); + } else if (is_cloning) { + ObCancelCloneJobReason reason; + if (OB_FAIL(reason.init_by_conflict_case(case_to_check))) { + LOG_WARN("fail to inti cancel reason by conflict case", KR(ret), K(case_to_check)); + } else if (OB_FAIL(ObTenantCloneUtil::cancel_clone_job_by_source_tenant_id( + *GCTX.sql_proxy_, tenant_id, reason, clone_already_finish))) { + LOG_WARN("fail to cancel clone job by source tenant id", KR(ret), K(tenant_id), K(reason)); + } + } else { + // This function will cancel existed clone job + // If no clone job exists, there is no need to cancel, just do nothing + LOG_TRACE("there is no clone job to cancel", K(tenant_id)); + } + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { + LOG_WARN("trans end failed", KR(tmp_ret), KR(ret)); + ret = OB_SUCC(ret) ? 
tmp_ret : ret; + } + } + } + return ret; +} + +int ObTenantSnapshotUtil::check_standby_tenant_not_in_cloning_procedure( + common::ObMySQLTransaction &trans, + const uint64_t tenant_id, + bool &is_cloning) +{ + int ret = OB_SUCCESS; + is_cloning = false; + bool is_compatible = false; + int64_t check_begin_time = ObTimeUtility::current_time(); + LOG_TRACE("start to check whether standby tenant is in cloning procedure", K(tenant_id)); + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) { + is_cloning = false; + LOG_TRACE("sys and meta tenant can not in cloning procedure", K(tenant_id)); + } else if (OB_FAIL(ObShareUtil::check_compat_version_for_clone_standby_tenant( + tenant_id, is_compatible))) { + LOG_WARN("fail to check compatible version for clone standby tenant", KR(ret), K(tenant_id)); + } else if (!is_compatible) { + // tenant can not be in cloning procedure, do nothing + is_cloning = false; + LOG_TRACE("standby tenant not in 432 version, can not in clonine procedure", KR(ret), K(tenant_id)); + } else if (OB_FAIL(inner_check_tenant_in_cloning_procedure_in_trans_( + trans, tenant_id, is_cloning))) { + LOG_WARN("fail to inner check tenant in cloning procedure", KR(ret), K(tenant_id)); + } + int64_t cost = ObTimeUtility::current_time() - check_begin_time; + LOG_TRACE("finish check whether standby tenant is in cloning procedure", KR(ret), K(tenant_id), + K(is_cloning), K(cost)); + return ret; +} + int ObTenantSnapshotUtil::check_tenant_not_in_cloning_procedure( const uint64_t tenant_id, const ObConflictCaseWithClone &case_to_check) @@ -1267,38 +1370,186 @@ int ObTenantSnapshotUtil::check_tenant_has_no_conflict_tasks( int ret = OB_SUCCESS; int64_t check_begin_time = ObTimeUtility::current_time(); uint64_t data_version = 0; + ObAllTenantInfo all_tenant_info; + bool is_compatible_to_clone = false; LOG_INFO("begin to 
check whether tenant has conflict tasks", K(tenant_id)); - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) || OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id)); } else if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) { ret = OB_OP_NOT_ALLOW; LOG_WARN("sys or meta tenant can not in clone procedure, clone not allowed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( + tenant_id, is_compatible_to_clone))) { + LOG_WARN("fail to check tenant compatible with clone tenant", KR(ret), K(tenant_id)); + } else if (!is_compatible_to_clone) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("can not clone tenant with tenant role not expected", KR(ret), K(tenant_id)); } else if (OB_FAIL(check_tenant_is_in_dropping_procedure_(tenant_id))) { LOG_WARN("fail to check tenant in dropping procedure", KR(ret), K(tenant_id)); - } else if (OB_FAIL(check_tenant_is_in_upgrading_procedure_(tenant_id, data_version))) { - LOG_WARN("fail to check tenant in upgrading procedure", KR(ret), K(tenant_id)); - } else if (OB_FAIL(check_tenant_is_in_transfer_procedure_(tenant_id))) { - LOG_WARN("fail to check tenant in transfer procedure", KR(ret), K(tenant_id)); } else if (OB_FAIL(check_tenant_is_in_modify_resource_pool_procedure_(tenant_id))) { LOG_WARN("fail to check tenant in modify resource pool procedure", KR(ret), K(tenant_id)); } else if (OB_FAIL(check_tenant_is_in_modify_unit_procedure_(tenant_id))) { LOG_WARN("fail to check tenant in modify unit procedure", KR(ret), K(tenant_id)); - } else if (OB_FAIL(check_tenant_is_in_modify_ls_procedure_(tenant_id))) { - LOG_WARN("fail to check tenant in modify ls procedure", KR(ret), K(tenant_id)); } else if (OB_FAIL(check_tenant_is_in_modify_replica_procedure_(tenant_id))) { LOG_WARN("fail to check tenant in modify replica procedure", KR(ret), K(tenant_id)); } else if 
(OB_FAIL(check_tenant_is_in_switchover_procedure_(tenant_id))) { LOG_WARN("fail to check tenant in switchover procedure", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, GCTX.sql_proxy_, + false/*for_update*/, all_tenant_info))) { + LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id)); + } else if (all_tenant_info.is_primary()) { + if (OB_FAIL(check_tenant_is_in_upgrading_procedure_(tenant_id, data_version))) { + LOG_WARN("fail to check tenant in upgrading procedure", KR(ret), K(tenant_id)); + } else if (OB_FAIL(check_tenant_is_in_transfer_procedure_(tenant_id))) { + LOG_WARN("fail to check tenant in transfer procedure", KR(ret), K(tenant_id)); + } else if (OB_FAIL(check_tenant_is_in_modify_ls_procedure_(tenant_id))) { + LOG_WARN("fail to check tenant in modify ls procedure", KR(ret), K(tenant_id)); + } + //TODO@jingyu.cr: need to consider these cases: + // (1) conflict check for arbitration service status + } else if (all_tenant_info.is_standby()) { + if (OB_FAIL(check_standby_tenant_has_no_conflict_tasks_(tenant_id))) { + LOG_WARN("fail to check conflict tasks for standby tenant", KR(ret), K(tenant_id)); + } + } else { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("can not clone tenant with source tenant neither primary nor standby", + KR(ret), K(tenant_id), K(all_tenant_info)); } - //TODO@jingyu.cr: need to consider these cases: - // (1) conflict check for arbitration service status int64_t cost = ObTimeUtility::current_time() - check_begin_time; LOG_INFO("finish check whether tenant has conflict tasks", KR(ret), K(tenant_id), K(check_begin_time), K(cost)); return ret; } +int ObTenantSnapshotUtil::check_standby_tenant_has_no_conflict_tasks_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObLSRecoveryStatOperator recovery_op; + ObLSRecoveryStat ls_recovery_stat; + ObMySQLTransaction trans; + // for standby tenant, we try to make positive check for upgrade/transfer/ls-modify + // upgrade operation will be 
double checked just before finish creating snapshot + // in function ObTenantSnapshotScheduler::check_data_version_before_finish_snapshot_creation_ + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(trans.start(GCTX.sql_proxy_, gen_meta_tenant_id(tenant_id)))) { + LOG_WARN("failed to start trans", KR(ret), K(tenant_id)); + } else if (OB_FAIL(recovery_op.get_ls_recovery_stat( + tenant_id, SYS_LS, true/* for update */, ls_recovery_stat, trans))) { + LOG_WARN("failed to get SYS ls recovery stat", KR(ret), K(tenant_id)); + } else if (OB_FAIL(check_standby_tenant_is_in_transfer_procedure_(tenant_id))) { + LOG_WARN("fail to check standby tenant whether in transfer or modify ls procedure", KR(ret), K(tenant_id)); + } else if (OB_FAIL(check_standby_tenant_is_in_modify_ls_procedure_(tenant_id))) { + LOG_WARN("fail to check standby tenant whether in modify ls procedure", KR(ret), K(tenant_id)); + } + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { + LOG_WARN("trans end failed", KR(tmp_ret), KR(ret)); + ret = OB_SUCC(ret) ? 
tmp_ret : ret; + } + } + return ret; +} + +int ObTenantSnapshotUtil::check_standby_tenant_is_in_modify_ls_procedure_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + int64_t check_begin_time = ObTimeUtility::current_time(); + ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::STANDBY_MODIFY_LS); + LOG_INFO("begin to check whether standby tenant is in modify_ls procedure", K(tenant_id)); + + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else { + share::ObLSStatusOperator ls_status_operator; + common::ObArray ls_status_info_array; + if (OB_FAIL(ls_status_operator.get_all_ls_status_by_order(tenant_id, ls_status_info_array, *GCTX.sql_proxy_))) { + LOG_WARN("fail to get all ls status", KR(ret), K(tenant_id)); + } else if (ls_status_info_array.count() <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls_status_info_array has no member", KR(ret), K(ls_status_info_array)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < ls_status_info_array.count(); ++i) { + share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i); + if (!ls_status_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(ls_status_info)); + } else if (ls_status_info.ls_is_creating() + || ls_status_info.ls_is_created() + || ls_status_info.ls_is_create_abort()) { + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("ls not in normal status, conflict with clone procedure", KR(ret), K(ls_status_info)); + LOG_USER_ERROR(OB_CONFLICT_WITH_CLONE, tenant_id, case_to_check.get_case_name_str(), CLONE_PROCEDURE_STR); + } else { + LOG_TRACE("ls status not conflict with clone procedure", KR(ret), K(ls_status_info)); + } + } + } + } + int64_t cost = ObTimeUtility::current_time() - check_begin_time; + LOG_INFO("finish check whether standby tenant is in modify ls 
procedure", + KR(ret), K(tenant_id), K(check_begin_time), K(cost)); + return ret; +} + +int ObTenantSnapshotUtil::check_standby_tenant_is_in_transfer_procedure_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + int64_t check_begin_time = ObTimeUtility::current_time(); + ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::STANDBY_TRANSFER); + ObBalanceTaskArray balance_tasks; + ObArray ls_balance_tasks; + SCN max_scn; + max_scn.set_max(); + LOG_INFO("begin to check whether standby tenant is in transfer procedure", K(tenant_id)); + + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(ObBalanceTaskTableOperator::load_task(tenant_id, balance_tasks, *GCTX.sql_proxy_))) { + LOG_WARN("failed to load task from __all_balance_task", KR(ret), K(tenant_id)); + } else if (0 != balance_tasks.count()) { + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("balance task is running, can not clone standby tenant", KR(ret), K(tenant_id), K(balance_tasks)); + LOG_USER_ERROR(OB_CONFLICT_WITH_CLONE, tenant_id, case_to_check.get_case_name_str(), CLONE_PROCEDURE_STR); + } else if (OB_FAIL(ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn( + tenant_id, *GCTX.sql_proxy_, max_scn, ls_balance_tasks))) { + if (OB_ENTRY_NOT_EXIST == ret) { + // good, no transfer task exists + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to load task from __all_balance_task_helper", KR(ret), K(tenant_id)); + } + } else { + // load_tasks_order_by_scn return OB_SUCCESS means task array not empty + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("balance task in task_helper is running, can not clone standby tenant", + KR(ret), K(tenant_id), K(ls_balance_tasks)); + LOG_USER_ERROR(OB_CONFLICT_WITH_CLONE, tenant_id, case_to_check.get_case_name_str(), CLONE_PROCEDURE_STR); + } + int64_t 
cost = ObTimeUtility::current_time() - check_begin_time; + LOG_INFO("finish check whether standby tenant is in transfer procedure", + KR(ret), K(tenant_id), K(check_begin_time), K(cost)); + return ret; +} + int ObTenantSnapshotUtil::check_tenant_is_in_dropping_procedure_( const uint64_t tenant_id) { @@ -1617,11 +1868,15 @@ int ObTenantSnapshotUtil::check_tenant_is_in_switchover_procedure_( true/*for_update*/, tenant_info))) { LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id)); - } else if (!tenant_info.is_primary() || !tenant_info.is_normal_status()) { + } else if (!tenant_info.is_normal_status()) { ret = OB_CONFLICT_WITH_CLONE; - LOG_WARN("tenant is not PRIMARY or NORMAL, create snapshot or clone tenant not allowed", + LOG_WARN("tenant is not NORMAL, create snapshot or clone tenant not allowed", KR(ret), K(tenant_id), K(tenant_info)); LOG_USER_ERROR(OB_CONFLICT_WITH_CLONE, tenant_id, case_to_check.get_case_name_str(), CLONE_PROCEDURE_STR); + } else if (!tenant_info.is_standby() && !tenant_info.is_primary()) { + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("tenant is not PRIMARY or STANDBY, create snapshot or clone tenant not allowed", + KR(ret), K(tenant_id), K(tenant_info)); } if (trans.is_started()) { int tmp_ret = OB_SUCCESS; @@ -1870,3 +2125,43 @@ int ObTenantSnapshotUtil::inner_lock_line_for_all_tenant_table_( } return ret; } + +int ObTenantSnapshotUtil::check_current_and_target_data_version( + const uint64_t tenant_id, + uint64_t &data_version) +{ + int ret = OB_SUCCESS; + data_version = 0; + ObConflictCaseWithClone case_to_check(ObConflictCaseWithClone::UPGRADE); + uint64_t data_version_in_tenant_param_table = 0; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else { + ObGlobalStatProxy proxy(*GCTX.sql_proxy_, tenant_id); + uint64_t 
target_data_version = 0; + uint64_t current_data_version = 0; + if (OB_FAIL(proxy.get_target_data_version(false/*for_update*/, target_data_version))) { + LOG_WARN("fail to get target data version", KR(ret), K(tenant_id)); + } else if (OB_FAIL(proxy.get_current_data_version(current_data_version))) { + LOG_WARN("fail to get current data version", KR(ret), K(tenant_id)); + } else if (target_data_version != current_data_version) { + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("source tenant is in upgrading procedure, can not do clone", KR(ret), + K(tenant_id), K(current_data_version), K(target_data_version)); + } else if (OB_FAIL(ObShareUtil::fetch_current_data_version( + *GCTX.sql_proxy_, tenant_id, data_version_in_tenant_param_table))) { + LOG_WARN("fail to fetch current data version from tenant parameter table", KR(ret), K(tenant_id)); + } else if (data_version_in_tenant_param_table != current_data_version) { + ret = OB_CONFLICT_WITH_CLONE; + LOG_WARN("source tenant is in upgrading procedure, can not do clone", KR(ret), + K(tenant_id), K(current_data_version), K(data_version_in_tenant_param_table)); + } else { + data_version = current_data_version; + } + } + return ret; +} diff --git a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.h b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.h index bc625521ba..cae9187680 100644 --- a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.h +++ b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.h @@ -46,6 +46,9 @@ public: MODIFY_REPLICA = 5, MODIFY_TENANT_ROLE_OR_SWITCHOVER_STATUS = 6, DELAY_DROP_TENANT = 7, + STANDBY_UPGRADE = 8, + STANDBY_TRANSFER = 9, + STANDBY_MODIFY_LS = 10, MAX_CASE_NAME }; public: @@ -66,8 +69,12 @@ public: bool is_modify_unit() const { return MODIFY_UNIT == case_name_; } bool is_modify_ls() const { return MODIFY_LS == case_name_; } bool is_modify_replica() const { return MODIFY_REPLICA == case_name_; } - bool is_switchover() const { return MODIFY_TENANT_ROLE_OR_SWITCHOVER_STATUS == 
case_name_; } + bool is_modify_tenant_role_or_switchover() const { return MODIFY_TENANT_ROLE_OR_SWITCHOVER_STATUS == case_name_; } bool is_delay_drop_tenant() const { return DELAY_DROP_TENANT == case_name_; } + bool is_standby_upgrade() const { return STANDBY_UPGRADE == case_name_; } + bool is_standby_transfer() const { return STANDBY_TRANSFER == case_name_; } + bool is_standby_modify_ls() const { return STANDBY_MODIFY_LS == case_name_; } + bool is_standby_related() const { return STANDBY_UPGRADE == case_name_ || STANDBY_TRANSFER == case_name_ || STANDBY_MODIFY_LS == case_name_; } const ConflictCaseWithClone &get_case_name() const { return case_name_; } const char* get_case_name_str() const; private: @@ -175,6 +182,17 @@ public: static int check_tenant_not_in_cloning_procedure( const uint64_t tenant_id, const ObConflictCaseWithClone &case_to_check); + + static int cancel_existed_clone_job_if_need( + const uint64_t tenant_id, + const ObConflictCaseWithClone &case_to_check); + + // check whether standby tenant is in cloning procedure when replaying log + static int check_standby_tenant_not_in_cloning_procedure( + common::ObMySQLTransaction &trans, + const uint64_t tenant_id, + bool &is_cloning); + static int check_tenant_has_no_conflict_tasks(const uint64_t tenant_id); // when update __all_unit, in some case we have to lock __all_unit first and then check clone @@ -201,7 +219,28 @@ public: const uint64_t tenant_id_to_lock, ObISQLClient &sql_proxy, share::schema::ObTenantStatus &tenant_status); + // get tenant current/target data version from __all_core_table + // and compatible version in __tenant_parameter + static int check_current_and_target_data_version( + const uint64_t tenant_id, + uint64_t &data_version); private: + // check whether tenant is in cloning procedure in trans + // @params[in] user_tenant_id, the tenant to check + // @params[out] is_tenant_in_cloning, the output + // + // is_tenant_in_cloning = false if one of conditions below is satisfied + // (1) 
tenant is not up to version 4.3, clone is not supported + // (2) line with snapshot_id = 0 in __all_tenant_snapshot not exists + // (3) line with snapshot_id = 0 in __all_tenant_snapshot exists but is NORMAL status + // and no snapshot item exists in __all_tenant_snapshot_ls for this tenant + static int check_tenant_in_cloning_procedure_in_trans_( + const uint64_t user_tenant_id, + bool &is_tenant_in_cloning); + static int inner_check_tenant_in_cloning_procedure_in_trans_( + common::ObMySQLTransaction &trans, + const uint64_t tenant_id, + bool &is_tenant_in_cloning); static int check_tenant_snapshot_simulated_mutex_(ObMySQLTransaction &trans, const uint64_t tenant_id, share::ObTenantSnapItem &special_item, @@ -216,18 +255,6 @@ private: const uint64_t tenant_id, const TenantSnapshotOp op, const share::ObTenantSnapItem &special_item); - // check whether tenant is in cloning procedure in trans - // @params[in] user_tenant_id, the tenant to check - // @params[out] is_tenant_in_cloning, the output - // - // is_tenant_in_cloning = false if one of conditions below is satisfied - // (1) tenant is not up to version 4.3, clone is not supported - // (2) line with snapshot_id = 0 in __all_tenant_snapshot not exists - // (3) line with snapshot_id = 0 in __all_tenant_snapshot exists but is NORMAL status - // and no snapshot item exists in __all_tenant_snapshot_ls for this tenant - static int check_tenant_in_cloning_procedure_in_trans_( - const uint64_t user_tenant_id, - bool &is_tenant_in_cloning); // check whether tenant is upgrading // @params[in] tenant_id, tenant to check static int check_tenant_is_in_upgrading_procedure_(const uint64_t tenant_id, @@ -239,6 +266,9 @@ private: static int check_tenant_is_in_modify_ls_procedure_(const uint64_t tenant_id); static int check_tenant_is_in_modify_replica_procedure_(const uint64_t tenant_id); static int check_tenant_is_in_switchover_procedure_(const uint64_t tenant_id); + static int check_standby_tenant_has_no_conflict_tasks_(const 
uint64_t tenant_id); + static int check_standby_tenant_is_in_transfer_procedure_(const uint64_t tenant_id); + static int check_standby_tenant_is_in_modify_ls_procedure_(const uint64_t tenant_id); static int check_unit_infos_(common::sqlclient::ObMySQLResult &res, const uint64_t tenant_id); static int check_snapshot_table_exists_(const uint64_t user_tenant_id, bool &tenant_snapshot_table_exist); private: diff --git a/src/share/balance/ob_balance_task_helper_operator.cpp b/src/share/balance/ob_balance_task_helper_operator.cpp index 110092dbfc..b139aa7c8d 100644 --- a/src/share/balance/ob_balance_task_helper_operator.cpp +++ b/src/share/balance/ob_balance_task_helper_operator.cpp @@ -311,6 +311,7 @@ int ObBalanceTaskHelperTableOperator::remove_task(const uint64_t tenant_id, int64_t affected_rows = 0; ObSqlString sql; const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); + DEBUG_SYNC(BEFORE_REMOVE_BALANCE_TASK_HELPER); if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !operation_scn.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(operation_scn)); diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 46b0ee8499..d29ed2dfe8 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -608,6 +608,8 @@ class ObString; ACT(BEFORE_ADD_REFRESH_SCHEMA_TASK,)\ ACT(BEFORE_ADD_ASYNC_REFRESH_SCHEMA_TASK,)\ ACT(AFTER_MEMBERLIST_CHANGED,)\ + ACT(AFTER_FIRST_CLONE_CHECK_FOR_STANDBY,)\ + ACT(BEFORE_REMOVE_BALANCE_TASK_HELPER,)\ ACT(BEFORE_CHOOSE_SOURCE,)\ ACT(AFTER_CHECK_LOG_NEED_REBUILD,)\ ACT(BEFORE_SEND_ALTER_TABLE,)\ diff --git a/src/share/ob_share_util.cpp b/src/share/ob_share_util.cpp index dc01f70e8c..79a057f35d 100644 --- a/src/share/ob_share_util.cpp +++ b/src/share/ob_share_util.cpp @@ -446,29 +446,31 @@ bool ObShareUtil::is_tenant_enable_transfer(const uint64_t tenant_id) return bret; } -int ObShareUtil::check_compat_version_for_clone_tenant( +int 
ObShareUtil::check_compat_version_for_tenant( const uint64_t tenant_id, + const uint64_t target_data_version, bool &is_compatible) { int ret = OB_SUCCESS; is_compatible = false; uint64_t data_version = 0; - if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id) + || OB_UNLIKELY(0 == target_data_version)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(target_data_version)); } else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, data_version))) { LOG_WARN("fail to get sys tenant data version", KR(ret)); - } else if (DATA_VERSION_4_3_0_0 > data_version) { + } else if (target_data_version > data_version) { is_compatible = false; } else if (is_sys_tenant(tenant_id)) { is_compatible = true; } else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_user_tenant_id(tenant_id), data_version))) { LOG_WARN("fail to get user tenant data version", KR(ret), "tenant_id", gen_user_tenant_id(tenant_id)); - } else if (DATA_VERSION_4_3_0_0 > data_version) { + } else if (target_data_version > data_version) { is_compatible = false; } else if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id), data_version))) { LOG_WARN("fail to get meta tenant data version", KR(ret), "tenant_id", gen_meta_tenant_id(tenant_id)); - } else if (DATA_VERSION_4_3_0_0 > data_version) { + } else if (target_data_version > data_version) { is_compatible = false; } else { is_compatible = true; @@ -476,5 +478,74 @@ int ObShareUtil::check_compat_version_for_clone_tenant( return ret; } +int ObShareUtil::check_compat_version_for_clone_standby_tenant( + const uint64_t tenant_id, + bool &is_compatible) +{ + int ret = OB_SUCCESS; + is_compatible = false; + uint64_t target_data_version = DATA_VERSION_4_3_2_0; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if 
(OB_FAIL(check_compat_version_for_tenant( + tenant_id, target_data_version, is_compatible))) { + LOG_WARN("fail to check data version for clone tenant", KR(ret), + K(tenant_id), K(target_data_version)); + } + return ret; +} + +int ObShareUtil::check_compat_version_for_clone_tenant( + const uint64_t tenant_id, + bool &is_compatible) +{ + int ret = OB_SUCCESS; + is_compatible = false; + uint64_t target_data_version = DATA_VERSION_4_3_0_0; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (OB_FAIL(check_compat_version_for_tenant( + tenant_id, target_data_version, is_compatible))) { + LOG_WARN("fail to check data version for clone tenant", KR(ret), + K(tenant_id), K(target_data_version)); + } + return ret; +} + +int ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( + const uint64_t tenant_id, + bool &is_compatible) +{ + int ret = OB_SUCCESS; + is_compatible = false; + ObAllTenantInfo all_tenant_info; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info( + tenant_id, GCTX.sql_proxy_, + false/*for_update*/, all_tenant_info))) { + LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id), K(all_tenant_info)); + } else if (all_tenant_info.is_standby()) { + if (OB_FAIL(check_compat_version_for_clone_standby_tenant(tenant_id, is_compatible))) { + LOG_WARN("fail to check compatible version for standby tenant", KR(ret), K(tenant_id)); + } + } else if (all_tenant_info.is_primary()) { + if (OB_FAIL(check_compat_version_for_clone_tenant(tenant_id, is_compatible))) { + LOG_WARN("fail to check compatible version for primary tenant", KR(ret), K(tenant_id)); + } + } else { + ret = OB_OP_NOT_ALLOW; + 
LOG_WARN("can not clone tenant with tenant role is neither PRIMARY nor STANDBY", + KR(ret), K(all_tenant_info)); + } + return ret; +} + } //end namespace share } //end namespace oceanbase diff --git a/src/share/ob_share_util.h b/src/share/ob_share_util.h index 9d15357eeb..2294746efd 100644 --- a/src/share/ob_share_util.h +++ b/src/share/ob_share_util.h @@ -78,9 +78,31 @@ public: static int check_compat_version_for_arbitration_service( const uint64_t tenant_id, bool &is_compatible); - // data version must up to 4.3 with clone tenant + // check whether sys/meta/user tenant has been promoted to target data version + // params[in] tenant_id, which tenant to check + // params[in] target_data_version, data version to check + // params[out] is_compatible, whether tenants are promoted to target data version + static int check_compat_version_for_tenant( + const uint64_t tenant_id, + const uint64_t target_data_version, + bool &is_compatible); + // tenant data version should up to 430 when cloning primary tenant + // tenant data version should up to 432 when cloning standby tenant // params[in] tenant_id, which tenant to check // params[out] is_compatible, whether it is up to 4.3 + static int check_compat_version_for_clone_tenant_with_tenant_role( + const uint64_t tenant_id, + bool &is_compatible); + + // data version must up to 432 with clone standby tenant + // params[in] tenant_id, which tenant to check + // params[out] is_compatible, whether it is up to 4.3.2 + static int check_compat_version_for_clone_standby_tenant( + const uint64_t tenant_id, + bool &is_compatible); + // data version must up to 430 with clone primary tenant + // params[in] tenant_id, which tenant to check + // params[out] is_compatible, whether it is up to 4.3.0 static int check_compat_version_for_clone_tenant( const uint64_t tenant_id, bool &is_compatible); diff --git a/src/share/restore/ob_tenant_clone_table_operator.cpp b/src/share/restore/ob_tenant_clone_table_operator.cpp index 
0525005649..2a8b431c8e 100644 --- a/src/share/restore/ob_tenant_clone_table_operator.cpp +++ b/src/share/restore/ob_tenant_clone_table_operator.cpp @@ -18,6 +18,58 @@ using namespace oceanbase::share; using namespace oceanbase::common; +static const char* cancel_clone_job_reason_strs[] = { + "by user command", + "by standby tenant transfer event", + "by standby tenant upgrade event", + "by standby tenant alter LS event" +}; + +const char* ObCancelCloneJobReason::get_reason_str() const +{ + STATIC_ASSERT(ARRAYSIZEOF(cancel_clone_job_reason_strs) == (int64_t)MAX, + "cancel_clone_job_reason_strs string array size mismatch enum CancelCloneJobReason count"); + const char *str = NULL; + if (reason_ > INVALID && reason_ < MAX) { + str = cancel_clone_job_reason_strs[static_cast(reason_)]; + } else { + LOG_WARN_RET(OB_ERR_UNEXPECTED, "invalid CancelCloneJobReason", K_(reason)); + } + return str; +} + +int ObCancelCloneJobReason::init_by_conflict_case( + const rootserver::ObConflictCaseWithClone &case_to_check) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!case_to_check.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(case_to_check)); + } else if (OB_UNLIKELY(!case_to_check.is_standby_related())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("can not init cancel clone job reason by these cases", KR(ret), K(case_to_check)); + } else { + switch (case_to_check.get_case_name()) { + case rootserver::ObConflictCaseWithClone::STANDBY_UPGRADE : { + reason_ = CancelCloneJobReason::CANCEL_BY_STANDBY_UPGRADE; + break; + } + case rootserver::ObConflictCaseWithClone::ConflictCaseWithClone::STANDBY_TRANSFER : { + reason_ = CancelCloneJobReason::CANCEL_BY_STANDBY_TRANSFER; + break; + } + case rootserver::ObConflictCaseWithClone::ConflictCaseWithClone::STANDBY_MODIFY_LS : { + reason_ = CancelCloneJobReason::CANCEL_BY_STANDBY_ALTER_LS; + break; + } + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid conflict case", K(ret), K(case_to_check)); + } + } + 
return ret; +} + ObTenantCloneStatus &ObTenantCloneStatus::operator=(const ObTenantCloneStatus &status) { status_ = status.status_; @@ -253,7 +305,9 @@ ObCloneJob::ObCloneJob() : status_(ObTenantCloneStatus::Status::CLONE_MAX_STATUS), job_type_(ObTenantCloneJobType::CLONE_JOB_MAX_TYPE), ret_code_(OB_SUCCESS), - allocator_("CloneJob") + allocator_("CloneJob"), + data_version_(0), + min_cluster_version_(0) {} int ObCloneJob::init(const ObCloneJobInitArg &init_arg) @@ -323,6 +377,8 @@ int ObCloneJob::init(const ObCloneJobInitArg &init_arg) status_ = init_arg.status_; job_type_ = init_arg.job_type_; ret_code_ = init_arg.ret_code_; + data_version_ = init_arg.data_version_; + min_cluster_version_ = init_arg.min_cluster_version_; } return ret; } @@ -352,6 +408,8 @@ int ObCloneJob::assign(const ObCloneJob &other) status_ = other.status_; job_type_ = other.job_type_; ret_code_ = other.ret_code_; + data_version_ = other.data_version_; + min_cluster_version_ = other.min_cluster_version_; } return ret; } @@ -375,6 +433,8 @@ void ObCloneJob::reset() job_type_ = ObTenantCloneJobType::CLONE_JOB_MAX_TYPE; ret_code_ = OB_SUCCESS; allocator_.reset(); + data_version_ = 0; + min_cluster_version_ = 0; } bool ObCloneJob::is_valid() const @@ -1121,6 +1181,7 @@ int ObTenantCloneTableOperator::build_insert_dml_( { int ret = OB_SUCCESS; char trace_id_buf[OB_MAX_TRACE_ID_BUFFER_SIZE] = {'\0'}; + bool is_compatible_with_clone_standby_tenant = false; if (OB_UNLIKELY(!job.is_valid())) { ret = OB_INVALID_ARGUMENT; @@ -1160,6 +1221,23 @@ int ObTenantCloneTableOperator::build_insert_dml_( LOG_WARN("add column failed", K(ret)); } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_standby_tenant( + job.get_source_tenant_id(), is_compatible_with_clone_standby_tenant))) { + LOG_WARN("fail to check whether tenant is compatible to clone standby tenant", KR(ret), K(job)); + } else if (is_compatible_with_clone_standby_tenant) { + if 
(OB_FAIL(dml.add_column("data_version", job.get_data_version()))) { + LOG_WARN("add data_version column failed", KR(ret), K(job)); + } else if (OB_FAIL(dml.add_column("min_cluster_version", job.get_min_cluster_version()))) { + LOG_WARN("add min_cluster_version failed", KR(ret), K(job)); + } + } else { + // check data_version and min_cluster_version is 0 when clone standby tenant not supported + if (OB_UNLIKELY(0 != job.get_data_version()) || OB_UNLIKELY(0 != job.get_min_cluster_version())) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("data version should equals to 0 when not compatible with cloning standby tenant", KR(ret), K(job)); + } + } return ret; } @@ -1186,6 +1264,8 @@ int ObTenantCloneTableOperator::fill_job_from_result_(const ObMySQLResult *resul ObString status_str; ObString job_type_str; int ret_code = OB_SUCCESS; + uint64_t data_version = 0; + uint64_t min_cluster_version = 0; EXTRACT_INT_FIELD_MYSQL(*result, "tenant_id", tenant_id, uint64_t); EXTRACT_INT_FIELD_MYSQL(*result, "job_id", job_id, int64_t); EXTRACT_STRBUF_FIELD_MYSQL(*result, "trace_id", trace_id_buf, sizeof(trace_id_buf), real_length); @@ -1202,6 +1282,10 @@ int ObTenantCloneTableOperator::fill_job_from_result_(const ObMySQLResult *resul EXTRACT_VARCHAR_FIELD_MYSQL(*result, "status", status_str); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "job_type", job_type_str); EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*result, "ret_code", ret_code, int); + EXTRACT_UINT_FIELD_MYSQL_WITH_DEFAULT_VALUE(*result, "data_version", data_version, uint64_t, + true/*skip_null_error*/, true/*skip_column_error*/, 0/*default_value*/); + EXTRACT_UINT_FIELD_MYSQL_WITH_DEFAULT_VALUE(*result, "min_cluster_version", min_cluster_version, uint64_t, + true/*skip_null_error*/, true/*skip_column_error*/, 0/*default_value*/); if (OB_SUCC(ret)) { if (OB_FAIL(trace_id.parse_from_buf(trace_id_buf))) { LOG_WARN("fail to parse trace id from buf", KR(ret), K(trace_id_buf)); @@ -1226,6 +1310,8 @@ int 
ObTenantCloneTableOperator::fill_job_from_result_(const ObMySQLResult *resul .status_ = ObTenantCloneStatus::get_clone_status(status_str), .job_type_ = get_job_type(job_type_str), .ret_code_ = ret_code, + .data_version_ = data_version, + .min_cluster_version_ = min_cluster_version, }; if (OB_FAIL(job.init(init_arg))) { LOG_WARN("fail to init clone job", KR(ret), K(init_arg)); diff --git a/src/share/restore/ob_tenant_clone_table_operator.h b/src/share/restore/ob_tenant_clone_table_operator.h index d8f1a3b45a..a9ede30bf1 100644 --- a/src/share/restore/ob_tenant_clone_table_operator.h +++ b/src/share/restore/ob_tenant_clone_table_operator.h @@ -14,6 +14,7 @@ #define OCEANBASE_SHARE_OB_TENANT_CLONE_TABLE_OPERATOR_H_ #include "lib/mysqlclient/ob_mysql_proxy.h" +#include "rootserver/tenant_snapshot/ob_tenant_snapshot_util.h" // for ObConflictCaseWithClone #include "share/tenant_snapshot/ob_tenant_snapshot_id.h" #include "src/share/scn.h" @@ -24,6 +25,43 @@ namespace share class ObDMLSqlSplicer; class TenantCloneStatusStrPair; +class ObCancelCloneJobReason +{ + OB_UNIS_VERSION(1); +public: + enum CancelCloneJobReason + { + INVALID = -1, + CANCEL_BY_USER, + CANCEL_BY_STANDBY_TRANSFER, + CANCEL_BY_STANDBY_UPGRADE, + CANCEL_BY_STANDBY_ALTER_LS, + MAX + }; +public: + ObCancelCloneJobReason() : reason_(INVALID) {} + explicit ObCancelCloneJobReason(CancelCloneJobReason reason) : reason_(reason) {} + + int init_by_conflict_case(const rootserver::ObConflictCaseWithClone &case_to_check); + ObCancelCloneJobReason &operator=(const CancelCloneJobReason reason) { reason_ = reason; return *this; } + ObCancelCloneJobReason &operator=(const ObCancelCloneJobReason &other) { reason_ = other.reason_; return *this; } + void reset() { reason_ = INVALID; } + void assign(const ObCancelCloneJobReason &other) { reason_ = other.reason_; } + bool operator==(const ObCancelCloneJobReason &other) const { return other.reason_ == reason_; } + bool operator!=(const ObCancelCloneJobReason &other) const { 
return other.reason_ != reason_; } + bool is_valid() const { return INVALID < reason_ && MAX > reason_; } + bool is_cancel_by_user() const { return CANCEL_BY_USER == reason_; } + bool is_cancel_by_standby_transfer() const { return CANCEL_BY_STANDBY_TRANSFER == reason_; } + bool is_cancel_by_standby_upgrade() const { return CANCEL_BY_STANDBY_UPGRADE == reason_; } + bool is_cancel_by_standby_alter_ls() const { return CANCEL_BY_STANDBY_ALTER_LS == reason_; } + const CancelCloneJobReason &get_reason() const { return reason_; } + const char* get_reason_str() const; + + TO_STRING_KV("reason", get_reason_str()); +private: + CancelCloneJobReason reason_; +}; + class ObTenantCloneStatus { public: @@ -95,8 +133,7 @@ public: bool is_sys_valid_snapshot_status_for_fork() const; bool is_sys_release_resource_status() const; bool is_sys_release_clone_resource_status() const; - - TO_STRING_KV(K_(status)); + TO_STRING_KV("status", get_clone_status_str(status_)); private: Status status_; @@ -139,6 +176,8 @@ public: const ObTenantCloneJobType &get_job_type() const { return job_type_; } void set_status(const ObTenantCloneStatus::Status &status) { status_ = status; } int get_ret_code() const { return ret_code_; } + uint64_t get_data_version() const { return data_version_; } + uint64_t get_min_cluster_version() const { return min_cluster_version_; } struct ObCloneJobInitArg { @@ -158,13 +197,16 @@ public: ObTenantCloneStatus status_; ObTenantCloneJobType job_type_; int ret_code_; + uint64_t data_version_; + uint64_t min_cluster_version_; TO_STRING_KV(K_(trace_id), K_(tenant_id), K_(job_id), K_(source_tenant_id), K_(source_tenant_name), K_(clone_tenant_id), K_(clone_tenant_name), K_(tenant_snapshot_id), K_(tenant_snapshot_name), K_(resource_pool_id), K_(resource_pool_name), K_(unit_config_name), K_(restore_scn), - K_(status), K_(job_type), K_(ret_code)); + K_(status), K_(job_type), K_(ret_code), + K_(data_version), K_(min_cluster_version)); }; int init(const ObCloneJobInitArg &init_arg); 
int assign(const ObCloneJob &other); @@ -177,7 +219,8 @@ public: K_(tenant_snapshot_id), K_(tenant_snapshot_name), K_(resource_pool_id), K_(resource_pool_name), K_(unit_config_name), K_(restore_scn), - K_(status), K_(job_type), K_(ret_code)); + K_(status), K_(job_type), K_(ret_code), + K_(data_version), K_(min_cluster_version)); private: common::ObCurTraceId::TraceId trace_id_; //in sys tenant space, tenant_id_ is OB_SYS_TENANT_ID @@ -198,6 +241,8 @@ private: ObTenantCloneJobType job_type_; int ret_code_; ObArenaAllocator allocator_; + uint64_t data_version_; + uint64_t min_cluster_version_; private: DISALLOW_COPY_AND_ASSIGN(ObCloneJob); @@ -207,6 +252,7 @@ class ObTenantCloneTableOperator { public: ObTenantCloneTableOperator(); + bool is_inited() const { return is_inited_; } int init(const uint64_t user_tenant_id, ObISQLClient *proxy); //get clone job according to source_tenant_id diff --git a/src/share/tenant_snapshot/ob_tenant_snapshot_table_operator.h b/src/share/tenant_snapshot/ob_tenant_snapshot_table_operator.h index f64a5dcc6e..ef908d76f8 100644 --- a/src/share/tenant_snapshot/ob_tenant_snapshot_table_operator.h +++ b/src/share/tenant_snapshot/ob_tenant_snapshot_table_operator.h @@ -335,6 +335,7 @@ class ObTenantSnapshotTableOperator public: ObTenantSnapshotTableOperator(); int init(const uint64_t user_tenant_id, ObISQLClient *proxy); + bool is_inited() const { return is_inited_; } public: ObTenantSnapStatus str_to_tenant_snap_status(const ObString &status_str); diff --git a/src/sql/engine/cmd/ob_alter_system_executor.cpp b/src/sql/engine/cmd/ob_alter_system_executor.cpp index 575f924487..fec02d5f72 100644 --- a/src/sql/engine/cmd/ob_alter_system_executor.cpp +++ b/src/sql/engine/cmd/ob_alter_system_executor.cpp @@ -46,6 +46,7 @@ #include "sql/plan_cache/ob_plan_cache.h" #include "pl/pl_cache/ob_pl_cache_mgr.h" #include "sql/plan_cache/ob_ps_cache.h" +#include "share/restore/ob_tenant_clone_table_operator.h" //ObCancelCloneJobReason #include 
"share/table/ob_ttl_util.h" #include "rootserver/restore/ob_tenant_clone_util.h" @@ -2780,9 +2781,9 @@ int ObCancelCloneExecutor::execute(ObExecContext &ctx, ObCancelCloneStmt &stmt) } } - if (FAILEDx(rootserver::ObTenantCloneUtil::cancel_clone_job(*sql_proxy, - clone_tenant_name, - clone_already_finish))) { + if (FAILEDx(rootserver::ObTenantCloneUtil::cancel_clone_job_by_name( + *sql_proxy, clone_tenant_name, clone_already_finish, + ObCancelCloneJobReason(ObCancelCloneJobReason::CANCEL_BY_USER)))) { LOG_WARN("cancel clone job failed", KR(ret), K(clone_tenant_name)); } else if (clone_already_finish) { ret = OB_OP_NOT_ALLOW; diff --git a/src/sql/resolver/cmd/ob_alter_system_resolver.cpp b/src/sql/resolver/cmd/ob_alter_system_resolver.cpp index 2b702cb0c5..f98b8a228a 100644 --- a/src/sql/resolver/cmd/ob_alter_system_resolver.cpp +++ b/src/sql/resolver/cmd/ob_alter_system_resolver.cpp @@ -6153,7 +6153,8 @@ int ObCancelCloneResolver::resolve(const ParseNode &parse_tree) } else { bool is_compatible = false; const uint64_t tenant_id = session_info_->get_login_tenant_id(); - if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant(tenant_id, is_compatible))) { + if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( + tenant_id, is_compatible))) { LOG_WARN("fail to check compat version", KR(ret), K(tenant_id)); } else if (!is_compatible) { ret = OB_NOT_SUPPORTED; diff --git a/src/sql/resolver/cmd/ob_tenant_clone_resolver.cpp b/src/sql/resolver/cmd/ob_tenant_clone_resolver.cpp index 3d9d211324..9fc1b74669 100644 --- a/src/sql/resolver/cmd/ob_tenant_clone_resolver.cpp +++ b/src/sql/resolver/cmd/ob_tenant_clone_resolver.cpp @@ -58,7 +58,8 @@ int ObCloneTenantResolver::resolve(const ParseNode &parse_tree) } else { bool is_compatible = false; const uint64_t tenant_id = session_info_->get_login_tenant_id(); - if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant(tenant_id, is_compatible))) { + if 
(OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( + tenant_id, is_compatible))) { LOG_WARN("fail to check compat version", KR(ret), K(tenant_id)); } else if (!is_compatible) { ret = OB_NOT_SUPPORTED; diff --git a/src/sql/resolver/cmd/ob_tenant_snapshot_resolver.cpp b/src/sql/resolver/cmd/ob_tenant_snapshot_resolver.cpp index e2ff62ef73..03cb22cf4c 100644 --- a/src/sql/resolver/cmd/ob_tenant_snapshot_resolver.cpp +++ b/src/sql/resolver/cmd/ob_tenant_snapshot_resolver.cpp @@ -51,7 +51,8 @@ int ObCreateTenantSnapshotResolver::resolve(const ParseNode &parse_tree) } else { bool is_compatible = false; tenant_id = session_info_->get_login_tenant_id(); - if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant(tenant_id, is_compatible))) { + if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( + tenant_id, is_compatible))) { LOG_WARN("fail to check compat version", KR(ret), K(tenant_id)); } else if (!is_compatible) { ret = OB_NOT_SUPPORTED; @@ -138,7 +139,8 @@ int ObDropTenantSnapshotResolver::resolve(const ParseNode &parse_tree) } else { bool is_compatible = false; tenant_id = session_info_->get_login_tenant_id(); - if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant(tenant_id, is_compatible))) { + if (OB_FAIL(share::ObShareUtil::check_compat_version_for_clone_tenant_with_tenant_role( + tenant_id, is_compatible))) { LOG_WARN("fail to check compat version", KR(ret), K(tenant_id)); } else if (!is_compatible) { ret = OB_NOT_SUPPORTED;