[Bug] Fix bug of cumulative compaction and deletion of stale version (#4593)
When selecting candidate rowsets to do the cumulative compaction, some rowsets may not be selected because the protection time has not expired. Therefore, we need to find the current longest continuous version path in the candidate rowsets.
This commit is contained in:
@ -34,6 +34,11 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
|
||||
_data_dir(data_dir) {
|
||||
_gen_tablet_path();
|
||||
|
||||
std::stringstream ss;
|
||||
ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "."
|
||||
<< _tablet_meta->tablet_uid().to_string();
|
||||
_full_name = ss.str();
|
||||
|
||||
_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
|
||||
strings::Substitute("Tablet.$0", tablet_id()),
|
||||
{{"tablet_id", std::to_string(tablet_id())}},
|
||||
|
||||
@ -77,6 +77,8 @@ protected:
|
||||
|
||||
// metrics of this tablet
|
||||
std::shared_ptr<MetricEntity> _metric_entity = nullptr;
|
||||
|
||||
std::string _full_name;
|
||||
public:
|
||||
IntCounter* query_scan_bytes;
|
||||
IntCounter* query_scan_rows;
|
||||
@ -110,10 +112,7 @@ inline int64_t BaseTablet::table_id() const {
|
||||
}
|
||||
|
||||
inline const std::string BaseTablet::full_name() const {
|
||||
std::stringstream ss;
|
||||
ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "."
|
||||
<< _tablet_meta->tablet_uid().to_string();
|
||||
return ss.str();
|
||||
return _full_name;
|
||||
}
|
||||
|
||||
inline int64_t BaseTablet::partition_id() const {
|
||||
|
||||
@ -177,6 +177,33 @@ OLAPStatus Compaction::gc_unused_rowsets() {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
// Find the longest consecutive version path in "rowset", from begining.
|
||||
// Two versions before and after the missing version will be saved in missing_version,
|
||||
// if missing_version is not null.
|
||||
OLAPStatus Compaction::find_longest_consecutive_version(
|
||||
vector<RowsetSharedPtr>* rowsets,
|
||||
vector<Version>* missing_version) {
|
||||
if (rowsets->empty()) {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
RowsetSharedPtr prev_rowset = rowsets->front();
|
||||
size_t i = 1;
|
||||
for (; i < rowsets->size(); ++i) {
|
||||
RowsetSharedPtr rowset = (*rowsets)[i];
|
||||
if (rowset->start_version() != prev_rowset->end_version() + 1) {
|
||||
if (missing_version != nullptr) {
|
||||
missing_version->push_back(prev_rowset->version());
|
||||
missing_version->push_back(rowset->version());
|
||||
}
|
||||
break;
|
||||
}
|
||||
prev_rowset = rowset;
|
||||
}
|
||||
|
||||
rowsets->resize(i);
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus Compaction::check_version_continuity(const vector<RowsetSharedPtr>& rowsets) {
|
||||
RowsetSharedPtr prev_rowset = rowsets.front();
|
||||
for (size_t i = 1; i < rowsets.size(); ++i) {
|
||||
|
||||
@ -65,6 +65,7 @@ protected:
|
||||
|
||||
OLAPStatus check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
|
||||
OLAPStatus check_correctness(const Merger::Statistics& stats);
|
||||
OLAPStatus find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets, std::vector<Version>* missing_version);
|
||||
|
||||
private:
|
||||
// get num rows from segment group meta of input rowsets.
|
||||
|
||||
@ -84,7 +84,18 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
}
|
||||
|
||||
RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
|
||||
// candidate_rowsets may not be continuous: some rowsets may not have been selected
|
||||
// because the protection time has not expired(config::cumulative_compaction_skip_window_seconds).
|
||||
// So we need to choose the longest continuous path from it.
|
||||
std::vector<Version> missing_versions;
|
||||
RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions));
|
||||
if (!missing_versions.empty()) {
|
||||
DCHECK(missing_versions.size() == 2);
|
||||
LOG(WARNING) << "There are missed versions among rowsets. "
|
||||
<< "prev rowset verison=" << missing_versions[0]
|
||||
<< ", next rowset version=" << missing_versions[1]
|
||||
<< ", tablet=" << _tablet->full_name();
|
||||
}
|
||||
|
||||
size_t compaction_score = 0;
|
||||
int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets(
|
||||
@ -150,4 +161,3 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
|
||||
@ -436,7 +436,6 @@ void CumulativeCompactionPolicy::pick_candidate_rowsets(int64_t skip_window_sec,
|
||||
}
|
||||
}
|
||||
std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);
|
||||
|
||||
}
|
||||
|
||||
std::unique_ptr<CumulativeCompactionPolicy> CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) {
|
||||
@ -467,4 +466,4 @@ void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std:
|
||||
*policy_type = NUM_BASED_POLICY;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "olap/rowset/rowset_meta.h"
|
||||
#include "olap/cumulative_compaction.h"
|
||||
#include "olap/cumulative_compaction_policy.h"
|
||||
|
||||
namespace doris {
|
||||
@ -638,6 +639,24 @@ public:
|
||||
rs_metas->push_back(ptr5);
|
||||
}
|
||||
|
||||
void init_rs_meta_missing_version(std::vector<RowsetMetaSharedPtr>* rs_metas) {
|
||||
RowsetMetaSharedPtr ptr1(new RowsetMeta());
|
||||
init_rs_meta(ptr1, 0, 0);
|
||||
rs_metas->push_back(ptr1);
|
||||
|
||||
RowsetMetaSharedPtr ptr2(new RowsetMeta());
|
||||
init_rs_meta(ptr2, 1, 1);
|
||||
rs_metas->push_back(ptr2);
|
||||
|
||||
RowsetMetaSharedPtr ptr3(new RowsetMeta());
|
||||
init_rs_meta(ptr3, 2, 2);
|
||||
rs_metas->push_back(ptr3);
|
||||
|
||||
RowsetMetaSharedPtr ptr5(new RowsetMeta());
|
||||
init_rs_meta(ptr5, 4, 4);
|
||||
rs_metas->push_back(ptr5);
|
||||
}
|
||||
|
||||
protected:
|
||||
std::string _json_rowset_meta;
|
||||
TabletMetaSharedPtr _tablet_meta;
|
||||
@ -1013,10 +1032,46 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
|
||||
ASSERT_EQ(134217728, policy->_levels[2]);
|
||||
ASSERT_EQ(67108864, policy->_levels[3]);
|
||||
}
|
||||
|
||||
// Exercises Compaction::find_longest_consecutive_version() on a tablet whose
// rowset metas cover versions 0, 1, 2 and 4 (version 3 is absent).
// Three cases are checked: a list with a gap, a single rowset, and an empty list.
TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative_compaction) {
    std::vector<RowsetMetaSharedPtr> rs_metas;
    init_rs_meta_missing_version(&rs_metas);

    for (auto& rowset : rs_metas) {
        _tablet_meta->add_rs_meta(rowset);
    }

    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
    _tablet->init();

    // has miss version
    std::vector<RowsetSharedPtr> rowsets;
    rowsets.push_back(_tablet->get_rowset_by_version({0, 0}));
    rowsets.push_back(_tablet->get_rowset_by_version({1, 1}));
    rowsets.push_back(_tablet->get_rowset_by_version({2, 2}));
    rowsets.push_back(_tablet->get_rowset_by_version({4, 4}));
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker());
    CumulativeCompaction compaction(_tablet, "label", mem_tracker);
    // The function takes two arguments; pass a vector here so the reported
    // gap (the rowsets before and after it) is verified as well.
    std::vector<Version> missing_versions;
    ASSERT_EQ(OLAP_SUCCESS,
              compaction.find_longest_consecutive_version(&rowsets, &missing_versions));
    ASSERT_EQ(3, rowsets.size());
    ASSERT_EQ(2, rowsets[2]->end_version());
    ASSERT_EQ(2, missing_versions.size());

    // no miss version
    std::vector<RowsetSharedPtr> rowsets2;
    rowsets2.push_back(_tablet->get_rowset_by_version({0, 0}));
    ASSERT_EQ(OLAP_SUCCESS, compaction.find_longest_consecutive_version(&rowsets2, nullptr));
    ASSERT_EQ(1, rowsets2.size());
    // Check the list under test here (rowsets2), not the one from the previous case.
    ASSERT_EQ(0, rowsets2[0]->end_version());

    // no version
    std::vector<RowsetSharedPtr> rowsets3;
    ASSERT_EQ(OLAP_SUCCESS, compaction.find_longest_consecutive_version(&rowsets3, nullptr));
    ASSERT_EQ(0, rowsets3.size());
}
|
||||
}
|
||||
|
||||
// @brief Test Stub
|
||||
int main(int argc, char** argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user