diff --git a/src/sql/dtl/ob_dtl_basic_channel.cpp b/src/sql/dtl/ob_dtl_basic_channel.cpp index 811e680100..063b167867 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.cpp +++ b/src/sql/dtl/ob_dtl_basic_channel.cpp @@ -392,7 +392,8 @@ int ObDtlBasicChannel::mock_eof_buffer(int64_t timeout_ts) return ret; } -int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached) +int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached, + bool inc_recv_buf_cnt) { int ret = OB_SUCCESS; ObDtlMsgHeader header; @@ -405,7 +406,9 @@ int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_b LOG_WARN("failed to deserialize msg header", K(ret)); } else if (header.is_drain()) { alloc_buffer_count(); - inc_recv_buffer_cnt(); + if (inc_recv_buf_cnt) { + inc_recv_buffer_cnt(); + } dfc_->set_drain(this); LOG_TRACE("transmit receive drain cmd", KP(linked_buffer), K(this), KP(id_), KP(peer_id_), K(recv_list_.is_empty())); @@ -424,7 +427,9 @@ int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_b linked_buffer = nullptr; // 将attach收到的是自己申请的,因为释放权交给了当前channel,所以认为这次申请也是自己,与free保持一致,方便统计 alloc_buffer_count(); - inc_recv_buffer_cnt(); + if (inc_recv_buf_cnt) { + inc_recv_buffer_cnt(); + } if (is_data_msg) { metric_.mark_first_in(); if (is_eof) { diff --git a/src/sql/dtl/ob_dtl_basic_channel.h b/src/sql/dtl/ob_dtl_basic_channel.h index e861817273..e84fbd606c 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.h +++ b/src/sql/dtl/ob_dtl_basic_channel.h @@ -357,7 +357,8 @@ public: virtual int send(const ObDtlMsg &msg, int64_t timeout_ts, ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) override; virtual int feedup(ObDtlLinkedBuffer *&buffer) override; - virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached = false); + virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached = false, + bool inc_recv_buf_cnt = true); // don't call send&flush in different threads. virtual int flush(bool wait=true, bool wait_response = true) override; diff --git a/src/sql/dtl/ob_dtl_channel.h b/src/sql/dtl/ob_dtl_channel.h index 61df85cb27..4c1418c83a 100644 --- a/src/sql/dtl/ob_dtl_channel.h +++ b/src/sql/dtl/ob_dtl_channel.h @@ -108,7 +108,8 @@ public: virtual int send(const ObDtlMsg &msg, int64_t timeout_ts, ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) = 0; virtual int feedup(ObDtlLinkedBuffer *&buffer) = 0; - virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_firt_buffer_cached = false) = 0; + virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_firt_buffer_cached = false, + bool inc_recv_buf_cnt = true) = 0; virtual int flush(bool wait=true, bool wait_response = true) = 0; virtual bool is_empty() const = 0; diff --git a/src/sql/dtl/ob_dtl_rpc_channel.cpp b/src/sql/dtl/ob_dtl_rpc_channel.cpp index 25052903af..ba0fe9fcdf 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.cpp +++ b/src/sql/dtl/ob_dtl_rpc_channel.cpp @@ -176,7 +176,7 @@ ObDtlRpcChannel::ObDtlRpcChannel( const uint64_t tenant_id, const uint64_t id, const ObAddr &peer) - : ObDtlBasicChannel(tenant_id, id, peer), recv_mock_eof_cnt_(0) + : ObDtlBasicChannel(tenant_id, id, peer) {} ObDtlRpcChannel::ObDtlRpcChannel( @@ -184,7 +184,7 @@ ObDtlRpcChannel::ObDtlRpcChannel( const uint64_t id, const ObAddr &peer, const int64_t hash_val) - : ObDtlBasicChannel(tenant_id, id, peer, hash_val), recv_mock_eof_cnt_(0) + : ObDtlBasicChannel(tenant_id, id, peer, hash_val) {} ObDtlRpcChannel::~ObDtlRpcChannel() @@ -233,11 +233,10 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer *&buffer) LOG_TRACE("DTL feedup a new msg to msg loop", K(buffer->size()), KP(id_), K(peer_)); ObDtlLinkedBuffer::assign(*buffer, linked_buffer); if (1 == linked_buffer->seq_no() && linked_buffer->is_data_msg() - && 0 != get_recv_buffer_cnt() && - !(1 == get_recv_buffer_cnt() && 1 == recv_mock_eof_cnt_)) { + && 0 != get_recv_buffer_cnt()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("first buffer is not first", K(ret), K(get_id()), K(get_peer_id()), - K(get_recv_buffer_cnt()), K(recv_mock_eof_cnt_)); + K(get_recv_buffer_cnt()), K(linked_buffer->seq_no())); free_buf(linked_buffer); linked_buffer = nullptr; } else if (OB_FAIL(block_on_increase_size(linked_buffer->size()))) { @@ -250,9 +249,6 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer *&buffer) linked_buffer = nullptr; } else if (FALSE_IT(inc_recv_buffer_cnt())) { } else { - if (1 == buffer->seq_no() && buffer->is_eof() && 0 == buffer->pos()) { - recv_mock_eof_cnt_++; - } if (buffer->is_data_msg()) { metric_.mark_first_in(); if (buffer->is_eof()) { diff --git a/src/sql/dtl/ob_dtl_rpc_channel.h b/src/sql/dtl/ob_dtl_rpc_channel.h index 36aad55323..1b58bdd059 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.h +++ b/src/sql/dtl/ob_dtl_rpc_channel.h @@ -101,8 +101,6 @@ public: virtual int feedup(ObDtlLinkedBuffer *&buffer) override; virtual int send_message(ObDtlLinkedBuffer *&buf); -private: - int64_t recv_mock_eof_cnt_; }; } // dtl diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index a8e93966b5..d06ba69700 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -317,6 +317,8 @@ int ObFastInitSqcReportQCMessageCall::mock_sqc_finish_msg() buffer->set_data_msg(false); buffer->timeout_ts() = timeout_ts_; buffer->set_msg_type(dtl::ObDtlMsgType::FINISH_SQC_RESULT); + const bool is_first_buffer_cached = false; + const bool inc_recv_buf_cnt = false; if (OB_FAIL(common::serialization::encode(buf, size, pos, header))) { LOG_WARN("fail to encode buffer", K(ret)); } else if (OB_FAIL(common::serialization::encode(buf, size, pos, finish_msg))) { @@ -324,7 +326,7 @@ int ObFastInitSqcReportQCMessageCall::mock_sqc_finish_msg() } else if (FALSE_IT(buffer->size() = pos)) { } else if (FALSE_IT(pos = 0)) { } else if (FALSE_IT(buffer->tenant_id() = ch->get_tenant_id())) { - } else if (OB_FAIL(ch->attach(buffer))) { + } else if (OB_FAIL(ch->attach(buffer, is_first_buffer_cached, inc_recv_buf_cnt))) { LOG_WARN("fail to feedup buffer", K(ret)); } else if (FALSE_IT(ch->free_buffer_count())) { } else {