[fix](bug) Fix BE thread safe start and stop #22560
This commit is contained in:
@ -305,6 +305,9 @@ void Daemon::calculate_metrics_thread() {
|
||||
std::map<std::string, int64_t> lst_net_receive_bytes;
|
||||
|
||||
do {
|
||||
if (!ExecEnv::GetInstance()->initialized()) {
|
||||
continue;
|
||||
}
|
||||
DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);
|
||||
|
||||
if (last_ts == -1L) {
|
||||
@ -353,12 +356,12 @@ void Daemon::calculate_metrics_thread() {
|
||||
DorisMetrics::instance()->all_segments_num->set_value(
|
||||
StorageEngine::instance()->tablet_manager()->get_segment_nums());
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)) && !k_doris_exit);
|
||||
}
|
||||
|
||||
// clean up stale spilled files
|
||||
void Daemon::block_spill_gc_thread() {
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60))) {
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60)) && !k_doris_exit) {
|
||||
if (ExecEnv::GetInstance()->initialized()) {
|
||||
ExecEnv::GetInstance()->block_spill_mgr()->gc(200);
|
||||
}
|
||||
|
||||
@ -250,7 +250,8 @@ Status StorageEngine::start_bg_threads() {
|
||||
|
||||
void StorageEngine::_cache_clean_callback() {
|
||||
int32_t interval = config::cache_prune_stale_interval;
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit) {
|
||||
if (interval <= 0) {
|
||||
LOG(WARNING) << "config of cache clean interval is illegal: [" << interval
|
||||
<< "], force set to 3600 ";
|
||||
@ -278,7 +279,8 @@ void StorageEngine::_garbage_sweeper_thread_callback() {
|
||||
double usage = 1.0;
|
||||
// After the program starts, the first round of cleaning starts after min_interval.
|
||||
uint32_t curr_interval = min_interval;
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval))) {
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval)) &&
|
||||
!k_doris_exit) {
|
||||
// Function properties:
|
||||
// when usage < 0.6, ratio close to 1.(interval close to max_interval)
|
||||
// when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27.
|
||||
@ -312,7 +314,8 @@ void StorageEngine::_disk_stat_monitor_thread_callback() {
|
||||
<< ", force set to 1";
|
||||
interval = 1;
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::check_cumulative_compaction_config() {
|
||||
@ -343,7 +346,8 @@ void StorageEngine::_unused_rowset_monitor_thread_callback() {
|
||||
<< ", force set to 1";
|
||||
interval = 1;
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
|
||||
@ -362,7 +366,8 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
|
||||
<< "will be forced set to half hour";
|
||||
interval = 1800; // 0.5 hour
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) {
|
||||
@ -380,7 +385,8 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) {
|
||||
<< "will be forced set to one day";
|
||||
interval = 24 * 3600; // one day
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
|
||||
@ -395,7 +401,8 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& dat
|
||||
}
|
||||
}
|
||||
interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_tablet_path_check_callback() {
|
||||
@ -414,7 +421,8 @@ void StorageEngine::_tablet_path_check_callback() {
|
||||
do {
|
||||
int32_t batch_size = config::tablet_path_check_batch_size;
|
||||
if (batch_size <= 0) {
|
||||
if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
|
||||
if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
@ -472,7 +480,8 @@ void StorageEngine::_tablet_path_check_callback() {
|
||||
}
|
||||
}
|
||||
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_adjust_compaction_thread_num() {
|
||||
@ -628,7 +637,8 @@ void StorageEngine::_compaction_tasks_producer_callback() {
|
||||
} else {
|
||||
interval = 5000; // 5s to check disable_auto_compaction
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_update_replica_infos_callback() {
|
||||
@ -712,7 +722,8 @@ void StorageEngine::_update_replica_infos_callback() {
|
||||
start = end;
|
||||
}
|
||||
interval = config::update_replica_infos_interval_seconds;
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet,
|
||||
@ -1077,12 +1088,14 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
|
||||
LOG(INFO) << "failed to submit cooldown task";
|
||||
}
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
|
||||
!k_doris_exit);
|
||||
}
|
||||
|
||||
void StorageEngine::_remove_unused_remote_files_callback() {
|
||||
while (!_stop_background_threads_latch.wait_for(
|
||||
std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
|
||||
std::chrono::seconds(config::remove_unused_remote_files_interval_sec)) &&
|
||||
!k_doris_exit) {
|
||||
LOG(INFO) << "begin to remove unused remote files";
|
||||
Tablet::remove_unused_remote_files();
|
||||
}
|
||||
@ -1093,7 +1106,8 @@ void StorageEngine::_cold_data_compaction_producer_callback() {
|
||||
std::mutex tablet_submitted_mtx;
|
||||
|
||||
while (!_stop_background_threads_latch.wait_for(
|
||||
std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
|
||||
std::chrono::seconds(config::cold_data_compaction_interval_sec)) &&
|
||||
!k_doris_exit) {
|
||||
if (config::disable_auto_compaction ||
|
||||
MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
|
||||
continue;
|
||||
@ -1202,7 +1216,7 @@ void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
|
||||
interval = 10;
|
||||
}
|
||||
bool stop = _stop_background_threads_latch.wait_for(std::chrono::seconds(interval));
|
||||
if (stop) {
|
||||
if (stop || k_doris_exit) {
|
||||
break;
|
||||
}
|
||||
if (config::generate_cache_cleaner_task_interval_sec <= 0) {
|
||||
@ -1250,7 +1264,8 @@ int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
|
||||
}
|
||||
|
||||
void StorageEngine::_async_publish_callback() {
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
|
||||
while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30)) &&
|
||||
!k_doris_exit) {
|
||||
// tablet, publish_version
|
||||
std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user