diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index e02c8fed7f..ea2869f8b6 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -23,4 +23,14 @@ OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { return std::make_shared(this, _sink); } +std::string MultiCastDataStreamSinkLocalState::id_name() { + auto& sinks = static_cast(_parent)->sink_node().sinks; + std::string id_name = " (dst id : "; + for (auto& sink : sinks) { + id_name += std::to_string(sink.dest_node_id) + ","; + } + id_name += ")"; + return id_name; +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index ea5a155319..bb0c03224c 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -51,6 +51,7 @@ class MultiCastDataStreamSinkLocalState final friend class DataSinkOperatorX; using Base = PipelineXSinkLocalState; using Parent = MultiCastDataStreamSinkOperatorX; + std::string id_name() override; private: std::shared_ptr _multi_cast_data_streamer; @@ -65,10 +66,11 @@ public: const int cast_sender_count, ObjectPool* pool, const TMultiCastDataStreamSink& sink, const RowDescriptor& row_desc) - : Base(sink_id, sources), + : Base(sink_id, -1, sources), _pool(pool), _row_desc(row_desc), - _cast_sender_count(cast_sender_count) {} + _cast_sender_count(cast_sender_count), + _sink(sink) {} ~MultiCastDataStreamSinkOperatorX() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, @@ -96,12 +98,14 @@ public: _row_desc, _pool, _cast_sender_count); return multi_cast_data_streamer; } + const TMultiCastDataStreamSink& sink_node() { return _sink; } private: friend class MultiCastDataStreamSinkLocalState; ObjectPool* _pool; RowDescriptor _row_desc; int _cast_sender_count; + const TMultiCastDataStreamSink& _sink; friend class MultiCastDataStreamSinkLocalState; }; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 98ab33ff1a..6377b5ef16 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -116,7 +116,7 @@ public: MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, const TDataStreamSink& sink, const RowDescriptor& row_descriptor, int id) - : Base(pool, id), + : Base(pool, -1, id), _consumer_id(consumer_id), _t_data_stream_sink(sink), _row_descriptor(row_descriptor) { diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 7dc369a4bf..0f09b375f6 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -108,7 +108,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : Base(sink_id, tnode.node_id), + : Base(sink_id, tnode.node_id, tnode.node_id), _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _cur_child_id(child_id), diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 3dd77167f3..be573c4a89 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -57,6 +57,7 @@ #include "pipeline/exec/union_sink_operator.h" #include "pipeline/exec/union_source_operator.h" #include "util/debug_util.h" +#include "util/runtime_profile.h" namespace doris::pipeline { @@ -356,8 +357,9 @@ Status OperatorX::setup_local_states(RuntimeState* state, template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + - " (id=" + std::to_string(_parent->id()) + ")")); - _runtime_profile->set_metadata(_parent->id()); + " (id=" + std::to_string(_parent->node_id()) + ")")); + _runtime_profile->set_metadata(_parent->node_id()); + _runtime_profile->set_is_sink(false); info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); if constexpr (!std::is_same_v) { _dependency = (DependencyType*)info.dependency; @@ -376,11 +378,13 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState for (size_t i = 0; i < _projections.size(); i++) { RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i])); } - _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); - _blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned", TUnit::UNIT); - _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); - _open_timer = ADD_TIMER(_runtime_profile, "OpenTime"); - _close_timer = ADD_TIMER(_runtime_profile, "CloseTime"); + _rows_returned_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsReturned", TUnit::UNIT, 1); + _blocks_returned_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksReturned", TUnit::UNIT, 1); + _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); + _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); + _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); _rows_returned_rate = profile()->add_derived_counter( doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, @@ -413,8 +417,9 @@ template Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { // create profile - _profile = state->obj_pool()->add(new RuntimeProfile( - _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")")); + _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + id_name())); + _profile->set_metadata(_parent->node_id()); + _profile->set_is_sink(true); if constexpr (!std::is_same_v) { _dependency = (DependencyType*)info.dependency; if (_dependency) { @@ -423,9 +428,9 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time"); } } - _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT); - _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseTime"); + _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); + _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1); + _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); info.parent_profile->add_child(_profile, true, nullptr); _mem_tracker = std::make_unique(_parent->get_name()); return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 149d28265e..f01119daef 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -104,7 +104,7 @@ public: RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } RuntimeProfile::Counter* projection_timer() { return _projection_timer; } RuntimeProfile::Counter* wait_for_dependency_timer() { return _wait_for_dependency_timer; } - RuntimeProfile::Counter* blocks_returned_counter() { return _rows_returned_counter; } + RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; } OperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } @@ -154,6 +154,7 @@ public: OperatorXBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : OperatorBase(nullptr), _id(tnode.node_id), + _node_id(tnode.node_id), _type(tnode.node_type), _pool(pool), _tuple_ids(tnode.row_tuples), @@ -165,7 +166,8 @@ public: } } - OperatorXBase(ObjectPool* pool, int id) : OperatorBase(nullptr), _id(id), _pool(pool) {}; + OperatorXBase(ObjectPool* pool, int node_id, int id) + : OperatorBase(nullptr), _id(id), _node_id(node_id), _pool(pool) {}; virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override { LOG(FATAL) << "should not reach here!"; @@ -259,6 +261,7 @@ public: [[nodiscard]] RowDescriptor& row_descriptor() { return _row_descriptor; } [[nodiscard]] int id() const override { return _id; } + [[nodiscard]] int node_id() const { return _node_id; } [[nodiscard]] int64_t limit() const { return _limit; } @@ -279,7 +282,8 @@ protected: template friend class PipelineXLocalState; friend class PipelineXLocalStateBase; - int _id; // unique w/in single plan tree + int _id; + const int _node_id; // unique w/in single plan tree TPlanNodeType::type _type; ObjectPool* _pool; std::vector _tuple_ids; @@ -304,7 +308,7 @@ class OperatorX : public OperatorXBase { public: OperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : OperatorXBase(pool, tnode, descs) {} - OperatorX(ObjectPool* pool, int id) : OperatorXBase(pool, id) {}; + OperatorX(ObjectPool* pool, int node_id, int id) : OperatorXBase(pool, node_id, id) {}; ~OperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override; @@ -401,13 +405,14 @@ protected: class DataSinkOperatorXBase : public OperatorBase { public: - DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id), _dests_id({id}) {} + DataSinkOperatorXBase(const int id) + : OperatorBase(nullptr), _id(id), _node_id(id), _dests_id({id}) {} - DataSinkOperatorXBase(const int id, const int dest_id) - : OperatorBase(nullptr), _id(id), _dests_id({dest_id}) {} + DataSinkOperatorXBase(const int id, const int node_id, const int dest_id) + : OperatorBase(nullptr), _id(id), _node_id(node_id), _dests_id({dest_id}) {} - DataSinkOperatorXBase(const int id, std::vector& sources) - : OperatorBase(nullptr), _id(id), _dests_id(sources) {} + DataSinkOperatorXBase(const int id, const int node_id, std::vector& sources) + : OperatorBase(nullptr), _id(id), _node_id(node_id), _dests_id(sources) {} ~DataSinkOperatorXBase() override = default; @@ -495,6 +500,8 @@ public: [[nodiscard]] const std::vector& dests_id() const { return _dests_id; } + [[nodiscard]] int node_id() const { return _node_id; } + [[nodiscard]] std::string get_name() const override { return _name; } Status finalize(RuntimeState* state) override { return Status::OK(); } @@ -504,9 +511,11 @@ public: protected: template friend class AsyncWriterSink; - + // _id : the current Operator's ID, which is not visible to the user. + // _node_id : the plan node ID corresponding to the Operator, which is visible on the profile. + // _dests_id : the target ID of the sink, for example, in the case of a multi-sink, there are multiple targets. const int _id; - + const int _node_id; std::vector _dests_id; std::string _name; @@ -521,10 +530,11 @@ class DataSinkOperatorX : public DataSinkOperatorXBase { public: DataSinkOperatorX(const int id) : DataSinkOperatorXBase(id) {} - DataSinkOperatorX(const int id, const int source_id) : DataSinkOperatorXBase(id, source_id) {} + DataSinkOperatorX(const int id, const int node_id, const int source_id) + : DataSinkOperatorXBase(id, node_id, source_id) {} - DataSinkOperatorX(const int id, std::vector sources) - : DataSinkOperatorXBase(id, sources) {} + DataSinkOperatorX(const int id, const int node_id, std::vector sources) + : DataSinkOperatorXBase(id, node_id, sources) {} ~DataSinkOperatorX() override = default; Status setup_local_states(RuntimeState* state, std::vector& infos) override; @@ -552,6 +562,8 @@ public: [[nodiscard]] std::string debug_string(int indentation_level) const override; typename DependencyType::SharedState*& get_shared_state() { return _shared_state; } + virtual std::string id_name() { return " (id=" + std::to_string(_parent->node_id()) + ")"; } + protected: DependencyType* _dependency = nullptr; typename DependencyType::SharedState* _shared_state; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 64ece0baf2..3cc8bbb36e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -297,14 +297,12 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData DCHECK(thrift_sink.__isset.multi_cast_stream_sink); DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0); // TODO: figure out good buffer size based on size of output row - /// TODO: Here is a magic number, and we will refactor this part later. - static int sink_count = 120000; - auto sink_id = sink_count++; + auto sink_id = next_operator_id(); auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size(); // one sink has multiple sources. std::vector sources; for (int i = 0; i < sender_size; ++i) { - auto source_id = sink_count++; + auto source_id = next_operator_id(); sources.push_back(source_id); } @@ -657,7 +655,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN PipelinePtr build_side_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new UnionSinkOperatorX(i, father_id + 1000 * (i + 1), pool, tnode, descs)); + sink.reset(new UnionSinkOperatorX(i, next_operator_id(), pool, tnode, descs)); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); if (_union_child_pipelines.find(father_id) == _union_child_pipelines.end()) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index c548b8cfa3..314404024c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -111,6 +111,8 @@ public: } } + int next_operator_id() { return _operator_id++; } + private: void _close_action() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; @@ -162,6 +164,9 @@ private: std::map _instance_id_to_runtime_state; std::mutex _state_map_lock; std::map> _union_child_pipelines; + // The number of operators is generally greater than the number of plan nodes, + // so we need additional ID and cannot rely solely on plan node ID. + int _operator_id = {1000000}; }; } // namespace pipeline diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index cbee0bee6a..1eb3a6b139 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -281,6 +281,9 @@ RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool inden if (this->is_set_metadata()) { child->set_metadata(this->metadata()); } + if (this->is_set_sink()) { + child->set_is_sink(this->is_sink()); + } if (_children.empty()) { add_child_unlock(child, indent, nullptr); } else { @@ -635,6 +638,9 @@ void RuntimeProfile::to_thrift(std::vector* nodes) { node.metadata = _metadata; node.timestamp = _timestamp; node.indent = true; + if (this->is_set_sink()) { + node.__set_is_sink(this->is_sink()); + } CounterMap counter_map; { std::lock_guard l(_counter_map_lock); diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 23b4a1189b..db36e6133d 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -368,6 +368,15 @@ public: bool is_set_metadata() const { return _is_set_metadata; } + void set_is_sink(bool is_sink) { + _is_set_sink = true; + _is_sink = is_sink; + } + + bool is_sink() const { return _is_sink; } + + bool is_set_sink() const { return _is_set_sink; } + time_t timestamp() const { return _timestamp; } void set_timestamp(time_t ss) { _timestamp = ss; } @@ -430,6 +439,9 @@ private: int64_t _metadata; bool _is_set_metadata = false; + bool _is_sink = false; + bool _is_set_sink = false; + // The timestamp when the profile was modified, make sure the update is up to date. time_t _timestamp; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index a2ec3a2cf4..e75082ddb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -63,7 +63,7 @@ public class Profile { } public synchronized void update(long startTime, Map summaryInfo, boolean isFinished, - int profileLevel, Planner planner) { + int profileLevel, Planner planner, boolean isPipelineX) { if (this.isFinished) { return; } @@ -74,6 +74,7 @@ public class Profile { rootProfile.computeTimeInProfile(); rootProfile.setPlaner(planner); rootProfile.setProfileLevel(profileLevel); + rootProfile.setIsPipelineX(isPipelineX); ProfileManager.getInstance().pushProfile(rootProfile); this.isFinished = isFinished; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java index 758c5e69b3..0d4e8ced88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileStatistics.java @@ -18,6 +18,7 @@ package org.apache.doris.common.util; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; /** @@ -35,11 +36,14 @@ public class ProfileStatistics { private boolean isDataSink; - public ProfileStatistics() { + private boolean isPipelineX; + + public ProfileStatistics(boolean isPipelineX) { statisticalInfo = new HashMap>(); fragmentInfo = new HashMap>(); fragmentId = 0; isDataSink = false; + this.isPipelineX = isPipelineX; } private void addPlanNodeInfo(int id, String info) { @@ -56,11 +60,20 @@ public class ProfileStatistics { fragmentInfo.get(fragmentId).add(info); } - public void addInfoFromProfile(RuntimeProfile profile, String info) { - if (isDataSink) { - addDataSinkInfo(info); + public void addInfoFromProfile(RuntimeProfile profile, String name, String info) { + if (isPipelineX) { + if (profile.sinkOperator()) { + name = name + "(Sink)"; + } else { + name = name + "(Operator)"; + } + addPlanNodeInfo(profile.nodeId(), name + ": " + info); } else { - addPlanNodeInfo(profile.nodeId(), info); + if (isDataSink) { + addDataSinkInfo(name + ": " + info); + } else { + addPlanNodeInfo(profile.nodeId(), name + ": " + info); + } } } @@ -73,6 +86,7 @@ public class ProfileStatistics { return; } ArrayList infos = statisticalInfo.get(id); + Collections.sort(infos); for (String info : infos) { str.append(prefix + info + "\n"); } @@ -83,6 +97,7 @@ public class ProfileStatistics { return; } ArrayList infos = fragmentInfo.get(fragmentIdx); + Collections.sort(infos); for (String info : infos) { str.append(prefix + info + "\n"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 9fc3597400..1ba7f66b69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -75,6 +75,13 @@ public class RuntimeProfile { private Boolean isDone = false; private Boolean isCancel = false; + // In pipelineX, we have explicitly split the Operator into sink and operator, + // and we can distinguish them using tags. + // In the old pipeline, we can only differentiate them based on their position + // in the profile, which is quite tricky and only transitional. + private Boolean isPipelineX = false; + private Boolean isSinkOperator = false; + private int profileLevel = 3; private Planner planner = null; private int nodeid = -1; @@ -119,6 +126,14 @@ public class RuntimeProfile { return this.nodeid; } + public Boolean sinkOperator() { + return this.isSinkOperator; + } + + public void setIsPipelineX(boolean isPipelineX) { + this.isPipelineX = isPipelineX; + } + public Map getCounterMap() { return counterMap; } @@ -181,6 +196,9 @@ public class RuntimeProfile { if (node.isSetMetadata()) { this.nodeid = (int) node.getMetadata(); } + if (node.isSetIsSink()) { + this.isSinkOperator = node.is_sink; + } Preconditions.checkState(timestamp == -1 || node.timestamp != -1); // update this level's counters if (node.counters != null) { @@ -483,11 +501,10 @@ public class RuntimeProfile { long countNumber = rhsCounter.size() + 1; if (newCounter.getValue() > 0) { newCounter.divValue(countNumber); - String infoString = counterName + ": " - + AVG_TIME_PRE + printCounter(newCounter.getValue(), newCounter.getType()) + ", " + String infoString = AVG_TIME_PRE + printCounter(newCounter.getValue(), newCounter.getType()) + ", " + MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + ", " + MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType()); - statistics.addInfoFromProfile(src, infoString); + statistics.addInfoFromProfile(src, counterName, infoString); } } else { Counter newCounter = new Counter(counter.getType(), counter.getValue()); @@ -496,8 +513,8 @@ public class RuntimeProfile { newCounter.addValue(cnt); } } - String infoString = counterName + ": " + printCounter(newCounter.getValue(), newCounter.getType()); - statistics.addInfoFromProfile(src, infoString); + String infoString = printCounter(newCounter.getValue(), newCounter.getType()); + statistics.addInfoFromProfile(src, counterName, infoString); } } @@ -516,7 +533,7 @@ public class RuntimeProfile { } StringBuilder builder = new StringBuilder(); prettyPrint(builder, ""); - ProfileStatistics statistics = new ProfileStatistics(); + ProfileStatistics statistics = new ProfileStatistics(this.isPipelineX); simpleProfile(0, 0, statistics); String planerStr = this.planner.getExplainStringToProfile(statistics); return "Simple profile \n \n " + planerStr + "\n \n \n" + builder.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 408196283f..df1abe1ec9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -323,7 +323,7 @@ public class BrokerLoadJob extends BulkLoadJob { return; } jobProfile.update(createTimestamp, getSummaryInfo(true), true, - Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), null); + Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), null, false); } private Map getSummaryInfo(boolean isFinished) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 2297d16bc2..2ce0de22cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -887,7 +887,8 @@ public class StmtExecutor { } profile.update(context.startTime, getSummaryInfo(isFinished), isFinished, - context.getSessionVariable().profileLevel, this.planner); + context.getSessionVariable().profileLevel, this.planner, + context.getSessionVariable().getEnablePipelineXEngine()); } // Analyze one statement to structure in memory. diff --git a/gensrc/thrift/RuntimeProfile.thrift b/gensrc/thrift/RuntimeProfile.thrift index 637ff537ea..0b4b6179de 100644 --- a/gensrc/thrift/RuntimeProfile.thrift +++ b/gensrc/thrift/RuntimeProfile.thrift @@ -52,6 +52,8 @@ struct TRuntimeProfileNode { 8: required map> child_counters_map 9: required i64 timestamp + + 10: optional bool is_sink } // A flattened tree of runtime profiles, obtained by an