[improvement](spill) improve config of spill thread pool (#33992)

Author: TengJianPing
Date: 2024-04-24 17:54:11 +08:00
Committed by: yiguolei
Parent: 0faae45537
Commit: 2c3e838971
15 changed files with 114 additions and 216 deletions

View File

@ -1166,10 +1166,15 @@ DEFINE_String(spill_storage_root_path, "");
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_file_count, "2000");
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
DEFINE_Int32(spill_io_thread_pool_thread_num, "-1");
DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
if (config == -1) {
CpuInfo::init();
spill_io_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 2);
}
return true;
});
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
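The two pool-size knobs above (spill_io_thread_pool_per_disk_thread_num and spill_async_task_thread_pool_thread_num) are replaced by a single spill_io_thread_pool_thread_num, which defaults to -1 ("auto") and is resolved by the validator to at least 48 threads, or twice the core count, whichever is larger; the queue size grows from 1024 to 102400 because every spill task now funnels through this one pool. A minimal, self-contained sketch of the same sizing rule, using std::thread::hardware_concurrency() as a stand-in for Doris's CpuInfo:

```cpp
#include <algorithm>
#include <thread>

// -1 means "auto": use at least 48 threads, or twice the detected core count,
// mirroring the DEFINE_Validator in the hunk above.
int resolve_spill_io_threads(int configured) {
    if (configured != -1) {
        return configured;
    }
    int cores = static_cast<int>(std::thread::hardware_concurrency());
    return std::max(48, cores * 2);
}
```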

View File

@ -1248,10 +1248,8 @@ DECLARE_String(spill_storage_root_path);
DECLARE_String(spill_storage_limit);
DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_file_count);
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
DECLARE_Int32(spill_async_task_thread_pool_queue_size);
DECLARE_mBool(check_segment_when_build_rowset_meta);

View File

@ -253,7 +253,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
MonotonicStopWatch submit_timer;
submit_timer.start();
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
[this, &parent, state, query_id, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
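The submit pattern here, and at the other call sites below that switch to the shared pool, stays the same: capture a weak_ptr to the task's execution context and lock it inside the worker, so a pooled task that outlives a cancelled query bails out instead of touching freed state. A condensed sketch with identifiers taken from the surrounding hunks:

```cpp
// Condensed from the submit sites in this commit: the weak_ptr lock() guards
// against the query having been torn down before the pooled task runs.
auto execution_context = state->get_task_execution_context();  // std::weak_ptr
auto* pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
RETURN_IF_ERROR(pool->submit_func([execution_context, query_id]() {
    auto execution_context_lock = execution_context.lock();
    if (!execution_context_lock) {
        LOG(INFO) << "query " << print_id(query_id)
                  << " execution_context released, maybe query was cancelled.";
        return Status::OK();  // nothing to do, the query is gone
    }
    // ... perform the actual spill work; the locked context keeps the task alive ...
    return Status::OK();
}));
```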

View File

@ -83,7 +83,7 @@ public:
for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled();
++i) {
if (spill_infos[i].keys_.size() >= spill_batch_rows) {
status = _async_spill_partition_and_wait(
status = _spill_partition(
state, context, Base::_shared_state->spill_partitions[i],
spill_infos[i].keys_, spill_infos[i].values_, nullptr, false);
RETURN_IF_ERROR(status);
@ -98,13 +98,13 @@ public:
auto spill_null_key_data =
(hash_null_key_data && i == Base::_shared_state->partition_count - 1);
if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
status = _async_spill_partition_and_wait(
state, context, Base::_shared_state->spill_partitions[i],
spill_infos[i].keys_, spill_infos[i].values_,
spill_null_key_data ? hash_table.template get_null_key_data<
vectorized::AggregateDataPtr>()
: nullptr,
true);
status = _spill_partition(state, context, Base::_shared_state->spill_partitions[i],
spill_infos[i].keys_, spill_infos[i].values_,
spill_null_key_data
? hash_table.template get_null_key_data<
vectorized::AggregateDataPtr>()
: nullptr,
true);
RETURN_IF_ERROR(status);
}
}
@ -120,12 +120,10 @@ public:
}
template <typename HashTableCtxType, typename KeyType>
Status _async_spill_partition_and_wait(RuntimeState* state, HashTableCtxType& context,
AggSpillPartitionSPtr& spill_partition,
std::vector<KeyType>& keys,
std::vector<vectorized::AggregateDataPtr>& values,
const vectorized::AggregateDataPtr null_key_data,
bool is_last) {
Status _spill_partition(RuntimeState* state, HashTableCtxType& context,
AggSpillPartitionSPtr& spill_partition, std::vector<KeyType>& keys,
std::vector<vectorized::AggregateDataPtr>& values,
const vectorized::AggregateDataPtr null_key_data, bool is_last) {
vectorized::SpillStreamSPtr spill_stream;
auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(),
Base::profile(), spill_stream);
@ -148,27 +146,15 @@ public:
keys.clear();
values.clear();
}
status = spill_stream->prepare_spill();
RETURN_IF_ERROR(status);
status = ExecEnv::GetInstance()
->spill_stream_mgr()
->get_spill_io_thread_pool(spill_stream->get_spill_root_dir())
->submit_func([this, state, &spill_stream] {
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
SCOPED_TIMER(_spill_write_disk_timer);
Status status;
Defer defer {[&]() { spill_stream->end_spill(status); }};
status = spill_stream->spill_block(state, block_, false);
return status;
});
if (!status.ok()) {
spill_stream->end_spill(status);
{
SCOPED_TIMER(_spill_write_disk_timer);
status = spill_stream->spill_block(state, block_, false);
}
RETURN_IF_ERROR(status);
status = spill_partition->wait_spill(state);
status = spill_partition->flush_if_full();
_reset_tmp_data();
return status;
}
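The rewritten _spill_partition above drops the per-block submit-and-wait round trip: since the enclosing revoke_memory work is already submitted to the shared spill pool, the block is written inline on the calling thread, and flush_if_full only decides whether the current file has grown large enough to close. An illustrative condensation, with key/value serialization elided and identifiers taken from the hunk:

```cpp
// Hedged sketch of the new synchronous flow; error handling is shortened and
// the serialization of keys/values into block_ (unchanged by this commit) is elided.
vectorized::SpillStreamSPtr spill_stream;
RETURN_IF_ERROR(spill_partition->get_spill_stream(state, Base::_parent->node_id(),
                                                  Base::profile(), spill_stream));
{
    SCOPED_TIMER(_spill_write_disk_timer);              // now times an inline write
    RETURN_IF_ERROR(spill_stream->spill_block(state, block_, /*eof=*/false));
}
RETURN_IF_ERROR(spill_partition->flush_if_full());      // close the file once it is big enough
_reset_tmp_data();
```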

View File

@ -211,7 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
submit_timer.start();
RETURN_IF_ERROR(
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
[this, state, query_id, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {

View File

@ -175,8 +175,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
_spill_write_wait_io_timer);
}
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
build_spilling_stream->get_spill_root_dir());
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
MonotonicStopWatch submit_timer;
@ -230,8 +229,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
_spill_write_wait_io_timer);
}
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
spilling_stream->get_spill_root_dir());
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto& blocks = _probe_blocks[partition_index];
auto& partitioned_block = _partitioned_blocks[partition_index];
@ -296,7 +294,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_index) {
auto& build_spilling_stream = _shared_state->spilled_streams[partition_index];
if (build_spilling_stream) {
build_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(build_spilling_stream->spill_eof());
build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
_spill_read_bytes, _spill_read_wait_io_timer);
@ -305,7 +302,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
if (probe_spilling_stream) {
probe_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
_spill_read_bytes, _spill_read_wait_io_timer);
@ -387,7 +383,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
_dependency->set_ready();
};
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
has_data = true;
_dependency->block();
@ -453,7 +449,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
_dependency->set_ready();
};
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
DCHECK(spill_io_pool != nullptr);
_dependency->block();
has_data = true;

View File

@ -219,7 +219,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
_dependency->set_ready();
};
auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
return thread_pool->submit_func(spill_func);
}
@ -246,8 +246,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
DCHECK(spilling_stream != nullptr);
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
spilling_stream->get_spill_root_dir());
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
DCHECK(spill_io_pool != nullptr);
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();

View File

@ -232,80 +232,74 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
MonotonicStopWatch submit_timer;
submit_timer.start();
status = ExecEnv::GetInstance()
->spill_stream_mgr()
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
->submit_func([this, state, query_id, &parent, execution_context,
submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was cancelled.";
return Status::OK();
}
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
[this, state, query_id, &parent, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was cancelled.";
return Status::OK();
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
LOG(WARNING) << "query " << print_id(query_id) << " sort node "
<< _parent->id() << " revoke memory error: "
<< _shared_state->sink_status;
}
_shared_state->close();
} else {
VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
<< _parent->id() << " revoke memory finish";
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
LOG(WARNING) << "query " << print_id(query_id) << " sort node "
<< _parent->id()
<< " revoke memory error: " << _shared_state->sink_status;
}
_shared_state->close();
} else {
VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
<< _parent->id() << " revoke memory finish";
}
_spilling_stream->end_spill(_shared_state->sink_status);
if (!_shared_state->sink_status.ok()) {
_shared_state->close();
}
if (!_shared_state->sink_status.ok()) {
_shared_state->close();
}
_spilling_stream.reset();
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_dependency->Dependency::set_ready();
}
}};
_spilling_stream.reset();
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_dependency->Dependency::set_ready();
}
}};
_shared_state->sink_status = parent._sort_sink_operator->prepare_for_spill(
_runtime_state.get());
RETURN_IF_ERROR(_shared_state->sink_status);
_shared_state->sink_status =
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
RETURN_IF_ERROR(_shared_state->sink_status);
auto* sink_local_state = _runtime_state->get_sink_local_state();
update_profile(sink_local_state->profile());
auto* sink_local_state = _runtime_state->get_sink_local_state();
update_profile(sink_local_state->profile());
bool eos = false;
vectorized::Block block;
while (!eos && !state->is_cancelled()) {
{
SCOPED_TIMER(_spill_merge_sort_timer);
_shared_state->sink_status =
parent._sort_sink_operator->merge_sort_read_for_spill(
_runtime_state.get(), &block,
_shared_state->spill_block_batch_row_count, &eos);
}
RETURN_IF_ERROR(_shared_state->sink_status);
{
SCOPED_TIMER(Base::_spill_timer);
_shared_state->sink_status =
_spilling_stream->spill_block(state, block, eos);
}
RETURN_IF_ERROR(_shared_state->sink_status);
block.clear_column_data();
}
parent._sort_sink_operator->reset(_runtime_state.get());
bool eos = false;
vectorized::Block block;
while (!eos && !state->is_cancelled()) {
{
SCOPED_TIMER(_spill_merge_sort_timer);
_shared_state->sink_status =
parent._sort_sink_operator->merge_sort_read_for_spill(
_runtime_state.get(), &block,
_shared_state->spill_block_batch_row_count, &eos);
}
RETURN_IF_ERROR(_shared_state->sink_status);
{
SCOPED_TIMER(Base::_spill_timer);
_shared_state->sink_status =
_spilling_stream->spill_block(state, block, eos);
}
RETURN_IF_ERROR(_shared_state->sink_status);
block.clear_column_data();
}
parent._sort_sink_operator->reset(_runtime_state.get());
return Status::OK();
});
return Status::OK();
});
if (!status.ok()) {
_spilling_stream->end_spill(status);
if (!_eos) {
Base::_dependency->Dependency::set_ready();
}

View File

@ -152,7 +152,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
_status = tmp_stream->prepare_spill();
RETURN_IF_ERROR(_status);
Defer defer {[&]() { tmp_stream->end_spill(_status); }};
_shared_state->sorted_streams.emplace_back(tmp_stream);
bool eos = false;
@ -177,7 +176,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
return Status::OK();
};
return ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
spill_func);
}

View File

@ -231,7 +231,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
}
void AggSpillPartition::close() {
if (spilling_stream_) {
(void)spilling_stream_->wait_spill();
spilling_stream_.reset();
}
for (auto& stream : spill_streams_) {

View File

@ -445,11 +445,9 @@ struct AggSpillPartition {
Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile,
vectorized::SpillStreamSPtr& spilling_stream);
// wait for current block spilling to finish
Status wait_spill(RuntimeState* state) {
Status flush_if_full() {
DCHECK(spilling_stream_);
auto status = spilling_stream_->wait_spill();
RETURN_IF_ERROR(status);
Status status;
// avoid small spill files
if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
status = spilling_stream_->spill_eof();
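wait_spill(state) becomes flush_if_full(): with synchronous writes there is no in-flight future to wait on, so the helper's only remaining job is to close a spill file once it has grown past AGG_SPILL_FILE_SIZE, avoiding many small files. Only the opening lines are visible in the hunk; a rough reconstruction of its shape, with the unseen remainder elided rather than guessed at:

```cpp
// Shape of the renamed helper; everything past spill_eof() is not shown in this hunk.
Status flush_if_full() {
    DCHECK(spilling_stream_);
    Status status;
    // avoid small spill files
    if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
        status = spilling_stream_->spill_eof();
        // ... (remainder of the function not shown in this hunk) ...
    }
    return status;
}
```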

View File

@ -42,10 +42,7 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
spill_dir_(std::move(spill_dir)),
batch_rows_(batch_rows),
batch_bytes_(batch_bytes),
profile_(profile) {
io_thread_pool_ =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path());
}
profile_(profile) {}
SpillStream::~SpillStream() {
bool exists = false;
@ -70,14 +67,6 @@ void SpillStream::close() {
}
VLOG_ROW << "closing: " << stream_id_;
closed_ = true;
if (spill_promise_) {
spill_future_.wait();
spill_promise_.reset();
}
if (read_promise_) {
read_future_.wait();
read_promise_.reset();
}
if (writer_) {
(void)writer_->close();
@ -97,25 +86,7 @@ const std::string& SpillStream::get_spill_root_dir() const {
return data_dir_->path();
}
Status SpillStream::prepare_spill() {
DCHECK(!spill_promise_);
RETURN_IF_ERROR(writer_->open());
spill_promise_ = std::make_unique<std::promise<Status>>();
spill_future_ = spill_promise_->get_future();
return Status::OK();
}
void SpillStream::end_spill(const Status& status) {
spill_promise_->set_value(status);
}
Status SpillStream::wait_spill() {
if (spill_promise_) {
SCOPED_TIMER(write_wait_io_timer_);
auto status = spill_future_.get();
spill_promise_.reset();
return status;
}
return Status::OK();
return writer_->open();
}
Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
@ -135,34 +106,13 @@ Status SpillStream::spill_eof() {
}
Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
DCHECK(!read_promise_);
DCHECK(reader_ != nullptr);
Status status;
read_promise_ = std::make_unique<std::promise<Status>>();
read_future_ = read_promise_->get_future();
// use thread pool to limit concurrent io tasks
status = io_thread_pool_->submit_func([this, block, eos] {
SCOPED_ATTACH_TASK(state_);
Status st;
Defer defer {[&]() { read_promise_->set_value(st); }};
st = reader_->open();
if (!st.ok()) {
return;
}
st = reader_->read(block, eos);
});
if (!status.ok()) {
LOG(WARNING) << "read spill data failed: " << status;
read_promise_.reset();
return status;
}
DCHECK(!_is_reading);
_is_reading = true;
Defer defer([this] { _is_reading = false; });
{
SCOPED_TIMER(read_wait_io_timer_);
status = read_future_.get();
}
read_promise_.reset();
return status;
RETURN_IF_ERROR(reader_->open());
return reader_->read(block, eos);
}
} // namespace doris::vectorized
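With the promise/future pairs and the per-stream io_thread_pool_ removed, SpillStream becomes a plain synchronous wrapper around its writer and reader. A hedged end-to-end usage sketch of the resulting API (stream creation via the spill stream manager is assumed, error handling trimmed):

```cpp
// Write side: prepare_spill() only opens the writer now; blocks are written
// inline on the caller's thread, and spill_eof() finishes the file.
RETURN_IF_ERROR(stream->prepare_spill());
RETURN_IF_ERROR(stream->spill_block(state, block, /*eof=*/false));
RETURN_IF_ERROR(stream->spill_eof());

// Read side: read_next_block_sync() opens the reader and reads synchronously,
// guarded by the new _is_reading flag instead of a read future.
bool eos = false;
vectorized::Block out;
while (!eos) {
    RETURN_IF_ERROR(stream->read_next_block_sync(&out, &eos));
    // ... consume `out` ...
}
```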

View File

@ -55,12 +55,8 @@ public:
Status spill_block(RuntimeState* state, const Block& block, bool eof);
void end_spill(const Status& status);
Status spill_eof();
Status wait_spill();
Status read_next_block_sync(Block* block, bool* eos);
void set_write_counters(RuntimeProfile::Counter* serialize_timer,
@ -91,7 +87,6 @@ private:
void close();
RuntimeState* state_ = nullptr;
ThreadPool* io_thread_pool_;
int64_t stream_id_;
std::atomic_bool closed_ = false;
SpillDataDir* data_dir_ = nullptr;
@ -99,10 +94,7 @@ private:
size_t batch_rows_;
size_t batch_bytes_;
std::unique_ptr<std::promise<Status>> spill_promise_;
std::future<Status> spill_future_;
std::unique_ptr<std::promise<Status>> read_promise_;
std::future<Status> read_future_;
std::atomic_bool _is_reading = false;
SpillWriterUPtr writer_;
SpillReaderUPtr reader_;

View File

@ -50,11 +50,6 @@ Status SpillStreamManager::init() {
LOG(INFO) << "init spill stream manager";
RETURN_IF_ERROR(_init_spill_store_map());
int spill_io_thread_count = config::spill_io_thread_pool_per_disk_thread_num;
if (spill_io_thread_count <= 0) {
spill_io_thread_count = 2;
}
int pool_idx = 0;
for (const auto& [path, store] : _spill_store_map) {
auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);
bool exists = true;
@ -85,20 +80,12 @@ Status SpillStreamManager::init() {
}
}
store->update_spill_data_usage(spill_data_size);
std::unique_ptr<ThreadPool> io_pool;
static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", pool_idx++))
.set_min_threads(spill_io_thread_count)
.set_max_threads(spill_io_thread_count)
.set_max_queue_size(config::spill_io_thread_pool_queue_size)
.build(&io_pool));
path_to_io_thread_pool_[path] = std::move(io_pool);
}
static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool")
.set_min_threads(config::spill_async_task_thread_pool_thread_num)
.set_max_threads(config::spill_async_task_thread_pool_thread_num)
.set_max_queue_size(config::spill_async_task_thread_pool_queue_size)
.build(&async_task_thread_pool_));
static_cast<void>(ThreadPoolBuilder("SpillIOThreadPool")
.set_min_threads(config::spill_io_thread_pool_thread_num)
.set_max_threads(config::spill_io_thread_pool_thread_num)
.set_max_queue_size(config::spill_io_thread_pool_queue_size)
.build(&_spill_io_thread_pool));
RETURN_IF_ERROR(Thread::create(
"Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); },
@ -274,7 +261,7 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
void SpillStreamManager::async_cleanup_query(TUniqueId query_id) {
(void)get_async_task_thread_pool()->submit_func([this, query_id] {
(void)get_spill_io_thread_pool()->submit_func([this, query_id] {
for (auto& [_, store] : _spill_store_map) {
std::string query_spill_dir =
fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, print_id(query_id));

View File

@ -112,12 +112,7 @@ public:
void gc(int64_t max_file_count);
ThreadPool* get_spill_io_thread_pool(const std::string& path) const {
const auto it = path_to_io_thread_pool_.find(path);
DCHECK(it != path_to_io_thread_pool_.end());
return it->second.get();
}
ThreadPool* get_async_task_thread_pool() const { return async_task_thread_pool_.get(); }
ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); }
private:
Status _init_spill_store_map();
@ -127,8 +122,7 @@ private:
std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;
CountDownLatch _stop_background_threads_latch;
std::unique_ptr<ThreadPool> async_task_thread_pool_;
std::unordered_map<std::string, std::unique_ptr<ThreadPool>> path_to_io_thread_pool_;
std::unique_ptr<ThreadPool> _spill_io_thread_pool;
scoped_refptr<Thread> _spill_gc_thread;
std::atomic_uint64_t id_ = 0;