[improve](topn) seperate multiget rpc to ThreadPool (#16598)
multiget_data working in bthread and may block the whole worker pthread of BRPC framework and effect other bthreads, so I seperate work task into a seperate task pool.
This commit is contained in:
@ -322,6 +322,9 @@ CONF_mInt64(cumulative_compaction_max_deltas, "100");
|
||||
// This config can be set to limit thread number in segcompaction thread pool.
|
||||
CONF_mInt32(seg_compaction_max_threads, "10");
|
||||
|
||||
// This config can be set to limit thread number in multiget thread pool.
|
||||
CONF_mInt32(multi_get_max_threads, "10");
|
||||
|
||||
// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
|
||||
CONF_mInt64(total_permits_for_compaction_score, "10000");
|
||||
|
||||
|
||||
@ -103,6 +103,10 @@ Status StorageEngine::start_bg_threads() {
|
||||
.set_max_threads(max_checkpoint_thread_num)
|
||||
.build(&_tablet_meta_checkpoint_thread_pool);
|
||||
|
||||
ThreadPoolBuilder("MultiGetTaskThreadPool")
|
||||
.set_min_threads(config::multi_get_max_threads)
|
||||
.set_max_threads(config::multi_get_max_threads)
|
||||
.build(&_bg_multi_get_thread_pool);
|
||||
RETURN_IF_ERROR(Thread::create(
|
||||
"StorageEngine", "tablet_checkpoint_tasks_producer_thread",
|
||||
[this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); },
|
||||
|
||||
@ -193,6 +193,7 @@ public:
|
||||
return _tablet_publish_txn_thread_pool;
|
||||
}
|
||||
bool stopped() { return _stopped; }
|
||||
ThreadPool* get_bg_multiget_threadpool() { return _bg_multi_get_thread_pool.get(); }
|
||||
|
||||
private:
|
||||
// Instance should be inited from `static open()`
|
||||
@ -374,6 +375,7 @@ private:
|
||||
std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
|
||||
|
||||
std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
|
||||
std::unique_ptr<ThreadPool> _bg_multi_get_thread_pool;
|
||||
|
||||
CompactionPermitLimiter _permit_limiter;
|
||||
|
||||
|
||||
@ -1097,14 +1097,22 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
|
||||
const PMultiGetRequest* request,
|
||||
PMultiGetResponse* response,
|
||||
google::protobuf::Closure* done) {
|
||||
// multi get data by rowid
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
brpc::ClosureGuard closure_guard(done);
|
||||
response->mutable_status()->set_status_code(0);
|
||||
Status st = _multi_get(request, response);
|
||||
st.to_protobuf(response->mutable_status());
|
||||
LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
|
||||
// Submit task to seperate ThreadPool for avoiding block bthread working pthread
|
||||
ThreadPool* task_pool = StorageEngine::instance()->get_bg_multiget_threadpool();
|
||||
Status submit_st = task_pool->submit_func([request, response, done, this]() {
|
||||
// multi get data by rowid
|
||||
MonotonicStopWatch watch;
|
||||
watch.start();
|
||||
brpc::ClosureGuard closure_guard(done);
|
||||
response->mutable_status()->set_status_code(0);
|
||||
Status st = _multi_get(request, response);
|
||||
st.to_protobuf(response->mutable_status());
|
||||
LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
|
||||
});
|
||||
if (!submit_st.ok()) {
|
||||
submit_st.to_protobuf(response->mutable_status());
|
||||
done->Run();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user