[CP] Fix 4721 when fetch tablet ls tablet seq

This commit is contained in:
Hongqin-Li
2023-11-20 09:14:39 +00:00
committed by ob-robot
parent 2e855bc782
commit 5b1ec07b43
4 changed files with 241 additions and 29 deletions

View File

@ -126,7 +126,6 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param,
obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr;
share::ObLocationService *location_service = nullptr;
ObAddr leader_addr;
ObLSID ls_id;
bool is_cache_hit = false;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
@ -135,52 +134,39 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param,
|| OB_ISNULL(location_service = GCTX.location_service_)) {
ret = OB_ERR_SYS;
LOG_WARN("root service or location_cache is null", K(ret), KP(srv_rpc_proxy), KP(location_service));
} else if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, ls_id))) {
LOG_WARN("fail to get log stream id", K(ret), K(tablet_id));
// try to use location cache first, if the cache is wrong, try force renew.
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
ls_id,
false,/*force_renew*/
leader_addr))) {
LOG_WARN("get leader failed", K(ret), K(ls_id));
} else {
obrpc::ObFetchTabletSeqArg arg;
obrpc::ObFetchTabletSeqRes res;
arg.cache_size_ = MAX(cache_size_, param.auto_increment_cache_size_); // TODO(shuangcan): confirm this
arg.tenant_id_ = param.tenant_id_;
arg.tablet_id_ = tablet_id;
arg.ls_id_ = ls_id;
// arg.ls_id_ will be filled by location_service->get
bool finish = false;
for (int64_t retry_times = 0; OB_SUCC(ret) && !finish; retry_times++) {
if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) {
LOG_WARN("fail to get log stream id", K(ret), K(tablet_id));
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
arg.ls_id_,
false,/*force_renew*/
leader_addr))) {
LOG_WARN("get leader failed", K(ret), K(arg.ls_id_));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(retry_times), K(arg));
} else {
finish = true;
}
if (OB_FAIL(ret)) {
const bool force_refresh_leader = OB_NOT_MASTER == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret;
(void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, !is_block_renew_location(ret)/*is_nonblock*/);
if (is_retryable(ret)) {
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
res.reset();
if (OB_FAIL(THIS_WORKER.check_status())) { // overwrite ret
LOG_WARN("failed to check status", K(ret));
} else {
res.reset();
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
}
}
if (OB_SUCC(ret) && force_refresh_leader) {
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, INT64_MAX/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) {
LOG_WARN("fail to get log stream id", K(ret), K(ret), K(tablet_id));
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
arg.ls_id_,
true/*force_renew*/,
leader_addr))) {
LOG_WARN("force get leader failed", K(ret), K(ret), K(arg.ls_id_));
}
} else {
(void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, true/*is_nonblock*/);
}
}
}