[pipelineX](fix) Fix exception reporting and Nereids plan (#24936)
This commit is contained in:
@ -46,11 +46,10 @@ void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& sc
|
||||
MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
|
||||
const DescriptorTbl& descs)
|
||||
: ScanOperatorX<MetaScanLocalState>(pool, tnode, descs),
|
||||
_tuple_id(tnode.meta_scan_node.tuple_id),
|
||||
_scan_params(tnode.meta_scan_node) {
|
||||
_tuple_id(tnode.meta_scan_node.tuple_id) {
|
||||
_output_tuple_id = _tuple_id;
|
||||
if (_scan_params.__isset.current_user_ident) {
|
||||
_user_identity = _scan_params.current_user_ident;
|
||||
if (tnode.meta_scan_node.__isset.current_user_ident) {
|
||||
_user_identity = tnode.meta_scan_node.current_user_ident;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -63,7 +63,6 @@ private:
|
||||
|
||||
TupleId _tuple_id;
|
||||
TUserIdentity _user_identity;
|
||||
TMetaScanNode _scan_params;
|
||||
};
|
||||
|
||||
} // namespace doris::pipeline
|
||||
|
||||
@ -118,7 +118,7 @@ public:
|
||||
virtual void add_merge_controller_handler(
|
||||
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
|
||||
|
||||
void send_report(bool);
|
||||
virtual void send_report(bool);
|
||||
|
||||
virtual void report_profile();
|
||||
|
||||
@ -130,7 +130,7 @@ public:
|
||||
return _query_ctx->exec_status();
|
||||
}
|
||||
|
||||
taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
|
||||
[[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
|
||||
return _task_group_entity;
|
||||
}
|
||||
|
||||
|
||||
@ -875,12 +875,10 @@ void PipelineXFragmentContext::send_report(bool done) {
|
||||
runtime_states[i] = _runtime_states[i].get();
|
||||
}
|
||||
|
||||
std::vector<RuntimeState*> empty_vector(0);
|
||||
|
||||
_report_status_cb(
|
||||
{true, exec_status, _runtime_state->enable_profile() ? runtime_states : empty_vector,
|
||||
nullptr, nullptr, done || !exec_status.ok(), _query_ctx->coord_addr, _query_id,
|
||||
_fragment_id, TUniqueId(), _backend_num, _runtime_state.get(),
|
||||
{true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(),
|
||||
_query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num,
|
||||
_runtime_state.get(),
|
||||
std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
|
||||
std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
|
||||
std::placeholders::_2)});
|
||||
|
||||
@ -94,7 +94,7 @@ public:
|
||||
void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR,
|
||||
const std::string& msg = "") override;
|
||||
|
||||
void send_report(bool);
|
||||
void send_report(bool) override;
|
||||
|
||||
void report_profile() override;
|
||||
|
||||
|
||||
@ -212,7 +212,25 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
|
||||
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
|
||||
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
|
||||
}
|
||||
params.__isset.detailed_report = req.is_pipeline_x;
|
||||
if (req.is_pipeline_x) {
|
||||
params.__isset.detailed_report = true;
|
||||
for (auto* rs : req.runtime_states) {
|
||||
TDetailedReportParams detailed_param;
|
||||
detailed_param.__set_fragment_instance_id(rs->fragment_instance_id());
|
||||
detailed_param.__isset.fragment_instance_id = true;
|
||||
|
||||
if (rs->enable_profile()) {
|
||||
detailed_param.__isset.profile = true;
|
||||
detailed_param.__isset.loadChannelProfile = true;
|
||||
|
||||
rs->runtime_profile()->to_thrift(&detailed_param.profile);
|
||||
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
|
||||
}
|
||||
|
||||
params.detailed_report.push_back(detailed_param);
|
||||
}
|
||||
}
|
||||
|
||||
if (req.profile != nullptr) {
|
||||
req.profile->to_thrift(¶ms.profile);
|
||||
if (req.load_channel_profile) {
|
||||
@ -220,20 +238,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
|
||||
}
|
||||
params.__isset.profile = true;
|
||||
params.__isset.loadChannelProfile = true;
|
||||
} else if (!req.runtime_states.empty()) {
|
||||
params.__isset.detailed_report = true;
|
||||
for (auto* rs : req.runtime_states) {
|
||||
TDetailedReportParams detailed_param;
|
||||
detailed_param.__set_fragment_instance_id(rs->fragment_instance_id());
|
||||
detailed_param.__isset.fragment_instance_id = true;
|
||||
detailed_param.__isset.profile = true;
|
||||
detailed_param.__isset.loadChannelProfile = true;
|
||||
|
||||
rs->runtime_profile()->to_thrift(&detailed_param.profile);
|
||||
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
|
||||
|
||||
params.detailed_report.push_back(detailed_param);
|
||||
}
|
||||
} else {
|
||||
params.__isset.profile = false;
|
||||
}
|
||||
|
||||
@ -52,6 +52,8 @@ public class TurnOffPipelineForDml extends PlanPreprocessor {
|
||||
try {
|
||||
VariableMgr.setVar(sessionVariable,
|
||||
new SetVar(SessionVariable.ENABLE_PIPELINE_ENGINE, new StringLiteral("false")));
|
||||
VariableMgr.setVar(sessionVariable,
|
||||
new SetVar(SessionVariable.ENABLE_PIPELINE_X_ENGINE, new StringLiteral("false")));
|
||||
} catch (Throwable t) {
|
||||
throw new AnalysisException("Can not set turn off pipeline for DML", t);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user