diff --git a/src/sql/engine/cmd/ob_load_data_impl.cpp b/src/sql/engine/cmd/ob_load_data_impl.cpp index 944dbd8ee1..67139764ea 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_impl.cpp @@ -808,11 +808,29 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx, leader_addr.reset(); res.reuse(); + + bool force_renew = false; + do { + const int64_t retry_us = 200 * 1000; - if (OB_FAIL(ObLoadDataUtils::check_session_status(*session))) { - LOG_WARN("session is not valid during wait", K(ret)); - } else if (OB_FAIL(loc_cache->get_strong_leader(part_key, leader_addr, force_renew))) { - LOG_WARN("get partition location cache failed", K(ret), K(part_key)); + if (OB_FAIL(ObLoadDataUtils::check_session_status(*session))) { + LOG_WARN("session is not valid during wait", K(ret)); + } else if (OB_FAIL(loc_cache->get_strong_leader(part_key, leader_addr, force_renew))) { + if (OB_LOCATION_LEADER_NOT_EXIST == ret && !force_renew) { + // retry one time + force_renew = true; + LOG_WARN("failed to get location and force renew", K(ret), K(part_key)); + } else { + LOG_WARN("failed to get location", K(ret), K(part_key)); + usleep(retry_us); + } + } else { + force_renew = false; + LOG_DEBUG("get participants", K(part_key), K(leader_addr)); + } + } while (OB_LOCATION_LEADER_NOT_EXIST == ret && force_renew); + + if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt(SERVER_TENANT_MEMORY_EXAMINE_SQL, tenant_id, (leader_addr.get_ipv4() >> 24) & 0XFF, @@ -1079,10 +1097,31 @@ int ObLoadDataImpl::send_and_switch_buffer( ObAddr leader_addr; if (OB_FAIL(part_key.init(load_args_.table_id_, part_id, 0))) { LOG_WARN("partition key init failed", K(load_args_.table_id_), K(part_id), K(part_num_)); - } else if (OB_FAIL(partition_locatition_cache->get_strong_leader(part_key, leader_addr))) { - LOG_WARN("get partition location cache failed", K(ret), K(part_key)); } else { - // async rpc call send used buffer + bool force_renew = false; + do { + const int64_t retry_us = 200 * 1000; + + if (OB_FAIL(ObLoadDataUtils::check_session_status(*ctx.get_my_session()))) { + LOG_WARN("session is not valid during wait", K(ret)); + } else if (OB_FAIL(partition_locatition_cache->get_strong_leader(part_key, leader_addr, force_renew))) { + if (OB_LOCATION_LEADER_NOT_EXIST == ret && !force_renew) { + // retry one time + force_renew = true; + LOG_WARN("failed to get location and force renew", K(ret), K(part_key)); + } else { + LOG_WARN("failed to get location", K(ret), K(part_key)); + usleep(retry_us); + } + } else { + force_renew = false; + LOG_DEBUG("get participants", K(part_key), K(leader_addr)); + } + } while (OB_LOCATION_LEADER_NOT_EXIST == ret && force_renew); + } + + if (OB_SUCC(ret)) { + //async rpc call send used buffer buffer->set_addr(leader_addr); ObRpcLoadDataTaskCallBack mycallback(task_controller_, complete_task_array_, buffer); serialize_timer_.start_stat(); @@ -3838,7 +3877,13 @@ void ObDataFragMgr::distory_datafrag(ObDataFrag* frag) int ObPartDataFragMgr::update_part_location(ObExecContext& ctx) { int ret = OB_SUCCESS; - ObIPartitionLocationCache* location_cache = NULL; + ObIPartitionLocationCache *location_cache = NULL; + const int64_t retry_us = 200 * 1000; + int64_t query_timeout = 0; + int64_t retry_timeout = 0; + ctx.get_my_session()->get_query_timeout(query_timeout); + retry_timeout = std::min(ObTimeUtil::current_time() + 30 * USECS_PER_SEC, // the RTO is 30s + query_timeout); if (OB_UNLIKELY(!part_key_.is_valid())) { ret = OB_NOT_INIT; @@ -3855,8 +3900,12 @@ int ObPartDataFragMgr::update_part_location(ObExecContext& ctx) force_renew = true; LOG_WARN("failed to get location and force renew", K(ret), K(part_key_)); } else { - force_renew = false; LOG_WARN("failed to get location", K(ret), K(part_key_)); + if (ObTimeUtil::current_time() + retry_us > retry_timeout) { + force_renew = false; + } else { + usleep(retry_us); + } } } else { LOG_DEBUG("get participants", K(part_key_), K(leader_addr_));