[bugfix] avoid small requests into large_query queue
This commit is contained in:
parent
42e39c6f92
commit
5c55cfa3c7
@ -1557,7 +1557,6 @@ int ObTenant::recv_large_request(rpc::ObRequest &req)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
req.set_enqueue_timestamp(ObTimeUtility::current_time());
|
||||
req.set_large_retry_flag(true);
|
||||
if (0 != req.get_group_id()) {
|
||||
if (OB_FAIL(recv_group_request(req, req.get_group_id()))) {
|
||||
LOG_WARN("tenant receive large retry request fail", K(ret));
|
||||
@ -1606,11 +1605,21 @@ void ObTenant::handle_retry_req(bool need_clear)
|
||||
int ret = OB_SUCCESS;
|
||||
ObLink* task = nullptr;
|
||||
ObRequest *req = NULL;
|
||||
while (OB_SUCC(retry_queue_.pop(task, need_clear))) {
|
||||
while (OB_SUCC(ret) && OB_SUCC(retry_queue_.pop(task, need_clear))) {
|
||||
req = static_cast<rpc::ObRequest*>(task);
|
||||
if (OB_FAIL(recv_large_request(*req))) {
|
||||
LOG_ERROR("tenant patrol push req fail", "tenant", id_);
|
||||
break;
|
||||
if (nullptr != req) {
|
||||
if (req->large_retry_flag()) {
|
||||
if (OB_FAIL(recv_large_request(*req))) {
|
||||
LOG_ERROR("tenant patrol push req into large_query queue fail", "tenant_id", id_, K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(recv_request(*req))) {
|
||||
LOG_ERROR("tenant patrol push req into common queue fail", "tenant_id", id_, K(ret));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("the req is NULL", "tenant_id", id_, K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -235,6 +235,7 @@ inline void ObThWorker::process_request(rpc::ObRequest &req)
|
||||
// reset retry flags
|
||||
can_retry_ = true;
|
||||
need_retry_ = false;
|
||||
req.set_large_retry_flag(false);
|
||||
bool need_wait_lock = false;
|
||||
int ret = OB_SUCCESS;
|
||||
reset_sql_throttle_current_priority();
|
||||
@ -259,7 +260,7 @@ inline void ObThWorker::process_request(rpc::ObRequest &req)
|
||||
}
|
||||
}
|
||||
} else if (retry_times) {
|
||||
if (retry_times == 1) {
|
||||
if (1 == retry_times) {
|
||||
LOG_WARN("tenant push retry request to wait queue", "tenant", tenant_->id(), K(req));
|
||||
}
|
||||
uint64_t curr_timestamp = common::ObClockGenerator::getClock();
|
||||
@ -268,9 +269,19 @@ inline void ObThWorker::process_request(rpc::ObRequest &req)
|
||||
if (OB_FAIL(tenant_->push_retry_queue(req, timestamp))) {
|
||||
LOG_WARN("tenant schedule retry_on_lock request fail, retry with current worker","tenant", tenant_->id(), K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(tenant_->recv_large_request(req))) {
|
||||
LOG_WARN("tenant receive large request fail, "
|
||||
"retry with current worker", K(ret));
|
||||
} else {
|
||||
// first retry, do not put the req to retry_queue
|
||||
if (req.large_retry_flag()) {
|
||||
if (OB_FAIL(tenant_->recv_large_request(req))) {
|
||||
LOG_WARN("tenant receive large request fail, "
|
||||
"retry with current worker", "tenant", tenant_->id(), K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(tenant_->recv_request(req))) {
|
||||
LOG_WARN("tenant receive request fail, "
|
||||
"retry with current worker", "tenant", tenant_->id(), K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -439,8 +450,17 @@ int ObThWorker::check_large_query_quota()
|
||||
!large_query()) {
|
||||
// if current query is not served by large_query worker (!large_query())
|
||||
// evict it back to large query queue
|
||||
need_retry_ = true;
|
||||
ret = OB_EAGAIN;
|
||||
if (has_req_flag()) {
|
||||
rpc::ObRequest *req = const_cast<rpc::ObRequest *>(get_cur_request());
|
||||
req->set_large_retry_flag(true);
|
||||
need_retry_ = true;
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
// large query retry is not supported when req is NULL (i.e. ret = OB_SUCCESS)
|
||||
// but, this situation is unexpected, so log it as ERROR
|
||||
LOG_ERROR("want to set large_retry_flag on request, but the req is NULL",
|
||||
"tenant_id", tenant_->id(), K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user