From 588e5bee4793cc383dc49e73f06f6cde2e286986 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 21 Oct 2020 10:03:55 +0800 Subject: [PATCH] [Bug] Fix bug of cumulative compaction and deletion of stale version (#4593) When selecting candidate rowsets to do the cumulative compaction, some rowsets may not be selected because the protection time has not expired. Therefore, we need to find the current longest continuous version path in the candidate rowsets. --- be/src/olap/base_tablet.cpp | 5 ++ be/src/olap/base_tablet.h | 7 +-- be/src/olap/compaction.cpp | 27 +++++++++ be/src/olap/compaction.h | 1 + be/src/olap/cumulative_compaction.cpp | 14 ++++- be/src/olap/cumulative_compaction_policy.cpp | 3 +- .../cumulative_compaction_policy_test.cpp | 57 ++++++++++++++++++- 7 files changed, 105 insertions(+), 9 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 507bbc6637..64a31052ee 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -34,6 +34,11 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) _data_dir(data_dir) { _gen_tablet_path(); + std::stringstream ss; + ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." + << _tablet_meta->tablet_uid().to_string(); + _full_name = ss.str(); + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( strings::Substitute("Tablet.$0", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}}, diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 6097d23957..bce485dc8a 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -77,6 +77,8 @@ protected: // metrics of this tablet std::shared_ptr _metric_entity = nullptr; + + std::string _full_name; public: IntCounter* query_scan_bytes; IntCounter* query_scan_rows; @@ -110,10 +112,7 @@ inline int64_t BaseTablet::table_id() const { } inline const std::string BaseTablet::full_name() const { - std::stringstream ss; - ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." - << _tablet_meta->tablet_uid().to_string(); - return ss.str(); + return _full_name; } inline int64_t BaseTablet::partition_id() const { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c5526314b6..7aa6b9fb4c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -177,6 +177,33 @@ OLAPStatus Compaction::gc_unused_rowsets() { return OLAP_SUCCESS; } +// Find the longest consecutive version path in "rowset", from begining. +// Two versions before and after the missing version will be saved in missing_version, +// if missing_version is not null. +OLAPStatus Compaction::find_longest_consecutive_version( + vector* rowsets, + vector* missing_version) { + if (rowsets->empty()) { + return OLAP_SUCCESS; + } + RowsetSharedPtr prev_rowset = rowsets->front(); + size_t i = 1; + for (; i < rowsets->size(); ++i) { + RowsetSharedPtr rowset = (*rowsets)[i]; + if (rowset->start_version() != prev_rowset->end_version() + 1) { + if (missing_version != nullptr) { + missing_version->push_back(prev_rowset->version()); + missing_version->push_back(rowset->version()); + } + break; + } + prev_rowset = rowset; + } + + rowsets->resize(i); + return OLAP_SUCCESS; +} + OLAPStatus Compaction::check_version_continuity(const vector& rowsets) { RowsetSharedPtr prev_rowset = rowsets.front(); for (size_t i = 1; i < rowsets.size(); ++i) { diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 12adbd481e..ed767a78fb 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -65,6 +65,7 @@ protected: OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger::Statistics& stats); + OLAPStatus find_longest_consecutive_version(std::vector* rowsets, std::vector* missing_version); private: // get num rows from segment group meta of input rowsets. diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 49d7c0ebac..25398a0e2b 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -84,7 +84,18 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } - RETURN_NOT_OK(check_version_continuity(candidate_rowsets)); + // candidate_rowsets may not be continuous. Because some rowset may not be selected + // because the protection time has not expired(config::cumulative_compaction_skip_window_seconds). + // So we need to choose the longest continuous path from it. + std::vector missing_versions; + RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions)); + if (!missing_versions.empty()) { + DCHECK(missing_versions.size() == 2); + LOG(WARNING) << "There are missed versions among rowsets. " + << "prev rowset verison=" << missing_versions[0] + << ", next rowset version=" << missing_versions[1] + << ", tablet=" << _tablet->full_name(); + } size_t compaction_score = 0; int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets( @@ -150,4 +161,3 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { } } // namespace doris - diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index dc3257e81e..ee39fb39cf 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -436,7 +436,6 @@ void CumulativeCompactionPolicy::pick_candidate_rowsets(int64_t skip_window_sec, } } std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator); - } std::unique_ptr CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) { @@ -467,4 +466,4 @@ void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std: *policy_type = NUM_BASED_POLICY; } } -} \ No newline at end of file +} diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index 7819cc21b9..ca80100bde 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -20,6 +20,7 @@ #include "olap/tablet_meta.h" #include "olap/rowset/rowset_meta.h" +#include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" namespace doris { @@ -638,6 +639,24 @@ public: rs_metas->push_back(ptr5); } + void init_rs_meta_missing_version(std::vector* rs_metas) { + RowsetMetaSharedPtr ptr1(new RowsetMeta()); + init_rs_meta(ptr1, 0, 0); + rs_metas->push_back(ptr1); + + RowsetMetaSharedPtr ptr2(new RowsetMeta()); + init_rs_meta(ptr2, 1, 1); + rs_metas->push_back(ptr2); + + RowsetMetaSharedPtr ptr3(new RowsetMeta()); + init_rs_meta(ptr3, 2, 2); + rs_metas->push_back(ptr3); + + RowsetMetaSharedPtr ptr5(new RowsetMeta()); + init_rs_meta(ptr5, 4, 4); + rs_metas->push_back(ptr5); + } + protected: std::string _json_rowset_meta; TabletMetaSharedPtr _tablet_meta; @@ -1013,10 +1032,46 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) { ASSERT_EQ(134217728, policy->_levels[2]); ASSERT_EQ(67108864, policy->_levels[3]); } + +TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative_compaction) { + std::vector rs_metas; + init_rs_meta_missing_version(&rs_metas); + + for (auto &rowset : rs_metas) { + _tablet_meta->add_rs_meta(rowset); + } + + TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); + _tablet->init(); + + // has miss version + std::vector rowsets; + rowsets.push_back(_tablet->get_rowset_by_version({0, 0})); + rowsets.push_back(_tablet->get_rowset_by_version({1, 1})); + rowsets.push_back(_tablet->get_rowset_by_version({2, 2})); + rowsets.push_back(_tablet->get_rowset_by_version({4, 4})); + std::shared_ptr mem_tracker(new MemTracker()); + CumulativeCompaction compaction(_tablet, "label", mem_tracker); + compaction.find_longest_consecutive_version(&rowsets); + ASSERT_EQ(3, rowsets.size()); + ASSERT_EQ(2, rowsets[2]->end_version()); + + // no miss version + std::vector rowsets2; + rowsets2.push_back(_tablet->get_rowset_by_version({0, 0})); + compaction.find_longest_consecutive_version(&rowsets2); + ASSERT_EQ(1, rowsets2.size()); + ASSERT_EQ(0, rowsets[0]->end_version()); + + // no version + std::vector rowsets3; + compaction.find_longest_consecutive_version(&rowsets3); + ASSERT_EQ(0, rowsets3.size()); +} } // @brief Test Stub int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +}