diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 474483d9d7..a14ad0e770 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1166,10 +1166,15 @@ DEFINE_String(spill_storage_root_path, ""); DEFINE_String(spill_storage_limit, "20%"); // 20% DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s DEFINE_mInt32(spill_gc_file_count, "2000"); -DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2"); -DEFINE_Int32(spill_io_thread_pool_queue_size, "1024"); -DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2"); -DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024"); +DEFINE_Int32(spill_io_thread_pool_thread_num, "-1"); +DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool { + if (config == -1) { + CpuInfo::init(); + spill_io_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 2); + } + return true; +}); +DEFINE_Int32(spill_io_thread_pool_queue_size, "102400"); DEFINE_mBool(check_segment_when_build_rowset_meta, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 6efb4d74cb..e94934ae72 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1248,10 +1248,8 @@ DECLARE_String(spill_storage_root_path); DECLARE_String(spill_storage_limit); DECLARE_mInt32(spill_gc_interval_ms); DECLARE_mInt32(spill_gc_file_count); -DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num); +DECLARE_Int32(spill_io_thread_pool_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); -DECLARE_Int32(spill_async_task_thread_pool_thread_num); -DECLARE_Int32(spill_async_task_thread_pool_queue_size); DECLARE_mBool(check_segment_when_build_rowset_meta); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 55a0650dc1..78079a0ddf 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -253,7 +253,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { MonotonicStopWatch submit_timer; submit_timer.start(); - status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( [this, &parent, state, query_id, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 7ec582905d..1755cd866f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -83,7 +83,7 @@ public: for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled(); ++i) { if (spill_infos[i].keys_.size() >= spill_batch_rows) { - status = _async_spill_partition_and_wait( + status = _spill_partition( state, context, Base::_shared_state->spill_partitions[i], spill_infos[i].keys_, spill_infos[i].values_, nullptr, false); RETURN_IF_ERROR(status); @@ -98,13 +98,13 @@ public: auto spill_null_key_data = (hash_null_key_data && i == Base::_shared_state->partition_count - 1); if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) { - status = _async_spill_partition_and_wait( - state, context, Base::_shared_state->spill_partitions[i], - spill_infos[i].keys_, spill_infos[i].values_, - spill_null_key_data ? hash_table.template get_null_key_data< - vectorized::AggregateDataPtr>() - : nullptr, - true); + status = _spill_partition(state, context, Base::_shared_state->spill_partitions[i], + spill_infos[i].keys_, spill_infos[i].values_, + spill_null_key_data + ? hash_table.template get_null_key_data< + vectorized::AggregateDataPtr>() + : nullptr, + true); RETURN_IF_ERROR(status); } } @@ -120,12 +120,10 @@ public: } template - Status _async_spill_partition_and_wait(RuntimeState* state, HashTableCtxType& context, - AggSpillPartitionSPtr& spill_partition, - std::vector& keys, - std::vector& values, - const vectorized::AggregateDataPtr null_key_data, - bool is_last) { + Status _spill_partition(RuntimeState* state, HashTableCtxType& context, + AggSpillPartitionSPtr& spill_partition, std::vector& keys, + std::vector& values, + const vectorized::AggregateDataPtr null_key_data, bool is_last) { vectorized::SpillStreamSPtr spill_stream; auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(), Base::profile(), spill_stream); @@ -148,27 +146,15 @@ public: keys.clear(); values.clear(); } - status = spill_stream->prepare_spill(); RETURN_IF_ERROR(status); - status = ExecEnv::GetInstance() - ->spill_stream_mgr() - ->get_spill_io_thread_pool(spill_stream->get_spill_root_dir()) - ->submit_func([this, state, &spill_stream] { - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - SCOPED_TIMER(_spill_write_disk_timer); - Status status; - Defer defer {[&]() { spill_stream->end_spill(status); }}; - status = spill_stream->spill_block(state, block_, false); - return status; - }); - if (!status.ok()) { - spill_stream->end_spill(status); + { + SCOPED_TIMER(_spill_write_disk_timer); + status = spill_stream->spill_block(state, block_, false); } RETURN_IF_ERROR(status); - status = spill_partition->wait_spill(state); + status = spill_partition->flush_if_full(); _reset_tmp_data(); return status; } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index a5753ef765..ff4795f207 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -211,7 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime submit_timer.start(); RETURN_IF_ERROR( - ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( + ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( [this, state, query_id, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 03ca1299b4..2784522c9f 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -175,8 +175,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state _spill_write_wait_io_timer); } - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( - build_spilling_stream->get_spill_root_dir()); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); MonotonicStopWatch submit_timer; @@ -230,8 +229,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat _spill_write_wait_io_timer); } - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( - spilling_stream->get_spill_root_dir()); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto& blocks = _probe_blocks[partition_index]; auto& partitioned_block = _partitioned_blocks[partition_index]; @@ -296,7 +294,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_index) { auto& build_spilling_stream = _shared_state->spilled_streams[partition_index]; if (build_spilling_stream) { - build_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(build_spilling_stream->spill_eof()); build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, _spill_read_bytes, _spill_read_wait_io_timer); @@ -305,7 +302,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in auto& probe_spilling_stream = _probe_spilling_streams[partition_index]; if (probe_spilling_stream) { - probe_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(probe_spilling_stream->spill_eof()); probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, _spill_read_bytes, _spill_read_wait_io_timer); @@ -387,7 +383,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti _dependency->set_ready(); }; - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); has_data = true; _dependency->block(); @@ -453,7 +449,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti _dependency->set_ready(); }; - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); _dependency->block(); has_data = true; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index e8e3944308..69f1cab6f4 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -219,7 +219,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _dependency->set_ready(); }; - auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); return thread_pool->submit_func(spill_func); } @@ -246,8 +246,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { DCHECK(spilling_stream != nullptr); - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( - spilling_stream->get_spill_root_dir()); + auto* spill_io_pool = + ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 48b1670ca1..2fdd78b40b 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -232,80 +232,74 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { MonotonicStopWatch submit_timer; submit_timer.start(); - status = ExecEnv::GetInstance() - ->spill_stream_mgr() - ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) - ->submit_func([this, state, query_id, &parent, execution_context, - submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return Status::OK(); - } + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( + [this, state, query_id, &parent, execution_context, submit_timer] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return Status::OK(); + } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_ATTACH_TASK(state); - Defer defer {[&]() { - if (!_shared_state->sink_status.ok() || state->is_cancelled()) { - if (!_shared_state->sink_status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " sort node " - << _parent->id() << " revoke memory error: " - << _shared_state->sink_status; - } - _shared_state->close(); - } else { - VLOG_DEBUG << "query " << print_id(query_id) << " sort node " - << _parent->id() << " revoke memory finish"; - } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_ATTACH_TASK(state); + Defer defer {[&]() { + if (!_shared_state->sink_status.ok() || state->is_cancelled()) { + if (!_shared_state->sink_status.ok()) { + LOG(WARNING) << "query " << print_id(query_id) << " sort node " + << _parent->id() + << " revoke memory error: " << _shared_state->sink_status; + } + _shared_state->close(); + } else { + VLOG_DEBUG << "query " << print_id(query_id) << " sort node " + << _parent->id() << " revoke memory finish"; + } - _spilling_stream->end_spill(_shared_state->sink_status); - if (!_shared_state->sink_status.ok()) { - _shared_state->close(); - } + if (!_shared_state->sink_status.ok()) { + _shared_state->close(); + } - _spilling_stream.reset(); - if (_eos) { - _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - _dependency->Dependency::set_ready(); - } - }}; + _spilling_stream.reset(); + if (_eos) { + _dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + _dependency->Dependency::set_ready(); + } + }}; - _shared_state->sink_status = parent._sort_sink_operator->prepare_for_spill( - _runtime_state.get()); - RETURN_IF_ERROR(_shared_state->sink_status); + _shared_state->sink_status = + parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); + RETURN_IF_ERROR(_shared_state->sink_status); - auto* sink_local_state = _runtime_state->get_sink_local_state(); - update_profile(sink_local_state->profile()); + auto* sink_local_state = _runtime_state->get_sink_local_state(); + update_profile(sink_local_state->profile()); - bool eos = false; - vectorized::Block block; - while (!eos && !state->is_cancelled()) { - { - SCOPED_TIMER(_spill_merge_sort_timer); - _shared_state->sink_status = - parent._sort_sink_operator->merge_sort_read_for_spill( - _runtime_state.get(), &block, - _shared_state->spill_block_batch_row_count, &eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - { - SCOPED_TIMER(Base::_spill_timer); - _shared_state->sink_status = - _spilling_stream->spill_block(state, block, eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - block.clear_column_data(); - } - parent._sort_sink_operator->reset(_runtime_state.get()); + bool eos = false; + vectorized::Block block; + while (!eos && !state->is_cancelled()) { + { + SCOPED_TIMER(_spill_merge_sort_timer); + _shared_state->sink_status = + parent._sort_sink_operator->merge_sort_read_for_spill( + _runtime_state.get(), &block, + _shared_state->spill_block_batch_row_count, &eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + { + SCOPED_TIMER(Base::_spill_timer); + _shared_state->sink_status = + _spilling_stream->spill_block(state, block, eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + block.clear_column_data(); + } + parent._sort_sink_operator->reset(_runtime_state.get()); - return Status::OK(); - }); + return Status::OK(); + }); if (!status.ok()) { - _spilling_stream->end_spill(status); - if (!_eos) { Base::_dependency->Dependency::set_ready(); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 37b76425b1..a086fcc43e 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -152,7 +152,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat _status = tmp_stream->prepare_spill(); RETURN_IF_ERROR(_status); - Defer defer {[&]() { tmp_stream->end_spill(_status); }}; _shared_state->sorted_streams.emplace_back(tmp_stream); bool eos = false; @@ -177,7 +176,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } return Status::OK(); }; - return ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( + return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( spill_func); } diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 6dccf15dbd..ec890dac7f 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -231,7 +231,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id, } void AggSpillPartition::close() { if (spilling_stream_) { - (void)spilling_stream_->wait_spill(); spilling_stream_.reset(); } for (auto& stream : spill_streams_) { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index ebd44bec4d..d663b0aae8 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -445,11 +445,9 @@ struct AggSpillPartition { Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile, vectorized::SpillStreamSPtr& spilling_stream); - // wait for current bock spilling to finish - Status wait_spill(RuntimeState* state) { + Status flush_if_full() { DCHECK(spilling_stream_); - auto status = spilling_stream_->wait_spill(); - RETURN_IF_ERROR(status); + Status status; // avoid small spill files if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) { status = spilling_stream_->spill_eof(); diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index ed7be9a0b2..e4631f1e1c 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -42,10 +42,7 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d spill_dir_(std::move(spill_dir)), batch_rows_(batch_rows), batch_bytes_(batch_bytes), - profile_(profile) { - io_thread_pool_ = - ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path()); -} + profile_(profile) {} SpillStream::~SpillStream() { bool exists = false; @@ -70,14 +67,6 @@ void SpillStream::close() { } VLOG_ROW << "closing: " << stream_id_; closed_ = true; - if (spill_promise_) { - spill_future_.wait(); - spill_promise_.reset(); - } - if (read_promise_) { - read_future_.wait(); - read_promise_.reset(); - } if (writer_) { (void)writer_->close(); @@ -97,25 +86,7 @@ const std::string& SpillStream::get_spill_root_dir() const { return data_dir_->path(); } Status SpillStream::prepare_spill() { - DCHECK(!spill_promise_); - RETURN_IF_ERROR(writer_->open()); - - spill_promise_ = std::make_unique>(); - spill_future_ = spill_promise_->get_future(); - return Status::OK(); -} -void SpillStream::end_spill(const Status& status) { - spill_promise_->set_value(status); -} - -Status SpillStream::wait_spill() { - if (spill_promise_) { - SCOPED_TIMER(write_wait_io_timer_); - auto status = spill_future_.get(); - spill_promise_.reset(); - return status; - } - return Status::OK(); + return writer_->open(); } Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) { @@ -135,34 +106,13 @@ Status SpillStream::spill_eof() { } Status SpillStream::read_next_block_sync(Block* block, bool* eos) { - DCHECK(!read_promise_); DCHECK(reader_ != nullptr); - Status status; - read_promise_ = std::make_unique>(); - read_future_ = read_promise_->get_future(); - // use thread pool to limit concurrent io tasks - status = io_thread_pool_->submit_func([this, block, eos] { - SCOPED_ATTACH_TASK(state_); - Status st; - Defer defer {[&]() { read_promise_->set_value(st); }}; - st = reader_->open(); - if (!st.ok()) { - return; - } - st = reader_->read(block, eos); - }); - if (!status.ok()) { - LOG(WARNING) << "read spill data failed: " << status; - read_promise_.reset(); - return status; - } + DCHECK(!_is_reading); + _is_reading = true; + Defer defer([this] { _is_reading = false; }); - { - SCOPED_TIMER(read_wait_io_timer_); - status = read_future_.get(); - } - read_promise_.reset(); - return status; + RETURN_IF_ERROR(reader_->open()); + return reader_->read(block, eos); } } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 68abfa9aaf..638942d1af 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -55,12 +55,8 @@ public: Status spill_block(RuntimeState* state, const Block& block, bool eof); - void end_spill(const Status& status); - Status spill_eof(); - Status wait_spill(); - Status read_next_block_sync(Block* block, bool* eos); void set_write_counters(RuntimeProfile::Counter* serialize_timer, @@ -91,7 +87,6 @@ private: void close(); RuntimeState* state_ = nullptr; - ThreadPool* io_thread_pool_; int64_t stream_id_; std::atomic_bool closed_ = false; SpillDataDir* data_dir_ = nullptr; @@ -99,10 +94,7 @@ private: size_t batch_rows_; size_t batch_bytes_; - std::unique_ptr> spill_promise_; - std::future spill_future_; - std::unique_ptr> read_promise_; - std::future read_future_; + std::atomic_bool _is_reading = false; SpillWriterUPtr writer_; SpillReaderUPtr reader_; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 05a2531c46..2042555e49 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -50,11 +50,6 @@ Status SpillStreamManager::init() { LOG(INFO) << "init spill stream manager"; RETURN_IF_ERROR(_init_spill_store_map()); - int spill_io_thread_count = config::spill_io_thread_pool_per_disk_thread_num; - if (spill_io_thread_count <= 0) { - spill_io_thread_count = 2; - } - int pool_idx = 0; for (const auto& [path, store] : _spill_store_map) { auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX); bool exists = true; @@ -85,20 +80,12 @@ Status SpillStreamManager::init() { } } store->update_spill_data_usage(spill_data_size); - - std::unique_ptr io_pool; - static_cast(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", pool_idx++)) - .set_min_threads(spill_io_thread_count) - .set_max_threads(spill_io_thread_count) - .set_max_queue_size(config::spill_io_thread_pool_queue_size) - .build(&io_pool)); - path_to_io_thread_pool_[path] = std::move(io_pool); } - static_cast(ThreadPoolBuilder("SpillAsyncTaskThreadPool") - .set_min_threads(config::spill_async_task_thread_pool_thread_num) - .set_max_threads(config::spill_async_task_thread_pool_thread_num) - .set_max_queue_size(config::spill_async_task_thread_pool_queue_size) - .build(&async_task_thread_pool_)); + static_cast(ThreadPoolBuilder("SpillIOThreadPool") + .set_min_threads(config::spill_io_thread_pool_thread_num) + .set_max_threads(config::spill_io_thread_pool_thread_num) + .set_max_queue_size(config::spill_io_thread_pool_queue_size) + .build(&_spill_io_thread_pool)); RETURN_IF_ERROR(Thread::create( "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); }, @@ -274,7 +261,7 @@ void SpillStreamManager::gc(int64_t max_file_count) { } void SpillStreamManager::async_cleanup_query(TUniqueId query_id) { - (void)get_async_task_thread_pool()->submit_func([this, query_id] { + (void)get_spill_io_thread_pool()->submit_func([this, query_id] { for (auto& [_, store] : _spill_store_map) { std::string query_spill_dir = fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, print_id(query_id)); diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 36062ce0b4..298af77afc 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -112,12 +112,7 @@ public: void gc(int64_t max_file_count); - ThreadPool* get_spill_io_thread_pool(const std::string& path) const { - const auto it = path_to_io_thread_pool_.find(path); - DCHECK(it != path_to_io_thread_pool_.end()); - return it->second.get(); - } - ThreadPool* get_async_task_thread_pool() const { return async_task_thread_pool_.get(); } + ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); } private: Status _init_spill_store_map(); @@ -127,8 +122,7 @@ private: std::unordered_map> _spill_store_map; CountDownLatch _stop_background_threads_latch; - std::unique_ptr async_task_thread_pool_; - std::unordered_map> path_to_io_thread_pool_; + std::unique_ptr _spill_io_thread_pool; scoped_refptr _spill_gc_thread; std::atomic_uint64_t id_ = 0;