From 4fa25b6eb9c30ad9434aa551bba13aeb51134c8a Mon Sep 17 00:00:00 2001 From: xinghuayu007 <1450306854@qq.com> Date: Fri, 23 Apr 2021 09:45:15 +0800 Subject: [PATCH] [Optimize] make tablet meta checkpoint to be threadpool model (#5654) Currently, tablet meta checkpoint is a memory-intensive operation. If a host has 12 disks, it will start 12 threads to do tablet meta checkpoint. In our experience, the data size of one tablet can be as high as 2G. If 12 threads do the checkpoint at the same time, it may cause OOM. Therefore, this PR tries to solve this problem. First, it starts only one thread to produce tablet meta checkpoint tasks. Second, it creates a thread pool to handle these tasks. You can configure the size of the thread pool to limit the parallelism and avoid OOM. It is a producer-consumer model. --- be/src/common/config.h | 4 ++++ be/src/olap/olap_server.cpp | 43 +++++++++++++++++++--------------- be/src/olap/storage_engine.cpp | 5 +++- be/src/olap/storage_engine.h | 8 ++++--- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8b4aab5ae4..0c9fd350c6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -286,6 +286,9 @@ CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min CONF_mInt32(min_compaction_threads, "10"); CONF_mInt32(max_compaction_threads, "10"); +// Thread count to do tablet meta checkpoint, -1 means use the data directories count. +CONF_Int32(max_meta_checkpoint_threads, "-1"); + // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction. 
CONF_mInt64(total_permits_for_compaction_score, "10000"); @@ -522,6 +525,7 @@ CONF_Int32(flush_thread_num_per_store, "2"); // config for tablet meta checkpoint CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600"); +CONF_Int32(generate_tablet_meta_checkpoint_tasks_interval_secs, "600"); // config for default rowset type // Valid configs: ALPHA, BETA diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 97bbf8483a..ed72dc1616 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -81,16 +81,19 @@ Status StorageEngine::start_bg_threads() { &_compaction_tasks_producer_thread)); LOG(INFO) << "compaction tasks producer thread started"; - // tablet checkpoint thread - for (auto data_dir : data_dirs) { - scoped_refptr tablet_checkpoint_thread; - RETURN_IF_ERROR(Thread::create( - "StorageEngine", "tablet_checkpoint_thread", - [this, data_dir]() { this->_tablet_checkpoint_callback(data_dir); }, - &tablet_checkpoint_thread)); - _tablet_checkpoint_threads.emplace_back(tablet_checkpoint_thread); + int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads; + if (max_checkpoint_thread_num < 0) { + max_checkpoint_thread_num = data_dirs.size(); } - LOG(INFO) << "tablet checkpoint thread started"; + ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool") + .set_max_threads(max_checkpoint_thread_num) + .build(&_tablet_meta_checkpoint_thread_pool); + + RETURN_IF_ERROR(Thread::create( + "StorageEngine", "tablet_checkpoint_tasks_producer_thread", + [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); }, + &_tablet_checkpoint_tasks_producer_thread)); + LOG(INFO) << "tablet checkpoint tasks producer thread started"; // fd cache clean thread RETURN_IF_ERROR(Thread::create( @@ -290,22 +293,24 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) { } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); 
} -void StorageEngine::_tablet_checkpoint_callback(DataDir* data_dir) { +void StorageEngine::_tablet_checkpoint_callback(const std::vector& data_dirs) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - int64_t interval = config::tablet_meta_checkpoint_min_interval_secs; + int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; do { - LOG(INFO) << "begin to do tablet meta checkpoint:" << data_dir->path(); - int64_t start_time = UnixMillis(); - _tablet_manager->do_tablet_meta_checkpoint(data_dir); - int64_t used_time = (UnixMillis() - start_time) / 1000; - if (used_time < config::tablet_meta_checkpoint_min_interval_secs) { - interval = config::tablet_meta_checkpoint_min_interval_secs - used_time; - } else { - interval = 1; + LOG(INFO) << "begin to produce tablet meta checkpoint tasks."; + for (auto data_dir : data_dirs) { + auto st =_tablet_meta_checkpoint_thread_pool->submit_func([=]() { + CgroupsMgr::apply_system_cgroup(); + _tablet_manager->do_tablet_meta_checkpoint(data_dir); + }); + if (!st.ok()) { + LOG(WARNING) << "submit tablet checkpoint tasks failed."; + } } + interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 3747d91dbe..cd01226806 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -145,6 +145,9 @@ StorageEngine::~StorageEngine() { if (_compaction_thread_pool) { _compaction_thread_pool->shutdown(); } + if (_tablet_meta_checkpoint_thread_pool) { + _tablet_meta_checkpoint_thread_pool->shutdown(); + } } void StorageEngine::load_data_dirs(const std::vector& data_dirs) { @@ -543,6 +546,7 @@ void StorageEngine::stop() { THREAD_JOIN(_garbage_sweeper_thread); THREAD_JOIN(_disk_stat_monitor_thread); THREAD_JOIN(_fd_cache_clean_thread); + THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread); #undef THREAD_JOIN 
#define THREADS_JOIN(threads) \ @@ -554,7 +558,6 @@ void StorageEngine::stop() { THREADS_JOIN(_path_gc_threads); THREADS_JOIN(_path_scan_threads); - THREADS_JOIN(_tablet_checkpoint_threads); #undef THREADS_JOIN } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index dbae15be7e..bc06db273e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -224,7 +224,7 @@ private: void _path_scan_thread_callback(DataDir* data_dir); - void _tablet_checkpoint_callback(DataDir* data_dir); + void _tablet_checkpoint_callback(const std::vector& data_dirs); // parse the default rowset type config to RowsetTypePB void _parse_default_rowset_type(); @@ -318,8 +318,8 @@ private: std::vector> _path_gc_threads; // threads to scan disk paths std::vector> _path_scan_threads; - // threads to run tablet checkpoint - std::vector> _tablet_checkpoint_threads; + // thread to produce tablet checkpoint tasks + scoped_refptr _tablet_checkpoint_tasks_producer_thread; // For tablet and disk-stat report std::mutex _report_mtx; @@ -342,6 +342,8 @@ private: std::unique_ptr _compaction_thread_pool; + std::unique_ptr _tablet_meta_checkpoint_thread_pool; + CompactionPermitLimiter _permit_limiter; std::mutex _tablet_submitted_compaction_mutex;