[Bug](pipeline) Fix bugs to pass TPCDS cases (#15194)

This commit is contained in:
Gabriel
2022-12-20 22:29:55 +08:00
committed by GitHub
parent 5c35f02bdb
commit 732417258c
241 changed files with 18082 additions and 59 deletions

View File

@ -70,7 +70,6 @@ Status VTableFunctionNode::prepare(RuntimeState* state) {
}
}
_child_block.reset(new Block());
_cur_child_offset = -1;
return Status::OK();
@ -85,17 +84,17 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
// if child_block is empty, get data from child.
if (need_more_input_data()) {
while (_child_block->rows() == 0 && !_child_eos) {
while (_child_block.rows() == 0 && !_child_eos) {
RETURN_IF_ERROR_AND_CHECK_SPAN(
child(0)->get_next_after_projects(state, _child_block.get(), &_child_eos),
child(0)->get_next_after_projects(state, &_child_block, &_child_eos),
child(0)->get_next_span(), _child_eos);
}
if (_child_eos && _child_block->rows() == 0) {
if (_child_eos && _child_block.rows() == 0) {
*eos = true;
return Status::OK();
}
push(state, _child_block.get(), *eos);
push(state, &_child_block, *eos);
}
pull(state, block, eos);
@ -104,8 +103,6 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
}
Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) {
DCHECK(_child_block != nullptr);
size_t column_size = _output_slots.size();
bool mem_reuse = output_block->mem_reuse();
@ -122,7 +119,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));
if (_child_block->rows() == 0) {
if (_child_block.rows() == 0) {
break;
}
@ -164,7 +161,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
columns[i]->insert_default();
continue;
}
auto src_column = _child_block->get_by_position(i).column;
auto src_column = _child_block.get_by_position(i).column;
columns[i]->insert_from(*src_column, _cur_child_offset);
}
@ -207,13 +204,13 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
Status VTableFunctionNode::_process_next_child_row() {
_cur_child_offset++;
if (_cur_child_offset >= _child_block->rows()) {
if (_cur_child_offset >= _child_block.rows()) {
// release block use count.
for (TableFunction* fn : _fns) {
RETURN_IF_ERROR(fn->process_close());
}
release_block_memory(*_child_block);
release_block_memory(_child_block);
_cur_child_offset = -1;
return Status::OK();
}