[FIX](map) fix map key-column nullable for arrow serde #21762
arrow is not support key column has null element , but doris default map key column is nullable , so need to deal with if doris map row if key column has null element , we put null to arrow
This commit is contained in:
@ -19,6 +19,7 @@
|
||||
|
||||
#include "arrow/array/builder_nested.h"
|
||||
#include "util/jsonb_document.h"
|
||||
#include "util/simd/bits.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_map.h"
|
||||
@ -50,13 +51,23 @@ void DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const NullMa
|
||||
auto& builder = assert_cast<arrow::MapBuilder&>(*array_builder);
|
||||
auto& map_column = assert_cast<const ColumnMap&>(column);
|
||||
const IColumn& nested_keys_column = map_column.get_keys();
|
||||
CHECK(!nested_keys_column.is_nullable());
|
||||
const IColumn& nested_values_column = map_column.get_values();
|
||||
// now we default set key value in map is nullable
|
||||
DCHECK(nested_keys_column.is_nullable());
|
||||
DCHECK(nested_values_column.is_nullable());
|
||||
auto keys_nullmap_data =
|
||||
check_and_get_column<ColumnNullable>(nested_keys_column)->get_null_map_data().data();
|
||||
auto& offsets = map_column.get_offsets();
|
||||
auto key_builder = builder.key_builder();
|
||||
auto value_builder = builder.item_builder();
|
||||
|
||||
for (size_t r = start; r < end; ++r) {
|
||||
if (null_map && (*null_map)[r]) {
|
||||
if ((null_map && (*null_map)[r])) {
|
||||
checkArrowStatus(builder.AppendNull(), column.get_name(),
|
||||
array_builder->type()->name());
|
||||
} else if (simd::contain_byte(keys_nullmap_data + offsets[r - 1],
|
||||
offsets[r] - offsets[r - 1], 1)) {
|
||||
// arrow do not support key is null so we just put null with this row
|
||||
checkArrowStatus(builder.AppendNull(), column.get_name(),
|
||||
array_builder->type()->name());
|
||||
} else {
|
||||
|
||||
@ -356,7 +356,9 @@ void serialize_and_deserialize_arrow_test() {
|
||||
type_desc.add_sub_type(TYPE_STRING, true);
|
||||
tslot.__set_slotType(type_desc.to_thrift());
|
||||
{
|
||||
DataTypePtr s = std::make_shared<DataTypeString>();
|
||||
DataTypePtr s =
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
|
||||
;
|
||||
DataTypePtr d =
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
|
||||
DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
|
||||
@ -503,4 +505,84 @@ TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) {
|
||||
serialize_and_deserialize_arrow_test<false>();
|
||||
}
|
||||
|
||||
TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
|
||||
TupleDescriptor tuple_desc(PTupleDescriptor(), true);
|
||||
TSlotDescriptor tslot;
|
||||
std::string col_name = "map_null_key";
|
||||
tslot.__set_colName(col_name);
|
||||
TypeDescriptor type_desc(TYPE_MAP);
|
||||
type_desc.add_sub_type(TYPE_STRING, true);
|
||||
type_desc.add_sub_type(TYPE_INT, true);
|
||||
tslot.__set_slotType(type_desc.to_thrift());
|
||||
vectorized::Block block;
|
||||
{
|
||||
DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
|
||||
;
|
||||
DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
|
||||
DataTypePtr m = std::make_shared<DataTypeMap>(s, d);
|
||||
Array k1, k2, v1, v2, k3, v3;
|
||||
k1.push_back(Null());
|
||||
k1.push_back("doris");
|
||||
k1.push_back("clever amory");
|
||||
v1.push_back(11);
|
||||
v1.push_back(Null());
|
||||
v1.push_back(30);
|
||||
k2.push_back("hello amory");
|
||||
k2.push_back("NULL");
|
||||
k2.push_back("cute amory");
|
||||
k2.push_back("doris");
|
||||
v2.push_back(26);
|
||||
v2.push_back(Null());
|
||||
v2.push_back(6);
|
||||
v2.push_back(7);
|
||||
k3.push_back("test");
|
||||
k3.push_back(Null());
|
||||
v3.push_back(11);
|
||||
v3.push_back(30);
|
||||
Map m1, m2, m3;
|
||||
m1.push_back(k1);
|
||||
m1.push_back(v1);
|
||||
m2.push_back(k2);
|
||||
m2.push_back(v2);
|
||||
m3.push_back(k3);
|
||||
m3.push_back(v3);
|
||||
MutableColumnPtr map_column = m->create_column();
|
||||
map_column->reserve(3);
|
||||
map_column->insert(m1);
|
||||
map_column->insert(m2);
|
||||
map_column->insert(m3);
|
||||
vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name);
|
||||
block.insert(type_and_name);
|
||||
}
|
||||
|
||||
tslot.__set_col_unique_id(1);
|
||||
SlotDescriptor* slot = new SlotDescriptor(tslot);
|
||||
tuple_desc.add_slot(slot);
|
||||
RowDescriptor row_desc(&tuple_desc, true);
|
||||
// arrow schema
|
||||
std::shared_ptr<arrow::Schema> _arrow_schema;
|
||||
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
|
||||
|
||||
// serialize
|
||||
std::shared_ptr<arrow::RecordBatch> result;
|
||||
std::cout << "block structure: " << block.dump_structure() << std::endl;
|
||||
std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << std::endl;
|
||||
|
||||
convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), &result);
|
||||
Block new_block = block.clone_empty();
|
||||
EXPECT_TRUE(result != nullptr);
|
||||
std::cout << "result: " << result->ToString() << std::endl;
|
||||
// deserialize
|
||||
auto* array = result->GetColumnByName(col_name).get();
|
||||
auto& column_with_type_and_name = new_block.get_by_name(col_name);
|
||||
arrow_column_to_doris_column(array, 0, column_with_type_and_name.column,
|
||||
column_with_type_and_name.type, block.rows(), "UTC");
|
||||
std::cout << block.dump_data() << std::endl;
|
||||
std::cout << new_block.dump_data() << std::endl;
|
||||
// new block row_index 0, 2 is should be empty
|
||||
EXPECT_EQ(new_block.dump_one_line(0, 1), "{}");
|
||||
EXPECT_EQ(new_block.dump_one_line(2, 1), "{}");
|
||||
EXPECT_EQ(block.dump_data(1, 1), new_block.dump_data(1, 1));
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
Reference in New Issue
Block a user