[fix](lateral-view) Fix some lateral view bugs (#7772)
1. Fix bug that BE may crash when input node of TableFunctionNode has non-null column 2. Fix bug that TableFunctionNode may not return all results
This commit is contained in:
@ -105,6 +105,7 @@ Status TableFunctionNode::open(RuntimeState* state) {
|
||||
|
||||
Status TableFunctionNode::_process_next_child_row() {
|
||||
if (_cur_child_offset == _cur_child_batch->num_rows()) {
|
||||
_cur_child_batch->reset();
|
||||
_child_batch_exhausted = true;
|
||||
return Status::OK();
|
||||
}
|
||||
@ -205,15 +206,20 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
|
||||
_cur_child_batch.reset(new RowBatch(child_rowdesc, state->batch_size(), mem_tracker().get()));
|
||||
}
|
||||
if (_child_batch_exhausted) {
|
||||
if (_child_eos) {
|
||||
// current child batch is exhausted, and no more batch from child node
|
||||
break;
|
||||
}
|
||||
// current child batch is exhausted, get next batch from child
|
||||
RETURN_IF_ERROR(_children[0]->get_next(state, _cur_child_batch.get(), eos));
|
||||
if (*eos) {
|
||||
RETURN_IF_ERROR(_children[0]->get_next(state, _cur_child_batch.get(), &_child_eos));
|
||||
if (_cur_child_batch->num_rows() == 0) {
|
||||
// no more batch from child node
|
||||
break;
|
||||
}
|
||||
|
||||
_cur_child_offset = 0;
|
||||
RETURN_IF_ERROR(_process_next_child_row());
|
||||
if (_child_batch_exhausted) {
|
||||
_cur_child_batch->reset();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -258,11 +264,11 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
|
||||
TupleDescriptor* child_tuple_desc = child_rowdesc.tuple_descriptors()[tuple_idx];
|
||||
TupleDescriptor* parent_tuple_desc = parent_rowdesc.tuple_descriptors()[tuple_idx];
|
||||
|
||||
Tuple* child_tuple = _cur_child_tuple_row->get_tuple(child_rowdesc.get_tuple_idx(child_tuple_desc->id()));
|
||||
for (int j = 0; j < _child_slot_sizes[i]; ++j) {
|
||||
SlotDescriptor* child_slot_desc = child_tuple_desc->slots()[j];
|
||||
SlotDescriptor* parent_slot_desc = parent_tuple_desc->slots()[j];
|
||||
|
||||
Tuple* child_tuple = _cur_child_tuple_row->get_tuple(child_rowdesc.get_tuple_idx(child_tuple_desc->id()));
|
||||
if (_output_slot_ids[parent_slot_desc->id()] && !child_tuple->is_null(child_slot_desc->null_indicator_offset())) {
|
||||
// only write child slot if it is selected and not null.
|
||||
void* dest_slot = tuple_ptr->get_slot(parent_slot_desc->tuple_offset());
|
||||
@ -273,7 +279,7 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
|
||||
}
|
||||
}
|
||||
parent_tuple_row->set_tuple(tuple_idx, tuple_ptr);
|
||||
tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + child_tuple_desc->byte_size());
|
||||
tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + parent_tuple_desc->byte_size());
|
||||
}
|
||||
|
||||
// 2. copy function result
|
||||
@ -288,7 +294,6 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
|
||||
tuple_ptr->set_null(parent_slot_desc->null_indicator_offset());
|
||||
}
|
||||
parent_tuple_row->set_tuple(tuple_idx, tuple_ptr);
|
||||
|
||||
tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + parent_tuple_desc->byte_size());
|
||||
}
|
||||
|
||||
@ -307,7 +312,6 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
|
||||
_fns[_fn_num - 1]->forward(&tmp);
|
||||
|
||||
if (row_batch->at_capacity()) {
|
||||
*eos = false;
|
||||
break;
|
||||
}
|
||||
} // end while true
|
||||
@ -315,13 +319,16 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
|
||||
if (row_batch->at_capacity()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} // end while cur_eos
|
||||
|
||||
if (reached_limit()) {
|
||||
int num_rows_over = _num_rows_returned - _limit;
|
||||
row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
|
||||
_num_rows_returned -= num_rows_over;
|
||||
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
|
||||
*eos = true;
|
||||
} else {
|
||||
*eos = row_batch->num_rows() == 0;
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
||||
@ -72,6 +72,8 @@ private:
|
||||
int _parent_tuple_desc_size = -1;
|
||||
int _child_tuple_desc_size = -1;
|
||||
std::vector<int> _child_slot_sizes;
|
||||
// indicate if child node reach the end
|
||||
bool _child_eos = false;
|
||||
};
|
||||
|
||||
}; // namespace doris
|
||||
|
||||
@ -676,4 +676,4 @@ void PlanFragmentExecutor::close() {
|
||||
_closed = true;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -21,8 +21,10 @@ import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.LateralViewRef;
|
||||
import org.apache.doris.analysis.SelectStmt;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -107,6 +109,20 @@ public class TableFunctionNode extends PlanNode {
|
||||
for (SlotRef slotRef : outputSlotRef) {
|
||||
outputSlotIds.add(slotRef.getSlotId());
|
||||
}
|
||||
|
||||
// For all other slots from input node which are not in outputSlotIds,
|
||||
// set them as nullable, so that we can set them to null in TableFunctionNode
|
||||
// TODO(cmy): This should be done with a ProjectionNode
|
||||
PlanNode inputNode = getChild(0);
|
||||
List<TupleId> inputTupleIds = inputNode.getTupleIds();
|
||||
for (TupleId tupleId : inputTupleIds) {
|
||||
TupleDescriptor td = analyzer.getTupleDesc(tupleId);
|
||||
for (SlotDescriptor sd : td.getSlots()) {
|
||||
if (!outputSlotIds.contains(sd.getId())) {
|
||||
sd.setIsNullable(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -179,3 +195,4 @@ public class TableFunctionNode extends PlanNode {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user