diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 8eb8f71472..ad8ef14b63 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -30,6 +30,7 @@ add_library(Olap STATIC bloom_filter_reader.cpp bloom_filter_writer.cpp byte_buffer.cpp + compaction.cpp comparison_predicate.cpp compress.cpp cumulative_compaction.cpp diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 19b484bf19..f552806d59 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -16,513 +16,106 @@ // under the License. #include "olap/base_compaction.h" - -#include -#include -#include -#include -#include - -#include "olap/delete_handler.h" -#include "olap/merger.h" -#include "olap/rowset/column_data.h" -#include "olap/storage_engine.h" -#include "olap/tablet_meta.h" -#include "olap/rowset/segment_group.h" -#include "olap/tablet.h" -#include "olap/utils.h" #include "util/doris_metrics.h" -using std::list; -using std::map; -using std::string; -using std::vector; - namespace doris { -OLAPStatus BaseCompaction::init(TabletSharedPtr tablet, bool is_manual_trigger) { - // 表在首次查询或PUSH等操作时,会被加载到内存 - // 如果表没有被加载,表明该表上目前没有任何操作,所以不进行BE操作 - if (!tablet->init_succeeded()) { +BaseCompaction::BaseCompaction(TabletSharedPtr tablet) + : Compaction(tablet) +{ } + +BaseCompaction::~BaseCompaction() { } + +OLAPStatus BaseCompaction::compact() { + if (!_tablet->init_succeeded()) { return OLAP_ERR_INPUT_PARAMETER_ERROR; } - LOG(INFO) << "init base compaction handler. [tablet=" << tablet->full_name() << "]"; - - _tablet = tablet; - - // 1. 尝试取得base compaction的锁 - if (!_try_base_compaction_lock()) { - LOG(WARNING) << "another base compaction is running. tablet=" << tablet->full_name(); + MutexLock lock(_tablet->get_base_lock(), TRY_LOCK); + if (!lock.own_lock()) { + LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name(); return OLAP_ERR_BE_TRY_BE_LOCK_ERROR; } - // 2. 
检查是否满足base compaction触发策略 - VLOG(3) << "check whether satisfy base compaction policy."; - bool is_policy_satisfied = false; - vector candidate_versions; - is_policy_satisfied = _check_whether_satisfy_policy(is_manual_trigger, &candidate_versions); + // 1. pick rowsets to compact + RETURN_NOT_OK(pick_rowsets_to_compact()); - // 2.1 如果不满足触发策略,则直接释放base compaction锁, 返回错误码 - if (!is_policy_satisfied) { - _release_base_compaction_lock(); - return OLAP_ERR_BE_NO_SUITABLE_VERSION; - } + // 2. do base compaction, merge rowsets + RETURN_NOT_OK(do_compaction()); - // 2.2 如果满足触发策略,触发base compaction - // 不释放base compaction锁, 在run()完成之后再释放 - if (!_validate_need_merged_versions(candidate_versions)) { - LOG(FATAL) << "error! invalid need merged versions"; - _release_base_compaction_lock(); - return OLAP_ERR_BE_INVALID_NEED_MERGED_VERSIONS; - } + // 3. set state to success + _state = CompactionState::SUCCESS; - _need_merged_versions = candidate_versions; + // 4. garbage collect input rowsets after base compaction + RETURN_NOT_OK(gc_unused_rowsets()); + + // 5. add metric to base compaction + DorisMetrics::base_compaction_deltas_total.increment(_input_rowsets.size()); + DorisMetrics::base_compaction_bytes_total.increment(_input_rowsets_size); return OLAP_SUCCESS; } -OLAPStatus BaseCompaction::run() { - LOG(INFO) << "start base compaction. tablet=" << _tablet->full_name() - << ", old_base_version=" << _old_base_version.second - << ", new_base_version=" << _new_base_version.second; - - OLAPStatus res = OLAP_SUCCESS; - OlapStopWatch stage_watch; - - // 1. 计算新base的version hash - VersionHash new_base_version_hash; - res = _tablet->compute_all_versions_hash(_need_merged_versions, &new_base_version_hash); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to calculate new base version hash. 
tablet=" << _tablet->full_name() - << ", new_base_version=" << _new_base_version.second; - _garbage_collection(); - return res; +OLAPStatus BaseCompaction::pick_rowsets_to_compact() { + _input_rowsets.clear(); + _tablet->pick_candicate_rowsets_to_base_compaction(&_input_rowsets); + if (_input_rowsets.size() <= 1) { + LOG(WARNING) << "There is no enough rowsets to cumulative compaction." + << ", the size of rowsets to compact=" << _input_rowsets.size() + << ", cumulative_point=" << _tablet->cumulative_layer_point(); + return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } - VLOG(10) << "new_base_version_hash:" << new_base_version_hash; + std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator); + RETURN_NOT_OK(check_version_continuity(_input_rowsets)); - // 2. 获取生成新base需要的data sources - vector rowsets; - res = _tablet->capture_consistent_rowsets(_need_merged_versions, &rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to acquire need data sources. tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.second; - _garbage_collection(); - return res; - } - - { - DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size()); - int64_t merge_bytes = 0; - for (auto& rowset : rowsets) { - merge_bytes += rowset->data_disk_size(); - } - DorisMetrics::base_compaction_bytes_total.increment(merge_bytes); - } - - // 3. 
执行base compaction - // 执行过程可能会持续比较长时间 - stage_watch.reset(); - RowsetId rowset_id = 0; - RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id)); - RowsetWriterContext context; - context.rowset_id = rowset_id; - context.tablet_uid = _tablet->tablet_uid(); - context.tablet_id = _tablet->tablet_id(); - context.partition_id = _tablet->partition_id(); - context.tablet_schema_hash = _tablet->schema_hash(); - context.rowset_type = ALPHA_ROWSET; - context.rowset_path_prefix = _tablet->tablet_path(); - context.tablet_schema = &(_tablet->tablet_schema()); - context.rowset_state = VISIBLE; - context.data_dir = _tablet->data_dir(); - context.version = _new_base_version; - context.version_hash = new_base_version_hash; - - _rs_writer.reset(new (std::nothrow)AlphaRowsetWriter()); - if (_rs_writer == nullptr) { - LOG(WARNING) << "fail to new rowset."; - _garbage_collection(); - return OLAP_ERR_MALLOC_ERROR; - } - RETURN_NOT_OK(_rs_writer->init(context)); - res = _do_base_compaction(new_base_version_hash, rowsets); - _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(_rs_writer->rowset_id())); - // 释放不再使用的ColumnData对象 - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to do base version. tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.second; - _garbage_collection(); - return res; - } - - // validate that delete action is right - // if error happened, sleep 1 hour. Report a fatal log every 1 minute - if (_validate_delete_file_action() != OLAP_SUCCESS) { - LOG(WARNING) << "failed to do base compaction. delete action has error."; - _garbage_collection(); - return OLAP_ERR_BE_ERROR_DELETE_ACTION; - } - - // 4. make new versions visable. - // If success, remove files belong to old versions; - // If fail, gc files belong to new versions. 
- vector unused_rowsets; - vector unused_versions; - _get_unused_versions(&unused_versions); - res = _tablet->capture_consistent_rowsets(unused_versions, &unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.second; - _garbage_collection(); - return res; - } - - res = _update_header(unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to update header. tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.first << "-" << _new_base_version.second; - _garbage_collection(); - return res; - } - _delete_old_files(&unused_rowsets); - - _release_base_compaction_lock(); - - LOG(INFO) << "succeed to do base compaction. tablet=" << _tablet->full_name() - << ", base_version=" << _new_base_version.first << "-" << _new_base_version.second - << ". elapsed time of doing base compaction" - << ", time=" << stage_watch.get_elapse_second() << "s"; - - return OLAP_SUCCESS; -} - -static bool version_comparator(const Version& lhs, const Version& rhs) { - return lhs.second < rhs.second; -} - -bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger, - vector* candidate_versions) { - ReadLock rdlock(_tablet->get_header_lock_ptr()); - int64_t cumulative_layer_point = _tablet->cumulative_layer_point(); - if (cumulative_layer_point == -1) { - LOG(FATAL) << "tablet has an unreasonable cumulative layer point. [tablet='" << _tablet->full_name() - << "' cumulative_layer_point=" << cumulative_layer_point << "]"; - return false; - } - - // 为了后面计算方便,我们在这里先将cumulative_layer_point减1 - --cumulative_layer_point; - - vector path_versions; - if (OLAP_SUCCESS != _tablet->capture_consistent_versions(Version(0, cumulative_layer_point), &path_versions)) { - LOG(WARNING) << "fail to select shortest version path. 
start=0, end=" << cumulative_layer_point; - return false; - } - - // base_compaction_layer_point应该为cumulative_layer_point之前,倒数第2个cumulative文件的end version - int64_t base_creation_time = 0; - size_t base_size = 0; - int32_t base_compaction_layer_point = -1; - for (unsigned int index = 0; index < path_versions.size(); ++index) { - Version temp = path_versions[index]; - // base文件 - if (temp.first == 0) { - _old_base_version = temp; - base_size = _tablet->get_rowset_size_by_version(temp); - base_creation_time = _tablet->get_rowset_by_version(temp)->creation_time(); - continue; - } - - if (temp.second == cumulative_layer_point) { - base_compaction_layer_point = temp.first - 1; - _latest_cumulative = temp; - _new_base_version = Version(0, base_compaction_layer_point); - } - } - - // 只有1个base文件和1个delta文件 - if (base_compaction_layer_point == -1) { - VLOG(3) << "can't do base compaction: no cumulative files." - << "tablet=" << _tablet->full_name() - << ", base_version=0-" << _old_base_version.second - << ", cumulative_layer_point=" << cumulative_layer_point + 1; - return false; - } - - // 只有1个cumulative文件 - if (base_compaction_layer_point == _old_base_version.second) { - VLOG(3) << "can't do base compaction: only one cumulative file." - << "tablet=" << _tablet->full_name() - << ", base_version=0-" << _old_base_version.second - << ", cumulative_layer_point=" << cumulative_layer_point + 1; - return false; - } - - // 使用最短路径算法,选择可合并的cumulative版本 - if (OLAP_SUCCESS != _tablet->capture_consistent_versions(_new_base_version, candidate_versions)) { - LOG(WARNING) << "fail to select shortest version path." 
- << "start=" << _new_base_version.first - << ", end=" << _new_base_version.second; - return false; - } - - std::sort(candidate_versions->begin(), candidate_versions->end(), version_comparator); - - // 如果是手动执行START_BASE_COMPACTION命令,则不检查base compaction policy, - // 也不考虑删除版本过期问题, 只要有可以合并的cumulative,就执行base compaction - if (is_manual_trigger) { - VLOG(3) << "manual trigger base compaction. tablet=" << _tablet->full_name(); - return true; - } - - // 统计可合并cumulative版本文件的总大小 - size_t cumulative_total_size = 0; - for (vector::const_iterator version_iter = candidate_versions->begin(); - version_iter != candidate_versions->end(); ++version_iter) { - Version temp = *version_iter; - // 跳过base文件 - if (temp.first == 0) { - continue; - } - // cumulative文件 - cumulative_total_size += _tablet->get_rowset_size_by_version(temp); - } - - // 检查是否满足base compaction的触发条件 - // 满足以下条件时触发base compaction: 触发条件1 || 触发条件2 || 触发条件3 - // 触发条件1:cumulative文件个数超过一个阈值 - const uint32_t base_compaction_num_cumulative_deltas - = config::base_compaction_num_cumulative_deltas; - // candidate_versions中包含base文件,所以这里减1 - if (candidate_versions->size() - 1 >= base_compaction_num_cumulative_deltas) { + // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold + if (_input_rowsets.size() > config::base_compaction_num_cumulative_deltas) { LOG(INFO) << "satisfy the base compaction policy. tablet="<< _tablet->full_name() - << ", num_cumulative_deltas=" << candidate_versions->size() - 1 - << ", base_compaction_num_cumulative_deltas=" << base_compaction_num_cumulative_deltas; - return true; + << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 + << ", base_compaction_num_cumulative_rowsets=" << config::base_compaction_num_cumulative_deltas; + return OLAP_SUCCESS; } - // 触发条件2:所有cumulative文件的大小超过base文件大小的某一比例 - const double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio; + // 2. 
the ratio between base rowset and all input cumulative rowsets reaches the threshold + int64_t base_size = 0; + int64_t cumulative_total_size = 0; + for (auto& rowset : _input_rowsets) { + if (rowset->start_version() != 0) { + cumulative_total_size += rowset->data_disk_size(); + } else { + base_size = rowset->data_disk_size(); + } + } + + double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio; double cumulative_base_ratio = static_cast(cumulative_total_size) / base_size; + if (cumulative_base_ratio > base_cumulative_delta_ratio) { LOG(INFO) << "satisfy the base compaction policy. tablet=" << _tablet->full_name() - << ", cumualtive_total_size=" << cumulative_total_size - << ", base_size=" << base_size - << ", cumulative_base_ratio=" << cumulative_base_ratio - << ", policy_ratio=" << base_cumulative_delta_ratio; - return true; + << ", cumualtive_total_size=" << cumulative_total_size + << ", base_size=" << base_size + << ", cumulative_base_ratio=" << cumulative_base_ratio + << ", policy_ratio=" << base_cumulative_delta_ratio; + return OLAP_SUCCESS; } - // 触发条件3:距离上一次进行base compaction已经超过设定的间隔时间 - const uint32_t interval_since_last_operation = config::base_compaction_interval_seconds_since_last_operation; - int64_t interval_since_last_be = time(NULL) - base_creation_time; - if (interval_since_last_be > interval_since_last_operation) { + // 3. the interval since last base compaction reaches the threshold + int64_t base_creation_time = _input_rowsets[0]->creation_time(); + int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation; + int64_t interval_since_last_base_compaction = time(NULL) - base_creation_time; + if (interval_since_last_base_compaction > interval_threshold) { LOG(INFO) << "satisfy the base compaction policy.
tablet=" << _tablet->full_name() - << ", interval_since_last_be=" << interval_since_last_be - << ", policy_interval=" << interval_since_last_operation; - return true; + << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction + << ", interval_threshold=" << interval_threshold; + return OLAP_SUCCESS; } - VLOG(3) << "don't satisfy the base compaction policy. tablet=" << _tablet->full_name() - << ", cumulative_files_number=" << candidate_versions->size() - 1 - << ", cumulative_base_ratio=" << cumulative_base_ratio - << ", interval_since_last_be=" << interval_since_last_be; - - return false; -} - -OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash, - const vector& rowsets) { - OlapStopWatch watch; - vector rs_readers; - for (auto& rowset : rowsets) { - RowsetReaderSharedPtr rs_reader(rowset->create_reader()); - if (rs_reader == nullptr) { - LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id(); - return OLAP_ERR_ROWSET_CREATE_READER; - } - rs_readers.push_back(rs_reader); - } - - LOG(INFO) << "start merge new base. tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.second; - // 2. 执行base compaction的merge - // 注意:无论是行列存,还是列存,在执行merge时都使用Merger类,不能使用MassiveMerger。 - // 原因:MassiveMerger中的base文件不是通过Reader读取的,所以会导致删除条件失效, - // 无法达到删除数据的目的 - // 想法:如果一定要使用MassiveMerger,这里可以提供一种方案 - // 1. 在此处加一个检查,检测此次BE是否包含删除条件, 即检查Reader中 - // ReaderParams的delete_handler - // 2. 如果包含删除条件,则不使用MassiveMerger,使用Merger - // 3. 如果不包含删除条件,则可以使用MassiveMerger - uint64_t merged_rows = 0; - uint64_t filted_rows = 0; - OLAPStatus res = OLAP_SUCCESS; - - Merger merger(_tablet, _rs_writer, READER_BASE_COMPACTION); - res = merger.merge(rs_readers, &merged_rows, &filted_rows); - // 3. 如果merge失败,执行清理工作,返回错误码退出 - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to make new base version. 
res=" << res - << ", tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.first - << "-" << _new_base_version.second; - return OLAP_ERR_BE_MERGE_ERROR; - } - RowsetSharedPtr new_base = _rs_writer->build(); - if (new_base == nullptr) { - LOG(WARNING) << "rowset writer build failed. writer version:" - << _rs_writer->version().first << "-" << _rs_writer->version().second; - return OLAP_ERR_MALLOC_ERROR; - } - - // 4. 如果merge成功,则将新base文件对应的olap index载入 - _new_rowsets.push_back(new_base); - - VLOG(10) << "merge new base success, start load index. tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.second; - - // Check row num changes - uint64_t source_rows = 0; - for (auto& rowset : rowsets) { - source_rows += rowset->num_rows(); - } - bool row_nums_check = config::row_nums_check; - if (row_nums_check) { - if (source_rows != new_base->num_rows() + merged_rows + filted_rows) { - LOG(WARNING) << "fail to check row num!" - << "source_rows=" << source_rows - << ", merged_rows=" << merged_rows - << ", filted_rows=" << filted_rows - << ", new_index_rows=" << new_base->num_rows(); - return OLAP_ERR_CHECK_LINES_ERROR; - } - } else { - LOG(INFO) << "all row nums." - << "source_rows=" << source_rows - << ", merged_rows=" << merged_rows - << ", filted_rows=" << filted_rows - << ", new_index_rows=" << new_base->num_rows() - << ", merged_version_num=" << _need_merged_versions.size() - << ", time_us=" << watch.get_elapse_time_us(); - } - - return OLAP_SUCCESS; -} - -OLAPStatus BaseCompaction::_update_header(const vector& unused_rowsets) { - WriteLock wrlock(_tablet->get_header_lock_ptr()); - - OLAPStatus res = OLAP_SUCCESS; - // 由于在modify_rowsets中可能会发生很小概率的非事务性失败, 因此这里定位FATAL错误 - res = _tablet->modify_rowsets(_new_rowsets, unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to replace data sources. 
res" << res - << ", tablet=" << _tablet->full_name() - << ", new_base_version=" << _new_base_version.second - << ", old_base_verison=" << _old_base_version.second; - return res; - } - - // 如果保存Header失败, 所有新增的信息会在下次启动时丢失, 属于严重错误 - // 暂时没办法做很好的处理,报FATAL - res = _tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "fail to save tablet meta. res=" << res - << ", tablet=" << _tablet->full_name() - << ", new_base_version=" << _new_base_version.second - << ", old_base_version=" << _old_base_version.second; - return OLAP_ERR_BE_SAVE_HEADER_ERROR; - } - _new_rowsets.clear(); - - return OLAP_SUCCESS; -} - -void BaseCompaction::_delete_old_files(vector* unused_indices) { - if (!unused_indices->empty()) { - StorageEngine* storage_engine = StorageEngine::instance(); - - for (vector::iterator it = unused_indices->begin(); - it != unused_indices->end(); ++it) { - storage_engine->add_unused_rowset(*it); - } - } -} - -void BaseCompaction::_garbage_collection() { - // 清理掉已生成的版本文件 - StorageEngine* storage_engine = StorageEngine::instance(); - for (vector::iterator it = _new_rowsets.begin(); - it != _new_rowsets.end(); ++it) { - storage_engine->add_unused_rowset(*it); - } - _new_rowsets.clear(); - - _release_base_compaction_lock(); -} - -bool BaseCompaction::_validate_need_merged_versions( - const vector& candidate_versions) { - if (candidate_versions.size() <= 1) { - LOG(WARNING) << "unenough versions need to be merged. size=" << candidate_versions.size(); - return false; - } - - // 1. validate versions in candidate_versions are continuous - // Skip the first element - for (unsigned int index = 1; index < candidate_versions.size(); ++index) { - Version previous_version = candidate_versions[index - 1]; - Version current_version = candidate_versions[index]; - if (current_version.first != previous_version.second + 1) { - LOG(WARNING) << "wrong need merged version. 
" - << "previous_version=" << previous_version.first - << "-" << previous_version.second - << ", current_version=" << current_version.first - << "-" << current_version.second; - return false; - } - } - - // 2. validate m_new_base_version is OK - if (_new_base_version.first != 0 - || _new_base_version.first != candidate_versions.begin()->first - || _new_base_version.second != candidate_versions.rbegin()->second) { - LOG(WARNING) << "new_base_version is wrong." - << " new_base_version=" << _new_base_version.first - << "-" << _new_base_version.second - << ", vector_version=" << candidate_versions.begin()->first - << "-" << candidate_versions.rbegin()->second; - return false; - } - - VLOG(10) << "valid need merged version"; - return true; -} - -OLAPStatus BaseCompaction::_validate_delete_file_action() { - // 1. acquire the latest version to make sure all is right after deleting files - ReadLock rdlock(_tablet->get_header_lock_ptr()); - const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); - if (rowset == nullptr) { - LOG(INFO) << "version is -1 when validate_delete_file_action"; - return OLAP_ERR_BE_ERROR_DELETE_ACTION; - } - Version spec_version = Version(0, rowset->end_version()); - vector rs_readers; - _tablet->capture_rs_readers(spec_version, &rs_readers); - - if (rs_readers.empty()) { - LOG(INFO) << "acquire data sources failed. version=" - << spec_version.first << "-" << spec_version.second; - return OLAP_ERR_BE_ERROR_DELETE_ACTION; - } - - VLOG(3) << "delete file action is OK"; - - return OLAP_SUCCESS; + LOG(INFO) << "don't satisfy the base compaction policy. 
tablet=" << _tablet->full_name() + << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 + << ", cumulative_base_ratio=" << cumulative_base_ratio + << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction; + return OLAP_ERR_BE_NO_SUITABLE_VERSION; } } // namespace doris diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index 331b3c400e..5110734cdb 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -18,167 +18,33 @@ #ifndef DORIS_BE_SRC_OLAP_BASE_COMPACTION_H #define DORIS_BE_SRC_OLAP_BASE_COMPACTION_H -#include -#include - -#include "olap/olap_common.h" -#include "olap/olap_define.h" -#include "olap/tablet.h" -#include "rowset/rowset_id_generator.h" -#include "rowset/alpha_rowset_writer.h" +#include "olap/compaction.h" namespace doris { -class Rowset; -class RowsetReader; +// BaseCompaction is derived from Compaction. +// BaseCompaction implements +// 1. its policy to pick rowsets +// 2. do compaction to produce new rowset. -// @brief 实现对START_BASE_COMPACTION命令的处理逻辑,并返回处理结果 -class BaseCompaction { +class BaseCompaction : public Compaction { public: - BaseCompaction() : - _new_base_version(0, 0), - _old_base_version(0, 0), - _base_compaction_locked(false), - _rs_writer(nullptr) {} + BaseCompaction(TabletSharedPtr tablet); + ~BaseCompaction() override; - virtual ~BaseCompaction() { - _release_base_compaction_lock(); + OLAPStatus compact() override; + +protected: + OLAPStatus pick_rowsets_to_compact() override; + std::string compaction_name() const override { + return "base compaction"; } - // 初始化BaseCompaction, 主要完成以下工作: - // 1. 检查是否满足base compaction策略 - // 2.
如果满足,计算需要合并哪些版本 - // - // 输入参数: - // - tablet: 待执行BE的Tablet的智能指针 - // - is_manual_trigger - // - 如果为true,则是手动执行START_BASE_COMPACTION命令 - // - 如果为false,则是根据BE策略来执行 - // - // 返回值: - // - 如果init执行成功,即可以执行BE,则返回OLAP_SUCCESS; - // - 其它情况下,返回相应的错误码 - OLAPStatus init(TabletSharedPtr tablet, bool is_manual_trigger = false); - - // 执行BaseCompaction, 可能会持续很长时间 - // - // 返回值: - // - 如果执行成功,则返回OLAP_SUCCESS; - // - 其它情况下,返回相应的错误码 - OLAPStatus run(); + ReaderType compaction_type() const override { + return ReaderType::READER_BASE_COMPACTION; + } private: - // 检验当前情况是否满足base compaction的触发策略 - // - // 输入参数: - // - is_manual_trigger: 是否是手动执行START_BASE_COMPACTION命令 - // 输出参数 - // - candidate_versions: BE可合并的cumulative文件 - // - // 返回值: - // - 如果满足触发策略,返回true - // - 如果不满足,返回false - bool _check_whether_satisfy_policy(bool is_manual_trigger, - std::vector* candidate_versions); - - // 生成新的Base - // - // 输入参数: - // - new_base_version_hash: 新Base的VersionHash - // - rs_readers : 生成新Base需要的RowsetReaders* - // - row_count: 生成Base过程中产生的row_count - // - // 返回值: - // - 如果执行成功,则返回OLAP_SUCCESS; - // - 其它情况下,返回相应的错误码 - OLAPStatus _do_base_compaction(VersionHash new_base_version_hash, - const std::vector& rowsets); - - // 更新Header使得修改对外可见 - // 输入参数: - // - unused_rowsets: 需要被物理删除的Rowset* - // - // 返回值: - // - 如果执行成功,则返回OLAP_SUCCESS; - // - 其它情况下,返回相应的错误码 - OLAPStatus _update_header(const std::vector& unused_rowsets); - - // 删除不再使用的Rowset - // - // 输入参数: - // - unused_rowsets: 需要被物理删除的Rowset* - // - // 返回值: - // - 如果执行成功,则返回OLAP_SUCCESS; - // - 其它情况下,返回相应的错误码 - void _delete_old_files(std::vector* unused_indices); - - // 其它函数执行失败时,调用该函数进行清理工作 - void _garbage_collection(); - - // 验证得到的candidate_versions是否正确 - // - // 返回值: - // - 如果错误,返回false - // - 如果正确,返回true - bool _validate_need_merged_versions(const std::vector& candidate_versions); - - // 验证删除文件操作是否正确 - // - // 返回值: - // - 如果错误,返回OLAP_ERR_BE_ERROR_DELETE_ACTION - // - 如果正确,返回OLAP_SUCCESS - OLAPStatus _validate_delete_file_action(); - - void 
_get_unused_versions(std::vector* unused_versions) { - unused_versions->clear(); - - std::vector all_versions; - _tablet->list_versions(&all_versions); - for (std::vector::const_iterator iter = all_versions.begin(); - iter != all_versions.end(); ++iter) { - if (iter->first <= _new_base_version.second) { - unused_versions->push_back(*iter); - } - } - } - - // 根据version.second值,比较2个version的大小 - static bool _version_compare(const Version& left, const Version& right) { - return left.second < right.second; - } - - bool _try_base_compaction_lock() { - if (_tablet->try_base_compaction_lock()) { - _base_compaction_locked = true; - return true; - } - - return false; - } - - void _release_base_compaction_lock() { - if (_base_compaction_locked) { - _tablet->release_base_compaction_lock(); - _base_compaction_locked = false; - } - } - - // 需要进行操作的Table指针 - TabletSharedPtr _tablet; - // 新base的version - Version _new_base_version; - // 现有base的version - Version _old_base_version; - // 现有的版本号最大的cumulative - Version _latest_cumulative; - // 在此次base compaction执行过程中,将被合并的cumulative文件版本 - std::vector _need_merged_versions; - // 需要新增的版本对应Rowset的 - std::vector _new_rowsets; - bool _base_compaction_locked; - RowsetWriterSharedPtr _rs_writer; - DISALLOW_COPY_AND_ASSIGN(BaseCompaction); }; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp new file mode 100644 index 0000000000..8a892d197c --- /dev/null +++ b/be/src/olap/compaction.cpp @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/compaction.h" + +using std::vector; + +namespace doris { + +Compaction::Compaction(TabletSharedPtr tablet) + : _tablet(tablet), + _input_rowsets_size(0), + _input_row_num(0), + _state(CompactionState::INITED) +{ } + +Compaction::~Compaction() { } + +OLAPStatus Compaction::do_compaction() { + LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name(); + + OlapStopWatch watch; + + // 1. prepare output version and output version hash + _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); + _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash); + + RETURN_NOT_OK(construct_output_rowset_writer()); + RETURN_NOT_OK(construct_input_rowset_readers()); + + Merger merger(_tablet, compaction_type(), _output_rs_writer, _input_rs_readers); + OLAPStatus res = merger.merge(); + + // 2. if merge fails, log a warning and return the error code + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to do " << compaction_name() + << ". res=" << res + << ", tablet=" << _tablet->full_name() + << ", output_version=" << _output_version.first + << "-" << _output_version.second; + return res; + } + + _output_rowset = _output_rs_writer->build(); + if (_output_rowset == nullptr) { + LOG(WARNING) << "rowset writer build failed. writer version:" + << ", output_version=" << _output_version.first + << "-" << _output_version.second; + return OLAP_ERR_MALLOC_ERROR; + } + + // 3. check correctness + RETURN_NOT_OK(check_correctness(merger)); + + // 4.
modify rowsets in memory + RETURN_NOT_OK(modify_rowsets()); + + LOG(INFO) << "succeed to do " << compaction_name() + << ". tablet=" << _tablet->full_name() + << ", output_version=" << _output_version.first + << "-" << _output_version.second + << ". elapsed time=" << watch.get_elapse_second() << "s."; + + return OLAP_SUCCESS; +} + +OLAPStatus Compaction::construct_output_rowset_writer() { + RowsetId rowset_id = 0; + RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id)); + RowsetWriterContext context; + context.rowset_id = rowset_id; + context.tablet_uid = _tablet->tablet_uid(); + context.tablet_id = _tablet->tablet_id(); + context.partition_id = _tablet->partition_id(); + context.tablet_schema_hash = _tablet->schema_hash(); + context.rowset_type = ALPHA_ROWSET; + context.rowset_path_prefix = _tablet->tablet_path(); + context.tablet_schema = &(_tablet->tablet_schema()); + context.rowset_state = VISIBLE; + context.data_dir = _tablet->data_dir(); + context.version = _output_version; + context.version_hash = _output_version_hash; + + _output_rs_writer.reset(new (std::nothrow)AlphaRowsetWriter()); + RETURN_NOT_OK(_output_rs_writer->init(context)); + return OLAP_SUCCESS; +} + +OLAPStatus Compaction::construct_input_rowset_readers() { + for (auto& rowset : _input_rowsets) { + RowsetReaderSharedPtr rs_reader(rowset->create_reader()); + if (rs_reader == nullptr) { + LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id(); + return OLAP_ERR_ROWSET_CREATE_READER; + } + _input_rs_readers.push_back(rs_reader); + } + return OLAP_SUCCESS; +} + +OLAPStatus Compaction::modify_rowsets() { + std::vector output_rowsets; + output_rowsets.push_back(_output_rowset); + + WriteLock wrlock(_tablet->get_header_lock_ptr()); + OLAPStatus res = _tablet->modify_rowsets(output_rowsets, _input_rowsets); + if (res != OLAP_SUCCESS) { + LOG(FATAL) << "fail to replace data sources. 
res" << res + << ", tablet=" << _tablet->full_name() + << ", compaction__version=" << _output_version.first + << "-" << _output_version.second; + return res; + } + + res = _tablet->save_meta(); + if (res != OLAP_SUCCESS) { + LOG(FATAL) << "fail to save tablet meta. res=" << res + << ", tablet=" << _tablet->full_name() + << ", compaction_version=" << _output_version.first + << "-" << _output_version.second; + return OLAP_ERR_BE_SAVE_HEADER_ERROR; + } + + return OLAP_SUCCESS; +} + +OLAPStatus Compaction::gc_unused_rowsets() { + StorageEngine* storage_engine = StorageEngine::instance(); + if (_state != CompactionState::SUCCESS) { + storage_engine->add_unused_rowset(_output_rowset); + return OLAP_SUCCESS; + } + for (auto& rowset : _input_rowsets) { + storage_engine->add_unused_rowset(rowset); + } + return OLAP_SUCCESS; +} + +OLAPStatus Compaction::check_version_continuity(const vector& rowsets) { + RowsetSharedPtr prev_rowset = rowsets.front(); + for (size_t i = 1; i < rowsets.size(); ++i) { + RowsetSharedPtr rowset = rowsets[i]; + if (rowset->start_version() != prev_rowset->end_version() + 1) { + LOG(WARNING) << "There are missed versions among rowsets. " + << "prev_rowset verison=" << prev_rowset->start_version() + << "-" << prev_rowset->end_version() + << ", rowset version=" << rowset->start_version() + << "-" << rowset->end_version(); + return OLAP_ERR_CUMULATIVE_MISS_VERSION; + } + prev_rowset = rowset; + } + + return OLAP_SUCCESS; +} + +OLAPStatus Compaction::check_correctness(const Merger& merger) { + // 1. check row number + if (_input_row_num != _output_rowset->num_rows() + merger.merged_rows() + merger.filted_rows()) { + LOG(FATAL) << "row_num does not match between cumulative input and output! 
" + << "input_row_num=" << _input_row_num + << ", merged_row_num=" << merger.merged_rows() + << ", filted_row_num=" << merger.filted_rows() + << ", output_row_num=" << _output_rowset->num_rows(); + return OLAP_ERR_CHECK_LINES_ERROR; + } + + return OLAP_SUCCESS; +} + +} // namespace doris diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h new file mode 100644 index 0000000000..ebe77de40d --- /dev/null +++ b/be/src/olap/compaction.h @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_OLAP_COMPACTION_H +#define DORIS_BE_SRC_OLAP_COMPACTION_H + +#include + +#include "olap/merger.h" +#include "olap/olap_common.h" +#include "olap/olap_define.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +#include "olap/utils.h" +#include "rowset/alpha_rowset_writer.h" +#include "rowset/rowset_id_generator.h" + +namespace doris { + +class Merger; + +// This class is a base class for compaction. +// The entrance of this class is compact() +// Any compaction should go through four procedures. +// 1. pick rowsets satisfied to compact +// 2. do compaction +// 3. modify rowsets +// 4. 
gc unused rowstes +class Compaction { +public: + Compaction(TabletSharedPtr tablet); + virtual ~Compaction(); + + virtual OLAPStatus compact() = 0; + +protected: + virtual OLAPStatus pick_rowsets_to_compact() = 0; + virtual std::string compaction_name() const = 0; + virtual ReaderType compaction_type() const = 0; + + OLAPStatus do_compaction(); + OLAPStatus modify_rowsets(); + OLAPStatus gc_unused_rowsets(); + + OLAPStatus construct_output_rowset_writer(); + OLAPStatus construct_input_rowset_readers(); + + OLAPStatus check_version_continuity(const std::vector& rowsets); + OLAPStatus check_correctness(const Merger& merger); + +protected: + TabletSharedPtr _tablet; + + std::vector _input_rowsets; + std::vector _input_rs_readers; + int64_t _input_rowsets_size; + int64_t _input_row_num; + + RowsetSharedPtr _output_rowset; + RowsetWriterSharedPtr _output_rs_writer; + + enum CompactionState { + INITED = 0, + SUCCESS = 1 + }; + CompactionState _state; + + Version _output_version; + VersionHash _output_version_hash; + + DISALLOW_COPY_AND_ASSIGN(Compaction); +}; + +} // namespace doris + +#endif // DORIS_BE_SRC_OLAP_COMPACTION_H diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index db9b3757d9..6547e5d79a 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -16,553 +16,110 @@ // under the License. #include "olap/cumulative_compaction.h" - -#include -#include -#include - -#include "olap/storage_engine.h" #include "util/doris_metrics.h" -#include "olap/rowset/alpha_rowset_writer.h" - -using std::list; -using std::nothrow; -using std::sort; -using std::vector; namespace doris { -OLAPStatus CumulativeCompaction::init(TabletSharedPtr tablet) { - LOG(INFO) << "init cumulative compaction handler. 
tablet=" << tablet->full_name(); +CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet) + : Compaction(tablet), + _cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes) +{ } - if (_is_init) { - LOG(WARNING) << "cumulative handler has been inited. tablet=" << tablet->full_name(); - return OLAP_ERR_CUMULATIVE_REPEAT_INIT; - } +CumulativeCompaction::~CumulativeCompaction() { } - if (!tablet->init_succeeded()) { +OLAPStatus CumulativeCompaction::compact() { + if (!_tablet->init_succeeded()) { return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS; } - _tablet = tablet; - _max_delta_file_size = config::cumulative_compaction_budgeted_bytes; - - if (!_tablet->try_cumulative_lock()) { - LOG(INFO) << "skip compaction, because another cumulative is running. tablet=" << _tablet->full_name(); + MutexLock lock(_tablet->get_cumulative_lock(), TRY_LOCK); + if (!lock.own_lock()) { + LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); return OLAP_ERR_CE_TRY_CE_LOCK_ERROR; } - _tablet->obtain_header_rdlock(); - _old_cumulative_layer_point = _tablet->cumulative_layer_point(); - _tablet->release_header_lock(); - // 如果为-1,则该table之前没有设置过cumulative layer point - // 我们在这里设置一下 - if (_old_cumulative_layer_point == -1) { - LOG(INFO) << "tablet has an unreasonable cumulative layer point. tablet=" << _tablet->full_name() - << ", cumulative_layer_point=" << _old_cumulative_layer_point; - _tablet->release_cumulative_lock(); - return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS; - } + // 1.calculate cumulative point + RETURN_NOT_OK(_tablet->calculate_cumulative_point()); - _tablet->obtain_header_wrlock(); - OLAPStatus res = _calculate_need_merged_versions(); - _tablet->release_header_lock(); - if (res != OLAP_SUCCESS) { - _tablet->release_cumulative_lock(); - LOG(INFO) << "no suitable delta versions. don't do cumulative compaction now."; - return res; - } + // 2. 
pick rowsets to compact + RETURN_NOT_OK(pick_rowsets_to_compact()); - if (!_validate_need_merged_versions()) { - _tablet->release_cumulative_lock(); - LOG(FATAL) << "error! invalid need merged versions."; - return OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS; - } + // 3. do cumulative compaction, merge rowsets + RETURN_NOT_OK(do_compaction()); + + // 4. set state to success + _state = CompactionState::SUCCESS; + + // 5. set cumulative point + _tablet->set_cumulative_layer_point(_input_rowsets.back()->end_version() + 1); + + // 6. garbage collect input rowsets after cumulative compaction + RETURN_NOT_OK(gc_unused_rowsets()); + + // 7. add metric to cumulative compaction + DorisMetrics::cumulative_compaction_deltas_total.increment(_input_rowsets.size()); + DorisMetrics::cumulative_compaction_bytes_total.increment(_input_rowsets_size); - _is_init = true; - _cumulative_version = Version(_need_merged_versions.begin()->first, - _need_merged_versions.rbegin()->first); - _rs_writer.reset(new (std::nothrow)AlphaRowsetWriter()); return OLAP_SUCCESS; } -OLAPStatus CumulativeCompaction::run() { - if (!_is_init) { - _tablet->release_cumulative_lock(); - LOG(WARNING) << "cumulative handler is not inited."; - return OLAP_ERR_NOT_INITED; +OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { + std::vector candidate_rowsets; + _tablet->pick_candicate_rowsets_to_cumulative_compaction(&candidate_rowsets); + + if (candidate_rowsets.size() <= 1) { + LOG(WARNING) << "There is no enough rowsets to cumulative compaction." + << ", the size of rowsets to compact=" << candidate_rowsets.size() + << ", cumulative_point=" << _tablet->cumulative_layer_point(); + return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } - // 0. 准备工作 - LOG(INFO) << "start cumulative compaction. 
tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first << "-" - << _cumulative_version.second; - OlapStopWatch watch; + std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + RETURN_NOT_OK(check_version_continuity(candidate_rowsets)); - // 1. 计算新的cumulative文件的version hash - OLAPStatus res = OLAP_SUCCESS; - res = _tablet->compute_all_versions_hash(_need_merged_versions, &_cumulative_version_hash); - if (res != OLAP_SUCCESS) { - _tablet->release_cumulative_lock(); - LOG(WARNING) << "failed to computer cumulative version hash." - << " tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - return res; - } - - // 2. 获取待合并的delta文件对应的data文件 - res = _tablet->capture_consistent_rowsets(_need_merged_versions, &_rowsets); - if (res != OLAP_SUCCESS) { - _tablet->release_cumulative_lock(); - LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name() - << ", version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - return res; - } - - { - DorisMetrics::cumulative_compaction_deltas_total.increment(_need_merged_versions.size()); - int64_t merge_bytes = 0; - for (auto& rowset : _rowsets) { - merge_bytes += rowset->data_disk_size(); - } - DorisMetrics::cumulative_compaction_bytes_total.increment(merge_bytes); - } - - do { - // 3. 
生成新cumulative文件对应的olap index - RowsetId rowset_id = 0; - RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id)); - RowsetWriterContext context; - context.rowset_id = rowset_id; - context.tablet_uid = _tablet->tablet_uid(); - context.tablet_id = _tablet->tablet_id(); - context.partition_id = _tablet->partition_id(); - context.tablet_schema_hash = _tablet->schema_hash(); - context.rowset_type = ALPHA_ROWSET; - context.rowset_path_prefix = _tablet->tablet_path(); - context.tablet_schema = &(_tablet->tablet_schema()); - context.rowset_state = VISIBLE; - context.data_dir = _tablet->data_dir(); - context.version = _cumulative_version; - context.version_hash = _cumulative_version_hash; - _rs_writer->init(context); - - // 4. 执行cumulative compaction合并过程 - for (auto& rowset : _rowsets) { - RowsetReaderSharedPtr rs_reader(rowset->create_reader()); - if (rs_reader == nullptr) { - LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id(); - _tablet->release_cumulative_lock(); - return OLAP_ERR_ROWSET_CREATE_READER; - } - _rs_readers.push_back(rs_reader); - } - res = _do_cumulative_compaction(); - _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(_rs_writer->rowset_id())); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to do cumulative compaction." - << ", tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - break; - } - } while (0); - - // 5. 如果出现错误,执行清理工作 - if (res != OLAP_SUCCESS) { - StorageEngine::instance()->add_unused_rowset(_rowset); - } - - _tablet->release_cumulative_lock(); - - LOG(INFO) << "succeed to do cumulative compaction. tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first - << "-" << _cumulative_version.second - << ". 
elapsed time of doing cumulative compaction" - << ", time=" << watch.get_elapse_second() << "s"; - return res; -} - -OLAPStatus CumulativeCompaction::_calculate_need_merged_versions() { - Versions delta_versions; - OLAPStatus res = _get_delta_versions(&delta_versions); - if (res != OLAP_SUCCESS) { - LOG(INFO) << "failed to get delta versions. res=" << res; - return res; - } - - // 此处减1,是为了确保最新版本的delta不会合入到cumulative里 - // 因为push可能会重复导入最新版本的delta - uint32_t delta_number = delta_versions.size() - 1; - uint32_t index = 0; - // 在delta文件中寻找可合并的delta文件 - // 这些delta文件可能被delete或较大的delta文件(>= max_delta_file_size)分割为多个区间, 比如: - // v1, v2, v3, D, v4, v5, D, v6, v7 - // 我们分区间进行查找直至找到合适的可合并delta文件 - while (index < delta_number) { - Versions need_merged_versions; - size_t total_size = 0; - - // 在其中1个区间里查找可以合并的delta文件 - for (; index < delta_number; ++index) { - // 如果已找到的可合并delta文件大小大于等于_max_delta_file_size,我们认为可以执行合并了 - // 停止查找过程 - if (total_size >= _max_delta_file_size) { + std::vector transient_rowsets; + for (size_t i = 0; i < candidate_rowsets.size() - 1; ++i) { + // VersionHash will calculated from chosen rowsets. + // If ultimate singleton rowset is chosen, VersionHash + // will be different from the value recorded in FE. + // So the ultimate singleton rowset is revserved. 
+ RowsetSharedPtr rowset = candidate_rowsets[i]; + if (_tablet->version_for_delete_predicate(rowset->version())) { + if (transient_rowsets.size() > config::cumulative_compaction_num_singleton_deltas) { + _input_rowsets = transient_rowsets; break; } - - Version delta = delta_versions[index]; - size_t delta_size = _tablet->get_rowset_size_by_version(delta); - // 如果遇到大的delta文件,或delete版本文件,则: - if (delta_size >= _max_delta_file_size - || _tablet->version_for_delete_predicate(delta) - || _tablet->version_for_load_deletion(delta)) { - // 1) 如果need_merged_versions为空,表示这2类文件在区间的开头,直接跳过 - if (need_merged_versions.empty()) { - continue; - } else { - // 2) 如果need_merged_versions不为空,则已经找到区间的末尾,跳出循环 - break; - } - } - - need_merged_versions.push_back(delta); - total_size += delta_size; - } - - // 该区间没有可以合并的delta文件,进行下一轮循环,继续查找下一个区间 - if (need_merged_versions.empty()) { + transient_rowsets.clear(); continue; } - // 如果该区间中只有一个delta,或者该区间的delta都是空的delta,则我们查看能否与区间末尾的 - // 大delta合并,或者与区间的开头的前一个版本合并 - if (need_merged_versions.size() == 1 || total_size == 0) { - // 如果区间末尾是较大的delta版, 则与它合并 - if (index < delta_number - && _tablet->get_rowset_size_by_version(delta_versions[index]) >= - _max_delta_file_size) { - need_merged_versions.push_back(delta_versions[index]); - ++index; - } - // 如果区间前一个版本可以合并, 则将其加入到可合并版本中 - Version delta_before_interval; - if (_find_previous_version(need_merged_versions[0], &delta_before_interval)) { - need_merged_versions.insert(need_merged_versions.begin(), - delta_before_interval); - } - - // 如果还是只有1个待合并的delta,则跳过,不进行合并 - if (need_merged_versions.size() == 1) { - continue; - } - - _need_merged_versions.swap(need_merged_versions); - _new_cumulative_layer_point = delta_versions[index].first; - return OLAP_SUCCESS; - } - - // 如果有多个可合并文件,则可以进行cumulative compaction的合并过程 - // 如果只有只有一个可合并的文件,为了效率,不触发cumulative compaction的合并过程 - if (need_merged_versions.size() != 1) { - // 如果在可合并区间开头之前的一个版本的大小没有达到delta文件的最大值, - // 则将可合并区间的文件合并到之前那个版本上 - Version delta; - if 
(_find_previous_version(need_merged_versions[0], &delta)) { - need_merged_versions.insert(need_merged_versions.begin(), delta); - } - - _need_merged_versions.swap(need_merged_versions); - _new_cumulative_layer_point = delta_versions[index].first; - return OLAP_SUCCESS; - } - } - - // 没有找到可以合并的delta文件,无法执行合并过程,但我们仍然需要设置新的cumulative_layer_point - // 如果不设置新的cumulative_layer_point, 则下次执行cumulative compaction时,扫描的文件和这次 - // 扫描的文件相同,依然找不到可以合并的delta文件, 无法执行合并过程。 - // 依此类推,就进入了死循环状态,永远不会进行cumulative compaction - _tablet->set_cumulative_layer_point(delta_versions[index].first); - _tablet->save_meta(); - return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; -} - -static bool version_comparator(const Version& lhs, const Version& rhs) { - return lhs.second < rhs.second; -} - -OLAPStatus CumulativeCompaction::_get_delta_versions(Versions* delta_versions) { - delta_versions->clear(); - - Versions all_versions; - _tablet->list_versions(&all_versions); - - for (Versions::const_iterator version = all_versions.begin(); - version != all_versions.end(); ++version) { - if (version->first == version->second && version->first >= _old_cumulative_layer_point) { - delta_versions->push_back(*version); - } + transient_rowsets.push_back(rowset); } - if (delta_versions->size() == 0) { - LOG(INFO) << "no delta versions. cumulative_point=" << _old_cumulative_layer_point; + if (transient_rowsets.size() > config::cumulative_compaction_num_singleton_deltas) { + _input_rowsets = transient_rowsets; + } + + if (_input_rowsets.empty()) { + // There are no rowsets choosed to do cumulative compaction. + // Under this circumstance, cumulative_point should be set. + // Otherwise, the next round will not choose rowsets. + _tablet->set_cumulative_layer_point(candidate_rowsets.back()->start_version()); + } + + if (_input_rowsets.size() <= 1) { + LOG(WARNING) << "There is no enough rowsets to cumulative compaction." 
+ << ", the size of rowsets to compact=" << candidate_rowsets.size() + << ", cumulative_point=" << _tablet->cumulative_layer_point(); return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } - // 如果size等于1,则这个delta一定是cumulative point所指向的delta - // 因为我们总是保留最新的delta不合并到cumulative文件里,所以此时应该返回错误,不进行cumulative - if (delta_versions->size() == 1) { - delta_versions->clear(); - VLOG(10) << "only one delta version. no new delta versions." - << " cumulative_point=" << _old_cumulative_layer_point; - return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; + for (auto& rowset : _input_rowsets) { + _input_rowsets_size += rowset->data_disk_size(); + _input_row_num += rowset->num_rows(); } - - sort(delta_versions->begin(), delta_versions->end(), version_comparator); - - // can't do cumulative expansion if there has a hole - Versions versions_path; - OLAPStatus select_status = _tablet->capture_consistent_versions( - Version(delta_versions->front().first, delta_versions->back().second), &versions_path); - if (select_status != OLAP_SUCCESS) { - LOG(WARNING) << "can't do cumulative expansion if fail to select shortest version path." - << " tablet=" << _tablet->full_name() - << ", start=" << delta_versions->front().first - << ", end=" << delta_versions->back().second; - return select_status; - } - return OLAP_SUCCESS; } -bool CumulativeCompaction::_find_previous_version(const Version current_version, - Version* previous_version) { - Versions all_versions; - if (OLAP_SUCCESS != _tablet->capture_consistent_versions(Version(0, current_version.second), - &all_versions)) { - LOG(WARNING) << "fail to select shortest version path. 
start=0, end=" << current_version.second; - return false; - } - - // previous_version.second应该等于current_version.first - 1 - for (Versions::const_iterator version = all_versions.begin(); - version != all_versions.end(); ++version) { - // Skip base version - if (version->first == 0) { - continue; - } - - if (version->second == current_version.first - 1) { - if (_tablet->version_for_delete_predicate(*version) - || _tablet->version_for_load_deletion(*version)) { - return false; - } - - size_t data_size = _tablet->get_rowset_size_by_version(*version); - if (data_size >= _max_delta_file_size) { - return false; - } - - *previous_version = *version; - return true; - } - } - - return false; -} - -OLAPStatus CumulativeCompaction::_do_cumulative_compaction() { - OLAPStatus res = OLAP_SUCCESS; - OlapStopWatch watch; - Merger merger(_tablet, _rs_writer, READER_CUMULATIVE_COMPACTION); - - // 1. merge delta files into new cumulative file - uint64_t merged_rows = 0; - uint64_t filted_rows = 0; - res = merger.merge(_rs_readers, &merged_rows, &filted_rows); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to do cumulative merge." - << " tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - return res; - } - - _rowset = _rs_writer->build(); - if (_rowset == nullptr) { - LOG(WARNING) << "rowset writer build failed. writer version:" - << _rs_writer->version().first << "-" << _rs_writer->version().second; - return OLAP_ERR_MALLOC_ERROR; - } - - // 2. Check row num changes - uint64_t source_rows = 0; - for (auto rowset : _rowsets) { - source_rows += rowset->num_rows(); - } - bool row_nums_check = config::row_nums_check; - if (row_nums_check) { - if (source_rows != _rowset->num_rows() + merged_rows + filted_rows) { - LOG(FATAL) << "fail to check row num! 
" - << "source_rows=" << source_rows - << ", merged_rows=" << merged_rows - << ", filted_rows=" << filted_rows - << ", new_index_rows=" << _rowset->num_rows(); - return OLAP_ERR_CHECK_LINES_ERROR; - } - } else { - LOG(INFO) << "all row nums. source_rows=" << source_rows - << ", merged_rows=" << merged_rows - << ", filted_rows=" << filted_rows - << ", new_index_rows=" << _rowset->num_rows() - << ", merged_version_num=" << _need_merged_versions.size() - << ", time_us=" << watch.get_elapse_time_us(); - } - - // 3. add new cumulative file into tablet - vector unused_rowsets; - _tablet->obtain_header_wrlock(); - res = _tablet->capture_consistent_rowsets(_need_merged_versions, &unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name() - << ", version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - _tablet->release_header_lock(); - return res; - } - res = _update_header(unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to update header for new cumulative." - << "tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - _tablet->release_header_lock(); - return res; - } - - // 4. validate that delete action is right - res = _validate_delete_file_action(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "delete action of cumulative compaction has error. roll back." - << "tablet=" << _tablet->full_name() - << ", cumulative_version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - // if error happened, roll back - OLAPStatus ret = _roll_back(unused_rowsets); - if (ret != OLAP_SUCCESS) { - LOG(FATAL) << "roll back failed. [tablet=" << _tablet->full_name() << "]"; - } - - _tablet->release_header_lock(); - return res; - } - // 5. 
如果合并成功,设置新的cumulative_layer_point - _tablet->set_cumulative_layer_point(_new_cumulative_layer_point); - _tablet->save_meta(); - _tablet->release_header_lock(); - - // 6. delete delta files which have been merged into new cumulative file - _delete_unused_rowsets(&unused_rowsets); - - return res; -} - -OLAPStatus CumulativeCompaction::_update_header(const vector& unused_rowsets) { - vector new_rowsets; - new_rowsets.push_back(_rowset); - - OLAPStatus res = OLAP_SUCCESS; - res = _tablet->modify_rowsets(new_rowsets, unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "failed to replace data sources. res=" << res - << ", tablet=" << _tablet->full_name(); - return res; - } - - res = _tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "failed to save header. res=" << res - << ", tablet=" << _tablet->full_name(); - return res; - } - - return res; -} - -void CumulativeCompaction::_delete_unused_rowsets(vector* unused_rowsets) { - if (!unused_rowsets->empty()) { - StorageEngine* storage_engine = StorageEngine::instance(); - - for (vector::iterator it = unused_rowsets->begin(); - it != unused_rowsets->end(); ++it) { - storage_engine->add_unused_rowset(*it); - } - } -} - -bool CumulativeCompaction::_validate_need_merged_versions() { - // 1. validate versions in _need_merged_versions are continuous - // Skip the first element - for (unsigned int index = 1; index < _need_merged_versions.size(); ++index) { - Version previous_version = _need_merged_versions[index - 1]; - Version current_version = _need_merged_versions[index]; - if (current_version.first != previous_version.second + 1) { - LOG(WARNING) << "wrong need merged version. " - << "previous_version=" << previous_version.first - << "-" << previous_version.second - << ", current_version=" << current_version.first - << "-" << current_version.second; - return false; - } - } - - return true; -} - -OLAPStatus CumulativeCompaction::_validate_delete_file_action() { - // 1. 
acquire the new cumulative version to make sure that all is right after deleting files - Version spec_version = Version(0, _cumulative_version.second); - vector rs_readers; - _tablet->capture_rs_readers(spec_version, &rs_readers); - if (rs_readers.empty()) { - LOG(WARNING) << "acquire data source failed. " - << "spec_verison=" << spec_version.first << "-" << spec_version.second; - return OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION; - } - - return OLAP_SUCCESS; -} - -OLAPStatus CumulativeCompaction::_roll_back(vector& old_olap_indices) { - vector unused_rowsets; - OLAPStatus res = _tablet->capture_consistent_rowsets(_cumulative_version, &unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name() - << ", version=" << _cumulative_version.first - << "-" << _cumulative_version.second; - return res; - } - - // unused_rowsets will only contain new cumulative index - // we don't need to delete it here; we will delete new cumulative index in the end. - - res = OLAP_SUCCESS; - res = _tablet->modify_rowsets(old_olap_indices, unused_rowsets); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "failed to replace data sources. [tablet=" << _tablet->full_name() << "]"; - return res; - } - - res = _tablet->save_meta(); - if (res != OLAP_SUCCESS) { - LOG(FATAL) << "failed to save header. 
[tablet=" << _tablet->full_name() << "]"; - return res; - } - - return res; -} - } // namespace doris diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index 646deacb6b..d51257cbe0 100755 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -18,147 +18,33 @@ #ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_H #define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_H -#include -#include #include -#include -#include "olap/merger.h" -#include "olap/olap_define.h" -#include "olap/tablet.h" -#include "olap/rowset/rowset_id_generator.h" -#include "olap/rowset/alpha_rowset_writer.h" +#include "olap/compaction.h" namespace doris { -class Rowset; - -class CumulativeCompaction { +class CumulativeCompaction : public Compaction { public: - CumulativeCompaction() : - _is_init(false), - _old_cumulative_layer_point(0), - _new_cumulative_layer_point(0), - _max_delta_file_size(0), - _rowset(nullptr), - _rs_writer(nullptr) {} + CumulativeCompaction(TabletSharedPtr tablet); + ~CumulativeCompaction() override; + + OLAPStatus compact() override; + +protected: + OLAPStatus pick_rowsets_to_compact() override; + + std::string compaction_name() const override { + return "cumulative compaction"; + } + + ReaderType compaction_type() const override { + return ReaderType::READER_CUMULATIVE_COMPACTION; + } - ~CumulativeCompaction() {} - - // 初始化CumulativeCompaction对象,包括: - // - 检查是否触发cumulative compaction - // - 计算可合并的delta文件 - // - // 输入参数: - // - tablet 待执行cumulative compaction的tablet - // - // 返回值: - // - 如果触发cumulative compaction,返回OLAP_SUCCESS - // - 否则,返回对应错误码 - OLAPStatus init(TabletSharedPtr tablet); - // 执行cumulative compaction - // - // 返回值: - // - 如果执行成功,返回OLAP_SUCCESS - // - 如果执行失败,返回相应错误码 - OLAPStatus run(); - private: - - // 计算可以合并的delta文件,以及新的cumulative层标识点 - // - // 返回值: - // - 如果成功,返回OLAP_SUCCESS - // - 如果不成功,返回相应错误码 - OLAPStatus _calculate_need_merged_versions(); - - // 获取table现有的delta文件 - // - // 输出参数: - // - 
delta_versions: 将table现有delta文件的版本号存入该参数 - // - // 返回值: - // - 如果成功,返回OLAP_SUCCESS - // - 如果不成功,返回相应错误码 - OLAPStatus _get_delta_versions(Versions* delta_versions); - - // 找出某一版本文件的前一个版本文件, 即 current_version.first = previous_version.second + 1 - // - // 输入参数: - // - current_version: 某一指定版本 - // - // 输出参数: - // - previous_version: 待查找的指定版本的前一个版本 - // - // 返回值: - // - 如果查找成功,返回true - // - 如果查找失败,返回false - bool _find_previous_version(const Version current_version, Version* previous_version); - - // 执行cumulative compaction合并过程 - // - // 返回值: - // - 如果成功,返回OLAP_SUCCESS - // - 如果不成功,返回相应错误码 - OLAPStatus _do_cumulative_compaction(); - - // 将合并得到的新cumulative文件载入tablet - // - // 输出参数: - // - unused_rowsets: 返回不再使用的delta文件对应的olap index - // - // 返回值: - // - 如果成功,返回OLAP_SUCCESS - // - 如果不成功,返回相应错误码 - OLAPStatus _update_header(const std::vector& unused_rowsets); - - // 删除不再使用的delta文件 - // - // 输入输出参数 - // - unused_rowsets: 待删除的不再使用的delta文件对应的olap index - void _delete_unused_rowsets(std::vector* unused_rowsets); - - // 验证得到的m_need_merged_versions是否正确 - // - // 返回值: - // - 如果错误,返回false - // - 如果正确,返回true - bool _validate_need_merged_versions(); - - // 验证得到的删除文件操作是否正确; 使用该函数前需要对header文件加锁 - // - // 返回值: - // - 如果错误,返回OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION - // - 如果正确,返回OLAP_SUCCESS - OLAPStatus _validate_delete_file_action(); - - // 恢复header头文件的文件版本和table的data source - OLAPStatus _roll_back(std::vector& old_olap_indices); - - // CumulativeCompaction对象是否初始化 - bool _is_init; - // table现有的cumulative层的标识点 - int64_t _old_cumulative_layer_point; - // 待cumulative compaction完成之后,新的cumulative层的标识点 - int64_t _new_cumulative_layer_point; - // 一个cumulative文件大小的最大值 - // 当delta文件的大小超过该值时,我们认为该delta文件是cumulative文件 - size_t _max_delta_file_size; - // 待执行cumulative compaction的tablet - TabletSharedPtr _tablet; - // 新cumulative文件的版本 - Version _cumulative_version; - // 新cumulative文件的version hash - VersionHash _cumulative_version_hash; - // 新cumulative文件对应的olap index - RowsetSharedPtr _rowset; - 
RowsetWriterSharedPtr _rs_writer; - // 可合并的delta文件的data文件 - std::vector _rowsets; - std::vector _rs_readers; - // 可合并的delta文件的版本 - std::vector _need_merged_versions; + int64_t _cumulative_rowset_size_threshold; DISALLOW_COPY_AND_ASSIGN(CumulativeCompaction); }; diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 2ed98bde44..be5c722557 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -35,20 +35,38 @@ namespace doris { Merger::Merger(TabletSharedPtr tablet, RowsetWriterSharedPtr writer, ReaderType type) : _tablet(tablet), - _rs_writer(writer), + _output_rs_writer(writer), + _reader_type(type), + _row_count(0) {} + +Merger::Merger(TabletSharedPtr tablet, ReaderType type, RowsetWriterSharedPtr writer, + const std::vector& rs_readers) : + _tablet(tablet), + _output_rs_writer(writer), + _input_rs_readers(rs_readers), _reader_type(type), _row_count(0) {} OLAPStatus Merger::merge(const vector& rs_readers, - uint64_t* merged_rows, uint64_t* filted_rows) { + int64_t* merged_rows, int64_t* filted_rows) { + _input_rs_readers = rs_readers; + OLAPStatus res = merge(); + if (res == OLAP_SUCCESS) { + *merged_rows= _merged_rows; + *filted_rows = _filted_rows; + } + return res; +} + +OLAPStatus Merger::merge() { // Create and initiate reader for scanning and multi-merging specified // OLAPDatas. Reader reader; ReaderParams reader_params; reader_params.tablet = _tablet; reader_params.reader_type = _reader_type; - reader_params.rs_readers = rs_readers; - reader_params.version = _rs_writer->version(); + reader_params.rs_readers = _input_rs_readers; + reader_params.version = _output_rs_writer->version(); if (OLAP_SUCCESS != reader.init(reader_params)) { LOG(WARNING) << "fail to initiate reader. tablet=" << _tablet->full_name(); @@ -67,7 +85,6 @@ OLAPStatus Merger::merge(const vector& rs_readers, row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); // The following procedure would last for long time, half of one day, etc. 
while (!has_error) { - // Read one row into row_cursor OLAPStatus res = reader.next_row_with_aggregation(&row_cursor, &eof); if (OLAP_SUCCESS == res && eof) { @@ -79,7 +96,7 @@ OLAPStatus Merger::merge(const vector& rs_readers, break; } - if (OLAP_SUCCESS != _rs_writer->add_row(row_cursor)) { + if (OLAP_SUCCESS != _output_rs_writer->add_row(row_cursor)) { LOG(WARNING) << "add row to builder failed. tablet=" << _tablet->full_name(); has_error = true; break; @@ -89,18 +106,17 @@ OLAPStatus Merger::merge(const vector& rs_readers, ++_row_count; } - if (_rs_writer->flush() != OLAP_SUCCESS) { + if (_output_rs_writer->flush() != OLAP_SUCCESS) { LOG(WARNING) << "fail to finalize writer. " << "tablet=" << _tablet->full_name(); has_error = true; } if (!has_error) { - *merged_rows = reader.merged_rows(); - *filted_rows = reader.filtered_rows(); + _merged_rows = reader.merged_rows(); + _filted_rows = reader.filtered_rows(); } return has_error ? OLAP_ERR_OTHER_ERROR : OLAP_SUCCESS; } - } // namespace doris diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 946a208645..098bb83231 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -31,24 +31,31 @@ class Merger { public: // parameter index is created by caller, and it is empty. Merger(TabletSharedPtr tablet, RowsetWriterSharedPtr writer, ReaderType type); + Merger(TabletSharedPtr tablet, ReaderType type, + RowsetWriterSharedPtr writer, const std::vector& rs_readers); virtual ~Merger() {}; // @brief read from multiple OLAPData and SegmentGroup, then write into single OLAPData and SegmentGroup // @return OLAPStatus: OLAP_SUCCESS or FAIL // @note it will take long time to finish. 
- OLAPStatus merge(const std::vector& rs_readers, - uint64_t* merged_rows, uint64_t* filted_rows); + OLAPStatus merge(const vector& rs_readers, + int64_t* merged_rows, int64_t* filted_rows); + OLAPStatus merge(); // 获取在做merge过程中累积的行数 - uint64_t row_count() { - return _row_count; - } + inline int64_t row_count() const { return _row_count; } + inline int64_t merged_rows() const { return _merged_rows; } + inline int64_t filted_rows() const { return _filted_rows; } private: TabletSharedPtr _tablet; - RowsetWriterSharedPtr _rs_writer; + RowsetWriterSharedPtr _output_rs_writer; + + std::vector _input_rs_readers; ReaderType _reader_type; - uint64_t _row_count; + int64_t _row_count; + int64_t _merged_rows; + int64_t _filted_rows; DISALLOW_COPY_AND_ASSIGN(Merger); }; diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index a34e805aae..8de070442a 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -333,6 +333,7 @@ enum OLAPStatus { OLAP_ERR_CUMULATIVE_FAILED_ACQUIRE_DATA_SOURCE = -2003, OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS = -2004, OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION = -2005, + OLAP_ERR_CUMULATIVE_MISS_VERSION = -2006, // OLAPMeta // [-3000, -3100) diff --git a/be/src/olap/olap_snapshot_converter.cpp b/be/src/olap/olap_snapshot_converter.cpp index eb68c0bd89..72bc90eeaf 100755 --- a/be/src/olap/olap_snapshot_converter.cpp +++ b/be/src/olap/olap_snapshot_converter.cpp @@ -32,7 +32,7 @@ OLAPStatus OlapSnapshotConverter::to_olap_header(const TabletMetaPB& tablet_meta LOG(FATAL) << "tablet schema does not have cumulative_layer_point." << " tablet id = " << tablet_meta_pb.tablet_id(); } - olap_header->set_cumulative_layer_point(tablet_meta_pb.cumulative_layer_point()); + olap_header->set_cumulative_layer_point(-1); if (!tablet_meta_pb.schema().has_num_short_key_columns()) { LOG(FATAL) << "tablet schema does not have num_short_key_columns." 
<< " tablet id = " << tablet_meta_pb.tablet_id(); @@ -99,7 +99,7 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola tablet_meta_pb->set_shard_id(olap_header.shard()); } tablet_meta_pb->set_creation_time(olap_header.creation_time()); - tablet_meta_pb->set_cumulative_layer_point(olap_header.cumulative_layer_point()); + tablet_meta_pb->set_cumulative_layer_point(-1); TabletSchemaPB* schema = tablet_meta_pb->mutable_schema(); for (auto& column_msg : olap_header.column()) { diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 6f7258e4bf..8775b3aae6 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -125,6 +125,10 @@ public: _need_delete_file = true; } + static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) { + return left->end_version() < right->end_version(); + } + protected: // allow subclass to add custom logic when rowset is being published virtual void make_visible_extra(Version version, VersionHash version_hash) {} diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 73c121d5f1..340e74940f 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1108,8 +1108,8 @@ bool SchemaChangeWithSorting::_external_sorting( TabletSharedPtr new_tablet) { Merger merger(new_tablet, rowset_writer, READER_ALTER_TABLE); - uint64_t merged_rows = 0; - uint64_t filtered_rows = 0; + int64_t merged_rows = 0; + int64_t filtered_rows = 0; vector rs_readers; for (vector::iterator it = src_rowsets.begin(); it != src_rowsets.end(); ++it) { @@ -1248,7 +1248,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe new_tablet->modify_rowsets(std::vector(), rowsets_to_delete); // inherit cumulative_layer_point from base_tablet // check if new_tablet.ce_point > base_tablet.ce_point? 
- new_tablet->set_cumulative_layer_point(base_tablet->cumulative_layer_point()); + new_tablet->set_cumulative_layer_point(-1); // save tablet meta res = new_tablet->save_meta(); if (res != OLAP_SUCCESS) { @@ -1519,7 +1519,7 @@ OLAPStatus SchemaChangeHandler::process_alter_tablet(AlterTabletType type, } // inherit cumulative_layer_point from base_tablet - new_tablet->set_cumulative_layer_point(base_tablet->cumulative_layer_point()); + new_tablet->set_cumulative_layer_point(-1); // get history versions to be changed res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 8877299346..6d7bd20a65 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -567,26 +567,13 @@ void StorageEngine::perform_cumulative_compaction(DataDir* data_dir) { if (best_tablet == nullptr) { return; } DorisMetrics::cumulative_compaction_request_total.increment(1); - CumulativeCompaction cumulative_compaction; - OLAPStatus res = cumulative_compaction.init(best_tablet); - if (res != OLAP_SUCCESS) { - if (res != OLAP_ERR_CUMULATIVE_REPEAT_INIT && res != OLAP_ERR_CE_TRY_CE_LOCK_ERROR) { - best_tablet->set_last_compaction_failure_time(UnixMillis()); - if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { - LOG(WARNING) << "failed to init cumulative compaction" - << ", table=" << best_tablet->full_name() - << ", res=" << res; - DorisMetrics::cumulative_compaction_request_failed.increment(1); - } - } - return; - } + CumulativeCompaction cumulative_compaction(best_tablet); - res = cumulative_compaction.run(); + OLAPStatus res = cumulative_compaction.compact(); if (res != OLAP_SUCCESS) { DorisMetrics::cumulative_compaction_request_failed.increment(1); best_tablet->set_last_compaction_failure_time(UnixMillis()); - LOG(WARNING) << "failed to do cumulative compaction" + LOG(WARNING) << "failed to do cumulative compaction. 
res=" << res << ", table=" << best_tablet->full_name() << ", res=" << res; return; @@ -599,26 +586,13 @@ void StorageEngine::perform_base_compaction(DataDir* data_dir) { if (best_tablet == nullptr) { return; } DorisMetrics::base_compaction_request_total.increment(1); - BaseCompaction base_compaction; - OLAPStatus res = base_compaction.init(best_tablet); - if (res != OLAP_SUCCESS) { - if (res != OLAP_ERR_BE_TRY_BE_LOCK_ERROR && res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { - DorisMetrics::base_compaction_request_failed.increment(1); - best_tablet->set_last_compaction_failure_time(UnixMillis()); - LOG(WARNING) << "failed to init base compaction" - << ", table=" << best_tablet->full_name() - << ", res=" << res; - } - return; - } - - res = base_compaction.run(); + BaseCompaction base_compaction(best_tablet); + OLAPStatus res = base_compaction.compact(); if (res != OLAP_SUCCESS) { DorisMetrics::base_compaction_request_failed.increment(1); best_tablet->set_last_compaction_failure_time(UnixMillis()); - LOG(WARNING) << "failed to init base compaction" - << ", table=" << best_tablet->full_name() - << ", res=" << res; + LOG(WARNING) << "failed to init base compaction. 
res=" << res + << ", table=" << best_tablet->full_name(); return; } best_tablet->set_last_compaction_failure_time(0); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 6ad9679493..3634c9d08e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -150,6 +150,8 @@ OLAPStatus Tablet::init_once() { _inc_rs_version_map[version] = rowset; } + _cumulative_point = -1; + return res; } @@ -670,6 +672,17 @@ OLAPStatus Tablet::compute_all_versions_hash(const vector& versions, return OLAP_SUCCESS; } +void Tablet::compute_version_hash_from_rowsets( + const std::vector& rowsets, VersionHash* version_hash) const { + DCHECK(version_hash != nullptr) << "invalid parameter, version_hash is nullptr"; + int64_t v_hash = 0; + for (auto& rowset : rowsets) { + v_hash ^= rowset->version_hash(); + } + *version_hash = v_hash; +} + + void Tablet::calc_missed_versions(int64_t spec_version, vector* missed_versions) { ReadLock rdlock(&_meta_lock); @@ -736,6 +749,38 @@ OLAPStatus Tablet::max_continuous_version_from_begining(Version* version, Versio return OLAP_SUCCESS; } +OLAPStatus Tablet::calculate_cumulative_point() { + WriteLock wrlock(&_meta_lock); + if (_cumulative_point != -1) { + return OLAP_SUCCESS; + } + + std::list existing_versions; + for (auto& rs : _tablet_meta->all_rs_metas()) { + existing_versions.emplace_back(rs->version()); + } + + // sort the existing versions in ascending order + existing_versions.sort([](const Version& a, const Version& b) { + // simple because 2 versions are certainly not overlapping + return a.first < b.first; + }); + + int64_t prev_version = -1; + for (const Version& version : existing_versions) { + if (version.first > prev_version + 1) { + break; + } + if (version.first == version.second) { + _cumulative_point = version.first; + break; + } + prev_version = version.second; + _cumulative_point = version.second + 1; + } + return OLAP_SUCCESS; +} + OLAPStatus Tablet::split_range( const OlapTuple& start_key_strings, const 
OlapTuple& end_key_strings, @@ -937,4 +982,23 @@ TabletInfo Tablet::get_tablet_info() { return TabletInfo(tablet_id(), schema_hash(), tablet_uid()); } +void Tablet::pick_candicate_rowsets_to_cumulative_compaction(std::vector* candidate_rowsets) { + ReadLock rdlock(&_meta_lock); + for (auto& it : _rs_version_map) { + if (it.first.first >= _cumulative_point + && it.first.first == it.first.second) { + candidate_rowsets->push_back(it.second); + } + } +} + +void Tablet::pick_candicate_rowsets_to_base_compaction(std::vector* candidate_rowsets) { + ReadLock rdlock(&_meta_lock); + for (auto& it : _rs_version_map) { + if (it.first.first < _cumulative_point) { + candidate_rowsets->push_back(it.second); + } + } +} + } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e4cf7e42be..2a8e33f7de 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -165,14 +165,14 @@ public: inline Mutex* get_push_lock() { return &_ingest_lock; } // base lock - inline bool try_base_compaction_lock() { return _base_lock.trylock() == OLAP_SUCCESS; } inline void obtain_base_compaction_lock() { _base_lock.lock(); } inline void release_base_compaction_lock() { _base_lock.unlock(); } + inline Mutex* get_base_lock() { return &_base_lock; } // cumulative lock - inline bool try_cumulative_lock() { return (OLAP_SUCCESS == _cumulative_lock.trylock()); } inline void obtain_cumulative_lock() { _cumulative_lock.lock(); } inline void release_cumulative_lock() { _cumulative_lock.unlock(); } + inline Mutex* get_cumulative_lock() { return &_cumulative_lock; } inline RWMutex* get_migration_lock_ptr() { return &_migration_lock; } @@ -182,6 +182,8 @@ public: const uint32_t calc_base_compaction_score() const; OLAPStatus compute_all_versions_hash(const std::vector& versions, VersionHash* version_hash) const; + void compute_version_hash_from_rowsets(const std::vector& rowsets, + VersionHash* version_hash) const; // operation for clone void calc_missed_versions(int64_t 
spec_version, vector* missed_versions); @@ -232,6 +234,11 @@ public: TabletInfo get_tablet_info(); + void pick_candicate_rowsets_to_cumulative_compaction(std::vector* candidate_rowsets); + void pick_candicate_rowsets_to_base_compaction(std::vector* candidate_rowsets); + + OLAPStatus calculate_cumulative_point(); + private: void _print_missed_versions(const std::vector& missed_versions) const; OLAPStatus _check_added_rowset(const RowsetSharedPtr& rowset); @@ -257,6 +264,7 @@ private: std::atomic _is_bad; // if this tablet is broken, set to true. default is false std::atomic _last_compaction_failure_time; // timestamp of last compaction failure + int64_t _cumulative_point; DISALLOW_COPY_AND_ASSIGN(Tablet); }; @@ -315,11 +323,11 @@ inline void Tablet::set_creation_time(int64_t creation_time) { } inline const int64_t Tablet::cumulative_layer_point() const { - return _tablet_meta->cumulative_layer_point(); + return _cumulative_point; } void inline Tablet::set_cumulative_layer_point(const int64_t new_point) { - return _tablet_meta->set_cumulative_layer_point(new_point); + _cumulative_point = new_point; } inline bool Tablet::equal(int64_t tablet_id, int32_t schema_hash) { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 28255de058..5f2645b074 100755 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -745,18 +745,16 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( } if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - if (!table_ptr->try_cumulative_lock()) { + MutexLock lock(table_ptr->get_cumulative_lock(), TRY_LOCK); + if (!lock.own_lock()) { continue; - } else { - table_ptr->release_cumulative_lock(); } } if (compaction_type == CompactionType::BASE_COMPACTION) { - if (!table_ptr->try_base_compaction_lock()) { + MutexLock lock(table_ptr->get_base_lock(), TRY_LOCK); + if (!lock.own_lock()) { continue; - } else { - table_ptr->release_base_compaction_lock(); } } @@ -1244,7 +1242,6 @@ 
OLAPStatus TabletManager::_create_inital_rowset( return res; } } - tablet->set_cumulative_layer_point(request.version + 1); res = tablet->save_meta(); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to save header. [tablet=" << tablet->full_name() << "]"; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 3fa14b5fdf..6a2bb650d6 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -690,7 +690,7 @@ OLAPStatus EngineCloneTask::_finish_clone(TabletSharedPtr tablet, const string& // if full clone success, need to update cumulative layer point if (!is_incremental_clone && res == OLAP_SUCCESS) { - tablet->set_cumulative_layer_point(cloned_tablet_meta.cumulative_layer_point()); + tablet->set_cumulative_layer_point(-1); } } while (0); diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index 002620817d..0cdc36f9b8 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -192,17 +192,28 @@ private: class MutexLock { public: // wait until obtain the lock - explicit MutexLock(Mutex* mutex) : _mutex(mutex) { - _mutex->lock(); + explicit MutexLock(Mutex* mutex, bool try_lock = false) + : _mutex(mutex), _locked(false) { + if (try_lock) { + _locked = (_mutex->trylock() == OLAP_SUCCESS); + } else { + _mutex->lock(); + _locked = true; + } } + // unlock is called after ~MutexLock() { - _mutex->unlock(); + if (_locked) { + _mutex->unlock(); + } } + inline bool own_lock() const { return _locked; } + private: Mutex* _mutex; - + bool _locked; DISALLOW_COPY_AND_ASSIGN(MutexLock); }; @@ -248,25 +259,25 @@ private: class ReadLock { public: explicit ReadLock(RWMutex* mutex, bool try_lock = false) - : _mutex(mutex), locked(false) { + : _mutex(mutex), _locked(false) { if (try_lock) { - locked = this->_mutex->tryrdlock() == OLAP_SUCCESS; + _locked = this->_mutex->tryrdlock() == OLAP_SUCCESS; } else { this->_mutex->rdlock(); - locked = true; + _locked = true; } } ~ReadLock() { - if (locked) { 
+ if (_locked) { this->_mutex->unlock(); } } - bool own_lock() { return locked; } + bool own_lock() { return _locked; } private: RWMutex* _mutex; - bool locked; + bool _locked; DISALLOW_COPY_AND_ASSIGN(ReadLock); }; @@ -278,25 +289,25 @@ private: class WriteLock { public: explicit WriteLock(RWMutex* mutex, bool try_lock = false) - : _mutex(mutex), locked(false) { + : _mutex(mutex), _locked(false) { if (try_lock) { - locked = this->_mutex->trywrlock() == OLAP_SUCCESS; + _locked = this->_mutex->trywrlock() == OLAP_SUCCESS; } else { this->_mutex->wrlock(); - locked = true; + _locked = true; } } ~WriteLock() { - if (locked) { + if (_locked) { this->_mutex->unlock(); } } - bool own_lock() { return locked; } + bool own_lock() { return _locked; } private: RWMutex* _mutex; - bool locked; + bool _locked; DISALLOW_COPY_AND_ASSIGN(WriteLock); };