diff --git a/be/src/common/config.h b/be/src/common/config.h index b823aff817..32ef393978 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -322,6 +322,9 @@ CONF_mInt64(cumulative_compaction_max_deltas, "100"); // This config can be set to limit thread number in segcompaction thread pool. CONF_mInt32(seg_compaction_max_threads, "10"); +// This config can be set to limit thread number in multiget thread pool. +CONF_mInt32(multi_get_max_threads, "10"); + // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction. CONF_mInt64(total_permits_for_compaction_score, "10000"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 2bfffbbf29..3038fadcc1 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -103,6 +103,10 @@ Status StorageEngine::start_bg_threads() { .set_max_threads(max_checkpoint_thread_num) .build(&_tablet_meta_checkpoint_thread_pool); + ThreadPoolBuilder("MultiGetTaskThreadPool") + .set_min_threads(config::multi_get_max_threads) + .set_max_threads(config::multi_get_max_threads) + .build(&_bg_multi_get_thread_pool); RETURN_IF_ERROR(Thread::create( "StorageEngine", "tablet_checkpoint_tasks_producer_thread", [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); }, diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index b0a6b63b41..c89d6723a1 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -193,6 +193,7 @@ public: return _tablet_publish_txn_thread_pool; } bool stopped() { return _stopped; } + ThreadPool* get_bg_multiget_threadpool() { return _bg_multi_get_thread_pool.get(); } private: // Instance should be inited from `static open()` @@ -374,6 +375,7 @@ private: std::unique_ptr _tablet_publish_txn_thread_pool; std::unique_ptr _tablet_meta_checkpoint_thread_pool; + std::unique_ptr _bg_multi_get_thread_pool; CompactionPermitLimiter _permit_limiter; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a8f4f913a1..68718d3956 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1097,14 +1097,22 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { - // multi get data by rowid - MonotonicStopWatch watch; - watch.start(); - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - Status st = _multi_get(request, response); - st.to_protobuf(response->mutable_status()); - LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; + // Submit task to seperate ThreadPool for avoiding block bthread working pthread + ThreadPool* task_pool = StorageEngine::instance()->get_bg_multiget_threadpool(); + Status submit_st = task_pool->submit_func([request, response, done, this]() { + // multi get data by rowid + MonotonicStopWatch watch; + watch.start(); + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + Status st = _multi_get(request, response); + st.to_protobuf(response->mutable_status()); + LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; + }); + if (!submit_st.ok()) { + submit_st.to_protobuf(response->mutable_status()); + done->Run(); + } } } // namespace doris