Fix fetch tablet autoinc hang after leader switch

This commit is contained in:
Hongqin-Li 2023-03-03 03:43:34 +00:00 committed by ob-robot
parent 170ebd9ba9
commit 46579b86c9

View File

@ -151,40 +151,31 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param,
arg.tenant_id_ = param.tenant_id_;
arg.tablet_id_ = tablet_id;
arg.ls_id_ = ls_id;
if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(arg));
}
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
int64_t retry_times = 0;
if (OB_NOT_MASTER == ret) {
if (OB_TMP_FAIL(location_service->get(param.tenant_id_, tablet_id, INT64_MAX/*expire_renew_time*/, is_cache_hit, ls_id))) {
LOG_WARN("fail to get log stream id", K(tmp_ret), K(ret), K(tablet_id));
} else if (OB_TMP_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
ls_id,
true/*force_renew*/,
leader_addr))) {
LOG_WARN("force get leader failed", K(tmp_ret), K(ret), K(ls_id));
}
bool finish = false;
for (int64_t retry_times = 0; OB_SUCC(ret) && !finish; retry_times++) {
if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(retry_times), K(arg));
} else {
finish = true;
}
if (OB_SUCCESS == tmp_ret) {
bool worker_err = false;
while (OB_FAIL(ret) && !worker_err && is_retryable(ret)) {
++retry_times;
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
res.reset();
if (OB_FAIL(THIS_WORKER.check_status())) {
worker_err = true;
LOG_WARN("failed to check status", K(ret));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(retry_times), K(arg));
if (OB_FAIL(ret) && is_retryable(ret)) {
const bool need_refresh_leader = OB_NOT_MASTER == ret;
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
res.reset();
if (OB_FAIL(THIS_WORKER.check_status())) { // overwrite ret
LOG_WARN("failed to check status", K(ret));
} else if (need_refresh_leader) {
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, INT64_MAX/*expire_renew_time*/, is_cache_hit, ls_id))) {
LOG_WARN("fail to get log stream id", K(ret), K(ret), K(tablet_id));
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
ls_id,
true/*force_renew*/,
leader_addr))) {
LOG_WARN("force get leader failed", K(ret), K(ret), K(ls_id));
}
}
} else {
LOG_WARN("fetch new range from leader failed", K(ret), K(tmp_ret));
ret = tmp_ret;
}
}