RS notifies storage again when transfer rpc fails due to leader switch

This commit is contained in:
ZhenNan0 2024-03-22 04:45:27 +00:00 committed by ob-robot
parent 2ddcb46609
commit 2dd32c31d0

View File

@ -1543,17 +1543,31 @@ int ObTenantTransferService::notify_storage_transfer_service_(
} else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX has null ptr", KR(ret), K(task_id), K_(tenant_id));
} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(
GCONF.cluster_id,
tenant_id_,
src_ls,
leader_addr))) { // default 1s timeout
LOG_WARN("get leader failed", KR(ret), K(task_id),
"cluster_id", GCONF.cluster_id.get_value(), K_(tenant_id), K(src_ls), K(leader_addr));
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id_).start_transfer_task(arg))) {
LOG_WARN("send rpc failed", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg));
} else {
const int64_t RETRY_CNT_LIMIT = 10;
int64_t retry_cnt = 0;
do {
if (OB_FAIL(ret)) {
ob_usleep(1_s);
ret = OB_SUCCESS;
}
if (FAILEDx(GCTX.location_service_->get_leader_with_retry_until_timeout(
GCONF.cluster_id,
tenant_id_,
src_ls,
leader_addr))) { // default 1s timeout
LOG_WARN("get leader failed", KR(ret), K(task_id), "cluster_id", GCONF.cluster_id.get_value(),
K_(tenant_id), K(src_ls), K(leader_addr));
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr)
.by(tenant_id_)
.group_id(share::OBCG_TRANSFER)
.start_transfer_task(arg))) {
LOG_WARN("send rpc failed", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg), K(retry_cnt));
}
} while (OB_FAIL(ret) && ++retry_cnt <= RETRY_CNT_LIMIT);
TTS_INFO("send rpc to storage finished", KR(ret),
K(task_id), K(src_ls), K(leader_addr), K(arg), K(retry_cnt));
}
TTS_INFO("send rpc to storage finished", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg));
return ret;
}