From a43f5a770acefe5e686d5d267b085136cb6b4f5f Mon Sep 17 00:00:00 2001 From: oceanoverflow Date: Thu, 6 Jul 2023 02:12:16 +0000 Subject: [PATCH] change transfer start and abort stage execution to src ls leader --- deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h | 1 + src/observer/ob_srv_xlator_partition.cpp | 1 + src/rootserver/ob_tenant_transfer_service.cpp | 20 +-- src/rootserver/ob_tenant_transfer_service.h | 4 +- src/share/ob_debug_sync_point.h | 1 + .../transfer/ob_transfer_task_operator.cpp | 83 +++++++++--- .../transfer/ob_transfer_task_operator.h | 25 ++++ .../high_availability/ob_storage_ha_utils.cpp | 1 + .../high_availability/ob_transfer_handler.cpp | 125 ++++++++++++++++-- .../high_availability/ob_transfer_handler.h | 9 +- src/storage/ob_storage_rpc.cpp | 65 +++++++++ src/storage/ob_storage_rpc.h | 30 +++++ 12 files changed, 323 insertions(+), 42 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 2d074cf883..24b76614d7 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -473,6 +473,7 @@ PCODE_DEF(OB_HA_UNBLOCK_TX, 0x4B5) PCODE_DEF(OB_HA_LOCK_CONFIG_CHANGE, 0x4B6) PCODE_DEF(OB_HA_UNLOCK_CONFIG_CHANGE, 0x4B7) PCODE_DEF(OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, 0x4B8) +PCODE_DEF(OB_HA_WAKEUP_TRANSFER_SERVICE, 0x4B9) // sql, including executor diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index eaf3a44413..ec48effdb2 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -160,6 +160,7 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator) RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObStorageWakeupTransferServiceP, gctx_.bandwidth_throttle_); } void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) { diff --git a/src/rootserver/ob_tenant_transfer_service.cpp b/src/rootserver/ob_tenant_transfer_service.cpp index f466da4c6b..3e3d5014f1 100644 --- a/src/rootserver/ob_tenant_transfer_service.cpp +++ b/src/rootserver/ob_tenant_transfer_service.cpp @@ -310,7 +310,7 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id) DEBUG_SYNC(AFTER_TRANSFER_PROCESS_INIT_TASK_AND_BEFORE_NOTIFY_STORAGE); } if (OB_FAIL(ret) || task.get_tablet_list().empty()) { - } else if (OB_FAIL(notify_storage_transfer_service_(task_id, task.get_dest_ls()))) { + } else if (OB_FAIL(notify_storage_transfer_service_(task_id, task.get_src_ls()))) { LOG_WARN("notify storage transfer service failed", KR(ret), K(task_id), K(task)); } TTS_INFO("process init task finish", KR(ret), K(task_id), @@ -1424,7 +1424,7 @@ int ObTenantTransferService::unlock_table_lock_( int ObTenantTransferService::notify_storage_transfer_service_( const ObTransferTaskID task_id, - const ObLSID &dest_ls) + const ObLSID &src_ls) { int ret = OB_SUCCESS; obrpc::ObStartTransferTaskArg arg; @@ -1432,25 +1432,25 @@ int ObTenantTransferService::notify_storage_transfer_service_( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); - } else if (OB_UNLIKELY(! task_id.is_valid() || !dest_ls.is_valid())) { + } else if (OB_UNLIKELY(! task_id.is_valid() || !src_ls.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", KR(ret), K(task_id), K(dest_ls)); - } else if (OB_FAIL(arg.init(tenant_id_, task_id, dest_ls))) { - LOG_WARN("init ObStartTransferTaskArg failed", KR(ret), K(task_id), K(dest_ls)); + LOG_WARN("invalid arg", KR(ret), K(task_id), K(src_ls)); + } else if (OB_FAIL(arg.init(tenant_id_, task_id, src_ls))) { + LOG_WARN("init ObStartTransferTaskArg failed", KR(ret), K(task_id), K(src_ls)); } else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("GCTX has null ptr", KR(ret), K(task_id), K_(tenant_id)); } else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout( GCONF.cluster_id, tenant_id_, - dest_ls, + src_ls, leader_addr))) { // default 1s timeout LOG_WARN("get leader failed", KR(ret), K(task_id), - "cluster_id", GCONF.cluster_id.get_value(), K_(tenant_id), K(dest_ls), K(leader_addr)); + "cluster_id", GCONF.cluster_id.get_value(), K_(tenant_id), K(src_ls), K(leader_addr)); } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id_).start_transfer_task(arg))) { - LOG_WARN("send rpc failed", KR(ret), K(task_id), K(dest_ls), K(leader_addr), K(arg)); + LOG_WARN("send rpc failed", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg)); } - TTS_INFO("send rpc to storage finished", KR(ret), K(task_id), K(dest_ls), K(leader_addr), K(arg)); + TTS_INFO("send rpc to storage finished", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg)); return ret; } diff --git a/src/rootserver/ob_tenant_transfer_service.h b/src/rootserver/ob_tenant_transfer_service.h index bba79c8e9f..062deec6ac 100644 --- a/src/rootserver/ob_tenant_transfer_service.h +++ b/src/rootserver/ob_tenant_transfer_service.h @@ -161,7 +161,9 @@ private: int unlock_and_clear_task_( const share::ObTransferTaskID task_id, share::ObTransferTask &task); - int notify_storage_transfer_service_(const share::ObTransferTaskID task_id, const share::ObLSID &dest_ls); + int notify_storage_transfer_service_( + const share::ObTransferTaskID task_id, + const share::ObLSID &src_ls); int add_in_trans_lock_and_refresh_schema_( ObMySQLTransaction &trans, const share::ObLSID &src_ls, diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 61ee6d4b57..d99a288729 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -541,6 +541,7 @@ class ObString; ACT(STOP_RECOVERY_LS_THREAD0,)\ ACT(STOP_RECOVERY_LS_THREAD1,)\ ACT(STOP_TRANSFER_LS_LOGICAL_TABLE_REPLACED,)\ + ACT(BEFORE_TRANSFER_DOING,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/transfer/ob_transfer_task_operator.cpp b/src/share/transfer/ob_transfer_task_operator.cpp index d78793c087..3e2a5092ab 100644 --- a/src/share/transfer/ob_transfer_task_operator.cpp +++ b/src/share/transfer/ob_transfer_task_operator.cpp @@ -167,6 +167,23 @@ int ObTransferTaskOperator::get_all_task_status( return ret; } +int ObTransferTaskOperator::get_by_src_ls( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObLSID &src_ls, + ObTransferTask &task) +{ + int ret = OB_SUCCESS; + const bool is_src_ls = true; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !src_ls.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls)); + } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, src_ls, is_src_ls, task))) { + LOG_WARN("failed to get by ls id", K(ret), K(tenant_id), K(src_ls)); + } + return ret; +} + int ObTransferTaskOperator::get_by_dest_ls( common::ObISQLClient &sql_proxy, const uint64_t tenant_id, @@ -174,28 +191,12 @@ int ObTransferTaskOperator::get_by_dest_ls( ObTransferTask &task) { int ret = OB_SUCCESS; + const bool is_src_ls = false; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !dest_ls.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(dest_ls)); - } else { - ObSqlString sql; - SMART_VAR(ObISQLClient::ReadResult, result) { - if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE dest_ls = %ld", - OB_ALL_TRANSFER_TASK_TNAME, dest_ls.id()))) { - LOG_WARN("fail to assign sql", KR(ret), K(dest_ls)); - } else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) { - LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); - } else if (OB_ISNULL(result.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(construct_transfer_task_(*result.get_result(), task))) { - if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("construct transfer task failed", KR(ret), K(tenant_id), K(dest_ls), K(sql)); - } else { - LOG_TRACE("dest ls transfer task not found", KR(ret), K(tenant_id), K(dest_ls)); - } - } - } + } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, dest_ls, is_src_ls, task))) { + LOG_WARN("failed to get by ls id", K(ret), K(tenant_id), K(dest_ls)); } return ret; } @@ -681,6 +682,50 @@ int ObTransferTaskOperator::update_finish_scn( return ret; } +int ObTransferTaskOperator::get_by_ls_id_( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObLSID &ls_id, + const bool is_src_ls, + ObTransferTask &task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ls_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id)); + } else { + ObSqlString sql; + SMART_VAR(ObISQLClient::ReadResult, result) { + if (is_src_ls) { + if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE src_ls = %ld", + OB_ALL_TRANSFER_TASK_TNAME, ls_id.id()))) { + LOG_WARN("fail to assign sql", KR(ret), K(ls_id)); + } + } else { + if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE dest_ls = %ld", + OB_ALL_TRANSFER_TASK_TNAME, ls_id.id()))) { + LOG_WARN("fail to assign sql", KR(ret), K(ls_id)); + } + } + + if (FAILEDx(sql_proxy.read(result, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_FAIL(construct_transfer_task_(*result.get_result(), task))) { + if (OB_ENTRY_NOT_EXIST != ret) { + LOG_WARN("construct transfer task failed", KR(ret), K(tenant_id), K(ls_id), K(sql)); + } else { + LOG_WARN("dest ls transfer task not found", KR(ret), K(tenant_id), K(ls_id)); + } + } + } + } + + return ret; +} + int ObTransferTaskOperator::construct_transfer_tasks_( common::sqlclient::ObMySQLResult &res, ObIArray &tasks) diff --git a/src/share/transfer/ob_transfer_task_operator.h b/src/share/transfer/ob_transfer_task_operator.h index 46213322bc..c9bdb1ca74 100644 --- a/src/share/transfer/ob_transfer_task_operator.h +++ b/src/share/transfer/ob_transfer_task_operator.h @@ -116,6 +116,25 @@ public: const uint64_t tenant_id, common::ObIArray &task_status); + /* + * get transfer task by src ls (there is no more than 1 transfer task on a ls) + * + * @param [in] sql_proxy: sql client + * @param [in] tenant_id: target tenant_id + * @param [in] src_ls: src ls_id + * @param [out] task: transfer task + * @return + * - OB_ENTRY_NOT_EXIST: not found + * - OB_ERR_UNEXPECTED: more than 1 transfer task on a ls + * - OB_SUCCESS: successful + * - other: failed + */ + static int get_by_src_ls( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObLSID &src_ls, + ObTransferTask &task); + /* * get transfer task by dest ls (there is no more than 1 transfer task on a ls) * @@ -375,6 +394,12 @@ public: const ObTransferTaskComment &comment); private: + static int get_by_ls_id_( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObLSID &ls_id, + const bool is_src_ls, + ObTransferTask &task); static int construct_transfer_tasks_( common::sqlclient::ObMySQLResult &res, common::ObIArray &tasks); diff --git a/src/storage/high_availability/ob_storage_ha_utils.cpp b/src/storage/high_availability/ob_storage_ha_utils.cpp index 61174176d2..3dd2fffbed 100644 --- a/src/storage/high_availability/ob_storage_ha_utils.cpp +++ b/src/storage/high_availability/ob_storage_ha_utils.cpp @@ -273,6 +273,7 @@ bool ObTransferUtils::is_need_retry_error(const int err) switch (err) { //Has active trans need retry case OB_TRANSFER_MEMBER_LIST_NOT_SAME: + case OB_LS_LOCATION_LEADER_NOT_EXIST: case OB_PARTITION_NOT_LEADER: case OB_TRANS_TIMEOUT: case OB_TIMEOUT: diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index 5d4dfa197e..586c3fd377 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -107,6 +107,33 @@ void ObTransferHandler::wakeup_() } } +int ObTransferHandler::wakeup_dest_ls_leader_(const share::ObTransferTaskInfo &task_info) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = task_info.tenant_id_; + const share::ObLSID &dest_ls_id = task_info.dest_ls_id_; + ObLSService *ls_svr = NULL; + common::ObAddr leader_addr; + ObStorageHASrcInfo src_info; + ObStorageRpc *storage_rpc = NULL; + if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr)); + } else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc)); + } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(tenant_id, dest_ls_id, leader_addr))) { + LOG_WARN("failed to get ls leader", K(ret), K(tenant_id)); + } else { + src_info.src_addr_ = leader_addr; + src_info.cluster_id_ = GCONF.cluster_id; + if (OB_FAIL(storage_rpc->wakeup_transfer_service(tenant_id, src_info))) { + LOG_WARN("failed to wakeup dest ls leader", K(ret), K(task_info), K(src_info)); + } + } + return ret; +} + int ObTransferHandler::get_transfer_task_(ObTransferTaskInfo &task_info) { int ret = OB_SUCCESS; @@ -114,7 +141,7 @@ int ObTransferHandler::get_transfer_task_(ObTransferTaskInfo &task_info) if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("transfer handler do not init", K(ret)); - } else if (OB_FAIL(get_transfer_task_from_inner_table_(task_info))) { + } else if (OB_FAIL(fetch_transfer_task_from_inner_table_(task_info))) { if (OB_ENTRY_NOT_EXIST != ret) { LOG_WARN("failed to get transfer task from inner table", K(ret), KPC(ls_)); } @@ -148,24 +175,84 @@ int ObTransferHandler::get_transfer_task_from_inner_table_( return ret; } -int ObTransferHandler::get_transfer_task_from_inner_table_( +int ObTransferHandler::fetch_transfer_task_from_inner_table_( share::ObTransferTaskInfo &task_info) { int ret = OB_SUCCESS; + // currently START stage is executed on src ls leader + // and DOING stage is executed on dest ls leader + // so here try to fetch task by src ls first, then dest ls later + // either one succeeded will return the task + bool src_exist = false; + bool dst_exist = false; + share::ObTransferTaskInfo src_task_info; + share::ObTransferTaskInfo dst_task_info; + if (OB_FAIL(fetch_transfer_task_from_inner_table_by_src_ls_(src_task_info, src_exist))) { + LOG_WARN("failed to fetch transfer task from inner table by src ls", K(ret)); + } else if (OB_FAIL(fetch_transfer_task_from_inner_table_by_dest_ls_(dst_task_info, dst_exist))) { + LOG_WARN("failed to fetch transfer task from inner table by dst ls", K(ret)); + } else if (src_exist && dst_exist) { + ret = OB_SCHEDULER_TASK_CNT_MISTACH; + LOG_WARN("src task info and dst task info transfer ls overlap", K(ret), K(src_task_info), K(dst_task_info)); + } else if (src_exist && OB_FAIL(task_info.assign(src_task_info))) { + LOG_WARN("failed to assign task info", K(ret), K(src_task_info)); + } else if (dst_exist && OB_FAIL(task_info.assign(dst_task_info))) { + LOG_WARN("failed to assign task info", K(ret), K(dst_task_info)); + } + return ret; +} + +int ObTransferHandler::fetch_transfer_task_from_inner_table_by_src_ls_( + share::ObTransferTaskInfo &task_info, + bool &task_exist) +{ + int ret = OB_SUCCESS; + task_exist = false; task_info.reset(); const uint64_t tenant_id = MTL_ID(); - const bool for_update = false; + const ObLSID &src_ls_id = ls_->get_ls_id(); ObTransferTask task; - const ObLSID &dest_ls_id = ls_->get_ls_id(); - - if (OB_FAIL(ObTransferTaskOperator::get_by_dest_ls(*sql_proxy_, tenant_id, dest_ls_id, task))) { - if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(dest_ls_id)); - } + if (OB_FAIL(ObTransferTaskOperator::get_by_src_ls( + *sql_proxy_, tenant_id, src_ls_id, task))) { + LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(src_ls_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); + } else if (!task_info.status_.is_start_status() + && !task_info.status_.is_aborted_status()) { + // task not exist } else { - LOG_INFO("get transfer task from inner table", K(task_info)); + task_exist = true; + } + if (OB_ENTRY_NOT_EXIST == ret || OB_TABLE_NOT_EXIST == ret) { + task_exist = false; + ret = OB_SUCCESS; + } + return ret; +} + +int ObTransferHandler::fetch_transfer_task_from_inner_table_by_dest_ls_( + share::ObTransferTaskInfo &task_info, + bool &task_exist) +{ + int ret = OB_SUCCESS; + task_exist = false; + task_info.reset(); + const uint64_t tenant_id = MTL_ID(); + const ObLSID &dest_ls_id = ls_->get_ls_id(); + ObTransferTask task; + if (OB_FAIL(ObTransferTaskOperator::get_by_dest_ls( + *sql_proxy_, tenant_id, dest_ls_id, task))) { + LOG_WARN("failed to get transfer task by dest ls", K(ret), K(tenant_id), K(dest_ls_id)); + } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { + LOG_WARN("failed to convert from transfer task", K(ret), K(task)); + } else if (!task_info.status_.is_doing_status()) { + // task not exist + } else { + task_exist = true; + } + if (OB_ENTRY_NOT_EXIST == ret || OB_TABLE_NOT_EXIST == ret) { + task_exist = false; + ret = OB_SUCCESS; } return ret; } @@ -407,7 +494,15 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, round_, task_info))) { LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info)); } - wakeup_(); + // if START stage execution failed, just wakeup self + // if START stage execution succeeded, try to wakeup dest ls leader to go to DOING stage + if (OB_FAIL(ret)) { + wakeup_(); // wakeup self + } else { + if (OB_TMP_FAIL(wakeup_dest_ls_leader_(task_info))) { + LOG_WARN("failed to wakeup dest ls leader", K(tmp_ret), K(task_info)); + } + } LOG_INFO("[TRANSFER] finish do with start status", K(ret), K(task_info), "cost_ts", ObTimeUtil::current_time() - start_ts); return ret; } @@ -1509,6 +1604,14 @@ int ObTransferHandler::do_with_doing_status_(const share::ObTransferTaskInfo &ta const uint64_t tenant_id = task_info.tenant_id_; const share::ObLSID &src_ls_id = task_info.src_ls_id_; const share::ObLSID &dest_ls_id = task_info.dest_ls_id_; +#ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("transfer_errsim", "before_transfer_doing", + "task_id", task_id, + "tenant_id", tenant_id, + "src_ls_id", src_ls_id, + "dest_ls_id", dest_ls_id); + DEBUG_SYNC(BEFORE_TRANSFER_DOING); +#endif if (!task_info.is_valid()) { ret = OB_INVALID_ARGUMENT; diff --git a/src/storage/high_availability/ob_transfer_handler.h b/src/storage/high_availability/ob_transfer_handler.h index 2481b50759..c506a15533 100644 --- a/src/storage/high_availability/ob_transfer_handler.h +++ b/src/storage/high_availability/ob_transfer_handler.h @@ -76,9 +76,16 @@ private: const bool for_update, common::ObISQLClient &trans, share::ObTransferTaskInfo &task_info); - int get_transfer_task_from_inner_table_( + int fetch_transfer_task_from_inner_table_( share::ObTransferTaskInfo &task_info); + int fetch_transfer_task_from_inner_table_by_src_ls_( + share::ObTransferTaskInfo &task_info, + bool &task_exist); + int fetch_transfer_task_from_inner_table_by_dest_ls_( + share::ObTransferTaskInfo &task_info, + bool &task_exist); void wakeup_(); + int wakeup_dest_ls_leader_(const share::ObTransferTaskInfo &task_info); int do_leader_transfer_(); int do_worker_transfer_(); diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index f180711b7e..60a7440dc8 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -1053,6 +1053,23 @@ void ObStorageUnBlockTxArg::reset() OB_SERIALIZE_MEMBER(ObStorageUnBlockTxArg, tenant_id_, ls_id_, gts_); +ObStorageWakeupTransferServiceArg::ObStorageWakeupTransferServiceArg() + : tenant_id_(OB_INVALID_ID) +{ +} + +bool ObStorageWakeupTransferServiceArg::is_valid() const +{ + return OB_INVALID_ID != tenant_id_; +} + +void ObStorageWakeupTransferServiceArg::reset() +{ + tenant_id_ = OB_INVALID_ID; +} + +OB_SERIALIZE_MEMBER(ObStorageWakeupTransferServiceArg, tenant_id_); + ObStorageConfigChangeOpArg::ObStorageConfigChangeOpArg() : tenant_id_(OB_INVALID_ID), @@ -3077,6 +3094,29 @@ int ObStorageGetLogConfigStatP::process() return ret; } +ObStorageWakeupTransferServiceP::ObStorageWakeupTransferServiceP( + common::ObInOutBandwidthThrottle *bandwidth_throttle) + : ObStorageStreamRpcP(bandwidth_throttle) +{ +} + +int ObStorageWakeupTransferServiceP::process() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = arg_.tenant_id_; + MTL_SWITCH(tenant_id) { + ObTransferService *transfer_service = MTL(ObTransferService*); + if (OB_ISNULL(transfer_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("transfer service should not be NULL", K(ret), KP(transfer_service)); + } else { + transfer_service->wakeup(); + } + } + return ret; +} + + } //namespace obrpc namespace storage @@ -3700,5 +3740,30 @@ int ObStorageRpc::get_config_change_lock_stat( return ret; } + +int ObStorageRpc::wakeup_transfer_service( + const uint64_t tenant_id, + const ObStorageHASrcInfo &src_info) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); + } else { + ObStorageWakeupTransferServiceArg arg; + arg.tenant_id_ = tenant_id; + if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) + .by(tenant_id) + .dst_cluster_id(src_info.cluster_id_) + .wakeup_transfer_service(arg))) { + LOG_WARN("failed to wakeup transfer service", K(ret), K(src_info), K(arg)); + } + } + return ret; +} + } // storage } // oceanbase diff --git a/src/storage/ob_storage_rpc.h b/src/storage/ob_storage_rpc.h index 2e59808d1e..fdda33aa1f 100755 --- a/src/storage/ob_storage_rpc.h +++ b/src/storage/ob_storage_rpc.h @@ -715,6 +715,19 @@ private: DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpRes); }; +struct ObStorageWakeupTransferServiceArg final +{ + OB_UNIS_VERSION(1); +public: + ObStorageWakeupTransferServiceArg(); + ~ObStorageWakeupTransferServiceArg() {} + bool is_valid() const; + void reset(); + + TO_STRING_KV(K_(tenant_id)); + uint64_t tenant_id_; +}; + //src class ObStorageRpcProxy : public obrpc::ObRpcProxy { @@ -748,6 +761,7 @@ public: RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); + RPC_S(PR5 wakeup_transfer_service, OB_HA_WAKEUP_TRANSFER_SERVICE, (ObStorageWakeupTransferServiceArg)); }; template @@ -1049,6 +1063,16 @@ protected: int process(); }; +class ObStorageWakeupTransferServiceP: + public ObStorageStreamRpcP +{ +public: + explicit ObStorageWakeupTransferServiceP(common::ObInOutBandwidthThrottle *bandwidth_throttle); + virtual ~ObStorageWakeupTransferServiceP() {} +protected: + int process(); +}; + } // obrpc @@ -1177,6 +1201,9 @@ public: const share::ObLSID &ls_id, int64_t &palf_lock_owner, bool &is_locked) = 0; + virtual int wakeup_transfer_service( + const uint64_t tenant_id, + const ObStorageHASrcInfo &src_info) = 0; }; class ObStorageRpc: public ObIStorageRpc @@ -1299,6 +1326,9 @@ public: const share::ObLSID &ls_id, int64_t &palf_lock_owner, bool &is_locked); + virtual int wakeup_transfer_service( + const uint64_t tenant_id, + const ObStorageHASrcInfo &src_info); private: bool is_inited_;