diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 2573cb6ba3..59b87b9da8 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -32,6 +32,7 @@ #include "common/status.h" #include "exec/exec_node.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/core/block.h" @@ -296,7 +297,7 @@ public: return Status::OK(); } _fresh_exec_timer(_sink); - RETURN_IF_ERROR(_sink->close(state, Status::OK())); + RETURN_IF_ERROR(_sink->close(state, state->query_status())); _is_closed = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d3f0443863..6384b99315 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -154,6 +154,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, _exec_status = Status::Cancelled(msg); } _runtime_state->set_is_cancelled(true); + _runtime_state->set_process_status(_exec_status); // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe // For stream load the fragment's query_id == load id, it is set in FE. auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);