From db523dafcb2ee2973753aeebe27b025a7b523719 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 21 Dec 2023 12:19:58 +0800 Subject: [PATCH] [improve](move-memtable) limit task num in load stream flush token (#28748) --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/runtime/load_stream.cpp | 13 +++++++++++-- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index e75ac2c1c7..0c4caecc36 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -776,6 +776,8 @@ DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB DEFINE_Int32(load_stream_messages_in_batch, "128"); // brpc streaming StreamWait seconds on EAGAIN DEFINE_Int32(load_stream_eagain_wait_seconds, "60"); +// max tasks per flush token in load stream +DEFINE_Int32(load_stream_flush_token_max_tasks, "2"); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, diff --git a/be/src/common/config.h b/be/src/common/config.h index c911963a87..6565d15d9f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -837,6 +837,8 @@ DECLARE_Int64(load_stream_max_buf_size); DECLARE_Int32(load_stream_messages_in_batch); // brpc streaming StreamWait seconds on EAGAIN DECLARE_Int32(load_stream_eagain_wait_seconds); +// max tasks per flush token in load stream +DECLARE_Int32(load_stream_flush_token_max_tasks); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 9d05d48f54..313728091c 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -18,6 +18,7 @@ #include "runtime/load_stream.h" #include +#include #include #include #include @@ -136,7 +137,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data LOG(INFO) << "write data failed " << *this; } }; - return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func); + auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; + while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) { + bthread_usleep(10 * 1000); // 10ms + } + return flush_token->submit_func(flush_func); } Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { @@ -170,7 +175,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data LOG(INFO) << "add segment failed " << *this; } }; - return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(add_segment_func); + auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; + while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) { + bthread_usleep(10 * 1000); // 10ms + } + return flush_token->submit_func(add_segment_func); } Status TabletStream::close() {