[Optimize] make tablet meta checkpoint to be threadpool model (#5654)

Currently Tablet meta checkpoint is a memory-exhausted operation.
If a host has 12 disks, it will start 12 threads to do tablet meta checkpoint.
In our experience, the data size of one tablet can be as high as 2G.
If 12 threads do the checkpoint at the same time, it maybe cause OOM.

Therefore, this PR try to solve this problem.
Firstly, it only start one thread to produce table meta checkpoint tasks.
Secondly, it creates a thread pool to handle these tasks.
You can configure the size of the thread pool to control the parallelism in case of OOM.
It is a producer-customer model.
This commit is contained in:
xinghuayu007
2021-04-23 09:45:15 +08:00
committed by GitHub
parent 7eea811f6b
commit 4fa25b6eb9
4 changed files with 37 additions and 23 deletions

View File

@ -286,6 +286,9 @@ CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
CONF_mInt32(min_compaction_threads, "10");
CONF_mInt32(max_compaction_threads, "10");
// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");
// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
CONF_mInt64(total_permits_for_compaction_score, "10000");
@ -522,6 +525,7 @@ CONF_Int32(flush_thread_num_per_store, "2");
// config for tablet meta checkpoint
CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
CONF_Int32(generate_tablet_meta_checkpoint_tasks_interval_secs, "600");
// config for default rowset type
// Valid configs: ALPHA, BETA

View File

@ -81,16 +81,19 @@ Status StorageEngine::start_bg_threads() {
&_compaction_tasks_producer_thread));
LOG(INFO) << "compaction tasks producer thread started";
// tablet checkpoint thread
for (auto data_dir : data_dirs) {
scoped_refptr<Thread> tablet_checkpoint_thread;
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "tablet_checkpoint_thread",
[this, data_dir]() { this->_tablet_checkpoint_callback(data_dir); },
&tablet_checkpoint_thread));
_tablet_checkpoint_threads.emplace_back(tablet_checkpoint_thread);
int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads;
if (max_checkpoint_thread_num < 0) {
max_checkpoint_thread_num = data_dirs.size();
}
LOG(INFO) << "tablet checkpoint thread started";
ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool")
.set_max_threads(max_checkpoint_thread_num)
.build(&_tablet_meta_checkpoint_thread_pool);
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "tablet_checkpoint_tasks_producer_thread",
[this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); },
&_tablet_checkpoint_tasks_producer_thread));
LOG(INFO) << "tablet checkpoint tasks producer thread started";
// fd cache clean thread
RETURN_IF_ERROR(Thread::create(
@ -290,22 +293,24 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) {
} while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
}
void StorageEngine::_tablet_checkpoint_callback(DataDir* data_dir) {
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
int64_t interval = config::tablet_meta_checkpoint_min_interval_secs;
int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
do {
LOG(INFO) << "begin to do tablet meta checkpoint:" << data_dir->path();
int64_t start_time = UnixMillis();
_tablet_manager->do_tablet_meta_checkpoint(data_dir);
int64_t used_time = (UnixMillis() - start_time) / 1000;
if (used_time < config::tablet_meta_checkpoint_min_interval_secs) {
interval = config::tablet_meta_checkpoint_min_interval_secs - used_time;
} else {
interval = 1;
LOG(INFO) << "begin to produce tablet meta checkpoint tasks.";
for (auto data_dir : data_dirs) {
auto st =_tablet_meta_checkpoint_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
_tablet_manager->do_tablet_meta_checkpoint(data_dir);
});
if (!st.ok()) {
LOG(WARNING) << "submit tablet checkpoint tasks failed.";
}
}
interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
} while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
}

View File

@ -145,6 +145,9 @@ StorageEngine::~StorageEngine() {
if (_compaction_thread_pool) {
_compaction_thread_pool->shutdown();
}
if (_tablet_meta_checkpoint_thread_pool) {
_tablet_meta_checkpoint_thread_pool->shutdown();
}
}
void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
@ -543,6 +546,7 @@ void StorageEngine::stop() {
THREAD_JOIN(_garbage_sweeper_thread);
THREAD_JOIN(_disk_stat_monitor_thread);
THREAD_JOIN(_fd_cache_clean_thread);
THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
#undef THREAD_JOIN
#define THREADS_JOIN(threads) \
@ -554,7 +558,6 @@ void StorageEngine::stop() {
THREADS_JOIN(_path_gc_threads);
THREADS_JOIN(_path_scan_threads);
THREADS_JOIN(_tablet_checkpoint_threads);
#undef THREADS_JOIN
}

View File

@ -224,7 +224,7 @@ private:
void _path_scan_thread_callback(DataDir* data_dir);
void _tablet_checkpoint_callback(DataDir* data_dir);
void _tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs);
// parse the default rowset type config to RowsetTypePB
void _parse_default_rowset_type();
@ -318,8 +318,8 @@ private:
std::vector<scoped_refptr<Thread>> _path_gc_threads;
// threads to scan disk paths
std::vector<scoped_refptr<Thread>> _path_scan_threads;
// threads to run tablet checkpoint
std::vector<scoped_refptr<Thread>> _tablet_checkpoint_threads;
// thread to produce tablet checkpoint tasks
scoped_refptr<Thread> _tablet_checkpoint_tasks_producer_thread;
// For tablet and disk-stat report
std::mutex _report_mtx;
@ -342,6 +342,8 @@ private:
std::unique_ptr<ThreadPool> _compaction_thread_pool;
std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
CompactionPermitLimiter _permit_limiter;
std::mutex _tablet_submitted_compaction_mutex;