[feature-wip](array-type) Support insertion for vectorized engine. (#8494) (#8590)

Please refer to #8493
This commit is contained in:
Adonis Ling
2022-03-22 15:48:13 +08:00
committed by GitHub
parent 71ce3c4a6e
commit 2580da4f72
20 changed files with 789 additions and 484 deletions

View File

@ -31,6 +31,7 @@
#include "olap/field.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/row_block2.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/column_writer.h"
#include "olap/tablet_schema.h"
@ -39,9 +40,11 @@
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "test_util/array_utils.h"
#include "testutil/desc_tbl_builder.h"
#include "util/file_utils.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
@ -103,86 +106,19 @@ TupleDescriptor* get_tuple_descriptor(ObjectPool& object_pool, const TypeInfo* t
return builder.build()->get_tuple_descriptor(0);
}
CollectionValue* parse(ObjectPool& object_pool,
const rapidjson::GenericValue<rapidjson::UTF8<>>::ConstArray& json_array,
const TypeDescriptor& type_desc) {
if (json_array.Empty()) {
return object_pool.add(new CollectionValue(0));
} else {
auto array = object_pool.add(new CollectionValue());
const auto& item_type_desc = type_desc.children[0];
CollectionValue::init_collection(&object_pool, json_array.Size(), item_type_desc.type,
array);
int index = 0;
switch (item_type_desc.type) {
case TYPE_ARRAY:
for (auto it = json_array.Begin(); it != json_array.End(); ++it) {
auto val = CollectionVal();
if (it->IsNull()) {
val.is_null = true;
} else {
auto sub_array = parse(object_pool, it->GetArray(), item_type_desc);
sub_array->to_collection_val(&val);
}
array->set(index++, item_type_desc.type, &val);
}
break;
case TYPE_INT:
for (auto it = json_array.Begin(); it != json_array.End(); ++it) {
auto val = it->IsNull() ? IntVal::null() : IntVal(it->GetInt());
array->set(index++, item_type_desc.type, &val);
}
break;
case TYPE_VARCHAR:
for (auto it = json_array.Begin(); it != json_array.End(); ++it) {
if (it->IsNull()) {
auto val = StringVal::null();
array->set(index++, item_type_desc.type, &val);
} else {
char* string = object_pool.add_array(new char[it->GetStringLength()]);
memcpy(string, it->GetString(), it->GetStringLength());
auto val = StringVal(reinterpret_cast<uint8_t*>(string), it->GetStringLength());
array->set(index++, item_type_desc.type, &val);
}
}
break;
default:
break;
}
if (!array->has_null()) {
array->set_null_signs(nullptr);
}
return array;
}
}
CollectionValue* parse(ObjectPool& object_pool, const std::string& text,
const TypeDescriptor& type_desc) {
rapidjson::Document document;
if (document.Parse(text.c_str()).HasParseError() || !document.IsArray()) {
CollectionValue* parse(MemPool& mem_pool, FunctionContext& context, const std::string& text,
const ColumnPB& column_pb) {
auto collection_value =
reinterpret_cast<CollectionValue*>(mem_pool.allocate(sizeof(CollectionValue)));
auto status = ArrayUtils::create_collection_value(collection_value, &context, text);
if (!status.ok()) {
return nullptr;
}
return parse(object_pool, (const_cast<const rapidjson::Document*>(&document))->GetArray(),
type_desc);
}
void validate(const Field* field, const CollectionValue* expect, const CollectionValue* actual,
bool check_nullptr) {
EXPECT_TRUE(field->type_info()->equal(expect, actual));
if (check_nullptr) {
if (expect->length() == 0) {
EXPECT_EQ(nullptr, actual->data());
EXPECT_EQ(expect->data(), actual->data());
}
if (!expect->has_null()) {
EXPECT_EQ(nullptr, expect->null_signs());
EXPECT_EQ(expect->null_signs(), actual->null_signs());
}
}
return collection_value;
}
void validate(const Field* field, const CollectionValue* expect, const CollectionValue* actual) {
validate(field, expect, actual, true);
EXPECT_TRUE(field->type_info()->equal(expect, actual));
}
class ArrayTest : public ::testing::Test {
@ -254,7 +190,7 @@ private:
for (auto array : arrays) {
field->type_info()->direct_copy(&cell, array);
EXPECT_EQ(cell.null_signs(), reinterpret_cast<bool*>(variable_ptr.get()));
validate(field, array, &cell, false);
validate(field, array, &cell);
}
}
@ -263,8 +199,13 @@ private:
const std::vector<const CollectionValue*>& arrays) {
const std::string path = TEST_DIR + "/" + generate_uuid_string();
LOG(INFO) << "Test directory: " << path;
segment_v2::ColumnMetaPB meta;
init_column_meta<array_encoding, item_encoding>(&meta, column_pb);
TabletColumn tablet_column;
tablet_column.init_from_pb(column_pb);
Schema schema({tablet_column}, 0);
{
auto wblock = create_writable_block(path);
ASSERT_NE(wblock, nullptr);
@ -295,14 +236,8 @@ private:
auto st = iter->seek_to_first();
ASSERT_TRUE(st.ok()) << st.to_string();
auto tracker = std::make_shared<MemTracker>();
MemPool pool(tracker.get());
std::unique_ptr<ColumnVectorBatch> cvb;
ColumnVectorBatch::create(0, true, field->type_info(), const_cast<Field*>(field), &cvb);
ASSERT_NE(cvb, nullptr) << st.to_string();
cvb->resize(1024);
ColumnBlock col(cvb.get(), &pool);
RowBlockV2 block(schema, 1024);
auto col = block.column_block(0);
int index = 0;
size_t rows_read = 1024;
do {
@ -311,12 +246,17 @@ private:
ASSERT_TRUE(st.ok());
for (int i = 0; i < rows_read; ++i) {
validate(field, arrays[index++],
reinterpret_cast<const CollectionValue*>(col.cell_ptr(i)), false);
reinterpret_cast<const CollectionValue*>(col.cell_ptr(i)));
}
ASSERT_TRUE(st.ok());
} while (rows_read >= 1024);
auto tuple_desc = get_tuple_descriptor(_object_pool, get_type_info(column_pb).get());
block.set_selected_size(rows_read);
test_convert_to_vec_block(block, tuple_desc, field, arrays);
}
}
template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding>
void init_column_meta(segment_v2::ColumnMetaPB* meta, const ColumnPB& column_pb) {
int column_id = 0;
@ -405,11 +345,30 @@ private:
template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding>
void test_array(const ColumnPB& column_pb, const Field* field,
const TupleDescriptor* tuple_desc, const CollectionValue* array) {
ASSERT_NE(array, nullptr);
test_copy_array(tuple_desc, field, array);
test_direct_copy_array(field, {array});
test_write_and_read_column<array_encoding, item_encoding>(column_pb, field, {array});
}
void test_convert_to_vec_block(RowBlockV2& row_block, const TupleDescriptor* tuple_desc,
const Field* field,
const std::vector<const CollectionValue*>& arrays) {
vectorized::Block block;
for (const auto slot_desc : tuple_desc->slots()) {
block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
row_block.convert_to_vec_block(&block);
for (int i = 0; i < arrays.size(); ++i) {
auto tuple = block.deep_copy_tuple(*tuple_desc, _mem_pool.get(), i, 0, false);
auto actual = tuple->get_collection_slot(tuple_desc->slots().front()->tuple_offset());
validate(field, arrays[i], actual);
}
}
private:
static constexpr size_t MAX_MEMORY_BYTES = 1024 * 1024;
static const std::string TEST_DIR;
@ -426,16 +385,17 @@ TEST_F(ArrayTest, TestSimpleIntArrays) {
auto field = create_field(column_pb);
auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
ASSERT_EQ(tuple_desc->slots().size(), 1);
auto type_desc = tuple_desc->slots().front()->type();
FunctionContext context;
ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
std::vector<const CollectionValue*> arrays = {
parse(_object_pool, "[]", type_desc),
parse(_object_pool, "[null]", type_desc),
parse(_object_pool, "[1, 2, 3]", type_desc),
parse(_object_pool, "[1, null, 3]", type_desc),
parse(_object_pool, "[1, null, null]", type_desc),
parse(_object_pool, "[null, null, 3]", type_desc),
parse(_object_pool, "[null, null, null]", type_desc),
parse(*_mem_pool, context, "[]", column_pb),
parse(*_mem_pool, context, "[null]", column_pb),
parse(*_mem_pool, context, "[1, 2, 3]", column_pb),
parse(*_mem_pool, context, "[1, null, 3]", column_pb),
parse(*_mem_pool, context, "[1, null, null]", column_pb),
parse(*_mem_pool, context, "[null, null, 3]", column_pb),
parse(*_mem_pool, context, "[null, null, null]", column_pb),
};
for (auto array : arrays) {
test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
@ -453,15 +413,16 @@ TEST_F(ArrayTest, TestNestedIntArrays) {
auto field = create_field(column_pb);
auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
ASSERT_EQ(tuple_desc->slots().size(), 1);
auto type_desc = tuple_desc->slots().front()->type();
auto context = std::make_unique<FunctionContext>();
ArrayUtils::prepare_context(*context, *_mem_pool, column_pb);
std::vector<const CollectionValue*> arrays = {
parse(_object_pool, "[]", type_desc),
parse(_object_pool, "[[]]", type_desc),
parse(_object_pool, "[[1, 2, 3], [4, 5, 6]]", type_desc),
parse(_object_pool, "[[1, 2, 3], null, [4, 5, 6]]", type_desc),
parse(_object_pool, "[[1, 2, null], null, [4, null, 6], null, [null, 8, 9]]",
type_desc),
parse(*_mem_pool, *context, "[]", column_pb),
parse(*_mem_pool, *context, "[[]]", column_pb),
parse(*_mem_pool, *context, "[[1, 2, 3], [4, 5, 6]]", column_pb),
parse(*_mem_pool, *context, "[[1, 2, 3], null, [4, 5, 6]]", column_pb),
parse(*_mem_pool, *context, "[[1, 2, null], null, [4, null, 6], null, [null, 8, 9]]",
column_pb),
};
for (auto array : arrays) {
test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
@ -477,15 +438,17 @@ TEST_F(ArrayTest, TestNestedIntArrays) {
field = create_field(column_pb);
tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
ASSERT_EQ(tuple_desc->slots().size(), 1);
type_desc = tuple_desc->slots().front()->type();
arrays.clear();
ASSERT_EQ(arrays.size(), 0);
context.reset(new FunctionContext);
ArrayUtils::prepare_context(*context, *_mem_pool, column_pb);
arrays = {
parse(_object_pool, "[]", type_desc),
parse(_object_pool, "[[]]", type_desc),
parse(_object_pool, "[[[]]]", type_desc),
parse(_object_pool, "[[[null]], [[1], [2, 3]], [[4, 5, 6], null, null]]", type_desc),
parse(*_mem_pool, *context, "[]", column_pb),
parse(*_mem_pool, *context, "[[]]", column_pb),
parse(*_mem_pool, *context, "[[[]]]", column_pb),
parse(*_mem_pool, *context, "[[[null]], [[1], [2, 3]], [[4, 5, 6], null, null]]",
column_pb),
};
for (auto array : arrays) {
test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
@ -502,17 +465,18 @@ TEST_F(ArrayTest, TestSimpleStringArrays) {
auto field = create_field(column_pb);
auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
ASSERT_EQ(tuple_desc->slots().size(), 1);
auto type_desc = tuple_desc->slots().front()->type();
FunctionContext context;
ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
std::vector<const CollectionValue*> arrays = {
parse(_object_pool, "[]", type_desc),
parse(_object_pool, "[null]", type_desc),
parse(_object_pool, "[\"a\", \"b\", \"c\"]", type_desc),
parse(_object_pool, "[null, \"b\", \"c\"]", type_desc),
parse(_object_pool, "[\"a\", null, \"c\"]", type_desc),
parse(_object_pool, "[\"a\", \"b\", null]", type_desc),
parse(_object_pool, "[null, \"b\", null]", type_desc),
parse(_object_pool, "[null, null, null]", type_desc),
parse(*_mem_pool, context, "[]", column_pb),
parse(*_mem_pool, context, "[null]", column_pb),
parse(*_mem_pool, context, "[\"a\", \"b\", \"c\"]", column_pb),
parse(*_mem_pool, context, "[null, \"b\", \"c\"]", column_pb),
parse(*_mem_pool, context, "[\"a\", null, \"c\"]", column_pb),
parse(*_mem_pool, context, "[\"a\", \"b\", null]", column_pb),
parse(*_mem_pool, context, "[null, \"b\", null]", column_pb),
parse(*_mem_pool, context, "[null, null, null]", column_pb),
};
for (auto array : arrays) {
test_array<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb, field.get(),
@ -529,15 +493,16 @@ TEST_F(ArrayTest, TestNestedStringArrays) {
auto field = create_field(column_pb);
auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
ASSERT_EQ(tuple_desc->slots().size(), 1);
auto type_desc = tuple_desc->slots().front()->type();
FunctionContext context;
ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
std::vector<const CollectionValue*> arrays = {
parse(_object_pool, "[]", type_desc),
parse(_object_pool, "[[]]", type_desc),
parse(_object_pool, "[[[]]]", type_desc),
parse(_object_pool, "[null, [null], [[null]]]", type_desc),
parse(_object_pool, "[[[\"a\", null, \"c\"], [\"d\", \"e\", \"f\"]], null, [[\"g\"]]]",
type_desc),
parse(*_mem_pool, context, "[]", column_pb),
parse(*_mem_pool, context, "[[]]", column_pb),
parse(*_mem_pool, context, "[[[]]]", column_pb),
parse(*_mem_pool, context, "[null, [null], [[null]]]", column_pb),
parse(*_mem_pool, context,
"[[[\"a\", null, \"c\"], [\"d\", \"e\", \"f\"]], null, [[\"g\"]]]", column_pb),
};
for (auto array : arrays) {
test_array<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb, field.get(),