diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index a7806d119..4f51a0aac 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -344,6 +344,8 @@ int ObDTLIntermResultManager::atomic_append_block(ObDTLIntermResultKey &key, ObA int ret = OB_SUCCESS; if (OB_FAIL(map_.atomic_refactored(key, call))) { LOG_WARN("fail to get row store in result manager", K(ret)); + } else if (OB_FAIL(call.ret_)) { + LOG_WARN("ObAtomicAppendBlockCall fail", K(ret), K(call.ret_)); } else { LOG_DEBUG("debug append block to interm result info", K(key)); } @@ -355,6 +357,8 @@ int ObDTLIntermResultManager::atomic_append_part_block(ObDTLIntermResultKey &key int ret = OB_SUCCESS; if (OB_FAIL(map_.atomic_refactored(key, call))) { LOG_WARN("fail to get row store in result manager", K(ret)); + } else if (OB_FAIL(call.ret_)) { + LOG_WARN("ObAtomicAppendPartBlockCall fail", K(ret), K(call.ret_)); } else { LOG_DEBUG("debug append part block to interm result info", K(key)); } diff --git a/src/sql/dtl/ob_dtl_rpc_channel.cpp b/src/sql/dtl/ob_dtl_rpc_channel.cpp index 25d5cc358..49c2f78b9 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.cpp +++ b/src/sql/dtl/ob_dtl_rpc_channel.cpp @@ -66,6 +66,22 @@ int ObDtlRpcChannel::SendMsgCB::process() const ObDtlRpcDataResponse &resp = result_; // if request queue is full or serialize faild, then rcode is set, and rpc process is not called int tmp_ret = OB_SUCCESS != rcode_.rcode_ ? rcode_.rcode_ : resp.recode_; + if (tmp_ret != OB_SUCCESS) { + int ret = OB_SUCCESS; + LOG_DEBUG("rpc channel callback receive error ret", K(rcode_.rcode_ ), K(resp.recode_)); + ObDtlChannel *chan = nullptr; + if (OB_FAIL(DTL.get_channel(channel_id_, chan))) { + LOG_WARN("SendMsgCB get channel failed", K(ret)); + } else if (OB_ISNULL(chan)) { + LOG_WARN("chan is nullptr", K(ret)); + } else { + ObDtlRpcChannel *rpc_chan = reinterpret_cast(chan); + rpc_chan->set_cb_ret(tmp_ret); + if (OB_FAIL(DTL.release_channel(rpc_chan))) { + LOG_WARN("release channel failed", K(ret)); + } + } + } int ret = response_.on_finish(resp.is_block_, tmp_ret); if (OB_FAIL(ret)) { LOG_WARN("set finish failed", K_(trace_id), K(ret)); @@ -79,7 +95,7 @@ rpc::frame::ObReqTransport::AsyncCB *ObDtlRpcChannel::SendMsgCB::clone( SendMsgCB *cb = NULL; void *mem = alloc(sizeof(*this)); if (NULL != mem) { - cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_); + cb = new(mem)SendMsgCB(response_, trace_id_, timeout_ts_, channel_id_); } return cb; } @@ -177,7 +193,7 @@ ObDtlRpcChannel::ObDtlRpcChannel( const uint64_t id, const ObAddr &peer, DtlChannelType type) - : ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false) + : ObDtlBasicChannel(tenant_id, id, peer, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS) {} ObDtlRpcChannel::ObDtlRpcChannel( @@ -186,7 +202,7 @@ ObDtlRpcChannel::ObDtlRpcChannel( const ObAddr &peer, const int64_t hash_val, DtlChannelType type) - : ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false) + : ObDtlBasicChannel(tenant_id, id, peer, hash_val, type), recv_sqc_fin_res_(false), cb_ret_(OB_SUCCESS) {} ObDtlRpcChannel::~ObDtlRpcChannel() @@ -314,7 +330,7 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf) // The peer may not setup when the first message arrive, // we wait first message return and retry until peer setup. int64_t timeout_us = buf->timeout_ts() - ObTimeUtility::current_time(); - SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts()); + SendMsgCB cb(msg_response_, *cur_trace_id, buf->timeout_ts(), get_id()); if (timeout_us <= 0) { ret = OB_TIMEOUT; LOG_WARN("send dtl message timeout", K(ret), K(peer_), @@ -350,6 +366,18 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf) return ret; } +int ObDtlRpcChannel::send(const ObDtlMsg &msg, int64_t timeout_ts, + ObEvalCtx *eval_ctx, bool is_eof) +{ + int ret = OB_SUCCESS; + if (msg.is_data_msg() && OB_FAIL(cb_ret_)) { + LOG_WARN("ObDtlRpcChannel callback failed", K(cb_ret_)); + } else if (OB_FAIL(ObDtlBasicChannel::send(msg, timeout_ts, eval_ctx, is_eof))) { + LOG_WARN("ObDtlBasicChannel send failed", K(ret)); + } + return ret; +} + } // dtl } // sql } // oceanbase diff --git a/src/sql/dtl/ob_dtl_rpc_channel.h b/src/sql/dtl/ob_dtl_rpc_channel.h index 092052b44..179d80402 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.h +++ b/src/sql/dtl/ob_dtl_rpc_channel.h @@ -48,9 +48,11 @@ class ObDtlRpcChannel class SendMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB { public: - explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id, const int64_t timeout_ts) + explicit SendMsgCB(SendMsgResponse &response, const common::ObCurTraceId::TraceId trace_id, + const int64_t timeout_ts, const uint64_t channel_id) : response_(response), - timeout_ts_(timeout_ts) + timeout_ts_(timeout_ts), + channel_id_(channel_id) { trace_id_.set(trace_id); } @@ -63,6 +65,7 @@ class ObDtlRpcChannel SendMsgResponse &response_; common::ObCurTraceId::TraceId trace_id_; int64_t timeout_ts_; + uint64_t channel_id_; }; class SendBCMsgCB : public obrpc::ObDtlRpcProxy::AsyncCB @@ -99,9 +102,14 @@ public: virtual int feedup(ObDtlLinkedBuffer *&buffer) override; virtual int send_message(ObDtlLinkedBuffer *&buf); + virtual int send(const ObDtlMsg &msg, int64_t timeout_ts, + ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) override; + bool recv_sqc_fin_res() { return recv_sqc_fin_res_; } + void set_cb_ret(int cb_ret) { cb_ret_ = cb_ret; } private: bool recv_sqc_fin_res_; + int cb_ret_; }; } // dtl