From ca77e7beb38c9e5b5eabbdbf501f7ba3f3c99d2c Mon Sep 17 00:00:00 2001 From: qingsuijiu <642782632@qq.com> Date: Fri, 15 Mar 2024 04:15:34 +0000 Subject: [PATCH] Remove the duplicated logic for handling remote side errors in the RPC channel. --- src/sql/dtl/ob_dtl_interm_result_manager.cpp | 4 +-- src/sql/dtl/ob_dtl_rpc_channel.cpp | 36 +++----------------- src/sql/dtl/ob_dtl_rpc_channel.h | 12 ++----- 3 files changed, 8 insertions(+), 44 deletions(-) diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index 2a106d889..ec1b8b06f 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -346,7 +346,7 @@ int ObDTLIntermResultManager::atomic_append_block(ObDTLIntermResultKey &key, ObA if (OB_FAIL(map_.atomic_refactored(key, call))) { LOG_WARN("fail to get row store in result manager", K(ret)); } else if (OB_FAIL(call.ret_)) { - LOG_WARN("ObAtomicAppendBlockCall fail", K(ret), K(call.ret_)); + LOG_WARN("ObAtomicAppendBlockCall fail", K(ret)); } else { LOG_DEBUG("debug append block to interm result info", K(key)); } @@ -359,7 +359,7 @@ int ObDTLIntermResultManager::atomic_append_part_block(ObDTLIntermResultKey &key if (OB_FAIL(map_.atomic_refactored(key, call))) { LOG_WARN("fail to get row store in result manager", K(ret)); } else if (OB_FAIL(call.ret_)) { - LOG_WARN("ObAtomicAppendPartBlockCall fail", K(ret), K(call.ret_)); + LOG_WARN("ObAtomicAppendPartBlockCall fail", K(ret)); } else { LOG_DEBUG("debug append part block to interm result info", K(key)); } diff --git a/src/sql/dtl/ob_dtl_rpc_channel.cpp b/src/sql/dtl/ob_dtl_rpc_channel.cpp index 49c2f78b9..25d5cc358 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.cpp +++ b/src/sql/dtl/ob_dtl_rpc_channel.cpp @@ -66,22 +66,6 @@ int ObDtlRpcChannel::SendMsgCB::process() const ObDtlRpcDataResponse &resp = result_; // if request queue is full or serialize faild, then rcode is set, and rpc process is not called int tmp_ret = OB_SUCCESS != rcode_.rcode_ ? rcode_.rcode_ : resp.recode_; - if (tmp_ret != OB_SUCCESS) { - int ret = OB_SUCCESS; - LOG_DEBUG("rpc channel callback receive error ret", K(rcode_.rcode_ ), K(resp.recode_)); - ObDtlChannel *chan = nullptr; - if (OB_FAIL(DTL.get_channel(channel_id_, chan))) { - LOG_WARN("SendMsgCB get channel failed", K(ret)); - } else if (OB_ISNULL(chan)) { - LOG_WARN("chan is nullptr", K(ret)); - } else { - ObDtlRpcChannel *rpc_chan = reinterpret_cast(chan); - rpc_chan->set_cb_ret(tmp_ret); - if (OB_FAIL(DTL.release_channel(rpc_chan))) { - LOG_WARN("release channel failed", K(ret)); - } - } - } int ret = response_.on_finish(resp.is_block_, tmp_ret); if (OB_FAIL(ret)) { LOG_WARN("set finish failed", K_(trace_id), K(ret)); @@ -95,7 +79,7 @@ rpc::frame::ObReqTransport::AsyncCB *ObDtlRpcChannel::SendMsgCB::clone( SendMsgCB *cb = NULL; void *mem = alloc(sizeof(*this)); if (NULL != mem) { - cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_, channel_id_); + cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_); } return cb; } @@ -193,7 +177,7 @@ ObDtlRpcChannel::ObDtlRpcChannel( const uint64_t id, const ObAddr &peer, DtlChannelType type) - : ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS) + : ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false) {} ObDtlRpcChannel::ObDtlRpcChannel( @@ -202,7 +186,7 @@ ObDtlRpcChannel::ObDtlRpcChannel( const ObAddr &peer, const int64_t hash_val, DtlChannelType type) - : ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS) + : ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false) {} ObDtlRpcChannel::~ObDtlRpcChannel() @@ -330,7 +314,7 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf) // The peer may not setup when the first message arrive, // we wait first message return and retry until peer setup. int64_t timeout_us = buf->timeout_ts() - ObTimeUtility::current_time(); - SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts(), get_id()); + SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts()); if (timeout_us <= 0) { ret = OB_TIMEOUT; LOG_WARN("send dtl message timeout", K(ret), K(peer_), @@ -366,18 +350,6 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf) return ret; } -int ObDtlRpcChannel::send(const ObDtlMsg &msg, int64_t timeout_ts, - ObEvalCtx *eval_ctx, bool is_eof) -{ - int ret = OB_SUCCESS; - if (msg.is_data_msg() && OB_FAIL(cb_ret_)) { - LOG_WARN("ObDtlRpcChannel callback failed", K(cb_ret_)); - } else if (OB_FAIL(ObDtlBasicChannel::send(msg, timeout_ts, eval_ctx, is_eof))) { - LOG_WARN("ObDtlBasicChannel send failed", K(ret)); - } - return ret; -} - } // dtl } // sql } // oceanbase diff --git a/src/sql/dtl/ob_dtl_rpc_channel.h b/src/sql/dtl/ob_dtl_rpc_channel.h index 179d80402..092052b44 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.h +++ b/src/sql/dtl/ob_dtl_rpc_channel.h @@ -48,11 +48,9 @@ class ObDtlRpcChannel class SendMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB { public: - explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id, - const int64_t timeout_ts, const uint64_t channel_id) + explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id, const int64_t timeout_ts) : response_(response), - timeout_ts_(timeout_ts), - channel_id_(channel_id) + timeout_ts_(timeout_ts) { trace_id_.set(trace_id); } @@ -65,7 +63,6 @@ class ObDtlRpcChannel SendMsgResponse &response_; common::ObCurTraceId::TraceId trace_id_; int64_t timeout_ts_; - uint64_t channel_id_; }; class SendBCMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB @@ -102,14 +99,9 @@ public: virtual int feedup(ObDtlLinkedBuffer *&buffer) override; virtual int send_message(ObDtlLinkedBuffer *&buf); - virtual int send(const ObDtlMsg &msg, int64_t timeout_ts, - ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) override; - bool recv_sqc_fin_res() { return recv_sqc_fin_res_; } - void set_cb_ret(int cb_ret) { cb_ret_ = cb_ret; } private: bool recv_sqc_fin_res_; - int cb_ret_; }; } // dtl