[pipeline](task_queue) remove disable steal in task queue to speed up query (#21692)
TPCH Q9 before: 2.74s after: 2.33s
This commit is contained in:
@ -49,10 +49,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
|
||||
public:
|
||||
Pipeline() = delete;
|
||||
explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
|
||||
: _complete_dependency(0),
|
||||
_pipeline_id(pipeline_id),
|
||||
_context(context),
|
||||
_can_steal(true) {
|
||||
: _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
|
||||
_init_profile();
|
||||
}
|
||||
|
||||
@ -84,8 +81,6 @@ public:
|
||||
|
||||
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
|
||||
|
||||
void disable_task_steal() { _can_steal = false; }
|
||||
|
||||
private:
|
||||
void _init_profile();
|
||||
std::atomic<uint32_t> _complete_dependency;
|
||||
@ -98,7 +93,6 @@ private:
|
||||
|
||||
PipelineId _pipeline_id;
|
||||
std::weak_ptr<PipelineFragmentContext> _context;
|
||||
bool _can_steal;
|
||||
int _previous_schedule_id = -1;
|
||||
|
||||
std::unique_ptr<RuntimeProfile> _pipeline_profile;
|
||||
|
||||
@ -603,7 +603,6 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
|
||||
OperatorBuilderPtr join_sink =
|
||||
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node);
|
||||
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
|
||||
new_pipe->disable_task_steal();
|
||||
|
||||
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
|
||||
OperatorBuilderPtr join_source = std::make_shared<HashJoinProbeOperatorBuilder>(
|
||||
|
||||
@ -52,7 +52,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState*
|
||||
_sink(sink),
|
||||
_prepared(false),
|
||||
_opened(false),
|
||||
_can_steal(pipeline->_can_steal),
|
||||
_state(state),
|
||||
_cur_state(PipelineTaskState::NOT_READY),
|
||||
_data_state(SourceState::DEPEND_ON_SOURCE),
|
||||
|
||||
@ -144,8 +144,6 @@ public:
|
||||
|
||||
bool sink_can_write() { return _sink->can_write(); }
|
||||
|
||||
bool can_steal() const { return _can_steal; }
|
||||
|
||||
Status finalize();
|
||||
|
||||
PipelineFragmentContext* fragment_context() { return _fragment_context; }
|
||||
@ -214,7 +212,6 @@ private:
|
||||
|
||||
bool _prepared;
|
||||
bool _opened;
|
||||
bool _can_steal;
|
||||
RuntimeState* _state;
|
||||
int _previous_schedule_id = -1;
|
||||
uint32_t _schedule_time = 0;
|
||||
|
||||
@ -34,9 +34,6 @@ PipelineTask* SubTaskQueue::try_take(bool is_steal) {
|
||||
return nullptr;
|
||||
}
|
||||
auto task = _queue.front();
|
||||
if (!task->can_steal() && is_steal) {
|
||||
return nullptr;
|
||||
}
|
||||
_queue.pop();
|
||||
return task;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user