diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index be7a78b198..a440643c6e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1173,9 +1173,9 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15"); DEFINE_Int32(partition_disk_index_lru_size, "10000"); // limit the storage space that query spill files can use DEFINE_String(spill_storage_root_path, ""); -DEFINE_String(spill_storage_limit, "20%"); // 20% -DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s -DEFINE_mInt32(spill_gc_file_count, "2000"); +DEFINE_String(spill_storage_limit, "20%"); // 20% +DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s +DEFINE_mInt32(spill_gc_work_time_ms, "2000"); // 2s DEFINE_Int32(spill_io_thread_pool_thread_num, "-1"); DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool { if (config == -1) { diff --git a/be/src/common/config.h b/be/src/common/config.h index da399de9ab..39a5a0eb72 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1259,7 +1259,7 @@ DECLARE_String(spill_storage_root_path); // disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit DECLARE_String(spill_storage_limit); DECLARE_mInt32(spill_gc_interval_ms); -DECLARE_mInt32(spill_gc_file_count); +DECLARE_mInt32(spill_gc_work_time_ms); DECLARE_Int32(spill_io_thread_pool_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index e4631f1e1c..b9c27a9d6a 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -61,23 +61,6 @@ Status SpillStream::prepare() { return Status::OK(); } -void SpillStream::close() { - if (closed_) { - return; - } - VLOG_ROW << "closing: " << stream_id_; - closed_ = true; - - if (writer_) { - (void)writer_->close(); - writer_.reset(); - } - if (reader_) { - (void)reader_->close(); - reader_.reset(); - } -} - const TUniqueId& SpillStream::query_id() const { return state_->query_id(); } @@ -94,13 +77,17 @@ Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eo RETURN_IF_ERROR(writer_->write(state, block, written_bytes)); if (eof) { RETURN_IF_ERROR(writer_->close()); + total_written_bytes_ = writer_->get_written_bytes(); writer_.reset(); + } else { + total_written_bytes_ = writer_->get_written_bytes(); } return Status::OK(); } Status SpillStream::spill_eof() { RETURN_IF_ERROR(writer_->close()); + total_written_bytes_ = writer_->get_written_bytes(); writer_.reset(); return Status::OK(); } @@ -115,4 +102,8 @@ Status SpillStream::read_next_block_sync(Block* block, bool* eos) { return reader_->read(block, eos); } +void SpillStream::decrease_spill_data_usage() { + data_dir_->update_spill_data_usage(-total_written_bytes_); +} + } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 638942d1af..cadfa6fb6d 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -49,7 +49,7 @@ public: const std::string& get_spill_dir() const { return spill_dir_; } - size_t get_written_bytes() const { return writer_->get_written_bytes(); } + int64_t get_written_bytes() const { return total_written_bytes_; } Status prepare_spill(); @@ -79,20 +79,20 @@ public: const TUniqueId& query_id() const; + void decrease_spill_data_usage(); + private: friend class SpillStreamManager; Status prepare(); - void close(); - RuntimeState* state_ = nullptr; int64_t stream_id_; - std::atomic_bool closed_ = false; SpillDataDir* data_dir_ = nullptr; std::string spill_dir_; size_t batch_rows_; size_t batch_bytes_; + int64_t total_written_bytes_ = 0; std::atomic_bool _is_reading = false; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 2042555e49..12a1e59037 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -98,7 +98,7 @@ Status SpillStreamManager::init() { void SpillStreamManager::_spill_gc_thread_callback() { while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(config::spill_gc_interval_ms))) { - gc(config::spill_gc_file_count); + gc(config::spill_gc_work_time_ms); for (auto& [path, dir] : _spill_store_map) { static_cast(dir->update_capacity()); } @@ -203,16 +203,29 @@ void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) { fmt::format("{}/{}", query_dir, std::filesystem::path(stream->get_spill_dir()).filename().string()); (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir); + stream->decrease_spill_data_usage(); } } -void SpillStreamManager::gc(int64_t max_file_count) { - if (max_file_count < 1) { - return; - } - +void SpillStreamManager::gc(int32_t max_work_time_ms) { bool exists = true; - int64_t count = 0; + bool has_work = false; + int64_t max_work_time_ns = max_work_time_ms * 1000L * 1000L; + MonotonicStopWatch watch; + watch.start(); + Defer defer {[&]() { + if (has_work) { + std::string msg( + fmt::format("spill gc time: {}", + PrettyPrinter::print(watch.elapsed_time(), TUnit::TIME_NS))); + msg += ", spill storage:\n"; + for (const auto& [path, store_dir] : _spill_store_map) { + msg += " " + store_dir->debug_string(); + msg += "\n"; + } + LOG(INFO) << msg; + } + }}; for (const auto& [path, store_dir] : _spill_store_map) { std::string gc_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX); @@ -221,6 +234,7 @@ void SpillStreamManager::gc(int64_t max_file_count) { if (ec || !exists) { continue; } + // dirs of queries std::vector dirs; auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists); if (!st.ok()) { @@ -228,32 +242,32 @@ void SpillStreamManager::gc(int64_t max_file_count) { } for (const auto& dir : dirs) { + has_work = true; if (dir.is_file) { continue; } std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name); + // operator spill sub dirs of a query std::vector files; - st = io::global_local_filesystem()->list(abs_dir, true, &files, &exists); + st = io::global_local_filesystem()->list(abs_dir, false, &files, &exists); if (!st.ok()) { continue; } if (files.empty()) { static_cast(io::global_local_filesystem()->delete_directory(abs_dir)); - if (count++ == max_file_count) { - return; - } continue; } - int64_t data_size = 0; - Defer defer {[&]() { store_dir->update_spill_data_usage(-data_size); }}; - for (const auto& file : files) { auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name); - data_size += file.file_size; - static_cast(io::global_local_filesystem()->delete_file(abs_file_path)); - if (count++ == max_file_count) { - return; + if (file.is_file) { + static_cast(io::global_local_filesystem()->delete_file(abs_file_path)); + } else { + static_cast( + io::global_local_filesystem()->delete_directory(abs_file_path)); + } + if (watch.elapsed_time() > max_work_time_ns) { + break; } } } @@ -357,4 +371,13 @@ bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { } return false; } +std::string SpillDataDir::debug_string() { + return fmt::format( + "path: {}, capacity: {}, limit: {}, used: {}, available: " + "{}", + _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), + PrettyPrinter::print_bytes(_spill_data_limit_bytes), + PrettyPrinter::print_bytes(_spill_data_bytes), + PrettyPrinter::print_bytes(_available_bytes)); +} } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 298af77afc..2ac68bb136 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -64,6 +64,8 @@ public: return _spill_data_limit_bytes; } + std::string debug_string(); + private: bool _reach_disk_capacity_limit(int64_t incoming_data_size); double _get_disk_usage(int64_t incoming_data_size) const { @@ -110,7 +112,7 @@ public: void async_cleanup_query(TUniqueId query_id); - void gc(int64_t max_file_count); + void gc(int32_t max_work_time_ms); ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); } diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index 865b7aeb00..d77bbd6908 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -44,7 +44,7 @@ public: int64_t get_id() const { return stream_id_; } - size_t get_written_bytes() const { return total_written_bytes_; } + int64_t get_written_bytes() const { return total_written_bytes_; } const std::string& get_file_path() const { return file_path_; } @@ -75,7 +75,7 @@ private: std::unique_ptr file_writer_; size_t written_blocks_ = 0; - size_t total_written_bytes_ = 0; + int64_t total_written_bytes_ = 0; std::string meta_; RuntimeProfile::Counter* write_bytes_counter_;