// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // This file is copied from // https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/union-node.cc // and modified by Doris #include "exec/union_node.h" #include "exprs/expr.h" #include "exprs/expr_context.h" #include "gen_cpp/PlanNodes_types.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/tuple_row.h" #include "util/runtime_profile.h" namespace doris { UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _tuple_id(tnode.union_node.tuple_id), _tuple_desc(nullptr), _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx), _child_idx(0), _child_batch(nullptr), _child_row_idx(0), _child_eos(false), _const_expr_list_idx(0), _to_close_child_idx(-1) {} Status UnionNode::init(const TPlanNode& tnode, RuntimeState* state) { // TODO(zc): // RETURN_IF_ERROR(ExecNode::init(tnode, state)); RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.union_node); DCHECK_EQ(_conjunct_ctxs.size(), 0); // Create const_expr_ctx_lists_ from thrift exprs. auto& const_texpr_lists = tnode.union_node.const_expr_lists; for (auto& texprs : const_texpr_lists) { std::vector ctxs; RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); _const_expr_lists.push_back(ctxs); } // Create result_expr_ctx_lists_ from thrift exprs. auto& result_texpr_lists = tnode.union_node.result_expr_lists; for (auto& texprs : result_texpr_lists) { std::vector ctxs; RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); _child_expr_lists.push_back(ctxs); } return Status::OK(); } Status UnionNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); _materialize_exprs_evaluate_timer = ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer"); _codegend_union_materialize_batch_fns.resize(_child_expr_lists.size()); // Prepare const expr lists. for (const std::vector& exprs : _const_expr_lists) { RETURN_IF_ERROR(Expr::prepare(exprs, state, row_desc())); // TODO(zc) // AddExprCtxsToFree(exprs); DCHECK_EQ(exprs.size(), _tuple_desc->slots().size()); } // Prepare result expr lists. for (int i = 0; i < _child_expr_lists.size(); ++i) { RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc())); // TODO(zc) // AddExprCtxsToFree(_child_expr_lists[i]); DCHECK_EQ(_child_expr_lists[i].size(), _tuple_desc->slots().size()); } return Status::OK(); } Status UnionNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // open const expr lists. for (const std::vector& exprs : _const_expr_lists) { RETURN_IF_ERROR(Expr::open(exprs, state)); } // open result expr lists. for (const std::vector& exprs : _child_expr_lists) { RETURN_IF_ERROR(Expr::open(exprs, state)); } // Ensures that rows are available for clients to fetch after this open() has // succeeded. if (!_children.empty()) RETURN_IF_ERROR(child(_child_idx)->open(state)); return Status::OK(); } Status UnionNode::get_next_pass_through(RuntimeState* state, RowBatch* row_batch) { DCHECK(!reached_limit()); DCHECK(!is_in_subplan()); DCHECK_LT(_child_idx, _children.size()); DCHECK(is_child_passthrough(_child_idx)); // TODO(zc) // DCHECK(child(_child_idx)->row_desc().LayoutEquals(row_batch->row_desc())); if (_child_eos) { RETURN_IF_ERROR(child(_child_idx)->open(state)); _child_eos = false; } DCHECK_EQ(row_batch->num_rows(), 0); RETURN_IF_ERROR(child(_child_idx)->get_next(state, row_batch, &_child_eos)); if (_child_eos) { // Even though the child is at eos, it's not OK to close() it here. Once we close // the child, the row batches that it produced are invalid. Marking the batch as // needing a deep copy let's us safely close the child in the next get_next() call. // TODO: Remove this as part of IMPALA-4179. row_batch->mark_needs_deep_copy(); _to_close_child_idx = _child_idx; ++_child_idx; } return Status::OK(); } Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch) { // Fetch from children, evaluate corresponding exprs and materialize. DCHECK(!reached_limit()); DCHECK_LT(_child_idx, _children.size()); int64_t tuple_buf_size; uint8_t* tuple_buf; RETURN_IF_ERROR( row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf)); memset(tuple_buf, 0, tuple_buf_size); while (has_more_materialized() && !row_batch->at_capacity()) { // The loop runs until we are either done iterating over the children that require // materialization, or the row batch is at capacity. DCHECK(!is_child_passthrough(_child_idx)); // Child row batch was either never set or we're moving on to a different child. if (_child_batch.get() == nullptr) { DCHECK_LT(_child_idx, _children.size()); _child_batch.reset(new RowBatch(child(_child_idx)->row_desc(), state->batch_size())); _child_row_idx = 0; // open the current child unless it's the first child, which was already opened in // UnionNode::open(). if (_child_eos) { RETURN_IF_ERROR(child(_child_idx)->open(state)); _child_eos = false; } // The first batch from each child is always fetched here. RETURN_IF_ERROR(child(_child_idx)->get_next(state, _child_batch.get(), &_child_eos)); } while (!row_batch->at_capacity()) { DCHECK(_child_batch.get() != nullptr); DCHECK_LE(_child_row_idx, _child_batch->num_rows()); if (_child_row_idx == _child_batch->num_rows()) { // Move on to the next child if it is at eos. if (_child_eos) break; // Fetch more rows from the child. _child_batch->reset(); _child_row_idx = 0; // All batches except the first batch from each child are fetched here. RETURN_IF_ERROR( child(_child_idx)->get_next(state, _child_batch.get(), &_child_eos)); // If we fetched an empty batch, go back to the beginning of this while loop, and // try again. if (_child_batch->num_rows() == 0) continue; } DCHECK_EQ(_codegend_union_materialize_batch_fns.size(), _children.size()); if (_codegend_union_materialize_batch_fns[_child_idx] == nullptr) { SCOPED_TIMER(_materialize_exprs_evaluate_timer); materialize_batch(row_batch, &tuple_buf); } else { _codegend_union_materialize_batch_fns[_child_idx](this, row_batch, &tuple_buf); } } // It shouldn't be the case that we reached the limit because we shouldn't have // incremented '_num_rows_returned' yet. DCHECK(!reached_limit()); if (_child_eos && _child_row_idx == _child_batch->num_rows()) { // Unless we are inside a subplan expecting to call open()/get_next() on the child // again, the child can be closed at this point. _child_batch.reset(); if (!is_in_subplan()) child(_child_idx)->close(state); ++_child_idx; } else { // If we haven't finished consuming rows from the current child, we must have ended // up here because the row batch is at capacity. DCHECK(row_batch->at_capacity()); } } DCHECK_LE(_child_idx, _children.size()); return Status::OK(); } Status UnionNode::get_next_const(RuntimeState* state, RowBatch* row_batch) { DCHECK_EQ(state->per_fragment_instance_idx(), 0); DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size()); // Create new tuple buffer for row_batch. int64_t tuple_buf_size; uint8_t* tuple_buf; RETURN_IF_ERROR( row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf)); memset(tuple_buf, 0, tuple_buf_size); while (_const_expr_list_idx < _const_expr_lists.size() && !row_batch->at_capacity()) { materialize_exprs(_const_expr_lists[_const_expr_list_idx], nullptr, tuple_buf, row_batch); RETURN_IF_ERROR(get_error_msg(_const_expr_lists[_const_expr_list_idx])); tuple_buf += _tuple_desc->byte_size(); ++_const_expr_list_idx; } return Status::OK(); } Status UnionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); if (_to_close_child_idx != -1) { // The previous child needs to be closed if passthrough was enabled for it. In the non // passthrough case, the child was already closed in the previous call to get_next(). DCHECK(is_child_passthrough(_to_close_child_idx)); DCHECK(!is_in_subplan()); child(_to_close_child_idx)->close(state); _to_close_child_idx = -1; } // Save the number of rows in case get_next() is called with a non-empty batch, which can // happen in a subplan. int num_rows_before = row_batch->num_rows(); if (has_more_passthrough()) { RETURN_IF_ERROR(get_next_pass_through(state, row_batch)); } else if (has_more_materialized()) { RETURN_IF_ERROR(get_next_materialized(state, row_batch)); } else if (has_more_const(state)) { RETURN_IF_ERROR(get_next_const(state, row_batch)); } int num_rows_added = row_batch->num_rows() - num_rows_before; DCHECK_GE(num_rows_added, 0); if (_limit != -1 && _num_rows_returned + num_rows_added > _limit) { // Truncate the row batch if we went over the limit. num_rows_added = _limit - _num_rows_returned; row_batch->set_num_rows(num_rows_before + num_rows_added); DCHECK_GE(num_rows_added, 0); } _num_rows_returned += num_rows_added; *eos = reached_limit() || (!has_more_passthrough() && !has_more_materialized() && !has_more_const(state)); COUNTER_SET(_rows_returned_counter, _num_rows_returned); return Status::OK(); } Status UnionNode::close(RuntimeState* state) { if (is_closed()) return Status::OK(); _child_batch.reset(); for (auto& exprs : _const_expr_lists) { Expr::close(exprs, state); } for (auto& exprs : _child_expr_lists) { Expr::close(exprs, state); } return ExecNode::close(state); } void UnionNode::debug_string(int indentation_level, std::stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << "_union(_first_materialized_child_idx=" << _first_materialized_child_idx << " _row_descriptor=[" << row_desc().debug_string() << "] " << " _child_expr_lists=["; for (int i = 0; i < _child_expr_lists.size(); ++i) { *out << Expr::debug_string(_child_expr_lists[i]) << ", "; } *out << "] \n"; ExecNode::debug_string(indentation_level, out); *out << ")" << std::endl; } void UnionNode::materialize_exprs(const std::vector& exprs, TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) { DCHECK(!dst_batch->at_capacity()); Tuple* dst_tuple = reinterpret_cast(tuple_buf); TupleRow* dst_row = dst_batch->get_row(dst_batch->add_row()); // dst_tuple->materialize_exprs(row, *_tuple_desc, exprs, dst_tuple->materialize_exprs(row, *_tuple_desc, exprs, dst_batch->tuple_data_pool(), nullptr, nullptr); dst_row->set_tuple(0, dst_tuple); dst_batch->commit_last_row(); } void UnionNode::materialize_batch(RowBatch* dst_batch, uint8_t** tuple_buf) { // Take all references to member variables out of the loop to reduce the number of // loads and stores. RowBatch* child_batch = _child_batch.get(); int tuple_byte_size = _tuple_desc->byte_size(); uint8_t* cur_tuple = *tuple_buf; const std::vector& child_exprs = _child_expr_lists[_child_idx]; int num_rows_to_process = std::min(child_batch->num_rows() - _child_row_idx, dst_batch->capacity() - dst_batch->num_rows()); FOREACH_ROW_LIMIT(child_batch, _child_row_idx, num_rows_to_process, batch_iter) { TupleRow* child_row = batch_iter.get(); materialize_exprs(child_exprs, child_row, cur_tuple, dst_batch); cur_tuple += tuple_byte_size; } _child_row_idx += num_rows_to_process; *tuple_buf = cur_tuple; } Status UnionNode::get_error_msg(const std::vector& exprs) { for (auto expr_ctx : exprs) { std::string expr_error = expr_ctx->get_error_msg(); if (!expr_error.empty()) { return Status::RuntimeError(expr_error); } } return Status::OK(); } } // namespace doris