[fix](spill) fix wrong disk usage of spill (#35423)

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->

## Further comments

If this is a relatively large or complex change, kick off the discussion
at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why
you chose the solution you did and what alternatives you considered,
etc...
This commit is contained in:
TengJianPing
2024-05-28 17:55:45 +08:00
committed by yiguolei
parent 72a27a0938
commit eefeb4d80c
7 changed files with 62 additions and 46 deletions

View File

@ -1173,9 +1173,9 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// limit the storage space that query spill files can use
DEFINE_String(spill_storage_root_path, "");
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_file_count, "2000");
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_work_time_ms, "2000"); // 2s
DEFINE_Int32(spill_io_thread_pool_thread_num, "-1");
DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
if (config == -1) {

View File

@ -1259,7 +1259,7 @@ DECLARE_String(spill_storage_root_path);
// disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit
DECLARE_String(spill_storage_limit);
DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_file_count);
DECLARE_mInt32(spill_gc_work_time_ms);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);

View File

@ -61,23 +61,6 @@ Status SpillStream::prepare() {
return Status::OK();
}
void SpillStream::close() {
if (closed_) {
return;
}
VLOG_ROW << "closing: " << stream_id_;
closed_ = true;
if (writer_) {
(void)writer_->close();
writer_.reset();
}
if (reader_) {
(void)reader_->close();
reader_.reset();
}
}
const TUniqueId& SpillStream::query_id() const {
return state_->query_id();
}
@ -94,13 +77,17 @@ Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eo
RETURN_IF_ERROR(writer_->write(state, block, written_bytes));
if (eof) {
RETURN_IF_ERROR(writer_->close());
total_written_bytes_ = writer_->get_written_bytes();
writer_.reset();
} else {
total_written_bytes_ = writer_->get_written_bytes();
}
return Status::OK();
}
Status SpillStream::spill_eof() {
RETURN_IF_ERROR(writer_->close());
total_written_bytes_ = writer_->get_written_bytes();
writer_.reset();
return Status::OK();
}
@ -115,4 +102,8 @@ Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
return reader_->read(block, eos);
}
void SpillStream::decrease_spill_data_usage() {
data_dir_->update_spill_data_usage(-total_written_bytes_);
}
} // namespace doris::vectorized

View File

@ -49,7 +49,7 @@ public:
const std::string& get_spill_dir() const { return spill_dir_; }
size_t get_written_bytes() const { return writer_->get_written_bytes(); }
int64_t get_written_bytes() const { return total_written_bytes_; }
Status prepare_spill();
@ -79,20 +79,20 @@ public:
const TUniqueId& query_id() const;
void decrease_spill_data_usage();
private:
friend class SpillStreamManager;
Status prepare();
void close();
RuntimeState* state_ = nullptr;
int64_t stream_id_;
std::atomic_bool closed_ = false;
SpillDataDir* data_dir_ = nullptr;
std::string spill_dir_;
size_t batch_rows_;
size_t batch_bytes_;
int64_t total_written_bytes_ = 0;
std::atomic_bool _is_reading = false;

View File

@ -98,7 +98,7 @@ Status SpillStreamManager::init() {
void SpillStreamManager::_spill_gc_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::spill_gc_interval_ms))) {
gc(config::spill_gc_file_count);
gc(config::spill_gc_work_time_ms);
for (auto& [path, dir] : _spill_store_map) {
static_cast<void>(dir->update_capacity());
}
@ -203,16 +203,29 @@ void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
fmt::format("{}/{}", query_dir,
std::filesystem::path(stream->get_spill_dir()).filename().string());
(void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir);
stream->decrease_spill_data_usage();
}
}
void SpillStreamManager::gc(int64_t max_file_count) {
if (max_file_count < 1) {
return;
}
void SpillStreamManager::gc(int32_t max_work_time_ms) {
bool exists = true;
int64_t count = 0;
bool has_work = false;
int64_t max_work_time_ns = max_work_time_ms * 1000L * 1000L;
MonotonicStopWatch watch;
watch.start();
Defer defer {[&]() {
if (has_work) {
std::string msg(
fmt::format("spill gc time: {}",
PrettyPrinter::print(watch.elapsed_time(), TUnit::TIME_NS)));
msg += ", spill storage:\n";
for (const auto& [path, store_dir] : _spill_store_map) {
msg += " " + store_dir->debug_string();
msg += "\n";
}
LOG(INFO) << msg;
}
}};
for (const auto& [path, store_dir] : _spill_store_map) {
std::string gc_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);
@ -221,6 +234,7 @@ void SpillStreamManager::gc(int64_t max_file_count) {
if (ec || !exists) {
continue;
}
// dirs of queries
std::vector<io::FileInfo> dirs;
auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists);
if (!st.ok()) {
@ -228,32 +242,32 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
for (const auto& dir : dirs) {
has_work = true;
if (dir.is_file) {
continue;
}
std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name);
// operator spill sub dirs of a query
std::vector<io::FileInfo> files;
st = io::global_local_filesystem()->list(abs_dir, true, &files, &exists);
st = io::global_local_filesystem()->list(abs_dir, false, &files, &exists);
if (!st.ok()) {
continue;
}
if (files.empty()) {
static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir));
if (count++ == max_file_count) {
return;
}
continue;
}
int64_t data_size = 0;
Defer defer {[&]() { store_dir->update_spill_data_usage(-data_size); }};
for (const auto& file : files) {
auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name);
data_size += file.file_size;
static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
if (count++ == max_file_count) {
return;
if (file.is_file) {
static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
} else {
static_cast<void>(
io::global_local_filesystem()->delete_directory(abs_file_path));
}
if (watch.elapsed_time() > max_work_time_ns) {
break;
}
}
}
@ -357,4 +371,13 @@ bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
}
return false;
}
std::string SpillDataDir::debug_string() {
return fmt::format(
"path: {}, capacity: {}, limit: {}, used: {}, available: "
"{}",
_path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
PrettyPrinter::print_bytes(_spill_data_limit_bytes),
PrettyPrinter::print_bytes(_spill_data_bytes),
PrettyPrinter::print_bytes(_available_bytes));
}
} // namespace doris::vectorized

View File

@ -64,6 +64,8 @@ public:
return _spill_data_limit_bytes;
}
std::string debug_string();
private:
bool _reach_disk_capacity_limit(int64_t incoming_data_size);
double _get_disk_usage(int64_t incoming_data_size) const {
@ -110,7 +112,7 @@ public:
void async_cleanup_query(TUniqueId query_id);
void gc(int64_t max_file_count);
void gc(int32_t max_work_time_ms);
ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); }

View File

@ -44,7 +44,7 @@ public:
int64_t get_id() const { return stream_id_; }
size_t get_written_bytes() const { return total_written_bytes_; }
int64_t get_written_bytes() const { return total_written_bytes_; }
const std::string& get_file_path() const { return file_path_; }
@ -75,7 +75,7 @@ private:
std::unique_ptr<doris::io::FileWriter> file_writer_;
size_t written_blocks_ = 0;
size_t total_written_bytes_ = 0;
int64_t total_written_bytes_ = 0;
std::string meta_;
RuntimeProfile::Counter* write_bytes_counter_;