From 5c55cfa3c7e441f750fc0078c50701f4a01c73d1 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 11 Apr 2024 09:51:44 +0000 Subject: [PATCH] [bugfix] avoid small requests into large_query queue --- src/observer/omt/ob_tenant.cpp | 19 +++++++++++++----- src/observer/omt/ob_th_worker.cpp | 32 +++++++++++++++++++++++++------ 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 0bd2ae54b..fe0e38cfa 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -1557,7 +1557,6 @@ int ObTenant::recv_large_request(rpc::ObRequest &req) { int ret = OB_SUCCESS; req.set_enqueue_timestamp(ObTimeUtility::current_time()); - req.set_large_retry_flag(true); if (0 != req.get_group_id()) { if (OB_FAIL(recv_group_request(req, req.get_group_id()))) { LOG_WARN("tenant receive large retry request fail", K(ret)); @@ -1606,11 +1605,21 @@ void ObTenant::handle_retry_req(bool need_clear) int ret = OB_SUCCESS; ObLink* task = nullptr; ObRequest *req = NULL; - while (OB_SUCC(retry_queue_.pop(task, need_clear))) { + while (OB_SUCC(ret) && OB_SUCC(retry_queue_.pop(task, need_clear))) { req = static_cast(task); - if (OB_FAIL(recv_large_request(*req))) { - LOG_ERROR("tenant patrol push req fail", "tenant", id_); - break; + if (nullptr != req) { + if (req->large_retry_flag()) { + if (OB_FAIL(recv_large_request(*req))) { + LOG_ERROR("tenant patrol push req into large_query queue fail", "tenant_id", id_, K(ret)); + } + } else { + if (OB_FAIL(recv_request(*req))) { + LOG_ERROR("tenant patrol push req into common queue fail", "tenant_id", id_, K(ret)); + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("the req is NULL", "tenant_id", id_, K(ret)); } } } diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index 4cf119c33..d0cccefcd 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -235,6 +235,7 @@ inline void ObThWorker::process_request(rpc::ObRequest &req) // reset retry flags can_retry_ = true; need_retry_ = false; + req.set_large_retry_flag(false); bool need_wait_lock = false; int ret = OB_SUCCESS; reset_sql_throttle_current_priority(); @@ -259,7 +260,7 @@ inline void ObThWorker::process_request(rpc::ObRequest &req) } } } else if (retry_times) { - if (retry_times == 1) { + if (1 == retry_times) { LOG_WARN("tenant push retry request to wait queue", "tenant", tenant_->id(), K(req)); } uint64_t curr_timestamp = common::ObClockGenerator::getClock(); @@ -268,9 +269,19 @@ inline void ObThWorker::process_request(rpc::ObRequest &req) if (OB_FAIL(tenant_->push_retry_queue(req, timestamp))) { LOG_WARN("tenant schedule retry_on_lock request fail, retry with current worker","tenant", tenant_->id(), K(ret)); } - } else if (OB_FAIL(tenant_->recv_large_request(req))) { - LOG_WARN("tenant receive large request fail, " - "retry with current worker", K(ret)); + } else { + // first retry, do not put the req to retry_queue + if (req.large_retry_flag()) { + if (OB_FAIL(tenant_->recv_large_request(req))) { + LOG_WARN("tenant receive large request fail, " + "retry with current worker", "tenant", tenant_->id(), K(ret)); + } + } else { + if (OB_FAIL(tenant_->recv_request(req))) { + LOG_WARN("tenant receive request fail, " + "retry with current worker", "tenant", tenant_->id(), K(ret)); + } + } } if (OB_FAIL(ret)) { @@ -439,8 +450,17 @@ int ObThWorker::check_large_query_quota() !large_query()) { // if current query is not served by large_query worker (!large_query()) // evict it back to large query queue - need_retry_ = true; - ret = OB_EAGAIN; + if (has_req_flag()) { + rpc::ObRequest *req = const_cast(get_cur_request()); + req->set_large_retry_flag(true); + need_retry_ = true; + ret = OB_EAGAIN; + } else { + // large query retry is not supported when req is NULL (i.e. ret = OB_SUCCESS) + // but, this situation is unexpected, so log it as ERROR + LOG_ERROR("want to set large_retry_flag on request, but the req is NULL", + "tenant_id", tenant_->id(), K(ret)); + } } return ret; }