diff --git a/be/src/exprs/table_function/dummy_table_functions.cpp b/be/src/exprs/table_function/dummy_table_functions.cpp index 40923bf1cf..8bbf7a35cb 100644 --- a/be/src/exprs/table_function/dummy_table_functions.cpp +++ b/be/src/exprs/table_function/dummy_table_functions.cpp @@ -50,4 +50,15 @@ IntVal DummyTableFunctions::explode_numbers(doris_udf::FunctionContext* context, const doris_udf::IntVal& str) { return IntVal(); } + +AnyVal DummyTableFunctions::explode(doris_udf::FunctionContext* context, + const doris_udf::CollectionVal& value) { + return AnyVal(); +} + +AnyVal DummyTableFunctions::explode_outer(doris_udf::FunctionContext* context, + const doris_udf::CollectionVal& value) { + return AnyVal(); +} + } // namespace doris diff --git a/be/src/exprs/table_function/dummy_table_functions.h b/be/src/exprs/table_function/dummy_table_functions.h index 9fde65cbe5..3f6f879548 100644 --- a/be/src/exprs/table_function/dummy_table_functions.h +++ b/be/src/exprs/table_function/dummy_table_functions.h @@ -45,5 +45,9 @@ public: const doris_udf::StringVal& str); static doris_udf::IntVal explode_numbers(doris_udf::FunctionContext* context, const doris_udf::IntVal& value); + static doris_udf::AnyVal explode(doris_udf::FunctionContext* context, + const doris_udf::CollectionVal& value); + static doris_udf::AnyVal explode_outer(doris_udf::FunctionContext* context, + const doris_udf::CollectionVal& value); }; } // namespace doris diff --git a/be/src/exprs/table_function/table_function.h b/be/src/exprs/table_function/table_function.h index c431d361a7..3c2e188950 100644 --- a/be/src/exprs/table_function/table_function.h +++ b/be/src/exprs/table_function/table_function.h @@ -98,6 +98,9 @@ public: _vexpr_context = vexpr_context; } + bool is_outer() const { return _is_outer; } + bool current_empty() const { return _is_current_empty; } + protected: std::string _fn_name; ExprContext* _expr_context = nullptr; @@ -111,6 +114,8 @@ protected: int64_t _cur_offset = 0; // the size of 
current result int64_t _cur_size = 0; + // set _is_outer to false for explode function, and should not return tuple while array is null or empty + bool _is_outer = true; }; } // namespace doris diff --git a/be/src/exprs/table_function/table_function_factory.cpp b/be/src/exprs/table_function/table_function_factory.cpp index 7dcce8c9c5..e44e3e65ed 100644 --- a/be/src/exprs/table_function/table_function_factory.cpp +++ b/be/src/exprs/table_function/table_function_factory.cpp @@ -22,6 +22,7 @@ #include "exprs/table_function/explode_json_array.h" #include "exprs/table_function/explode_split.h" #include "exprs/table_function/table_function.h" +#include "vec/exprs/table_function/vexplode.h" #include "vec/exprs/table_function/vexplode_bitmap.h" #include "vec/exprs/table_function/vexplode_json_array.h" #include "vec/exprs/table_function/vexplode_numbers.h" @@ -48,6 +49,12 @@ struct TableFunctionCreator { } }; +template <> +struct TableFunctionCreator { + bool is_outer; + TableFunction* operator()() { return new vectorized::VExplodeTableFunction(is_outer); } +}; + inline auto ExplodeJsonArrayIntCreator = TableFunctionCreator {ExplodeJsonArrayType::INT}; inline auto ExplodeJsonArrayDoubleCreator = @@ -65,6 +72,9 @@ inline auto VExplodeJsonArrayStringCreator = TableFunctionCreator { ExplodeJsonArrayType::STRING}; +inline auto VExplodeCreator = TableFunctionCreator {false}; +inline auto VExplodeOuterCreator = TableFunctionCreator {true}; + //{fn_name,is_vectorized}->table_function_creator const std::unordered_map, std::function> TableFunctionFactory::_function_map { @@ -81,7 +91,9 @@ const std::unordered_map, std::function()}}; + TableFunctionCreator()}, + {{"explode", true}, VExplodeCreator}, + {{"explode_outer", true}, VExplodeOuterCreator}}; // namespace doris Status TableFunctionFactory::get_fn(const std::string& fn_name, bool is_vectorized, ObjectPool* pool, TableFunction** fn) { diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 
886de1326d..3e789b7365 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -111,6 +111,7 @@ set(VEC_FILES exprs/vcast_expr.cpp exprs/vcase_expr.cpp exprs/vinfo_func.cpp + exprs/table_function/vexplode.cpp exprs/table_function/vexplode_split.cpp exprs/table_function/vexplode_numbers.cpp exprs/table_function/vexplode_bitmap.cpp diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 30dd0097f6..6b25849f77 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -92,6 +92,16 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos return Status::OK(); } +bool VTableFunctionNode::_is_inner_and_empty() { + for (int i = 0; i < _fn_num; i++) { + // if any table function is not outer and has empty result, go to next child row + if (!_fns[i]->is_outer() && _fns[i]->current_empty()) { + return true; + } + } + return false; +} + Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) { DCHECK(_child_block != nullptr); @@ -128,9 +138,10 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output RETURN_IF_ERROR(_process_next_child_row()); } + bool skip_child_row = false; while (true) { int idx = _find_last_fn_eos_idx(); - if (idx == 0) { + if (idx == 0 || skip_child_row) { // all table functions' results are exhausted, process next child row. RETURN_IF_ERROR(_process_next_child_row()); if (_cur_child_offset == -1) { @@ -144,6 +155,11 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output } } + // if any table function is not outer and has empty result, go to next child row + if ((skip_child_row = _is_inner_and_empty()) == true) { + continue; + } + // get slots from every table function. // notice that _fn_values[i] may be null if the table function has empty result set. 
for (int i = 0; i < _fn_num; i++) { diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 1913cd1d35..6192e52ff6 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -37,6 +37,8 @@ private: Status get_expanded_block(RuntimeState* state, Block* output_block, bool* eos); + bool _is_inner_and_empty(); + std::unique_ptr _child_block; std::vector _child_slots; std::vector _output_slots; diff --git a/be/src/vec/exprs/table_function/vexplode.cpp b/be/src/vec/exprs/table_function/vexplode.cpp new file mode 100644 index 0000000000..58f8e7f2e0 --- /dev/null +++ b/be/src/vec/exprs/table_function/vexplode.cpp @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exprs/table_function/vexplode.h" + +#include "vec/exprs/vexpr.h" + +namespace doris::vectorized { + +VExplodeTableFunction::VExplodeTableFunction(bool is_outer) { + _is_outer = is_outer; + if (_is_outer) { + _fn_name = "vexplode_outer"; + } else { + _fn_name = "vexplode"; + } +} + +Status VExplodeTableFunction::process_init(vectorized::Block* block) { + CHECK(_vexpr_context->root()->children().size() == 1) + << "VExplodeTableFunction must be have 1 children but have " + << _vexpr_context->root()->children().size(); + + int value_column_idx = -1; + _vexpr_context->root()->children()[0]->execute(_vexpr_context, block, &value_column_idx); + + if (block->get_by_position(value_column_idx).column->is_nullable()) { + auto array_nullable_column = check_and_get_column( + *block->get_by_position(value_column_idx).column); + _array_null_map = array_nullable_column->get_null_map_column().get_data().data(); + _array_column = + check_and_get_column(array_nullable_column->get_nested_column_ptr()); + } else { + _array_null_map = nullptr; + _array_column = + check_and_get_column(*block->get_by_position(value_column_idx).column); + } + if (!_array_column) { + return Status::NotSupported("column type " + + block->get_by_position(value_column_idx).column->get_name() + + " not supported now"); + } + + return Status::OK(); +} + +Status VExplodeTableFunction::process_row(size_t row_idx) { + DCHECK(row_idx < _array_column->size()); + _is_current_empty = false; + _eos = false; + + if (_array_null_map && _array_null_map[row_idx]) { + _is_current_empty = true; + _cur_size = 0; + _cur_offset = 0; + _pos = 0; + } else { + _cur_size = + _array_column->get_offsets()[row_idx] - _array_column->get_offsets()[row_idx - 1]; + _cur_offset = 0; + _is_current_empty = (_cur_size == 0); + _pos = _array_column->get_offsets()[row_idx - 1]; + } + return Status::OK(); +} + +Status VExplodeTableFunction::process_close() { + _array_column = nullptr; + _array_null_map = nullptr; + _pos = 0; + 
return Status::OK(); +} + +Status VExplodeTableFunction::reset() { + _eos = false; + _cur_offset = 0; + return Status::OK(); +} + +Status VExplodeTableFunction::get_value(void** output) { + if (_is_current_empty) { + *output = nullptr; + return Status::OK(); + } + + *output = const_cast(_array_column->get_data().get_data_at(_pos + _cur_offset).data); + return Status::OK(); +} + +Status VExplodeTableFunction::get_value_length(int64_t* length) { + if (_is_current_empty) { + *length = -1; + return Status::OK(); + } + + *length = _array_column->get_data().get_data_at(_pos + _cur_offset).size; + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode.h b/be/src/vec/exprs/table_function/vexplode.h new file mode 100644 index 0000000000..b7ee993c0e --- /dev/null +++ b/be/src/vec/exprs/table_function/vexplode.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "exprs/table_function/table_function.h" +#include "vec/common/string_ref.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_nullable.h" + +namespace doris::vectorized { + +class VExplodeTableFunction : public TableFunction { +public: + VExplodeTableFunction(bool is_outer); + + virtual ~VExplodeTableFunction() = default; + + virtual Status process_init(vectorized::Block* block) override; + virtual Status process_row(size_t row_idx) override; + virtual Status process_close() override; + virtual Status reset() override; + virtual Status get_value(void** output) override; + virtual Status get_value_length(int64_t* length) override; + +private: + const UInt8* _array_null_map; + const ColumnArray* _array_column; + size_t _pos; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.cpp b/be/src/vec/exprs/table_function/vexplode_numbers.cpp index 540c7ac5df..c5b6629c06 100644 --- a/be/src/vec/exprs/table_function/vexplode_numbers.cpp +++ b/be/src/vec/exprs/table_function/vexplode_numbers.cpp @@ -50,7 +50,7 @@ Status VExplodeNumbersTableFunction::process_row(size_t row_idx) { } else { _cur_size = *reinterpret_cast(value.data); _cur_offset = 0; - _is_current_empty = (_cur_size == 0); + _is_current_empty = (_cur_size <= 0); } return Status::OK(); } diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 1a99aee464..26681baf91 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -35,6 +35,8 @@ class VExpr { public: VExpr(const TExprNode& node); VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable); + // only used for test + VExpr() {} virtual ~VExpr() = default; virtual VExpr* clone(ObjectPool* pool) const = 0; diff --git a/be/src/vec/functions/function_fake.cpp b/be/src/vec/functions/function_fake.cpp index 9deacb2afa..b661960517 100644 --- a/be/src/vec/functions/function_fake.cpp +++ 
b/be/src/vec/functions/function_fake.cpp @@ -27,6 +27,8 @@ void register_function_fake(SimpleFunctionFactory& factory) { factory.register_function>(); factory.register_function>(); factory.register_function>(); + factory.register_function>(); + factory.register_function>(); } } // namespace doris::vectorized diff --git a/be/src/vec/functions/function_fake.h b/be/src/vec/functions/function_fake.h index c7e536d6ea..56648d8877 100644 --- a/be/src/vec/functions/function_fake.h +++ b/be/src/vec/functions/function_fake.h @@ -19,8 +19,10 @@ #include "common/status.h" #include "vec/core/types.h" +#include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" +#include "vec/functions/function_helpers.h" #include "vec/functions/simple_function_factory.h" #include "vec/utils/util.hpp" @@ -75,6 +77,24 @@ struct FunctionExplodeBitmapImpl { } }; +struct FunctionExplodeImpl { + static constexpr auto name = "explode"; + static DataTypePtr get_return_type_impl(const DataTypes& arguments) { + DCHECK(is_array(arguments[0])) << arguments[0]->get_name() << " not supported"; + return make_nullable( + check_and_get_data_type(arguments[0].get())->get_nested_type()); + } +}; + +struct FunctionExplodeOuterImpl { + static constexpr auto name = "explode_outer"; + static DataTypePtr get_return_type_impl(const DataTypes& arguments) { + DCHECK(is_array(arguments[0])) << arguments[0]->get_name() << " not supported"; + return make_nullable( + check_and_get_data_type(arguments[0].get())->get_nested_type()); + } +}; + //FunctionFake is use for some function call expr only work at prepare/open phase, do not support execute(). 
template class FunctionFake : public IFunction { diff --git a/be/test/exprs/mock_vexpr.h b/be/test/exprs/mock_vexpr.h new file mode 100644 index 0000000000..4da44ef3b6 --- /dev/null +++ b/be/test/exprs/mock_vexpr.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include + +#include "vec/exprs/vexpr.h" + +namespace doris { +namespace vectorized { + +class MockVExpr : public VExpr { +public: + MOCK_CONST_METHOD1(clone, VExpr*(ObjectPool* pool)); + MOCK_CONST_METHOD0(expr_name, const std::string&()); + MOCK_METHOD3(execute, Status(VExprContext* context, vectorized::Block* block, int* result_column_id)); +}; // class MockVExpr + +} // namespace vectorized +} // namespace doris diff --git a/be/test/vec/function/CMakeLists.txt b/be/test/vec/function/CMakeLists.txt index 116030e984..4a68ad1231 100644 --- a/be/test/vec/function/CMakeLists.txt +++ b/be/test/vec/function/CMakeLists.txt @@ -32,3 +32,4 @@ ADD_BE_TEST(function_like_test) ADD_BE_TEST(function_arithmetic_test) ADD_BE_TEST(function_json_test) ADD_BE_TEST(function_geo_test) +ADD_BE_TEST(table_function_test) diff --git a/be/test/vec/function/function_test_util.h b/be/test/vec/function/function_test_util.h index c345354fbf..16b0aac4be 100644 --- a/be/test/vec/function/function_test_util.h +++ b/be/test/vec/function/function_test_util.h @@ -23,6 +23,7 @@ #include #include "exec/schema_scanner.h" +#include "exprs/table_function/table_function.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "testutil/function_utils.h" @@ -37,6 +38,7 @@ namespace doris::vectorized { +using InputDataSet = std::vector>; // without result using DataSet = std::vector, std::any>>; using InputTypeSet = std::vector; @@ -80,10 +82,9 @@ using UTDataTypeDescs = std::vector; } // namespace ut_type -size_t type_index_to_data_type(const std::vector& input_types, size_t index, - doris_udf::FunctionContext::TypeDesc& desc, - DataTypePtr& type) { - if(index < 0 || index >= input_types.size()) { +size_t type_index_to_data_type(const InputTypeSet& input_types, size_t index, + doris_udf::FunctionContext::TypeDesc& desc, DataTypePtr& type) { + if (index < 0 || index >= input_types.size()) { return -1; } @@ -156,10 +157,11 @@ size_t 
type_index_to_data_type(const std::vector& input_types, size_t return 0; } } -bool parse_ut_data_type(const std::vector& input_types, ut_type::UTDataTypeDescs& descs) { + +bool parse_ut_data_type(const InputTypeSet& input_types, ut_type::UTDataTypeDescs& descs) { descs.clear(); descs.reserve(input_types.size()); - for (size_t i = 0; i < input_types.size(); ) { + for (size_t i = 0; i < input_types.size();) { ut_type::UTDataTypeDesc desc; if (input_types[i].type() == typeid(Consted)) { desc.is_const = true; @@ -178,11 +180,193 @@ bool parse_ut_data_type(const std::vector& input_types, ut_type::UTDat return true; } +bool insert_cell(MutableColumnPtr& column, DataTypePtr type_ptr, const std::any& cell) { + if (cell.type() == typeid(Null)) { + column->insert_data(nullptr, 0); + return true; + } + + WhichDataType type(type_ptr); + if (type.is_string()) { + auto str = std::any_cast(cell); + column->insert_data(str.c_str(), str.size()); + } else if (type.idx == TypeIndex::BitMap) { + BitmapValue* bitmap = std::any_cast(cell); + column->insert_data((char*)bitmap, sizeof(BitmapValue)); + } else if (type.is_int8()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_int16()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_int32()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_int64()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_int128()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_float64()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_float64()) { + auto value = std::any_cast(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_decimal128()) { + auto value = 
std::any_cast>(cell); + column->insert_data(reinterpret_cast(&value), 0); + } else if (type.is_date_time()) { + static std::string date_time_format("%Y-%m-%d %H:%i:%s"); + auto datetime_str = std::any_cast(cell); + VecDateTimeValue v; + v.from_date_format_str(date_time_format.c_str(), date_time_format.size(), + datetime_str.c_str(), datetime_str.size()); + v.to_datetime(); + column->insert_data(reinterpret_cast(&v), 0); + } else if (type.is_date()) { + static std::string date_time_format("%Y-%m-%d"); + auto datetime_str = std::any_cast(cell); + VecDateTimeValue v; + v.from_date_format_str(date_time_format.c_str(), date_time_format.size(), + datetime_str.c_str(), datetime_str.size()); + v.cast_to_date(); + column->insert_data(reinterpret_cast(&v), 0); + } else if (type.is_array()) { + auto v = std::any_cast(cell); + column->insert(v); + } else { + LOG(WARNING) << "dataset not supported for TypeIndex:" << (int)type.idx; + return false; + } + return true; +} + +Block* create_block_from_inputset(const InputTypeSet& input_types, const InputDataSet& input_set) { + // 1.0 create data type + ut_type::UTDataTypeDescs descs; + if (!parse_ut_data_type(input_types, descs)) { + return nullptr; + } + + // 1.1 insert data and create block + auto row_size = input_set.size(); + std::unique_ptr block(new Block()); + for (size_t i = 0; i < descs.size(); ++i) { + auto& desc = descs[i]; + auto column = desc.data_type->create_column(); + column->reserve(row_size); + + auto type_ptr = desc.data_type->is_nullable() + ? 
((DataTypeNullable*)(desc.data_type.get()))->get_nested_type() + : desc.data_type; + WhichDataType type(type_ptr); + + for (int j = 0; j < row_size; j++) { + if (!insert_cell(column, type_ptr, input_set[j][i])) { + return nullptr; + } + } + + if (desc.is_const) { + column = ColumnConst::create(std::move(column), row_size); + } + block->insert({std::move(column), desc.data_type, desc.col_name}); + } + return block.release(); +} + +Block* process_table_function(TableFunction* fn, Block* input_block, + const InputTypeSet& output_types) { + // pasrse output data types + ut_type::UTDataTypeDescs descs; + if (!parse_ut_data_type(output_types, descs)) { + return nullptr; + } + if (descs.size() != 1) { + LOG(WARNING) << "Now table function test only support return one column"; + return nullptr; + } + + // process table function init + if (fn->process_init(input_block) != Status::OK()) { + LOG(WARNING) << "TableFunction process_init failed"; + return nullptr; + } + + // prepare output column + vectorized::MutableColumnPtr column = descs[0].data_type->create_column(); + + // process table function for all rows + for (size_t row = 0; row < input_block->rows(); ++row) { + if (fn->process_row(row) != Status::OK()) { + LOG(WARNING) << "TableFunction process_row failed"; + return nullptr; + } + + // consider outer + if (!fn->is_outer() && fn->current_empty()) { + continue; + } + + bool tmp_eos = false; + do { + void* cell = nullptr; + int64_t cell_len = 0; + if (fn->get_value(&cell) != Status::OK() || + fn->get_value_length(&cell_len) != Status::OK()) { + LOG(WARNING) << "TableFunction get_value or get_value_length failed"; + return nullptr; + } + + // copy data from input block + if (cell == nullptr) { + column->insert_default(); + } else { + column->insert_data(reinterpret_cast(cell), cell_len); + } + + fn->forward(&tmp_eos); + } while (!tmp_eos); + } + + std::unique_ptr output_block(new Block()); + output_block->insert({std::move(column), descs[0].data_type, 
descs[0].col_name}); + return output_block.release(); +} + +void check_vec_table_function(TableFunction* fn, const InputTypeSet& input_types, + const InputDataSet& input_set, const InputTypeSet& output_types, + const InputDataSet& output_set) { + std::unique_ptr input_block(create_block_from_inputset(input_types, input_set)); + ASSERT_TRUE(input_block != nullptr); + + std::unique_ptr expect_output_block( + create_block_from_inputset(output_types, output_set)); + ASSERT_TRUE(expect_output_block != nullptr); + + std::unique_ptr real_output_block( + process_table_function(fn, input_block.get(), output_types)); + ASSERT_TRUE(real_output_block != nullptr); + + // compare real_output_block with expect_output_block + ASSERT_EQ(expect_output_block->columns(), real_output_block->columns()); + ASSERT_EQ(expect_output_block->rows(), real_output_block->rows()); + for (size_t col = 0; col < expect_output_block->columns(); ++col) { + auto left_col = expect_output_block->get_by_position(col).column; + auto right_col = real_output_block->get_by_position(col).column; + for (size_t row = 0; row < expect_output_block->rows(); ++row) { + ASSERT_EQ(left_col->compare_at(row, row, *right_col, 0), 0); + } + } +} + // Null values are represented by Null() // The type of the constant column is represented as follows: Consted {TypeIndex::String} // A DataSet with a constant column can only have one row of data template -void check_function(const std::string& func_name, const std::vector& input_types, +void check_function(const std::string& func_name, const InputTypeSet& input_types, const DataSet& data_set) { // 1.0 create data type ut_type::UTDataTypeDescs descs; @@ -196,69 +380,12 @@ void check_function(const std::string& func_name, const std::vector& i auto column = desc.data_type->create_column(); column->reserve(row_size); - auto type_ptr = desc.data_type->is_nullable() ? 
- ((DataTypeNullable*)(desc.data_type.get()))->get_nested_type() : desc.data_type; - WhichDataType type(type_ptr); + auto type_ptr = desc.data_type->is_nullable() + ? ((DataTypeNullable*)(desc.data_type.get()))->get_nested_type() + : desc.data_type; for (int j = 0; j < row_size; j++) { - if (data_set[j].first[i].type() == typeid(Null)) { - column->insert_data(nullptr, 0); - continue; - } - - if (type.is_string()) { - auto str = std::any_cast(data_set[j].first[i]); - column->insert_data(str.c_str(), str.size()); - } else if (type.idx == TypeIndex::BitMap) { - BitmapValue* bitmap = std::any_cast(data_set[j].first[i]); - column->insert_data((char*)bitmap, sizeof(BitmapValue)); - } else if (type.is_int8()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_int16()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_int32()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_int64()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_int128()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_float64()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_float64()) { - auto value = std::any_cast(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_decimal128()) { - auto value = std::any_cast>(data_set[j].first[i]); - column->insert_data(reinterpret_cast(&value), 0); - } else if (type.is_date_time()) { - static std::string date_time_format("%Y-%m-%d %H:%i:%s"); - auto datetime_str = std::any_cast(data_set[j].first[i]); - VecDateTimeValue v; - 
v.from_date_format_str(date_time_format.c_str(), date_time_format.size(), - datetime_str.c_str(), datetime_str.size()); - v.to_datetime(); - column->insert_data(reinterpret_cast(&v), 0); - } else if (type.is_date()) { - static std::string date_time_format("%Y-%m-%d"); - auto datetime_str = std::any_cast(data_set[j].first[i]); - VecDateTimeValue v; - v.from_date_format_str(date_time_format.c_str(), date_time_format.size(), - datetime_str.c_str(), datetime_str.size()); - v.cast_to_date(); - column->insert_data(reinterpret_cast(&v), 0); - } else if (type.is_array()) { - auto v = std::any_cast(data_set[j].first[i]); - column->insert(v); - } else { - LOG(WARNING) << "dataset not supported for TypeIndex:" << (int)type.idx; - ASSERT_TRUE(false); - } + ASSERT_TRUE(insert_cell(column, type_ptr, data_set[j].first[i])); } if (desc.is_const) { @@ -277,7 +404,8 @@ void check_function(const std::string& func_name, const std::vector& i arguments.push_back(i); arg_types.push_back(desc.type_desc); if (desc.is_const) { - constant_col_ptrs.push_back(std::make_shared(block.get_by_position(i).column)); + constant_col_ptrs.push_back( + std::make_shared(block.get_by_position(i).column)); constant_cols.push_back(constant_col_ptrs.back().get()); } else { constant_cols.push_back(nullptr); @@ -287,7 +415,8 @@ void check_function(const std::string& func_name, const std::vector& i // 2. execute function auto return_type = nullable ? 
make_nullable(std::make_shared()) : std::make_shared(); - auto func = SimpleFunctionFactory::instance().get_function(func_name, block.get_columns_with_type_and_name(), return_type); + auto func = SimpleFunctionFactory::instance().get_function( + func_name, block.get_columns_with_type_and_name(), return_type); ASSERT_TRUE(func != nullptr); doris_udf::FunctionContext::TypeDesc fn_ctx_return; diff --git a/be/test/vec/function/table_function_test.cpp b/be/test/vec/function/table_function_test.cpp new file mode 100644 index 0000000000..8404892480 --- /dev/null +++ b/be/test/vec/function/table_function_test.cpp @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/mock_vexpr.h" +#include "vec/exprs/table_function/vexplode.h" +#include "vec/exprs/table_function/vexplode_numbers.h" +#include "vec/exprs/table_function/vexplode_split.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/function/function_test_util.h" + +namespace doris::vectorized { + +using ::testing::_; +using ::testing::DoAll; +using ::testing::Return; +using ::testing::SetArgPointee; + +class TableFunctionTest : public testing::Test { // fixture shared by the table function tests below +protected: + void SetUp() override {} + void TearDown() override {} + + void clear() { // drops the context, the root expression and all mocked children + _ctx = nullptr; + _root = nullptr; + _children.clear(); + _column_ids.clear(); + } + + void init_expr_context(int child_num) { // rebuilds the fixture: a root mock with child_num mocked children, wrapped in a fresh context + clear(); + + _root = std::make_unique<MockVExpr>(); + for (int i = 0; i < child_num; ++i) { + _column_ids.push_back(i); // child i reports the value i through the third argument of execute() + _children.push_back(std::make_unique<MockVExpr>()); + EXPECT_CALL(*_children[i], execute(_, _, _)) + .WillRepeatedly(DoAll(SetArgPointee<2>(_column_ids[i]), Return(Status::OK()))); + _root->add_child(_children[i].get()); + } + _ctx = std::make_unique<VExprContext>(_root.get()); + } + +protected: // not private: TEST_F bodies are classes derived from this fixture and read _ctx directly + std::unique_ptr<VExprContext> _ctx; + std::unique_ptr<MockVExpr> _root; + std::vector<std::unique_ptr<MockVExpr>> _children; + std::vector<int> _column_ids; +}; + +TEST_F(TableFunctionTest, vexplode_outer) { + init_expr_context(1); + VExplodeTableFunction explode_outer(true); + explode_outer.set_vexpr_context(_ctx.get()); + + // explode_outer(Array<Int32>): a NULL array and an empty array each produce one NULL row + { + InputTypeSet input_types = {TypeIndex::Array, TypeIndex::Int32}; + Array vec = {Int32(1), Int32(2), Int32(3)}; + InputDataSet input_set = {{vec}, {Null()}, {Array()}}; + + InputTypeSet output_types = {TypeIndex::Int32}; + InputDataSet output_set = {{Int32(1)}, {Int32(2)}, {Int32(3)}, {Null()}, {Null()}}; + + check_vec_table_function(&explode_outer, input_types, input_set, output_types, output_set); + } + + // explode_outer(Array<String>): a NULL array and an empty array each produce one NULL row + { + InputTypeSet input_types = {TypeIndex::Array, TypeIndex::String}; + Array vec = {std::string("abc"), std::string(""), std::string("def")}; + InputDataSet input_set = {{Null()}, {Array()}, 
{vec}}; + + InputTypeSet output_types = {TypeIndex::String}; + InputDataSet output_set = { + {Null()}, {Null()}, {std::string("abc")}, {std::string("")}, {std::string("def")}}; + + check_vec_table_function(&explode_outer, input_types, input_set, output_types, output_set); + } +} + +TEST_F(TableFunctionTest, vexplode) { + init_expr_context(1); + VExplodeTableFunction explode(false); + explode.set_vexpr_context(_ctx.get()); + + // explode(Array<Int32>): a NULL array and an empty array produce no rows + { + InputTypeSet input_types = {TypeIndex::Array, TypeIndex::Int32}; + + Array vec = {Int32(1), Int32(2), Int32(3)}; + InputDataSet input_set = {{vec}, {Null()}, {Array()}}; + + InputTypeSet output_types = {TypeIndex::Int32}; + InputDataSet output_set = {{Int32(1)}, {Int32(2)}, {Int32(3)}}; + + check_vec_table_function(&explode, input_types, input_set, output_types, output_set); + } + + // explode(Array<String>): a NULL array and an empty array produce no rows + { + InputTypeSet input_types = {TypeIndex::Array, TypeIndex::String}; + Array vec = {std::string("abc"), std::string(""), std::string("def")}; + InputDataSet input_set = {{Null()}, {Array()}, {vec}}; + + InputTypeSet output_types = {TypeIndex::String}; + InputDataSet output_set = {{std::string("abc")}, {std::string("")}, {std::string("def")}}; + + check_vec_table_function(&explode, input_types, input_set, output_types, output_set); + } +} + +TEST_F(TableFunctionTest, vexplode_numbers) { + init_expr_context(1); + VExplodeNumbersTableFunction tfn; + tfn.set_vexpr_context(_ctx.get()); + + { + InputTypeSet input_types = {TypeIndex::Int32}; + InputDataSet input_set = {{Int32(2)}, {Int32(3)}, {Null()}, {Int32(0)}, {Int32(-2)}}; + + InputTypeSet output_types = {TypeIndex::Int32}; + InputDataSet output_set = {{Int32(0)}, {Int32(1)}, {Int32(0)}, {Int32(1)}, + {Int32(2)}, {Null()}, {Null()}, {Null()}}; + + check_vec_table_function(&tfn, input_types, input_set, output_types, output_set); + } +} + +TEST_F(TableFunctionTest, vexplode_split) { + init_expr_context(2); + VExplodeSplitTableFunction tfn; + 
tfn.set_vexpr_context(_ctx.get()); + + { + // Case 1: explode_split(null, null) --> [null] + // Case 2: explode_split("a,b,c", ",") --> ["a", "b", "c"] + // Case 3: explode_split("a,b,c", "a,") --> ["", "b,c"] + // Case 4: explode_split("", ",") --> [""] + InputTypeSet input_types = {TypeIndex::String, TypeIndex::String}; + InputDataSet input_set = {{Null(), Null()}, + {std::string("a,b,c"), std::string(",")}, + {std::string("a,b,c"), std::string("a,")}, + {std::string(""), std::string(",")}}; + + InputTypeSet output_types = {TypeIndex::String}; + InputDataSet output_set = {{Null()}, {std::string("a")}, {std::string("b")}, + {std::string("c")}, {std::string("")}, {std::string("b,c")}, + {std::string("")}}; + + check_vec_table_function(&tfn, input_types, input_set, output_types, output_set); + } +} + +} // namespace doris::vectorized + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java index d81d63357f..37c8465d20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java @@ -82,11 +82,6 @@ public class TypeDef implements ParseNode { analyzeScalarType((ScalarType) type); } - if (type.isArrayType()) { - Type itemType = ((ArrayType) type).getItemType(); - analyze(itemType); - } - if (type.isComplexType()) { if (!Config.enable_complex_type_support) { throw new AnalysisException("Unsupported data type: " + type.toSql()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index 0bbc83ffd9..b705151d79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -2370,6 +2370,8 @@ public class FunctionSet 
explodeSplits = Lists.newArrayList(); @@ -2419,5 +2421,31 @@ public class FunctionSet explodes = Lists.newArrayList(); + explodes.add(ScalarFunction.createBuiltin( + EXPLODE, Type.INT, Function.NullableMode.ALWAYS_NULLABLE, + Lists.newArrayList(new ArrayType(Type.INT)), false, + "_ZN5doris19DummyTableFunctions7explodeEPN9doris_udf15FunctionContextERKNS1_13CollectionValE", + null, null, true)); + explodes.add(ScalarFunction.createBuiltin( + EXPLODE, Type.VARCHAR, Function.NullableMode.ALWAYS_NULLABLE, + Lists.newArrayList(new ArrayType(Type.VARCHAR)), false, + "_ZN5doris19DummyTableFunctions7explodeEPN9doris_udf15FunctionContextERKNS1_13CollectionValE", + null, null, true)); + tableFunctions.put(EXPLODE, explodes); + + List explodeOuters = Lists.newArrayList(); + explodeOuters.add(ScalarFunction.createBuiltin( + EXPLODE_OUTER, Type.INT, Function.NullableMode.ALWAYS_NULLABLE, + Lists.newArrayList(new ArrayType(Type.INT)), false, + "_ZN5doris19DummyTableFunctions13explode_outerEPN9doris_udf15FunctionContextERKNS1_13CollectionValE", + null, null, true)); + explodeOuters.add(ScalarFunction.createBuiltin( + EXPLODE_OUTER, Type.VARCHAR, Function.NullableMode.ALWAYS_NULLABLE, + Lists.newArrayList(new ArrayType(Type.VARCHAR)), false, + "_ZN5doris19DummyTableFunctions13explode_outerEPN9doris_udf15FunctionContextERKNS1_13CollectionValE", + null, null, true)); + tableFunctions.put(EXPLODE_OUTER, explodeOuters); } }