diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 7c5f6f7f4..9e0c7c7aa 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -472,6 +472,9 @@ PCODE_DEF(OB_HA_BLOCK_TX, 0x4B4) PCODE_DEF(OB_HA_KILL_TX, 0x4B5) PCODE_DEF(OB_HA_UNBLOCK_TX, 0x4B6) PCODE_DEF(OB_HA_SWITCH_LEARNER_TO_ACCEPTOR, 0x4B7) +PCODE_DEF(OB_HA_LOCK_CONFIG_CHANGE, 0x4B8) +PCODE_DEF(OB_HA_UNLOCK_CONFIG_CHANGE, 0x4B9) +PCODE_DEF(OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, 0x4BA) // sql, including executor diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 8c84ab5fc..e3da7805b 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -159,6 +159,9 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator) RPC_PROCESSOR(ObStorageBlockTxP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageKillTxP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageUnBlockTxP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_); } void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) { diff --git a/src/storage/high_availability/ob_finish_transfer.cpp b/src/storage/high_availability/ob_finish_transfer.cpp index 9a6b37b34..cd63ab5c6 100644 --- a/src/storage/high_availability/ob_finish_transfer.cpp +++ b/src/storage/high_availability/ob_finish_transfer.cpp @@ -791,16 +791,9 @@ int ObTxFinishTransfer::lock_ls_member_list_(const uint64_t tenant_id, const sha const common::ObMemberList &member_list, const ObTransferLockStatus &status) { int ret = OB_SUCCESS; - storage::ObLS *ls = NULL; - storage::ObLSHandle ls_handle; - if (OB_FAIL(get_ls_handle_(tenant_id, ls_id, ls_handle))) { - 
LOG_WARN("failed to get ls", K(ret), K(tenant_id), K(ls_id)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("log stream not exist", K(ret), K(ls_id)); - } else if (OB_FAIL(ObMemberListLockUtils::lock_ls_member_list( - tenant_id, ls_id, task_id_.id(), member_list, status, ls, *sql_proxy_))) { - LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list), KPC(ls)); + if (OB_FAIL(ObMemberListLockUtils::lock_ls_member_list( + tenant_id, ls_id, task_id_.id(), member_list, status, *sql_proxy_))) { + LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list)); } else { #ifdef ERRSIM if (OB_SUCC(ret)) { diff --git a/src/storage/high_availability/ob_transfer_lock_utils.cpp b/src/storage/high_availability/ob_transfer_lock_utils.cpp index c6d91d699..65fdf0910 100644 --- a/src/storage/high_availability/ob_transfer_lock_utils.cpp +++ b/src/storage/high_availability/ob_transfer_lock_utils.cpp @@ -20,6 +20,7 @@ #include "storage/ob_common_id_utils.h" #include "share/ob_common_id.h" #include "observer/ob_server_event_history_table_operator.h" +#include "storage/high_availability/ob_storage_ha_utils.h" using namespace oceanbase::share; using namespace oceanbase::common; @@ -59,18 +60,7 @@ int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, c std::sort(sorted_ls_list.begin(), sorted_ls_list.end()); for (int64_t i = 0; OB_SUCC(ret) && i < sorted_ls_list.count(); ++i) { const share::ObLSID &ls_id = sorted_ls_list.at(i); - ObLSService *ls_srv = NULL; - ObLSHandle ls_handle; - ObLS *ls = NULL; - if (OB_ISNULL(ls_srv = MTL(ObLSService *))) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("ls srv should not be NULL", K(ret), KP(ls_srv)); - } else if (OB_FAIL(ls_srv->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { - LOG_ERROR("ls_srv->get_ls() fail", KR(ret)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls is NULL", KR(ret), K(ls_handle)); - } 
else if (OB_FAIL(lock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, ls, sql_proxy))) { + if (OB_FAIL(lock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) { LOG_WARN("failed to lock ls member list", K(ret), K(ls_id), K(member_list)); } } @@ -79,7 +69,7 @@ int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, c } int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, - const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status, ObLS *ls, + const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; @@ -87,10 +77,7 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s int64_t real_lock_owner = -1; ObSqlString member_list_str; const int64_t lock_timeout = CONFIG_CHANGE_TIMEOUT; - if (OB_ISNULL(ls)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be null", K(ret), KP(ls)); - } else if (!ls_id.is_valid() || !member_list.is_valid()) { + if (!ls_id.is_valid() || !member_list.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(ls_id)); } else if (OB_FAIL(unlock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) { @@ -115,11 +102,11 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s bool need_lock_palf = false; if (OB_FAIL(lock_info.set(tenant_id, ls_id, task_id, status, real_lock_owner, member_list_str.ptr()))) { LOG_WARN("failed to set lock info", K(ret), K(tenant_id), K(ls_id), K(task_id), K(status), K(real_lock_owner)); - } else if (OB_FAIL(ls->get_config_change_lock_stat(palf_lock_owner, palf_is_locked))) { - LOG_WARN("failed to get config change lock stat", K(ret), KPC(ls), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(get_config_change_lock_stat_(lock_info, palf_lock_owner, palf_is_locked))) { + 
LOG_WARN("failed to get config change lock stat", K(ret), K(tenant_id), K(ls_id)); } else if (OB_FAIL(check_lock_status_(palf_is_locked, palf_lock_owner, real_lock_owner, need_lock_palf))) { LOG_WARN("failed to check lock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(real_lock_owner)); - } else if (need_lock_palf && OB_FAIL(try_lock_config_change(lock_info, lock_timeout, ls))) { + } else if (need_lock_palf && OB_FAIL(try_lock_config_change_(lock_info, lock_timeout))) { LOG_WARN("failed to try lock config config", K(ret), K(tenant_id), @@ -162,8 +149,6 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const int64_t palf_lock_owner = -1; bool need_unlock = true; bool need_relock_before_unlock = false; - storage::ObLSHandle ls_handle; - storage::ObLS *ls = NULL; if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, lock_info, trans))) { if (OB_ENTRY_NOT_EXIST == ret) { // palf need to be unlocked @@ -178,21 +163,16 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const } else { LOG_WARN("failed to get lock info", K(ret), K(tenant_id), K(row_key)); } - } else if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) { - LOG_WARN("failed to get ls handle", K(ret), K(tenant_id), K(ls_id)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be null", K(ret)); - } else if (OB_FAIL(ls->get_config_change_lock_stat(palf_lock_owner, palf_is_locked))) { - LOG_WARN("failed to get config change lock stat", K(ret)); + } else if (OB_FAIL(get_config_change_lock_stat_(lock_info, palf_lock_owner, palf_is_locked))) { + LOG_WARN("failed to get config change lock stat"); } else if (OB_FAIL(check_unlock_status_(palf_is_locked, palf_lock_owner, lock_info.lock_owner_, need_unlock, need_relock_before_unlock))) { LOG_WARN("failed to check unlock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(lock_info)); } else if (FALSE_IT(lock_owner 
= lock_info.lock_owner_)) { // assign lock owner - } else if (need_relock_before_unlock && OB_FAIL(relock_before_unlock_(lock_info, palf_lock_owner, lock_timeout, ls))) { - LOG_WARN("failed to relock config change", K(ret), K(lock_info), K(palf_lock_owner), KPC(ls)); - } else if (need_unlock && OB_FAIL(unlock_config_change(lock_info, lock_timeout, ls))) { + } else if (need_relock_before_unlock && OB_FAIL(relock_before_unlock_(lock_info, palf_lock_owner, lock_timeout))) { + LOG_WARN("failed to relock config change", K(ret), K(lock_info), K(palf_lock_owner)); + } else if (need_unlock && OB_FAIL(unlock_config_change_(lock_info, lock_timeout))) { LOG_WARN("failed to get paxos member list", K(ret), K(lock_info)); } else if (OB_FAIL(ObTransferLockInfoOperator::remove(tenant_id, ls_id, task_id, status, trans))) { LOG_WARN("failed to update lock info", K(ret), K(row_key), K(lock_info), K(palf_lock_owner), K(palf_is_locked)); @@ -232,92 +212,220 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const } } } +#ifdef ERRSIM + if (OB_FAIL(ret)) { + SERVER_EVENT_SYNC_ADD("transfer", "unlock_ls_member_list_failed", + "tenant_id", tenant_id, + "ls_id", ls_id.id(), + "task_id", task_id, + "status", status, + "result", ret); + } +#endif return ret; } -int ObMemberListLockUtils::try_lock_config_change( - const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls) +int ObMemberListLockUtils::try_lock_config_change_( + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout) { int ret = OB_SUCCESS; - if (OB_ISNULL(ls)) { + bool ls_exist = false; + ObLSService *ls_svr = NULL; + const uint64_t tenant_id = lock_info.tenant_id_; + const share::ObLSID &ls_id = lock_info.ls_id_; + const int64_t lock_owner = lock_info.lock_owner_; + if (OB_ISNULL(ls_svr = MTL(ObLSService*))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be null", K(ret), KP(ls)); - } else if (!lock_info.is_valid()) { + LOG_WARN("tenant storage ptr is null", 
KR(ret), K(tenant_id)); + } else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) { + LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id)); + } else if (!ls_exist) { + if (OB_FAIL(try_lock_config_change_fallback_(lock_info, lock_timeout))) { + LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info)); + } else { + LOG_INFO("try lock config change fallback", K(lock_info), K(lock_timeout)); + } + } else { + ObLSHandle ls_handle; + if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) { + LOG_WARN("failed to get ls handle", K(ret), K(lock_info)); + } else if (OB_FAIL(ls_handle.get_ls()->try_lock_config_change(lock_owner, lock_timeout))) { + LOG_WARN("failed to try lock config change", K(ret), K(lock_info)); + } else { + LOG_INFO("try lock config change", K(lock_info), K(lock_timeout)); + } + } +#ifdef ERRSIM + SERVER_EVENT_ADD("TRANSFER_LOCK", "LOCK_CONFIG_CHANGE", + "tenant_id", lock_info.tenant_id_, + "ls_id", lock_info.ls_id_.id(), + "task_id", lock_info.task_id_, + "status", lock_info.status_.str(), + "lock_owner", lock_info.lock_owner_, + "lock_member_list", lock_info.comment_); +#endif + return ret; +} + +int ObMemberListLockUtils::try_lock_config_change_fallback_( + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout) +{ + int ret = OB_SUCCESS; + ObLSService *ls_svr = NULL; + ObStorageRpc *storage_rpc = NULL; + ObStorageHASrcInfo src_info; + src_info.cluster_id_ = GCONF.cluster_id; + if (!lock_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(lock_info)); - } else if (OB_FAIL(ls->try_lock_config_change(lock_info.lock_owner_, lock_timeout))) { + } else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr)); + } else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc)); + } else if 
(OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) { + LOG_WARN("failed to get ls leader", K(ret), K(lock_info)); + } else if (OB_FAIL(storage_rpc->lock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_, + lock_info.lock_owner_, lock_timeout))) { LOG_WARN("failed to try lock config config", K(ret), K(lock_info)); - } else { -#ifdef ERRSIM - SERVER_EVENT_ADD("TRANSFER_LOCK", "LOCK_CONFIG_CHANGE", - "tenant_id", lock_info.tenant_id_, - "ls_id", lock_info.ls_id_.id(), - "task_id", lock_info.task_id_, - "status", lock_info.status_.str(), - "lock_owner", lock_info.lock_owner_, - "lock_member_list", lock_info.comment_); -#endif - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObMemberListLockUtils::record_config_change_lock_stat(lock_info, ls))) { - LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info)); - } } return ret; } -int ObMemberListLockUtils::record_config_change_lock_stat(const ObTransferTaskLockInfo &lock_info, ObLS *ls) +int ObMemberListLockUtils::get_config_change_lock_stat_( + const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked) { int ret = OB_SUCCESS; - int64_t palf_lock_owner = -1; - bool is_locked = false; - if (OB_ISNULL(ls)) { + bool ls_exist = false; + ObLSService *ls_svr = NULL; + const uint64_t tenant_id = lock_info.tenant_id_; + const share::ObLSID &ls_id = lock_info.ls_id_; + if (OB_ISNULL(ls_svr = MTL(ObLSService*))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be null", K(ret), KP(ls)); - } else if (!lock_info.is_valid()) { + LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) { + LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id)); + } else if (!ls_exist) { + if (OB_FAIL(get_config_change_lock_stat_fallback_(lock_info, palf_lock_owner, is_locked))) { + LOG_WARN("failed to get lock config change fallback", K(ret), K(lock_info)); + } else { + 
LOG_INFO("get lock config change stat fallback", K(lock_info), K(palf_lock_owner), K(is_locked)); + } + } else { + ObLSHandle ls_handle; + if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) { + LOG_WARN("failed to get ls handle", K(ret), K(lock_info)); + } else if (OB_FAIL(ls_handle.get_ls()->get_config_change_lock_stat(palf_lock_owner, is_locked))) { + LOG_WARN("failed to try lock config change", K(ret), K(lock_info)); + } else { + LOG_INFO("get lock config change stat", K(lock_info), K(palf_lock_owner), K(is_locked)); + } + } +#ifdef ERRSIM + SERVER_EVENT_ADD("TRANSFER_LOCK", "GET_CONFIG_CHANGE_LOCK_STAT", + "tenant_id", lock_info.tenant_id_, + "ls_id", lock_info.ls_id_.id(), + "task_id", lock_info.task_id_, + "status", lock_info.status_.str(), + "palf_lock_owner", palf_lock_owner, + "is_locked", is_locked); +#endif + return ret; +} + +int ObMemberListLockUtils::get_config_change_lock_stat_fallback_( + const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked) +{ + int ret = OB_SUCCESS; + palf_lock_owner = -1; + is_locked = false; + ObLSService *ls_svr = NULL; + ObStorageRpc *storage_rpc = NULL; + ObStorageHASrcInfo src_info; + src_info.cluster_id_ = GCONF.cluster_id; + if (!lock_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(lock_info)); - } else if (OB_FAIL(ls->get_config_change_lock_stat(palf_lock_owner, is_locked))) { + } else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr)); + } else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc)); + } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) { + LOG_WARN("failed to get ls leader", K(ret), K(lock_info)); + } else if (OB_FAIL(storage_rpc->get_config_change_lock_stat(lock_info.tenant_id_, 
src_info, + lock_info.ls_id_, palf_lock_owner, is_locked))) { LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info)); - } else { -#ifdef ERRSIM - SERVER_EVENT_ADD("TRANSFER_LOCK", "GET_CONFIG_CHANGE_LOCK_STAT", - "tenant_id", lock_info.tenant_id_, - "ls_id", lock_info.ls_id_.id(), - "task_id", lock_info.task_id_, - "status", lock_info.status_.str(), - "palf_lock_owner", palf_lock_owner, - "is_locked", is_locked); -#endif } return ret; } -int ObMemberListLockUtils::unlock_config_change( - const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls) +int ObMemberListLockUtils::unlock_config_change_( + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout) { int ret = OB_SUCCESS; - if (OB_ISNULL(ls)) { + bool ls_exist = false; + ObLSService *ls_svr = NULL; + const uint64_t tenant_id = lock_info.tenant_id_; + const share::ObLSID &ls_id = lock_info.ls_id_; + const int64_t lock_owner = lock_info.lock_owner_; + if (OB_ISNULL(ls_svr = MTL(ObLSService*))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be null", K(ret), KP(ls)); - } else if (!lock_info.is_valid()) { + LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) { + LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id)); + } else if (!ls_exist) { + if (OB_FAIL(unlock_config_change_fallback_(lock_info, lock_timeout))) { + LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info)); + } else { + LOG_INFO("unlock lock config change fallback", K(lock_info), K(lock_timeout)); + } + } else { + ObLSHandle ls_handle; + if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) { + LOG_WARN("failed to get ls handle", K(ret), K(lock_info)); + } else if (OB_FAIL(ls_handle.get_ls()->unlock_config_change(lock_owner, lock_timeout))) { + LOG_WARN("failed to try lock config change", K(ret), K(lock_info)); + } else { + LOG_INFO("unlock lock config change", K(lock_info), K(lock_timeout)); 
+ } + } +#ifdef ERRSIM + SERVER_EVENT_ADD("TRANSFER_LOCK", "UNLOCK_CONFIG_CHANGE", + "tenant_id", lock_info.tenant_id_, + "ls_id", lock_info.ls_id_.id(), + "task_id", lock_info.task_id_, + "status", lock_info.status_.str(), + "lock_owner", lock_info.lock_owner_, + "unlock_member_list", lock_info.comment_); +#endif + return ret; +} + +int ObMemberListLockUtils::unlock_config_change_fallback_( + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout) +{ + int ret = OB_SUCCESS; + ObLSService *ls_svr = NULL; + ObStorageRpc *storage_rpc = NULL; + ObStorageHASrcInfo src_info; + src_info.cluster_id_ = GCONF.cluster_id; + if (!lock_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(lock_info)); - } else if (OB_FAIL(ls->unlock_config_change(lock_info.lock_owner_, lock_timeout))) { - LOG_WARN("failed to unlock config config", K(ret), K(lock_info)); - } else { -#ifdef ERRSIM - SERVER_EVENT_ADD("TRANSFER_LOCK", "UNLOCK_CONFIG_CHANGE", - "tenant_id", lock_info.tenant_id_, - "ls_id", lock_info.ls_id_.id(), - "task_id", lock_info.task_id_, - "status", lock_info.status_.str(), - "lock_owner", lock_info.lock_owner_, - "unlock_member_list", lock_info.comment_); -#endif - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObMemberListLockUtils::record_config_change_lock_stat(lock_info, ls))) { - LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info)); - } + } else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr)); + } else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc)); + } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) { + LOG_WARN("failed to get ls leader", K(ret), K(lock_info)); + } else if (OB_FAIL(storage_rpc->unlock_config_change(lock_info.tenant_id_, src_info, 
lock_info.ls_id_, + lock_info.lock_owner_, lock_timeout))) { + LOG_WARN("failed to try lock config config", K(ret), K(lock_info)); } return ret; } @@ -487,13 +595,13 @@ int ObMemberListLockUtils::check_unlock_status_( } int ObMemberListLockUtils::relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, - const int64_t palf_lock_owner, const int64_t lock_timeout, ObLS *ls) + const int64_t palf_lock_owner, const int64_t lock_timeout) { int ret = OB_SUCCESS; - if (OB_FAIL(try_lock_config_change(lock_info, lock_timeout, ls))) { + if (OB_FAIL(try_lock_config_change_(lock_info, lock_timeout))) { LOG_WARN("failed to try lock config change", K(ret), K(lock_info)); } else { - LOG_WARN("relock before unlock", K(ret), K(lock_info), KPC(ls)); + LOG_WARN("relock before unlock", K(ret), K(lock_info)); } #ifdef ERRSIM SERVER_EVENT_ADD("TRANSFER_LOCK", "RELOCK_BEFORE_UNLOCK", diff --git a/src/storage/high_availability/ob_transfer_lock_utils.h b/src/storage/high_availability/ob_transfer_lock_utils.h index 7b2f1dbb8..c985e6efb 100644 --- a/src/storage/high_availability/ob_transfer_lock_utils.h +++ b/src/storage/high_availability/ob_transfer_lock_utils.h @@ -29,8 +29,7 @@ public: const common::ObArray &lock_ls_list, const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy); static int lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const common::ObMemberList &member_list, const ObTransferLockStatus &status, ObLS *ls, - common::ObMySQLProxy &sql_proxy); + const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy); static int unlock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy); @@ -50,9 +49,12 @@ private: private: /* palf lock config*/ - static int 
try_lock_config_change(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls); - static int record_config_change_lock_stat(const ObTransferTaskLockInfo &lock_info, ObLS *ls); - static int unlock_config_change(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls); + static int try_lock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout); + static int try_lock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout); + static int get_config_change_lock_stat_(const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked); + static int get_config_change_lock_stat_fallback_(const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked); + static int unlock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout); + static int unlock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout); private: static int check_lock_status_( @@ -60,7 +62,7 @@ private: static int check_unlock_status_(const bool palf_is_locked, const int64_t palf_lock_owner, const int64_t inner_table_lock_owner, bool &need_unlock, bool &need_relock_before_unlock); static int relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, const int64_t palf_lock_owner, - const int64_t lock_timeout, ObLS *ls); + const int64_t lock_timeout); private: static const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index 63adecae2..280d67004 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -1120,6 +1120,51 @@ void ObStorageUnBlockTxArg::reset() OB_SERIALIZE_MEMBER(ObStorageUnBlockTxArg, tenant_id_, ls_id_, gts_); +ObStorageConfigChangeOpArg::ObStorageConfigChangeOpArg() + : tenant_id_(OB_INVALID_ID), + ls_id_(), + type_(MAX), + lock_owner_(0), + lock_timeout_(0) +{ +} + 
+bool ObStorageConfigChangeOpArg::is_valid() const +{ + return OB_INVALID_ID != tenant_id_ + && ls_id_.is_valid() + && type_ >= LOCK_CONFIG_CHANGE + && type_ < MAX; +} + +void ObStorageConfigChangeOpArg::reset() +{ + tenant_id_ = OB_INVALID_ID; + ls_id_.reset(); + type_ = MAX; + lock_owner_ = 0; + lock_timeout_ = 0; +} + +OB_SERIALIZE_MEMBER(ObStorageConfigChangeOpArg, tenant_id_, + ls_id_, type_, lock_owner_, lock_timeout_); + +ObStorageConfigChangeOpRes::ObStorageConfigChangeOpRes() + : palf_lock_owner_(0), + is_locked_(false), + op_succ_(false) +{ +} + +void ObStorageConfigChangeOpRes::reset() +{ + palf_lock_owner_ = 0; + is_locked_ = false; + op_succ_ = false; +} + +OB_SERIALIZE_MEMBER(ObStorageConfigChangeOpRes, palf_lock_owner_, is_locked_, op_succ_); + template ObStorageStreamRpcP::ObStorageStreamRpcP(common::ObInOutBandwidthThrottle *bandwidth_throttle) : bandwidth_throttle_(bandwidth_throttle), @@ -3050,6 +3095,126 @@ int ObStorageUnBlockTxP::process() return ret; } +ObStorageLockConfigChangeP::ObStorageLockConfigChangeP( + common::ObInOutBandwidthThrottle *bandwidth_throttle) + : ObStorageStreamRpcP(bandwidth_throttle) +{ +} + +int ObStorageLockConfigChangeP::process() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = arg_.tenant_id_; + const share::ObLSID &ls_id = arg_.ls_id_; + const int64_t lock_owner = arg_.lock_owner_; + const int64_t lock_timeout = arg_.lock_timeout_; + MTL_SWITCH(tenant_id) { + ObLSHandle ls_handle; + ObLSService *ls_service = NULL; + ObLS *ls = NULL; + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid args", K(ret), K_(arg)); + } else if (arg_.type_ != ObStorageConfigChangeOpArg::LOCK_CONFIG_CHANGE) { + ret = OB_ERR_SYS; + LOG_WARN("type not match", K(ret), K_(arg)); + } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be null", K(ret), KP(ls_service)); + } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, 
ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("fail to get log stream", KR(ret), K(arg_)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls)); + } else if (OB_FAIL(ls->try_lock_config_change(lock_owner, lock_timeout))) { + LOG_WARN("failed to try lock config config", K(ret), K_(arg)); + } else { + result_.op_succ_ = true; + LOG_INFO("lock config change success", K(arg_)); + } + } + return ret; +} + +ObStorageUnlockConfigChangeP::ObStorageUnlockConfigChangeP( + common::ObInOutBandwidthThrottle *bandwidth_throttle) + : ObStorageStreamRpcP(bandwidth_throttle) +{ +} + +int ObStorageUnlockConfigChangeP::process() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = arg_.tenant_id_; + const share::ObLSID &ls_id = arg_.ls_id_; + const int64_t lock_owner = arg_.lock_owner_; + const int64_t lock_timeout = arg_.lock_timeout_; + MTL_SWITCH(tenant_id) { + ObLSHandle ls_handle; + ObLSService *ls_service = NULL; + ObLS *ls = NULL; + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid args", K(ret), K_(arg)); + } else if (arg_.type_ != ObStorageConfigChangeOpArg::UNLOCK_CONFIG_CHANGE) { + ret = OB_ERR_SYS; + LOG_WARN("type not match", K(ret), K_(arg)); + } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be null", K(ret), KP(ls_service)); + } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("fail to get log stream", KR(ret), K(arg_)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls)); + } else if (OB_FAIL(ls->unlock_config_change(lock_owner, lock_timeout))) { + LOG_WARN("failed to try lock config config", K(ret), K_(arg)); + } else { + result_.op_succ_ = true; + LOG_INFO("unlock config change success", K(arg_)); + } + } + return ret; +} + 
+ObStorageGetLogConfigStatP::ObStorageGetLogConfigStatP( + common::ObInOutBandwidthThrottle *bandwidth_throttle) + : ObStorageStreamRpcP(bandwidth_throttle) +{ +} + +int ObStorageGetLogConfigStatP::process() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = arg_.tenant_id_; + const share::ObLSID &ls_id = arg_.ls_id_; + MTL_SWITCH(tenant_id) { + ObLSHandle ls_handle; + ObLSService *ls_service = NULL; + ObLS *ls = NULL; + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid args", K(ret), K_(arg)); + } else if (arg_.type_ != ObStorageConfigChangeOpArg::GET_CONFIG_CHANGE_LOCK_STAT) { + ret = OB_ERR_SYS; + LOG_WARN("type not match", K(ret), K_(arg)); + } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be null", K(ret), KP(ls_service)); + } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("fail to get log stream", KR(ret), K(arg_)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls)); + } else if (OB_FAIL(ls->get_config_change_lock_stat(result_.palf_lock_owner_, result_.is_locked_))) { + LOG_WARN("failed to try lock config config", K(ret), K_(arg)); + } else { + result_.op_succ_ = true; + LOG_INFO("get config change lock stat success", K(arg_), K(result_)); + } + } + return ret; +} } //namespace obrpc @@ -3645,6 +3810,106 @@ int ObStorageRpc::unblock_tx( return ret; } +int ObStorageRpc::lock_config_change( + const uint64_t tenant_id, + const ObStorageHASrcInfo &src_info, + const share::ObLSID &ls_id, + const int64_t lock_owner, + const int64_t lock_timeout) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", 
K(ret), K(tenant_id), K(src_info), K(ls_id)); + } else { + ObStorageConfigChangeOpArg arg; + ObStorageConfigChangeOpRes res; + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + arg.type_ = ObStorageConfigChangeOpArg::LOCK_CONFIG_CHANGE; + arg.lock_owner_ = lock_owner; + arg.lock_timeout_ = lock_timeout; + const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout; + if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) + .timeout(timeout) + .dst_cluster_id(src_info.cluster_id_) + .lock_config_change(arg, res))) { + LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg)); + } + } + return ret; +} + +int ObStorageRpc::unlock_config_change( + const uint64_t tenant_id, + const ObStorageHASrcInfo &src_info, + const share::ObLSID &ls_id, + const int64_t lock_owner, + const int64_t lock_timeout) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); + } else { + ObStorageConfigChangeOpArg arg; + ObStorageConfigChangeOpRes res; + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + arg.type_ = ObStorageConfigChangeOpArg::UNLOCK_CONFIG_CHANGE; + arg.lock_owner_ = lock_owner; + arg.lock_timeout_ = lock_timeout; + const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout; + if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) + .timeout(timeout) + .dst_cluster_id(src_info.cluster_id_) + .unlock_config_change(arg, res))) { + LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg)); + } + } + return ret; +} + +int ObStorageRpc::get_config_change_lock_stat( + const uint64_t tenant_id, + const ObStorageHASrcInfo &src_info, + const share::ObLSID &ls_id, + int64_t &palf_lock_owner, + bool &is_locked) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + 
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); + } else { + ObStorageConfigChangeOpArg arg; + ObStorageConfigChangeOpRes res; + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + arg.type_ = ObStorageConfigChangeOpArg::GET_CONFIG_CHANGE_LOCK_STAT; + const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout; + if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) + .by(tenant_id) + .timeout(timeout) + .dst_cluster_id(src_info.cluster_id_) + .get_config_change_lock_stat(arg, res))) { + LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg)); + } else { + palf_lock_owner = res.palf_lock_owner_; + is_locked = res.is_locked_; + } + } + return ret; +} } // storage } // oceanbase diff --git a/src/storage/ob_storage_rpc.h b/src/storage/ob_storage_rpc.h index 5cda7bda2..05974199b 100755 --- a/src/storage/ob_storage_rpc.h +++ b/src/storage/ob_storage_rpc.h @@ -712,6 +712,47 @@ public: share::SCN gts_; }; +struct ObStorageConfigChangeOpArg final +{ + OB_UNIS_VERSION(1); +public: + enum TYPE + { + LOCK_CONFIG_CHANGE = 0, + UNLOCK_CONFIG_CHANGE = 1, + GET_CONFIG_CHANGE_LOCK_STAT = 2, + MAX, + }; +public: + ObStorageConfigChangeOpArg(); + ~ObStorageConfigChangeOpArg() {} + bool is_valid() const; + void reset(); + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(lock_owner), K_(lock_timeout)); + uint64_t tenant_id_; + share::ObLSID ls_id_; + TYPE type_; + int64_t lock_owner_; + int64_t lock_timeout_; +private: + DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpArg); +}; + +struct ObStorageConfigChangeOpRes final +{ + OB_UNIS_VERSION(1); +public: + ObStorageConfigChangeOpRes(); + ~ObStorageConfigChangeOpRes() {} + void reset(); + TO_STRING_KV(K_(palf_lock_owner), K_(is_locked), K_(op_succ)); + int64_t palf_lock_owner_; + bool is_locked_; + bool op_succ_; 
+private:
+  DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpRes);
+};
+
 //src
 class ObStorageRpcProxy : public obrpc::ObRpcProxy
 {
@@ -744,6 +785,9 @@ public:
   RPC_S(PR5 block_tx, OB_HA_BLOCK_TX, (ObStorageBlockTxArg));
   RPC_S(PR5 kill_tx, OB_HA_KILL_TX, (ObStorageKillTxArg));
   RPC_S(PR5 unblock_tx, OB_HA_UNBLOCK_TX, (ObStorageUnBlockTxArg));
+  RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
+  RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
+  RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
 };
 
 template
@@ -1035,6 +1079,36 @@ protected:
   int process();
 };
 
+class ObStorageLockConfigChangeP: // RPC processor registered in init_srv_xlator_for_migration
+    public ObStorageStreamRpcP<OB_HA_LOCK_CONFIG_CHANGE> // NOTE(review): template argument restored from the matching RPC_S pcode — confirm
+{
+public:
+  explicit ObStorageLockConfigChangeP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
+  virtual ~ObStorageLockConfigChangeP() {}
+protected:
+  int process();
+};
+
+class ObStorageUnlockConfigChangeP: // RPC processor registered in init_srv_xlator_for_migration
+    public ObStorageStreamRpcP<OB_HA_UNLOCK_CONFIG_CHANGE> // NOTE(review): template argument restored from the matching RPC_S pcode — confirm
+{
+public:
+  explicit ObStorageUnlockConfigChangeP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
+  virtual ~ObStorageUnlockConfigChangeP() {}
+protected:
+  int process();
+};
+
+class ObStorageGetLogConfigStatP: // RPC processor registered in init_srv_xlator_for_migration
+    public ObStorageStreamRpcP<OB_HA_GET_CONFIG_CHANGE_LOCK_STAT> // NOTE(review): template argument restored from the matching RPC_S pcode — confirm
+{
+public:
+  explicit ObStorageGetLogConfigStatP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
+  virtual ~ObStorageGetLogConfigStatP() {}
+protected:
+  int process();
+};
+
 } // obrpc
 
 
@@ -1164,6 +1238,24 @@ public:
       const ObStorageHASrcInfo &src_info,
       const share::ObLSID &ls_id,
       const share::SCN &gts) = 0;
+  virtual int lock_config_change( // implemented by ObStorageRpc::lock_config_change
+      const uint64_t tenant_id,
+      const ObStorageHASrcInfo &src_info,
+      const share::ObLSID &ls_id,
+      const int64_t lock_owner,
+      const int64_t lock_timeout) = 0;
+  virtual int unlock_config_change( // implemented by ObStorageRpc::unlock_config_change
+      const uint64_t tenant_id,
+      const ObStorageHASrcInfo &src_info,
+      const share::ObLSID &ls_id,
+      const int64_t
lock_owner,
+      const int64_t lock_timeout) = 0;
+  virtual int get_config_change_lock_stat( // implemented by ObStorageRpc::get_config_change_lock_stat
+      const uint64_t tenant_id,
+      const ObStorageHASrcInfo &src_info,
+      const share::ObLSID &ls_id,
+      int64_t &palf_lock_owner, // output
+      bool &is_locked) = 0; // output
 };
 
 class ObStorageRpc: public ObIStorageRpc
@@ -1286,6 +1378,24 @@ public:
       const ObStorageHASrcInfo &src_info,
       const share::ObLSID &ls_id,
       const share::SCN &gts);
+  virtual int lock_config_change( // sends a LOCK_CONFIG_CHANGE request to src_info.src_addr_
+      const uint64_t tenant_id,
+      const ObStorageHASrcInfo &src_info,
+      const share::ObLSID &ls_id,
+      const int64_t lock_owner,
+      const int64_t lock_timeout);
+  virtual int unlock_config_change( // sends an UNLOCK_CONFIG_CHANGE request to src_info.src_addr_
+      const uint64_t tenant_id,
+      const ObStorageHASrcInfo &src_info,
+      const share::ObLSID &ls_id,
+      const int64_t lock_owner,
+      const int64_t lock_timeout);
+  virtual int get_config_change_lock_stat( // sends a GET_CONFIG_CHANGE_LOCK_STAT request to src_info.src_addr_
+      const uint64_t tenant_id,
+      const ObStorageHASrcInfo &src_info,
+      const share::ObLSID &ls_id,
+      int64_t &palf_lock_owner, // output, written only on RPC success
+      bool &is_locked); // output, written only on RPC success
 private:
   bool is_inited_;