diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 5d2ab598b3..84d0684f17 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -239,7 +239,7 @@ Status ExecEnv::_init(const std::vector& store_paths, new BrpcClientCache(config::function_service_protocol); _stream_load_executor = StreamLoadExecutor::create_shared(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); - RETURN_IF_ERROR(_routine_load_task_executor->init()); + RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(store_paths); _group_commit_mgr = new GroupCommitMgr(this); @@ -540,7 +540,7 @@ void ExecEnv::init_mem_tracker() { _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); _stream_load_pipe_tracker = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe"); + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); } void ExecEnv::_register_metrics() { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 9b475ed213..e12ef7ff6d 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -75,7 +75,8 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() { _task_map.clear(); } -Status RoutineLoadTaskExecutor::init() { +Status RoutineLoadTaskExecutor::init(int64_t process_mem_limit) { + _load_mem_limit = process_mem_limit * config::load_process_max_memory_limit_percent / 100; return ThreadPoolBuilder("routine_load") .set_min_threads(0) .set_max_threads(config::max_routine_load_thread_pool_size) @@ -210,7 +211,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { return Status::OK(); } - if (_task_map.size() >= config::max_routine_load_thread_pool_size) { + if (_task_map.size() >= config::max_routine_load_thread_pool_size || _reach_memory_limit()) { LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id) << ", job id: " << task.job_id << ", queue size: " << _thread_pool->get_queue_size() @@ -305,6 +306,19 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { } } +bool RoutineLoadTaskExecutor::_reach_memory_limit() { + bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); + auto current_load_mem_value = + MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value(); + if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) { + LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit + << " current_load_mem_value: " << current_load_mem_value + << " _load_mem_limit: " << _load_mem_limit; + return true; + } + return false; +} + void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, DataConsumerPool* consumer_pool, ExecFinishCallback cb) { #define HANDLE_ERROR(stmt, err_msg) \ diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index f16ef80ef7..0e597d796c 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -51,7 +51,7 @@ public: ~RoutineLoadTaskExecutor(); - Status init(); + Status init(int64_t process_mem_limit); void stop(); @@ -86,6 +86,7 @@ private: // create a dummy StreamLoadContext for PKafkaMetaProxyRequest Status _prepare_ctx(const PKafkaMetaProxyRequest& request, std::shared_ptr ctx); + bool _reach_memory_limit(); private: ExecEnv* _exec_env = nullptr; @@ -95,6 +96,8 @@ private: std::mutex _lock; // task id -> load context std::unordered_map> _task_map; + + int64_t _load_mem_limit = -1; }; } // namespace doris diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index f95fdcfdad..338b82c6eb 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -94,7 +94,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { RoutineLoadTaskExecutor executor(&_env); Status st; - st = executor.init(); + st = executor.init(1024 * 1024); EXPECT_TRUE(st.ok()); // submit task st = executor.submit_task(task);