[Enchancement](table-function) optimization for vectorized table function (#17973)
This commit is contained in:
@ -43,8 +43,6 @@ Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
_fns.push_back(fn);
|
||||
}
|
||||
_fn_num = _fns.size();
|
||||
_fn_values.resize(_fn_num);
|
||||
_fn_value_lengths.resize(_fn_num);
|
||||
|
||||
// Prepare output slot ids
|
||||
RETURN_IF_ERROR(_prepare_output_slot_ids(tnode));
|
||||
@ -104,6 +102,14 @@ Status VTableFunctionNode::prepare(RuntimeState* state) {
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < _child_slots.size(); i++) {
|
||||
if (_slot_need_copy(i)) {
|
||||
_output_slot_indexs.push_back(i);
|
||||
} else {
|
||||
_useless_slot_indexs.push_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
_cur_child_offset = -1;
|
||||
|
||||
return Status::OK();
|
||||
@ -121,7 +127,7 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
|
||||
RETURN_IF_ERROR_AND_CHECK_SPAN(
|
||||
child(0)->get_next_after_projects(
|
||||
state, &_child_block, &_child_eos,
|
||||
std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
|
||||
std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) &
|
||||
ExecNode::get_next,
|
||||
_children[0], std::placeholders::_1, std::placeholders::_2,
|
||||
std::placeholders::_3)),
|
||||
@ -133,11 +139,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
|
||||
return pull(state, block, eos);
|
||||
}
|
||||
|
||||
Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) {
|
||||
Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block,
|
||||
bool* eos) {
|
||||
size_t column_size = _output_slots.size();
|
||||
bool mem_reuse = output_block->mem_reuse();
|
||||
|
||||
std::vector<vectorized::MutableColumnPtr> columns(column_size);
|
||||
std::vector<MutableColumnPtr> columns(column_size);
|
||||
for (size_t i = 0; i < column_size; i++) {
|
||||
if (mem_reuse) {
|
||||
columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
|
||||
@ -146,6 +153,12 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < _fn_num; i++) {
|
||||
if (columns[i + _child_slots.size()]->is_nullable()) {
|
||||
_fns[i]->set_nullable();
|
||||
}
|
||||
}
|
||||
|
||||
while (columns[_child_slots.size()]->size() < state->batch_size()) {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));
|
||||
@ -158,6 +171,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
|
||||
while (columns[_child_slots.size()]->size() < state->batch_size()) {
|
||||
int idx = _find_last_fn_eos_idx();
|
||||
if (idx == 0 || skip_child_row) {
|
||||
_copy_output_slots(columns);
|
||||
// all table functions' results are exhausted, process next child row.
|
||||
RETURN_IF_ERROR(_process_next_child_row());
|
||||
if (_cur_child_offset == -1) {
|
||||
@ -175,43 +189,27 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
|
||||
if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// get slots from every table function.
|
||||
// notice that _fn_values[i] may be null if the table function has empty result set.
|
||||
for (int i = 0; i < _fn_num; i++) {
|
||||
RETURN_IF_ERROR(_fns[i]->get_value(&_fn_values[i]));
|
||||
RETURN_IF_ERROR(_fns[i]->get_value_length(&_fn_value_lengths[i]));
|
||||
}
|
||||
|
||||
// The tuples order in parent row batch should be
|
||||
// child1, child2, tf1, tf2, ...
|
||||
|
||||
// 1. copy data from child_block.
|
||||
for (int i = 0; i < _child_slots.size(); i++) {
|
||||
if (!slot_need_copy(i)) {
|
||||
columns[i]->insert_default();
|
||||
continue;
|
||||
if (_fn_num == 1) {
|
||||
_current_row_insert_times += _fns[0]->get_value(
|
||||
columns[_child_slots.size()],
|
||||
state->batch_size() - columns[_child_slots.size()]->size());
|
||||
} else {
|
||||
for (int i = 0; i < _fn_num; i++) {
|
||||
_fns[i]->get_value(columns[i + _child_slots.size()]);
|
||||
}
|
||||
auto src_column = _child_block.get_by_position(i).column;
|
||||
columns[i]->insert_from(*src_column, _cur_child_offset);
|
||||
_current_row_insert_times++;
|
||||
_fns[_fn_num - 1]->forward();
|
||||
}
|
||||
|
||||
// 2. copy function result
|
||||
for (int i = 0; i < _fns.size(); i++) {
|
||||
int output_slot_idx = i + _child_slots.size();
|
||||
if (_fn_values[i] == nullptr) {
|
||||
columns[output_slot_idx]->insert_default();
|
||||
} else {
|
||||
columns[output_slot_idx]->insert_data(reinterpret_cast<char*>(_fn_values[i]),
|
||||
_fn_value_lengths[i]);
|
||||
}
|
||||
}
|
||||
|
||||
bool tmp = false;
|
||||
_fns[_fn_num - 1]->forward(&tmp);
|
||||
}
|
||||
}
|
||||
|
||||
_copy_output_slots(columns);
|
||||
|
||||
size_t row_size = columns[_child_slots.size()]->size();
|
||||
for (auto index : _useless_slot_indexs) {
|
||||
columns[index]->insert_many_defaults(row_size - columns[index]->size());
|
||||
}
|
||||
|
||||
if (!columns.empty() && !columns[0]->empty()) {
|
||||
auto n_columns = 0;
|
||||
if (!mem_reuse) {
|
||||
@ -292,11 +290,10 @@ int VTableFunctionNode::_find_last_fn_eos_idx() {
|
||||
// If `last_eos_idx` is 1, which means f2 and f3 are eos.
|
||||
// So we need to forward f1, and reset f2 and f3.
|
||||
bool VTableFunctionNode::_roll_table_functions(int last_eos_idx) {
|
||||
bool fn_eos = false;
|
||||
int i = last_eos_idx - 1;
|
||||
for (; i >= 0; --i) {
|
||||
_fns[i]->forward(&fn_eos);
|
||||
if (!fn_eos) {
|
||||
_fns[i]->forward();
|
||||
if (!_fns[i]->eos()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user