This commit is contained in:
@ -492,17 +492,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
|
||||
_total_tasks = 0;
|
||||
int target_size = request.local_params.size();
|
||||
_tasks.resize(target_size);
|
||||
auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
|
||||
DCHECK(pipeline_id_to_profile.empty());
|
||||
pipeline_id_to_profile.resize(_pipelines.size());
|
||||
{
|
||||
size_t pip_idx = 0;
|
||||
for (auto& pipeline_profile : pipeline_id_to_profile) {
|
||||
pipeline_profile =
|
||||
std::make_unique<RuntimeProfile>("Pipeline : " + std::to_string(pip_idx));
|
||||
pip_idx++;
|
||||
}
|
||||
}
|
||||
auto& pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
|
||||
|
||||
for (size_t i = 0; i < target_size; i++) {
|
||||
const auto& local_params = request.local_params[i];
|
||||
|
||||
@ -562,4 +562,44 @@ bool RuntimeState::is_nereids() const {
|
||||
return _query_ctx->is_nereids();
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::pipeline_id_to_profile() {
|
||||
std::shared_lock lc(_pipeline_profile_lock);
|
||||
std::vector<std::shared_ptr<RuntimeProfile>> pipelines_profile;
|
||||
pipelines_profile.reserve(_pipeline_id_to_profile.size());
|
||||
// The sort here won't change the structure of _pipeline_id_to_profile;
|
||||
// it sorts the children of each element in sort_pipeline_id_to_profile,
|
||||
// and these children are locked.
|
||||
for (auto& pipeline_profile : _pipeline_id_to_profile) {
|
||||
DCHECK(pipeline_profile);
|
||||
// pipeline 0
|
||||
// pipeline task 0
|
||||
// pipeline task 1
|
||||
// pipleine task 2
|
||||
// .......
|
||||
// sort by pipeline task total time
|
||||
pipeline_profile->sort_children_by_total_time();
|
||||
pipelines_profile.push_back(pipeline_profile);
|
||||
}
|
||||
return pipelines_profile;
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<RuntimeProfile>>& RuntimeState::build_pipeline_profile(
|
||||
std::size_t pipeline_size) {
|
||||
std::unique_lock lc(_pipeline_profile_lock);
|
||||
if (!_pipeline_id_to_profile.empty()) {
|
||||
throw Exception(ErrorCode::INTERNAL_ERROR,
|
||||
"build_pipeline_profile can only be called once.");
|
||||
}
|
||||
_pipeline_id_to_profile.resize(pipeline_size);
|
||||
{
|
||||
size_t pip_idx = 0;
|
||||
for (auto& pipeline_profile : _pipeline_id_to_profile) {
|
||||
pipeline_profile =
|
||||
std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_idx));
|
||||
pip_idx++;
|
||||
}
|
||||
}
|
||||
return _pipeline_id_to_profile;
|
||||
}
|
||||
|
||||
} // end namespace doris
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
@ -569,18 +570,9 @@ public:
|
||||
|
||||
void resize_op_id_to_local_state(int operator_size);
|
||||
|
||||
auto& pipeline_id_to_profile() {
|
||||
for (auto& pipeline_profile : _pipeline_id_to_profile) {
|
||||
// pipeline 0
|
||||
// pipeline task 0
|
||||
// pipeline task 1
|
||||
// pipleine task 2
|
||||
// .......
|
||||
// sort by pipeline task total time
|
||||
pipeline_profile->sort_children_by_total_time();
|
||||
}
|
||||
return _pipeline_id_to_profile;
|
||||
}
|
||||
std::vector<std::shared_ptr<RuntimeProfile>> pipeline_id_to_profile();
|
||||
|
||||
std::vector<std::shared_ptr<RuntimeProfile>>& build_pipeline_profile(std::size_t pipeline_size);
|
||||
|
||||
void set_task_execution_context(std::shared_ptr<TaskExecutionContext> context) {
|
||||
_task_execution_context_inited = true;
|
||||
@ -757,7 +749,9 @@ private:
|
||||
// true if max_filter_ratio is 0
|
||||
bool _load_zero_tolerance = false;
|
||||
|
||||
std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile;
|
||||
// only to lock _pipeline_id_to_profile
|
||||
std::shared_mutex _pipeline_profile_lock;
|
||||
std::vector<std::shared_ptr<RuntimeProfile>> _pipeline_id_to_profile;
|
||||
|
||||
// prohibit copies
|
||||
RuntimeState(const RuntimeState&);
|
||||
|
||||
Reference in New Issue
Block a user