[enhancement](exceptionsafe) force check exec node method's return value (#19538)
This commit is contained in:
@ -166,7 +166,7 @@ Status ExecNode::reset(RuntimeState* state) {
|
||||
Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
|
||||
DCHECK(statistics != nullptr);
|
||||
for (auto child_node : _children) {
|
||||
child_node->collect_query_statistics(statistics);
|
||||
RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -71,7 +71,7 @@ public:
|
||||
/// Initializes this object from the thrift tnode desc. The subclass should
|
||||
/// do any initialization that can fail in Init() rather than the ctor.
|
||||
/// If overridden in subclass, must first call superclass's Init().
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
|
||||
[[nodiscard]] virtual Status init(const TPlanNode& tnode, RuntimeState* state);
|
||||
|
||||
// Sets up internal structures, etc., without doing any actual work.
|
||||
// Must be called prior to open(). Will only be called once in this
|
||||
@ -80,17 +80,17 @@ public:
|
||||
// in prepare(). Retrieving the jit compiled function pointer must happen in
|
||||
// open().
|
||||
// If overridden in subclass, must first call superclass's prepare().
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
[[nodiscard]] virtual Status prepare(RuntimeState* state);
|
||||
|
||||
// Performs any preparatory work prior to calling get_next().
|
||||
// Can be called repeatedly (after calls to close()).
|
||||
// Caller must not be holding any io buffers. This will cause deadlock.
|
||||
virtual Status open(RuntimeState* state);
|
||||
[[nodiscard]] virtual Status open(RuntimeState* state);
|
||||
|
||||
// Alloc and open resource for the node
|
||||
// Only pipeline operator use exec node need to impl the virtual function
|
||||
// so only vectorized exec node need to impl
|
||||
virtual Status alloc_resource(RuntimeState* state);
|
||||
[[nodiscard]] virtual Status alloc_resource(RuntimeState* state);
|
||||
|
||||
// Retrieves rows and returns them via row_batch. Sets eos to true
|
||||
// if subsequent calls will not retrieve any more rows.
|
||||
@ -105,9 +105,9 @@ public:
|
||||
// row_batch's tuple_data_pool.
|
||||
// Caller must not be holding any io buffers. This will cause deadlock.
|
||||
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
|
||||
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
|
||||
[[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
|
||||
// new interface to compatible new optimizers in FE
|
||||
Status get_next_after_projects(
|
||||
[[nodiscard]] Status get_next_after_projects(
|
||||
RuntimeState* state, vectorized::Block* block, bool* eos,
|
||||
const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn,
|
||||
bool clear_data = true);
|
||||
@ -125,11 +125,13 @@ public:
|
||||
|
||||
// Emit data, both need impl with method: sink
|
||||
// Eg: Aggregation, Sort, Scan
|
||||
virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
|
||||
[[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block,
|
||||
bool* eos) {
|
||||
return get_next(state, output_block, eos);
|
||||
}
|
||||
|
||||
virtual Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) {
|
||||
[[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block,
|
||||
bool eos) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -138,7 +140,8 @@ public:
|
||||
// Sink Data to ExecNode to do some stock work, both need impl with method: get_result
|
||||
// `eos` means source is exhausted, exec node should do some finalize work
|
||||
// Eg: Aggregation, Sort
|
||||
virtual Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos);
|
||||
[[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block,
|
||||
bool eos);
|
||||
|
||||
// Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
|
||||
// Clears all internal state, returning this node to the state it was in after calling
|
||||
@ -153,12 +156,12 @@ public:
|
||||
// implementation calls Reset() on children.
|
||||
// Note that this function may be called many times (proportional to the input data),
|
||||
// so should be fast.
|
||||
virtual Status reset(RuntimeState* state);
|
||||
[[nodiscard]] virtual Status reset(RuntimeState* state);
|
||||
|
||||
// This should be called before close() and after get_next(), it is responsible for
|
||||
// collecting statistics sent with row batch, it can't be called when prepare() returns
|
||||
// error.
|
||||
virtual Status collect_query_statistics(QueryStatistics* statistics);
|
||||
[[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics);
|
||||
|
||||
// close() will get called for every exec node, regardless of what else is called and
|
||||
// the status of these calls (i.e. prepare() may never have been called, or
|
||||
@ -183,8 +186,9 @@ public:
|
||||
// Creates exec node tree from list of nodes contained in plan via depth-first
|
||||
// traversal. All nodes are placed in pool.
|
||||
// Returns error if 'plan' is corrupted, otherwise success.
|
||||
static Status create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan,
|
||||
const DescriptorTbl& descs, ExecNode** root);
|
||||
[[nodiscard]] static Status create_tree(RuntimeState* state, ObjectPool* pool,
|
||||
const TPlan& plan, const DescriptorTbl& descs,
|
||||
ExecNode** root);
|
||||
|
||||
// Collect all nodes of given 'node_type' that are part of this subtree, and return in
|
||||
// 'nodes'.
|
||||
|
||||
@ -371,7 +371,11 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
|
||||
|
||||
void PlanFragmentExecutor::_collect_query_statistics() {
|
||||
_query_statistics->clear();
|
||||
_plan->collect_query_statistics(_query_statistics.get());
|
||||
Status status = _plan->collect_query_statistics(_query_statistics.get());
|
||||
if (!status.ok()) {
|
||||
LOG(INFO) << "collect query statistics failed, st=" << status;
|
||||
return;
|
||||
}
|
||||
_query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS);
|
||||
if (_runtime_state->backend_id() != -1) {
|
||||
_collect_node_statistics();
|
||||
|
||||
@ -502,7 +502,8 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
|
||||
// because during prepare and open thread is not the same one,
|
||||
// this could cause unable to get JVM
|
||||
if (_probe_expr_ctxs.empty()) {
|
||||
_create_agg_status(_agg_data->without_key);
|
||||
// _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
|
||||
RETURN_IF_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
|
||||
_agg_data_created_without_key = true;
|
||||
}
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
|
||||
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
|
||||
#include "common/compiler_util.h" // IWYU pragma: keep
|
||||
#include "common/exception.h"
|
||||
#include "common/logging.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
@ -207,7 +208,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
|
||||
}
|
||||
_fn_place_ptr = _agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
|
||||
_align_aggregate_states);
|
||||
_create_agg_status();
|
||||
RETURN_IF_CATCH_EXCEPTION(_create_agg_status());
|
||||
_executor.insert_result =
|
||||
std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this, std::placeholders::_1);
|
||||
_executor.execute =
|
||||
|
||||
@ -189,7 +189,9 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
|
||||
child(_child_idx)->get_next_span(), _child_eos);
|
||||
SCOPED_TIMER(_materialize_exprs_evaluate_timer);
|
||||
if (child_block.rows() > 0) {
|
||||
RETURN_IF_ERROR(mblock.merge(materialize_block(&child_block, _child_idx)));
|
||||
Block res;
|
||||
RETURN_IF_ERROR(materialize_block(&child_block, _child_idx, &res));
|
||||
RETURN_IF_ERROR(mblock.merge(res));
|
||||
}
|
||||
// It shouldn't be the case that we reached the limit because we shouldn't have
|
||||
// incremented '_num_rows_returned' yet.
|
||||
@ -267,7 +269,9 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
|
||||
_row_descriptor)));
|
||||
|
||||
if (input_block->rows() > 0) {
|
||||
RETURN_IF_ERROR(mblock.merge(materialize_block(input_block, child_id)));
|
||||
Block res;
|
||||
RETURN_IF_ERROR(materialize_block(input_block, child_id, &res));
|
||||
RETURN_IF_ERROR(mblock.merge(res));
|
||||
if (!mem_reuse) {
|
||||
output_block->swap(mblock.to_block());
|
||||
}
|
||||
@ -341,17 +345,17 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con
|
||||
*out << ")" << std::endl;
|
||||
}
|
||||
|
||||
Block VUnionNode::materialize_block(Block* src_block, int child_idx) {
|
||||
Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block* res_block) {
|
||||
const std::vector<VExprContext*>& child_exprs = _child_expr_lists[child_idx];
|
||||
ColumnsWithTypeAndName colunms;
|
||||
for (size_t i = 0; i < child_exprs.size(); ++i) {
|
||||
int result_column_id = -1;
|
||||
auto state = child_exprs[i]->execute(src_block, &result_column_id);
|
||||
CHECK(state.ok()) << state.to_string();
|
||||
RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
|
||||
colunms.emplace_back(src_block->get_by_position(result_column_id));
|
||||
}
|
||||
_child_row_idx += src_block->rows();
|
||||
return {colunms};
|
||||
*res_block = {colunms};
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace vectorized
|
||||
|
||||
@ -104,7 +104,7 @@ private:
|
||||
/// Evaluates exprs for the current child and materializes the results into 'tuple_buf',
|
||||
/// which is attached to 'dst_block'. Runs until 'dst_block' is at capacity, or all rows
|
||||
/// have been consumed from the current child block. Updates '_child_row_idx'.
|
||||
Block materialize_block(Block* dst_block, int child_idx);
|
||||
Status materialize_block(Block* dst_block, int child_idx, Block* res_block);
|
||||
|
||||
Status get_error_msg(const std::vector<VExprContext*>& exprs);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user