[CP] change sync rpc to async for transfer and migration

This commit is contained in:
oceanoverflow 2023-11-10 11:13:01 +00:00 committed by ob-robot
parent 7aec297f5c
commit 5895ce4113
17 changed files with 586 additions and 348 deletions

View File

@ -2974,6 +2974,16 @@ int PalfHandleImpl::receive_log_(const common::ObAddr &server,
{
int ret = OB_SUCCESS;
TruncateLogInfo truncate_log_info;
#ifdef ERRSIM
if (!GCONF.palf_inject_receive_log_error_zone.get_value_string().empty()) {
if (0 == strcmp(GCONF.zone.str(), GCONF.palf_inject_receive_log_error_zone.str())) {
ret = OB_ERROR;
LOG_WARN("palf receive log errsim", K(ret));
return ret;
}
}
#endif
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!server.is_valid() || INVALID_PROPOSAL_ID == msg_proposal_id || !lsn.is_valid()

View File

@ -160,7 +160,7 @@ ObServer::ObServer()
prepare_stop_(true), stop_(true), has_stopped_(true), has_destroy_(false),
net_frame_(gctx_), sql_conn_pool_(), ddl_conn_pool_(), dblink_conn_pool_(),
res_inner_conn_pool_(), restore_ctx_(), srv_rpc_proxy_(),
rs_rpc_proxy_(), sql_proxy_(),
storage_rpc_proxy_(), rs_rpc_proxy_(), sql_proxy_(),
dblink_proxy_(),
executor_proxy_(), executor_rpc_(), dbms_job_rpc_proxy_(), dbms_sched_job_rpc_proxy_(), interrupt_proxy_(),
config_(ObServerConfig::get_instance()),
@ -2239,6 +2239,8 @@ int ObServer::init_network()
LOG_ERROR("init server network fail");
} else if (OB_FAIL(net_frame_.get_proxy(srv_rpc_proxy_))) {
LOG_ERROR("get rpc proxy fail", KR(ret));
} else if (OB_FAIL(net_frame_.get_proxy(storage_rpc_proxy_))) {
LOG_ERROR("get rpc proxy fail", KR(ret));
} else if (OB_FAIL(net_frame_.get_proxy(rs_rpc_proxy_))) {
LOG_ERROR("get rpc proxy fail", KR(ret));
} else if (OB_FAIL(net_frame_.get_proxy(executor_proxy_))) {
@ -2488,6 +2490,7 @@ int ObServer::init_global_context()
gctx_.lst_operator_ = &lst_operator_;
gctx_.tablet_operator_ = &tablet_operator_;
gctx_.srv_rpc_proxy_ = &srv_rpc_proxy_;
gctx_.storage_rpc_proxy_ = &storage_rpc_proxy_;
gctx_.dbms_job_rpc_proxy_ = &dbms_job_rpc_proxy_;
gctx_.inner_sql_rpc_proxy_ = &inner_sql_rpc_proxy_;
gctx_.dbms_sched_job_rpc_proxy_ = &dbms_sched_job_rpc_proxy_;

View File

@ -338,6 +338,7 @@ private:
// The two proxies by which local OceanBase server has ability to
// communicate with other server.
obrpc::ObSrvRpcProxy srv_rpc_proxy_;
obrpc::ObStorageRpcProxy storage_rpc_proxy_;
obrpc::ObCommonRpcProxy rs_rpc_proxy_;
common::ObMySQLProxy sql_proxy_;
common::ObMySQLProxy ddl_sql_proxy_;

View File

@ -69,6 +69,7 @@ DEF_TO_STRING(ObGlobalContext)
KP_(lst_operator),
KP_(tablet_operator),
KP_(srv_rpc_proxy),
KP_(storage_rpc_proxy),
KP_(rs_rpc_proxy),
KP_(load_data_proxy),
KP_(executor_rpc),
@ -110,6 +111,7 @@ ObGlobalContext &ObGlobalContext::operator=(const ObGlobalContext &other)
lst_operator_ = other.lst_operator_;
tablet_operator_ = other.tablet_operator_;
srv_rpc_proxy_ = other.srv_rpc_proxy_;
storage_rpc_proxy_ = other.storage_rpc_proxy_;
rs_rpc_proxy_ = other.rs_rpc_proxy_;
load_data_proxy_ = other.load_data_proxy_;
inner_sql_rpc_proxy_ = other.inner_sql_rpc_proxy_;

View File

@ -34,6 +34,7 @@ class ObMysqlRandom;
namespace obrpc
{
class ObSrvRpcProxy;
class ObStorageRpcProxy;
class ObCommonRpcProxy;
class ObLoadDataRpcProxy;
class ObDBMSJobRpcProxy;
@ -234,6 +235,7 @@ struct ObGlobalContext
share::ObLSTableOperator *lst_operator_;
share::ObTabletTableOperator *tablet_operator_;
obrpc::ObSrvRpcProxy *srv_rpc_proxy_;
obrpc::ObStorageRpcProxy *storage_rpc_proxy_;
obrpc::ObDBMSJobRpcProxy *dbms_job_rpc_proxy_;
obrpc::ObInnerSQLRpcProxy *inner_sql_rpc_proxy_;
obrpc::ObDBMSSchedJobRpcProxy *dbms_sched_job_rpc_proxy_;

View File

@ -155,12 +155,12 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator)
RPC_PROCESSOR(ObUpdateLSMetaP, gctx_.bandwidth_throttle_);
//transfer
RPC_PROCESSOR(ObCheckStartTransferTabletsP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObCheckStartTransferTabletsP);
RPC_PROCESSOR(ObGetLSActiveTransCountP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObGetTransferStartScnP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObFetchLSReplayScnP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObCheckTransferTabletsBackfillP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageGetConfigVersionAndTransferScnP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObCheckTransferTabletsBackfillP);
RPC_PROCESSOR(ObStorageGetConfigVersionAndTransferScnP);
RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_);

View File

@ -558,6 +558,7 @@ class ObString;
ACT(BEFORE_CHECK_PRIMARY_ZONE,)\
ACT(BEFORE_RELOAD_UNIT,)\
ACT(BEFORE_PROCESS_EVENT_TASK,)\
ACT(BEFORE_CHECK_LS_TRANSFER_SCN_FOR_STANDBY,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

View File

@ -1734,7 +1734,6 @@ DEF_STR_WITH_CHECKER(sql_protocol_min_tls_version, OB_CLUSTER_PARAMETER, "none",
DEF_MODE_WITH_PARSER(_obkv_feature_mode, OB_CLUSTER_PARAMETER, "", common::ObKvFeatureModeParser,
"_obkv_feature_mode is a option list to control specified OBKV features on/off.",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_range_extraction_for_not_in, OB_TENANT_PARAMETER, "True",
"Enable extract query range for not in predicate",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
@ -1742,3 +1741,5 @@ DEF_BOOL(_enable_range_extraction_for_not_in, OB_TENANT_PARAMETER, "True",
// DEF_BOOL(_enable_new_query_range_extraction, OB_TENANT_PARAMETER, "True",
// "decide whether use new algorithm to extract query range.",
// ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
ERRSIM_DEF_STR(palf_inject_receive_log_error_zone, OB_CLUSTER_PARAMETER, "", "specifies the zone name that palf module want to inject error when receive log",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -27,11 +27,11 @@ namespace oceanbase
namespace obrpc
{
template<ObRpcPacketCode PC, typename AsyncRpcProxy>
class ObAsyncCB : public ObSrvRpcProxy::AsyncCB<PC>,
public common::ObDLinkBase<ObAsyncCB<PC, AsyncRpcProxy> >
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
class ObAsyncCB : public RpcProxy::template AsyncCB<PC>,
public common::ObDLinkBase<ObAsyncCB<PC, AsyncRpcProxy, RpcProxy> >
{
using AsyncCB = typename ObSrvRpcProxy::AsyncCB<PC>;
using AsyncCB = typename RpcProxy::template AsyncCB<PC>;
public:
ObAsyncCB(AsyncRpcProxy &proxy) : proxy_(proxy) {}
virtual ~ObAsyncCB() {}
@ -57,8 +57,8 @@ private:
AsyncRpcProxy &proxy_;
};
template<ObRpcPacketCode PC, typename AsyncRpcProxy>
rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB<PC, AsyncRpcProxy>::clone(
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::clone(
const rpc::frame::SPAlloc &alloc) const
{
UNUSED(alloc);
@ -66,8 +66,8 @@ rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB<PC, AsyncRpcProxy>::clone(
static_cast<const rpc::frame::ObReqTransport::AsyncCB * const>(this));
}
template<ObRpcPacketCode PC, typename AsyncRpcProxy>
int ObAsyncCB<PC, AsyncRpcProxy>::process()
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
int ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::process()
{
int ret = common::OB_SUCCESS;
if (OB_FAIL(proxy_.receive_response())) {
@ -76,8 +76,8 @@ int ObAsyncCB<PC, AsyncRpcProxy>::process()
return ret;
}
template<ObRpcPacketCode PC, typename AsyncRpcProxy>
void ObAsyncCB<PC, AsyncRpcProxy>::on_timeout()
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
void ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::on_timeout()
{
int ret = common::OB_SUCCESS;
RPC_LOG(WARN, "some error in rcode and enter on_timeout", K(AsyncCB::rcode_.rcode_));
@ -87,8 +87,8 @@ void ObAsyncCB<PC, AsyncRpcProxy>::on_timeout()
}
}
template<ObRpcPacketCode PC, typename AsyncRpcProxy>
void ObAsyncCB<PC, AsyncRpcProxy>::on_invalid()
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
void ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::on_invalid()
{
int tmp_ret = common::OB_SUCCESS;
AsyncCB::rcode_.rcode_ = common::OB_RPC_PACKET_INVALID;
@ -97,7 +97,7 @@ void ObAsyncCB<PC, AsyncRpcProxy>::on_invalid()
}
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
class ObAsyncRpcProxy
{
public:
@ -106,7 +106,7 @@ public:
bool is_valid() const { return true; }
TO_STRING_EMPTY();
};
ObAsyncRpcProxy(ObSrvRpcProxy &rpc_proxy, const Func &func);
ObAsyncRpcProxy(RpcProxy &rpc_proxy, const Func &func);
virtual ~ObAsyncRpcProxy();
void reuse();
@ -142,29 +142,29 @@ public:
int receive_response();
private:
int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id,
const uint64_t tenant_id, const RpcArg &arg, ObAsyncCB<PC, ObAsyncRpcProxy> *cb);
const uint64_t tenant_id, const RpcArg &arg, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb);
int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id,
const uint64_t tenant_id, const uint64_t group_id, const RpcArg &arg,
ObAsyncCB<PC, ObAsyncRpcProxy> *cb);
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb);
int call_rpc(const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id,
const EmptyType &empty_obj, ObAsyncCB<PC, ObAsyncRpcProxy> *cb);
const EmptyType &empty_obj, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb);
int wait(common::ObIArray<int> *return_code_array, const bool return_rpc_error);
ObSrvRpcProxy &rpc_proxy_;
RpcProxy &rpc_proxy_;
common::ObArray<RpcArg> args_;
common::ObArray<common::ObAddr> dests_;
common::ObArray<const RpcResult *> results_;
Func func_;
common::ObArenaAllocator allocator_;
common::ObDList<ObAsyncCB<PC, ObAsyncRpcProxy> > cb_list_;
common::ObDList<ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> > cb_list_;
int64_t response_count_;
common::ObThreadCond cond_;
private:
DISALLOW_COPY_AND_ASSIGN(ObAsyncRpcProxy);
};
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::ObAsyncRpcProxy(
ObSrvRpcProxy &rpc_proxy, const Func &func)
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::ObAsyncRpcProxy(
RpcProxy &rpc_proxy, const Func &func)
: rpc_proxy_(rpc_proxy), args_(), results_(),
func_(func), allocator_(common::ObModIds::OB_ASYNC_RPC_PROXY),
cb_list_(), response_count_(0), cond_()
@ -175,20 +175,20 @@ ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::ObAsyncRpcProxy(
}
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::~ObAsyncRpcProxy()
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::~ObAsyncRpcProxy()
{
reuse();
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
void ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::reuse()
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
void ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::reuse()
{
args_.reuse();
results_.reuse();
response_count_ = 0;
ObAsyncCB<PC, ObAsyncRpcProxy> *cb = cb_list_.get_first();
ObAsyncCB<PC, ObAsyncRpcProxy> *next = NULL;
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = cb_list_.get_first();
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *next = NULL;
while (cb != cb_list_.get_header()) {
next = cb->get_next();
cb->~ObAsyncCB();
@ -198,8 +198,8 @@ void ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::reuse()
allocator_.reuse();
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
const common::ObAddr &server,
const int64_t timeout)
{
@ -221,8 +221,8 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
return ret;
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
const common::ObAddr &server,
const int64_t timeout,
const RpcArg &arg)
@ -230,8 +230,8 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
return call(server, timeout, common::OB_INVALID_CLUSTER_ID, OB_SYS_TENANT_ID, 0, arg);
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
const common::ObAddr &server,
const int64_t timeout,
const uint64_t tenant_id,
@ -240,8 +240,8 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
return call(server, timeout, common::OB_INVALID_CLUSTER_ID, tenant_id, 0, arg);
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
const common::ObAddr &server,
const int64_t timeout,
const int64_t cluster_id,
@ -251,8 +251,8 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
return call(server, timeout, cluster_id, tenant_id, 0, arg);
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
const common::ObAddr &server,
const int64_t timeout,
const int64_t cluster_id,
@ -265,12 +265,12 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
if (!server.is_valid() || timeout <= 0 || !arg.is_valid()) {
ret = common::OB_INVALID_ARGUMENT;
RPC_LOG(WARN, "invalid argument", K(server), K(timeout), K(arg), KR(ret));
} else if (NULL == (mem = allocator_.alloc(sizeof(ObAsyncCB<PC, ObAsyncRpcProxy>)))) {
} else if (NULL == (mem = allocator_.alloc(sizeof(ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy>)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(ERROR, "alloc memory failed",
"size", sizeof(ObAsyncCB<PC, ObAsyncRpcProxy>), KR(ret));
"size", sizeof(ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy>), KR(ret));
} else {
ObAsyncCB<PC, ObAsyncRpcProxy> *cb = new (mem) ObAsyncCB<PC, ObAsyncRpcProxy>(*this);
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = new (mem) ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy>(*this);
if (!cb_list_.add_last(cb)) {
ret = common::OB_ERR_UNEXPECTED;
RPC_LOG(WARN, "cb_list add_last failed", KR(ret));
@ -304,11 +304,11 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call(
return ret;
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call_rpc(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call_rpc(
const common::ObAddr &server, const int64_t timeout,
const int64_t cluster_id, const uint64_t tenant_id,
const RpcArg &arg, ObAsyncCB<PC, ObAsyncRpcProxy> *cb)
const RpcArg &arg, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb)
{
int ret = common::OB_SUCCESS;
if (!server.is_valid() || timeout <= 0 || !arg.is_valid() || NULL == cb) {
@ -329,12 +329,12 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call_rpc(
return ret;
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call_rpc(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call_rpc(
const common::ObAddr &server, const int64_t timeout,
const int64_t cluster_id, const uint64_t tenant_id,
const uint64_t group_id, const RpcArg &arg,
ObAsyncCB<PC, ObAsyncRpcProxy> *cb)
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb)
{
int ret = common::OB_SUCCESS;
if (!server.is_valid() || timeout <= 0 || !arg.is_valid() || NULL == cb) {
@ -356,10 +356,10 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call_rpc(
return ret;
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call_rpc(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call_rpc(
const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id,
const EmptyType &empty_obj, ObAsyncCB<PC, ObAsyncRpcProxy> *cb)
const EmptyType &empty_obj, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb)
{
UNUSED(empty_obj);
int ret = common::OB_SUCCESS;
@ -373,23 +373,23 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::call_rpc(
return ret;
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::wait()
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait()
{
common::ObIArray<int> *return_code_array = NULL;
const bool return_rpc_error = true;
return wait(return_code_array, return_rpc_error);
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::wait_all(common::ObIArray<int> &return_code_array)
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait_all(common::ObIArray<int> &return_code_array)
{
const bool return_rpc_error = false;
return wait(&return_code_array, return_rpc_error);
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::wait(
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait(
common::ObIArray<int> *return_code_array, const bool return_rpc_error)
{
int ret = common::OB_SUCCESS;
@ -406,7 +406,7 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::wait(
// set results
int index = 0;
ObAsyncCB<PC, ObAsyncRpcProxy> *cb = cb_list_.get_first();
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = cb_list_.get_first();
while (common::OB_SUCCESS == ret && cb != cb_list_.get_header()) {
if (NULL == cb) {
ret = common::OB_ERR_UNEXPECTED;
@ -447,8 +447,8 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::wait(
return ret;
}
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::receive_response()
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::receive_response()
{
int ret = common::OB_SUCCESS;
common::ObThreadCondGuard guard(cond_);
@ -469,7 +469,7 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func>::receive_response()
#define RPC_F(code, arg, result, name) \
typedef obrpc::ObAsyncRpcProxy<code, arg, result, \
int (obrpc::ObSrvRpcProxy::*)(const arg &, obrpc::ObSrvRpcProxy::AsyncCB<code> *, const obrpc::ObRpcOpts &)> name
int (obrpc::ObSrvRpcProxy::*)(const arg &, obrpc::ObSrvRpcProxy::AsyncCB<code> *, const obrpc::ObRpcOpts &), obrpc::ObSrvRpcProxy> name
}//end namespace obrpc
}//end namespace oceanbase

View File

@ -511,9 +511,9 @@ int ObTxFinishTransfer::inner_check_ls_logical_table_replaced_(const uint64_t te
const common::ObArray<ObTransferTabletInfo> &tablet_list, const int64_t quorum, bool &all_backfilled)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t cur_quorum = 0;
const int64_t cluster_id = GCONF.cluster_id;
all_backfilled = true;
storage::ObCheckTransferTabletBackfillProxy batch_proxy(
*(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::check_transfer_tablet_backfill_completed);
FOREACH_X(location, member_addr_list, OB_SUCC(ret))
{
if (OB_ISNULL(location)) {
@ -521,20 +521,55 @@ int ObTxFinishTransfer::inner_check_ls_logical_table_replaced_(const uint64_t te
LOG_WARN("location should not be null", K(ret));
} else {
const common::ObAddr &server = *location;
bool backfill_completed = false;
if (OB_FAIL(post_check_logical_table_replaced_request_(
cluster_id, server, tenant_id, dest_ls_id, tablet_list, backfill_completed))) {
LOG_WARN("failed to check criteria", K(ret), K(cluster_id), K(tenant_id), K(dest_ls_id), K(server));
} else if (!backfill_completed) {
LOG_INFO("server has not finish backfill", K(tenant_id), K(dest_ls_id), K(quorum), K(member_addr_list), K(server));
const int64_t timeout = GCONF.rpc_timeout;
const int64_t cluster_id = GCONF.cluster_id;
const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2;
ObCheckTransferTabletBackfillArg arg;
arg.tenant_id_ = tenant_id;
arg.ls_id_ = dest_ls_id;
if (OB_FAIL(arg.tablet_list_.assign(tablet_list))) {
LOG_WARN("failed to assign tablet array", K(ret), K(tablet_list));
} else if (OB_FAIL(batch_proxy.call(server,
timeout,
cluster_id,
tenant_id,
group_id,
arg))) {
LOG_WARN("failed to send check transfer tablet backfill request", K(ret), K(server), K(tenant_id));
} else {
cur_quorum++;
LOG_INFO("server has replayed passed finish scn", K(ret), K(server));
LOG_INFO("check_transfer_tablet_backfill_completed", K(arg), K(server));
}
}
}
if (OB_SUCC(ret)) {
all_backfilled = cur_quorum == quorum;
ObArray<int> return_code_array;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) {
LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
if (OB_FAIL(ret)) {
} else if (return_code_array.count() != member_addr_list.count()
|| return_code_array.count() != batch_proxy.get_results().count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cnt not match", K(ret),
"return_cnt", return_code_array.count(),
"result_cnt", batch_proxy.get_results().count(),
"server_cnt", member_addr_list.count());
} else {
ARRAY_FOREACH_X(batch_proxy.get_results(), idx, cnt, OB_SUCC(ret)) {
const ObCheckTransferTabletBackfillRes *response = batch_proxy.get_results().at(idx);
const int res_ret = return_code_array.at(idx);
if (OB_SUCCESS != res_ret) {
ret = res_ret;
LOG_WARN("rpc execute failed", KR(ret), K(idx));
} else if (OB_ISNULL(response)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("response is null", K(ret));
} else if (!response->backfill_finished_) {
all_backfilled = false;
break;
}
}
}
return ret;
}
@ -975,36 +1010,6 @@ int ObTxFinishTransfer::fetch_ls_replay_scn_(const ObTransferTaskID &task_id, co
return ret;
}
int ObTxFinishTransfer::post_check_logical_table_replaced_request_(const int64_t cluster_id,
const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_list, bool &replace_finished)
{
int ret = OB_SUCCESS;
replace_finished = false;
ObLSService *ls_service = NULL;
storage::ObStorageRpc *storage_rpc = NULL;
storage::ObStorageHASrcInfo src_info;
src_info.src_addr_ = server_addr;
src_info.cluster_id_ = GCONF.cluster_id;
if (!src_info.is_valid() || !dest_ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(src_info), K(dest_ls_id));
} else if (OB_ISNULL(ls_service = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log stream service is NULL", K(ret));
} else if (OB_ISNULL(storage_rpc = ls_service->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("storage rpc proxy is NULL", K(ret));
} else if (OB_FAIL(storage_rpc->check_tablets_logical_table_replaced(
tenant_id, src_info, dest_ls_id, tablet_list, replace_finished))) {
LOG_WARN(
"failed to post transfer backfill finished", K(ret), K(tenant_id), K(src_info), K(dest_ls_id), K(tablet_list));
} else {
LOG_INFO("check logical table replaced completed", K(tenant_id), K(src_info), K(dest_ls_id), K(replace_finished));
}
return ret;
}
int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader)
{
int ret = OB_SUCCESS;

View File

@ -23,6 +23,7 @@
#include "lib/lock/ob_thread_cond.h"
#include "share/transfer/ob_transfer_info.h"
#include "share/ob_balance_define.h"
#include "storage/ob_storage_async_rpc.h"
namespace oceanbase {
namespace storage {
@ -211,16 +212,6 @@ private:
int fetch_ls_replay_scn_(const share::ObTransferTaskID &task_id, const int64_t cluster_id,
const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &ls_id, share::SCN &finish_scn);
// post check if logical sstable has been replaced
// @param[in]: cluster_id
// @param[in]: server_addr
// @param[in]: tenant_id
// @param[in]: ls_id
// @param[out] backfill completed
int post_check_logical_table_replaced_request_(const int64_t cluster_id, const common::ObAddr &server_addr,
const uint64_t tenant_id, const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_list, bool &backfill_completed);
// check self is leader
// @param[in]: ls_id
// @param[out]: is_leader

View File

@ -17,6 +17,7 @@
#include "storage/high_availability/ob_storage_ha_src_provider.h"
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
#include "storage/tablet/ob_tablet_iterator.h"
#include "storage/tablet/ob_tablet.h"
namespace oceanbase
{
@ -237,45 +238,69 @@ int ObLSMemberListService::get_leader_config_version_and_transfer_scn_(
share::SCN &leader_transfer_scn)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLSService *ls_svr = NULL;
common::ObAddr addr;
const bool need_get_config_version = true;
storage::ObHAChangeMemberProxy proxy(
*(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::get_config_version_and_transfer_scn);
if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr));
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(ls_->get_tenant_id(), ls_->get_ls_id(), addr))) {
STORAGE_LOG(WARN, "failed to get ls leader", K(ret), KPC(ls_));
} else if (OB_FAIL(get_config_version_and_transfer_scn_(need_get_config_version, addr, leader_config_version, leader_transfer_scn))) {
} else if (OB_FAIL(get_config_version_and_transfer_scn_(proxy,
addr,
need_get_config_version,
ls_->get_tenant_id(),
ls_->get_ls_id()))) {
STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), K(addr));
}
ObArray<int> return_code_array;
if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
STORAGE_LOG(WARN, "fail to wait all batch result", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
int64_t check_pass_count = 0;
if (OB_FAIL(ret)) {
} else if (1 != return_code_array.count()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "cnt not match", KR(ret),
"return_cnt", return_code_array.count());
} else if (OB_FAIL(process_result_from_async_rpc_(proxy,
addr,
return_code_array,
false/*for_standby*/,
check_pass_count,
leader_config_version,
leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to process result from async rpc", KR(ret), KR(tmp_ret));
}
return ret;
}
int ObLSMemberListService::get_config_version_and_transfer_scn_(
const bool need_get_config_version,
ObHAChangeMemberProxy &proxy,
const common::ObAddr &addr,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn)
const bool need_get_config_version,
const uint64_t tenant_id,
const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
ObLSService *ls_svr = NULL;
ObStorageRpc *storage_rpc = NULL;
ObStorageHASrcInfo src_info;
src_info.cluster_id_ = GCONF.cluster_id;
src_info.src_addr_ = addr;
if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr));
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "storage rpc should not be NULL", K(ret), KP(storage_rpc));
} else if (OB_FAIL(storage_rpc->get_config_version_and_transfer_scn(ls_->get_tenant_id(),
src_info,
ls_->get_ls_id(),
need_get_config_version,
config_version,
transfer_scn))) {
STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), KPC(ls_));
ObStorageChangeMemberArg arg;
arg.tenant_id_ = tenant_id;
arg.ls_id_ = ls_id;
arg.need_get_config_version_ = need_get_config_version;
const int64_t cluster_id = GCONF.cluster_id;
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2;
if (OB_FAIL(proxy.call(addr,
timeout,
cluster_id,
tenant_id,
group_id,
arg))) {
STORAGE_LOG(WARN, "failed to call get config version and transfer scn", K(ret), K(addr), K(timeout), K(tenant_id), K(arg));
}
return ret;
}
@ -335,6 +360,10 @@ int ObLSMemberListService::check_ls_transfer_scn_validity_(palf::LogConfigVersio
STORAGE_LOG(WARN, "failed to check ls transfer scn validity for primary", K(ret), KP_(ls));
}
} else {//standby restore
SERVER_EVENT_SYNC_ADD("storage_ha", "before_check_ls_transfer_scn_validity_for_standby",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id());
DEBUG_SYNC(BEFORE_CHECK_LS_TRANSFER_SCN_FOR_STANDBY);
if (OB_FAIL(check_ls_transfer_scn_validity_for_standby_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity for standby", K(ret), KP_(ls));
}
@ -374,33 +403,57 @@ int ObLSMemberListService::check_ls_transfer_scn_validity_for_standby_(palf::Log
STORAGE_LOG(WARN, "failed to get ls leader", K(ret), KPC(ls_));
} else {
int64_t check_pass_count = 0;
storage::ObHAChangeMemberProxy batch_proxy(
*(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::get_config_version_and_transfer_scn);
for (int64_t i = 0; OB_SUCC(ret) && i < addr_list.count(); ++i) {
const ObAddr &addr = addr_list.at(i);
bool check_pass = false;
share::SCN transfer_scn;
palf::LogConfigVersion config_version;
bool need_get_config_version = (addr == leader_addr);
if (OB_TMP_FAIL(get_config_version_and_transfer_scn_(need_get_config_version, addr, config_version, transfer_scn))) {
if (OB_FAIL(get_config_version_and_transfer_scn_(batch_proxy,
addr,
need_get_config_version,
ls_->get_tenant_id(),
ls_->get_ls_id()))) {
STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), K(addr));
} else if (OB_FAIL(check_ls_transfer_scn_(transfer_scn, check_pass))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret), K(transfer_scn));
} else {
if (addr == leader_addr) {
if (!config_version.is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "config version is not valid", K(ret), K(config_version));
} else {
leader_config_version = config_version;
}
}
check_pass_count++;
}
}
ObArray<int> return_code_array;
if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) {
STORAGE_LOG(WARN, "fail to wait all batch result", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
share::SCN leader_transfer_scn;
if (OB_FAIL(ret)) {
} else if (return_code_array.count() != addr_list.count()
|| return_code_array.count() != batch_proxy.get_results().count()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "cnt not match", KR(ret),
"return_cnt", return_code_array.count(),
"result_cnt", batch_proxy.get_results().count(),
"server_cnt", addr_list.count());
} else if (OB_FAIL(process_result_from_async_rpc_(batch_proxy,
leader_addr,
return_code_array,
true/*for_standby*/,
check_pass_count,
leader_config_version,
leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to process result from async rpc", KR(ret), KR(tmp_ret));
}
if (OB_SUCC(ret)) {
// standby check transfer scn need reach majority
if (check_pass_count < (addr_list.count() / 2 + 1)) {
ret = OB_LS_TRANSFER_SCN_TOO_SMALL;
STORAGE_LOG(WARN, "transfer scn compare do not reach majority", K(ret), K(addr_list));
#ifdef ERRSIM
SERVER_EVENT_ADD("storage_ha", "standby_check_transfer_scn_too_small",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id(),
"member_list_count", addr_list.count(),
"check_pass_count", check_pass_count);
#endif
} else {
STORAGE_LOG(INFO, "passed transfer scn check for standby", K(ret), K(addr_list), K(check_pass_count));
}
@ -409,5 +462,48 @@ int ObLSMemberListService::check_ls_transfer_scn_validity_for_standby_(palf::Log
return ret;
}
int ObLSMemberListService::process_result_from_async_rpc_(
ObHAChangeMemberProxy &proxy,
const common::ObAddr &leader_addr,
const common::ObIArray<int> &return_code_array,
const bool for_standby,
int64_t &pass_count,
palf::LogConfigVersion &leader_config_version,
share::SCN &leader_transfer_scn)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ARRAY_FOREACH_X(proxy.get_results(), idx, cnt, OB_SUCC(ret)) {
const ObStorageChangeMemberRes *response = proxy.get_results().at(idx);
bool check_pass = false;
if (OB_ISNULL(response)) {
tmp_ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "hb_response is null", KR(ret), KR(tmp_ret));
} else if (for_standby && OB_FAIL(check_ls_transfer_scn_(response->transfer_scn_, check_pass))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret));
} else if (for_standby && !check_pass) {
continue;
} else {
const palf::LogConfigVersion &config_version = response->config_version_;
const ObAddr &addr = proxy.get_dests().at(idx);
if (addr == leader_addr) {
if (!config_version.is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "config version is not valid", K(ret), K(config_version));
} else {
leader_config_version = config_version;
leader_transfer_scn = response->transfer_scn_;
}
}
if (OB_SUCC(ret)) {
pass_count++;
}
}
}
return ret;
}
}
}

View File

@ -15,6 +15,7 @@
#include "logservice/ob_log_handler.h"
#include "common/ob_member.h"
#include "storage/ob_storage_async_rpc.h"
namespace oceanbase
{
@ -53,15 +54,27 @@ private:
palf::LogConfigVersion &leader_config_version,
share::SCN &leader_transfer_scn);
int get_config_version_and_transfer_scn_(
const bool need_get_config_version,
ObHAChangeMemberProxy &proxy,
const common::ObAddr &addr,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn);
const bool need_get_config_version,
const uint64_t tenant_id,
const share::ObLSID &ls_id);
int check_ls_transfer_scn_(const share::SCN &transfer_scn, bool &is_match);
int get_ls_member_list_(common::ObIArray<common::ObAddr> &addr_list);
int check_ls_transfer_scn_validity_(palf::LogConfigVersion &leader_config_version);
int check_ls_transfer_scn_validity_for_primary_(palf::LogConfigVersion &leader_config_version);
int check_ls_transfer_scn_validity_for_standby_(palf::LogConfigVersion &leader_config_version);
private:
int process_result_from_async_rpc_(
ObHAChangeMemberProxy &proxy,
const common::ObAddr &leader_addr,
const common::ObIArray<int> &return_code_array,
const bool for_standby,
int64_t &pass_count,
palf::LogConfigVersion &leader_config_version,
share::SCN &leader_transfer_scn);
private:
bool is_inited_;
storage::ObLS *ls_;

View File

@ -817,14 +817,46 @@ int ObTransferHandler::check_start_status_transfer_tablets_(
} else if (OB_FAIL(member_list.get_addr_array(member_addr_list))) {
LOG_WARN("failed to get addr array", K(ret), K(task_info), K(member_list));
} else {
storage::ObCheckStartTransferTabletsProxy batch_proxy(
*(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::check_start_transfer_tablets);
for (int64_t i = 0; OB_SUCC(ret) && i < member_addr_list.count(); ++i) {
const ObAddr &addr = member_addr_list.at(i);
ObStorageHASrcInfo src_info;
src_info.src_addr_ = addr;
src_info.cluster_id_ = cluster_id;
if (OB_FAIL(storage_rpc_->check_start_transfer_tablets(task_info.tenant_id_,
src_info, task_info.src_ls_id_, task_info.dest_ls_id_, task_info.tablet_list_))) {
LOG_WARN("failed to check src transfer tablets", K(ret), K(task_info), K(src_info));
ObTransferTabletInfoArg arg;
arg.tenant_id_ = task_info.tenant_id_;
arg.src_ls_id_ = task_info.src_ls_id_;
arg.dest_ls_id_ = task_info.dest_ls_id_;
const int64_t timeout = GCONF.rpc_timeout;
const int64_t cluster_id = GCONF.cluster_id;
const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2;
if (OB_FAIL(arg.tablet_list_.assign(task_info.tablet_list_))) {
LOG_WARN("failed to assign tablet list", K(ret), K(task_info));
} else if (OB_FAIL(batch_proxy.call(addr,
timeout,
cluster_id,
task_info.tenant_id_,
group_id,
arg))) {
STORAGE_LOG(WARN, "failed to call check start transfer tablets", K(ret), K(addr), K(task_info), K(arg));
}
}
ObArray<int> return_code_array;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) {
STORAGE_LOG(WARN, "fail to wait all batch result", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
if (OB_FAIL(ret)) {
} else if (return_code_array.count() != member_addr_list.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cnt not match", KR(ret),
"return_cnt", return_code_array.count(),
"server_cnt", member_addr_list.count());
}
for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) {
const int res_ret = return_code_array.at(i);
if (OB_SUCCESS != res_ret) {
ret = res_ret;
LOG_WARN("rpc execute failed", KR(ret), K(i));
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_STORAGE_STORAGE_ASYNC_RPC_H_
#define OCEANBASE_STORAGE_STORAGE_ASYNC_RPC_H_
#include "share/ob_rpc_struct.h"
#include "share/ob_srv_rpc_proxy.h"
#include "share/rpc/ob_async_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "rpc/obrpc/ob_rpc_result_code.h"
#include "rpc/obrpc/ob_rpc_proxy.h"
namespace oceanbase
{
namespace storage
{
#define RPC_HA(code, arg, result, name) \
typedef obrpc::ObAsyncRpcProxy<code, arg, result, \
int (obrpc::ObStorageRpcProxy::*)(const arg &, obrpc::ObStorageRpcProxy::AsyncCB<code> *, const obrpc::ObRpcOpts &), obrpc::ObStorageRpcProxy> name
RPC_HA(obrpc::OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, obrpc::ObCheckTransferTabletBackfillArg, obrpc::ObCheckTransferTabletBackfillRes, ObCheckTransferTabletBackfillProxy);
RPC_HA(obrpc::OB_HA_CHANGE_MEMBER_SERVICE, obrpc::ObStorageChangeMemberArg, obrpc::ObStorageChangeMemberRes, ObHAChangeMemberProxy);
RPC_HA(obrpc::OB_CHECK_START_TRANSFER_TABLETS, obrpc::ObTransferTabletInfoArg, obrpc::Int64, ObCheckStartTransferTabletsProxy);
}//end namespace storage
}//end namespace oceanbase
#endif //OCEANBASE_STORAGE_STORAGE_ASYNC_RPC_H_

View File

@ -883,7 +883,6 @@ bool ObGetTransferStartScnRes::is_valid() const
OB_SERIALIZE_MEMBER(ObGetTransferStartScnRes, start_scn_);
ObTransferTabletInfoArg::ObTransferTabletInfoArg()
: tenant_id_(OB_INVALID_ID),
src_ls_id_(),
@ -900,6 +899,22 @@ void ObTransferTabletInfoArg::reset()
tablet_list_.reset();
}
int ObTransferTabletInfoArg::assign(const ObTransferTabletInfoArg &other)
{
int ret = OB_SUCCESS;
if (!other.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(other));
} else if (OB_FAIL(tablet_list_.assign(other.tablet_list_))) {
LOG_WARN("failed to assign tablet list", K(ret), K(other));
} else {
tenant_id_ = other.tenant_id_;
src_ls_id_ = other.src_ls_id_;
dest_ls_id_ = other.dest_ls_id_;
}
return ret;
}
bool ObTransferTabletInfoArg::is_valid() const
{
return OB_INVALID_ID != tenant_id_
@ -939,35 +954,59 @@ bool ObFetchLSReplayScnRes::is_valid() const
return replay_scn_.is_valid();
}
OB_SERIALIZE_MEMBER(ObFetchLSReplayScnRes, replay_scn_);
ObCheckTransferTabletBackfillArg::ObCheckTransferTabletBackfillArg()
: tenant_id_(OB_INVALID_ID),
ls_id_(),
tablet_list_()
{
}
bool ObCheckTransferTabletBackfillArg::is_valid() const
{
return OB_INVALID_ID != tenant_id_
&& ls_id_.is_valid()
&& !tablet_list_.empty();
}
void ObCheckTransferTabletBackfillArg::reset()
{
tenant_id_ = OB_INVALID_ID;
ls_id_.reset();
tablet_list_.reset();
}
int ObCheckTransferTabletBackfillArg::assign(const ObCheckTransferTabletBackfillArg &other)
{
int ret = OB_SUCCESS;
if (!other.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(other));
} else if (OB_FAIL(tablet_list_.assign(other.tablet_list_))) {
LOG_WARN("failed to assign tablet list", K(ret), K(other));
} else {
tenant_id_ = other.tenant_id_;
ls_id_ = other.ls_id_;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObCheckTransferTabletBackfillArg, tenant_id_, ls_id_, tablet_list_);
ObCheckTransferTabletBackfillRes::ObCheckTransferTabletBackfillRes()
: backfill_finished_(false)
{
}
void ObCheckTransferTabletBackfillRes::reset()
{
backfill_finished_ = false;
}
OB_SERIALIZE_MEMBER(ObCheckTransferTabletBackfillRes, backfill_finished_);
ObStorageChangeMemberArg::ObStorageChangeMemberArg()
: tenant_id_(OB_INVALID_ID),
ls_id_(),
@ -977,7 +1016,7 @@ ObStorageChangeMemberArg::ObStorageChangeMemberArg()
bool ObStorageChangeMemberArg::is_valid() const
{
return OB_INVALID_ID == tenant_id_
return OB_INVALID_ID != tenant_id_
&& ls_id_.is_valid();
}
@ -987,6 +1026,20 @@ void ObStorageChangeMemberArg::reset()
ls_id_.reset();
}
int ObStorageChangeMemberArg::assign(const ObStorageChangeMemberArg &other)
{
int ret = OB_SUCCESS;
if (!other.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(other));
} else {
tenant_id_ = other.tenant_id_;
ls_id_ = other.ls_id_;
need_get_config_version_ = other.need_get_config_version_;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObStorageChangeMemberArg, tenant_id_, ls_id_, need_get_config_version_);
ObStorageChangeMemberRes::ObStorageChangeMemberRes()
@ -2013,13 +2066,29 @@ int ObFetchSSTableMacroInfoP::fetch_sstable_macro_range_info_(const obrpc::ObCop
return ret;
}
ObCheckStartTransferTabletsP::ObCheckStartTransferTabletsP(
common::ObInOutBandwidthThrottle *bandwidth_throttle)
: ObStorageStreamRpcP(bandwidth_throttle)
ObCheckStartTransferTabletsDelegate::ObCheckStartTransferTabletsDelegate()
: is_inited_(false),
arg_()
{}
int ObCheckStartTransferTabletsDelegate::init(const obrpc::ObTransferTabletInfoArg &arg)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("check start transfer tablets delegate init twice", K(ret));
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(arg));
} else if (OB_FAIL(arg_.assign(arg))) {
LOG_WARN("failed to assign arg", K(ret), K(arg));
} else {
is_inited_ = true;
}
return ret;
}
int ObCheckStartTransferTabletsP::process()
int ObCheckStartTransferTabletsDelegate::process()
{
int ret = OB_SUCCESS;
MTL_SWITCH(arg_.tenant_id_) {
@ -2029,7 +2098,6 @@ int ObCheckStartTransferTabletsP::process()
ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL;
int64_t disk_abnormal_time = 0;
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
#ifdef ERRSIM
if (OB_SUCC(ret) && DEVICE_HEALTH_NORMAL == dhs && GCONF.fake_disk_error) {
dhs = DEVICE_HEALTH_ERROR;
@ -2058,7 +2126,7 @@ int ObCheckStartTransferTabletsP::process()
// In addition to the tablet in the recovery process, if the major sstable does not exist on the tablet, the transfer start will fail.
// For tablets with ddl sstable, you need to wait for ddl merge to complete
int ObCheckStartTransferTabletsP::check_transfer_out_tablet_sstable_(const ObTablet *tablet)
int ObCheckStartTransferTabletsDelegate::check_transfer_out_tablet_sstable_(const ObTablet *tablet)
{
int ret = OB_SUCCESS;
ObTableStoreIterator ddl_iter;
@ -2082,14 +2150,13 @@ int ObCheckStartTransferTabletsP::check_transfer_out_tablet_sstable_(const ObTab
return ret;
}
int ObCheckStartTransferTabletsP::check_start_transfer_out_tablets_()
int ObCheckStartTransferTabletsDelegate::check_start_transfer_out_tablets_()
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLSService *ls_service = nullptr;
ObLS *ls = nullptr;
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service));
@ -2138,14 +2205,13 @@ int ObCheckStartTransferTabletsP::check_start_transfer_out_tablets_()
return ret;
}
int ObCheckStartTransferTabletsP::check_start_transfer_in_tablets_()
int ObCheckStartTransferTabletsDelegate::check_start_transfer_in_tablets_()
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLSService *ls_service = nullptr;
ObLS *ls = nullptr;
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service));
@ -2668,12 +2734,32 @@ int ObFetchLSReplayScnP::process()
}
return ret;
}
ObCheckTransferTabletsBackfillP::ObCheckTransferTabletsBackfillP(
common::ObInOutBandwidthThrottle *bandwidth_throttle)
: ObStorageStreamRpcP(bandwidth_throttle)
ObCheckTransferTabletsBackfillDelegate::ObCheckTransferTabletsBackfillDelegate(obrpc::ObCheckTransferTabletBackfillRes &result)
: is_inited_(false),
arg_(),
result_(result)
{}
int ObCheckTransferTabletsBackfillDelegate::init(
const obrpc::ObCheckTransferTabletBackfillArg &arg)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("check start transfer backfill delegate init twice", K(ret));
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(arg));
} else if (OB_FAIL(arg_.assign(arg))) {
LOG_WARN("failed to assign arg", K(ret), K(arg));
} else {
is_inited_ = true;
}
return ret;
}
int ObCheckTransferTabletsBackfillP::process()
int ObCheckTransferTabletsBackfillDelegate::process()
{
int ret = OB_SUCCESS;
MTL_SWITCH(arg_.tenant_id_) {
@ -2682,11 +2768,7 @@ int ObCheckTransferTabletsBackfillP::process()
ObLS *ls = NULL;
ObTransferService *transfer_service = NULL;
LOG_INFO("check transfer tablet", K(arg_));
if (OB_ISNULL(bandwidth_throttle_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "bandwidth_throttle_ must not null", K(ret),
KP_(bandwidth_throttle));
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service));
} else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
@ -2721,7 +2803,44 @@ int ObCheckTransferTabletsBackfillP::process()
}
return ret;
}
int ObCheckTransferTabletsBackfillP::check_has_transfer_table_(
int ObCheckTransferTabletsBackfillP::process()
{
int ret = OB_SUCCESS;
ObCheckTransferTabletsBackfillDelegate delegate(result_);
if (OB_FAIL(delegate.init(arg_))) {
LOG_WARN("failed to init delegate", K(ret));
} else if (OB_FAIL(delegate.process())) {
LOG_WARN("failed to do process", K(ret), K_(arg));
}
return ret;
}
int ObStorageGetConfigVersionAndTransferScnP::process()
{
int ret = OB_SUCCESS;
ObStorageGetConfigVersionAndTransferScnDelegate delegate(result_);
if (OB_FAIL(delegate.init(arg_))) {
LOG_WARN("failed to init delegate", K(ret));
} else if (OB_FAIL(delegate.process())) {
LOG_WARN("failed to do process", K(ret), K_(arg));
}
return ret;
}
int ObCheckStartTransferTabletsP::process()
{
int ret = OB_SUCCESS;
ObCheckStartTransferTabletsDelegate delegate;
if (OB_FAIL(delegate.init(arg_))) {
LOG_WARN("failed to init delegate", K(ret));
} else if (OB_FAIL(delegate.process())) {
LOG_WARN("failed to do process", K(ret), K_(arg));
}
return ret;
}
int ObCheckTransferTabletsBackfillDelegate::check_has_transfer_table_(
const ObTransferTabletInfo &tablet_info, storage::ObLS *ls, bool &has_transfer_table)
{
int ret = OB_SUCCESS;
@ -2746,13 +2865,31 @@ int ObCheckTransferTabletsBackfillP::check_has_transfer_table_(
return ret;
}
ObStorageGetConfigVersionAndTransferScnP::ObStorageGetConfigVersionAndTransferScnP(
common::ObInOutBandwidthThrottle *bandwidth_throttle)
: ObStorageStreamRpcP(bandwidth_throttle)
ObStorageGetConfigVersionAndTransferScnDelegate::ObStorageGetConfigVersionAndTransferScnDelegate(obrpc::ObStorageChangeMemberRes &result)
: is_inited_(false),
arg_(),
result_(result)
{
}
int ObStorageGetConfigVersionAndTransferScnP::process()
int ObStorageGetConfigVersionAndTransferScnDelegate::init(const obrpc::ObStorageChangeMemberArg &arg)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("check start transfer backfill delegate init twice", K(ret));
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(arg));
} else if (OB_FAIL(arg_.assign(arg))) {
LOG_WARN("failed to assign arg", K(ret), K(arg));
} else {
is_inited_ = true;
}
return ret;
}
int ObStorageGetConfigVersionAndTransferScnDelegate::process()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = arg_.tenant_id_;
@ -2779,6 +2916,7 @@ int ObStorageGetConfigVersionAndTransferScnP::process()
LOG_INFO("get config version and transfer scn succ", K(tenant_id), K(ls_id), K(result_));
}
}
return ret;
}
@ -3273,38 +3411,6 @@ int ObStorageRpc::update_ls_meta(
return ret;
}
int ObStorageRpc::check_start_transfer_tablets(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &src_ls_id,
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_array)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !src_ls_id.is_valid() || !dest_ls_id.is_valid() || tablet_array.empty()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(src_ls_id), K(dest_ls_id), K(tablet_array));
} else {
ObTransferTabletInfoArg arg;
arg.tenant_id_ = tenant_id;
arg.src_ls_id_ = src_ls_id;
arg.dest_ls_id_ = dest_ls_id;
if (OB_FAIL(arg.tablet_list_.assign(tablet_array))) {
LOG_WARN("failed to assign tablet list", K(ret), K(tablet_array));
} else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
.by(tenant_id)
.dst_cluster_id(src_info.cluster_id_)
.group_id(share::OBCG_STORAGE_HA_LEVEL2)
.check_start_transfer_tablets(arg))) {
LOG_WARN("failed to check src transfer tablets", K(ret), K(src_info), K(arg));
}
}
return ret;
}
int ObStorageRpc::get_ls_active_trans_count(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
@ -3410,79 +3516,6 @@ int ObStorageRpc::fetch_ls_replay_scn(
}
return ret;
}
int ObStorageRpc::check_tablets_logical_table_replaced(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo>& tablet_array,
bool &backfill_finished)
{
int ret = OB_SUCCESS;
backfill_finished = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !dest_ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(dest_ls_id));
} else {
ObCheckTransferTabletBackfillArg arg;
ObCheckTransferTabletBackfillRes res;
arg.tenant_id_ = tenant_id;
arg.ls_id_ = dest_ls_id;
if (OB_FAIL(arg.tablet_list_.assign(tablet_array))) {
LOG_WARN("failed to assign tablet array", K(ret), K(tablet_array));
} else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
.by(tenant_id)
.dst_cluster_id(src_info.cluster_id_)
.group_id(share::OBCG_STORAGE_HA_LEVEL2)
.check_transfer_tablet_backfill_completed(arg, res))) {
LOG_WARN("failed to check tablets backfill completed", K(ret), K(src_info), K(arg));
} else {
backfill_finished = res.backfill_finished_;
}
}
return ret;
}
int ObStorageRpc::get_config_version_and_transfer_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn)
{
int ret = OB_SUCCESS;
config_version.reset();
transfer_scn.reset();
if (!is_inited_) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id));
} else {
ObStorageChangeMemberArg arg;
ObStorageChangeMemberRes res;
arg.tenant_id_ = tenant_id;
arg.ls_id_ = ls_id;
arg.need_get_config_version_ = need_get_config_version;
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
.by(tenant_id)
.timeout(timeout)
.dst_cluster_id(src_info.cluster_id_)
.get_config_version_and_transfer_scn(arg, res))) {
LOG_WARN("failed to get config version and transfer scn", K(ret), K(src_info), K(arg));
} else {
config_version = res.config_version_;
transfer_scn = res.transfer_scn_;
FLOG_INFO("get config version and transfer scn succ", K(tenant_id), K(src_info), K(ls_id));
}
}
return ret;
}
int ObStorageRpc::lock_config_change(
const uint64_t tenant_id,

View File

@ -19,6 +19,7 @@
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_processor.h"
#include "rpc/obrpc/ob_rpc_result_code.h"
#include "share/rpc/ob_async_rpc_proxy.h"
#include "common/ob_member.h"
#include "storage/ob_storage_struct.h"
#include "observer/ob_server_struct.h"
@ -31,6 +32,7 @@
#include "share/transfer/ob_transfer_info.h"
#include "storage/lob/ob_lob_rpc_struct.h"
#include "storage/blocksstable/ob_logic_macro_id.h"
#include "share/rpc/ob_async_rpc_proxy.h"
namespace oceanbase
{
@ -537,6 +539,7 @@ public:
ObTransferTabletInfoArg();
~ObTransferTabletInfoArg() {}
bool is_valid() const;
int assign(const ObTransferTabletInfoArg &other);
void reset();
TO_STRING_KV(K_(tenant_id), K_(src_ls_id), K_(dest_ls_id), K_(tablet_list));
@ -581,6 +584,7 @@ public:
~ObCheckTransferTabletBackfillArg() {}
bool is_valid() const;
void reset();
int assign(const ObCheckTransferTabletBackfillArg &other);
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_list));
uint64_t tenant_id_;
share::ObLSID ls_id_;
@ -608,6 +612,7 @@ public:
~ObStorageChangeMemberArg() {}
bool is_valid() const;
void reset();
int assign(const ObStorageChangeMemberArg &other);
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(need_get_config_version));
uint64_t tenant_id_;
share::ObLSID ls_id_;
@ -746,17 +751,19 @@ public:
RPC_S(PR5 notify_restore_tablets, OB_HA_NOTIFY_RESTORE_TABLETS, (ObNotifyRestoreTabletsArg), ObNotifyRestoreTabletsResp);
RPC_S(PR5 inquire_restore, OB_HA_NOTIFY_FOLLOWER_RESTORE, (ObInquireRestoreArg), ObInquireRestoreResp);
RPC_S(PR5 update_ls_meta, OB_HA_UPDATE_LS_META, (ObRestoreUpdateLSMetaArg));
RPC_S(PR5 check_start_transfer_tablets, OB_CHECK_START_TRANSFER_TABLETS, (ObTransferTabletInfoArg));
RPC_S(PR5 get_ls_active_trans_count, OB_GET_LS_ACTIVE_TRANSACTION_COUNT, (ObGetLSActiveTransCountArg), ObGetLSActiveTransCountRes);
RPC_S(PR5 get_transfer_start_scn, OB_GET_TRANSFER_START_SCN, (ObGetTransferStartScnArg), ObGetTransferStartScnRes);
RPC_S(PR5 fetch_ls_replay_scn, OB_HA_FETCH_LS_REPLAY_SCN, (ObFetchLSReplayScnArg), ObFetchLSReplayScnRes);
RPC_S(PR5 check_transfer_tablet_backfill_completed, OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, (ObCheckTransferTabletBackfillArg), ObCheckTransferTabletBackfillRes);
RPC_S(PR5 get_config_version_and_transfer_scn, OB_HA_CHANGE_MEMBER_SERVICE, (ObStorageChangeMemberArg), ObStorageChangeMemberRes);
RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 wakeup_transfer_service, OB_HA_WAKEUP_TRANSFER_SERVICE, (ObStorageWakeupTransferServiceArg));
RPC_S(PR5 fetch_ls_member_and_learner_list, OB_HA_FETCH_LS_MEMBER_AND_LEARNER_LIST, (ObFetchLSMemberAndLearnerListArg), ObFetchLSMemberAndLearnerListInfo);
// RPC_AP stands for asynchronous RPC.
RPC_AP(PR5 check_transfer_tablet_backfill_completed, OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, (obrpc::ObCheckTransferTabletBackfillArg), obrpc::ObCheckTransferTabletBackfillRes);
RPC_AP(PR5 get_config_version_and_transfer_scn, OB_HA_CHANGE_MEMBER_SERVICE, (obrpc::ObStorageChangeMemberArg), obrpc::ObStorageChangeMemberRes);
RPC_AP(PR5 check_start_transfer_tablets, OB_CHECK_START_TRANSFER_TABLETS, (obrpc::ObTransferTabletInfoArg), obrpc::Int64);
};
template <ObRpcPacketCode RPC_CODE>
@ -906,18 +913,23 @@ protected:
int process();
};
class ObCheckStartTransferTabletsP : public ObStorageStreamRpcP<OB_CHECK_START_TRANSFER_TABLETS>
class ObCheckStartTransferTabletsDelegate final
{
public:
explicit ObCheckStartTransferTabletsP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual ~ObCheckStartTransferTabletsP() {}
protected:
ObCheckStartTransferTabletsDelegate();
int init(const obrpc::ObTransferTabletInfoArg &arg);
int process();
private:
int check_start_transfer_out_tablets_();
int check_start_transfer_in_tablets_();
// Major sstable or ddl sstable needs to exist in src_tablet
int check_transfer_out_tablet_sstable_(const ObTablet *tablet);
private:
bool is_inited_;
obrpc::ObTransferTabletInfoArg arg_;
DISALLOW_COPY_AND_ASSIGN(ObCheckStartTransferTabletsDelegate);
};
class ObGetLSActiveTransCountP : public ObStorageStreamRpcP<OB_GET_LS_ACTIVE_TRANSACTION_COUNT>
@ -947,27 +959,65 @@ public:
protected:
int process();
};
class ObCheckTransferTabletsBackfillP:
public ObStorageStreamRpcP<OB_HA_CHECK_TRANSFER_TABLET_BACKFILL>
public ObStorageRpcProxy::Processor<OB_HA_CHECK_TRANSFER_TABLET_BACKFILL>
{
public:
explicit ObCheckTransferTabletsBackfillP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
ObCheckTransferTabletsBackfillP() = default;
virtual ~ObCheckTransferTabletsBackfillP() {}
protected:
int process();
};
class ObStorageGetConfigVersionAndTransferScnP:
public ObStorageRpcProxy::Processor<OB_HA_CHANGE_MEMBER_SERVICE>
{
public:
ObStorageGetConfigVersionAndTransferScnP() = default;
virtual ~ObStorageGetConfigVersionAndTransferScnP() {}
protected:
int process();
};
class ObCheckStartTransferTabletsP:
public ObStorageRpcProxy::Processor<OB_CHECK_START_TRANSFER_TABLETS>
{
public:
ObCheckStartTransferTabletsP() = default;
virtual ~ObCheckStartTransferTabletsP() {}
protected:
int process();
};
class ObCheckTransferTabletsBackfillDelegate final
{
public:
ObCheckTransferTabletsBackfillDelegate(obrpc::ObCheckTransferTabletBackfillRes &result);
int init(const obrpc::ObCheckTransferTabletBackfillArg &arg);
int process();
private:
int check_has_transfer_table_(const share::ObTransferTabletInfo &tablet_info,
storage::ObLS *ls, bool &has_transfer_table);
private:
bool is_inited_;
obrpc::ObCheckTransferTabletBackfillArg arg_;
obrpc::ObCheckTransferTabletBackfillRes &result_;
DISALLOW_COPY_AND_ASSIGN(ObCheckTransferTabletsBackfillDelegate);
};
class ObStorageGetConfigVersionAndTransferScnP:
public ObStorageStreamRpcP<OB_HA_CHANGE_MEMBER_SERVICE>
class ObStorageGetConfigVersionAndTransferScnDelegate final
{
public:
explicit ObStorageGetConfigVersionAndTransferScnP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual ~ObStorageGetConfigVersionAndTransferScnP() {}
protected:
ObStorageGetConfigVersionAndTransferScnDelegate(obrpc::ObStorageChangeMemberRes &result);
int init(const obrpc::ObStorageChangeMemberArg &arg);
int process();
private:
bool is_inited_;
obrpc::ObStorageChangeMemberArg arg_;
obrpc::ObStorageChangeMemberRes &result_;
DISALLOW_COPY_AND_ASSIGN(ObStorageGetConfigVersionAndTransferScnDelegate);
};
class ObLobQueryP : public ObStorageStreamRpcP<OB_LOB_QUERY>
@ -982,7 +1032,6 @@ private:
int process_getlength();
};
// Stream get ls meta and all tablet meta
class ObStorageFetchLSViewP:
public ObStorageStreamRpcP<OB_HA_FETCH_LS_VIEW>
@ -1095,13 +1144,6 @@ public:
const ObStorageHASrcInfo &dest_info,
const storage::ObLSMetaPackage &ls_meta) = 0;
virtual int check_start_transfer_tablets(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &src_ls_id,
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo>& tablet_array) = 0;
virtual int get_ls_active_trans_count(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
@ -1120,19 +1162,6 @@ public:
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
share::SCN &ls_replay_scn) = 0;
virtual int check_tablets_logical_table_replaced(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_array,
bool &replace_finished) = 0;
virtual int get_config_version_and_transfer_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn) = 0;
virtual int lock_config_change(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
@ -1209,13 +1238,6 @@ public:
const ObStorageHASrcInfo &dest_info,
const storage::ObLSMetaPackage &ls_meta);
virtual int check_start_transfer_tablets(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &src_ls_id,
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo>& tablet_array);
virtual int get_ls_active_trans_count(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
@ -1234,19 +1256,6 @@ public:
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
share::SCN &ls_replay_scn);
virtual int check_tablets_logical_table_replaced(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_array,
bool &replace_finished);
virtual int get_config_version_and_transfer_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn);
virtual int lock_config_change(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,