fetch location with sync interface when the location cache not exist
This commit is contained in:
@ -344,9 +344,9 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx,
|
||||
K(tablet_id), K(server_addr), K(total_wait_secs));
|
||||
}
|
||||
|
||||
bool force_renew = false;
|
||||
bool need_wait_freeze = true;
|
||||
ObAddr leader_addr;
|
||||
ObDASLocationRouter &loc_router = DAS_CTX(ctx).get_location_router();
|
||||
|
||||
while (OB_SUCC(ret) && need_wait_freeze) {
|
||||
|
||||
@ -355,26 +355,17 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx,
|
||||
leader_addr.reset();
|
||||
res.reuse();
|
||||
char leader_ip_str[MAX_IP_ADDR_LENGTH];
|
||||
bool force_renew = false;
|
||||
do {
|
||||
const int64_t retry_us = 200 * 1000;
|
||||
const int64_t expire_renew_time = 2 * 1000000; // 2s
|
||||
if (OB_FAIL(ObLoadDataUtils::check_session_status(*session))) {
|
||||
LOG_WARN("session is not valid during wait", K(ret));
|
||||
} else if (OB_FAIL(ObDASLocationRouter::get_leader(tenant_id, tablet_id, leader_addr, expire_renew_time))) {
|
||||
if (is_location_service_renew_error(ret) && !force_renew) {
|
||||
// retry one time
|
||||
force_renew = true;
|
||||
LOG_WARN("failed to get location and force renew", K(ret));
|
||||
} else {
|
||||
LOG_WARN("failed to get location", K(ret));
|
||||
ob_usleep(retry_us);
|
||||
}
|
||||
} else {
|
||||
force_renew = false;
|
||||
LOG_DEBUG("get participants", K(tablet_id), K(leader_addr));
|
||||
}
|
||||
} while (is_location_service_renew_error(ret) && force_renew);
|
||||
const int64_t retry_us = 200 * 1000;
|
||||
//Try to use the results in the cache as much as possible, without forcing a cache refresh.
|
||||
const int64_t expire_renew_time = 0;
|
||||
if (OB_FAIL(ObLoadDataUtils::check_session_status(*session))) {
|
||||
LOG_WARN("session is not valid during wait", K(ret));
|
||||
} else if (OB_FAIL(loc_router.get_leader(tenant_id, tablet_id, leader_addr, expire_renew_time))) {
|
||||
LOG_WARN("failed to get location", K(ret));
|
||||
ob_usleep(retry_us);
|
||||
} else {
|
||||
LOG_DEBUG("get participants", K(tablet_id), K(leader_addr));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!leader_addr.ip_to_string(leader_ip_str, sizeof(leader_ip_str))) {
|
||||
@ -393,16 +384,16 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx,
|
||||
} else if (OB_FAIL(result->next())) {
|
||||
LOG_WARN("fail to get result, force renew location", K(ret), K(leader_addr));
|
||||
if (OB_ITER_END == ret) {
|
||||
force_renew = true;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
force_renew = false;
|
||||
EXTRACT_BOOL_FIELD_MYSQL(*result, "need_wait_freeze", need_wait_freeze);
|
||||
//LOG_INFO("LOAD DATA is waiting for tenant memory available",
|
||||
//K(waited_seconds), K(total_wait_secs), K(tenant_id));
|
||||
}
|
||||
|
||||
//if it is location exception, refresh location cache with block interface
|
||||
//because load data can only local retry
|
||||
loc_router.refresh_location_cache(false, ret);
|
||||
}
|
||||
|
||||
//print info
|
||||
@ -2276,9 +2267,10 @@ int ObPartDataFragMgr::update_part_location(ObExecContext &ctx)
|
||||
LOG_WARN("invalid partition key", K(ret));
|
||||
} else {
|
||||
bool force_renew = false;
|
||||
const int64_t expire_renew_time = 2 * 1000000; // 2s
|
||||
ObDASLocationRouter &loc_router = DAS_CTX(ctx).get_location_router();
|
||||
do {
|
||||
if (OB_FAIL(ObDASLocationRouter::get_leader(tenant_id_, tablet_id_, leader_addr_, expire_renew_time))) {
|
||||
const int64_t expire_renew_time = force_renew ? INT64_MAX : 0;
|
||||
if (OB_FAIL(loc_router.get_leader(tenant_id_, tablet_id_, leader_addr_, expire_renew_time))) {
|
||||
if (is_location_service_renew_error(ret) && !force_renew) {
|
||||
// retry one time
|
||||
force_renew = true;
|
||||
|
||||
Reference in New Issue
Block a user