[fix](pipeline) fix query returns empty result instead of an error occasionally after being cancelled (#19561)
This commit is contained in:
@ -32,6 +32,7 @@
|
||||
#include "common/status.h"
|
||||
#include "exec/exec_node.h"
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/runtime_profile.h"
|
||||
#include "vec/core/block.h"
|
||||
|
||||
@ -296,7 +297,7 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
_fresh_exec_timer(_sink);
|
||||
RETURN_IF_ERROR(_sink->close(state, Status::OK()));
|
||||
RETURN_IF_ERROR(_sink->close(state, state->query_status()));
|
||||
_is_closed = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -154,6 +154,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
|
||||
_exec_status = Status::Cancelled(msg);
|
||||
}
|
||||
_runtime_state->set_is_cancelled(true);
|
||||
_runtime_state->set_process_status(_exec_status);
|
||||
// Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
|
||||
// For stream load the fragment's query_id == load id, it is set in FE.
|
||||
auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
|
||||
|
||||
Reference in New Issue
Block a user