diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 5c92091e3b..0fcf86978c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -492,17 +492,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _total_tasks = 0; int target_size = request.local_params.size(); _tasks.resize(target_size); - auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile(); - DCHECK(pipeline_id_to_profile.empty()); - pipeline_id_to_profile.resize(_pipelines.size()); - { - size_t pip_idx = 0; - for (auto& pipeline_profile : pipeline_id_to_profile) { - pipeline_profile = - std::make_unique("Pipeline : " + std::to_string(pip_idx)); - pip_idx++; - } - } + auto& pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size()); for (size_t i = 0; i < target_size; i++) { const auto& local_params = request.local_params[i]; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 75d06adc56..b158ad0b43 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -562,4 +562,44 @@ bool RuntimeState::is_nereids() const { return _query_ctx->is_nereids(); } +std::vector> RuntimeState::pipeline_id_to_profile() { + std::shared_lock lc(_pipeline_profile_lock); + std::vector> pipelines_profile; + pipelines_profile.reserve(_pipeline_id_to_profile.size()); + // The sort here won't change the structure of _pipeline_id_to_profile; + // it sorts the children of each element in sort_pipeline_id_to_profile, + // and these children are locked. + for (auto& pipeline_profile : _pipeline_id_to_profile) { + DCHECK(pipeline_profile); + // pipeline 0 + // pipeline task 0 + // pipeline task 1 + // pipleine task 2 + // ....... + // sort by pipeline task total time + pipeline_profile->sort_children_by_total_time(); + pipelines_profile.push_back(pipeline_profile); + } + return pipelines_profile; +} + +std::vector>& RuntimeState::build_pipeline_profile( + std::size_t pipeline_size) { + std::unique_lock lc(_pipeline_profile_lock); + if (!_pipeline_id_to_profile.empty()) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "build_pipeline_profile can only be called once."); + } + _pipeline_id_to_profile.resize(pipeline_size); + { + size_t pip_idx = 0; + for (auto& pipeline_profile : _pipeline_id_to_profile) { + pipeline_profile = + std::make_shared("Pipeline : " + std::to_string(pip_idx)); + pip_idx++; + } + } + return _pipeline_id_to_profile; +} + } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 38ef5ff245..e01eb6166d 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -569,18 +570,9 @@ public: void resize_op_id_to_local_state(int operator_size); - auto& pipeline_id_to_profile() { - for (auto& pipeline_profile : _pipeline_id_to_profile) { - // pipeline 0 - // pipeline task 0 - // pipeline task 1 - // pipleine task 2 - // ....... - // sort by pipeline task total time - pipeline_profile->sort_children_by_total_time(); - } - return _pipeline_id_to_profile; - } + std::vector> pipeline_id_to_profile(); + + std::vector>& build_pipeline_profile(std::size_t pipeline_size); void set_task_execution_context(std::shared_ptr context) { _task_execution_context_inited = true; @@ -757,7 +749,9 @@ private: // true if max_filter_ratio is 0 bool _load_zero_tolerance = false; - std::vector> _pipeline_id_to_profile; + // only to lock _pipeline_id_to_profile + std::shared_mutex _pipeline_profile_lock; + std::vector> _pipeline_id_to_profile; // prohibit copies RuntimeState(const RuntimeState&);