From ac97c968bfb7fe742fe75fc78032f5701025f1ed Mon Sep 17 00:00:00 2001 From: godyangfight Date: Wed, 29 Nov 2023 14:41:41 +0000 Subject: [PATCH] replace fetch_ls_repay_scn sycn rpc into async rpc --- src/observer/ob_srv_xlator_partition.cpp | 2 +- .../high_availability/ob_finish_transfer.cpp | 80 +++----------- .../high_availability/ob_finish_transfer.h | 18 +--- .../high_availability/ob_storage_ha_utils.cpp | 101 ++++++++++++++++++ .../high_availability/ob_storage_ha_utils.h | 12 +++ .../high_availability/ob_transfer_handler.cpp | 77 +++++-------- .../high_availability/ob_transfer_handler.h | 6 +- src/storage/ob_storage_async_rpc.h | 1 + src/storage/ob_storage_rpc.cpp | 77 +++++++------ src/storage/ob_storage_rpc.h | 31 +++--- 10 files changed, 211 insertions(+), 194 deletions(-) diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 79603421c..ce5425000 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -159,7 +159,7 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator) RPC_PROCESSOR(ObCheckStartTransferTabletsP); RPC_PROCESSOR(ObGetLSActiveTransCountP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObGetTransferStartScnP, gctx_.bandwidth_throttle_); - RPC_PROCESSOR(ObFetchLSReplayScnP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObFetchLSReplayScnP); RPC_PROCESSOR(ObCheckTransferTabletsBackfillP); RPC_PROCESSOR(ObStorageGetConfigVersionAndTransferScnP); RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_); diff --git a/src/storage/high_availability/ob_finish_transfer.cpp b/src/storage/high_availability/ob_finish_transfer.cpp index 41c48a85b..a7bb853a6 100644 --- a/src/storage/high_availability/ob_finish_transfer.cpp +++ b/src/storage/high_availability/ob_finish_transfer.cpp @@ -677,20 +677,23 @@ int ObTxFinishTransfer::wait_all_ls_replica_replay_scn_(const ObTransferTaskID & const int64_t quorum, ObTimeoutCtx &timeout_ctx, bool &check_passed) { int ret = OB_SUCCESS; + common::ObArray finished_addr_list; + while (OB_SUCC(ret)) { check_passed = false; if (timeout_ctx.is_timeouted()) { ret = OB_TIMEOUT; LOG_WARN("some ls replay not finished", K(ret), K(tenant_id), K(ls_id)); } else if (OB_FAIL(check_all_ls_replica_replay_scn_( - task_id, tenant_id, ls_id, member_addr_list, finish_scn, quorum, check_passed))) { + task_id, tenant_id, ls_id, member_addr_list, finish_scn, timeout_ctx, finished_addr_list))) { LOG_WARN("failed to check all ls replica replay scn", K(ret), K(tenant_id), K(member_addr_list), K(ls_id), K(quorum)); - } else if (check_passed) { + } else if (finished_addr_list.count() == member_addr_list.count()) { + check_passed = true; LOG_INFO("all ls has passed ls replica replay scn", K(tenant_id), K(ls_id)); break; } else { @@ -713,48 +716,19 @@ int ObTxFinishTransfer::wait_all_ls_replica_replay_scn_(const ObTransferTaskID & } int ObTxFinishTransfer::check_all_ls_replica_replay_scn_(const ObTransferTaskID &task_id, const uint64_t tenant_id, - const share::ObLSID &ls_id, const common::ObArray &member_addr_list, const share::SCN &finish_scn, - const int64_t quorum, bool &meet_criteria) + const share::ObLSID &ls_id, const common::ObIArray &total_addr_list, const share::SCN &finish_scn, + ObTimeoutCtx &timeout_ctx, common::ObIArray &finished_addr_list) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; int64_t cur_quorum = 0; - FOREACH_X(location, member_addr_list, OB_SUCC(ret)) - { - if (OB_ISNULL(location)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("location should not be null", K(ret)); - } else { - const common::ObAddr &server = *location; - bool passed_finish_scn = false; - if (OB_FAIL(inner_check_ls_replay_scn_(task_id, tenant_id, ls_id, server, finish_scn, passed_finish_scn))) { - LOG_WARN("failed to check criteria", K(ret), K(task_id), K(tenant_id), K(ls_id), K(server)); - } else if (!passed_finish_scn) { - LOG_INFO("server has not passed finish scn", K(task_id), K(tenant_id), K(server), K(finish_scn)); - } else { - cur_quorum++; - LOG_INFO("server has replayed passed finish scn", K(ret), K(server)); - } - } - } - if (OB_SUCC(ret)) { - meet_criteria = cur_quorum == quorum ; - } - return ret; -} - -int ObTxFinishTransfer::inner_check_ls_replay_scn_(const ObTransferTaskID &task_id, const uint64_t tenant_id, - const share::ObLSID &ls_id, const common::ObAddr &addr, const SCN &finish_scn, bool &passed_scn) -{ - int ret = OB_SUCCESS; - passed_scn = false; - const int64_t cluster_id = GCONF.cluster_id; - SCN tmp_finish_scn; - if (OB_FAIL(fetch_ls_replay_scn_(task_id, cluster_id, addr, tenant_id, ls_id, tmp_finish_scn))) { - LOG_WARN("failed to fetch finish scn for transfer", K(ret), K(task_id), K(tenant_id), K(ls_id)); - } else { - passed_scn = tmp_finish_scn >= finish_scn; - LOG_INFO("check ls replay scn", K(passed_scn), K(tmp_finish_scn), K(finish_scn)); + common::ObArray member_addr_list; + const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + if (OB_FAIL(ObTransferUtils::get_need_check_member(total_addr_list, finished_addr_list, member_addr_list))) { + LOG_WARN("failed to get need check member", K(ret), K(task_id), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(ObTransferUtils::check_ls_replay_scn( + tenant_id, ls_id, finish_scn, group_id, member_addr_list, timeout_ctx, finished_addr_list))) { + LOG_WARN("failed to check ls replay scn", K(ret), K(total_addr_list), K(finish_scn)); } return ret; } @@ -984,32 +958,6 @@ int ObTxFinishTransfer::report_result_( return ret; } -int ObTxFinishTransfer::fetch_ls_replay_scn_(const ObTransferTaskID &task_id, const int64_t cluster_id, - const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &ls_id, share::SCN &finish_scn) -{ - int ret = OB_SUCCESS; - ObLSService *ls_service = NULL; - storage::ObStorageRpc *storage_rpc = NULL; - storage::ObStorageHASrcInfo src_info; - src_info.src_addr_ = server_addr; - src_info.cluster_id_ = GCONF.cluster_id; - if (!src_info.is_valid() || !ls_id.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get invalid args", K(ret), K(src_info), K(ls_id)); - } else if (OB_ISNULL(ls_service = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("log stream service is NULL", K(ret)); - } else if (OB_ISNULL(storage_rpc = ls_service->get_storage_rpc())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("storage rpc proxy is NULL", K(ret)); - } else if (OB_FAIL(storage_rpc->fetch_ls_replay_scn(tenant_id, src_info, ls_id, finish_scn))) { - LOG_WARN("failed to fetch ls replay scn", K(ret), K(tenant_id), K(src_info), K(ls_id)); - } else { - LOG_INFO("fetch ls replay scn", K(tenant_id), K(src_info), K(ls_id)); - } - return ret; -} - int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader) { int ret = OB_SUCCESS; diff --git a/src/storage/high_availability/ob_finish_transfer.h b/src/storage/high_availability/ob_finish_transfer.h index 99dfcac35..c0c4f88b2 100644 --- a/src/storage/high_availability/ob_finish_transfer.h +++ b/src/storage/high_availability/ob_finish_transfer.h @@ -112,18 +112,11 @@ private: // @param[in]: server addr // @param[in]: dest_ls_scn // @param[in]: current scn - // @param[bool]: check passed + // @param[in]: timeout_ctx + // @param[in/out]: finished_addr_list int check_all_ls_replica_replay_scn_(const share::ObTransferTaskID &task_id, const uint64_t tenant_id, - const share::ObLSID &ls_id, const common::ObArray &member_addr_list, const share::SCN &finish_scn, - const int64_t quorum, bool &check_passed); - - // param[in]: tenant_id, - // param[in]: ls_id - // param[in]: server addr - // param[in]: finish_scn - // param[out]: is the check passed - int inner_check_ls_replay_scn_(const share::ObTransferTaskID &task_id, const uint64_t tenant_id, - const share::ObLSID &ls_id, const common::ObAddr &addr, const share::SCN &finish_scn, bool &passed_scn); + const share::ObLSID &ls_id, const common::ObIArray &total_addr_list, const share::SCN &finish_scn, + ObTimeoutCtx &timeout_ctx, common::ObIArray &finished_addr_list); private: /* helper functions */ @@ -208,9 +201,6 @@ private: int report_result_(const share::ObTransferTaskID &task_id, const int64_t result, obrpc::ObSrvRpcProxy *rs_rpc_proxy); private: - /*rpc section*/ - int fetch_ls_replay_scn_(const share::ObTransferTaskID &task_id, const int64_t cluster_id, - const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &ls_id, share::SCN &finish_scn); // check self is leader // @param[in]: ls_id diff --git a/src/storage/high_availability/ob_storage_ha_utils.cpp b/src/storage/high_availability/ob_storage_ha_utils.cpp index c2f2bfacd..16b597a1b 100644 --- a/src/storage/high_availability/ob_storage_ha_utils.cpp +++ b/src/storage/high_availability/ob_storage_ha_utils.cpp @@ -505,5 +505,106 @@ void ObTransferUtils::clear_transfer_module() #endif } +int ObTransferUtils::get_need_check_member( + const common::ObIArray &total_member_addr_list, + const common::ObIArray &finished_member_addr_list, + common::ObIArray &member_addr_list) +{ + int ret = OB_SUCCESS; + member_addr_list.reset(); + for (int64_t i = 0; OB_SUCC(ret) && i < total_member_addr_list.count(); ++i) { + const ObAddr &addr = total_member_addr_list.at(i); + bool need_add = true; + for (int64_t j = 0; OB_SUCC(ret) && j < finished_member_addr_list.count(); ++j) { + if (finished_member_addr_list.at(j) == addr) { + need_add = false; + break; + } + } + + if (OB_SUCC(ret) && need_add) { + if (OB_FAIL(member_addr_list.push_back(addr))) { + LOG_WARN("failed to push addr into array", K(ret), K(addr)); + } + } + } + + return ret; +} + +int ObTransferUtils::check_ls_replay_scn( + const uint64_t tenant_id, + const share::ObLSID &ls_id, + const share::SCN &check_scn, + const int32_t group_id, + const common::ObIArray &member_addr_list, + ObTimeoutCtx &timeout_ctx, + common::ObIArray &finished_addr_list) +{ + int ret = OB_SUCCESS; + storage::ObFetchLSReplayScnProxy batch_proxy( + *(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::fetch_ls_replay_scn); + ObFetchLSReplayScnArg arg; + const int64_t timeout = 10 * 1000 * 1000; //10s + const int64_t cluster_id = GCONF.cluster_id; + + if (OB_INVALID_ID == tenant_id || !ls_id.is_valid() || !check_scn.is_valid() || group_id < 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("check transfer in tablet abort get invalid argument", K(ret), K(tenant_id), K(ls_id), K(check_scn), K(group_id)); + } else { + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + for (int64_t i = 0; OB_SUCC(ret) && i < member_addr_list.count(); ++i) { + const ObAddr &addr = member_addr_list.at(i); + if (timeout_ctx.is_timeouted()) { + ret = OB_TIMEOUT; + LOG_WARN("check transfer in tablet abort already timeout", K(ret), K(tenant_id), K(ls_id)); + break; + } else if (OB_FAIL(batch_proxy.call( + addr, + timeout, + cluster_id, + arg.tenant_id_, + group_id, + arg))) { + LOG_WARN("failed to send fetch ls replay scn request", K(ret), K(addr), K(tenant_id), K(ls_id)); + } else { + LOG_INFO("fetch ls replay scn complete", K(arg), K(addr)); + } + } + + ObArray return_code_array; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) { + LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + if (OB_FAIL(ret)) { + } else if (return_code_array.count() != member_addr_list.count() + || return_code_array.count() != batch_proxy.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cnt not match", K(ret), + "return_cnt", return_code_array.count(), + "result_cnt", batch_proxy.get_results().count(), + "server_cnt", member_addr_list.count()); + } else { + ARRAY_FOREACH_X(batch_proxy.get_results(), idx, cnt, OB_SUCC(ret)) { + const obrpc::ObFetchLSReplayScnRes *response = batch_proxy.get_results().at(idx); + const int res_ret = return_code_array.at(idx); + if (OB_SUCCESS != res_ret) { + ret = res_ret; + LOG_WARN("rpc execute failed", KR(ret), K(idx)); + } else if (OB_ISNULL(response)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("response is null", K(ret)); + } else if (response->replay_scn_ >= check_scn && OB_FAIL(finished_addr_list.push_back(member_addr_list.at(idx)))) { + LOG_WARN("failed to push member addr into list", K(ret), K(idx), K(member_addr_list)); + } + } + } + } + return ret; +} + } // end namespace storage } // end namespace oceanbase diff --git a/src/storage/high_availability/ob_storage_ha_utils.h b/src/storage/high_availability/ob_storage_ha_utils.h index 9ed6cc584..33c3c57e4 100644 --- a/src/storage/high_availability/ob_storage_ha_utils.h +++ b/src/storage/high_availability/ob_storage_ha_utils.h @@ -66,6 +66,18 @@ struct ObTransferUtils static int get_gts(const uint64_t tenant_id, share::SCN >s); static void set_transfer_module(); static void clear_transfer_module(); + static int get_need_check_member( + const common::ObIArray &total_member_addr_list, + const common::ObIArray &finished_member_addr_list, + common::ObIArray &member_addr_list); + static int check_ls_replay_scn( + const uint64_t tenant_id, + const share::ObLSID &ls_id, + const share::SCN &check_scn, + const int32_t group_id, + const common::ObIArray &member_addr_list, + ObTimeoutCtx &timeout_ctx, + common::ObIArray &finished_addr_list); }; } // end namespace storage diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index 8b1b5695c..c1e68563a 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -1249,6 +1249,7 @@ int ObTransferHandler::wait_src_ls_replay_to_start_scn_( common::ObMemberList member_list; ObArray member_addr_list; const int64_t start_ts = ObTimeUtil::current_time(); + const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL1; if (!is_inited_) { ret = OB_NOT_INIT; @@ -1260,7 +1261,7 @@ int ObTransferHandler::wait_src_ls_replay_to_start_scn_( LOG_WARN("failed to get src ls member list", K(ret), K(task_info)); } else if (OB_FAIL(member_list.get_addr_array(member_addr_list))) { LOG_WARN("failed to get addr array", K(ret), K(task_info), K(member_list)); - } else if (OB_FAIL(wait_ls_replay_event_(task_info, member_addr_list, start_scn, timeout_ctx))) { + } else if (OB_FAIL(wait_ls_replay_event_(task_info.src_ls_id_, task_info, member_addr_list, start_scn, group_id, timeout_ctx))) { LOG_WARN("failed to wait ls replay event", K(ret), K(task_info), K(member_list), K(start_scn)); } else { LOG_INFO("[TRANSFER_BLOCK_TX] wait src ls repaly to start scn", "cost", ObTimeUtil::current_time() - start_ts); @@ -1287,6 +1288,7 @@ int ObTransferHandler::precheck_ls_replay_scn_(const share::ObTransferTaskInfo & share::SCN check_scn; ObTimeoutCtx timeout_ctx; omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); + const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL2; if (tenant_config.is_valid()) { const int64_t timeout = tenant_config->_transfer_start_trans_timeout * 0.5; if (OB_FAIL(timeout_ctx.set_timeout(timeout))) { @@ -1307,7 +1309,7 @@ int ObTransferHandler::precheck_ls_replay_scn_(const share::ObTransferTaskInfo & LOG_WARN("failed to get addr array", K(ret), K(task_info), K(member_list)); } else if (OB_FAIL(get_max_decided_scn_(task_info.tenant_id_, task_info.src_ls_id_, check_scn))) { LOG_WARN("failed to get max decided scn", K(ret), K(task_info)); - } else if (OB_FAIL(wait_ls_replay_event_(task_info, member_addr_list, check_scn, timeout_ctx))) { + } else if (OB_FAIL(wait_ls_replay_event_(task_info.src_ls_id_, task_info, member_addr_list, check_scn, group_id, timeout_ctx))) { LOG_WARN("failed to wait ls replay event", K(ret), K(task_info), K(member_list), K(check_scn)); if (OB_TIMEOUT == ret) { ret = OB_TRANSFER_CANNOT_START; @@ -1350,56 +1352,43 @@ int ObTransferHandler::get_max_decided_scn_( } int ObTransferHandler::wait_ls_replay_event_( + const share::ObLSID &ls_id, const share::ObTransferTaskInfo &task_info, - const common::ObArray &member_addr_list, + const common::ObArray &total_addr_list, const share::SCN &check_scn, + const int32_t group_id, ObTimeoutCtx &timeout_ctx) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; const int64_t OB_CHECK_START_SCN_READY_INTERVAL = 5 * 1000; //5ms const int64_t start_ts = ObTimeUtil::current_time(); - hash::ObHashSet replica_addr_set; - if (OB_FAIL(replica_addr_set.create(OB_DEFAULT_REPLICA_NUM))) { - LOG_WARN("failed to create replica addr set", K(ret), K(task_info)); - } + common::ObArray member_addr_list; + common::ObArray finished_member_addr_list; + bool is_leader = false; + while (OB_SUCC(ret)) { - int64_t replica_count = 0; if (timeout_ctx.is_timeouted()) { ret = OB_TIMEOUT; LOG_WARN("already timeout", K(ret), K(task_info)); break; - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < member_addr_list.count(); ++i) { - const ObAddr &replica_addr = member_addr_list.at(i); - const int32_t hash_ret = replica_addr_set.exist_refactored(replica_addr); - SCN replica_scn; - if (OB_HASH_EXIST == hash_ret) { - replica_count++; - } else if (OB_HASH_NOT_EXIST == hash_ret) { - ObStorageHASrcInfo src_info; - src_info.cluster_id_ = GCONF.cluster_id; - src_info.src_addr_ = replica_addr; - if (OB_TMP_FAIL(inner_get_scn_for_wait_event_(task_info, src_info, replica_scn))) { - LOG_WARN("failed to inner get scn for wait event", K(tmp_ret), K(src_info)); - } else if (replica_scn >= check_scn) { - if (OB_FAIL(replica_addr_set.set_refactored(replica_addr))) { - LOG_WARN("failed to set replica into hash set", K(ret), K(replica_addr)); - } else { - replica_count++; - } - } - } else { - ret = OB_SUCC(hash_ret) ? OB_ERR_UNEXPECTED : hash_ret; - LOG_WARN("failed to get replica server from hash set", K(ret), K(task_info)); - } - } + } else if (OB_FAIL(check_self_is_leader_(is_leader))) { + LOG_WARN("failed to check self is leader", K(ret), KPC(ls_)); + } else if (!is_leader) { + ret = OB_LS_NOT_LEADER; + LOG_WARN("ls leader has been changed", K(ret), K(task_info)); + break; + } else if (OB_FAIL(ObTransferUtils::get_need_check_member(total_addr_list, finished_member_addr_list, member_addr_list))) { + LOG_WARN("failed to get need check member", K(ret), K(task_info), K(total_addr_list)); + } else if (OB_FAIL(ObTransferUtils::check_ls_replay_scn(task_info.tenant_id_, ls_id, check_scn, + group_id, member_addr_list, timeout_ctx, finished_member_addr_list))) { + LOG_WARN("failed to check ls replay scn", K(ret), K(task_info), K(ls_id), K(check_scn)); } if (OB_SUCC(ret)) { - if (replica_count == member_addr_list.count()) { + if (finished_member_addr_list.count() == total_addr_list.count()) { FLOG_INFO("[TRANSFER] src ls all replicas replay reach check_scn", "src_ls", task_info.src_ls_id_, - K(check_scn), K(member_addr_list), "cost", ObTimeUtil::current_time() - start_ts); + K(check_scn), K(total_addr_list), "cost", ObTimeUtil::current_time() - start_ts); break; } } @@ -1412,24 +1401,6 @@ int ObTransferHandler::wait_ls_replay_event_( return ret; } -int ObTransferHandler::inner_get_scn_for_wait_event_( - const share::ObTransferTaskInfo &task_info, - const ObStorageHASrcInfo &src_info, - share::SCN &replica_scn) -{ - int ret = OB_SUCCESS; - const uint64_t tenant_id = task_info.tenant_id_; - const share::ObLSID &src_ls_id = task_info.src_ls_id_; - - if (OB_ISNULL(storage_rpc_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("storage rpc should not be null", K(ret)); - } else if (OB_FAIL(storage_rpc_->fetch_ls_replay_scn(tenant_id, src_info, src_ls_id, replica_scn))) { - LOG_WARN("failed to fetch ls replay scn", K(ret), K(tenant_id), K(src_info)); - } - return ret; -} - int ObTransferHandler::get_transfer_tablets_meta_( const share::ObTransferTaskInfo &task_info, common::ObIArray &tablet_meta_list) diff --git a/src/storage/high_availability/ob_transfer_handler.h b/src/storage/high_availability/ob_transfer_handler.h index e75e9c497..d6522f91e 100644 --- a/src/storage/high_availability/ob_transfer_handler.h +++ b/src/storage/high_availability/ob_transfer_handler.h @@ -157,14 +157,12 @@ private: ObTimeoutCtx &timeout_ctx, share::SCN &start_scn); int wait_ls_replay_event_( + const share::ObLSID &ls_id, const share::ObTransferTaskInfo &task_info, const common::ObArray &member_addr_list, const share::SCN &check_scn, + const int32_t group_id, ObTimeoutCtx &timeout_ctx); - int inner_get_scn_for_wait_event_( - const share::ObTransferTaskInfo &task_info, - const ObStorageHASrcInfo &src_info, - share::SCN &replica_scn); int precheck_ls_replay_scn_( const share::ObTransferTaskInfo &task_info); int get_max_decided_scn_( diff --git a/src/storage/ob_storage_async_rpc.h b/src/storage/ob_storage_async_rpc.h index 55b90cc09..2371e4d77 100644 --- a/src/storage/ob_storage_async_rpc.h +++ b/src/storage/ob_storage_async_rpc.h @@ -32,6 +32,7 @@ namespace storage RPC_HA(obrpc::OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, obrpc::ObCheckTransferTabletBackfillArg, obrpc::ObCheckTransferTabletBackfillRes, ObCheckTransferTabletBackfillProxy); RPC_HA(obrpc::OB_HA_CHANGE_MEMBER_SERVICE, obrpc::ObStorageChangeMemberArg, obrpc::ObStorageChangeMemberRes, ObHAChangeMemberProxy); RPC_HA(obrpc::OB_CHECK_START_TRANSFER_TABLETS, obrpc::ObTransferTabletInfoArg, obrpc::Int64, ObCheckStartTransferTabletsProxy); +RPC_HA(obrpc::OB_HA_FETCH_LS_REPLAY_SCN, obrpc::ObFetchLSReplayScnArg, obrpc::ObFetchLSReplayScnRes, ObFetchLSReplayScnProxy); }//end namespace storage }//end namespace oceanbase diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index 595061063..9fd9c4dd1 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -2696,12 +2696,31 @@ int ObGetTransferStartScnP::process() return ret; } -ObFetchLSReplayScnP::ObFetchLSReplayScnP( - common::ObInOutBandwidthThrottle *bandwidth_throttle) - : ObStorageStreamRpcP(bandwidth_throttle) +OFetchLSReplayScnDelegate::OFetchLSReplayScnDelegate(obrpc::ObFetchLSReplayScnRes &result) + : is_inited_(false), + arg_(), + result_(result) { } -int ObFetchLSReplayScnP::process() + +int OFetchLSReplayScnDelegate::init( + const obrpc::ObFetchLSReplayScnArg &arg) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("fetch ls replay scn delegate init twice", K(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(arg)); + } else { + arg_ = arg; + is_inited_ = true; + } + return ret; +} + +int OFetchLSReplayScnDelegate::process() { int ret = OB_SUCCESS; MTL_SWITCH(arg_.tenant_id_) { @@ -2710,11 +2729,7 @@ int ObFetchLSReplayScnP::process() ObLS *ls = NULL; SCN max_decided_scn; LOG_INFO("start to fetch ls replay scn", K(arg_)); - if (OB_ISNULL(bandwidth_throttle_)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, "bandwidth_throttle_ must not null", K(ret), - KP_(bandwidth_throttle)); - } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + if (OB_ISNULL(ls_service = MTL(ObLSService *))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service)); } else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { @@ -2732,6 +2747,18 @@ int ObFetchLSReplayScnP::process() return ret; } +int ObFetchLSReplayScnP::process() +{ + int ret = OB_SUCCESS; + OFetchLSReplayScnDelegate delegate(result_); + if (OB_FAIL(delegate.init(arg_))) { + LOG_WARN("failed to init delegate", K(ret)); + } else if (OB_FAIL(delegate.process())) { + LOG_WARN("failed to do process", K(ret), K_(arg)); + } + return ret; +} + ObCheckTransferTabletsBackfillDelegate::ObCheckTransferTabletsBackfillDelegate(obrpc::ObCheckTransferTabletBackfillRes &result) : is_inited_(false), arg_(), @@ -3490,38 +3517,6 @@ int ObStorageRpc::get_transfer_start_scn( return ret; } -int ObStorageRpc::fetch_ls_replay_scn( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &ls_id, - SCN &ls_replay_scn) -{ - int ret = OB_SUCCESS; - ls_replay_scn.reset(); - if (!is_inited_) { - ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); - } else { - ObFetchLSReplayScnArg arg; - ObFetchLSReplayScnRes res; - arg.tenant_id_ = tenant_id; - arg.ls_id_ = ls_id; - if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) - .by(tenant_id) - .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) - .fetch_ls_replay_scn(arg, res))) { - LOG_WARN("failed to fetch ls replay scn", K(ret), K(src_info), K(arg)); - } else { - ls_replay_scn = res.replay_scn_; - } - } - return ret; -} - int ObStorageRpc::lock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, diff --git a/src/storage/ob_storage_rpc.h b/src/storage/ob_storage_rpc.h index 3a280fe2c..d1214f29e 100644 --- a/src/storage/ob_storage_rpc.h +++ b/src/storage/ob_storage_rpc.h @@ -754,7 +754,6 @@ public: RPC_S(PR5 update_ls_meta, OB_HA_UPDATE_LS_META, (ObRestoreUpdateLSMetaArg)); RPC_S(PR5 get_ls_active_trans_count, OB_GET_LS_ACTIVE_TRANSACTION_COUNT, (ObGetLSActiveTransCountArg), ObGetLSActiveTransCountRes); RPC_S(PR5 get_transfer_start_scn, OB_GET_TRANSFER_START_SCN, (ObGetTransferStartScnArg), ObGetTransferStartScnRes); - RPC_S(PR5 fetch_ls_replay_scn, OB_HA_FETCH_LS_REPLAY_SCN, (ObFetchLSReplayScnArg), ObFetchLSReplayScnRes); RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); @@ -765,6 +764,7 @@ public: RPC_AP(PR5 check_transfer_tablet_backfill_completed, OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, (obrpc::ObCheckTransferTabletBackfillArg), obrpc::ObCheckTransferTabletBackfillRes); RPC_AP(PR5 get_config_version_and_transfer_scn, OB_HA_CHANGE_MEMBER_SERVICE, (obrpc::ObStorageChangeMemberArg), obrpc::ObStorageChangeMemberRes); RPC_AP(PR5 check_start_transfer_tablets, OB_CHECK_START_TRANSFER_TABLETS, (obrpc::ObTransferTabletInfoArg), obrpc::Int64); + RPC_AP(PR5 fetch_ls_replay_scn, OB_HA_FETCH_LS_REPLAY_SCN, (obrpc::ObFetchLSReplayScnArg), obrpc::ObFetchLSReplayScnRes); }; template @@ -952,15 +952,28 @@ protected: }; class ObFetchLSReplayScnP: - public ObStorageStreamRpcP + public ObStorageRpcProxy::Processor { public: - explicit ObFetchLSReplayScnP(common::ObInOutBandwidthThrottle *bandwidth_throttle); + ObFetchLSReplayScnP() = default; virtual ~ObFetchLSReplayScnP() {} protected: int process(); }; +class OFetchLSReplayScnDelegate final +{ +public: + OFetchLSReplayScnDelegate(obrpc::ObFetchLSReplayScnRes &result); + int init(const obrpc::ObFetchLSReplayScnArg &arg); + int process(); +private: + bool is_inited_; + obrpc::ObFetchLSReplayScnArg arg_; + obrpc::ObFetchLSReplayScnRes &result_; + DISALLOW_COPY_AND_ASSIGN(OFetchLSReplayScnDelegate); +}; + class ObCheckTransferTabletsBackfillP: public ObStorageRpcProxy::Processor { @@ -1157,12 +1170,6 @@ public: const share::ObLSID &ls_id, const common::ObIArray &tablet_list, share::SCN &transfer_start_scn) = 0; - - virtual int fetch_ls_replay_scn( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &ls_id, - share::SCN &ls_replay_scn) = 0; virtual int lock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, @@ -1251,12 +1258,6 @@ public: const share::ObLSID &ls_id, const common::ObIArray &tablet_list, share::SCN &transfer_start_scn); - - virtual int fetch_ls_replay_scn( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &ls_id, - share::SCN &ls_replay_scn); virtual int lock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info,