diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index f74d389ee4..f7f6f8f144 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -73,6 +73,7 @@ #include "sql/session/ob_sql_session_info.h" #include "sql/session/ob_sess_info_verify.h" #include "observer/table/ttl/ob_ttl_service.h" +#include "storage/high_availability/ob_storage_ha_utils.h" namespace oceanbase { @@ -2580,8 +2581,10 @@ int ObRpcStartTransferTaskP::process() { int ret = OB_SUCCESS; ObTransferService *transfer_service = nullptr; - - if (OB_UNLIKELY(arg_.get_tenant_id() != MTL_ID())) { + const share::ObLSID &src_ls = arg_.get_src_ls(); + const uint64_t tenant_id = arg_.get_tenant_id(); + bool is_leader = false; + if (OB_UNLIKELY(tenant_id != MTL_ID())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("ObRpcStartTransferTaskP::process tenant not match", KR(ret), K_(arg)); } else if (OB_UNLIKELY(!arg_.is_valid())) { @@ -2590,6 +2593,11 @@ int ObRpcStartTransferTaskP::process() } else if (OB_ISNULL(transfer_service = MTL(ObTransferService *))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls service should not be null", K(ret), KP(transfer_service)); + } else if (OB_FAIL(storage::ObStorageHAUtils::check_ls_is_leader(tenant_id, src_ls, is_leader))) { + LOG_WARN("fail to check ls is leader", K(ret), K(tenant_id), K(src_ls)); + } else if (!is_leader) { + ret = OB_NOT_MASTER; + LOG_WARN("ls is not leader, please retry", K(ret), K(is_leader)); } else { transfer_service->wakeup(); } diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index b41ca839ff..f16ef592c3 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -9686,23 +9686,23 @@ int ObRlsContextDDLArg::assign(const ObRlsContextDDLArg &other) return ret; } -OB_SERIALIZE_MEMBER(ObStartTransferTaskArg, tenant_id_, task_id_, dest_ls_); +OB_SERIALIZE_MEMBER(ObStartTransferTaskArg, tenant_id_, task_id_, src_ls_); int ObStartTransferTaskArg::init( const uint64_t tenant_id, const ObTransferTaskID &task_id, - const ObLSID &dest_ls) + const ObLSID &src_ls) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || ! task_id.is_valid()) - || !dest_ls.is_valid()) { + || !src_ls.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(tenant_id), K(task_id), K(dest_ls)); + LOG_WARN("invalid args", KR(ret), K(tenant_id), K(task_id), K(src_ls)); } else { tenant_id_ = tenant_id; task_id_ = task_id; - dest_ls_ = dest_ls; + src_ls_ = src_ls; } return ret; } @@ -9712,7 +9712,7 @@ int ObStartTransferTaskArg::assign(const ObStartTransferTaskArg &other) int ret = OB_SUCCESS; tenant_id_ = other.tenant_id_; task_id_ = other.task_id_; - dest_ls_ = other.dest_ls_; + src_ls_ = other.src_ls_; return ret; } diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index ad1a53971c..c152592814 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -9708,20 +9708,20 @@ struct ObStartTransferTaskArg final { OB_UNIS_VERSION(1); public: - ObStartTransferTaskArg(): tenant_id_(OB_INVALID_TENANT_ID), task_id_(), dest_ls_() {} + ObStartTransferTaskArg(): tenant_id_(OB_INVALID_TENANT_ID), task_id_(), src_ls_() {} ~ObStartTransferTaskArg() {} - int init(const uint64_t tenant_id, const share::ObTransferTaskID &task_id, const share::ObLSID &dest_ls); + int init(const uint64_t tenant_id, const share::ObTransferTaskID &task_id, const share::ObLSID &src_ls); uint64_t get_tenant_id() const { return tenant_id_; } const share::ObTransferTaskID get_task_id() const { return task_id_; } - const share::ObLSID &get_dest_ls() { return dest_ls_; } + const share::ObLSID &get_src_ls() { return src_ls_; } bool is_valid() const { return is_valid_tenant_id(tenant_id_) && task_id_.is_valid(); } int assign(const ObStartTransferTaskArg &other); - TO_STRING_KV(K_(tenant_id), K_(task_id), K_(dest_ls)); + TO_STRING_KV(K_(tenant_id), K_(task_id), K_(src_ls)); private: uint64_t tenant_id_; share::ObTransferTaskID task_id_; - share::ObLSID dest_ls_; + share::ObLSID src_ls_; DISALLOW_COPY_AND_ASSIGN(ObStartTransferTaskArg); }; diff --git a/src/storage/high_availability/ob_finish_transfer.cpp b/src/storage/high_availability/ob_finish_transfer.cpp index a7bb853a63..35e4767131 100644 --- a/src/storage/high_availability/ob_finish_transfer.cpp +++ b/src/storage/high_availability/ob_finish_transfer.cpp @@ -961,23 +961,10 @@ int ObTxFinishTransfer::report_result_( int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader) { int ret = OB_SUCCESS; - ObRole role = ObRole::INVALID_ROLE; - int64_t proposal_id = 0; - ObLSHandle ls_handle; - ObLS *ls = nullptr; const uint64_t tenant_id = MTL_ID(); is_leader = false; - if (OB_FAIL(get_ls_handle_(tenant_id, ls_id, ls_handle))) { - LOG_WARN("failed to get ls handle", K(ret), K(tenant_id), K(ls_id)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be NULL", K(ret), K(tenant_id), K(ls_id)); - } else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) { - LOG_WARN("failed to get role", K(ret), K(tenant_id), K(ls_id)); - } else if (is_strong_leader(role)) { - is_leader = true; - } else { - is_leader = false; + if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader(tenant_id, ls_id, is_leader))) { + LOG_WARN("failed to check ls leader", K(ret), K(tenant_id), K(ls_id)); } return ret; } diff --git a/src/storage/high_availability/ob_ls_migration.cpp b/src/storage/high_availability/ob_ls_migration.cpp index 0024f0293d..09da0c59fd 100644 --- a/src/storage/high_availability/ob_ls_migration.cpp +++ b/src/storage/high_availability/ob_ls_migration.cpp @@ -29,6 +29,7 @@ #include "share/ls/ob_ls_table_operator.h" #include "ob_rebuild_service.h" #include "share/ob_cluster_version.h" +#include "ob_storage_ha_utils.h" namespace oceanbase { @@ -1042,9 +1043,8 @@ int ObStartMigrationTask::deal_with_local_ls_() int ret = OB_SUCCESS; ObLSHandle ls_handle; ObLS *ls = nullptr; - ObRole role; - int64_t proposal_id = 0; ObLSMeta local_ls_meta; + bool is_leader = false; if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("start migration task do not init", K(ret)); @@ -1053,18 +1053,19 @@ int ObStartMigrationTask::deal_with_local_ls_() } else if (OB_ISNULL(ls = ls_handle.get_ls())) { ret = OB_ERR_SYS; LOG_ERROR("log stream should not be NULL", K(ret), K(*ctx_)); - } else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) { - LOG_WARN("failed to get role", K(ret), "arg", ctx_->arg_); - } else if (is_strong_leader(role)) { + } else if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader( + ctx_->tenant_id_, ctx_->arg_.ls_id_, is_leader))) { + LOG_WARN("failed to check ls leader", K(ret), KPC(ctx_)); + } else if (is_leader) { if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("leader can not as rebuild dst", K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_); + LOG_ERROR("leader can not as rebuild dst", K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_); } else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_ || ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_ || ObMigrationOpType::CHANGE_LS_OP == ctx_->arg_.type_) { ret = OB_ERR_SYS; LOG_WARN("leader cannot as add, migrate, change dst", - K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_); + K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_); } } else if (OB_FAIL(ls->offline())) { LOG_WARN("failed to disable log", K(ret), KPC(ctx_)); diff --git a/src/storage/high_availability/ob_ls_prepare_migration.cpp b/src/storage/high_availability/ob_ls_prepare_migration.cpp index 09a968e2b4..f69278b329 100644 --- a/src/storage/high_availability/ob_ls_prepare_migration.cpp +++ b/src/storage/high_availability/ob_ls_prepare_migration.cpp @@ -20,6 +20,7 @@ #include "ob_transfer_service.h" #include "storage/tablet/ob_tablet.h" #include "ob_rebuild_service.h" +#include "ob_storage_ha_utils.h" using namespace oceanbase; using namespace common; @@ -756,8 +757,7 @@ int ObStartPrepareMigrationTask::deal_with_local_ls_() int ret = OB_SUCCESS; ObLSHandle ls_handle; ObLS *ls = nullptr; - ObRole role; - int64_t proposal_id = 0; + bool is_leader = false; ObLSSavedInfo saved_info; if (!is_inited_) { @@ -768,18 +768,19 @@ int ObStartPrepareMigrationTask::deal_with_local_ls_() } else if (OB_ISNULL(ls = ls_handle.get_ls())) { ret = OB_ERR_SYS; LOG_ERROR("log stream should not be NULL", K(ret), K(*ctx_)); - } else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) { - LOG_WARN("failed to get role", K(ret), "arg", ctx_->arg_); - } else if (is_strong_leader(role)) { + } else if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader( + ctx_->tenant_id_, ctx_->arg_.ls_id_, is_leader))) { + LOG_WARN("failed to check ls leader", K(ret), KPC(ctx_)); + } else if (is_leader) { if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("leader can not as rebuild dst", K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_); + LOG_ERROR("leader can not as rebuild dst", K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_); } else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_ || ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_ || ObMigrationOpType::CHANGE_LS_OP == ctx_->arg_.type_) { ret = OB_ERR_SYS; LOG_WARN("leader cannot as add, migrate, change dst", - K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_); + K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_); } } diff --git a/src/storage/high_availability/ob_storage_ha_utils.cpp b/src/storage/high_availability/ob_storage_ha_utils.cpp index e1500c3896..e99b930f2d 100644 --- a/src/storage/high_availability/ob_storage_ha_utils.cpp +++ b/src/storage/high_availability/ob_storage_ha_utils.cpp @@ -34,6 +34,7 @@ #include "rootserver/ob_tenant_info_loader.h" #include "src/observer/omt/ob_tenant_config.h" #include "common/errsim_module/ob_errsim_module_type.h" +#include "common/ob_role.h" using namespace oceanbase::share; @@ -382,6 +383,38 @@ int ObStorageHAUtils::calc_tablet_sstable_macro_block_cnt( return ret; } +int ObStorageHAUtils::check_ls_is_leader( + const uint64_t tenant_id, + const share::ObLSID &ls_id, + bool &is_leader) +{ + int ret = OB_SUCCESS; + ObLSService *ls_srv = NULL; + common::ObRole role = common::ObRole::INVALID_ROLE; + int64_t proposal_id = 0; + ObLSHandle ls_handle; + ObLS *ls = nullptr; + is_leader = false; + if (OB_INVALID_ID == tenant_id || !ls_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(ls_id)); + } else if (OB_ISNULL(ls_srv = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("log stream service is NULL", K(ret), K(tenant_id)); + } else if (OB_FAIL(ls_srv->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(tenant_id), K(ls_id)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + LOG_WARN("ls should not be null", K(ret), KP(ls)); + } else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) { + LOG_WARN("failed to get role", K(ret), KP(ls)); + } else if (is_strong_leader(role)) { + is_leader = true; + } else { + is_leader = false; + } + return ret; +} + bool ObTransferUtils::is_need_retry_error(const int err) { bool bool_ret = false; diff --git a/src/storage/high_availability/ob_storage_ha_utils.h b/src/storage/high_availability/ob_storage_ha_utils.h index e58ab2c6d7..7173098b91 100644 --- a/src/storage/high_availability/ob_storage_ha_utils.h +++ b/src/storage/high_availability/ob_storage_ha_utils.h @@ -46,6 +46,7 @@ public: static int64_t get_rpc_timeout(); static int check_is_primary_tenant(const uint64_t tenant_id, bool &is_primary_tenant); static int check_disk_space(); + static int check_ls_is_leader(const uint64_t tenant_id, const share::ObLSID &ls_id, bool &is_leader); static int calc_tablet_sstable_macro_block_cnt( const ObTabletHandle &tablet_handle, int64_t &data_macro_block_count);