From 664fbffcbaf861d8ce8b1e9211c5dfc8c01bf130 Mon Sep 17 00:00:00 2001 From: Pxl Date: Wed, 29 Mar 2023 10:45:00 +0800 Subject: [PATCH] [Enchancement](table-function) optimization for vectorized table function (#17973) --- be/src/vec/exec/vtable_function_node.cpp | 77 +++++++++--------- be/src/vec/exec/vtable_function_node.h | 31 +++++--- .../vec/exprs/table_function/table_function.h | 60 +++++++------- .../table_function/table_function_factory.h | 3 +- be/src/vec/exprs/table_function/vexplode.cpp | 53 +++---------- be/src/vec/exprs/table_function/vexplode.h | 12 ++- .../exprs/table_function/vexplode_bitmap.cpp | 78 +++++++------------ .../exprs/table_function/vexplode_bitmap.h | 10 +-- .../table_function/vexplode_json_array.cpp | 40 +++------- .../table_function/vexplode_json_array.h | 35 +++------ .../exprs/table_function/vexplode_numbers.cpp | 74 ++++++++++-------- .../exprs/table_function/vexplode_numbers.h | 34 ++++++-- .../exprs/table_function/vexplode_split.cpp | 40 ++-------- .../vec/exprs/table_function/vexplode_split.h | 6 +- be/test/vec/function/function_test_util.cpp | 24 ++---- .../test_varchar_schema_change.groovy | 12 +-- 16 files changed, 250 insertions(+), 339 deletions(-) diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 32cfee7234..39ffb4fad7 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -43,8 +43,6 @@ Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) { _fns.push_back(fn); } _fn_num = _fns.size(); - _fn_values.resize(_fn_num); - _fn_value_lengths.resize(_fn_num); // Prepare output slot ids RETURN_IF_ERROR(_prepare_output_slot_ids(tnode)); @@ -104,6 +102,14 @@ Status VTableFunctionNode::prepare(RuntimeState* state) { } } + for (size_t i = 0; i < _child_slots.size(); i++) { + if (_slot_need_copy(i)) { + _output_slot_indexs.push_back(i); + } else { + _useless_slot_indexs.push_back(i); + } + } + _cur_child_offset = -1; return Status::OK(); @@ -121,7 +127,7 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos RETURN_IF_ERROR_AND_CHECK_SPAN( child(0)->get_next_after_projects( state, &_child_block, &_child_eos, - std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), @@ -133,11 +139,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos return pull(state, block, eos); } -Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) { +Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block, + bool* eos) { size_t column_size = _output_slots.size(); bool mem_reuse = output_block->mem_reuse(); - std::vector columns(column_size); + std::vector columns(column_size); for (size_t i = 0; i < column_size; i++) { if (mem_reuse) { columns[i] = std::move(*output_block->get_by_position(i).column).mutate(); @@ -146,6 +153,12 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output } } + for (int i = 0; i < _fn_num; i++) { + if (columns[i + _child_slots.size()]->is_nullable()) { + _fns[i]->set_nullable(); + } + } + while (columns[_child_slots.size()]->size() < state->batch_size()) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch.")); @@ -158,6 +171,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output while (columns[_child_slots.size()]->size() < state->batch_size()) { int idx = _find_last_fn_eos_idx(); if (idx == 0 || skip_child_row) { + _copy_output_slots(columns); // all table functions' results are exhausted, process next child row. RETURN_IF_ERROR(_process_next_child_row()); if (_cur_child_offset == -1) { @@ -175,43 +189,27 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output if (skip_child_row = _is_inner_and_empty(); skip_child_row) { continue; } - - // get slots from every table function. - // notice that _fn_values[i] may be null if the table function has empty result set. - for (int i = 0; i < _fn_num; i++) { - RETURN_IF_ERROR(_fns[i]->get_value(&_fn_values[i])); - RETURN_IF_ERROR(_fns[i]->get_value_length(&_fn_value_lengths[i])); - } - - // The tuples order in parent row batch should be - // child1, child2, tf1, tf2, ... - - // 1. copy data from child_block. - for (int i = 0; i < _child_slots.size(); i++) { - if (!slot_need_copy(i)) { - columns[i]->insert_default(); - continue; + if (_fn_num == 1) { + _current_row_insert_times += _fns[0]->get_value( + columns[_child_slots.size()], + state->batch_size() - columns[_child_slots.size()]->size()); + } else { + for (int i = 0; i < _fn_num; i++) { + _fns[i]->get_value(columns[i + _child_slots.size()]); } - auto src_column = _child_block.get_by_position(i).column; - columns[i]->insert_from(*src_column, _cur_child_offset); + _current_row_insert_times++; + _fns[_fn_num - 1]->forward(); } - - // 2. copy function result - for (int i = 0; i < _fns.size(); i++) { - int output_slot_idx = i + _child_slots.size(); - if (_fn_values[i] == nullptr) { - columns[output_slot_idx]->insert_default(); - } else { - columns[output_slot_idx]->insert_data(reinterpret_cast(_fn_values[i]), - _fn_value_lengths[i]); - } - } - - bool tmp = false; - _fns[_fn_num - 1]->forward(&tmp); } } + _copy_output_slots(columns); + + size_t row_size = columns[_child_slots.size()]->size(); + for (auto index : _useless_slot_indexs) { + columns[index]->insert_many_defaults(row_size - columns[index]->size()); + } + if (!columns.empty() && !columns[0]->empty()) { auto n_columns = 0; if (!mem_reuse) { @@ -292,11 +290,10 @@ int VTableFunctionNode::_find_last_fn_eos_idx() { // If `last_eos_idx` is 1, which means f2 and f3 are eos. // So we need to forward f1, and reset f2 and f3. bool VTableFunctionNode::_roll_table_functions(int last_eos_idx) { - bool fn_eos = false; int i = last_eos_idx - 1; for (; i >= 0; --i) { - _fns[i]->forward(&fn_eos); - if (!fn_eos) { + _fns[i]->forward(); + if (!_fns[i]->eos()) { break; } } diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 99d2394514..2aad138f4a 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -39,7 +39,7 @@ public: bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; } void release_resource(doris::RuntimeState* state) override { - vectorized::VExpr::close(_vfn_ctxs, state); + VExpr::close(_vfn_ctxs, state); if (_num_rows_filtered_counter != nullptr) { COUNTER_SET(_num_rows_filtered_counter, static_cast(_num_rows_filtered)); @@ -47,7 +47,7 @@ public: ExecNode::release_resource(state); } - Status push(RuntimeState*, vectorized::Block* input_block, bool eos) override { + Status push(RuntimeState*, Block* input_block, bool eos) override { _child_eos = eos; if (input_block->rows() == 0) { return Status::OK(); @@ -60,8 +60,8 @@ public: return Status::OK(); } - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override { - RETURN_IF_ERROR(get_expanded_block(state, output_block, eos)); + Status pull(RuntimeState* state, Block* output_block, bool* eos) override { + RETURN_IF_ERROR(_get_expanded_block(state, output_block, eos)); reached_limit(output_block, eos); return Status::OK(); } @@ -97,26 +97,39 @@ private: 1. FE: create a new output tuple based on the real output slots; 2. BE: refractor (V)TableFunctionNode output rows based no the new tuple; */ - inline bool slot_need_copy(SlotId slot_id) const { + inline bool _slot_need_copy(SlotId slot_id) const { auto id = _output_slots[slot_id]->id(); return (id < _output_slot_ids.size()) && (_output_slot_ids[id]); } - Status get_expanded_block(RuntimeState* state, Block* output_block, bool* eos); + Status _get_expanded_block(RuntimeState* state, Block* output_block, bool* eos); + + void _copy_output_slots(std::vector& columns) { + if (!_current_row_insert_times) { + return; + } + for (auto index : _output_slot_indexs) { + auto src_column = _child_block.get_by_position(index).column; + columns[index]->insert_many_from(*src_column, _cur_child_offset, + _current_row_insert_times); + } + _current_row_insert_times = 0; + } + int _current_row_insert_times = 0; Block _child_block; std::vector _child_slots; std::vector _output_slots; int64_t _cur_child_offset = 0; - std::vector _vfn_ctxs; + std::vector _vfn_ctxs; std::vector _fns; - std::vector _fn_values; - std::vector _fn_value_lengths; int _fn_num = 0; std::vector _output_slot_ids; + std::vector _output_slot_indexs; + std::vector _useless_slot_indexs; std::vector _child_slot_sizes; // indicate if child node reach the end diff --git a/be/src/vec/exprs/table_function/table_function.h b/be/src/vec/exprs/table_function/table_function.h index 68e6829df5..7fff88899b 100644 --- a/be/src/vec/exprs/table_function/table_function.h +++ b/be/src/vec/exprs/table_function/table_function.h @@ -30,44 +30,49 @@ constexpr auto COMBINATOR_SUFFIX_OUTER = "_outer"; class TableFunction { public: - virtual ~TableFunction() {} + virtual ~TableFunction() = default; virtual Status prepare() { return Status::OK(); } virtual Status open() { return Status::OK(); } - // only used for vectorized. - virtual Status process_init(vectorized::Block* block) = 0; + virtual Status process_init(Block* block) = 0; - // only used for vectorized. - virtual Status process_row(size_t row_idx) = 0; + virtual Status process_row(size_t row_idx) { + _cur_size = 0; + return reset(); + } // only used for vectorized. virtual Status process_close() = 0; - virtual Status reset() = 0; - - virtual Status get_value(void** output) = 0; - - // only used for vectorized. - virtual Status get_value_length(int64_t* length) { - *length = -1; + virtual Status reset() { + _eos = false; + _cur_offset = 0; return Status::OK(); } + virtual void get_value(MutableColumnPtr& column) = 0; + + virtual int get_value(MutableColumnPtr& column, int max_step) { + max_step = std::max(1, std::min(max_step, (int)(_cur_size - _cur_offset))); + int i = 0; + for (; i < max_step && !eos(); i++) { + get_value(column); + forward(); + } + return i; + } + virtual Status close() { return Status::OK(); } - virtual Status forward(bool* eos) { - if (_is_current_empty) { - *eos = true; + virtual Status forward(int step = 1) { + if (current_empty()) { _eos = true; } else { - ++_cur_offset; - if (_cur_offset == _cur_size) { - *eos = true; + _cur_offset += step; + if (_cur_offset >= _cur_size) { _eos = true; - } else { - *eos = false; } } return Status::OK(); @@ -76,9 +81,8 @@ public: std::string name() const { return _fn_name; } bool eos() const { return _eos; } - void set_vexpr_context(vectorized::VExprContext* vexpr_context) { - _vexpr_context = vexpr_context; - } + void set_vexpr_context(VExprContext* vexpr_context) { _vexpr_context = vexpr_context; } + void set_nullable() { _is_nullable = true; } bool is_outer() const { return _is_outer; } void set_outer() { @@ -89,21 +93,21 @@ public: _fn_name += COMBINATOR_SUFFIX_OUTER; } - bool current_empty() const { return _is_current_empty; } + bool current_empty() const { return _cur_size == 0; } protected: std::string _fn_name; - vectorized::VExprContext* _vexpr_context = nullptr; + VExprContext* _vexpr_context = nullptr; // true if there is no more data can be read from this function. bool _eos = false; - // true means the function result set from current row is empty(eg, source value is null or empty). - // so that when calling reset(), we can do nothing and keep eos as false. - bool _is_current_empty = false; // the position of current cursor int64_t _cur_offset = 0; // the size of current result int64_t _cur_size = 0; // set _is_outer to false for explode function, and should not return tuple while array is null or empty bool _is_outer = false; + + bool _is_nullable = false; + bool _is_const = false; }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/table_function_factory.h b/be/src/vec/exprs/table_function/table_function_factory.h index 456dba0ddc..d299af0a9c 100644 --- a/be/src/vec/exprs/table_function/table_function_factory.h +++ b/be/src/vec/exprs/table_function/table_function_factory.h @@ -36,8 +36,7 @@ namespace vectorized { class TableFunction; class TableFunctionFactory { public: - TableFunctionFactory() {} - ~TableFunctionFactory() {} + TableFunctionFactory() = delete; static Status get_fn(const std::string& fn_name_raw, ObjectPool* pool, TableFunction** fn); const static std::unordered_map> _function_map; diff --git a/be/src/vec/exprs/table_function/vexplode.cpp b/be/src/vec/exprs/table_function/vexplode.cpp index aa0249916a..b2c93196a8 100644 --- a/be/src/vec/exprs/table_function/vexplode.cpp +++ b/be/src/vec/exprs/table_function/vexplode.cpp @@ -26,7 +26,7 @@ VExplodeTableFunction::VExplodeTableFunction() { _fn_name = "vexplode"; } -Status VExplodeTableFunction::process_init(vectorized::Block* block) { +Status VExplodeTableFunction::process_init(Block* block) { CHECK(_vexpr_context->root()->children().size() == 1) << "VExplodeTableFunction only support 1 child but has " << _vexpr_context->root()->children().size(); @@ -48,17 +48,12 @@ Status VExplodeTableFunction::process_init(vectorized::Block* block) { Status VExplodeTableFunction::process_row(size_t row_idx) { DCHECK(row_idx < _array_column->size()); - _is_current_empty = false; - _eos = false; - _cur_offset = 0; - _array_offset = (*_detail.offsets_ptr)[row_idx - 1]; - _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset; + RETURN_IF_ERROR(TableFunction::process_row(row_idx)); - // array is NULL, or array is empty - if (_cur_size == 0 || (_detail.array_nullmap_data && _detail.array_nullmap_data[row_idx])) { - _is_current_empty = true; + if (!_detail.array_nullmap_data || !_detail.array_nullmap_data[row_idx]) { + _array_offset = (*_detail.offsets_ptr)[row_idx - 1]; + _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset; } - return Status::OK(); } @@ -69,42 +64,14 @@ Status VExplodeTableFunction::process_close() { return Status::OK(); } -Status VExplodeTableFunction::reset() { - _eos = false; - _cur_offset = 0; - return Status::OK(); -} - -Status VExplodeTableFunction::get_value(void** output) { - if (_is_current_empty) { - *output = nullptr; - return Status::OK(); - } - +void VExplodeTableFunction::get_value(MutableColumnPtr& column) { size_t pos = _array_offset + _cur_offset; - if (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos]) { - *output = nullptr; + if (current_empty() || (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos])) { + column->insert_default(); } else { - *output = const_cast(_detail.nested_col->get_data_at(pos).data); + column->insert_data(const_cast(_detail.nested_col->get_data_at(pos).data), + _detail.nested_col->get_data_at(pos).size); } - - return Status::OK(); -} - -Status VExplodeTableFunction::get_value_length(int64_t* length) { - if (_is_current_empty) { - *length = -1; - return Status::OK(); - } - - size_t pos = _array_offset + _cur_offset; - if (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos]) { - *length = 0; - } else { - *length = _detail.nested_col->get_data_at(pos).size; - } - - return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode.h b/be/src/vec/exprs/table_function/vexplode.h index 3bc8ba9ef0..8911c8f225 100644 --- a/be/src/vec/exprs/table_function/vexplode.h +++ b/be/src/vec/exprs/table_function/vexplode.h @@ -30,14 +30,12 @@ class VExplodeTableFunction : public TableFunction { public: VExplodeTableFunction(); - virtual ~VExplodeTableFunction() = default; + ~VExplodeTableFunction() override = default; - virtual Status process_init(vectorized::Block* block) override; - virtual Status process_row(size_t row_idx) override; - virtual Status process_close() override; - virtual Status reset() override; - virtual Status get_value(void** output) override; - virtual Status get_value_length(int64_t* length) override; + Status process_init(Block* block) override; + Status process_row(size_t row_idx) override; + Status process_close() override; + void get_value(MutableColumnPtr& column) override; private: ColumnPtr _array_column; diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.cpp b/be/src/vec/exprs/table_function/vexplode_bitmap.cpp index f680922e7c..8dabbac8c3 100644 --- a/be/src/vec/exprs/table_function/vexplode_bitmap.cpp +++ b/be/src/vec/exprs/table_function/vexplode_bitmap.cpp @@ -19,6 +19,8 @@ #include "common/status.h" #include "util/bitmap_value.h" +#include "vec/columns/columns_number.h" +#include "vec/exprs/table_function/table_function.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized { @@ -27,7 +29,7 @@ VExplodeBitmapTableFunction::VExplodeBitmapTableFunction() { _fn_name = "vexplode_bitmap"; } -Status VExplodeBitmapTableFunction::process_init(vectorized::Block* block) { +Status VExplodeBitmapTableFunction::process_init(Block* block) { CHECK(_vexpr_context->root()->children().size() == 1) << "VExplodeNumbersTableFunction must be have 1 children but have " << _vexpr_context->root()->children().size(); @@ -42,68 +44,53 @@ Status VExplodeBitmapTableFunction::process_init(vectorized::Block* block) { Status VExplodeBitmapTableFunction::reset() { _eos = false; - if (!_is_current_empty) { - _reset_iterator(); + _cur_offset = 0; + if (!current_empty()) { + _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap)); } return Status::OK(); } -Status VExplodeBitmapTableFunction::forward(bool* eos) { - if (_is_current_empty) { - *eos = true; - _eos = true; - } else { - ++(*_cur_iter); - ++_cur_offset; - if (_cur_offset == _cur_size) { - *eos = true; - _eos = true; - } else { - _cur_value = **_cur_iter; - *eos = false; +Status VExplodeBitmapTableFunction::forward(int step) { + if (!current_empty()) { + for (int i = 0; i < step; i++) { + ++(*_cur_iter); } } - return Status::OK(); + return TableFunction::forward(step); } -Status VExplodeBitmapTableFunction::get_value(void** output) { - if (_is_current_empty) { - *output = nullptr; +void VExplodeBitmapTableFunction::get_value(MutableColumnPtr& column) { + if (current_empty()) { + column->insert_default(); } else { - *output = &_cur_value; + if (_is_nullable) { + static_cast( + static_cast(column.get())->get_nested_column_ptr().get()) + ->insert_value(**_cur_iter); + static_cast( + static_cast(column.get())->get_null_map_column_ptr().get()) + ->insert_default(); + } else { + static_cast(column.get())->insert_value(**_cur_iter); + } } - return Status::OK(); -} - -void VExplodeBitmapTableFunction::_reset_iterator() { - DCHECK(_cur_bitmap->cardinality() > 0) << _cur_bitmap->cardinality(); - _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap)); - _cur_value = **_cur_iter; - _cur_offset = 0; } Status VExplodeBitmapTableFunction::process_row(size_t row_idx) { - _eos = false; - _is_current_empty = false; - _cur_size = 0; - _cur_offset = 0; + RETURN_IF_ERROR(TableFunction::process_row(row_idx)); StringRef value = _value_column->get_data_at(row_idx); - if (value.data == nullptr) { - _is_current_empty = true; - } else { + if (value.data) { _cur_bitmap = reinterpret_cast(value.data); _cur_size = _cur_bitmap->cardinality(); - if (_cur_size == 0) { - _is_current_empty = true; - } else { - _reset_iterator(); + if (!current_empty()) { + _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap)); } } - _is_current_empty = (_cur_size == 0); return Status::OK(); } @@ -112,13 +99,4 @@ Status VExplodeBitmapTableFunction::process_close() { return Status::OK(); } -Status VExplodeBitmapTableFunction::get_value_length(int64_t* length) { - if (_is_current_empty) { - *length = -1; - } else { - *length = sizeof(uint64_t); - } - return Status::OK(); -} - } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.h b/be/src/vec/exprs/table_function/vexplode_bitmap.h index 3d4b7c2ff0..dd47261c38 100644 --- a/be/src/vec/exprs/table_function/vexplode_bitmap.h +++ b/be/src/vec/exprs/table_function/vexplode_bitmap.h @@ -29,13 +29,12 @@ public: ~VExplodeBitmapTableFunction() override = default; Status reset() override; - Status get_value(void** output) override; - Status forward(bool* eos) override; + void get_value(MutableColumnPtr& column) override; + Status forward(int step = 1) override; - Status process_init(vectorized::Block* block) override; + Status process_init(Block* block) override; Status process_row(size_t row_idx) override; Status process_close() override; - Status get_value_length(int64_t* length) override; private: void _reset_iterator(); @@ -43,9 +42,6 @@ private: const BitmapValue* _cur_bitmap = nullptr; // iterator of _cur_bitmap std::unique_ptr _cur_iter = nullptr; - // current value read from bitmap, it will be referenced by - // table function scan node. - uint64_t _cur_value = 0; ColumnPtr _value_column; }; diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.cpp b/be/src/vec/exprs/table_function/vexplode_json_array.cpp index f12fa617bc..b67467ea98 100644 --- a/be/src/vec/exprs/table_function/vexplode_json_array.cpp +++ b/be/src/vec/exprs/table_function/vexplode_json_array.cpp @@ -127,7 +127,7 @@ VExplodeJsonArrayTableFunction::VExplodeJsonArrayTableFunction(ExplodeJsonArrayT _fn_name = "vexplode_json_array"; } -Status VExplodeJsonArrayTableFunction::process_init(vectorized::Block* block) { +Status VExplodeJsonArrayTableFunction::process_init(Block* block) { CHECK(_vexpr_context->root()->children().size() == 1) << _vexpr_context->root()->children().size(); @@ -139,28 +139,15 @@ Status VExplodeJsonArrayTableFunction::process_init(vectorized::Block* block) { return Status::OK(); } -Status VExplodeJsonArrayTableFunction::reset() { - _eos = false; - _cur_offset = 0; - return Status::OK(); -} - Status VExplodeJsonArrayTableFunction::process_row(size_t row_idx) { - _is_current_empty = false; - _eos = false; + RETURN_IF_ERROR(TableFunction::process_row(row_idx)); StringRef text = _text_column->get_data_at(row_idx); - if (text.data == nullptr) { - _is_current_empty = true; - } else { + if (text.data != nullptr) { rapidjson::Document document; document.Parse(text.data, text.size); - if (UNLIKELY(document.HasParseError()) || !document.IsArray() || - document.GetArray().Size() == 0) { - _is_current_empty = true; - } else { + if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) { _cur_size = _parsed_data.set_output(_type, document); - _cur_offset = 0; } } return Status::OK(); @@ -171,22 +158,13 @@ Status VExplodeJsonArrayTableFunction::process_close() { return Status::OK(); } -Status VExplodeJsonArrayTableFunction::get_value_length(int64_t* length) { - if (_is_current_empty) { - *length = -1; +void VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) { + if (current_empty()) { + column->insert_default(); } else { - _parsed_data.get_value_length(_type, _cur_offset, length); + column->insert_data((char*)_parsed_data.get_value(_type, _cur_offset, true), + _parsed_data.get_value_length(_type, _cur_offset)); } - return Status::OK(); -} - -Status VExplodeJsonArrayTableFunction::get_value(void** output) { - if (_is_current_empty) { - *output = nullptr; - } else { - _parsed_data.get_value(_type, _cur_offset, output, true); - } - return Status::OK(); } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.h b/be/src/vec/exprs/table_function/vexplode_json_array.h index ab171c7cef..9d7de170ff 100644 --- a/be/src/vec/exprs/table_function/vexplode_json_array.h +++ b/be/src/vec/exprs/table_function/vexplode_json_array.h @@ -63,33 +63,25 @@ struct ParsedData { } } - void get_value(ExplodeJsonArrayType type, int64_t offset, void** output, bool real = false) { + void* get_value(ExplodeJsonArrayType type, int64_t offset, bool real = false) { switch (type) { case ExplodeJsonArrayType::INT: case ExplodeJsonArrayType::DOUBLE: - *output = _data[offset]; - break; + return _data[offset]; case ExplodeJsonArrayType::STRING: - *output = _string_nulls[offset] ? nullptr - : real ? reinterpret_cast(_backup_string[offset].data()) - : &_data_string[offset]; - break; + return _string_nulls[offset] ? nullptr + : real ? reinterpret_cast(_backup_string[offset].data()) + : &_data_string[offset]; default: - CHECK(false) << type; + return nullptr; } } - void get_value_length(ExplodeJsonArrayType type, int64_t offset, int64_t* length) { - switch (type) { - case ExplodeJsonArrayType::INT: - case ExplodeJsonArrayType::DOUBLE: - break; - case ExplodeJsonArrayType::STRING: - *length = _string_nulls[offset] ? -1 : _backup_string[offset].size(); - break; - default: - CHECK(false) << type; + int64 get_value_length(ExplodeJsonArrayType type, int64_t offset) { + if (type == ExplodeJsonArrayType::STRING && !_string_nulls[offset]) { + return _backup_string[offset].size(); } + return 0; } int set_output(ExplodeJsonArrayType type, rapidjson::Document& document); @@ -100,13 +92,10 @@ public: VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type); ~VExplodeJsonArrayTableFunction() override = default; - Status process_init(vectorized::Block* block) override; + Status process_init(Block* block) override; Status process_row(size_t row_idx) override; Status process_close() override; - Status get_value(void** output) override; - Status get_value_length(int64_t* length) override; - - Status reset() override; + void get_value(MutableColumnPtr& column) override; private: ParsedData _parsed_data; diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.cpp b/be/src/vec/exprs/table_function/vexplode_numbers.cpp index 1c01653ed2..d1a5eb07dc 100644 --- a/be/src/vec/exprs/table_function/vexplode_numbers.cpp +++ b/be/src/vec/exprs/table_function/vexplode_numbers.cpp @@ -18,6 +18,9 @@ #include "vec/exprs/table_function/vexplode_numbers.h" #include "common/status.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/columns_number.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized { @@ -26,7 +29,7 @@ VExplodeNumbersTableFunction::VExplodeNumbersTableFunction() { _fn_name = "vexplode_numbers"; } -Status VExplodeNumbersTableFunction::process_init(vectorized::Block* block) { +Status VExplodeNumbersTableFunction::process_init(Block* block) { CHECK(_vexpr_context->root()->children().size() == 1) << "VExplodeSplitTableFunction must be have 1 children but have " << _vexpr_context->root()->children().size(); @@ -35,24 +38,38 @@ Status VExplodeNumbersTableFunction::process_init(vectorized::Block* block) { RETURN_IF_ERROR(_vexpr_context->root()->children()[0]->execute(_vexpr_context, block, &value_column_idx)); _value_column = block->get_by_position(value_column_idx).column; + if (is_column_const(*_value_column)) { + _cur_size = 0; + auto& column_nested = assert_cast(*_value_column).get_data_column_ptr(); + if (column_nested->is_nullable()) { + if (!column_nested->is_null_at(0)) { + _cur_size = static_cast(column_nested.get()) + ->get_nested_column() + .get_int(0); + } + } else { + _cur_size = column_nested->get_int(0); + } + if (_cur_size && _cur_size <= block->rows()) { // avoid elements_column too big or empty + _is_const = true; // use const optimize + for (int i = 0; i < _cur_size; i++) { + ((ColumnInt32*)_elements_column.get())->insert_value(i); + } + } + } return Status::OK(); } Status VExplodeNumbersTableFunction::process_row(size_t row_idx) { - _is_current_empty = false; - _eos = false; + RETURN_IF_ERROR(TableFunction::process_row(row_idx)); + if (_is_const) { + return Status::OK(); + } StringRef value = _value_column->get_data_at(row_idx); - - if (value.data == nullptr) { - _is_current_empty = true; - _cur_size = 0; - _cur_offset = 0; - } else { - _cur_size = *reinterpret_cast(value.data); - _cur_offset = 0; - _is_current_empty = (_cur_size <= 0); + if (value.data != nullptr) { + _cur_size = std::max(0, *reinterpret_cast(value.data)); } return Status::OK(); } @@ -62,28 +79,21 @@ Status VExplodeNumbersTableFunction::process_close() { return Status::OK(); } -Status VExplodeNumbersTableFunction::reset() { - _eos = false; - _cur_offset = 0; - return Status::OK(); -} - -Status VExplodeNumbersTableFunction::get_value(void** output) { - if (_is_current_empty) { - *output = nullptr; +void VExplodeNumbersTableFunction::get_value(MutableColumnPtr& column) { + if (current_empty()) { + column->insert_default(); } else { - *output = &_cur_offset; + if (_is_nullable) { + static_cast( + static_cast(column.get())->get_nested_column_ptr().get()) + ->insert_value(_cur_offset); + static_cast( + static_cast(column.get())->get_null_map_column_ptr().get()) + ->insert_default(); + } else { + static_cast(column.get())->insert_value(_cur_offset); + } } - return Status::OK(); -} - -Status VExplodeNumbersTableFunction::get_value_length(int64_t* length) { - if (_is_current_empty) { - *length = -1; - } else { - *length = sizeof(int); - } - return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.h b/be/src/vec/exprs/table_function/vexplode_numbers.h index e125e5d5b8..3e93471b47 100644 --- a/be/src/vec/exprs/table_function/vexplode_numbers.h +++ b/be/src/vec/exprs/table_function/vexplode_numbers.h @@ -25,17 +25,37 @@ namespace doris::vectorized { class VExplodeNumbersTableFunction : public TableFunction { public: VExplodeNumbersTableFunction(); - virtual ~VExplodeNumbersTableFunction() = default; + ~VExplodeNumbersTableFunction() override = default; - virtual Status process_init(vectorized::Block* block) override; - virtual Status process_row(size_t row_idx) override; - virtual Status process_close() override; - virtual Status reset() override; - virtual Status get_value(void** output) override; - virtual Status get_value_length(int64_t* length) override; + Status process_init(Block* block) override; + Status process_row(size_t row_idx) override; + Status process_close() override; + void get_value(MutableColumnPtr& column) override; + int get_value(MutableColumnPtr& column, int max_step) override { + if (_is_const) { + max_step = std::min(max_step, (int)(_cur_size - _cur_offset)); + if (_is_nullable) { + static_cast( + static_cast(column.get())->get_nested_column_ptr().get()) + ->insert_many_from(*_elements_column, _cur_offset, max_step); + static_cast( + static_cast(column.get())->get_null_map_column_ptr().get()) + ->insert_many_defaults(max_step); + } else { + static_cast(column.get()) + ->insert_many_from(*_elements_column, _cur_offset, max_step); + } + + forward(max_step); + return max_step; + } + + return TableFunction::get_value(column, max_step); + } private: ColumnPtr _value_column; + ColumnPtr _elements_column = ColumnInt32::create(); }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode_split.cpp b/be/src/vec/exprs/table_function/vexplode_split.cpp index 1bceffeeba..d2438d8c9a 100644 --- a/be/src/vec/exprs/table_function/vexplode_split.cpp +++ b/be/src/vec/exprs/table_function/vexplode_split.cpp @@ -32,15 +32,7 @@ Status VExplodeSplitTableFunction::open() { return Status::OK(); } -Status VExplodeSplitTableFunction::reset() { - _eos = false; - if (!_is_current_empty) { - _cur_offset = 0; - } - return Status::OK(); -} - -Status VExplodeSplitTableFunction::process_init(vectorized::Block* block) { +Status VExplodeSplitTableFunction::process_init(Block* block) { CHECK(_vexpr_context->root()->children().size() == 2) << "VExplodeSplitTableFunction must be have 2 children but have " << _vexpr_context->root()->children().size(); @@ -77,14 +69,9 @@ Status VExplodeSplitTableFunction::process_init(vectorized::Block* block) { } Status VExplodeSplitTableFunction::process_row(size_t row_idx) { - _is_current_empty = false; - _eos = false; + RETURN_IF_ERROR(TableFunction::process_row(row_idx)); - if ((_test_null_map and _test_null_map[row_idx]) || _delimiter.data == nullptr) { - _is_current_empty = true; - _cur_size = 0; - _cur_offset = 0; - } else { + if (!(_test_null_map && _test_null_map[row_idx]) && _delimiter.data != nullptr) { // TODO: use the function to be better string_view/StringRef split auto split = [](std::string_view strv, std::string_view delims = " ") { std::vector output; @@ -113,8 +100,6 @@ Status VExplodeSplitTableFunction::process_row(size_t row_idx) { _backup = split(_real_text_column->get_data_at(row_idx), _delimiter); _cur_size = _backup.size(); - _cur_offset = 0; - _is_current_empty = (_cur_size == 0); } return Status::OK(); } @@ -127,22 +112,13 @@ Status VExplodeSplitTableFunction::process_close() { return Status::OK(); } -Status VExplodeSplitTableFunction::get_value(void** output) { - if (_is_current_empty) { - *output = nullptr; +void VExplodeSplitTableFunction::get_value(MutableColumnPtr& column) { + if (current_empty()) { + column->insert_default(); } else { - *output = const_cast(_backup[_cur_offset].data()); + column->insert_data(const_cast(_backup[_cur_offset].data()), + _backup[_cur_offset].length()); } - return Status::OK(); -} - -Status VExplodeSplitTableFunction::get_value_length(int64_t* length) { - if (_is_current_empty) { - *length = -1; - } else { - *length = _backup[_cur_offset].length(); - } - return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode_split.h b/be/src/vec/exprs/table_function/vexplode_split.h index 53935b6a0b..5881ef9b6e 100644 --- a/be/src/vec/exprs/table_function/vexplode_split.h +++ b/be/src/vec/exprs/table_function/vexplode_split.h @@ -30,12 +30,10 @@ public: ~VExplodeSplitTableFunction() override = default; Status open() override; - Status process_init(vectorized::Block* block) override; + Status process_init(Block* block) override; Status process_row(size_t row_idx) override; Status process_close() override; - Status get_value(void** output) override; - Status get_value_length(int64_t* length) override; - Status reset() override; + void get_value(MutableColumnPtr& column) override; private: std::vector _backup; diff --git a/be/test/vec/function/function_test_util.cpp b/be/test/vec/function/function_test_util.cpp index e1551fe77f..da975a2c84 100644 --- a/be/test/vec/function/function_test_util.cpp +++ b/be/test/vec/function/function_test_util.cpp @@ -335,6 +335,9 @@ Block* process_table_function(TableFunction* fn, Block* input_block, // prepare output column vectorized::MutableColumnPtr column = descs[0].data_type->create_column(); + if (column->is_nullable()) { + fn->set_nullable(); + } // process table function for all rows for (size_t row = 0; row < input_block->rows(); ++row) { @@ -348,25 +351,10 @@ Block* process_table_function(TableFunction* fn, Block* input_block, continue; } - bool tmp_eos = false; do { - void* cell = nullptr; - int64_t cell_len = 0; - if (fn->get_value(&cell) != Status::OK() || - fn->get_value_length(&cell_len) != Status::OK()) { - LOG(WARNING) << "TableFunction get_value or get_value_length failed"; - return nullptr; - } - - // copy data from input block - if (cell == nullptr) { - column->insert_default(); - } else { - column->insert_data(reinterpret_cast(cell), cell_len); - } - - fn->forward(&tmp_eos); - } while (!tmp_eos); + fn->get_value(column); + fn->forward(); + } while (!fn->eos()); } std::unique_ptr output_block(new Block()); diff --git a/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy b/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy index 2b2b0619fd..a3a66c68fd 100644 --- a/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy +++ b/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy @@ -111,7 +111,7 @@ suite ("test_varchar_schema_change") { logger.info(res[2][1]) assertEquals(res[2][1].toLowerCase(),"varchar(30)") - qt_sc " select * from ${tableName} order by 1; " + qt_sc " select * from ${tableName} order by 1,2; " // test { //没捕获到异常 // sql """ insert into ${tableName} values(92,'2017-12-01',483647,'sdafdsaf') """ @@ -140,7 +140,7 @@ suite ("test_varchar_schema_change") { logger.info(res[2][1]) assertEquals(res[2][1].toLowerCase(),"varchar(30)") - qt_sc " select * from ${tableName} where c2 like '%1%' order by 1; " + qt_sc " select * from ${tableName} where c2 like '%1%' order by 1,2; " sql """ insert into ${tableName} values(22,'2011-12-01','12f2','fdsaf') """ sql """ insert into ${tableName} values(55,'2009-11-21','12d1d113','123aa') """ @@ -196,9 +196,9 @@ suite ("test_varchar_schema_change") { } while (running) } - qt_sc " select * from ${tableName} order by 1; " - qt_sc " select min(c2),max(c2) from ${tableName} order by 1; " - qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 1; " + qt_sc " select * from ${tableName} order by 1,2; " + qt_sc " select min(c2),max(c2) from ${tableName} order by 1,2; " + qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 1,2; " sleep(5000) sql """ alter table ${tableName} @@ -222,7 +222,7 @@ suite ("test_varchar_schema_change") { logger.info(res[2][1]) assertEquals(res[2][1].toLowerCase(),"varchar(40)") - qt_sc " select * from ${tableName} order by 1; " + qt_sc " select * from ${tableName} order by 1,2; " // test{ // sql """ alter table t0 modify column c1 varchar(20) NOT NULL """