diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index af220b21a4..fe971165bd 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -305,6 +305,9 @@ void Daemon::calculate_metrics_thread() { std::map lst_net_receive_bytes; do { + if (!ExecEnv::GetInstance()->initialized()) { + continue; + } DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true); if (last_ts == -1L) { @@ -353,12 +356,12 @@ void Daemon::calculate_metrics_thread() { DorisMetrics::instance()->all_segments_num->set_value( StorageEngine::instance()->tablet_manager()->get_segment_nums()); } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)) && !k_doris_exit); } // clean up stale spilled files void Daemon::block_spill_gc_thread() { - while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60))) { + while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60)) && !k_doris_exit) { if (ExecEnv::GetInstance()->initialized()) { ExecEnv::GetInstance()->block_spill_mgr()->gc(200); } diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index e698980a9c..e2840d29b4 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -250,7 +250,8 @@ Status StorageEngine::start_bg_threads() { void StorageEngine::_cache_clean_callback() { int32_t interval = config::cache_prune_stale_interval; - while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { + while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit) { if (interval <= 0) { LOG(WARNING) << "config of cache clean interval is illegal: [" << interval << "], force set to 3600 "; @@ -278,7 +279,8 @@ void StorageEngine::_garbage_sweeper_thread_callback() { double usage = 1.0; // After the program starts, the first round of cleaning starts after min_interval. uint32_t curr_interval = min_interval; - while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval))) { + while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval)) && + !k_doris_exit) { // Function properties: // when usage < 0.6, ratio close to 1.(interval close to max_interval) // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27. @@ -312,7 +314,8 @@ void StorageEngine::_disk_stat_monitor_thread_callback() { << ", force set to 1"; interval = 1; } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::check_cumulative_compaction_config() { @@ -343,7 +346,8 @@ void StorageEngine::_unused_rowset_monitor_thread_callback() { << ", force set to 1"; interval = 1; } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { @@ -362,7 +366,8 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { << "will be forced set to half hour"; interval = 1800; // 0.5 hour } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) { @@ -380,7 +385,8 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) { << "will be forced set to one day"; interval = 24 * 3600; // one day } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::_tablet_checkpoint_callback(const std::vector& data_dirs) { @@ -395,7 +401,8 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector& dat } } interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::_tablet_path_check_callback() { @@ -414,7 +421,8 @@ void StorageEngine::_tablet_path_check_callback() { do { int32_t batch_size = config::tablet_path_check_batch_size; if (batch_size <= 0) { - if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { + if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit) { break; } continue; @@ -472,7 +480,8 @@ void StorageEngine::_tablet_path_check_callback() { } } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::_adjust_compaction_thread_num() { @@ -628,7 +637,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { } else { interval = 5000; // 5s to check disable_auto_compaction } - } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)) && + !k_doris_exit); } void StorageEngine::_update_replica_infos_callback() { @@ -712,7 +722,8 @@ void StorageEngine::_update_replica_infos_callback() { start = end; } interval = config::update_replica_infos_interval_seconds; - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet, @@ -1077,12 +1088,14 @@ void StorageEngine::_cooldown_tasks_producer_callback() { LOG(INFO) << "failed to submit cooldown task"; } } - } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) && + !k_doris_exit); } void StorageEngine::_remove_unused_remote_files_callback() { while (!_stop_background_threads_latch.wait_for( - std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) { + std::chrono::seconds(config::remove_unused_remote_files_interval_sec)) && + !k_doris_exit) { LOG(INFO) << "begin to remove unused remote files"; Tablet::remove_unused_remote_files(); } @@ -1093,7 +1106,8 @@ void StorageEngine::_cold_data_compaction_producer_callback() { std::mutex tablet_submitted_mtx; while (!_stop_background_threads_latch.wait_for( - std::chrono::seconds(config::cold_data_compaction_interval_sec))) { + std::chrono::seconds(config::cold_data_compaction_interval_sec)) && + !k_doris_exit) { if (config::disable_auto_compaction || MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { continue; @@ -1202,7 +1216,7 @@ void StorageEngine::_cache_file_cleaner_tasks_producer_callback() { interval = 10; } bool stop = _stop_background_threads_latch.wait_for(std::chrono::seconds(interval)); - if (stop) { + if (stop || k_doris_exit) { break; } if (config::generate_cache_cleaner_task_interval_sec <= 0) { @@ -1250,7 +1264,8 @@ int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { } void StorageEngine::_async_publish_callback() { - while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) { + while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30)) && + !k_doris_exit) { // tablet, publish_version std::vector> need_removed_tasks; {