[Improve](explode) explode function support multi param (#50310)

### What problem does this PR solve?
backport:https://github.com/apache/doris/pull/48537
Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [ ] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
This commit is contained in:
Benjaminwei
2025-04-23 23:27:07 +08:00
committed by GitHub
parent e5a9c2552d
commit cf72fa82e2
22 changed files with 1162 additions and 67 deletions

View File

@ -25,6 +25,7 @@
#include "runtime/runtime_state.h"
#include "util/binary_cast.hpp"
#include "util/bitmap_value.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_bitmap.h"
#include "vec/data_types/data_type_date.h"
@ -33,7 +34,9 @@
#include "vec/data_types/data_type_ipv4.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/data_types/data_type_time_v2.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/runtime/vdatetime_value.h"
@ -176,6 +179,29 @@ size_t type_index_to_data_type(const std::vector<AnyType>& input_types, size_t i
type = std::make_shared<DataTypeArray>(sub_type);
return ret + 1;
}
case TypeIndex::Struct: {
desc.type = doris::PrimitiveType::TYPE_STRUCT;
++index;
size_t ret = 0;
DataTypes sub_types;
while (index < input_types.size()) {
ut_type::UTDataTypeDesc sub_desc;
DataTypePtr sub_type = nullptr;
size_t inner_ret = type_index_to_data_type(input_types, index, sub_desc, sub_type);
if (inner_ret <= 0) {
return inner_ret;
}
ret += inner_ret;
desc.children.push_back(sub_desc.type_desc);
if (sub_desc.is_nullable) {
sub_type = make_nullable(sub_type);
sub_types.push_back(sub_type);
}
++index;
}
type = std::make_shared<DataTypeStruct>(sub_types);
return ret + 1;
}
case TypeIndex::Nullable: {
++index;
size_t ret = type_index_to_data_type(input_types, index, ut_desc, type);
@ -307,6 +333,28 @@ bool insert_cell(MutableColumnPtr& column, DataTypePtr type_ptr, const AnyType&
} else if (type.is_array()) {
auto v = any_cast<Array>(cell);
column->insert(v);
} else if (type.is_struct()) {
auto v = any_cast<CellSet>(cell);
auto struct_type = assert_cast<const DataTypeStruct*>(type_ptr.get());
auto nullable_column = assert_cast<ColumnNullable*>(column.get());
auto* struct_column =
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
auto* nullmap_column =
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
nullmap_column->insert_default();
for (size_t i = 0; i < v.size(); ++i) {
auto& field = v[i];
auto col = struct_column->get_column(i).get_ptr();
RETURN_IF_FALSE(insert_cell(col, struct_type->get_element(i), field));
}
} else if (type.is_nullable()) {
auto nullable_column = assert_cast<ColumnNullable*>(column.get());
auto col_type = remove_nullable(type_ptr);
auto col = nullable_column->get_nested_column_ptr();
auto* nullmap_column =
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
nullmap_column->insert_default();
RETURN_IF_FALSE(insert_cell(col, col_type, cell));
} else {
LOG(WARNING) << "dataset not supported for TypeIndex:" << (int)type.idx;
return false;
@ -324,32 +372,39 @@ Block* create_block_from_inputset(const InputTypeSet& input_types, const InputDa
// 1.1 insert data and create block
auto row_size = input_set.size();
std::unique_ptr<Block> block = Block::create_unique();
auto input_set_size = input_set[0].size();
// 1.2 calculate the input column size
auto input_col_size = input_set_size / descs.size();
for (size_t i = 0; i < descs.size(); ++i) {
auto& desc = descs[i];
auto column = desc.data_type->create_column();
column->reserve(row_size);
for (size_t j = 0; j < input_col_size; ++j) {
auto column = desc.data_type->create_column();
column->reserve(row_size);
auto type_ptr = desc.data_type->is_nullable()
? ((DataTypeNullable*)(desc.data_type.get()))->get_nested_type()
: desc.data_type;
WhichDataType type(type_ptr);
auto type_ptr = desc.data_type->is_nullable()
? ((DataTypeNullable*)(desc.data_type.get()))->get_nested_type()
: desc.data_type;
WhichDataType type(type_ptr);
for (int j = 0; j < row_size; j++) {
if (!insert_cell(column, type_ptr, input_set[j][i])) {
return nullptr;
for (int r = 0; r < row_size; r++) {
if (!insert_cell(column, type_ptr, input_set[r][i * input_col_size + j])) {
return nullptr;
}
}
}
if (desc.is_const) {
column = ColumnConst::create(std::move(column), row_size);
if (desc.is_const) {
column = ColumnConst::create(std::move(column), row_size);
}
block->insert({std::move(column), desc.data_type, desc.col_name});
}
block->insert({std::move(column), desc.data_type, desc.col_name});
}
return block.release();
}
Block* process_table_function(TableFunction* fn, Block* input_block,
const InputTypeSet& output_types) {
const InputTypeSet& output_types, bool test_get_value_func) {
// pasrse output data types
ut_type::UTDataTypeDescs descs;
if (!parse_ut_data_type(output_types, descs)) {
@ -383,8 +438,12 @@ Block* process_table_function(TableFunction* fn, Block* input_block,
}
do {
fn->get_same_many_values(column, 1);
fn->forward();
if (test_get_value_func) {
fn->get_value(column, 10);
} else {
fn->get_same_many_values(column, 1);
fn->forward();
}
} while (!fn->eos());
}
@ -395,7 +454,7 @@ Block* process_table_function(TableFunction* fn, Block* input_block,
void check_vec_table_function(TableFunction* fn, const InputTypeSet& input_types,
const InputDataSet& input_set, const InputTypeSet& output_types,
const InputDataSet& output_set) {
const InputDataSet& output_set, const bool test_get_value_func) {
std::unique_ptr<Block> input_block(create_block_from_inputset(input_types, input_set));
EXPECT_TRUE(input_block != nullptr);
@ -404,7 +463,7 @@ void check_vec_table_function(TableFunction* fn, const InputTypeSet& input_types
EXPECT_TRUE(expect_output_block != nullptr);
std::unique_ptr<Block> real_output_block(
process_table_function(fn, input_block.get(), output_types));
process_table_function(fn, input_block.get(), output_types, test_get_value_func));
EXPECT_TRUE(real_output_block != nullptr);
// compare real_output_block with expect_output_block