diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index bc96a23398..4f639eb9f2 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -46,11 +46,10 @@ void MetaScanLocalState::set_scan_ranges(const std::vector& sc MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanOperatorX(pool, tnode, descs), - _tuple_id(tnode.meta_scan_node.tuple_id), - _scan_params(tnode.meta_scan_node) { + _tuple_id(tnode.meta_scan_node.tuple_id) { _output_tuple_id = _tuple_id; - if (_scan_params.__isset.current_user_ident) { - _user_identity = _scan_params.current_user_ident; + if (tnode.meta_scan_node.__isset.current_user_ident) { + _user_identity = tnode.meta_scan_node.current_user_ident; } } diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h index 6463958236..0e9c8db791 100644 --- a/be/src/pipeline/exec/meta_scan_operator.h +++ b/be/src/pipeline/exec/meta_scan_operator.h @@ -63,7 +63,6 @@ private: TupleId _tuple_id; TUserIdentity _user_identity; - TMetaScanNode _scan_params; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 415015a807..8e80a73143 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -118,7 +118,7 @@ public: virtual void add_merge_controller_handler( std::shared_ptr& handler) {} - void send_report(bool); + virtual void send_report(bool); virtual void report_profile(); @@ -130,7 +130,7 @@ public: return _query_ctx->exec_status(); } - taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const { + [[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const { return _task_group_entity; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index c167588234..7a0cc80556 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -875,12 +875,10 @@ void PipelineXFragmentContext::send_report(bool done) { runtime_states[i] = _runtime_states[i].get(); } - std::vector empty_vector(0); - _report_status_cb( - {true, exec_status, _runtime_state->enable_profile() ? runtime_states : empty_vector, - nullptr, nullptr, done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, - _fragment_id, TUniqueId(), _backend_num, _runtime_state.get(), + {true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(), + _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num, + _runtime_state.get(), std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, std::placeholders::_2)}); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index a84b61cd88..7c54a411c6 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -94,7 +94,7 @@ public: void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR, const std::string& msg = "") override; - void send_report(bool); + void send_report(bool) override; void report_profile() override; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index dcded46aa4..79e67503e1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -212,7 +212,25 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); } - params.__isset.detailed_report = req.is_pipeline_x; + if (req.is_pipeline_x) { + params.__isset.detailed_report = true; + for (auto* rs : req.runtime_states) { + TDetailedReportParams detailed_param; + detailed_param.__set_fragment_instance_id(rs->fragment_instance_id()); + detailed_param.__isset.fragment_instance_id = true; + + if (rs->enable_profile()) { + detailed_param.__isset.profile = true; + detailed_param.__isset.loadChannelProfile = true; + + rs->runtime_profile()->to_thrift(&detailed_param.profile); + rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile); + } + + params.detailed_report.push_back(detailed_param); + } + } + if (req.profile != nullptr) { req.profile->to_thrift(¶ms.profile); if (req.load_channel_profile) { @@ -220,20 +238,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } params.__isset.profile = true; params.__isset.loadChannelProfile = true; - } else if (!req.runtime_states.empty()) { - params.__isset.detailed_report = true; - for (auto* rs : req.runtime_states) { - TDetailedReportParams detailed_param; - detailed_param.__set_fragment_instance_id(rs->fragment_instance_id()); - detailed_param.__isset.fragment_instance_id = true; - detailed_param.__isset.profile = true; - detailed_param.__isset.loadChannelProfile = true; - - rs->runtime_profile()->to_thrift(&detailed_param.profile); - rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile); - - params.detailed_report.push_back(detailed_param); - } } else { params.__isset.profile = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java index 79afe5d2ea..606bc9d69a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPipelineForDml.java @@ -52,6 +52,8 @@ public class TurnOffPipelineForDml extends PlanPreprocessor { try { VariableMgr.setVar(sessionVariable, new SetVar(SessionVariable.ENABLE_PIPELINE_ENGINE, new StringLiteral("false"))); + VariableMgr.setVar(sessionVariable, + new SetVar(SessionVariable.ENABLE_PIPELINE_X_ENGINE, new StringLiteral("false"))); } catch (Throwable t) { throw new AnalysisException("Can not set turn off pipeline for DML", t); }