diff --git a/be/src/common/config.h b/be/src/common/config.h index 391e665f0b..f4b38bad4c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -515,6 +515,8 @@ CONF_mInt32(storage_flood_stage_usage_percent, "90"); // 90% CONF_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB // number of thread for flushing memtable per store CONF_Int32(flush_thread_num_per_store, "2"); +// number of thread for flushing memtable per store, for high priority load task +CONF_Int32(high_priority_flush_thread_num_per_store, "1"); // config for tablet meta checkpoint CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); @@ -670,6 +672,24 @@ CONF_Int32(max_minidump_file_number, "10"); // and the valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. CONF_String(kafka_broker_version_fallback, "0.10.0"); +// The pool size of routine load consumer. +// If you meet the error described in https://github.com/edenhill/librdkafka/issues/3608 +// Change this size to 0 to fix it temporarily. +CONF_Int32(routine_load_consumer_pool_size, "10"); + +// When the timeout of a load task is less than this threshold, +// Doris treats it as a high priority task. +// High priority tasks use a separate thread pool for flush and do not block rpc by memory cleanup logic. +// This threshold is mainly used to identify routine load tasks and should not be modified if not necessary. +CONF_mInt32(load_task_high_priority_threshold_second, "120"); + +// The min timeout of load rpc (add batch, close, etc.) +// Because a load rpc may be blocked for a while, +// increasing this config may avoid rpc timeout. 
+CONF_mInt32(min_load_rpc_timeout_ms, "20000"); + + + } // namespace config } // namespace doris diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 184c4b06f2..1718f7d9d7 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -116,6 +116,8 @@ void NodeChannel::open() { request.set_need_gen_rollup(_parent->_need_gen_rollup); request.set_load_mem_limit(_parent->_load_mem_limit); request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); + request.set_is_high_priority(_parent->_is_high_priority); + request.set_sender_ip(BackendOptions::get_localhost()); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -332,8 +334,8 @@ void NodeChannel::cancel(const std::string& cancel_msg) { closure->ref(); int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; - if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) { - remain_ms = _min_rpc_timeout_ms; + if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { + remain_ms = config::min_load_rpc_timeout_ms; } closure->cntl.set_timeout_ms(remain_ms); if (config::tablet_writer_ignore_eovercrowded) { @@ -387,11 +389,11 @@ void NodeChannel::try_send_batch() { _add_batch_closure->reset(); int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; - if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) { + if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { if (remain_ms <= 0 && !request.eos()) { cancel(fmt::format("{}, err: timeout", channel_info())); } else { - remain_ms = _min_rpc_timeout_ms; + remain_ms = config::min_load_rpc_timeout_ms; } } _add_batch_closure->cntl.set_timeout_ms(remain_ms); @@ -562,6 +564,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { _sender_id = state->per_fragment_instance_idx(); _num_senders = state->num_per_fragment_instances(); + _is_high_priority = (state->query_options().query_timeout <= config::load_task_high_priority_threshold_second); // profile must add to state's object 
pool _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink")); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 261912d59e..b5e86f7f92 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -224,7 +224,6 @@ private: // this should be set in init() using config int _rpc_timeout_ms = 60000; - static const int _min_rpc_timeout_ms = 1000; // The min query timeout is 1 second. int64_t _next_packet_seq = 0; MonotonicStopWatch _timeout_watch; @@ -380,6 +379,7 @@ private: // To support multiple senders, we maintain a channel for each sender. int _sender_id = -1; int _num_senders = -1; + bool _is_high_priority = false; // TODO(zc): think about cache this data std::shared_ptr _schema; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c9467dd4c4..bc94c71675 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -148,7 +148,8 @@ OLAPStatus DeltaWriter::init() { _reset_mem_table(); // create flush handler - RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token, writer_context.rowset_type)); + RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token, + writer_context.rowset_type, _req.is_high_priority)); _is_init = true; return OLAP_SUCCESS; diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 00e0436774..4b80b571c7 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -48,6 +48,7 @@ struct WriteRequest { TupleDescriptor* tuple_desc; // slots are in order of tablet's schema const std::vector* slots; + bool is_high_priority = false; }; // Writer for a particular (load, index, tablet). 
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index d257ab6450..b63074d282 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -88,18 +88,35 @@ void MemTableFlushExecutor::init(const std::vector& data_dirs) { .set_min_threads(min_threads) .set_max_threads(max_threads) .build(&_flush_pool); + + min_threads = std::max(1, config::high_priority_flush_thread_num_per_store); + max_threads = data_dir_num * min_threads; + ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .build(&_high_prio_flush_pool); } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. OLAPStatus MemTableFlushExecutor::create_flush_token( std::unique_ptr* flush_token, - RowsetTypePB rowset_type) { - if (rowset_type == BETA_ROWSET) { - // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. - flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT))); + RowsetTypePB rowset_type, bool is_high_priority) { + if (!is_high_priority) { + if (rowset_type == BETA_ROWSET) { + // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. + flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT))); + } else { + // alpha rowset do not support flush in CONCURRENT. + flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + } } else { - // alpha rowset do not support flush in CONCURRENT. - flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + if (rowset_type == BETA_ROWSET) { + // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. 
+ flush_token->reset(new FlushToken(_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT))); + } else { + // alpha rowset do not support flush in CONCURRENT. + flush_token->reset(new FlushToken(_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + } } return OLAP_SUCCESS; } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 8b81bde524..c880b3c307 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -92,7 +92,10 @@ private: class MemTableFlushExecutor { public: MemTableFlushExecutor() {} - ~MemTableFlushExecutor() { _flush_pool->shutdown(); } + ~MemTableFlushExecutor() { + _flush_pool->shutdown(); + _high_prio_flush_pool->shutdown(); + } // init should be called after storage engine is opened, // because it needs path hash of each data dir. @@ -100,10 +103,11 @@ public: OLAPStatus create_flush_token( std::unique_ptr* flush_token, - RowsetTypePB rowset_type); + RowsetTypePB rowset_type, bool is_high_priority); private: std::unique_ptr _flush_pool; + std::unique_ptr _high_prio_flush_pool; }; } // namespace doris diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 5834777216..27352a01fc 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -24,8 +24,8 @@ namespace doris { LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, - const std::shared_ptr& mem_tracker) - : _load_id(load_id), _timeout_s(timeout_s) { + const std::shared_ptr& mem_tracker, bool is_high_priority) + : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority) { _mem_tracker = MemTracker::CreateTracker( mem_limit, "LoadChannel:" + _load_id.to_string(), mem_tracker, true, false, MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to @@ -36,11 +36,15 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t 
tim LoadChannel::~LoadChannel() { LOG(INFO) << "load channel mem peak usage=" << _mem_tracker->peak_consumption() - << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id; + << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id + << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip; } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { int64_t index_id = params.index_id(); + if (params.has_sender_ip()) { + _sender_ip = params.sender_ip(); + } std::shared_ptr channel; { std::lock_guard l(_lock); @@ -50,7 +54,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _mem_tracker)); + channel.reset(new TabletsChannel(key, _mem_tracker, _is_high_priority)); _tablets_channels.insert({index_id, channel}); } } diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index d0ee17a140..257dba8548 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -39,7 +39,7 @@ class TabletsChannel; class LoadChannel { public: LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, - const std::shared_ptr& mem_tracker); + const std::shared_ptr& mem_tracker, bool is_high_priority); ~LoadChannel(); // open a new load channel if not exist @@ -68,6 +68,8 @@ public: int64_t timeout() const { return _timeout_s; } + bool is_high_priority() const { return _is_high_priority; } + private: // when mem consumption exceeds limit, should call this method to find the channel // that consumes the largest memory(, and then we can reduce its memory usage). @@ -91,11 +93,18 @@ private: // the timeout of this load job. // Timed out channels will be periodically deleted by LoadChannelMgr. 
int64_t _timeout_s; + + // true if this is a high priority load task + bool _is_high_priority = false; + + // the ip where tablet sink locate + std::string _sender_ip = ""; }; inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) { os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() - << ", last_update_time=" << static_cast(load_channel.last_updated_time()) << ")"; + << ", last_update_time=" << static_cast(load_channel.last_updated_time()) + << ", is high priority: " << load_channel.is_high_priority() << ")"; return os; } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index f0110aaa3f..0b3367a1ce 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -111,7 +111,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1; int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s); - channel.reset(new LoadChannel(load_id, job_max_memory, job_timeout_s, _mem_tracker)); + bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); + channel.reset(new LoadChannel(load_id, job_max_memory, job_timeout_s, _mem_tracker, is_high_priority)); _load_channels.insert({load_id, channel}); } } @@ -145,8 +146,12 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request, channel = it->second; } - // 2. check if mem consumption exceed limit - _handle_mem_exceed_limit(); + if (!channel->is_high_priority()) { + // 2. check if mem consumption exceed limit + // If this is a high priority load task, do not handle this. + // because this may block for a while, which may lead to rpc timeout. + _handle_mem_exceed_limit(); + } // 3. 
add batch to load channel // batch may not exist in request(eg: eos request without batch), @@ -178,6 +183,11 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { int64_t max_consume = 0; std::shared_ptr channel; for (auto& kv : _load_channels) { + if (kv.second->is_high_priority()) { + // do not select high priority channel to reduce memory + // to avoid blocking them. + continue; + } if (kv.second->mem_consumption() > max_consume) { max_consume = kv.second->mem_consumption(); channel = kv.second; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 8cc68a65df..1504d1171b 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -39,7 +39,7 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) : _exec_env(exec_env), _thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size), - _data_consumer_pool(config::routine_load_thread_pool_size) { + _data_consumer_pool(config::routine_load_consumer_pool_size) { REGISTER_HOOK_METRIC(routine_load_task_count, [this]() { std::lock_guard l(_lock); return _task_map.size(); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index a49d8476d8..68370f664a 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -32,8 +32,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic TabletsChannel::_s_tablet_writer_count; TabletsChannel::TabletsChannel(const TabletsChannelKey& key, - const std::shared_ptr& mem_tracker) - : _key(key), _state(kInitialized), _closed_senders(64) { + const std::shared_ptr& mem_tracker, + bool is_high_priority) + : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority) { _mem_tracker = MemTracker::CreateTracker(-1, "TabletsChannel", mem_tracker); static 
std::once_flag once_flag; std::call_once(once_flag, [] { @@ -274,6 +275,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) request.need_gen_rollup = params.need_gen_rollup(); request.tuple_desc = _tuple_desc; request.slots = index_slots; + request.is_high_priority = _is_high_priority; DeltaWriter* writer = nullptr; auto st = DeltaWriter::open(&request, _mem_tracker, &writer); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 9f87e8afbf..11144cbb2d 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -54,7 +54,7 @@ class OlapTableSchemaParam; // Write channel for a particular (load, index). class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, const std::shared_ptr& mem_tracker); + TabletsChannel(const TabletsChannelKey& key, const std::shared_ptr& mem_tracker, bool is_high_priority); ~TabletsChannel(); @@ -124,6 +124,8 @@ private: std::shared_ptr _mem_tracker; static std::atomic _s_tablet_writer_count; + + bool _is_high_priority = false; }; } // namespace doris diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 6604041ab5..98d0c9b099 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -1474,3 +1474,30 @@ The default value is currently only an empirical value, and may need to be modif * Type: bool * Description: When obtaining a brpc connection, judge the availability of the connection through hand_shake rpc, and re-establish the connection if it is not available 。 * Default value: false + +### `high_priority_flush_thread_num_per_store` + +* Type: int32 +* Description: The number of flush threads per store path allocated for the high priority import task. 
+* Default value: 1 + +### `routine_load_consumer_pool_size` + +* Type: int32 +* Description: The number of caches for the data consumer used by the routine load. +* Default: 10 + +### `load_task_high_priority_threshold_second` + +* Type: int32 +* Description: When the timeout of an import task is less than this threshold, Doris will consider it to be a high priority task. High priority tasks use a separate pool of flush threads. +* Default: 120 + +### `min_load_rpc_timeout_ms` + +* Type: int32 +* Description: The minimum timeout (in ms) for each rpc in the load job. +* Default: 20000 + + + diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index ccd27c76c8..83ce156358 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -1493,3 +1493,27 @@ webserver默认工作线程数 * 类型: bool * 描述: 获取brpc连接时,通过hand_shake rpc 判断连接的可用性,如果不可用则重新建立连接 * 默认值: false + +### `high_priority_flush_thread_num_per_store` + +* 类型:int32 +* 描述:每个存储路径所分配的用于高优导入任务的 flush 线程数量。 +* 默认值:1 + +### `routine_load_consumer_pool_size` + +* 类型:int32 +* 描述:routine load 所使用的 data consumer 的缓存数量。 +* 默认值:10 + +### `load_task_high_priority_threshold_second` + +* 类型:int32 +* 描述:当一个导入任务的超时时间小于这个阈值时,Doris 将认为它是一个高优任务。高优任务会使用独立的 flush 线程池。 +* 默认:120 + +### `min_load_rpc_timeout_ms` + +* 类型:int32 +* 描述:load 作业中各个rpc 的最小超时时间(毫秒)。 +* 默认:20000 diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 080c8d328c..7de412d1a3 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -74,6 +74,8 @@ message PTabletWriterOpenRequest { required bool need_gen_rollup = 7; optional int64 load_mem_limit = 8; optional int64 load_channel_timeout_s = 9; + optional bool is_high_priority = 10 [default = false]; + optional string sender_ip = 11 [default = ""]; }; message PTabletWriterOpenResult { @@ -100,6
+102,7 @@ message PTabletWriterAddBatchRequest { optional int64 backend_id = 9 [default = -1]; // transfer the RowBatch to the Controller Attachment optional bool transfer_by_attachment = 10 [default = false]; + optional bool is_high_priority = 11 [default = false]; }; message PTabletWriterAddBatchResult {