make dtl_buffer_size effective immediately

This commit is contained in:
18523270951@163.com
2024-09-28 06:15:53 +00:00
committed by ob-robot
parent 656ae36701
commit 8a1d0f10b5
7 changed files with 35 additions and 11 deletions

View File

@ -990,6 +990,7 @@ int ObDtlBasicChannel::switch_writer(const ObDtlMsg &msg)
} else if (DtlWriterType::CHUNK_DATUM_WRITER == msg_writer_map[px_row.get_data_type()]) {
msg_writer_ = &datum_msg_writer_;
} else if (DtlWriterType::VECTOR_FIXED_WRITER == msg_writer_map[px_row.get_data_type()]) {
vector_fixed_msg_writer_.set_size_per_buffer(send_buffer_size_);
msg_writer_ = &vector_fixed_msg_writer_;
} else if (DtlWriterType::VECTOR_ROW_WRITER == msg_writer_map[px_row.get_data_type()]) {
vector_row_msg_writer_.set_row_meta(meta_);
@ -1521,7 +1522,7 @@ int ObDtlVectorMsgWriter::serialize()
//--------------end ObDtlVectorsFixedMsgWriter---------------
ObDtlVectorFixedMsgWriter::ObDtlVectorFixedMsgWriter() :
type_(VECTOR_FIXED_WRITER), write_buffer_(nullptr), vector_buffer_(),
write_ret_(OB_SUCCESS)
write_ret_(OB_SUCCESS), size_per_buffer_(-1)
{}
ObDtlVectorFixedMsgWriter::~ObDtlVectorFixedMsgWriter()
@ -1533,12 +1534,12 @@ int ObDtlVectorFixedMsgWriter::init(ObDtlLinkedBuffer *buffer, uint64_t tenant_i
{
int ret = OB_SUCCESS;
UNUSED(tenant_id);
if (nullptr == buffer) {
if (nullptr == buffer || size_per_buffer_ < 0 || buffer->size() < size_per_buffer_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("write buffer is null", K(ret));
LOG_WARN("write buffer is null", K(ret), K(size_per_buffer_), K(buffer->size()), K(lbt()));
} else {
reset();
vector_buffer_.set_buf(buffer->buf(), buffer->size());
vector_buffer_.set_buf(buffer->buf(), size_per_buffer_); /*keep fixed msg use buffer with same size*/
write_buffer_ = buffer;
}
return ret;

View File

@ -563,11 +563,13 @@ public:
{
buffer->msg_type() = ObDtlMsgType::PX_VECTOR_FIXED;
}
void set_size_per_buffer(const int64_t size) { size_per_buffer_ = size; }
private:
DtlWriterType type_;
ObDtlLinkedBuffer *write_buffer_;
ObDtlVectors vector_buffer_;
int write_ret_;
int64_t size_per_buffer_;
};
OB_INLINE int ObDtlVectorFixedMsgWriter::write(

View File

@ -33,6 +33,7 @@ int ObDtlBufEncoder::switch_writer(const ObDtlMsg &msg)
} else if (DtlWriterType::CHUNK_DATUM_WRITER == msg_writer_map[px_row.get_data_type()]) {
msg_writer_ = &datum_msg_writer_;
} else if (DtlWriterType::VECTOR_FIXED_WRITER == msg_writer_map[px_row.get_data_type()]) {
vector_fixed_msg_writer_.set_size_per_buffer(size_per_buffer_);
msg_writer_ = &vector_fixed_msg_writer_;
} else if (DtlWriterType::VECTOR_ROW_WRITER == msg_writer_map[px_row.get_data_type()]) {
vector_row_msg_writer_.set_row_meta(meta_);

View File

@ -39,7 +39,8 @@ public:
tenant_id_(500),
buffer_(nullptr),
msg_writer_(nullptr),
meta_(nullptr)
meta_(nullptr),
size_per_buffer_(-1)
{}
~ObDtlBufEncoder() {}
void set_tenant_id(int64_t tenant_id) {
@ -76,6 +77,7 @@ public:
{ msg_writer_->write_msg_type(buffer); }
ObDtlLinkedBuffer *get_buffer() { return buffer_; }
void set_row_meta(RowMeta &meta) { meta_ = &meta; }
void set_size_per_buffer(const int64_t size) { size_per_buffer_ = size; }
private:
int64_t use_row_store_;
int64_t tenant_id_;
@ -88,6 +90,7 @@ private:
ObDtlVectorFixedMsgWriter vector_fixed_msg_writer_;
ObDtlChannelEncoder *msg_writer_;
RowMeta *meta_;
int64_t size_per_buffer_;
};
class ObDtlBcastService
@ -143,6 +146,7 @@ public:
int64_t timeout_ts);
int destroy();
void set_row_meta(RowMeta &meta) { dtl_buf_encoder_.set_row_meta(meta); }
void set_size_per_buffer(const int64_t size) { dtl_buf_encoder_.set_size_per_buffer(size); }
private:
int switch_buffer(int64_t need_size);
int send_last_buffer(ObDtlLinkedBuffer *&last_buffer);

View File

@ -99,12 +99,20 @@ ObDtlLinkedBuffer *ObDtlChannelMemManager::alloc(int64_t chid, int64_t size)
{
int ret = OB_SUCCESS;
ObDtlLinkedBuffer *allocated_buf = NULL;
const int64_t size_per_buffer = size_per_buffer_;
void *buf = nullptr;
if (size <= size_per_buffer_) {
if (size <= size_per_buffer) {
if (OB_SUCC(free_queue_.pop(buf, 0)) && NULL != buf) {
allocated_buf = new (buf) ObDtlLinkedBuffer(
static_cast<char *>(buf) + sizeof (ObDtlLinkedBuffer), size_per_buffer_);
allocated_buf->allocated_chid() = chid;
int64_t real_size = (static_cast<ObDtlLinkedBuffer *> (buf))->size();
if (real_size >= size_per_buffer) {
allocated_buf = new (buf) ObDtlLinkedBuffer(
static_cast<char *>(buf) + sizeof (ObDtlLinkedBuffer), real_size);
allocated_buf->allocated_chid() = chid;
} else {
real_free(static_cast<ObDtlLinkedBuffer *> (buf));
increase_free_cnt();
buf = nullptr;
}
} else {
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_TRACE("queue has no element", K(ret), K(seqno_), K(free_queue_.size()));
@ -122,7 +130,7 @@ ObDtlLinkedBuffer *ObDtlChannelMemManager::alloc(int64_t chid, int64_t size)
K(max_mem_percent_), K_(memstore_limit_percent), K(allocated_buf), K(size));
} else {
const int64_t alloc_size = sizeof (ObDtlLinkedBuffer)
+ std::max(size, size_per_buffer_);
+ std::max(size, size_per_buffer);
char *buf = reinterpret_cast<char*>(allocator_.alloc(alloc_size));
if (nullptr == buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -161,7 +169,7 @@ int ObDtlChannelMemManager::free(ObDtlLinkedBuffer *buf, bool auto_free)
int ret = OB_SUCCESS;
if (NULL != buf) {
buf->reset_batch_info();
if (auto_free && buf->size() <= size_per_buffer_) {
if (auto_free && buf->size() == size_per_buffer_) {
if (OB_FAIL(free_queue_.push(buf))) {
LOG_TRACE("failed to push back buffer", K(ret), K(seqno_), K(free_queue_.size()));
} else {

View File

@ -128,6 +128,7 @@ OB_INLINE bool ObDtlChannelMemManager::out_of_memory()
OB_INLINE void ObDtlChannelMemManager::update_max_memory_percent()
{
size_per_buffer_ = GCONF.dtl_buffer_size;
get_memstore_limit_percentage_();
get_max_mem_percent();
}

View File

@ -228,6 +228,13 @@ int ObPxTransmitOp::inner_open()
}
}
if (OB_SUCC(ret) && get_spec().use_rich_format_) {
if (dtl::ObDtlMsgType::PX_VECTOR_FIXED == data_msg_type_) {
int64_t size_per_buffer = GCONF.dtl_buffer_size;
chs_agent_.set_size_per_buffer(size_per_buffer);
for (int64_t i = 0; i < task_channels_.count(); ++i) {
task_channels_.at(i)->set_send_buffer_size(size_per_buffer);
}
}
if (OB_FAIL(params_.init_keep_order_params(get_spec().max_batch_size_,
task_channels_.count(),
get_spec().output_.count(),