diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 14e5b0680..5cb57a566 100755 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -2974,6 +2974,16 @@ int PalfHandleImpl::receive_log_(const common::ObAddr &server, { int ret = OB_SUCCESS; TruncateLogInfo truncate_log_info; +#ifdef ERRSIM + if (!GCONF.palf_inject_receive_log_error_zone.get_value_string().empty()) { + if (0 == strcmp(GCONF.zone.str(), GCONF.palf_inject_receive_log_error_zone.str())) { + ret = OB_ERROR; + LOG_WARN("palf receive log errsim", K(ret)); + return ret; + } + } +#endif + if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (!server.is_valid() || INVALID_PROPOSAL_ID == msg_proposal_id || !lsn.is_valid() diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 52fa4acd2..cd85da7f2 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -160,7 +160,7 @@ ObServer::ObServer() prepare_stop_(true), stop_(true), has_stopped_(true), has_destroy_(false), net_frame_(gctx_), sql_conn_pool_(), ddl_conn_pool_(), dblink_conn_pool_(), res_inner_conn_pool_(), restore_ctx_(), srv_rpc_proxy_(), - rs_rpc_proxy_(), sql_proxy_(), + storage_rpc_proxy_(), rs_rpc_proxy_(), sql_proxy_(), dblink_proxy_(), executor_proxy_(), executor_rpc_(), dbms_job_rpc_proxy_(), dbms_sched_job_rpc_proxy_(), interrupt_proxy_(), config_(ObServerConfig::get_instance()), @@ -2239,6 +2239,8 @@ int ObServer::init_network() LOG_ERROR("init server network fail"); } else if (OB_FAIL(net_frame_.get_proxy(srv_rpc_proxy_))) { LOG_ERROR("get rpc proxy fail", KR(ret)); + } else if (OB_FAIL(net_frame_.get_proxy(storage_rpc_proxy_))) { + LOG_ERROR("get rpc proxy fail", KR(ret)); } else if (OB_FAIL(net_frame_.get_proxy(rs_rpc_proxy_))) { LOG_ERROR("get rpc proxy fail", KR(ret)); } else if (OB_FAIL(net_frame_.get_proxy(executor_proxy_))) { @@ -2488,6 +2490,7 @@ int ObServer::init_global_context() gctx_.lst_operator_ = &lst_operator_; gctx_.tablet_operator_ = &tablet_operator_; gctx_.srv_rpc_proxy_ = &srv_rpc_proxy_; + gctx_.storage_rpc_proxy_ = &storage_rpc_proxy_; gctx_.dbms_job_rpc_proxy_ = &dbms_job_rpc_proxy_; gctx_.inner_sql_rpc_proxy_ = &inner_sql_rpc_proxy_; gctx_.dbms_sched_job_rpc_proxy_ = &dbms_sched_job_rpc_proxy_; diff --git a/src/observer/ob_server.h b/src/observer/ob_server.h index d80b5be57..2ff33c700 100644 --- a/src/observer/ob_server.h +++ b/src/observer/ob_server.h @@ -338,6 +338,7 @@ private: // The two proxies by which local OceanBase server has ability to // communicate with other server. obrpc::ObSrvRpcProxy srv_rpc_proxy_; + obrpc::ObStorageRpcProxy storage_rpc_proxy_; obrpc::ObCommonRpcProxy rs_rpc_proxy_; common::ObMySQLProxy sql_proxy_; common::ObMySQLProxy ddl_sql_proxy_; diff --git a/src/observer/ob_server_struct.cpp b/src/observer/ob_server_struct.cpp index ca52ab428..d6ff9d80f 100644 --- a/src/observer/ob_server_struct.cpp +++ b/src/observer/ob_server_struct.cpp @@ -69,6 +69,7 @@ DEF_TO_STRING(ObGlobalContext) KP_(lst_operator), KP_(tablet_operator), KP_(srv_rpc_proxy), + KP_(storage_rpc_proxy), KP_(rs_rpc_proxy), KP_(load_data_proxy), KP_(executor_rpc), @@ -110,6 +111,7 @@ ObGlobalContext &ObGlobalContext::operator=(const ObGlobalContext &other) lst_operator_ = other.lst_operator_; tablet_operator_ = other.tablet_operator_; srv_rpc_proxy_ = other.srv_rpc_proxy_; + storage_rpc_proxy_ = other.storage_rpc_proxy_; rs_rpc_proxy_ = other.rs_rpc_proxy_; load_data_proxy_ = other.load_data_proxy_; inner_sql_rpc_proxy_ = other.inner_sql_rpc_proxy_; diff --git a/src/observer/ob_server_struct.h b/src/observer/ob_server_struct.h index 42084d20e..7b9c27c30 100644 --- a/src/observer/ob_server_struct.h +++ b/src/observer/ob_server_struct.h @@ -34,6 +34,7 @@ class ObMysqlRandom; namespace obrpc { class ObSrvRpcProxy; +class ObStorageRpcProxy; class ObCommonRpcProxy; class ObLoadDataRpcProxy; class ObDBMSJobRpcProxy; @@ -234,6 +235,7 @@ struct ObGlobalContext share::ObLSTableOperator *lst_operator_; share::ObTabletTableOperator *tablet_operator_; obrpc::ObSrvRpcProxy *srv_rpc_proxy_; + obrpc::ObStorageRpcProxy *storage_rpc_proxy_; obrpc::ObDBMSJobRpcProxy *dbms_job_rpc_proxy_; obrpc::ObInnerSQLRpcProxy *inner_sql_rpc_proxy_; obrpc::ObDBMSSchedJobRpcProxy *dbms_sched_job_rpc_proxy_; diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 1e69e9610..29e5a8e66 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -155,12 +155,12 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator) RPC_PROCESSOR(ObUpdateLSMetaP, gctx_.bandwidth_throttle_); //transfer - RPC_PROCESSOR(ObCheckStartTransferTabletsP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObCheckStartTransferTabletsP); RPC_PROCESSOR(ObGetLSActiveTransCountP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObGetTransferStartScnP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObFetchLSReplayScnP, gctx_.bandwidth_throttle_); - RPC_PROCESSOR(ObCheckTransferTabletsBackfillP, gctx_.bandwidth_throttle_); - RPC_PROCESSOR(ObStorageGetConfigVersionAndTransferScnP, gctx_.bandwidth_throttle_); + RPC_PROCESSOR(ObCheckTransferTabletsBackfillP); + RPC_PROCESSOR(ObStorageGetConfigVersionAndTransferScnP); RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_); diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 959fbe2fd..7544582fa 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -558,6 +558,7 @@ class ObString; ACT(BEFORE_CHECK_PRIMARY_ZONE,)\ ACT(BEFORE_RELOAD_UNIT,)\ ACT(BEFORE_PROCESS_EVENT_TASK,)\ + ACT(BEFORE_CHECK_LS_TRANSFER_SCN_FOR_STANDBY,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index a423b582f..24ed0e649 100755 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1734,7 +1734,6 @@ DEF_STR_WITH_CHECKER(sql_protocol_min_tls_version, OB_CLUSTER_PARAMETER, "none", DEF_MODE_WITH_PARSER(_obkv_feature_mode, OB_CLUSTER_PARAMETER, "", common::ObKvFeatureModeParser, "_obkv_feature_mode is a option list to control specified OBKV features on/off.", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); - DEF_BOOL(_enable_range_extraction_for_not_in, OB_TENANT_PARAMETER, "True", "Enable extract query range for not in predicate", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); @@ -1742,3 +1741,5 @@ DEF_BOOL(_enable_range_extraction_for_not_in, OB_TENANT_PARAMETER, "True", // DEF_BOOL(_enable_new_query_range_extraction, OB_TENANT_PARAMETER, "True", // "decide whether use new algorithm to extract query range.", // ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +ERRSIM_DEF_STR(palf_inject_receive_log_error_zone, OB_CLUSTER_PARAMETER, "", "specifies the zone name that palf module want to inject error when receive log", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/share/rpc/ob_async_rpc_proxy.h b/src/share/rpc/ob_async_rpc_proxy.h index 186fd7e35..838e6590b 100644 --- a/src/share/rpc/ob_async_rpc_proxy.h +++ b/src/share/rpc/ob_async_rpc_proxy.h @@ -27,11 +27,11 @@ namespace oceanbase namespace obrpc { -template -class ObAsyncCB : public ObSrvRpcProxy::AsyncCB, - public common::ObDLinkBase > +template +class ObAsyncCB : public RpcProxy::template AsyncCB, + public common::ObDLinkBase > { - using AsyncCB = typename ObSrvRpcProxy::AsyncCB; + using AsyncCB = typename RpcProxy::template AsyncCB; public: ObAsyncCB(AsyncRpcProxy &proxy) : proxy_(proxy) {} virtual ~ObAsyncCB() {} @@ -57,8 +57,8 @@ private: AsyncRpcProxy &proxy_; }; -template -rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB::clone( +template +rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB::clone( const rpc::frame::SPAlloc &alloc) const { UNUSED(alloc); @@ -66,8 +66,8 @@ rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB::clone( static_cast(this)); } -template -int ObAsyncCB::process() +template +int ObAsyncCB::process() { int ret = common::OB_SUCCESS; if (OB_FAIL(proxy_.receive_response())) { @@ -76,8 +76,8 @@ int ObAsyncCB::process() return ret; } -template -void ObAsyncCB::on_timeout() +template +void ObAsyncCB::on_timeout() { int ret = common::OB_SUCCESS; RPC_LOG(WARN, "some error in rcode and enter on_timeout", K(AsyncCB::rcode_.rcode_)); @@ -87,8 +87,8 @@ void ObAsyncCB::on_timeout() } } -template -void ObAsyncCB::on_invalid() +template +void ObAsyncCB::on_invalid() { int tmp_ret = common::OB_SUCCESS; AsyncCB::rcode_.rcode_ = common::OB_RPC_PACKET_INVALID; @@ -97,7 +97,7 @@ void ObAsyncCB::on_invalid() } } -template +template class ObAsyncRpcProxy { public: @@ -106,7 +106,7 @@ public: bool is_valid() const { return true; } TO_STRING_EMPTY(); }; - ObAsyncRpcProxy(ObSrvRpcProxy &rpc_proxy, const Func &func); + ObAsyncRpcProxy(RpcProxy &rpc_proxy, const Func &func); virtual ~ObAsyncRpcProxy(); void reuse(); @@ -142,29 +142,29 @@ public: int receive_response(); private: int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, - const uint64_t tenant_id, const RpcArg &arg, ObAsyncCB *cb); + const uint64_t tenant_id, const RpcArg &arg, ObAsyncCB *cb); int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, const uint64_t tenant_id, const uint64_t group_id, const RpcArg &arg, - ObAsyncCB *cb); + ObAsyncCB *cb); int call_rpc(const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id, - const EmptyType &empty_obj, ObAsyncCB *cb); + const EmptyType &empty_obj, ObAsyncCB *cb); int wait(common::ObIArray *return_code_array, const bool return_rpc_error); - ObSrvRpcProxy &rpc_proxy_; + RpcProxy &rpc_proxy_; common::ObArray args_; common::ObArray dests_; common::ObArray results_; Func func_; common::ObArenaAllocator allocator_; - common::ObDList > cb_list_; + common::ObDList > cb_list_; int64_t response_count_; common::ObThreadCond cond_; private: DISALLOW_COPY_AND_ASSIGN(ObAsyncRpcProxy); }; -template -ObAsyncRpcProxy::ObAsyncRpcProxy( - ObSrvRpcProxy &rpc_proxy, const Func &func) +template +ObAsyncRpcProxy::ObAsyncRpcProxy( + RpcProxy &rpc_proxy, const Func &func) : rpc_proxy_(rpc_proxy), args_(), results_(), func_(func), allocator_(common::ObModIds::OB_ASYNC_RPC_PROXY), cb_list_(), response_count_(0), cond_() @@ -175,20 +175,20 @@ ObAsyncRpcProxy::ObAsyncRpcProxy( } } -template -ObAsyncRpcProxy::~ObAsyncRpcProxy() +template +ObAsyncRpcProxy::~ObAsyncRpcProxy() { reuse(); } -template -void ObAsyncRpcProxy::reuse() +template +void ObAsyncRpcProxy::reuse() { args_.reuse(); results_.reuse(); response_count_ = 0; - ObAsyncCB *cb = cb_list_.get_first(); - ObAsyncCB *next = NULL; + ObAsyncCB *cb = cb_list_.get_first(); + ObAsyncCB *next = NULL; while (cb != cb_list_.get_header()) { next = cb->get_next(); cb->~ObAsyncCB(); @@ -198,8 +198,8 @@ void ObAsyncRpcProxy::reuse() allocator_.reuse(); } -template -int ObAsyncRpcProxy::call( +template +int ObAsyncRpcProxy::call( const common::ObAddr &server, const int64_t timeout) { @@ -221,8 +221,8 @@ int ObAsyncRpcProxy::call( return ret; } -template -int ObAsyncRpcProxy::call( +template +int ObAsyncRpcProxy::call( const common::ObAddr &server, const int64_t timeout, const RpcArg &arg) @@ -230,8 +230,8 @@ int ObAsyncRpcProxy::call( return call(server, timeout, common::OB_INVALID_CLUSTER_ID, OB_SYS_TENANT_ID, 0, arg); } -template -int ObAsyncRpcProxy::call( +template +int ObAsyncRpcProxy::call( const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id, @@ -240,8 +240,8 @@ int ObAsyncRpcProxy::call( return call(server, timeout, common::OB_INVALID_CLUSTER_ID, tenant_id, 0, arg); } -template -int ObAsyncRpcProxy::call( +template +int ObAsyncRpcProxy::call( const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, @@ -251,8 +251,8 @@ int ObAsyncRpcProxy::call( return call(server, timeout, cluster_id, tenant_id, 0, arg); } -template -int ObAsyncRpcProxy::call( +template +int ObAsyncRpcProxy::call( const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, @@ -265,12 +265,12 @@ int ObAsyncRpcProxy::call( if (!server.is_valid() || timeout <= 0 || !arg.is_valid()) { ret = common::OB_INVALID_ARGUMENT; RPC_LOG(WARN, "invalid argument", K(server), K(timeout), K(arg), KR(ret)); - } else if (NULL == (mem = allocator_.alloc(sizeof(ObAsyncCB)))) { + } else if (NULL == (mem = allocator_.alloc(sizeof(ObAsyncCB)))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; RPC_LOG(ERROR, "alloc memory failed", - "size", sizeof(ObAsyncCB), KR(ret)); + "size", sizeof(ObAsyncCB), KR(ret)); } else { - ObAsyncCB *cb = new (mem) ObAsyncCB(*this); + ObAsyncCB *cb = new (mem) ObAsyncCB(*this); if (!cb_list_.add_last(cb)) { ret = common::OB_ERR_UNEXPECTED; RPC_LOG(WARN, "cb_list add_last failed", KR(ret)); @@ -304,11 +304,11 @@ int ObAsyncRpcProxy::call( return ret; } -template -int ObAsyncRpcProxy::call_rpc( +template +int ObAsyncRpcProxy::call_rpc( const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, const uint64_t tenant_id, - const RpcArg &arg, ObAsyncCB *cb) + const RpcArg &arg, ObAsyncCB *cb) { int ret = common::OB_SUCCESS; if (!server.is_valid() || timeout <= 0 || !arg.is_valid() || NULL == cb) { @@ -329,12 +329,12 @@ int ObAsyncRpcProxy::call_rpc( return ret; } -template -int ObAsyncRpcProxy::call_rpc( +template +int ObAsyncRpcProxy::call_rpc( const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, const uint64_t tenant_id, const uint64_t group_id, const RpcArg &arg, - ObAsyncCB *cb) + ObAsyncCB *cb) { int ret = common::OB_SUCCESS; if (!server.is_valid() || timeout <= 0 || !arg.is_valid() || NULL == cb) { @@ -356,10 +356,10 @@ int ObAsyncRpcProxy::call_rpc( return ret; } -template -int ObAsyncRpcProxy::call_rpc( +template +int ObAsyncRpcProxy::call_rpc( const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id, - const EmptyType &empty_obj, ObAsyncCB *cb) + const EmptyType &empty_obj, ObAsyncCB *cb) { UNUSED(empty_obj); int ret = common::OB_SUCCESS; @@ -373,23 +373,23 @@ int ObAsyncRpcProxy::call_rpc( return ret; } -template -int ObAsyncRpcProxy::wait() +template +int ObAsyncRpcProxy::wait() { common::ObIArray *return_code_array = NULL; const bool return_rpc_error = true; return wait(return_code_array, return_rpc_error); } -template -int ObAsyncRpcProxy::wait_all(common::ObIArray &return_code_array) +template +int ObAsyncRpcProxy::wait_all(common::ObIArray &return_code_array) { const bool return_rpc_error = false; return wait(&return_code_array, return_rpc_error); } -template -int ObAsyncRpcProxy::wait( +template +int ObAsyncRpcProxy::wait( common::ObIArray *return_code_array, const bool return_rpc_error) { int ret = common::OB_SUCCESS; @@ -406,7 +406,7 @@ int ObAsyncRpcProxy::wait( // set results int index = 0; - ObAsyncCB *cb = cb_list_.get_first(); + ObAsyncCB *cb = cb_list_.get_first(); while (common::OB_SUCCESS == ret && cb != cb_list_.get_header()) { if (NULL == cb) { ret = common::OB_ERR_UNEXPECTED; @@ -447,8 +447,8 @@ int ObAsyncRpcProxy::wait( return ret; } -template -int ObAsyncRpcProxy::receive_response() +template +int ObAsyncRpcProxy::receive_response() { int ret = common::OB_SUCCESS; common::ObThreadCondGuard guard(cond_); @@ -469,7 +469,7 @@ int ObAsyncRpcProxy::receive_response() #define RPC_F(code, arg, result, name) \ typedef obrpc::ObAsyncRpcProxy *, const obrpc::ObRpcOpts &)> name + int (obrpc::ObSrvRpcProxy::*)(const arg &, obrpc::ObSrvRpcProxy::AsyncCB *, const obrpc::ObRpcOpts &), obrpc::ObSrvRpcProxy> name }//end namespace obrpc }//end namespace oceanbase diff --git a/src/storage/high_availability/ob_finish_transfer.cpp b/src/storage/high_availability/ob_finish_transfer.cpp index 42ba6625e..41c48a85b 100644 --- a/src/storage/high_availability/ob_finish_transfer.cpp +++ b/src/storage/high_availability/ob_finish_transfer.cpp @@ -511,9 +511,9 @@ int ObTxFinishTransfer::inner_check_ls_logical_table_replaced_(const uint64_t te const common::ObArray &tablet_list, const int64_t quorum, bool &all_backfilled) { int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - int64_t cur_quorum = 0; - const int64_t cluster_id = GCONF.cluster_id; + all_backfilled = true; + storage::ObCheckTransferTabletBackfillProxy batch_proxy( + *(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::check_transfer_tablet_backfill_completed); FOREACH_X(location, member_addr_list, OB_SUCC(ret)) { if (OB_ISNULL(location)) { @@ -521,20 +521,55 @@ int ObTxFinishTransfer::inner_check_ls_logical_table_replaced_(const uint64_t te LOG_WARN("location should not be null", K(ret)); } else { const common::ObAddr &server = *location; - bool backfill_completed = false; - if (OB_FAIL(post_check_logical_table_replaced_request_( - cluster_id, server, tenant_id, dest_ls_id, tablet_list, backfill_completed))) { - LOG_WARN("failed to check criteria", K(ret), K(cluster_id), K(tenant_id), K(dest_ls_id), K(server)); - } else if (!backfill_completed) { - LOG_INFO("server has not finish backfill", K(tenant_id), K(dest_ls_id), K(quorum), K(member_addr_list), K(server)); + const int64_t timeout = GCONF.rpc_timeout; + const int64_t cluster_id = GCONF.cluster_id; + const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + ObCheckTransferTabletBackfillArg arg; + arg.tenant_id_ = tenant_id; + arg.ls_id_ = dest_ls_id; + if (OB_FAIL(arg.tablet_list_.assign(tablet_list))) { + LOG_WARN("failed to assign tablet array", K(ret), K(tablet_list)); + } else if (OB_FAIL(batch_proxy.call(server, + timeout, + cluster_id, + tenant_id, + group_id, + arg))) { + LOG_WARN("failed to send check transfer tablet backfill request", K(ret), K(server), K(tenant_id)); } else { - cur_quorum++; - LOG_INFO("server has replayed passed finish scn", K(ret), K(server)); + LOG_INFO("check_transfer_tablet_backfill_completed", K(arg), K(server)); } } } - if (OB_SUCC(ret)) { - all_backfilled = cur_quorum == quorum; + ObArray return_code_array; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) { + LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + if (OB_FAIL(ret)) { + } else if (return_code_array.count() != member_addr_list.count() + || return_code_array.count() != batch_proxy.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cnt not match", K(ret), + "return_cnt", return_code_array.count(), + "result_cnt", batch_proxy.get_results().count(), + "server_cnt", member_addr_list.count()); + } else { + ARRAY_FOREACH_X(batch_proxy.get_results(), idx, cnt, OB_SUCC(ret)) { + const ObCheckTransferTabletBackfillRes *response = batch_proxy.get_results().at(idx); + const int res_ret = return_code_array.at(idx); + if (OB_SUCCESS != res_ret) { + ret = res_ret; + LOG_WARN("rpc execute failed", KR(ret), K(idx)); + } else if (OB_ISNULL(response)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("response is null", K(ret)); + } else if (!response->backfill_finished_) { + all_backfilled = false; + break; + } + } } return ret; } @@ -975,36 +1010,6 @@ int ObTxFinishTransfer::fetch_ls_replay_scn_(const ObTransferTaskID &task_id, co return ret; } -int ObTxFinishTransfer::post_check_logical_table_replaced_request_(const int64_t cluster_id, - const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &dest_ls_id, - const common::ObIArray &tablet_list, bool &replace_finished) -{ - int ret = OB_SUCCESS; - replace_finished = false; - ObLSService *ls_service = NULL; - storage::ObStorageRpc *storage_rpc = NULL; - storage::ObStorageHASrcInfo src_info; - src_info.src_addr_ = server_addr; - src_info.cluster_id_ = GCONF.cluster_id; - if (!src_info.is_valid() || !dest_ls_id.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get invalid args", K(ret), K(src_info), K(dest_ls_id)); - } else if (OB_ISNULL(ls_service = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("log stream service is NULL", K(ret)); - } else if (OB_ISNULL(storage_rpc = ls_service->get_storage_rpc())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("storage rpc proxy is NULL", K(ret)); - } else if (OB_FAIL(storage_rpc->check_tablets_logical_table_replaced( - tenant_id, src_info, dest_ls_id, tablet_list, replace_finished))) { - LOG_WARN( - "failed to post transfer backfill finished", K(ret), K(tenant_id), K(src_info), K(dest_ls_id), K(tablet_list)); - } else { - LOG_INFO("check logical table replaced completed", K(tenant_id), K(src_info), K(dest_ls_id), K(replace_finished)); - } - return ret; -} - int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader) { int ret = OB_SUCCESS; diff --git a/src/storage/high_availability/ob_finish_transfer.h b/src/storage/high_availability/ob_finish_transfer.h index 93035518d..99dfcac35 100644 --- a/src/storage/high_availability/ob_finish_transfer.h +++ b/src/storage/high_availability/ob_finish_transfer.h @@ -23,6 +23,7 @@ #include "lib/lock/ob_thread_cond.h" #include "share/transfer/ob_transfer_info.h" #include "share/ob_balance_define.h" +#include "storage/ob_storage_async_rpc.h" namespace oceanbase { namespace storage { @@ -211,16 +212,6 @@ private: int fetch_ls_replay_scn_(const share::ObTransferTaskID &task_id, const int64_t cluster_id, const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &ls_id, share::SCN &finish_scn); - // post check if logical sstable has been replaced - // @param[in]: cluster_id - // @param[in]: server_addr - // @param[in]: tenant_id - // @param[in]: ls_id - // @param[out] backfill completed - int post_check_logical_table_replaced_request_(const int64_t cluster_id, const common::ObAddr &server_addr, - const uint64_t tenant_id, const share::ObLSID &dest_ls_id, - const common::ObIArray &tablet_list, bool &backfill_completed); - // check self is leader // @param[in]: ls_id // @param[out]: is_leader diff --git a/src/storage/high_availability/ob_ls_member_list_service.cpp b/src/storage/high_availability/ob_ls_member_list_service.cpp index c4090dd8e..c35eaa224 100644 --- a/src/storage/high_availability/ob_ls_member_list_service.cpp +++ b/src/storage/high_availability/ob_ls_member_list_service.cpp @@ -17,6 +17,7 @@ #include "storage/high_availability/ob_storage_ha_src_provider.h" #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" #include "storage/tablet/ob_tablet_iterator.h" +#include "storage/tablet/ob_tablet.h" namespace oceanbase { @@ -237,45 +238,69 @@ int ObLSMemberListService::get_leader_config_version_and_transfer_scn_( share::SCN &leader_transfer_scn) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; ObLSService *ls_svr = NULL; common::ObAddr addr; const bool need_get_config_version = true; + storage::ObHAChangeMemberProxy proxy( + *(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::get_config_version_and_transfer_scn); if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr)); } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(ls_->get_tenant_id(), ls_->get_ls_id(), addr))) { STORAGE_LOG(WARN, "failed to get ls leader", K(ret), KPC(ls_)); - } else if (OB_FAIL(get_config_version_and_transfer_scn_(need_get_config_version, addr, leader_config_version, leader_transfer_scn))) { + } else if (OB_FAIL(get_config_version_and_transfer_scn_(proxy, + addr, + need_get_config_version, + ls_->get_tenant_id(), + ls_->get_ls_id()))) { STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), K(addr)); } + ObArray return_code_array; + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + STORAGE_LOG(WARN, "fail to wait all batch result", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + int64_t check_pass_count = 0; + if (OB_FAIL(ret)) { + } else if (1 != return_code_array.count()) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "cnt not match", KR(ret), + "return_cnt", return_code_array.count()); + } else if (OB_FAIL(process_result_from_async_rpc_(proxy, + addr, + return_code_array, + false/*for_standby*/, + check_pass_count, + leader_config_version, + leader_transfer_scn))) { + STORAGE_LOG(WARN, "failed to process result from async rpc", KR(ret), KR(tmp_ret)); + } return ret; } int ObLSMemberListService::get_config_version_and_transfer_scn_( - const bool need_get_config_version, + ObHAChangeMemberProxy &proxy, const common::ObAddr &addr, - palf::LogConfigVersion &config_version, - share::SCN &transfer_scn) + const bool need_get_config_version, + const uint64_t tenant_id, + const share::ObLSID &ls_id) { int ret = OB_SUCCESS; - ObLSService *ls_svr = NULL; - ObStorageRpc *storage_rpc = NULL; - ObStorageHASrcInfo src_info; - src_info.cluster_id_ = GCONF.cluster_id; - src_info.src_addr_ = addr; - if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr)); - } else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "storage rpc should not be NULL", K(ret), KP(storage_rpc)); - } else if (OB_FAIL(storage_rpc->get_config_version_and_transfer_scn(ls_->get_tenant_id(), - src_info, - ls_->get_ls_id(), - need_get_config_version, - config_version, - transfer_scn))) { - STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), KPC(ls_)); + ObStorageChangeMemberArg arg; + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + arg.need_get_config_version_ = need_get_config_version; + const int64_t cluster_id = GCONF.cluster_id; + const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout; + const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + if (OB_FAIL(proxy.call(addr, + timeout, + cluster_id, + tenant_id, + group_id, + arg))) { + STORAGE_LOG(WARN, "failed to call get config version and transfer scn", K(ret), K(addr), K(timeout), K(tenant_id), K(arg)); } return ret; } @@ -335,6 +360,10 @@ int ObLSMemberListService::check_ls_transfer_scn_validity_(palf::LogConfigVersio STORAGE_LOG(WARN, "failed to check ls transfer scn validity for primary", K(ret), KP_(ls)); } } else {//standby restore + SERVER_EVENT_SYNC_ADD("storage_ha", "before_check_ls_transfer_scn_validity_for_standby", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id()); + DEBUG_SYNC(BEFORE_CHECK_LS_TRANSFER_SCN_FOR_STANDBY); if (OB_FAIL(check_ls_transfer_scn_validity_for_standby_(leader_config_version))) { STORAGE_LOG(WARN, "failed to check ls transfer scn validity for standby", K(ret), KP_(ls)); } @@ -374,33 +403,57 @@ int ObLSMemberListService::check_ls_transfer_scn_validity_for_standby_(palf::Log STORAGE_LOG(WARN, "failed to get ls leader", K(ret), KPC(ls_)); } else { int64_t check_pass_count = 0; + storage::ObHAChangeMemberProxy batch_proxy( + *(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::get_config_version_and_transfer_scn); for (int64_t i = 0; OB_SUCC(ret) && i < addr_list.count(); ++i) { const ObAddr &addr = addr_list.at(i); bool check_pass = false; share::SCN transfer_scn; palf::LogConfigVersion config_version; bool need_get_config_version = (addr == leader_addr); - if (OB_TMP_FAIL(get_config_version_and_transfer_scn_(need_get_config_version, addr, config_version, transfer_scn))) { + if (OB_FAIL(get_config_version_and_transfer_scn_(batch_proxy, + addr, + need_get_config_version, + ls_->get_tenant_id(), + ls_->get_ls_id()))) { STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), K(addr)); - } else if (OB_FAIL(check_ls_transfer_scn_(transfer_scn, check_pass))) { - STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret), K(transfer_scn)); - } else { - if (addr == leader_addr) { - if (!config_version.is_valid()) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "config version is not valid", K(ret), K(config_version)); - } else { - leader_config_version = config_version; - } - } - check_pass_count++; } } + ObArray return_code_array; + if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) { + STORAGE_LOG(WARN, "fail to wait all batch result", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + share::SCN leader_transfer_scn; + if (OB_FAIL(ret)) { + } else if (return_code_array.count() != addr_list.count() + || return_code_array.count() != batch_proxy.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "cnt not match", KR(ret), + "return_cnt", return_code_array.count(), + "result_cnt", batch_proxy.get_results().count(), + "server_cnt", addr_list.count()); + } else if (OB_FAIL(process_result_from_async_rpc_(batch_proxy, + leader_addr, + return_code_array, + true/*for_standby*/, + check_pass_count, + leader_config_version, + leader_transfer_scn))) { + STORAGE_LOG(WARN, "failed to process result from async rpc", KR(ret), KR(tmp_ret)); + } if (OB_SUCC(ret)) { // standby check transfer scn need reach majority if (check_pass_count < (addr_list.count() / 2 + 1)) { ret = OB_LS_TRANSFER_SCN_TOO_SMALL; STORAGE_LOG(WARN, "transfer scn compare do not reach majority", K(ret), K(addr_list)); +#ifdef ERRSIM + SERVER_EVENT_ADD("storage_ha", "standby_check_transfer_scn_too_small", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id(), + "member_list_count", addr_list.count(), + "check_pass_count", check_pass_count); +#endif } else { STORAGE_LOG(INFO, "passed transfer scn check for standby", K(ret), K(addr_list), K(check_pass_count)); } @@ -409,5 +462,48 @@ int ObLSMemberListService::check_ls_transfer_scn_validity_for_standby_(palf::Log return ret; } +int ObLSMemberListService::process_result_from_async_rpc_( + ObHAChangeMemberProxy &proxy, + const common::ObAddr &leader_addr, + const common::ObIArray &return_code_array, + const bool for_standby, + int64_t &pass_count, + palf::LogConfigVersion &leader_config_version, + share::SCN &leader_transfer_scn) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ARRAY_FOREACH_X(proxy.get_results(), idx, cnt, OB_SUCC(ret)) { + const ObStorageChangeMemberRes *response = proxy.get_results().at(idx); + bool check_pass = false; + if (OB_ISNULL(response)) { + tmp_ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "hb_response is null", KR(ret), KR(tmp_ret)); + } else if (for_standby && OB_FAIL(check_ls_transfer_scn_(response->transfer_scn_, check_pass))) { + STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret)); + } else if (for_standby && !check_pass) { + continue; + } else { + const palf::LogConfigVersion &config_version = response->config_version_; + const ObAddr &addr = proxy.get_dests().at(idx); + if (addr == leader_addr) { + if (!config_version.is_valid()) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "config version is not valid", K(ret), K(config_version)); + } else { + leader_config_version = config_version; + leader_transfer_scn = response->transfer_scn_; + } + } + if (OB_SUCC(ret)) { + pass_count++; + } + } + } + return ret; +} + + + } } diff --git a/src/storage/high_availability/ob_ls_member_list_service.h b/src/storage/high_availability/ob_ls_member_list_service.h index d5c6352fe..3d8640e21 100644 --- a/src/storage/high_availability/ob_ls_member_list_service.h +++ b/src/storage/high_availability/ob_ls_member_list_service.h @@ -15,6 +15,7 @@ #include "logservice/ob_log_handler.h" #include "common/ob_member.h" +#include "storage/ob_storage_async_rpc.h" namespace oceanbase { @@ -53,15 +54,27 @@ private: palf::LogConfigVersion &leader_config_version, share::SCN &leader_transfer_scn); int get_config_version_and_transfer_scn_( - const bool need_get_config_version, + ObHAChangeMemberProxy &proxy, const common::ObAddr &addr, - palf::LogConfigVersion &config_version, - share::SCN &transfer_scn); + const bool need_get_config_version, + const uint64_t tenant_id, + const share::ObLSID &ls_id); int check_ls_transfer_scn_(const share::SCN &transfer_scn, bool &is_match); int get_ls_member_list_(common::ObIArray &addr_list); int check_ls_transfer_scn_validity_(palf::LogConfigVersion &leader_config_version); int check_ls_transfer_scn_validity_for_primary_(palf::LogConfigVersion &leader_config_version); int check_ls_transfer_scn_validity_for_standby_(palf::LogConfigVersion &leader_config_version); + +private: + int process_result_from_async_rpc_( + ObHAChangeMemberProxy &proxy, + const common::ObAddr &leader_addr, + const common::ObIArray &return_code_array, + const bool for_standby, + int64_t &pass_count, + palf::LogConfigVersion &leader_config_version, + share::SCN &leader_transfer_scn); + private: bool is_inited_; storage::ObLS *ls_; diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index be7c38941..096fca962 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -817,14 +817,46 @@ int ObTransferHandler::check_start_status_transfer_tablets_( } else if (OB_FAIL(member_list.get_addr_array(member_addr_list))) { LOG_WARN("failed to get addr array", K(ret), K(task_info), K(member_list)); } else { + storage::ObCheckStartTransferTabletsProxy batch_proxy( + *(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::check_start_transfer_tablets); for (int64_t i = 0; OB_SUCC(ret) && i < member_addr_list.count(); ++i) { const ObAddr &addr = member_addr_list.at(i); - ObStorageHASrcInfo src_info; - src_info.src_addr_ = addr; - src_info.cluster_id_ = cluster_id; - if (OB_FAIL(storage_rpc_->check_start_transfer_tablets(task_info.tenant_id_, - src_info, task_info.src_ls_id_, task_info.dest_ls_id_, task_info.tablet_list_))) { - LOG_WARN("failed to check src transfer tablets", K(ret), K(task_info), K(src_info)); + ObTransferTabletInfoArg arg; + arg.tenant_id_ = task_info.tenant_id_; + arg.src_ls_id_ = task_info.src_ls_id_; + arg.dest_ls_id_ = task_info.dest_ls_id_; + const int64_t timeout = GCONF.rpc_timeout; + const int64_t cluster_id = GCONF.cluster_id; + const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + if (OB_FAIL(arg.tablet_list_.assign(task_info.tablet_list_))) { + LOG_WARN("failed to assign tablet list", K(ret), K(task_info)); + } else if (OB_FAIL(batch_proxy.call(addr, + timeout, + cluster_id, + task_info.tenant_id_, + group_id, + arg))) { + STORAGE_LOG(WARN, "failed to call check start transfer tablets", K(ret), K(addr), K(task_info), K(arg)); + } + } + ObArray return_code_array; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) { + STORAGE_LOG(WARN, "fail to wait all batch result", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + if (OB_FAIL(ret)) { + } else if (return_code_array.count() != member_addr_list.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cnt not match", KR(ret), + "return_cnt", return_code_array.count(), + "server_cnt", member_addr_list.count()); + } + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { + const int res_ret = return_code_array.at(i); + if (OB_SUCCESS != res_ret) { + ret = res_ret; + LOG_WARN("rpc execute failed", KR(ret), K(i)); } } } diff --git a/src/storage/ob_storage_async_rpc.h b/src/storage/ob_storage_async_rpc.h new file mode 100644 index 000000000..55b90cc09 --- /dev/null +++ b/src/storage/ob_storage_async_rpc.h @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_STORAGE_STORAGE_ASYNC_RPC_H_ +#define OCEANBASE_STORAGE_STORAGE_ASYNC_RPC_H_ + +#include "share/ob_rpc_struct.h" +#include "share/ob_srv_rpc_proxy.h" +#include "share/rpc/ob_async_rpc_proxy.h" +#include "rpc/obrpc/ob_rpc_packet.h" +#include "rpc/obrpc/ob_rpc_result_code.h" +#include "rpc/obrpc/ob_rpc_proxy.h" + +namespace oceanbase +{ +namespace storage +{ + +#define RPC_HA(code, arg, result, name) \ + typedef obrpc::ObAsyncRpcProxy *, const obrpc::ObRpcOpts &), obrpc::ObStorageRpcProxy> name + +RPC_HA(obrpc::OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, obrpc::ObCheckTransferTabletBackfillArg, obrpc::ObCheckTransferTabletBackfillRes, ObCheckTransferTabletBackfillProxy); +RPC_HA(obrpc::OB_HA_CHANGE_MEMBER_SERVICE, obrpc::ObStorageChangeMemberArg, obrpc::ObStorageChangeMemberRes, ObHAChangeMemberProxy); +RPC_HA(obrpc::OB_CHECK_START_TRANSFER_TABLETS, obrpc::ObTransferTabletInfoArg, obrpc::Int64, ObCheckStartTransferTabletsProxy); + +}//end namespace storage +}//end namespace oceanbase + +#endif //OCEANBASE_STORAGE_STORAGE_ASYNC_RPC_H_ diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index 663f435ab..7cf7d0f1a 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -883,7 +883,6 @@ bool ObGetTransferStartScnRes::is_valid() const OB_SERIALIZE_MEMBER(ObGetTransferStartScnRes, start_scn_); - ObTransferTabletInfoArg::ObTransferTabletInfoArg() : tenant_id_(OB_INVALID_ID), src_ls_id_(), @@ -900,6 +899,22 @@ void ObTransferTabletInfoArg::reset() tablet_list_.reset(); } +int ObTransferTabletInfoArg::assign(const ObTransferTabletInfoArg &other) +{ + int ret = OB_SUCCESS; + if (!other.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(other)); + } else if (OB_FAIL(tablet_list_.assign(other.tablet_list_))) { + LOG_WARN("failed to assign tablet list", K(ret), K(other)); + } else { + tenant_id_ = other.tenant_id_; + src_ls_id_ = other.src_ls_id_; + dest_ls_id_ = other.dest_ls_id_; + } + return ret; +} + bool ObTransferTabletInfoArg::is_valid() const { return OB_INVALID_ID != tenant_id_ @@ -939,35 +954,59 @@ bool ObFetchLSReplayScnRes::is_valid() const return replay_scn_.is_valid(); } OB_SERIALIZE_MEMBER(ObFetchLSReplayScnRes, replay_scn_); + + ObCheckTransferTabletBackfillArg::ObCheckTransferTabletBackfillArg() : tenant_id_(OB_INVALID_ID), ls_id_(), tablet_list_() { } + bool ObCheckTransferTabletBackfillArg::is_valid() const { return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && !tablet_list_.empty(); } + void ObCheckTransferTabletBackfillArg::reset() { tenant_id_ = OB_INVALID_ID; ls_id_.reset(); tablet_list_.reset(); } + +int ObCheckTransferTabletBackfillArg::assign(const ObCheckTransferTabletBackfillArg &other) +{ + int ret = OB_SUCCESS; + if (!other.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(other)); + } else if (OB_FAIL(tablet_list_.assign(other.tablet_list_))) { + LOG_WARN("failed to assign tablet list", K(ret), K(other)); + } else { + tenant_id_ = other.tenant_id_; + ls_id_ = other.ls_id_; + } + return ret; +} + OB_SERIALIZE_MEMBER(ObCheckTransferTabletBackfillArg, tenant_id_, ls_id_, tablet_list_); + ObCheckTransferTabletBackfillRes::ObCheckTransferTabletBackfillRes() : backfill_finished_(false) { } + void ObCheckTransferTabletBackfillRes::reset() { backfill_finished_ = false; } + OB_SERIALIZE_MEMBER(ObCheckTransferTabletBackfillRes, backfill_finished_); + ObStorageChangeMemberArg::ObStorageChangeMemberArg() : tenant_id_(OB_INVALID_ID), ls_id_(), @@ -977,7 +1016,7 @@ ObStorageChangeMemberArg::ObStorageChangeMemberArg() bool ObStorageChangeMemberArg::is_valid() const { - return OB_INVALID_ID == tenant_id_ + return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid(); } @@ -987,6 +1026,20 @@ void ObStorageChangeMemberArg::reset() ls_id_.reset(); } +int ObStorageChangeMemberArg::assign(const ObStorageChangeMemberArg &other) +{ + int ret = OB_SUCCESS; + if (!other.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(other)); + } else { + tenant_id_ = other.tenant_id_; + ls_id_ = other.ls_id_; + need_get_config_version_ = other.need_get_config_version_; + } + return ret; +} + OB_SERIALIZE_MEMBER(ObStorageChangeMemberArg, tenant_id_, ls_id_, need_get_config_version_); ObStorageChangeMemberRes::ObStorageChangeMemberRes() @@ -2013,13 +2066,29 @@ int ObFetchSSTableMacroInfoP::fetch_sstable_macro_range_info_(const obrpc::ObCop return ret; } -ObCheckStartTransferTabletsP::ObCheckStartTransferTabletsP( - common::ObInOutBandwidthThrottle *bandwidth_throttle) - : ObStorageStreamRpcP(bandwidth_throttle) +ObCheckStartTransferTabletsDelegate::ObCheckStartTransferTabletsDelegate() + : is_inited_(false), + arg_() +{} + +int ObCheckStartTransferTabletsDelegate::init(const obrpc::ObTransferTabletInfoArg &arg) { + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("check start transfer tablets delegate init twice", K(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(arg)); + } else if (OB_FAIL(arg_.assign(arg))) { + LOG_WARN("failed to assign arg", K(ret), K(arg)); + } else { + is_inited_ = true; + } + return ret; } -int ObCheckStartTransferTabletsP::process() +int ObCheckStartTransferTabletsDelegate::process() { int ret = OB_SUCCESS; MTL_SWITCH(arg_.tenant_id_) { @@ -2029,7 +2098,6 @@ int ObCheckStartTransferTabletsP::process() ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL; int64_t disk_abnormal_time = 0; ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX; - #ifdef ERRSIM if (OB_SUCC(ret) && DEVICE_HEALTH_NORMAL == dhs && GCONF.fake_disk_error) { dhs = DEVICE_HEALTH_ERROR; @@ -2058,7 +2126,7 @@ int ObCheckStartTransferTabletsP::process() // In addition to the tablet in the recovery process, if the major sstable does not exist on the tablet, the transfer start will fail. // For tablets with ddl sstable, you need to wait for ddl merge to complete -int ObCheckStartTransferTabletsP::check_transfer_out_tablet_sstable_(const ObTablet *tablet) +int ObCheckStartTransferTabletsDelegate::check_transfer_out_tablet_sstable_(const ObTablet *tablet) { int ret = OB_SUCCESS; ObTableStoreIterator ddl_iter; @@ -2082,14 +2150,13 @@ int ObCheckStartTransferTabletsP::check_transfer_out_tablet_sstable_(const ObTab return ret; } -int ObCheckStartTransferTabletsP::check_start_transfer_out_tablets_() +int ObCheckStartTransferTabletsDelegate::check_start_transfer_out_tablets_() { int ret = OB_SUCCESS; ObLSHandle ls_handle; ObLSService *ls_service = nullptr; ObLS *ls = nullptr; ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX; - if (OB_ISNULL(ls_service = MTL(ObLSService *))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service)); @@ -2138,14 +2205,13 @@ int ObCheckStartTransferTabletsP::check_start_transfer_out_tablets_() return ret; } -int ObCheckStartTransferTabletsP::check_start_transfer_in_tablets_() +int ObCheckStartTransferTabletsDelegate::check_start_transfer_in_tablets_() { int ret = OB_SUCCESS; ObLSHandle ls_handle; ObLSService *ls_service = nullptr; ObLS *ls = nullptr; ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX; - if (OB_ISNULL(ls_service = MTL(ObLSService *))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service)); @@ -2668,12 +2734,32 @@ int ObFetchLSReplayScnP::process() } return ret; } -ObCheckTransferTabletsBackfillP::ObCheckTransferTabletsBackfillP( - common::ObInOutBandwidthThrottle *bandwidth_throttle) - : ObStorageStreamRpcP(bandwidth_throttle) + +ObCheckTransferTabletsBackfillDelegate::ObCheckTransferTabletsBackfillDelegate(obrpc::ObCheckTransferTabletBackfillRes &result) + : is_inited_(false), + arg_(), + result_(result) +{} + +int ObCheckTransferTabletsBackfillDelegate::init( + const obrpc::ObCheckTransferTabletBackfillArg &arg) { + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("check start transfer backfill delegate init twice", K(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(arg)); + } else if (OB_FAIL(arg_.assign(arg))) { + LOG_WARN("failed to assign arg", K(ret), K(arg)); + } else { + is_inited_ = true; + } + return ret; } -int ObCheckTransferTabletsBackfillP::process() + +int ObCheckTransferTabletsBackfillDelegate::process() { int ret = OB_SUCCESS; MTL_SWITCH(arg_.tenant_id_) { @@ -2682,11 +2768,7 @@ int ObCheckTransferTabletsBackfillP::process() ObLS *ls = NULL; ObTransferService *transfer_service = NULL; LOG_INFO("check transfer tablet", K(arg_)); - if (OB_ISNULL(bandwidth_throttle_)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, "bandwidth_throttle_ must not null", K(ret), - KP_(bandwidth_throttle)); - } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + if (OB_ISNULL(ls_service = MTL(ObLSService *))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service)); } else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { @@ -2721,7 +2803,44 @@ int ObCheckTransferTabletsBackfillP::process() } return ret; } -int ObCheckTransferTabletsBackfillP::check_has_transfer_table_( + +int ObCheckTransferTabletsBackfillP::process() +{ + int ret = OB_SUCCESS; + ObCheckTransferTabletsBackfillDelegate delegate(result_); + if (OB_FAIL(delegate.init(arg_))) { + LOG_WARN("failed to init delegate", K(ret)); + } else if (OB_FAIL(delegate.process())) { + LOG_WARN("failed to do process", K(ret), K_(arg)); + } + return ret; +} + +int ObStorageGetConfigVersionAndTransferScnP::process() +{ + int ret = OB_SUCCESS; + ObStorageGetConfigVersionAndTransferScnDelegate delegate(result_); + if (OB_FAIL(delegate.init(arg_))) { + LOG_WARN("failed to init delegate", K(ret)); + } else if (OB_FAIL(delegate.process())) { + LOG_WARN("failed to do process", K(ret), K_(arg)); + } + return ret; +} + +int ObCheckStartTransferTabletsP::process() +{ + int ret = OB_SUCCESS; + ObCheckStartTransferTabletsDelegate delegate; + if (OB_FAIL(delegate.init(arg_))) { + LOG_WARN("failed to init delegate", K(ret)); + } else if (OB_FAIL(delegate.process())) { + LOG_WARN("failed to do process", K(ret), K_(arg)); + } + return ret; +} + +int ObCheckTransferTabletsBackfillDelegate::check_has_transfer_table_( const ObTransferTabletInfo &tablet_info, storage::ObLS *ls, bool &has_transfer_table) { int ret = OB_SUCCESS; @@ -2746,13 +2865,31 @@ int ObCheckTransferTabletsBackfillP::check_has_transfer_table_( return ret; } -ObStorageGetConfigVersionAndTransferScnP::ObStorageGetConfigVersionAndTransferScnP( - common::ObInOutBandwidthThrottle *bandwidth_throttle) - : ObStorageStreamRpcP(bandwidth_throttle) +ObStorageGetConfigVersionAndTransferScnDelegate::ObStorageGetConfigVersionAndTransferScnDelegate(obrpc::ObStorageChangeMemberRes &result) + : is_inited_(false), + arg_(), + result_(result) { } -int ObStorageGetConfigVersionAndTransferScnP::process() +int ObStorageGetConfigVersionAndTransferScnDelegate::init(const obrpc::ObStorageChangeMemberArg &arg) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("check start transfer backfill delegate init twice", K(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(arg)); + } else if (OB_FAIL(arg_.assign(arg))) { + LOG_WARN("failed to assign arg", K(ret), K(arg)); + } else { + is_inited_ = true; + } + return ret; +} + +int ObStorageGetConfigVersionAndTransferScnDelegate::process() { int ret = OB_SUCCESS; const uint64_t tenant_id = arg_.tenant_id_; @@ -2779,6 +2916,7 @@ int ObStorageGetConfigVersionAndTransferScnP::process() LOG_INFO("get config version and transfer scn succ", K(tenant_id), K(ls_id), K(result_)); } } + return ret; } @@ -3273,38 +3411,6 @@ int ObStorageRpc::update_ls_meta( return ret; } -int ObStorageRpc::check_start_transfer_tablets( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &src_ls_id, - const share::ObLSID &dest_ls_id, - const common::ObIArray &tablet_array) -{ - int ret = OB_SUCCESS; - if (!is_inited_) { - ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !src_ls_id.is_valid() || !dest_ls_id.is_valid() || tablet_array.empty()) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(src_ls_id), K(dest_ls_id), K(tablet_array)); - } else { - ObTransferTabletInfoArg arg; - arg.tenant_id_ = tenant_id; - arg.src_ls_id_ = src_ls_id; - arg.dest_ls_id_ = dest_ls_id; - if (OB_FAIL(arg.tablet_list_.assign(tablet_array))) { - LOG_WARN("failed to assign tablet list", K(ret), K(tablet_array)); - } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) - .by(tenant_id) - .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) - .check_start_transfer_tablets(arg))) { - LOG_WARN("failed to check src transfer tablets", K(ret), K(src_info), K(arg)); - } - } - return ret; -} - int ObStorageRpc::get_ls_active_trans_count( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, @@ -3410,79 +3516,6 @@ int ObStorageRpc::fetch_ls_replay_scn( } return ret; } -int ObStorageRpc::check_tablets_logical_table_replaced( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &dest_ls_id, - const common::ObIArray& tablet_array, - bool &backfill_finished) -{ - int ret = OB_SUCCESS; - backfill_finished = false; - if (!is_inited_) { - ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !dest_ls_id.is_valid()) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(dest_ls_id)); - } else { - ObCheckTransferTabletBackfillArg arg; - ObCheckTransferTabletBackfillRes res; - arg.tenant_id_ = tenant_id; - arg.ls_id_ = dest_ls_id; - if (OB_FAIL(arg.tablet_list_.assign(tablet_array))) { - LOG_WARN("failed to assign tablet array", K(ret), K(tablet_array)); - } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) - .by(tenant_id) - .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) - .check_transfer_tablet_backfill_completed(arg, res))) { - LOG_WARN("failed to check tablets backfill completed", K(ret), K(src_info), K(arg)); - } else { - backfill_finished = res.backfill_finished_; - } - } - return ret; -} - -int ObStorageRpc::get_config_version_and_transfer_scn( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &ls_id, - const bool need_get_config_version, - palf::LogConfigVersion &config_version, - share::SCN &transfer_scn) -{ - int ret = OB_SUCCESS; - config_version.reset(); - transfer_scn.reset(); - if (!is_inited_) { - ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); - } else { - ObStorageChangeMemberArg arg; - ObStorageChangeMemberRes res; - arg.tenant_id_ = tenant_id; - arg.ls_id_ = ls_id; - arg.need_get_config_version_ = need_get_config_version; - const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout; - if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) - .by(tenant_id) - .timeout(timeout) - .dst_cluster_id(src_info.cluster_id_) - .get_config_version_and_transfer_scn(arg, res))) { - LOG_WARN("failed to get config version and transfer scn", K(ret), K(src_info), K(arg)); - } else { - config_version = res.config_version_; - transfer_scn = res.transfer_scn_; - FLOG_INFO("get config version and transfer scn succ", K(tenant_id), K(src_info), K(ls_id)); - } - } - return ret; -} int ObStorageRpc::lock_config_change( const uint64_t tenant_id, diff --git a/src/storage/ob_storage_rpc.h b/src/storage/ob_storage_rpc.h index 12a2c22fd..b0244f775 100644 --- a/src/storage/ob_storage_rpc.h +++ b/src/storage/ob_storage_rpc.h @@ -19,6 +19,7 @@ #include "rpc/obrpc/ob_rpc_proxy.h" #include "rpc/obrpc/ob_rpc_processor.h" #include "rpc/obrpc/ob_rpc_result_code.h" +#include "share/rpc/ob_async_rpc_proxy.h" #include "common/ob_member.h" #include "storage/ob_storage_struct.h" #include "observer/ob_server_struct.h" @@ -31,6 +32,7 @@ #include "share/transfer/ob_transfer_info.h" #include "storage/lob/ob_lob_rpc_struct.h" #include "storage/blocksstable/ob_logic_macro_id.h" +#include "share/rpc/ob_async_rpc_proxy.h" namespace oceanbase { @@ -537,6 +539,7 @@ public: ObTransferTabletInfoArg(); ~ObTransferTabletInfoArg() {} bool is_valid() const; + int assign(const ObTransferTabletInfoArg &other); void reset(); TO_STRING_KV(K_(tenant_id), K_(src_ls_id), K_(dest_ls_id), K_(tablet_list)); @@ -581,6 +584,7 @@ public: ~ObCheckTransferTabletBackfillArg() {} bool is_valid() const; void reset(); + int assign(const ObCheckTransferTabletBackfillArg &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_list)); uint64_t tenant_id_; share::ObLSID ls_id_; @@ -608,6 +612,7 @@ public: ~ObStorageChangeMemberArg() {} bool is_valid() const; void reset(); + int assign(const ObStorageChangeMemberArg &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(need_get_config_version)); uint64_t tenant_id_; share::ObLSID ls_id_; @@ -746,17 +751,19 @@ public: RPC_S(PR5 notify_restore_tablets, OB_HA_NOTIFY_RESTORE_TABLETS, (ObNotifyRestoreTabletsArg), ObNotifyRestoreTabletsResp); RPC_S(PR5 inquire_restore, OB_HA_NOTIFY_FOLLOWER_RESTORE, (ObInquireRestoreArg), ObInquireRestoreResp); RPC_S(PR5 update_ls_meta, OB_HA_UPDATE_LS_META, (ObRestoreUpdateLSMetaArg)); - RPC_S(PR5 check_start_transfer_tablets, OB_CHECK_START_TRANSFER_TABLETS, (ObTransferTabletInfoArg)); RPC_S(PR5 get_ls_active_trans_count, OB_GET_LS_ACTIVE_TRANSACTION_COUNT, (ObGetLSActiveTransCountArg), ObGetLSActiveTransCountRes); RPC_S(PR5 get_transfer_start_scn, OB_GET_TRANSFER_START_SCN, (ObGetTransferStartScnArg), ObGetTransferStartScnRes); RPC_S(PR5 fetch_ls_replay_scn, OB_HA_FETCH_LS_REPLAY_SCN, (ObFetchLSReplayScnArg), ObFetchLSReplayScnRes); - RPC_S(PR5 check_transfer_tablet_backfill_completed, OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, (ObCheckTransferTabletBackfillArg), ObCheckTransferTabletBackfillRes); - RPC_S(PR5 get_config_version_and_transfer_scn, OB_HA_CHANGE_MEMBER_SERVICE, (ObStorageChangeMemberArg), ObStorageChangeMemberRes); RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 wakeup_transfer_service, OB_HA_WAKEUP_TRANSFER_SERVICE, (ObStorageWakeupTransferServiceArg)); RPC_S(PR5 fetch_ls_member_and_learner_list, OB_HA_FETCH_LS_MEMBER_AND_LEARNER_LIST, (ObFetchLSMemberAndLearnerListArg), ObFetchLSMemberAndLearnerListInfo); + + // RPC_AP stands for asynchronous RPC. + RPC_AP(PR5 check_transfer_tablet_backfill_completed, OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, (obrpc::ObCheckTransferTabletBackfillArg), obrpc::ObCheckTransferTabletBackfillRes); + RPC_AP(PR5 get_config_version_and_transfer_scn, OB_HA_CHANGE_MEMBER_SERVICE, (obrpc::ObStorageChangeMemberArg), obrpc::ObStorageChangeMemberRes); + RPC_AP(PR5 check_start_transfer_tablets, OB_CHECK_START_TRANSFER_TABLETS, (obrpc::ObTransferTabletInfoArg), obrpc::Int64); }; template @@ -906,18 +913,23 @@ protected: int process(); }; -class ObCheckStartTransferTabletsP : public ObStorageStreamRpcP +class ObCheckStartTransferTabletsDelegate final { public: - explicit ObCheckStartTransferTabletsP(common::ObInOutBandwidthThrottle *bandwidth_throttle); - virtual ~ObCheckStartTransferTabletsP() {} -protected: + ObCheckStartTransferTabletsDelegate(); + int init(const obrpc::ObTransferTabletInfoArg &arg); int process(); + private: int check_start_transfer_out_tablets_(); int check_start_transfer_in_tablets_(); // Major sstable or ddl sstable needs to exist in src_tablet int check_transfer_out_tablet_sstable_(const ObTablet *tablet); + +private: + bool is_inited_; + obrpc::ObTransferTabletInfoArg arg_; + DISALLOW_COPY_AND_ASSIGN(ObCheckStartTransferTabletsDelegate); }; class ObGetLSActiveTransCountP : public ObStorageStreamRpcP @@ -947,27 +959,65 @@ public: protected: int process(); }; + class ObCheckTransferTabletsBackfillP: - public ObStorageStreamRpcP + public ObStorageRpcProxy::Processor { public: - explicit ObCheckTransferTabletsBackfillP(common::ObInOutBandwidthThrottle *bandwidth_throttle); + ObCheckTransferTabletsBackfillP() = default; virtual ~ObCheckTransferTabletsBackfillP() {} protected: int process(); +}; + +class ObStorageGetConfigVersionAndTransferScnP: + public ObStorageRpcProxy::Processor +{ +public: + ObStorageGetConfigVersionAndTransferScnP() = default; + virtual ~ObStorageGetConfigVersionAndTransferScnP() {} +protected: + int process(); +}; + +class ObCheckStartTransferTabletsP: + public ObStorageRpcProxy::Processor +{ +public: + ObCheckStartTransferTabletsP() = default; + virtual ~ObCheckStartTransferTabletsP() {} +protected: + int process(); +}; + +class ObCheckTransferTabletsBackfillDelegate final +{ +public: + ObCheckTransferTabletsBackfillDelegate(obrpc::ObCheckTransferTabletBackfillRes &result); + int init(const obrpc::ObCheckTransferTabletBackfillArg &arg); + int process(); private: int check_has_transfer_table_(const share::ObTransferTabletInfo &tablet_info, storage::ObLS *ls, bool &has_transfer_table); +private: + bool is_inited_; + obrpc::ObCheckTransferTabletBackfillArg arg_; + obrpc::ObCheckTransferTabletBackfillRes &result_; + DISALLOW_COPY_AND_ASSIGN(ObCheckTransferTabletsBackfillDelegate); }; -class ObStorageGetConfigVersionAndTransferScnP: - public ObStorageStreamRpcP +class ObStorageGetConfigVersionAndTransferScnDelegate final { public: - explicit ObStorageGetConfigVersionAndTransferScnP(common::ObInOutBandwidthThrottle *bandwidth_throttle); - virtual ~ObStorageGetConfigVersionAndTransferScnP() {} -protected: + ObStorageGetConfigVersionAndTransferScnDelegate(obrpc::ObStorageChangeMemberRes &result); + int init(const obrpc::ObStorageChangeMemberArg &arg); int process(); + +private: + bool is_inited_; + obrpc::ObStorageChangeMemberArg arg_; + obrpc::ObStorageChangeMemberRes &result_; + DISALLOW_COPY_AND_ASSIGN(ObStorageGetConfigVersionAndTransferScnDelegate); }; class ObLobQueryP : public ObStorageStreamRpcP @@ -982,7 +1032,6 @@ private: int process_getlength(); }; - // Stream get ls meta and all tablet meta class ObStorageFetchLSViewP: public ObStorageStreamRpcP @@ -1095,13 +1144,6 @@ public: const ObStorageHASrcInfo &dest_info, const storage::ObLSMetaPackage &ls_meta) = 0; - virtual int check_start_transfer_tablets( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &src_ls_id, - const share::ObLSID &dest_ls_id, - const common::ObIArray& tablet_array) = 0; - virtual int get_ls_active_trans_count( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, @@ -1120,19 +1162,6 @@ public: const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, share::SCN &ls_replay_scn) = 0; - virtual int check_tablets_logical_table_replaced( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &dest_ls_id, - const common::ObIArray &tablet_array, - bool &replace_finished) = 0; - virtual int get_config_version_and_transfer_scn( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &ls_id, - const bool need_get_config_version, - palf::LogConfigVersion &config_version, - share::SCN &transfer_scn) = 0; virtual int lock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, @@ -1209,13 +1238,6 @@ public: const ObStorageHASrcInfo &dest_info, const storage::ObLSMetaPackage &ls_meta); - virtual int check_start_transfer_tablets( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &src_ls_id, - const share::ObLSID &dest_ls_id, - const common::ObIArray& tablet_array); - virtual int get_ls_active_trans_count( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, @@ -1234,19 +1256,6 @@ public: const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, share::SCN &ls_replay_scn); - virtual int check_tablets_logical_table_replaced( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &dest_ls_id, - const common::ObIArray &tablet_array, - bool &replace_finished); - virtual int get_config_version_and_transfer_scn( - const uint64_t tenant_id, - const ObStorageHASrcInfo &src_info, - const share::ObLSID &ls_id, - const bool need_get_config_version, - palf::LogConfigVersion &config_version, - share::SCN &transfer_scn); virtual int lock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info,