replace fetch_ls_repay_scn sycn rpc into async rpc

This commit is contained in:
godyangfight 2023-11-29 14:41:41 +00:00 committed by ob-robot
parent 5495b064a7
commit ac97c968bf
10 changed files with 211 additions and 194 deletions

View File

@ -159,7 +159,7 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator)
RPC_PROCESSOR(ObCheckStartTransferTabletsP);
RPC_PROCESSOR(ObGetLSActiveTransCountP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObGetTransferStartScnP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObFetchLSReplayScnP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObFetchLSReplayScnP);
RPC_PROCESSOR(ObCheckTransferTabletsBackfillP);
RPC_PROCESSOR(ObStorageGetConfigVersionAndTransferScnP);
RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_);

View File

@ -677,20 +677,23 @@ int ObTxFinishTransfer::wait_all_ls_replica_replay_scn_(const ObTransferTaskID &
const int64_t quorum, ObTimeoutCtx &timeout_ctx, bool &check_passed)
{
int ret = OB_SUCCESS;
common::ObArray<common::ObAddr> finished_addr_list;
while (OB_SUCC(ret)) {
check_passed = false;
if (timeout_ctx.is_timeouted()) {
ret = OB_TIMEOUT;
LOG_WARN("some ls replay not finished", K(ret), K(tenant_id), K(ls_id));
} else if (OB_FAIL(check_all_ls_replica_replay_scn_(
task_id, tenant_id, ls_id, member_addr_list, finish_scn, quorum, check_passed))) {
task_id, tenant_id, ls_id, member_addr_list, finish_scn, timeout_ctx, finished_addr_list))) {
LOG_WARN("failed to check all ls replica replay scn",
K(ret),
K(tenant_id),
K(member_addr_list),
K(ls_id),
K(quorum));
} else if (check_passed) {
} else if (finished_addr_list.count() == member_addr_list.count()) {
check_passed = true;
LOG_INFO("all ls has passed ls replica replay scn", K(tenant_id), K(ls_id));
break;
} else {
@ -713,48 +716,19 @@ int ObTxFinishTransfer::wait_all_ls_replica_replay_scn_(const ObTransferTaskID &
}
int ObTxFinishTransfer::check_all_ls_replica_replay_scn_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
const share::ObLSID &ls_id, const common::ObArray<common::ObAddr> &member_addr_list, const share::SCN &finish_scn,
const int64_t quorum, bool &meet_criteria)
const share::ObLSID &ls_id, const common::ObIArray<common::ObAddr> &total_addr_list, const share::SCN &finish_scn,
ObTimeoutCtx &timeout_ctx, common::ObIArray<common::ObAddr> &finished_addr_list)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t cur_quorum = 0;
FOREACH_X(location, member_addr_list, OB_SUCC(ret))
{
if (OB_ISNULL(location)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("location should not be null", K(ret));
} else {
const common::ObAddr &server = *location;
bool passed_finish_scn = false;
if (OB_FAIL(inner_check_ls_replay_scn_(task_id, tenant_id, ls_id, server, finish_scn, passed_finish_scn))) {
LOG_WARN("failed to check criteria", K(ret), K(task_id), K(tenant_id), K(ls_id), K(server));
} else if (!passed_finish_scn) {
LOG_INFO("server has not passed finish scn", K(task_id), K(tenant_id), K(server), K(finish_scn));
} else {
cur_quorum++;
LOG_INFO("server has replayed passed finish scn", K(ret), K(server));
}
}
}
if (OB_SUCC(ret)) {
meet_criteria = cur_quorum == quorum ;
}
return ret;
}
int ObTxFinishTransfer::inner_check_ls_replay_scn_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
const share::ObLSID &ls_id, const common::ObAddr &addr, const SCN &finish_scn, bool &passed_scn)
{
int ret = OB_SUCCESS;
passed_scn = false;
const int64_t cluster_id = GCONF.cluster_id;
SCN tmp_finish_scn;
if (OB_FAIL(fetch_ls_replay_scn_(task_id, cluster_id, addr, tenant_id, ls_id, tmp_finish_scn))) {
LOG_WARN("failed to fetch finish scn for transfer", K(ret), K(task_id), K(tenant_id), K(ls_id));
} else {
passed_scn = tmp_finish_scn >= finish_scn;
LOG_INFO("check ls replay scn", K(passed_scn), K(tmp_finish_scn), K(finish_scn));
common::ObArray<ObAddr> member_addr_list;
const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL2;
if (OB_FAIL(ObTransferUtils::get_need_check_member(total_addr_list, finished_addr_list, member_addr_list))) {
LOG_WARN("failed to get need check member", K(ret), K(task_id), K(tenant_id), K(ls_id));
} else if (OB_FAIL(ObTransferUtils::check_ls_replay_scn(
tenant_id, ls_id, finish_scn, group_id, member_addr_list, timeout_ctx, finished_addr_list))) {
LOG_WARN("failed to check ls replay scn", K(ret), K(total_addr_list), K(finish_scn));
}
return ret;
}
@ -984,32 +958,6 @@ int ObTxFinishTransfer::report_result_(
return ret;
}
int ObTxFinishTransfer::fetch_ls_replay_scn_(const ObTransferTaskID &task_id, const int64_t cluster_id,
const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &ls_id, share::SCN &finish_scn)
{
int ret = OB_SUCCESS;
ObLSService *ls_service = NULL;
storage::ObStorageRpc *storage_rpc = NULL;
storage::ObStorageHASrcInfo src_info;
src_info.src_addr_ = server_addr;
src_info.cluster_id_ = GCONF.cluster_id;
if (!src_info.is_valid() || !ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(src_info), K(ls_id));
} else if (OB_ISNULL(ls_service = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log stream service is NULL", K(ret));
} else if (OB_ISNULL(storage_rpc = ls_service->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("storage rpc proxy is NULL", K(ret));
} else if (OB_FAIL(storage_rpc->fetch_ls_replay_scn(tenant_id, src_info, ls_id, finish_scn))) {
LOG_WARN("failed to fetch ls replay scn", K(ret), K(tenant_id), K(src_info), K(ls_id));
} else {
LOG_INFO("fetch ls replay scn", K(tenant_id), K(src_info), K(ls_id));
}
return ret;
}
int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader)
{
int ret = OB_SUCCESS;

View File

@ -112,18 +112,11 @@ private:
// @param[in]: server addr
// @param[in]: dest_ls_scn
// @param[in]: current scn
// @param[bool]: check passed
// @param[in]: timeout_ctx
// @param[in/out]: finished_addr_list
int check_all_ls_replica_replay_scn_(const share::ObTransferTaskID &task_id, const uint64_t tenant_id,
const share::ObLSID &ls_id, const common::ObArray<common::ObAddr> &member_addr_list, const share::SCN &finish_scn,
const int64_t quorum, bool &check_passed);
// param[in]: tenant_id,
// param[in]: ls_id
// param[in]: server addr
// param[in]: finish_scn
// param[out]: is the check passed
int inner_check_ls_replay_scn_(const share::ObTransferTaskID &task_id, const uint64_t tenant_id,
const share::ObLSID &ls_id, const common::ObAddr &addr, const share::SCN &finish_scn, bool &passed_scn);
const share::ObLSID &ls_id, const common::ObIArray<common::ObAddr> &total_addr_list, const share::SCN &finish_scn,
ObTimeoutCtx &timeout_ctx, common::ObIArray<common::ObAddr> &finished_addr_list);
private:
/* helper functions */
@ -208,9 +201,6 @@ private:
int report_result_(const share::ObTransferTaskID &task_id, const int64_t result, obrpc::ObSrvRpcProxy *rs_rpc_proxy);
private:
/*rpc section*/
int fetch_ls_replay_scn_(const share::ObTransferTaskID &task_id, const int64_t cluster_id,
const common::ObAddr &server_addr, const uint64_t tenant_id, const share::ObLSID &ls_id, share::SCN &finish_scn);
// check self is leader
// @param[in]: ls_id

View File

@ -505,5 +505,106 @@ void ObTransferUtils::clear_transfer_module()
#endif
}
int ObTransferUtils::get_need_check_member(
const common::ObIArray<ObAddr> &total_member_addr_list,
const common::ObIArray<ObAddr> &finished_member_addr_list,
common::ObIArray<ObAddr> &member_addr_list)
{
int ret = OB_SUCCESS;
member_addr_list.reset();
for (int64_t i = 0; OB_SUCC(ret) && i < total_member_addr_list.count(); ++i) {
const ObAddr &addr = total_member_addr_list.at(i);
bool need_add = true;
for (int64_t j = 0; OB_SUCC(ret) && j < finished_member_addr_list.count(); ++j) {
if (finished_member_addr_list.at(j) == addr) {
need_add = false;
break;
}
}
if (OB_SUCC(ret) && need_add) {
if (OB_FAIL(member_addr_list.push_back(addr))) {
LOG_WARN("failed to push addr into array", K(ret), K(addr));
}
}
}
return ret;
}
int ObTransferUtils::check_ls_replay_scn(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const share::SCN &check_scn,
const int32_t group_id,
const common::ObIArray<ObAddr> &member_addr_list,
ObTimeoutCtx &timeout_ctx,
common::ObIArray<ObAddr> &finished_addr_list)
{
int ret = OB_SUCCESS;
storage::ObFetchLSReplayScnProxy batch_proxy(
*(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::fetch_ls_replay_scn);
ObFetchLSReplayScnArg arg;
const int64_t timeout = 10 * 1000 * 1000; //10s
const int64_t cluster_id = GCONF.cluster_id;
if (OB_INVALID_ID == tenant_id || !ls_id.is_valid() || !check_scn.is_valid() || group_id < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check transfer in tablet abort get invalid argument", K(ret), K(tenant_id), K(ls_id), K(check_scn), K(group_id));
} else {
arg.tenant_id_ = tenant_id;
arg.ls_id_ = ls_id;
for (int64_t i = 0; OB_SUCC(ret) && i < member_addr_list.count(); ++i) {
const ObAddr &addr = member_addr_list.at(i);
if (timeout_ctx.is_timeouted()) {
ret = OB_TIMEOUT;
LOG_WARN("check transfer in tablet abort already timeout", K(ret), K(tenant_id), K(ls_id));
break;
} else if (OB_FAIL(batch_proxy.call(
addr,
timeout,
cluster_id,
arg.tenant_id_,
group_id,
arg))) {
LOG_WARN("failed to send fetch ls replay scn request", K(ret), K(addr), K(tenant_id), K(ls_id));
} else {
LOG_INFO("fetch ls replay scn complete", K(arg), K(addr));
}
}
ObArray<int> return_code_array;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) {
LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
if (OB_FAIL(ret)) {
} else if (return_code_array.count() != member_addr_list.count()
|| return_code_array.count() != batch_proxy.get_results().count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cnt not match", K(ret),
"return_cnt", return_code_array.count(),
"result_cnt", batch_proxy.get_results().count(),
"server_cnt", member_addr_list.count());
} else {
ARRAY_FOREACH_X(batch_proxy.get_results(), idx, cnt, OB_SUCC(ret)) {
const obrpc::ObFetchLSReplayScnRes *response = batch_proxy.get_results().at(idx);
const int res_ret = return_code_array.at(idx);
if (OB_SUCCESS != res_ret) {
ret = res_ret;
LOG_WARN("rpc execute failed", KR(ret), K(idx));
} else if (OB_ISNULL(response)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("response is null", K(ret));
} else if (response->replay_scn_ >= check_scn && OB_FAIL(finished_addr_list.push_back(member_addr_list.at(idx)))) {
LOG_WARN("failed to push member addr into list", K(ret), K(idx), K(member_addr_list));
}
}
}
}
return ret;
}
} // end namespace storage
} // end namespace oceanbase

View File

@ -66,6 +66,18 @@ struct ObTransferUtils
static int get_gts(const uint64_t tenant_id, share::SCN &gts);
static void set_transfer_module();
static void clear_transfer_module();
static int get_need_check_member(
const common::ObIArray<ObAddr> &total_member_addr_list,
const common::ObIArray<ObAddr> &finished_member_addr_list,
common::ObIArray<ObAddr> &member_addr_list);
static int check_ls_replay_scn(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const share::SCN &check_scn,
const int32_t group_id,
const common::ObIArray<ObAddr> &member_addr_list,
ObTimeoutCtx &timeout_ctx,
common::ObIArray<ObAddr> &finished_addr_list);
};
} // end namespace storage

View File

@ -1249,6 +1249,7 @@ int ObTransferHandler::wait_src_ls_replay_to_start_scn_(
common::ObMemberList member_list;
ObArray<ObAddr> member_addr_list;
const int64_t start_ts = ObTimeUtil::current_time();
const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL1;
if (!is_inited_) {
ret = OB_NOT_INIT;
@ -1260,7 +1261,7 @@ int ObTransferHandler::wait_src_ls_replay_to_start_scn_(
LOG_WARN("failed to get src ls member list", K(ret), K(task_info));
} else if (OB_FAIL(member_list.get_addr_array(member_addr_list))) {
LOG_WARN("failed to get addr array", K(ret), K(task_info), K(member_list));
} else if (OB_FAIL(wait_ls_replay_event_(task_info, member_addr_list, start_scn, timeout_ctx))) {
} else if (OB_FAIL(wait_ls_replay_event_(task_info.src_ls_id_, task_info, member_addr_list, start_scn, group_id, timeout_ctx))) {
LOG_WARN("failed to wait ls replay event", K(ret), K(task_info), K(member_list), K(start_scn));
} else {
LOG_INFO("[TRANSFER_BLOCK_TX] wait src ls repaly to start scn", "cost", ObTimeUtil::current_time() - start_ts);
@ -1287,6 +1288,7 @@ int ObTransferHandler::precheck_ls_replay_scn_(const share::ObTransferTaskInfo &
share::SCN check_scn;
ObTimeoutCtx timeout_ctx;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL2;
if (tenant_config.is_valid()) {
const int64_t timeout = tenant_config->_transfer_start_trans_timeout * 0.5;
if (OB_FAIL(timeout_ctx.set_timeout(timeout))) {
@ -1307,7 +1309,7 @@ int ObTransferHandler::precheck_ls_replay_scn_(const share::ObTransferTaskInfo &
LOG_WARN("failed to get addr array", K(ret), K(task_info), K(member_list));
} else if (OB_FAIL(get_max_decided_scn_(task_info.tenant_id_, task_info.src_ls_id_, check_scn))) {
LOG_WARN("failed to get max decided scn", K(ret), K(task_info));
} else if (OB_FAIL(wait_ls_replay_event_(task_info, member_addr_list, check_scn, timeout_ctx))) {
} else if (OB_FAIL(wait_ls_replay_event_(task_info.src_ls_id_, task_info, member_addr_list, check_scn, group_id, timeout_ctx))) {
LOG_WARN("failed to wait ls replay event", K(ret), K(task_info), K(member_list), K(check_scn));
if (OB_TIMEOUT == ret) {
ret = OB_TRANSFER_CANNOT_START;
@ -1350,56 +1352,43 @@ int ObTransferHandler::get_max_decided_scn_(
}
int ObTransferHandler::wait_ls_replay_event_(
const share::ObLSID &ls_id,
const share::ObTransferTaskInfo &task_info,
const common::ObArray<ObAddr> &member_addr_list,
const common::ObArray<ObAddr> &total_addr_list,
const share::SCN &check_scn,
const int32_t group_id,
ObTimeoutCtx &timeout_ctx)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const int64_t OB_CHECK_START_SCN_READY_INTERVAL = 5 * 1000; //5ms
const int64_t start_ts = ObTimeUtil::current_time();
hash::ObHashSet<ObAddr> replica_addr_set;
if (OB_FAIL(replica_addr_set.create(OB_DEFAULT_REPLICA_NUM))) {
LOG_WARN("failed to create replica addr set", K(ret), K(task_info));
}
common::ObArray<ObAddr> member_addr_list;
common::ObArray<ObAddr> finished_member_addr_list;
bool is_leader = false;
while (OB_SUCC(ret)) {
int64_t replica_count = 0;
if (timeout_ctx.is_timeouted()) {
ret = OB_TIMEOUT;
LOG_WARN("already timeout", K(ret), K(task_info));
break;
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < member_addr_list.count(); ++i) {
const ObAddr &replica_addr = member_addr_list.at(i);
const int32_t hash_ret = replica_addr_set.exist_refactored(replica_addr);
SCN replica_scn;
if (OB_HASH_EXIST == hash_ret) {
replica_count++;
} else if (OB_HASH_NOT_EXIST == hash_ret) {
ObStorageHASrcInfo src_info;
src_info.cluster_id_ = GCONF.cluster_id;
src_info.src_addr_ = replica_addr;
if (OB_TMP_FAIL(inner_get_scn_for_wait_event_(task_info, src_info, replica_scn))) {
LOG_WARN("failed to inner get scn for wait event", K(tmp_ret), K(src_info));
} else if (replica_scn >= check_scn) {
if (OB_FAIL(replica_addr_set.set_refactored(replica_addr))) {
LOG_WARN("failed to set replica into hash set", K(ret), K(replica_addr));
} else {
replica_count++;
}
}
} else {
ret = OB_SUCC(hash_ret) ? OB_ERR_UNEXPECTED : hash_ret;
LOG_WARN("failed to get replica server from hash set", K(ret), K(task_info));
}
}
} else if (OB_FAIL(check_self_is_leader_(is_leader))) {
LOG_WARN("failed to check self is leader", K(ret), KPC(ls_));
} else if (!is_leader) {
ret = OB_LS_NOT_LEADER;
LOG_WARN("ls leader has been changed", K(ret), K(task_info));
break;
} else if (OB_FAIL(ObTransferUtils::get_need_check_member(total_addr_list, finished_member_addr_list, member_addr_list))) {
LOG_WARN("failed to get need check member", K(ret), K(task_info), K(total_addr_list));
} else if (OB_FAIL(ObTransferUtils::check_ls_replay_scn(task_info.tenant_id_, ls_id, check_scn,
group_id, member_addr_list, timeout_ctx, finished_member_addr_list))) {
LOG_WARN("failed to check ls replay scn", K(ret), K(task_info), K(ls_id), K(check_scn));
}
if (OB_SUCC(ret)) {
if (replica_count == member_addr_list.count()) {
if (finished_member_addr_list.count() == total_addr_list.count()) {
FLOG_INFO("[TRANSFER] src ls all replicas replay reach check_scn", "src_ls", task_info.src_ls_id_,
K(check_scn), K(member_addr_list), "cost", ObTimeUtil::current_time() - start_ts);
K(check_scn), K(total_addr_list), "cost", ObTimeUtil::current_time() - start_ts);
break;
}
}
@ -1412,24 +1401,6 @@ int ObTransferHandler::wait_ls_replay_event_(
return ret;
}
int ObTransferHandler::inner_get_scn_for_wait_event_(
const share::ObTransferTaskInfo &task_info,
const ObStorageHASrcInfo &src_info,
share::SCN &replica_scn)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = task_info.tenant_id_;
const share::ObLSID &src_ls_id = task_info.src_ls_id_;
if (OB_ISNULL(storage_rpc_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("storage rpc should not be null", K(ret));
} else if (OB_FAIL(storage_rpc_->fetch_ls_replay_scn(tenant_id, src_info, src_ls_id, replica_scn))) {
LOG_WARN("failed to fetch ls replay scn", K(ret), K(tenant_id), K(src_info));
}
return ret;
}
int ObTransferHandler::get_transfer_tablets_meta_(
const share::ObTransferTaskInfo &task_info,
common::ObIArray<ObMigrationTabletParam> &tablet_meta_list)

View File

@ -157,14 +157,12 @@ private:
ObTimeoutCtx &timeout_ctx,
share::SCN &start_scn);
int wait_ls_replay_event_(
const share::ObLSID &ls_id,
const share::ObTransferTaskInfo &task_info,
const common::ObArray<ObAddr> &member_addr_list,
const share::SCN &check_scn,
const int32_t group_id,
ObTimeoutCtx &timeout_ctx);
int inner_get_scn_for_wait_event_(
const share::ObTransferTaskInfo &task_info,
const ObStorageHASrcInfo &src_info,
share::SCN &replica_scn);
int precheck_ls_replay_scn_(
const share::ObTransferTaskInfo &task_info);
int get_max_decided_scn_(

View File

@ -32,6 +32,7 @@ namespace storage
RPC_HA(obrpc::OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, obrpc::ObCheckTransferTabletBackfillArg, obrpc::ObCheckTransferTabletBackfillRes, ObCheckTransferTabletBackfillProxy);
RPC_HA(obrpc::OB_HA_CHANGE_MEMBER_SERVICE, obrpc::ObStorageChangeMemberArg, obrpc::ObStorageChangeMemberRes, ObHAChangeMemberProxy);
RPC_HA(obrpc::OB_CHECK_START_TRANSFER_TABLETS, obrpc::ObTransferTabletInfoArg, obrpc::Int64, ObCheckStartTransferTabletsProxy);
RPC_HA(obrpc::OB_HA_FETCH_LS_REPLAY_SCN, obrpc::ObFetchLSReplayScnArg, obrpc::ObFetchLSReplayScnRes, ObFetchLSReplayScnProxy);
}//end namespace storage
}//end namespace oceanbase

View File

@ -2696,12 +2696,31 @@ int ObGetTransferStartScnP::process()
return ret;
}
ObFetchLSReplayScnP::ObFetchLSReplayScnP(
common::ObInOutBandwidthThrottle *bandwidth_throttle)
: ObStorageStreamRpcP(bandwidth_throttle)
OFetchLSReplayScnDelegate::OFetchLSReplayScnDelegate(obrpc::ObFetchLSReplayScnRes &result)
: is_inited_(false),
arg_(),
result_(result)
{
}
int ObFetchLSReplayScnP::process()
int OFetchLSReplayScnDelegate::init(
const obrpc::ObFetchLSReplayScnArg &arg)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("fetch ls replay scn delegate init twice", K(ret));
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(arg));
} else {
arg_ = arg;
is_inited_ = true;
}
return ret;
}
int OFetchLSReplayScnDelegate::process()
{
int ret = OB_SUCCESS;
MTL_SWITCH(arg_.tenant_id_) {
@ -2710,11 +2729,7 @@ int ObFetchLSReplayScnP::process()
ObLS *ls = NULL;
SCN max_decided_scn;
LOG_INFO("start to fetch ls replay scn", K(arg_));
if (OB_ISNULL(bandwidth_throttle_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "bandwidth_throttle_ must not null", K(ret),
KP_(bandwidth_throttle));
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be null", K(ret), KP(ls_service));
} else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
@ -2732,6 +2747,18 @@ int ObFetchLSReplayScnP::process()
return ret;
}
int ObFetchLSReplayScnP::process()
{
int ret = OB_SUCCESS;
OFetchLSReplayScnDelegate delegate(result_);
if (OB_FAIL(delegate.init(arg_))) {
LOG_WARN("failed to init delegate", K(ret));
} else if (OB_FAIL(delegate.process())) {
LOG_WARN("failed to do process", K(ret), K_(arg));
}
return ret;
}
ObCheckTransferTabletsBackfillDelegate::ObCheckTransferTabletsBackfillDelegate(obrpc::ObCheckTransferTabletBackfillRes &result)
: is_inited_(false),
arg_(),
@ -3490,38 +3517,6 @@ int ObStorageRpc::get_transfer_start_scn(
return ret;
}
int ObStorageRpc::fetch_ls_replay_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
SCN &ls_replay_scn)
{
int ret = OB_SUCCESS;
ls_replay_scn.reset();
if (!is_inited_) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id));
} else {
ObFetchLSReplayScnArg arg;
ObFetchLSReplayScnRes res;
arg.tenant_id_ = tenant_id;
arg.ls_id_ = ls_id;
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
.by(tenant_id)
.dst_cluster_id(src_info.cluster_id_)
.group_id(share::OBCG_STORAGE_HA_LEVEL2)
.fetch_ls_replay_scn(arg, res))) {
LOG_WARN("failed to fetch ls replay scn", K(ret), K(src_info), K(arg));
} else {
ls_replay_scn = res.replay_scn_;
}
}
return ret;
}
int ObStorageRpc::lock_config_change(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,

View File

@ -754,7 +754,6 @@ public:
RPC_S(PR5 update_ls_meta, OB_HA_UPDATE_LS_META, (ObRestoreUpdateLSMetaArg));
RPC_S(PR5 get_ls_active_trans_count, OB_GET_LS_ACTIVE_TRANSACTION_COUNT, (ObGetLSActiveTransCountArg), ObGetLSActiveTransCountRes);
RPC_S(PR5 get_transfer_start_scn, OB_GET_TRANSFER_START_SCN, (ObGetTransferStartScnArg), ObGetTransferStartScnRes);
RPC_S(PR5 fetch_ls_replay_scn, OB_HA_FETCH_LS_REPLAY_SCN, (ObFetchLSReplayScnArg), ObFetchLSReplayScnRes);
RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
@ -765,6 +764,7 @@ public:
RPC_AP(PR5 check_transfer_tablet_backfill_completed, OB_HA_CHECK_TRANSFER_TABLET_BACKFILL, (obrpc::ObCheckTransferTabletBackfillArg), obrpc::ObCheckTransferTabletBackfillRes);
RPC_AP(PR5 get_config_version_and_transfer_scn, OB_HA_CHANGE_MEMBER_SERVICE, (obrpc::ObStorageChangeMemberArg), obrpc::ObStorageChangeMemberRes);
RPC_AP(PR5 check_start_transfer_tablets, OB_CHECK_START_TRANSFER_TABLETS, (obrpc::ObTransferTabletInfoArg), obrpc::Int64);
RPC_AP(PR5 fetch_ls_replay_scn, OB_HA_FETCH_LS_REPLAY_SCN, (obrpc::ObFetchLSReplayScnArg), obrpc::ObFetchLSReplayScnRes);
};
template <ObRpcPacketCode RPC_CODE>
@ -952,15 +952,28 @@ protected:
};
class ObFetchLSReplayScnP:
public ObStorageStreamRpcP<OB_HA_FETCH_LS_REPLAY_SCN>
public ObStorageRpcProxy::Processor<OB_HA_FETCH_LS_REPLAY_SCN>
{
public:
explicit ObFetchLSReplayScnP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
ObFetchLSReplayScnP() = default;
virtual ~ObFetchLSReplayScnP() {}
protected:
int process();
};
class OFetchLSReplayScnDelegate final
{
public:
OFetchLSReplayScnDelegate(obrpc::ObFetchLSReplayScnRes &result);
int init(const obrpc::ObFetchLSReplayScnArg &arg);
int process();
private:
bool is_inited_;
obrpc::ObFetchLSReplayScnArg arg_;
obrpc::ObFetchLSReplayScnRes &result_;
DISALLOW_COPY_AND_ASSIGN(OFetchLSReplayScnDelegate);
};
class ObCheckTransferTabletsBackfillP:
public ObStorageRpcProxy::Processor<OB_HA_CHECK_TRANSFER_TABLET_BACKFILL>
{
@ -1157,12 +1170,6 @@ public:
const share::ObLSID &ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_list,
share::SCN &transfer_start_scn) = 0;
virtual int fetch_ls_replay_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
share::SCN &ls_replay_scn) = 0;
virtual int lock_config_change(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
@ -1251,12 +1258,6 @@ public:
const share::ObLSID &ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_list,
share::SCN &transfer_start_scn);
virtual int fetch_ls_replay_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
share::SCN &ls_replay_scn);
virtual int lock_config_change(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,