diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index ba807bd77d..a1f29aaf8b 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -166,7 +166,7 @@ Status ExecNode::reset(RuntimeState* state) { Status ExecNode::collect_query_statistics(QueryStatistics* statistics) { DCHECK(statistics != nullptr); for (auto child_node : _children) { - child_node->collect_query_statistics(statistics); + RETURN_IF_ERROR(child_node->collect_query_statistics(statistics)); } return Status::OK(); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index edbec218d7..d9a7e5afd7 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -71,7 +71,7 @@ public: /// Initializes this object from the thrift tnode desc. The subclass should /// do any initialization that can fail in Init() rather than the ctor. /// If overridden in subclass, must first call superclass's Init(). - virtual Status init(const TPlanNode& tnode, RuntimeState* state); + [[nodiscard]] virtual Status init(const TPlanNode& tnode, RuntimeState* state); // Sets up internal structures, etc., without doing any actual work. // Must be called prior to open(). Will only be called once in this @@ -80,17 +80,17 @@ public: // in prepare(). Retrieving the jit compiled function pointer must happen in // open(). // If overridden in subclass, must first call superclass's prepare(). - virtual Status prepare(RuntimeState* state); + [[nodiscard]] virtual Status prepare(RuntimeState* state); // Performs any preparatory work prior to calling get_next(). // Can be called repeatedly (after calls to close()). // Caller must not be holding any io buffers. This will cause deadlock. 
- virtual Status open(RuntimeState* state); + [[nodiscard]] virtual Status open(RuntimeState* state); // Alloc and open resource for the node // Only pipeline operator use exec node need to impl the virtual function // so only vectorized exec node need to impl - virtual Status alloc_resource(RuntimeState* state); + [[nodiscard]] virtual Status alloc_resource(RuntimeState* state); // Retrieves rows and returns them via row_batch. Sets eos to true // if subsequent calls will not retrieve any more rows. @@ -105,9 +105,9 @@ public: // row_batch's tuple_data_pool. // Caller must not be holding any io buffers. This will cause deadlock. // TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet. - virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); + [[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); // new interface to compatible new optimizers in FE - Status get_next_after_projects( + [[nodiscard]] Status get_next_after_projects( RuntimeState* state, vectorized::Block* block, bool* eos, const std::function& fn, bool clear_data = true); @@ -125,11 +125,13 @@ public: // Emit data, both need impl with method: sink // Eg: Aggregation, Sort, Scan - virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { + [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block, + bool* eos) { return get_next(state, output_block, eos); } - virtual Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) { + [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block, + bool eos) { return Status::OK(); } @@ -138,7 +140,8 @@ public: // Sink Data to ExecNode to do some stock work, both need impl with method: get_result // `eos` means source is exhausted, exec node should do some finalize work // Eg: Aggregation, Sort - virtual Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos); + 
[[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block, + bool eos); // Resets the stream of row batches to be retrieved by subsequent GetNext() calls. // Clears all internal state, returning this node to the state it was in after calling @@ -153,12 +156,12 @@ public: // implementation calls Reset() on children. // Note that this function may be called many times (proportional to the input data), // so should be fast. - virtual Status reset(RuntimeState* state); + [[nodiscard]] virtual Status reset(RuntimeState* state); // This should be called before close() and after get_next(), it is responsible for // collecting statistics sent with row batch, it can't be called when prepare() returns // error. - virtual Status collect_query_statistics(QueryStatistics* statistics); + [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics); // close() will get called for every exec node, regardless of what else is called and // the status of these calls (i.e. prepare() may never have been called, or @@ -183,8 +186,9 @@ public: // Creates exec node tree from list of nodes contained in plan via depth-first // traversal. All nodes are placed in pool. // Returns error if 'plan' is corrupted, otherwise success. - static Status create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan, - const DescriptorTbl& descs, ExecNode** root); + [[nodiscard]] static Status create_tree(RuntimeState* state, ObjectPool* pool, + const TPlan& plan, const DescriptorTbl& descs, + ExecNode** root); // Collect all nodes of given 'node_type' that are part of this subtree, and return in // 'nodes'. 
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 0b6655193d..ddad12e176 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -371,7 +371,11 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* void PlanFragmentExecutor::_collect_query_statistics() { _query_statistics->clear(); - _plan->collect_query_statistics(_query_statistics.get()); + Status status = _plan->collect_query_statistics(_query_statistics.get()); + if (!status.ok()) { + LOG(INFO) << "collect query statistics failed, st=" << status; + return; + } _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS); if (_runtime_state->backend_id() != -1) { _collect_node_statistics(); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 3c6dd33483..18ae7f42f2 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -502,7 +502,8 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) { // because during prepare and open thread is not the same one, // this could cause unable to get JVM if (_probe_expr_ctxs.empty()) { - _create_agg_status(_agg_data->without_key); + // _create_agg_status may acquire a lot of memory, so allocation may fail when memory is scarce + RETURN_IF_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key)); _agg_data_created_without_key = true; } diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 8710099039..9ee4b424e2 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -28,6 +28,7 @@ // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/exception.h" #include "common/logging.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" @@ -207,7 +208,7 @@ Status 
VAnalyticEvalNode::prepare(RuntimeState* state) { } _fn_place_ptr = _agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states, _align_aggregate_states); - _create_agg_status(); + RETURN_IF_CATCH_EXCEPTION(_create_agg_status()); _executor.insert_result = std::bind(&VAnalyticEvalNode::_insert_result_info, this, std::placeholders::_1); _executor.execute = diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index a66e8bceec..d3b73a7ae1 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -189,7 +189,9 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { child(_child_idx)->get_next_span(), _child_eos); SCOPED_TIMER(_materialize_exprs_evaluate_timer); if (child_block.rows() > 0) { - RETURN_IF_ERROR(mblock.merge(materialize_block(&child_block, _child_idx))); + Block res; + RETURN_IF_ERROR(materialize_block(&child_block, _child_idx, &res)); + RETURN_IF_ERROR(mblock.merge(res)); } // It shouldn't be the case that we reached the limit because we shouldn't have // incremented '_num_rows_returned' yet. 
@@ -267,7 +269,9 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id, _row_descriptor))); if (input_block->rows() > 0) { - RETURN_IF_ERROR(mblock.merge(materialize_block(input_block, child_id))); + Block res; + RETURN_IF_ERROR(materialize_block(input_block, child_id, &res)); + RETURN_IF_ERROR(mblock.merge(res)); if (!mem_reuse) { output_block->swap(mblock.to_block()); } @@ -341,17 +345,17 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con *out << ")" << std::endl; } -Block VUnionNode::materialize_block(Block* src_block, int child_idx) { +Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block* res_block) { const std::vector& child_exprs = _child_expr_lists[child_idx]; ColumnsWithTypeAndName colunms; for (size_t i = 0; i < child_exprs.size(); ++i) { int result_column_id = -1; - auto state = child_exprs[i]->execute(src_block, &result_column_id); - CHECK(state.ok()) << state.to_string(); + RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); colunms.emplace_back(src_block->get_by_position(result_column_id)); } _child_row_idx += src_block->rows(); - return {colunms}; + *res_block = {colunms}; + return Status::OK(); } } // namespace vectorized diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h index 7532c00875..79ef72106c 100644 --- a/be/src/vec/exec/vunion_node.h +++ b/be/src/vec/exec/vunion_node.h @@ -104,7 +104,7 @@ private: /// Evaluates exprs for the current child and materializes the results into 'tuple_buf', /// which is attached to 'dst_block'. Runs until 'dst_block' is at capacity, or all rows /// have been consumed from the current child block. Updates '_child_row_idx'. - Block materialize_block(Block* dst_block, int child_idx); + Status materialize_block(Block* dst_block, int child_idx, Block* res_block); Status get_error_msg(const std::vector& exprs);