From 4b5ca106efda8b5968f2a4e08d063be32f004686 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Mon, 30 Oct 2023 18:40:04 +0800 Subject: [PATCH] [refine](profileV2) use task dependency in profile and print pipelinetask index (#26059) --- be/src/pipeline/pipeline_task.h | 2 ++ .../pipeline_x_fragment_context.cpp | 26 +++++++++++++++++-- .../pipeline/pipeline_x/pipeline_x_task.cpp | 2 +- be/src/pipeline/pipeline_x/pipeline_x_task.h | 6 +++++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index ef923868a5..690c4e3419 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -249,6 +249,8 @@ public: TUniqueId instance_id() const { return _state->fragment_instance_id(); } + void set_parent_profile(RuntimeProfile* profile) { _parent_profile = profile; } + protected: void _finish_p_dependency() { for (const auto& p : _pipeline->_parents) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 5ca5829e1a..06659eb23e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -450,6 +450,29 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( * and JoinProbeOperator2. */ + // First, set up the parent profile, + // then prepare the task profile and add it to operator_id_to_task_profile. + std::vector operator_id_to_task_profile( + max_operator_id(), _runtime_states[i]->runtime_profile()); + auto prepare_and_set_parent_profile = [&](PipelineXTask* task) { + auto sink = task->sink(); + const auto& dests_id = sink->dests_id(); + int dest_id = dests_id.front(); + DCHECK(dest_id < operator_id_to_task_profile.size()); + task->set_parent_profile(operator_id_to_task_profile[dest_id]); + + RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params, + request.fragment.output_sink)); + + for (auto o : task->operatorXs()) { + int id = o->operator_id(); + DCHECK(id < operator_id_to_task_profile.size()); + auto* op_local_state = _runtime_states[i].get()->get_local_state(o->operator_id()); + operator_id_to_task_profile[id] = op_local_state->profile(); + } + return Status::OK(); + }; + for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()]; DCHECK(task != nullptr); @@ -462,8 +485,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( pipeline_id_to_task[dep]->get_downstream_dependency()); } } - RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params, - request.fragment.output_sink)); + RETURN_IF_ERROR(prepare_and_set_parent_profile(task)); } { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 9ad91341ce..140fecbb1d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -135,7 +135,7 @@ Status PipelineXTask::extract_dependencies() { void PipelineXTask::_init_profile() { std::stringstream ss; - ss << "PipelineTask" + ss << "PipelineXTask" << " (index=" << _index << ")"; auto* task_profile = new RuntimeProfile(ss.str()); _parent_profile->add_child(task_profile, true, nullptr); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 5155c2fe76..3a33431192 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -151,6 +151,12 @@ public: void push_blocked_task_to_dependency(Dependency* dep) {} + DataSinkOperatorXPtr sink() const { return _sink; } + + OperatorXPtr source() const { return _source; } + + OperatorXs operatorXs() { return _operators; } + private: void set_close_pipeline_time() override {} void _init_profile() override;