[Bug] Fix bug and add graceful exit for compaction producer (#5124)

1. add graceful exit mechanism for the compaction producer thread.
2. if compaction task submits unsuccessfully, the compaction task should pop from `_tablet_submitted_compaction`.
This commit is contained in:
weizuo93
2021-01-30 16:35:36 +08:00
committed by GitHub
parent 4ffc61be32
commit 90c2da54bd
3 changed files with 35 additions and 20 deletions

View File

@ -283,6 +283,9 @@ CONF_mInt32(max_compaction_threads, "10");
// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
CONF_mInt64(total_permits_for_compaction_score, "10000");
// sleep interval in ms after generated compaction tasks
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10")
// Compaction task number per disk.
CONF_mInt32(compaction_task_num_per_disk, "2");

View File

@ -324,7 +324,8 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int round = 0;
CompactionType compaction_type;
int32_t interval = 1;
int64_t interval = config::generate_compaction_tasks_min_interval_ms;
do {
if (!config::disable_auto_compaction) {
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
@ -337,8 +338,8 @@ void StorageEngine::_compaction_tasks_producer_callback() {
std::vector<TabletSharedPtr> tablets_compaction =
_compaction_tasks_generator(compaction_type, data_dirs);
if (tablets_compaction.size() == 0) {
_wakeup_producer_flag = 0;
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 0;
// It is necessary to wake up the thread on timeout to prevent deadlock
// in case of no running compaction task.
_compaction_producer_sleep_cv.wait_for(lock, std::chrono::milliseconds(2000),
@ -354,32 +355,19 @@ void StorageEngine::_compaction_tasks_producer_callback() {
for (const auto& tablet : tablets_compaction) {
int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet);
if (permits > 0 && _permit_limiter.request(permits)) {
{
// Push to _tablet_submitted_compaction before submitting task
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
_tablet_submitted_compaction[tablet->data_dir()].emplace_back(
tablet->tablet_id());
}
// Push to _tablet_submitted_compaction before submitting task
_push_tablet_into_submitted_compaction(tablet);
auto st =_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
}
_pop_tablet_from_submitted_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
});
if (!st.ok()) {
_permit_limiter.release(permits);
_pop_tablet_from_submitted_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
}
@ -388,7 +376,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
tablet->reset_compaction(compaction_type);
}
}
interval = 1;
interval = config::generate_compaction_tasks_min_interval_ms;
} else {
interval = config::check_auto_compaction_interval_seconds * 1000;
}
@ -414,4 +402,26 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
}
return tablets_compaction;
}
void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
_tablet_submitted_compaction[tablet->data_dir()].emplace_back(
tablet->tablet_id());
}
void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
}
}
} // namespace doris

View File

@ -239,6 +239,8 @@ private:
void _compaction_tasks_producer_callback();
vector<TabletSharedPtr> _compaction_tasks_generator(CompactionType compaction_type,
std::vector<DataDir*> data_dirs);
void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet);
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet);
private:
struct CompactionCandidate {