[feature](executor)make scan task wait timeout config #28467
This commit is contained in:
@ -967,6 +967,7 @@ DEFINE_Bool(enable_debug_points, "false");
|
||||
|
||||
DEFINE_Int32(pipeline_executor_size, "0");
|
||||
DEFINE_Bool(enable_workload_group_for_scan, "false");
|
||||
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
|
||||
// 128 MB
|
||||
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
|
||||
|
||||
|
||||
@ -1021,6 +1021,7 @@ DECLARE_Bool(enable_debug_points);
|
||||
|
||||
DECLARE_Int32(pipeline_executor_size);
|
||||
DECLARE_Bool(enable_workload_group_for_scan);
|
||||
DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms);
|
||||
|
||||
// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
|
||||
// Will remove after fully test.
|
||||
|
||||
@ -70,11 +70,14 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
|
||||
return false;
|
||||
}
|
||||
if (_group_entities.empty()) {
|
||||
_wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5));
|
||||
_wait_task.wait_for(lock, std::chrono::milliseconds(
|
||||
config::workload_group_scan_task_wait_timeout_ms));
|
||||
} else {
|
||||
entity = _next_tg_entity();
|
||||
if (!entity) {
|
||||
_wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
|
||||
_wait_task.wait_for(lock,
|
||||
std::chrono::milliseconds(
|
||||
config::workload_group_scan_task_wait_timeout_ms));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -82,7 +85,8 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
|
||||
if (entity->task_size() == 1) {
|
||||
_dequeue_task_group(entity);
|
||||
}
|
||||
return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
|
||||
return entity->task_queue()->try_get(
|
||||
scan_task, config::workload_group_scan_task_wait_timeout_ms /* timeout_ms */);
|
||||
}
|
||||
|
||||
bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
|
||||
|
||||
@ -28,7 +28,6 @@ class ScannerContext;
|
||||
namespace taskgroup {
|
||||
|
||||
using WorkFunction = std::function<void()>;
|
||||
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
|
||||
|
||||
// Like PriorityThreadPool::Task
|
||||
struct ScanTask {
|
||||
|
||||
Reference in New Issue
Block a user