[Enhancement](compaction) enable the compaction producer to generate multiple compaction tasks in a single run (#45411) (#46160)

pick master #45411
This commit is contained in:
Luwei
2024-12-31 09:51:43 +08:00
committed by GitHub
parent 3d79955db3
commit df26475e1a
6 changed files with 276 additions and 13 deletions

View File

@ -1349,6 +1349,8 @@ DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");
// Upper bound on how many tablets a single find_best_tablets_to_compaction() call may return.
// Values <= 1 keep the single-best-tablet behavior; values > 1 return up to that many of the
// highest-scoring tablets.
DEFINE_mInt32(compaction_num_per_round, "1");
// clang-format off
#ifdef BE_TEST
// test s3

View File

@ -1419,6 +1419,8 @@ DECLARE_mInt32(lz4_compression_block_size);
DECLARE_mBool(enable_pipeline_task_leakage_detect);
DECLARE_Bool(force_regenerate_rowsetid_on_start_error);
// Upper bound on how many tablets a single find_best_tablets_to_compaction() call may return.
// Values <= 1 keep the single-best-tablet behavior.
DECLARE_mInt32(compaction_num_per_round);
// Enable a 5s sleep between delete cumulative compactions.
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

View File

@ -919,11 +919,11 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
continue;
}
// Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
// Even if need_pick_tablet is false, we still need to call find_best_tablets_to_compaction(),
// So that we can update the max_compaction_score metric.
if (!data_dir->reach_capacity_limit(0)) {
uint32_t disk_max_score = 0;
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
std::vector<TabletSharedPtr> tablets = _tablet_manager->find_best_tablets_to_compaction(
compaction_type, data_dir,
compaction_type == CompactionType::CUMULATIVE_COMPACTION
? copied_cumu_map[data_dir]
@ -932,11 +932,13 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
int concurrent_num = get_concurrent_per_disk(disk_max_score, thread_per_disk);
need_pick_tablet = need_generate_compaction_tasks(
count, concurrent_num, compaction_type, copied_cumu_map[data_dir].empty());
if (tablet != nullptr) {
if (need_pick_tablet) {
tablets_compaction.emplace_back(tablet);
for (const auto& tablet : tablets) {
if (tablet != nullptr) {
if (need_pick_tablet) {
tablets_compaction.emplace_back(tablet);
}
max_compaction_score = std::max(max_compaction_score, disk_max_score);
}
max_compaction_score = std::max(max_compaction_score, disk_max_score);
}
}
}

View File

@ -728,7 +728,12 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
result->__set_tablet_stat_list(*local_cache);
}
TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
// Pairs a candidate tablet with the compaction score computed for it, so candidates can be
// ranked in a priority queue while searching for the best tablets to compact.
struct TabletScore {
    TabletSharedPtr tablet_ptr;
    // Unsigned so it matches the uint32_t scores it is assigned from and compared against,
    // avoiding signed/unsigned mixing; zero-initialized so a default-constructed entry is defined.
    uint32_t score = 0;
};
std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score,
const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&
@ -739,6 +744,9 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
uint32_t highest_score = 0;
uint32_t compaction_score = 0;
TabletSharedPtr best_tablet;
auto cmp = [](TabletScore left, TabletScore right) { return left.score > right.score; };
std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> top_tablets(cmp);
auto handler = [&](const TabletSharedPtr& tablet_ptr) {
if (tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id()
@ -794,23 +802,55 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
if (current_compaction_score < 5) {
tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds());
}
if (current_compaction_score > highest_score) {
highest_score = current_compaction_score;
compaction_score = current_compaction_score;
best_tablet = tablet_ptr;
if (config::compaction_num_per_round > 1) {
TabletScore ts;
ts.score = current_compaction_score;
ts.tablet_ptr = tablet_ptr;
if ((top_tablets.size() >= config::compaction_num_per_round &&
current_compaction_score > top_tablets.top().score) ||
top_tablets.size() < config::compaction_num_per_round) {
top_tablets.push(ts);
if (top_tablets.size() > config::compaction_num_per_round) {
top_tablets.pop();
}
if (current_compaction_score > highest_score) {
highest_score = current_compaction_score;
compaction_score = current_compaction_score;
}
}
} else {
if (current_compaction_score > highest_score) {
highest_score = current_compaction_score;
compaction_score = current_compaction_score;
best_tablet = tablet_ptr;
}
}
};
for_each_tablet(handler, filter_all_tablets);
std::vector<TabletSharedPtr> picked_tablet;
if (best_tablet != nullptr) {
VLOG_CRITICAL << "Found the best tablet for compaction. "
<< "compaction_type=" << compaction_type_str
<< ", tablet_id=" << best_tablet->tablet_id() << ", path=" << data_dir->path()
<< ", compaction_score=" << compaction_score
<< ", highest_score=" << highest_score;
picked_tablet.emplace_back(std::move(best_tablet));
*score = compaction_score;
}
return best_tablet;
std::vector<TabletSharedPtr> reverse_top_tablets;
while (!top_tablets.empty()) {
reverse_top_tablets.emplace_back(top_tablets.top().tablet_ptr);
top_tablets.pop();
}
for (auto it = reverse_top_tablets.rbegin(); it != reverse_top_tablets.rend(); ++it) {
picked_tablet.emplace_back(*it);
}
return picked_tablet;
}
Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id,

View File

@ -73,7 +73,7 @@ public:
// If `is_drop_table_or_partition` is true, we need to remove all remote rowsets in this tablet.
Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool is_drop_table_or_partition);
TabletSharedPtr find_best_tablet_to_compaction(
std::vector<TabletSharedPtr> find_best_tablets_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score,
const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&

View File

@ -32,10 +32,13 @@
#include "common/status.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/local_file_system.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
@ -84,6 +87,7 @@ public:
SAFE_DELETE(k_engine);
ExecEnv::GetInstance()->set_storage_engine(nullptr);
_tablet_mgr = nullptr;
config::compaction_num_per_round = 1;
}
StorageEngine* k_engine = nullptr;
@ -341,4 +345,217 @@ TEST_F(TabletMgrTest, GetRowsetId) {
}
}
// Verifies TabletManager::find_best_tablets_to_compaction():
//   * with config::compaction_num_per_round == 1 it returns exactly one tablet, the one with the
//     highest compaction score, and reports that score through the out-parameter;
//   * with config::compaction_num_per_round == 10 it returns at most 10 tablets, ordered from the
//     highest score to the lowest.
TEST_F(TabletMgrTest, FindTabletWithCompact) {
    // Creates a UNIQUE_KEYS tablet with three columns on _data_dir, checks that its directory and
    // meta exist, then adds one incremental rowset for each version in [2, rowset_size].
    // The expectations below rely on the resulting cumulative score being rowset_size - 1.
    auto create_tablet = [this](int64_t tablet_id, bool enable_single_compact, int rowset_size) {
        std::vector<TColumn> cols;
        TColumn col1;
        col1.column_type.type = TPrimitiveType::SMALLINT;
        col1.__set_column_name("col1");
        col1.__set_is_key(true);
        cols.push_back(col1);
        TColumn col2;
        col2.column_type.type = TPrimitiveType::INT;
        col2.__set_column_name(SEQUENCE_COL);
        col2.__set_is_key(false);
        col2.__set_aggregation_type(TAggregationType::REPLACE);
        cols.push_back(col2);
        TColumn col3;
        col3.column_type.type = TPrimitiveType::INT;
        col3.__set_column_name("v1");
        col3.__set_is_key(false);
        col3.__set_aggregation_type(TAggregationType::REPLACE);
        cols.push_back(col3);

        RuntimeProfile profile("CreateTablet");
        TTabletSchema tablet_schema;
        tablet_schema.__set_short_key_column_count(1);
        tablet_schema.__set_schema_hash(3333);
        tablet_schema.__set_keys_type(TKeysType::UNIQUE_KEYS);
        tablet_schema.__set_storage_type(TStorageType::COLUMN);
        tablet_schema.__set_columns(cols);
        tablet_schema.__set_sequence_col_idx(1);
        tablet_schema.__set_enable_single_replica_compaction(enable_single_compact);

        TCreateTabletReq create_tablet_req;
        create_tablet_req.__set_tablet_schema(tablet_schema);
        create_tablet_req.__set_tablet_id(tablet_id);
        create_tablet_req.__set_version(1);
        // The replica id is derived from the tablet id; the drop loops below use the same rule.
        create_tablet_req.__set_replica_id(tablet_id * 10);

        std::vector<DataDir*> data_dirs;
        data_dirs.push_back(_data_dir);
        Status create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs, &profile);
        ASSERT_TRUE(create_st.ok()) << create_st;

        TabletSharedPtr tablet = _tablet_mgr->get_tablet(tablet_id);
        ASSERT_TRUE(tablet);

        // check dir exist
        bool dir_exist = false;
        Status exist_st = io::global_local_filesystem()->exists(tablet->tablet_path(), &dir_exist);
        ASSERT_TRUE(exist_st.ok()) << exist_st;
        ASSERT_TRUE(dir_exist);

        // check meta has this tablet
        TabletMetaSharedPtr new_tablet_meta(new TabletMeta());
        Status check_meta_st =
                TabletMetaManager::get_meta(_data_dir, tablet_id, 3333, new_tablet_meta);
        ASSERT_TRUE(check_meta_st.ok()) << check_meta_st;

        // Builds a BetaRowset for this tablet that carries only meta (version, ids).
        auto create_rowset = [=, this](int64_t start, int64_t end) {
            auto rowset_meta = std::make_shared<RowsetMeta>();
            Version version(start, end);
            rowset_meta->set_version(version);
            rowset_meta->set_tablet_id(tablet->tablet_id());
            rowset_meta->set_tablet_uid(tablet->tablet_uid());
            rowset_meta->set_rowset_id(k_engine->next_rowset_id());
            return std::make_shared<BetaRowset>(tablet->tablet_schema(), tablet->tablet_path(),
                                                std::move(rowset_meta));
        };

        auto init_st = tablet->init();
        ASSERT_TRUE(init_st.ok()) << init_st;

        // insert into rowset: one single-version rowset per version in [2, rowset_size]
        for (int i = 2; i <= rowset_size; ++i) {
            auto rs = create_rowset(i, i);
            auto add_st = tablet->add_inc_rowset(rs);
            ASSERT_TRUE(add_st.ok()) << add_st;
        }
    };

    int rowset_size = 5;

    // create 10 tablets (ids 1..10) with rowset_size 5..14
    for (int64_t id = 1; id <= 10; ++id) {
        create_tablet(id, false, rowset_size++);
    }

    std::unordered_set<TabletSharedPtr> cumu_set;
    std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>
            cumulative_compaction_policies;
    cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
                    CUMULATIVE_SIZE_BASED_POLICY);
    cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
                    CUMULATIVE_TIME_SERIES_POLICY);

    // Tablet 10 was created last with rowset_size 14, so it is expected to win with score 13.
    uint32_t score = 0;
    auto compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
            CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
            cumulative_compaction_policies);
    ASSERT_EQ(compact_tablets.size(), 1);
    ASSERT_EQ(compact_tablets[0]->tablet_id(), 10);
    ASSERT_EQ(score, 13);

    // create 10 tablets enable single compact
    // 5 tablets do cumu compaction, 5 tablets do single compaction
    // if BE_TEST is defined, tablet_id % 2 == 0 means that tablet needs to do single compact
    for (int64_t id = 11; id <= 20; ++id) {
        create_tablet(id, true, rowset_size++);
    }

    compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
            CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
            cumulative_compaction_policies);
    ASSERT_EQ(compact_tablets.size(), 1);
    ASSERT_EQ(compact_tablets[0]->tablet_id(), 20);
    ASSERT_EQ(score, 23);

    // One more tablet without single-replica compaction; it has the most rowsets so far.
    create_tablet(21, false, rowset_size++);

    compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
            CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
            cumulative_compaction_policies);
    ASSERT_EQ(compact_tablets.size(), 1);
    ASSERT_EQ(compact_tablets[0]->tablet_id(), 21);
    ASSERT_EQ(score, 24);

    // drop all tablets
    for (int64_t id = 1; id <= 21; ++id) {
        Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
        ASSERT_TRUE(drop_st.ok()) << drop_st;
    }

    // Top-10 out of 100 tablets: expect ids 10100, 10099, ... with scores 99, 98, ...
    {
        config::compaction_num_per_round = 10;
        for (int64_t i = 1; i <= 100; ++i) {
            create_tablet(10000 + i, false, i);
        }

        compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
                CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
                cumulative_compaction_policies);
        ASSERT_EQ(compact_tablets.size(), 10);
        int index = 0;
        // Iterate by reference to avoid a shared_ptr copy per element.
        for (const auto& t : compact_tablets) {
            ASSERT_EQ(t->tablet_id(), 10100 - index);
            ASSERT_EQ(t->calc_compaction_score(
                              CompactionType::CUMULATIVE_COMPACTION,
                              cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
                      99 - index);
            index++;
        }
        config::compaction_num_per_round = 1;

        // drop all tablets
        for (int64_t id = 10001; id <= 10100; ++id) {
            Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
            ASSERT_TRUE(drop_st.ok()) << drop_st;
        }
    }

    // Same scenario with a fresh id range, checked through indexed access.
    {
        config::compaction_num_per_round = 10;
        for (int64_t i = 1; i <= 100; ++i) {
            create_tablet(20000 + i, false, i);
        }

        compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
                CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
                cumulative_compaction_policies);
        ASSERT_EQ(compact_tablets.size(), 10);
        for (int i = 0; i < 10; ++i) {
            ASSERT_EQ(compact_tablets[i]->tablet_id(), 20100 - i);
            ASSERT_EQ(compact_tablets[i]->calc_compaction_score(
                              CompactionType::CUMULATIVE_COMPACTION,
                              cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
                      99 - i);
        }
        config::compaction_num_per_round = 1;

        // drop all tablets
        for (int64_t id = 20001; id <= 20100; ++id) {
            Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
            ASSERT_TRUE(drop_st.ok()) << drop_st;
        }

        // NOTE(review): tablet 20102 is never created in this test; this presumably checks that
        // dropping an unknown tablet still reports OK -- confirm against drop_tablet().
        Status drop_st = _tablet_mgr->drop_tablet(20102, 20102 * 10, false);
        ASSERT_TRUE(drop_st.ok()) << drop_st;
    }

    // Fewer candidates (5) than compaction_num_per_round (10): all 5 are returned, best first.
    {
        config::compaction_num_per_round = 10;
        for (int64_t i = 1; i <= 5; ++i) {
            create_tablet(30000 + i, false, i + 5);
        }

        compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
                CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
                cumulative_compaction_policies);
        ASSERT_EQ(compact_tablets.size(), 5);
        for (int i = 0; i < 5; ++i) {
            ASSERT_EQ(compact_tablets[i]->tablet_id(), 30000 + 5 - i);
            ASSERT_EQ(compact_tablets[i]->calc_compaction_score(
                              CompactionType::CUMULATIVE_COMPACTION,
                              cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
                      9 - i);
        }
        config::compaction_num_per_round = 1;

        // drop all tablets
        for (int64_t id = 30001; id <= 30005; ++id) {
            Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
            ASSERT_TRUE(drop_st.ok()) << drop_st;
        }
    }

    Status trash_st = _tablet_mgr->start_trash_sweep();
    ASSERT_TRUE(trash_st.ok()) << trash_st;
}
} // namespace doris