[refine](profileV2) use task dependency in profile and print pipelinetask index (#26059)
This commit is contained in:
@ -249,6 +249,8 @@ public:
|
||||
|
||||
TUniqueId instance_id() const { return _state->fragment_instance_id(); }
|
||||
|
||||
void set_parent_profile(RuntimeProfile* profile) { _parent_profile = profile; }
|
||||
|
||||
protected:
|
||||
void _finish_p_dependency() {
|
||||
for (const auto& p : _pipeline->_parents) {
|
||||
|
||||
@ -450,6 +450,29 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
|
||||
* and JoinProbeOperator2.
|
||||
*/
|
||||
|
||||
// First, set up the parent profile,
|
||||
// then prepare the task profile and add it to operator_id_to_task_profile.
|
||||
std::vector<RuntimeProfile*> operator_id_to_task_profile(
|
||||
max_operator_id(), _runtime_states[i]->runtime_profile());
|
||||
auto prepare_and_set_parent_profile = [&](PipelineXTask* task) {
|
||||
auto sink = task->sink();
|
||||
const auto& dests_id = sink->dests_id();
|
||||
int dest_id = dests_id.front();
|
||||
DCHECK(dest_id < operator_id_to_task_profile.size());
|
||||
task->set_parent_profile(operator_id_to_task_profile[dest_id]);
|
||||
|
||||
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params,
|
||||
request.fragment.output_sink));
|
||||
|
||||
for (auto o : task->operatorXs()) {
|
||||
int id = o->operator_id();
|
||||
DCHECK(id < operator_id_to_task_profile.size());
|
||||
auto* op_local_state = _runtime_states[i].get()->get_local_state(o->operator_id());
|
||||
operator_id_to_task_profile[id] = op_local_state->profile();
|
||||
}
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
|
||||
auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
|
||||
DCHECK(task != nullptr);
|
||||
@ -462,8 +485,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
|
||||
pipeline_id_to_task[dep]->get_downstream_dependency());
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params,
|
||||
request.fragment.output_sink));
|
||||
RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@ -135,7 +135,7 @@ Status PipelineXTask::extract_dependencies() {
|
||||
|
||||
void PipelineXTask::_init_profile() {
|
||||
std::stringstream ss;
|
||||
ss << "PipelineTask"
|
||||
ss << "PipelineXTask"
|
||||
<< " (index=" << _index << ")";
|
||||
auto* task_profile = new RuntimeProfile(ss.str());
|
||||
_parent_profile->add_child(task_profile, true, nullptr);
|
||||
|
||||
@ -151,6 +151,12 @@ public:
|
||||
|
||||
void push_blocked_task_to_dependency(Dependency* dep) {}
|
||||
|
||||
DataSinkOperatorXPtr sink() const { return _sink; }
|
||||
|
||||
OperatorXPtr source() const { return _source; }
|
||||
|
||||
OperatorXs operatorXs() { return _operators; }
|
||||
|
||||
private:
|
||||
void set_close_pipeline_time() override {}
|
||||
void _init_profile() override;
|
||||
|
||||
Reference in New Issue
Block a user