diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 03e04274ed..f286c33d54 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -967,6 +967,7 @@ DEFINE_Bool(enable_debug_points, "false"); DEFINE_Int32(pipeline_executor_size, "0"); DEFINE_Bool(enable_workload_group_for_scan, "false"); +DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000"); // 128 MB DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c24af54fa6..4054b315aa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1021,6 +1021,7 @@ DECLARE_Bool(enable_debug_points); DECLARE_Int32(pipeline_executor_size); DECLARE_Bool(enable_workload_group_for_scan); +DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp index 89235b6b7a..edf3f6f767 100644 --- a/be/src/vec/exec/scan/scan_task_queue.cpp +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -70,11 +70,14 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { return false; } if (_group_entities.empty()) { - _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5)); + _wait_task.wait_for(lock, std::chrono::milliseconds( + config::workload_group_scan_task_wait_timeout_ms)); } else { entity = _next_tg_entity(); if (!entity) { - _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS)); + _wait_task.wait_for(lock, + std::chrono::milliseconds( + config::workload_group_scan_task_wait_timeout_ms)); } } } @@ -82,7 +85,8 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { if (entity->task_size() == 1) { _dequeue_task_group(entity); } - return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); + return entity->task_queue()->try_get( + scan_task, config::workload_group_scan_task_wait_timeout_ms /* timeout_ms */); } bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h index aeda9a4adc..6ee339c785 100644 --- a/be/src/vec/exec/scan/scan_task_queue.h +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -28,7 +28,6 @@ class ScannerContext; namespace taskgroup { using WorkFunction = std::function; -static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; // Like PriorityThreadPool::Task struct ScanTask {