From f1b91858309a66ea37d3d87190f36646cd081ef3 Mon Sep 17 00:00:00 2001 From: plat1ko Date: Tue, 14 Feb 2023 15:21:54 +0800 Subject: [PATCH] [feature](cooldown) Implement cold data compaction (#16681) --- be/src/common/config.h | 2 + be/src/olap/CMakeLists.txt | 1 + be/src/olap/base_compaction.cpp | 9 +- be/src/olap/base_compaction.h | 2 +- be/src/olap/cold_data_compaction.cpp | 78 +++++++++ be/src/olap/cold_data_compaction.h | 40 +++++ be/src/olap/compaction.cpp | 93 +++++++--- be/src/olap/compaction.h | 4 +- be/src/olap/cumulative_compaction.cpp | 2 +- be/src/olap/cumulative_compaction.h | 2 +- be/src/olap/cumulative_compaction_policy.cpp | 34 ++-- be/src/olap/cumulative_compaction_policy.h | 9 +- be/src/olap/olap_common.h | 1 + be/src/olap/olap_server.cpp | 112 ++++++++++++ be/src/olap/reader.cpp | 4 +- be/src/olap/storage_engine.h | 3 + be/src/olap/tablet.cpp | 175 ++++++++++++------- be/src/olap/tablet.h | 11 ++ be/src/olap/tablet_meta.h | 4 +- be/src/vec/olap/vertical_merge_iterator.cpp | 4 + 20 files changed, 466 insertions(+), 124 deletions(-) create mode 100644 be/src/olap/cold_data_compaction.cpp create mode 100644 be/src/olap/cold_data_compaction.h diff --git a/be/src/common/config.h b/be/src/common/config.h index 176b3d665a..abf2bf3574 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -811,6 +811,8 @@ CONF_Int32(cooldown_thread_num, "5"); CONF_mInt64(generate_cooldown_task_interval_sec, "20"); CONF_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h CONF_mInt32(confirm_unused_remote_files_interval_sec, "60"); +CONF_Int32(cold_data_compaction_thread_num, "2"); +CONF_mInt32(cold_data_compaction_interval_sec, "1800"); CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h CONF_Int32(concurrency_per_dir, "2"); CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 3ae9d90ba2..c37bf8938f 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(Olap STATIC base_tablet.cpp bloom_filter.hpp block_column_predicate.cpp + cold_data_compaction.cpp compaction.cpp compaction_permit_limiter.cpp cumulative_compaction.cpp diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 36fdb193de..d67f8a2d72 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -23,7 +23,7 @@ namespace doris { using namespace ErrorCode; -BaseCompaction::BaseCompaction(TabletSharedPtr tablet) +BaseCompaction::BaseCompaction(const TabletSharedPtr& tablet) : Compaction(tablet, "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} BaseCompaction::~BaseCompaction() {} @@ -90,9 +90,14 @@ Status BaseCompaction::execute_compact_impl() { void BaseCompaction::_filter_input_rowset() { // if dup_key and no delete predicate // we skip big files too save resources - if (_tablet->keys_type() != KeysType::DUP_KEYS || _tablet->delete_predicates().size() != 0) { + if (_tablet->keys_type() != KeysType::DUP_KEYS) { return; } + for (auto& rs : _input_rowsets) { + if (rs->rowset_meta()->has_delete_predicate()) { + return; + } + } int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 1024 * 1024; // first find a proper rowset for start auto rs_iter = _input_rowsets.begin(); diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index 96a4a362f4..da53a89d95 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -28,7 +28,7 @@ namespace doris { class BaseCompaction : public Compaction { public: - BaseCompaction(TabletSharedPtr tablet); + BaseCompaction(const TabletSharedPtr& tablet); ~BaseCompaction() override; Status prepare_compact() override; diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp new file mode 100644 index 0000000000..4b06ee7616 --- /dev/null +++ b/be/src/olap/cold_data_compaction.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/cold_data_compaction.h" + +#include "common/compiler_util.h" +#include "olap/compaction.h" +#include "olap/rowset/rowset.h" + +namespace doris { +using namespace ErrorCode; + +ColdDataCompaction::ColdDataCompaction(const TabletSharedPtr& tablet) + : Compaction(tablet, "ColdDataCompaction:" + std::to_string(tablet->tablet_id())) {} + +ColdDataCompaction::~ColdDataCompaction() = default; + +Status ColdDataCompaction::prepare_compact() { + if (UNLIKELY(!_tablet->init_succeeded())) { + return Status::Error(); + } + return pick_rowsets_to_compact(); +} + +Status ColdDataCompaction::execute_compact_impl() { +#ifndef __APPLE__ + if (config::enable_base_compaction_idle_sched) { + Thread::set_idle_sched(); + } +#endif + SCOPED_ATTACH_TASK(_mem_tracker); + int64_t permits = get_compaction_permits(); + RETURN_IF_ERROR(do_compaction(permits)); + _state = CompactionState::SUCCESS; + return Status::OK(); +} + +Status ColdDataCompaction::pick_rowsets_to_compact() { + _tablet->traverse_rowsets([this](const auto& rs) { + if (!rs->is_local()) { + _input_rowsets.push_back(rs); + } + }); + std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator); + return check_version_continuity(_input_rowsets); +} + +Status ColdDataCompaction::modify_rowsets() { + { + std::lock_guard wlock(_tablet->get_header_lock()); + // Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`. + _tablet->delete_rowsets(_input_rowsets, false); + _tablet->add_rowsets({_output_rowset}); + // TODO(plat1ko): process primary key + _tablet->tablet_meta()->set_cooldown_meta_id(UniqueId::gen_uid()); + } + { + std::shared_lock rlock(_tablet->get_header_lock()); + _tablet->save_meta(); + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/olap/cold_data_compaction.h b/be/src/olap/cold_data_compaction.h new file mode 100644 index 0000000000..75b4d064ca --- /dev/null +++ b/be/src/olap/cold_data_compaction.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/compaction.h" + +namespace doris { + +class ColdDataCompaction final : public Compaction { +public: + ColdDataCompaction(const TabletSharedPtr& tablet); + ~ColdDataCompaction() override; + + Status prepare_compact() override; + Status execute_compact_impl() override; + +private: + std::string compaction_name() const override { return "cold data compaction"; } + ReaderType compaction_type() const override { return ReaderType::READER_COLD_DATA_COMPACTION; } + + Status pick_rowsets_to_compact() override; + Status modify_rowsets() override; +}; + +} // namespace doris diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index f968f219fe..b7fedd2203 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -33,7 +33,7 @@ using std::vector; namespace doris { using namespace ErrorCode; -Compaction::Compaction(TabletSharedPtr tablet, const std::string& label) +Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label) : _tablet(tablet), _input_rowsets_size(0), _input_row_num(0), @@ -197,6 +197,10 @@ bool Compaction::handle_ordered_data_compaction() { if (!config::enable_ordered_data_compaction) { return false; } + if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { + // The remote file system does not support to link files. + return false; + } if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { return false; @@ -204,7 +208,7 @@ bool Compaction::handle_ordered_data_compaction() { // check delete version: if compaction type is base compaction and // has a delete version, use original compaction if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - for (auto rowset : _input_rowsets) { + for (auto& rowset : _input_rowsets) { if (rowset->rowset_meta()->has_delete_predicate()) { return false; } @@ -247,7 +251,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { int64_t now = UnixMillis(); if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { _tablet->set_last_cumu_compaction_success_time(now); - } else { + } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { _tablet->set_last_base_compaction_success_time(now); } auto cumu_policy = _tablet->cumulative_compaction_policy(); @@ -273,41 +277,52 @@ Status Compaction::do_compaction_impl(int64_t permits) { // 2. write merged rows to output rowset // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool Merger::Statistics stats; - Status res; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { stats.rowid_conversion = &_rowid_conversion; } - if (use_vectorized_compaction) { - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); + auto build_output_rowset = [&]() { + Status res; + if (use_vectorized_compaction) { + if (vertical_compaction) { + res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, + _input_rs_readers, _output_rs_writer.get(), + get_avg_segment_rows(), &stats); + } else { + res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, + _input_rs_readers, _output_rs_writer.get(), &stats); + } } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); + LOG(FATAL) << "Only support vectorized compaction"; } - } else { - LOG(FATAL) << "Only support vectorized compaction"; - } - if (!res.ok()) { - LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->full_name() - << ", output_version=" << _output_version; + if (!res.ok()) { + LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res + << ", tablet=" << _tablet->full_name() + << ", output_version=" << _output_version; + return res; + } + TRACE("merge rowsets finished"); + TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows); + TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows); + + _output_rowset = _output_rs_writer->build(); + if (_output_rowset == nullptr) { + LOG(WARNING) << "rowset writer build failed. writer version:" + << ", output_version=" << _output_version; + return Status::Error(); + } return res; - } - TRACE("merge rowsets finished"); - TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows); - TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows); + }; - _output_rowset = _output_rs_writer->build(); - if (_output_rowset == nullptr) { - LOG(WARNING) << "rowset writer build failed. writer version:" - << ", output_version=" << _output_version; - return Status::Error(); + if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { + std::shared_lock slock(_tablet->get_remote_files_lock()); + RETURN_IF_ERROR(build_output_rowset()); + } else { + RETURN_IF_ERROR(build_output_rowset()); } + TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size()); TRACE_COUNTER_INCREMENT("output_row_num", _output_rowset->num_rows()); TRACE_COUNTER_INCREMENT("output_segments_num", _output_rowset->num_segments()); @@ -326,7 +341,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { // TODO(yingchun): do the judge in Tablet class if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { _tablet->set_last_cumu_compaction_success_time(now); - } else { + } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { _tablet->set_last_base_compaction_success_time(now); } @@ -364,6 +379,22 @@ Status Compaction::construct_output_rowset_writer(bool is_vertical) { ctx.segments_overlap = NONOVERLAPPING; ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; + if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { + // write output rowset to storage policy resource + auto storage_policy = get_storage_policy(_tablet->storage_policy_id()); + if (storage_policy == nullptr) { + return Status::InternalError("could not find storage_policy, storage_policy_id={}", + _tablet->storage_policy_id()); + } + auto resource = get_storage_resource(storage_policy->resource_id); + if (resource.fs == nullptr) { + return Status::InternalError("could not find resource, resouce_id={}", + storage_policy->resource_id); + } + DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id); + DCHECK(resource.fs->type() != io::FileSystemType::LOCAL); + ctx.fs = std::move(resource.fs); + } if (is_vertical) { return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer); } @@ -404,6 +435,12 @@ Status Compaction::modify_rowsets() { void Compaction::gc_output_rowset() { if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) { + if (!_output_rowset->is_local()) { + _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(), + _output_rowset->rowset_meta()->resource_id(), + _output_rowset->num_segments()); + return; + } StorageEngine::instance()->add_unused_rowset(_output_rowset); } } diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 8501df9ef4..d136650d32 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -42,7 +42,7 @@ class Merger; // 4. gc output rowset if failed class Compaction { public: - Compaction(TabletSharedPtr tablet, const std::string& label); + Compaction(const TabletSharedPtr& tablet, const std::string& label); virtual ~Compaction(); // This is only for http CompactionAction @@ -64,7 +64,7 @@ protected: Status do_compaction(int64_t permits); Status do_compaction_impl(int64_t permits); - Status modify_rowsets(); + virtual Status modify_rowsets(); void gc_output_rowset(); Status construct_output_rowset_writer(bool is_vertical = false); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 765790859c..f6d56d142e 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -24,7 +24,7 @@ namespace doris { using namespace ErrorCode; -CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet) +CumulativeCompaction::CumulativeCompaction(const TabletSharedPtr& tablet) : Compaction(tablet, "CumulativeCompaction:" + std::to_string(tablet->tablet_id())) {} CumulativeCompaction::~CumulativeCompaction() {} diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index f228e91975..fdfc9dbe46 100644 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -26,7 +26,7 @@ namespace doris { class CumulativeCompaction : public Compaction { public: - CumulativeCompaction(TabletSharedPtr tablet); + CumulativeCompaction(const TabletSharedPtr& tablet); ~CumulativeCompaction() override; Status prepare_compact() override; diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index c2735def03..b38271fd1a 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -149,22 +149,21 @@ void SizeBasedCumulativeCompactionPolicy::update_cumulative_point( } } -void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score( - Tablet* tablet, TabletState state, const std::vector& all_metas, - int64_t current_cumulative_point, uint32_t* score) { +uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) { + uint32_t score = 0; bool base_rowset_exist = false; - const int64_t point = current_cumulative_point; + const int64_t point = tablet->cumulative_layer_point(); int64_t promotion_size = 0; std::vector rowset_to_compact; int64_t total_size = 0; - // check the base rowset and collect the rowsets of cumulative part - auto rs_meta_iter = all_metas.begin(); RowsetMetaSharedPtr first_meta; int64_t first_version = INT64_MAX; - for (; rs_meta_iter != all_metas.end(); rs_meta_iter++) { - auto rs_meta = *rs_meta_iter; + // NOTE: tablet._meta_lock is hold + auto& rs_metas = tablet->tablet_meta()->all_rs_metas(); + // check the base rowset and collect the rowsets of cumulative part + for (auto& rs_meta : rs_metas) { if (rs_meta->start_version() < first_version) { first_version = rs_meta->start_version(); first_meta = rs_meta; @@ -173,20 +172,19 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score( if (rs_meta->start_version() == 0) { base_rowset_exist = true; } - if (rs_meta->end_version() < point) { + if (rs_meta->end_version() < point || !rs_meta->is_local()) { // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here. continue; } else { // collect the rowsets of cumulative part total_size += rs_meta->total_disk_size(); - *score += rs_meta->get_compaction_score(); + score += rs_meta->get_compaction_score(); rowset_to_compact.push_back(rs_meta); } } if (first_meta == nullptr) { - *score = 0; - return; + return 0; } // Use "first"(not base) version to calc promotion size @@ -195,15 +193,14 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score( // If base version does not exist, but its state is RUNNING. // It is abnormal, do not select it and set *score = 0 - if (!base_rowset_exist && state == TABLET_RUNNING) { + if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) { LOG(WARNING) << "tablet state is running but have no base version"; - *score = 0; - return; + return 0; } // if total_size is greater than promotion_size, return total score if (total_size >= promotion_size) { - return; + return score; } // sort the rowsets of cumulative part @@ -219,11 +216,12 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score( // if current level less then remain level, score contains current rowset // and process return; otherwise, score does not contains current rowset. if (current_level <= remain_level) { - return; + return score; } total_size -= rs_meta->total_disk_size(); - *score -= rs_meta->get_compaction_score(); + score -= rs_meta->get_compaction_score(); } + return score; } int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h index 623688290d..20a9d64b61 100644 --- a/be/src/olap/cumulative_compaction_policy.h +++ b/be/src/olap/cumulative_compaction_policy.h @@ -54,9 +54,7 @@ public: /// param all_rowsets, all rowsets in tablet. /// param current_cumulative_point, current cumulative point value. /// return score, the result score after calculate. - virtual void calc_cumulative_compaction_score( - Tablet* tablet, TabletState state, const std::vector& all_rowsets, - int64_t current_cumulative_point, uint32_t* score) = 0; + virtual uint32_t calc_cumulative_compaction_score(Tablet* tablet) = 0; /// Pick input rowsets from candidate rowsets for compaction. This function is pure virtual function. /// Its implementation depends on concrete compaction policy. @@ -142,10 +140,7 @@ public: /// Num based cumulative compaction policy implements calc cumulative compaction score function. /// Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet. - void calc_cumulative_compaction_score(Tablet* tablet, TabletState state, - const std::vector& all_rowsets, - int64_t current_cumulative_point, - uint32_t* score) override; + uint32_t calc_cumulative_compaction_score(Tablet* tablet) override; std::string name() override { return CUMULATIVE_SIZE_BASED_POLICY; } diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 35179caf6d..a624e59a6d 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -175,6 +175,7 @@ enum ReaderType { READER_BASE_COMPACTION = 2, READER_CUMULATIVE_COMPACTION = 3, READER_CHECKSUM = 4, + READER_COLD_DATA_COMPACTION = 5, }; constexpr bool field_is_slice_type(const FieldType& field_type) { diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 5f61adea46..707b77db5b 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -30,6 +30,7 @@ #include "common/status.h" #include "gutil/strings/substitute.h" #include "io/cache/file_cache_manager.h" +#include "olap/cold_data_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -88,6 +89,10 @@ Status StorageEngine::start_bg_threads() { .set_max_threads(config::seg_compaction_max_threads) .build(&_seg_compaction_thread_pool); } + ThreadPoolBuilder("ColdDataCompactionTaskThreadPool") + .set_min_threads(config::cold_data_compaction_thread_num) + .set_max_threads(config::cold_data_compaction_thread_num) + .build(&_cold_data_compaction_thread_pool); // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( @@ -162,6 +167,12 @@ Status StorageEngine::start_bg_threads() { &_remove_unused_remote_files_thread)); LOG(INFO) << "remove unused remote files thread started"; + RETURN_IF_ERROR(Thread::create( + "StorageEngine", "remove_unused_remote_files_thread", + [this]() { this->_cold_data_compaction_producer_callback(); }, + &_remove_unused_remote_files_thread)); + LOG(INFO) << "cold data compaction producer thread started"; + RETURN_IF_ERROR(Thread::create( "StorageEngine", "cache_file_cleaner_tasks_producer_thread", [this]() { this->_cache_file_cleaner_tasks_producer_callback(); }, @@ -755,6 +766,107 @@ void StorageEngine::_remove_unused_remote_files_callback() { } } +void StorageEngine::_cold_data_compaction_producer_callback() { + std::unordered_set tablet_submitted; + std::mutex tablet_submitted_mtx; + + while (!_stop_background_threads_latch.wait_for( + std::chrono::seconds(config::cold_data_compaction_interval_sec))) { + if (config::disable_auto_compaction) { + continue; + } + + std::unordered_set copied_tablet_submitted; + { + std::lock_guard lock(tablet_submitted_mtx); + copied_tablet_submitted = tablet_submitted; + } + int n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size(); + if (n <= 0) { + continue; + } + auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) { + return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() && + t->tablet_state() == TABLET_RUNNING && + !copied_tablet_submitted.count(t->tablet_id()) && + !t->tablet_meta()->tablet_schema()->disable_auto_compaction(); + }); + std::vector> tablet_to_compact; + tablet_to_compact.reserve(n + 1); + std::vector> tablet_to_follow; + tablet_to_follow.reserve(n + 1); + + for (auto& t : tablets) { + if (t->replica_id() == t->cooldown_replica_id()) { + auto score = t->calc_cold_data_compaction_score(); + if (score < 4) { + continue; + } + tablet_to_compact.emplace_back(t, score); + std::sort(tablet_to_compact.begin(), tablet_to_compact.end(), + [](auto& a, auto& b) { return a.second > b.second; }); + if (tablet_to_compact.size() > n) tablet_to_compact.pop_back(); + continue; + } + // else, need to follow + { + std::lock_guard lock(_running_cooldown_mutex); + if (_running_cooldown_tablets.count(t->table_id())) { + // already in cooldown queue + continue; + } + } + // TODO(plat1ko): some avoidance strategy if failed to follow + auto score = t->calc_cold_data_compaction_score(); + tablet_to_follow.emplace_back(t, score); + std::sort(tablet_to_follow.begin(), tablet_to_follow.end(), + [](auto& a, auto& b) { return a.second > b.second; }); + if (tablet_to_follow.size() > n) tablet_to_follow.pop_back(); + } + + for (auto& [tablet, score] : tablet_to_compact) { + LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id() + << " score=" << score; + _cold_data_compaction_thread_pool->submit_func([&, t = std::move(tablet)]() { + auto compaction = std::make_shared(t); + { + std::lock_guard lock(tablet_submitted_mtx); + tablet_submitted.insert(t->tablet_id()); + } + auto st = compaction->compact(); + { + std::lock_guard lock(tablet_submitted_mtx); + tablet_submitted.erase(t->tablet_id()); + } + if (!st.ok()) { + LOG(WARNING) << "failed to do cold data compaction. tablet_id=" + << t->tablet_id() << " err=" << st; + } + }); + } + + for (auto& [tablet, score] : tablet_to_follow) { + LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id() + << " score=" << score; + _cold_data_compaction_thread_pool->submit_func([&, t = std::move(tablet)]() { + { + std::lock_guard lock(tablet_submitted_mtx); + tablet_submitted.insert(t->tablet_id()); + } + auto st = t->cooldown(); + { + std::lock_guard lock(tablet_submitted_mtx); + tablet_submitted.erase(t->tablet_id()); + } + if (!st.ok()) { + LOG(WARNING) << "failed to cooldown. tablet_id=" << t->tablet_id() + << " err=" << st; + } + }); + } + } +} + void StorageEngine::_cache_file_cleaner_tasks_producer_callback() { int64_t interval = config::generate_cache_cleaner_task_interval_sec; do { diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 8d59085908..e201d49e68 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -308,6 +308,7 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) { VLOG_NOTICE << "return column is empty, using full column as default."; } else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION || read_params.reader_type == READER_BASE_COMPACTION || + read_params.reader_type == READER_COLD_DATA_COMPACTION || read_params.reader_type == READER_ALTER_TABLE) && !read_params.return_columns.empty()) { _return_columns = read_params.return_columns; @@ -592,11 +593,12 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) { return Status::OK(); } - // Only BASE_COMPACTION need set filter_delete = true + // Only BASE_COMPACTION and COLD_DATA_COMPACTION need set filter_delete = true // other reader type: // QUERY will filter the row in query layer to keep right result use where clause. // CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset if (read_params.reader_type == READER_BASE_COMPACTION || + read_params.reader_type == READER_COLD_DATA_COMPACTION || read_params.reader_type == READER_CHECKSUM) { _filter_delete = true; } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 768e95daf9..3fe49c3246 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -274,6 +274,7 @@ private: void _cooldown_tasks_producer_callback(); void _remove_unused_remote_files_callback(); + void _cold_data_compaction_producer_callback(); void _cache_file_cleaner_tasks_producer_callback(); @@ -372,6 +373,7 @@ private: std::unique_ptr _base_compaction_thread_pool; std::unique_ptr _cumu_compaction_thread_pool; std::unique_ptr _seg_compaction_thread_pool; + std::unique_ptr _cold_data_compaction_thread_pool; std::unique_ptr _tablet_publish_txn_thread_pool; @@ -396,6 +398,7 @@ private: scoped_refptr _cooldown_tasks_producer_thread; scoped_refptr _remove_unused_remote_files_thread; + scoped_refptr _cold_data_compaction_producer_thread; scoped_refptr _cache_file_cleaner_tasks_producer_thread; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8748c67c86..a931b6c81b 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -418,6 +418,38 @@ Status Tablet::modify_rowsets(std::vector& to_add, return Status::OK(); } +void Tablet::add_rowsets(const std::vector& to_add) { + std::vector rs_metas; + rs_metas.reserve(to_add.size()); + for (auto& rs : to_add) { + _rs_version_map.emplace(rs->version(), rs); + _timestamped_version_tracker.add_version(rs->version()); + rs_metas.push_back(rs->rowset_meta()); + } + _tablet_meta->modify_rs_metas(rs_metas, {}); +} + +void Tablet::delete_rowsets(const std::vector& to_delete, bool move_to_stale) { + std::vector rs_metas; + rs_metas.reserve(to_delete.size()); + for (auto& rs : to_delete) { + rs_metas.push_back(rs->rowset_meta()); + _rs_version_map.erase(rs->version()); + } + _tablet_meta->modify_rs_metas({}, rs_metas, !move_to_stale); + if (move_to_stale) { + for (auto& rs : to_delete) { + _stale_rs_version_map[rs->version()] = rs; + } + _timestamped_version_tracker.add_stale_path_version(rs_metas); + } else { + for (auto& rs : to_delete) { + _timestamped_version_tracker.delete_version(rs->version()); + StorageEngine::instance()->add_unused_rowset(rs); + } + } +} + // snapshot manager may call this api to check if version exists, so that // the version maybe not exist const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version, @@ -854,6 +886,32 @@ uint32_t Tablet::calc_compaction_score( } } +uint32_t Tablet::calc_cold_data_compaction_score() const { + uint32_t score = 0; + std::vector cooldowned_rowsets; + int64_t max_delete_version = 0; + { + std::shared_lock rlock(_meta_lock); + for (auto& rs_meta : _tablet_meta->all_rs_metas()) { + if (!rs_meta->is_local()) { + cooldowned_rowsets.push_back(rs_meta); + if (rs_meta->has_delete_predicate() && + rs_meta->end_version() > max_delete_version) { + max_delete_version = rs_meta->end_version(); + } + } + } + } + for (auto& rs_meta : cooldowned_rowsets) { + if (rs_meta->end_version() < max_delete_version) { + score += rs_meta->num_segments(); + } else { + score += rs_meta->get_compaction_score(); + } + } + return (keys_type() != KeysType::DUP_KEYS) ? score * 2 : score; +} + uint32_t Tablet::_calc_cumulative_compaction_score( std::shared_ptr cumulative_compaction_policy) { #ifndef BE_TEST @@ -862,10 +920,7 @@ uint32_t Tablet::_calc_cumulative_compaction_score( _cumulative_compaction_policy = cumulative_compaction_policy; } #endif - uint32_t score = 0; - _cumulative_compaction_policy->calc_cumulative_compaction_score( - this, tablet_state(), _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score); - return score; + return _cumulative_compaction_policy->calc_cumulative_compaction_score(this); } uint32_t Tablet::_calc_base_compaction_score() const { @@ -876,7 +931,7 @@ uint32_t Tablet::_calc_base_compaction_score() const { if (rs_meta->start_version() == 0) { base_rowset_exist = true; } - if (rs_meta->start_version() >= point) { + if (rs_meta->start_version() >= point || !rs_meta->is_local()) { // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here. continue; } @@ -1724,11 +1779,16 @@ Status Tablet::_cooldown_data(const std::shared_ptr& dest_ { std::unique_lock meta_wlock(_meta_lock); if (tablet_state() == TABLET_RUNNING) { - modify_rowsets(to_add, to_delete); + delete_rowsets({std::move(old_rowset)}, false); + add_rowsets({std::move(new_rowset)}); + // TODO(plat1ko): process primary key _tablet_meta->set_cooldown_meta_id(cooldown_meta_id); - save_meta(); } } + { + std::unique_lock meta_rlock(_meta_lock); + save_meta(); + } return Status::OK(); } @@ -1802,66 +1862,59 @@ Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow std::vector overlap_rowsets; bool version_aligned = false; - std::lock_guard wlock(_meta_lock); - if (tablet_state() != TABLET_RUNNING) { - return Status::InternalError("tablet not running"); - } - - for (auto& [v, rs] : _rs_version_map) { - if (v.second == cooldowned_version) { - version_aligned = true; - break; + { + std::lock_guard wlock(_meta_lock); + if (tablet_state() != TABLET_RUNNING) { + return Status::InternalError("tablet not running"); } - } - if (!version_aligned) { - return Status::InternalError("cooldowned version is not aligned"); - } - for (auto& [v, rs] : _rs_version_map) { - if (v.second <= cooldowned_version) { - overlap_rowsets.push_back(rs); - } else if (!rs->is_local()) { - return Status::InternalError("cooldowned version larger than that to follow"); + + for (auto& [v, rs] : _rs_version_map) { + if (v.second == cooldowned_version) { + version_aligned = true; + break; + } } - } - std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator); - auto rs_pb_it = cooldown_meta_pb.rs_metas().begin(); - auto rs_it = overlap_rowsets.begin(); - for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end(); - ++rs_pb_it, ++rs_it) { - // skip cooldowned rowset with same version in BE - if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) { - break; + if (!version_aligned) { + return Status::InternalError("cooldowned version is not aligned"); } + for (auto& [v, rs] : _rs_version_map) { + if (v.second <= cooldowned_version) { + overlap_rowsets.push_back(rs); + } else if (!rs->is_local()) { + return Status::InternalError("cooldowned version larger than that to follow"); + } + } + std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator); + auto rs_pb_it = cooldown_meta_pb.rs_metas().begin(); + auto rs_it = overlap_rowsets.begin(); + for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end(); + ++rs_pb_it, ++rs_it) { + // skip cooldowned rowset with same version in BE + if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) { + break; + } + } + std::vector to_delete(rs_it, overlap_rowsets.end()); + std::vector to_add; + to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it); + for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) { + auto rs_meta = std::make_shared(); + rs_meta->init_from_pb(*rs_pb_it); + RowsetSharedPtr rs; + RowsetFactory::create_rowset(_schema, _tablet_path, std::move(rs_meta), &rs); + to_add.push_back(std::move(rs)); + } + // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly. + delete_rowsets(to_delete, false); + add_rowsets(to_add); + // TODO(plat1ko): process primary key + _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id()); } - // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly. - std::vector to_delete; - to_delete.reserve(overlap_rowsets.end() - rs_it); - for (; rs_it != overlap_rowsets.end(); ++rs_it) { - _rs_version_map.erase((*rs_it)->version()); - to_delete.push_back((*rs_it)->rowset_meta()); - _timestamped_version_tracker.delete_version((*rs_it)->version()); - StorageEngine::instance()->add_unused_rowset(*rs_it); + { + std::lock_guard rlock(_meta_lock); + save_meta(); } - std::vector to_add; - to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it); - for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) { - auto rs_meta = std::make_shared(); - rs_meta->init_from_pb(*rs_pb_it); - RowsetSharedPtr rowset; - RowsetFactory::create_rowset(_schema, _tablet_path, std::move(rs_meta), &rowset); - _rs_version_map.emplace(rowset->version(), rowset); - to_add.push_back(rowset->rowset_meta()); - _timestamped_version_tracker.add_version(rowset->version()); - } - - _tablet_meta->modify_rs_metas(to_add, to_delete); - - // TODO(plat1ko): process primary key - - _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id()); - save_meta(); - return Status::OK(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index dc0034f065..f06463dcb3 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -293,9 +293,16 @@ public: Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset); + // MUST hold EXCLUSIVE `_meta_lock` + void add_rowsets(const std::vector& to_add); + // MUST hold EXCLUSIVE `_meta_lock` + void delete_rowsets(const std::vector& to_delete, bool move_to_stale); + //////////////////////////////////////////////////////////////////////////// // begin cooldown functions //////////////////////////////////////////////////////////////////////////// + int64_t cooldown_replica_id() const { return _cooldown_replica_id; } + // Cooldown to remote fs. Status cooldown(); @@ -320,6 +327,10 @@ public: int64_t num_segments); static void remove_unused_remote_files(); + + std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; } + + uint32_t calc_cold_data_compaction_score() const; //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 8b147216a8..96adc4baad 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -146,7 +146,7 @@ public: bool in_restore_mode() const; void set_in_restore_mode(bool in_restore_mode); - TabletSchemaSPtr tablet_schema() const; + const TabletSchemaSPtr& tablet_schema() const; const TabletSchemaSPtr tablet_schema(Version version) const; @@ -529,7 +529,7 @@ inline void TabletMeta::set_in_restore_mode(bool in_restore_mode) { _in_restore_mode = in_restore_mode; } -inline TabletSchemaSPtr TabletMeta::tablet_schema() const { +inline const TabletSchemaSPtr& TabletMeta::tablet_schema() const { return _schema; } diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index f173c6b3da..1f86ca81a2 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -17,6 +17,8 @@ #include "vec/olap/vertical_merge_iterator.h" +#include "olap/olap_common.h" + namespace doris { using namespace ErrorCode; @@ -137,6 +139,8 @@ Status RowSourcesBuffer::_create_buffer_file() { file_path_ss << "_base"; } else if (_reader_type == READER_CUMULATIVE_COMPACTION) { file_path_ss << "_cumu"; + } else if (_reader_type == READER_COLD_DATA_COMPACTION) { + file_path_ss << "_cold"; } else { DCHECK(false); return Status::InternalError("unknown reader type");