From efb4e189df4646695c31b5de6aeb581fe8318a2b Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 18 Jan 2022 12:09:32 +0800 Subject: [PATCH] [fix](lateral-view) Fix some lateral view bugs (#7772) 1. Fix bug that BE may crash when input node of TableFunctionNode has non-null column 2. Fix bug that TableFunctionNode may not return all results --- be/src/exec/table_function_node.cpp | 23 ++++++++++++------- be/src/exec/table_function_node.h | 2 ++ be/src/runtime/plan_fragment_executor.cpp | 2 +- .../doris/planner/TableFunctionNode.java | 17 ++++++++++++++ 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp index 22010c413a..6eac8eb824 100644 --- a/be/src/exec/table_function_node.cpp +++ b/be/src/exec/table_function_node.cpp @@ -105,6 +105,7 @@ Status TableFunctionNode::open(RuntimeState* state) { Status TableFunctionNode::_process_next_child_row() { if (_cur_child_offset == _cur_child_batch->num_rows()) { + _cur_child_batch->reset(); _child_batch_exhausted = true; return Status::OK(); } @@ -205,15 +206,20 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo _cur_child_batch.reset(new RowBatch(child_rowdesc, state->batch_size(), mem_tracker().get())); } if (_child_batch_exhausted) { + if (_child_eos) { + // current child batch is exhausted, and no more batch from child node + break; + } // current child batch is exhausted, get next batch from child - RETURN_IF_ERROR(_children[0]->get_next(state, _cur_child_batch.get(), eos)); - if (*eos) { + RETURN_IF_ERROR(_children[0]->get_next(state, _cur_child_batch.get(), &_child_eos)); + if (_cur_child_batch->num_rows() == 0) { + // no more batch from child node break; } + _cur_child_offset = 0; RETURN_IF_ERROR(_process_next_child_row()); if (_child_batch_exhausted) { - _cur_child_batch->reset(); continue; } } @@ -258,11 +264,11 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo TupleDescriptor* child_tuple_desc = child_rowdesc.tuple_descriptors()[tuple_idx]; TupleDescriptor* parent_tuple_desc = parent_rowdesc.tuple_descriptors()[tuple_idx]; + Tuple* child_tuple = _cur_child_tuple_row->get_tuple(child_rowdesc.get_tuple_idx(child_tuple_desc->id())); for (int j = 0; j < _child_slot_sizes[i]; ++j) { SlotDescriptor* child_slot_desc = child_tuple_desc->slots()[j]; SlotDescriptor* parent_slot_desc = parent_tuple_desc->slots()[j]; - Tuple* child_tuple = _cur_child_tuple_row->get_tuple(child_rowdesc.get_tuple_idx(child_tuple_desc->id())); if (_output_slot_ids[parent_slot_desc->id()] && !child_tuple->is_null(child_slot_desc->null_indicator_offset())) { // only write child slot if it is selected and not null. void* dest_slot = tuple_ptr->get_slot(parent_slot_desc->tuple_offset()); @@ -273,7 +279,7 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo } } parent_tuple_row->set_tuple(tuple_idx, tuple_ptr); - tuple_ptr = reinterpret_cast(reinterpret_cast(tuple_ptr) + child_tuple_desc->byte_size()); + tuple_ptr = reinterpret_cast(reinterpret_cast(tuple_ptr) + parent_tuple_desc->byte_size()); } // 2. copy function result @@ -288,7 +294,6 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo tuple_ptr->set_null(parent_slot_desc->null_indicator_offset()); } parent_tuple_row->set_tuple(tuple_idx, tuple_ptr); - tuple_ptr = reinterpret_cast(reinterpret_cast(tuple_ptr) + parent_tuple_desc->byte_size()); } @@ -307,7 +312,6 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo _fns[_fn_num - 1]->forward(&tmp); if (row_batch->at_capacity()) { - *eos = false; break; } } // end while true @@ -315,13 +319,16 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo if (row_batch->at_capacity()) { break; } - } + } // end while cur_eos + if (reached_limit()) { int num_rows_over = _num_rows_returned - _limit; row_batch->set_num_rows(row_batch->num_rows() - num_rows_over); _num_rows_returned -= num_rows_over; COUNTER_SET(_rows_returned_counter, _num_rows_returned); *eos = true; + } else { + *eos = row_batch->num_rows() == 0; } return Status::OK(); diff --git a/be/src/exec/table_function_node.h b/be/src/exec/table_function_node.h index f91577e7d3..daf4d91666 100644 --- a/be/src/exec/table_function_node.h +++ b/be/src/exec/table_function_node.h @@ -72,6 +72,8 @@ private: int _parent_tuple_desc_size = -1; int _child_tuple_desc_size = -1; std::vector _child_slot_sizes; + // indicate if child node reach the end + bool _child_eos = false; }; }; // namespace doris diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index fe64a4faf3..b878e17b87 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -676,4 +676,4 @@ void PlanFragmentExecutor::close() { _closed = true; } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java index c975fc0af4..ad81f60c4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java @@ -21,8 +21,10 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.LateralViewRef; import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; @@ -107,6 +109,20 @@ public class TableFunctionNode extends PlanNode { for (SlotRef slotRef : outputSlotRef) { outputSlotIds.add(slotRef.getSlotId()); } + + // For all other slots from input node which are not in outputSlotIds, + // set them as nullable, so that we can set them to null in TableFunctionNode + // TODO(cmy): This should be done with a ProjectionNode + PlanNode inputNode = getChild(0); + List inputTupleIds = inputNode.getTupleIds(); + for (TupleId tupleId : inputTupleIds) { + TupleDescriptor td = analyzer.getTupleDesc(tupleId); + for (SlotDescriptor sd : td.getSlots()) { + if (!outputSlotIds.contains(sd.getId())) { + sd.setIsNullable(true); + } + } + } } @Override @@ -179,3 +195,4 @@ public class TableFunctionNode extends PlanNode { } } } +