diff --git a/be/src/common/config.h b/be/src/common/config.h index 8b4aab5ae4..0c9fd350c6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -286,6 +286,9 @@ CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min CONF_mInt32(min_compaction_threads, "10"); CONF_mInt32(max_compaction_threads, "10"); +// Thread count to do tablet meta checkpoint, -1 means use the data directories count. +CONF_Int32(max_meta_checkpoint_threads, "-1"); + // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction. CONF_mInt64(total_permits_for_compaction_score, "10000"); @@ -522,6 +525,7 @@ CONF_Int32(flush_thread_num_per_store, "2"); // config for tablet meta checkpoint CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600"); +CONF_Int32(generate_tablet_meta_checkpoint_tasks_interval_secs, "600"); // config for default rowset type // Valid configs: ALPHA, BETA diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 97bbf8483a..ed72dc1616 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -81,16 +81,19 @@ Status StorageEngine::start_bg_threads() { &_compaction_tasks_producer_thread)); LOG(INFO) << "compaction tasks producer thread started"; - // tablet checkpoint thread - for (auto data_dir : data_dirs) { - scoped_refptr tablet_checkpoint_thread; - RETURN_IF_ERROR(Thread::create( - "StorageEngine", "tablet_checkpoint_thread", - [this, data_dir]() { this->_tablet_checkpoint_callback(data_dir); }, - &tablet_checkpoint_thread)); - _tablet_checkpoint_threads.emplace_back(tablet_checkpoint_thread); + int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads; + if (max_checkpoint_thread_num < 0) { + max_checkpoint_thread_num = data_dirs.size(); } - LOG(INFO) << "tablet checkpoint thread started"; + ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool") + .set_max_threads(max_checkpoint_thread_num) + .build(&_tablet_meta_checkpoint_thread_pool); + + RETURN_IF_ERROR(Thread::create( + "StorageEngine", "tablet_checkpoint_tasks_producer_thread", + [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); }, + &_tablet_checkpoint_tasks_producer_thread)); + LOG(INFO) << "tablet checkpoint tasks producer thread started"; // fd cache clean thread RETURN_IF_ERROR(Thread::create( @@ -290,22 +293,24 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) { } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } -void StorageEngine::_tablet_checkpoint_callback(DataDir* data_dir) { +void StorageEngine::_tablet_checkpoint_callback(const std::vector& data_dirs) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - int64_t interval = config::tablet_meta_checkpoint_min_interval_secs; + int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; do { - LOG(INFO) << "begin to do tablet meta checkpoint:" << data_dir->path(); - int64_t start_time = UnixMillis(); - _tablet_manager->do_tablet_meta_checkpoint(data_dir); - int64_t used_time = (UnixMillis() - start_time) / 1000; - if (used_time < config::tablet_meta_checkpoint_min_interval_secs) { - interval = config::tablet_meta_checkpoint_min_interval_secs - used_time; - } else { - interval = 1; + LOG(INFO) << "begin to produce tablet meta checkpoint tasks."; + for (auto data_dir : data_dirs) { + auto st =_tablet_meta_checkpoint_thread_pool->submit_func([=]() { + CgroupsMgr::apply_system_cgroup(); + _tablet_manager->do_tablet_meta_checkpoint(data_dir); + }); + if (!st.ok()) { + LOG(WARNING) << "submit tablet checkpoint tasks failed."; + } } + interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 3747d91dbe..cd01226806 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -145,6 +145,9 @@ StorageEngine::~StorageEngine() { if (_compaction_thread_pool) { _compaction_thread_pool->shutdown(); } + if (_tablet_meta_checkpoint_thread_pool) { + _tablet_meta_checkpoint_thread_pool->shutdown(); + } } void StorageEngine::load_data_dirs(const std::vector& data_dirs) { @@ -543,6 +546,7 @@ void StorageEngine::stop() { THREAD_JOIN(_garbage_sweeper_thread); THREAD_JOIN(_disk_stat_monitor_thread); THREAD_JOIN(_fd_cache_clean_thread); + THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread); #undef THREAD_JOIN #define THREADS_JOIN(threads) \ @@ -554,7 +558,6 @@ void StorageEngine::stop() { THREADS_JOIN(_path_gc_threads); THREADS_JOIN(_path_scan_threads); - THREADS_JOIN(_tablet_checkpoint_threads); #undef THREADS_JOIN } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index dbae15be7e..bc06db273e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -224,7 +224,7 @@ private: void _path_scan_thread_callback(DataDir* data_dir); - void _tablet_checkpoint_callback(DataDir* data_dir); + void _tablet_checkpoint_callback(const std::vector& data_dirs); // parse the default rowset type config to RowsetTypePB void _parse_default_rowset_type(); @@ -318,8 +318,8 @@ private: std::vector> _path_gc_threads; // threads to scan disk paths std::vector> _path_scan_threads; - // threads to run tablet checkpoint - std::vector> _tablet_checkpoint_threads; + // thread to produce tablet checkpoint tasks + scoped_refptr _tablet_checkpoint_tasks_producer_thread; // For tablet and disk-stat report std::mutex _report_mtx; @@ -342,6 +342,8 @@ private: std::unique_ptr _compaction_thread_pool; + std::unique_ptr _tablet_meta_checkpoint_thread_pool; + CompactionPermitLimiter _permit_limiter; std::mutex _tablet_submitted_compaction_mutex;