From 64a276a23fb6fda3bbc8adf8699518bfad6cd156 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 8 Feb 2024 05:26:55 +0000 Subject: [PATCH] [OBKV] placeholder for adding rpc header flag to classify if is kvrequesu and kv request will use queue4 --- deps/oblib/src/rpc/obrpc/ob_rpc_packet.h | 13 +++++++++++++ src/observer/omt/ob_tenant.cpp | 6 ++++++ 2 files changed, 19 insertions(+) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h index 9c22f1df74..081fd78e6c 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h @@ -154,6 +154,7 @@ public: static const uint16_t ENABLE_RATELIMIT_FLAG = 1 << 8; static const uint16_t BACKGROUND_FLOW_FLAG = 1 << 7; static const uint16_t TRACE_INFO_FLAG = 1 << 6; + static const uint16_t IS_KV_REQUEST_FALG = 1 << 5; uint64_t checksum_; ObRpcPacketCode pcode_; @@ -271,6 +272,8 @@ public: inline bool unneed_response() const; inline void set_require_rerouting(); inline bool require_rerouting() const; + inline bool is_kv_request() const; + inline void set_kv_request(); inline bool ratelimit_enabled() const; inline void enable_ratelimit(); @@ -494,6 +497,16 @@ bool ObRpcPacket::has_trace_info() const return hdr_.flags_ & ObRpcPacketHeader::TRACE_INFO_FLAG; } +bool ObRpcPacket::is_kv_request() const +{ + return hdr_.flags_ & ObRpcPacketHeader::IS_KV_REQUEST_FALG; +} + +void ObRpcPacket::set_kv_request() +{ + hdr_.flags_ |= ObRpcPacketHeader::IS_KV_REQUEST_FALG; +} + void ObRpcPacket::set_stream_next() { hdr_.flags_ &= static_cast(~ObRpcPacketHeader::STREAM_LAST_FLAG); diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 0dc548b148..7241f5e07d 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -1313,6 +1313,12 @@ int ObTenant::recv_request(ObRequest &req) if (OB_FAIL(req_queue_.push(&req, QQ_NORMAL))) { LOG_WARN("push request to QQ_NORMAL queue fail", K(ret), K(this)); } + } else if (pkt.is_kv_request()) { + // the same as sql request, kv request use q4 + ATOMIC_INC(&recv_np_rpc_cnt_); + if (OB_FAIL(req_queue_.push(&req, RQ_NORMAL))) { + LOG_WARN("push kv request to queue fail", K(ret), K(this)); + } } else if (is_normal_prio(pkt) || is_low_prio(pkt)) { ATOMIC_INC(&recv_np_rpc_cnt_); if (OB_FAIL(req_queue_.push(&req, QQ_LOW))) {