fix mysqltest

This commit is contained in:
bf0
2022-01-04 11:12:14 +08:00
committed by LINxiansheng
parent e68621cd71
commit 245b8b7abd

View File

@ -809,10 +809,28 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx,
leader_addr.reset(); leader_addr.reset();
res.reuse(); res.reuse();
if (OB_FAIL(ObLoadDataUtils::check_session_status(*session))) { bool force_renew = false;
LOG_WARN("session is not valid during wait", K(ret)); do {
} else if (OB_FAIL(loc_cache->get_strong_leader(part_key, leader_addr, force_renew))) { const int64_t retry_us = 200 * 1000;
LOG_WARN("get partition location cache failed", K(ret), K(part_key));
if (OB_FAIL(ObLoadDataUtils::check_session_status(*session))) {
LOG_WARN("session is not valid during wait", K(ret));
} else if (OB_FAIL(loc_cache->get_strong_leader(part_key, leader_addr, force_renew))) {
if (OB_LOCATION_LEADER_NOT_EXIST == ret && !force_renew) {
// retry one time
force_renew = true;
LOG_WARN("failed to get location and force renew", K(ret), K(part_key));
} else {
LOG_WARN("failed to get location", K(ret), K(part_key));
usleep(retry_us);
}
} else {
force_renew = false;
LOG_DEBUG("get participants", K(part_key), K(leader_addr));
}
} while (OB_LOCATION_LEADER_NOT_EXIST == ret && force_renew);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sql.assign_fmt(SERVER_TENANT_MEMORY_EXAMINE_SQL, } else if (OB_FAIL(sql.assign_fmt(SERVER_TENANT_MEMORY_EXAMINE_SQL,
tenant_id, tenant_id,
(leader_addr.get_ipv4() >> 24) & 0XFF, (leader_addr.get_ipv4() >> 24) & 0XFF,
@ -1079,10 +1097,31 @@ int ObLoadDataImpl::send_and_switch_buffer(
ObAddr leader_addr; ObAddr leader_addr;
if (OB_FAIL(part_key.init(load_args_.table_id_, part_id, 0))) { if (OB_FAIL(part_key.init(load_args_.table_id_, part_id, 0))) {
LOG_WARN("partition key init failed", K(load_args_.table_id_), K(part_id), K(part_num_)); LOG_WARN("partition key init failed", K(load_args_.table_id_), K(part_id), K(part_num_));
} else if (OB_FAIL(partition_locatition_cache->get_strong_leader(part_key, leader_addr))) {
LOG_WARN("get partition location cache failed", K(ret), K(part_key));
} else { } else {
// async rpc call send used buffer bool force_renew = false;
do {
const int64_t retry_us = 200 * 1000;
if (OB_FAIL(ObLoadDataUtils::check_session_status(*ctx.get_my_session()))) {
LOG_WARN("session is not valid during wait", K(ret));
} else if (OB_FAIL(partition_locatition_cache->get_strong_leader(part_key, leader_addr, force_renew))) {
if (OB_LOCATION_LEADER_NOT_EXIST == ret && !force_renew) {
// retry one time
force_renew = true;
LOG_WARN("failed to get location and force renew", K(ret), K(part_key));
} else {
LOG_WARN("failed to get location", K(ret), K(part_key));
usleep(retry_us);
}
} else {
force_renew = false;
LOG_DEBUG("get participants", K(part_key), K(leader_addr));
}
} while (OB_LOCATION_LEADER_NOT_EXIST == ret && force_renew);
}
if (OB_SUCC(ret)) {
//async rpc call send used buffer
buffer->set_addr(leader_addr); buffer->set_addr(leader_addr);
ObRpcLoadDataTaskCallBack mycallback(task_controller_, complete_task_array_, buffer); ObRpcLoadDataTaskCallBack mycallback(task_controller_, complete_task_array_, buffer);
serialize_timer_.start_stat(); serialize_timer_.start_stat();
@ -3838,7 +3877,13 @@ void ObDataFragMgr::distory_datafrag(ObDataFrag* frag)
int ObPartDataFragMgr::update_part_location(ObExecContext& ctx) int ObPartDataFragMgr::update_part_location(ObExecContext& ctx)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObIPartitionLocationCache* location_cache = NULL; ObIPartitionLocationCache *location_cache = NULL;
const int64_t retry_us = 200 * 1000;
int64_t query_timeout = 0;
int64_t retry_timeout = 0;
ctx.get_my_session()->get_query_timeout(query_timeout);
retry_timeout = std::min(ObTimeUtil::current_time() + 30 * USECS_PER_SEC, // the RTO is 30s
query_timeout);
if (OB_UNLIKELY(!part_key_.is_valid())) { if (OB_UNLIKELY(!part_key_.is_valid())) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
@ -3855,8 +3900,12 @@ int ObPartDataFragMgr::update_part_location(ObExecContext& ctx)
force_renew = true; force_renew = true;
LOG_WARN("failed to get location and force renew", K(ret), K(part_key_)); LOG_WARN("failed to get location and force renew", K(ret), K(part_key_));
} else { } else {
force_renew = false;
LOG_WARN("failed to get location", K(ret), K(part_key_)); LOG_WARN("failed to get location", K(ret), K(part_key_));
if (ObTimeUtil::current_time() + retry_us > retry_timeout) {
force_renew = false;
} else {
usleep(retry_us);
}
} }
} else { } else {
LOG_DEBUG("get participants", K(part_key_), K(leader_addr_)); LOG_DEBUG("get participants", K(part_key_), K(leader_addr_));