From 8a1d0f10b5eb9e2bf4591d12a7f126d672e44673 Mon Sep 17 00:00:00 2001 From: "18523270951@163.com" <18523270951@163.com> Date: Sat, 28 Sep 2024 06:15:53 +0000 Subject: [PATCH] make dtl_buffer_size effective immediately --- src/sql/dtl/ob_dtl_basic_channel.cpp | 9 +++++---- src/sql/dtl/ob_dtl_basic_channel.h | 2 ++ src/sql/dtl/ob_dtl_channel_agent.cpp | 1 + src/sql/dtl/ob_dtl_channel_agent.h | 6 +++++- src/sql/dtl/ob_dtl_channel_mem_manager.cpp | 20 +++++++++++++------ src/sql/dtl/ob_dtl_channel_mem_manager.h | 1 + .../engine/px/exchange/ob_px_transmit_op.cpp | 7 +++++++ 7 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/sql/dtl/ob_dtl_basic_channel.cpp b/src/sql/dtl/ob_dtl_basic_channel.cpp index 796c007a7c..6f75aeded6 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.cpp +++ b/src/sql/dtl/ob_dtl_basic_channel.cpp @@ -990,6 +990,7 @@ int ObDtlBasicChannel::switch_writer(const ObDtlMsg &msg) } else if (DtlWriterType::CHUNK_DATUM_WRITER == msg_writer_map[px_row.get_data_type()]) { msg_writer_ = &datum_msg_writer_; } else if (DtlWriterType::VECTOR_FIXED_WRITER == msg_writer_map[px_row.get_data_type()]) { + vector_fixed_msg_writer_.set_size_per_buffer(send_buffer_size_); msg_writer_ = &vector_fixed_msg_writer_; } else if (DtlWriterType::VECTOR_ROW_WRITER == msg_writer_map[px_row.get_data_type()]) { vector_row_msg_writer_.set_row_meta(meta_); @@ -1521,7 +1522,7 @@ int ObDtlVectorMsgWriter::serialize() //--------------end ObDtlVectorsFixedMsgWriter--------------- ObDtlVectorFixedMsgWriter::ObDtlVectorFixedMsgWriter() : type_(VECTOR_FIXED_WRITER), write_buffer_(nullptr), vector_buffer_(), - write_ret_(OB_SUCCESS) + write_ret_(OB_SUCCESS), size_per_buffer_(-1) {} ObDtlVectorFixedMsgWriter::~ObDtlVectorFixedMsgWriter() @@ -1533,12 +1534,12 @@ int ObDtlVectorFixedMsgWriter::init(ObDtlLinkedBuffer *buffer, uint64_t tenant_i { int ret = OB_SUCCESS; UNUSED(tenant_id); - if (nullptr == buffer) { + if (nullptr == buffer || size_per_buffer_ < 0 || buffer->size() < size_per_buffer_) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("write buffer is null", K(ret)); + LOG_WARN("write buffer is null", K(ret), K(size_per_buffer_), K(buffer->size()), K(lbt())); } else { reset(); - vector_buffer_.set_buf(buffer->buf(), buffer->size()); + vector_buffer_.set_buf(buffer->buf(), size_per_buffer_); /*keep fixed msg use buffer with same size*/ write_buffer_ = buffer; } return ret; diff --git a/src/sql/dtl/ob_dtl_basic_channel.h b/src/sql/dtl/ob_dtl_basic_channel.h index 8f6f4c2ebd..6b7726c8ea 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.h +++ b/src/sql/dtl/ob_dtl_basic_channel.h @@ -563,11 +563,13 @@ public: { buffer->msg_type() = ObDtlMsgType::PX_VECTOR_FIXED; } + void set_size_per_buffer(const int64_t size) { size_per_buffer_ = size; } private: DtlWriterType type_; ObDtlLinkedBuffer *write_buffer_; ObDtlVectors vector_buffer_; int write_ret_; + int64_t size_per_buffer_; }; OB_INLINE int ObDtlVectorFixedMsgWriter::write( diff --git a/src/sql/dtl/ob_dtl_channel_agent.cpp b/src/sql/dtl/ob_dtl_channel_agent.cpp index 92bac8d20f..2fc9ee856e 100644 --- a/src/sql/dtl/ob_dtl_channel_agent.cpp +++ b/src/sql/dtl/ob_dtl_channel_agent.cpp @@ -33,6 +33,7 @@ int ObDtlBufEncoder::switch_writer(const ObDtlMsg &msg) } else if (DtlWriterType::CHUNK_DATUM_WRITER == msg_writer_map[px_row.get_data_type()]) { msg_writer_ = &datum_msg_writer_; } else if (DtlWriterType::VECTOR_FIXED_WRITER == msg_writer_map[px_row.get_data_type()]) { + vector_fixed_msg_writer_.set_size_per_buffer(size_per_buffer_); msg_writer_ = &vector_fixed_msg_writer_; } else if (DtlWriterType::VECTOR_ROW_WRITER == msg_writer_map[px_row.get_data_type()]) { vector_row_msg_writer_.set_row_meta(meta_); diff --git a/src/sql/dtl/ob_dtl_channel_agent.h b/src/sql/dtl/ob_dtl_channel_agent.h index 6920bae578..4f5a930dc9 100644 --- a/src/sql/dtl/ob_dtl_channel_agent.h +++ b/src/sql/dtl/ob_dtl_channel_agent.h @@ -39,7 +39,8 @@ public: tenant_id_(500), buffer_(nullptr), msg_writer_(nullptr), - meta_(nullptr) + meta_(nullptr), + size_per_buffer_(-1) {} ~ObDtlBufEncoder() {} void set_tenant_id(int64_t tenant_id) { @@ -76,6 +77,7 @@ public: { msg_writer_->write_msg_type(buffer); } ObDtlLinkedBuffer *get_buffer() { return buffer_; } void set_row_meta(RowMeta &meta) { meta_ = &meta; } + void set_size_per_buffer(const int64_t size) { size_per_buffer_ = size; } private: int64_t use_row_store_; int64_t tenant_id_; @@ -88,6 +90,7 @@ private: ObDtlVectorFixedMsgWriter vector_fixed_msg_writer_; ObDtlChannelEncoder *msg_writer_; RowMeta *meta_; + int64_t size_per_buffer_; }; class ObDtlBcastService @@ -143,6 +146,7 @@ public: int64_t timeout_ts); int destroy(); void set_row_meta(RowMeta &meta) { dtl_buf_encoder_.set_row_meta(meta); } + void set_size_per_buffer(const int64_t size) { dtl_buf_encoder_.set_size_per_buffer(size); } private: int switch_buffer(int64_t need_size); int send_last_buffer(ObDtlLinkedBuffer *&last_buffer); diff --git a/src/sql/dtl/ob_dtl_channel_mem_manager.cpp b/src/sql/dtl/ob_dtl_channel_mem_manager.cpp index 736f1eaa85..c5f113bf52 100644 --- a/src/sql/dtl/ob_dtl_channel_mem_manager.cpp +++ b/src/sql/dtl/ob_dtl_channel_mem_manager.cpp @@ -99,12 +99,20 @@ ObDtlLinkedBuffer *ObDtlChannelMemManager::alloc(int64_t chid, int64_t size) { int ret = OB_SUCCESS; ObDtlLinkedBuffer *allocated_buf = NULL; + const int64_t size_per_buffer = size_per_buffer_; void *buf = nullptr; - if (size <= size_per_buffer_) { + if (size <= size_per_buffer) { if (OB_SUCC(free_queue_.pop(buf, 0)) && NULL != buf) { - allocated_buf = new (buf) ObDtlLinkedBuffer( - static_cast(buf) + sizeof (ObDtlLinkedBuffer), size_per_buffer_); - allocated_buf->allocated_chid() = chid; + int64_t real_size = (static_cast (buf))->size(); + if (real_size >= size_per_buffer) { + allocated_buf = new (buf) ObDtlLinkedBuffer( + static_cast(buf) + sizeof (ObDtlLinkedBuffer), real_size); + allocated_buf->allocated_chid() = chid; + } else { + real_free(static_cast (buf)); + increase_free_cnt(); + buf = nullptr; + } } else { if (OB_ENTRY_NOT_EXIST == ret) { LOG_TRACE("queue has no element", K(ret), K(seqno_), K(free_queue_.size())); @@ -122,7 +130,7 @@ ObDtlLinkedBuffer *ObDtlChannelMemManager::alloc(int64_t chid, int64_t size) K(max_mem_percent_), K_(memstore_limit_percent), K(allocated_buf), K(size)); } else { const int64_t alloc_size = sizeof (ObDtlLinkedBuffer) - + std::max(size, size_per_buffer_); + + std::max(size, size_per_buffer); char *buf = reinterpret_cast(allocator_.alloc(alloc_size)); if (nullptr == buf) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -161,7 +169,7 @@ int ObDtlChannelMemManager::free(ObDtlLinkedBuffer *buf, bool auto_free) int ret = OB_SUCCESS; if (NULL != buf) { buf->reset_batch_info(); - if (auto_free && buf->size() <= size_per_buffer_) { + if (auto_free && buf->size() == size_per_buffer_) { if (OB_FAIL(free_queue_.push(buf))) { LOG_TRACE("failed to push back buffer", K(ret), K(seqno_), K(free_queue_.size())); } else { diff --git a/src/sql/dtl/ob_dtl_channel_mem_manager.h b/src/sql/dtl/ob_dtl_channel_mem_manager.h index 6e714dc485..2e70673945 100644 --- a/src/sql/dtl/ob_dtl_channel_mem_manager.h +++ b/src/sql/dtl/ob_dtl_channel_mem_manager.h @@ -128,6 +128,7 @@ OB_INLINE bool ObDtlChannelMemManager::out_of_memory() OB_INLINE void ObDtlChannelMemManager::update_max_memory_percent() { + size_per_buffer_ = GCONF.dtl_buffer_size; get_memstore_limit_percentage_(); get_max_mem_percent(); } diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp index e1ee4a2c34..4501405ab1 100644 --- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp @@ -228,6 +228,13 @@ int ObPxTransmitOp::inner_open() } } if (OB_SUCC(ret) && get_spec().use_rich_format_) { + if (dtl::ObDtlMsgType::PX_VECTOR_FIXED == data_msg_type_) { + int64_t size_per_buffer = GCONF.dtl_buffer_size; + chs_agent_.set_size_per_buffer(size_per_buffer); + for (int64_t i = 0; i < task_channels_.count(); ++i) { + task_channels_.at(i)->set_send_buffer_size(size_per_buffer); + } + } if (OB_FAIL(params_.init_keep_order_params(get_spec().max_batch_size_, task_channels_.count(), get_spec().output_.count(),