branch-2.1: [fix](array index) Correct null bitmap writing for inverted index #47846 (#48214)

cherry pick from #47846 #48231
This commit is contained in:
airborne12
2025-02-25 20:31:18 +08:00
committed by GitHub
parent 27cd8636bb
commit 1aa57a3b13
7 changed files with 1085 additions and 112 deletions

View File

@ -507,7 +507,9 @@ Status ScalarColumnWriter::init() {
return Status::OK();
}
Status add_nulls(uint32_t count) override { return Status::OK(); }
Status add_array_nulls(uint32_t row_id) override { return Status::OK(); }
Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
return Status::OK();
}
Status finish() override { return Status::OK(); }
int64_t size() const override { return 0; }
int64_t file_size() const override { return 0; }
@ -1018,11 +1020,7 @@ Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t
RETURN_IF_ERROR(append_data(ptr, num_rows));
if (is_nullable()) {
if (_opts.need_inverted_index) {
for (int row_id = 0; row_id < num_rows; row_id++) {
if (null_map[row_id] == 1) {
RETURN_IF_ERROR(_inverted_index_builder->add_array_nulls(row_id));
}
}
RETURN_IF_ERROR(_inverted_index_builder->add_array_nulls(null_map, num_rows));
}
RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
}

View File

@ -323,8 +323,26 @@ public:
return Status::OK();
}
Status add_array_nulls(uint32_t row_id) override {
_null_bitmap.add(row_id);
Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
DCHECK(_rid >= num_rows);
if (num_rows == 0 || null_map == nullptr) {
return Status::OK();
}
std::vector<uint32_t> null_indices;
null_indices.reserve(num_rows / 8);
// because _rid is the row id in block, not segment, and we add data before we add nulls,
// so we need to subtract num_rows to get the row id in segment
for (size_t i = 0; i < num_rows; i++) {
if (null_map[i] == 1) {
null_indices.push_back(_rid - num_rows + static_cast<uint32_t>(i));
}
}
if (!null_indices.empty()) {
_null_bitmap.addMany(null_indices.size(), null_indices.data());
}
return Status::OK();
}
@ -384,8 +402,11 @@ public:
return Status::OK();
}
Status add_array_values(size_t field_size, const void* value_ptr, const uint8_t* null_map,
const uint8_t* offsets_ptr, size_t count) override {
Status add_array_values(size_t field_size, const void* value_ptr,
const uint8_t* nested_null_map, const uint8_t* offsets_ptr,
size_t count) override {
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
{ count = 0; })
if (count == 0) {
// no values to add inverted index
return Status::OK();
@ -408,7 +429,7 @@ public:
lucene::document::Field* new_field = nullptr;
CL_NS(analysis)::TokenStream* ts = nullptr;
for (auto j = start_off; j < start_off + array_elem_size; ++j) {
if (null_map[j] == 1) {
if (nested_null_map && nested_null_map[j] == 1) {
continue;
}
auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size);
@ -471,7 +492,7 @@ public:
for (int i = 0; i < count; ++i) {
auto array_elem_size = offsets[i + 1] - offsets[i];
for (size_t j = start_off; j < start_off + array_elem_size; ++j) {
if (null_map[j] == 1) {
if (nested_null_map && nested_null_map[j] == 1) {
continue;
}
const CppType* p = &reinterpret_cast<const CppType*>(value_ptr)[j];
@ -488,6 +509,12 @@ public:
Status add_array_values(size_t field_size, const CollectionValue* values,
size_t count) override {
if constexpr (field_is_slice_type(field_type)) {
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_field_is_nullptr",
{ _field = nullptr; })
DBUG_EXECUTE_IF(
"InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_"
"nullptr",
{ _index_writer = nullptr; })
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index writer.";
return Status::InternalError(
@ -548,9 +575,10 @@ public:
std::string new_value;
size_t value_length = sizeof(CppType);
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_value_bkd_writer_add_throw_error", {
_CLTHROWA(CL_ERR_IllegalArgument, ("packedValue should be length=xxx"));
});
DBUG_EXECUTE_IF(
"InvertedIndexColumnWriterImpl::add_value_bkd_writer_add_throw_"
"error",
{ _CLTHROWA(CL_ERR_IllegalArgument, ("packedValue should be length=xxx")); });
_value_key_coder->full_encode_ascending(&value, &new_value);
_bkd_writer->add((const uint8_t*)new_value.c_str(), value_length, _rid);
@ -614,8 +642,8 @@ public:
_bkd_writer->finish(data_out.get(), index_out.get()),
int(field_type));
} else {
LOG(WARNING)
<< "Inverted index writer create output error occurred: nullptr";
LOG(WARNING) << "Inverted index writer create output error "
"occurred: nullptr";
_CLTHROWA(CL_ERR_IO, "Create output error with nullptr");
}
meta_out->close();
@ -630,9 +658,12 @@ public:
write_null_bitmap(null_bitmap_out.get());
close();
DBUG_EXECUTE_IF(
"InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close", {
"InvertedIndexWriter._throw_clucene_error_in_fulltext_"
"writer_close",
{
_CLTHROWA(CL_ERR_IO,
"debug point: test throw error in fulltext index writer");
"debug point: test throw error in fulltext "
"index writer");
});
}
} catch (CLuceneError& e) {

View File

@ -64,7 +64,7 @@ public:
size_t count) = 0;
virtual Status add_nulls(uint32_t count) = 0;
virtual Status add_array_nulls(uint32_t row_id) = 0;
virtual Status add_array_nulls(const uint8_t* null_map, size_t num_rows) = 0;
virtual Status finish() = 0;

View File

@ -509,9 +509,9 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema,
return converted_result.first;
}
const auto* ptr = (const uint8_t*)converted_result.second->get_data();
if (converted_result.second->get_nullmap()) {
RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(),
converted_result.second->get_nullmap(), &ptr,
const auto* null_map = converted_result.second->get_nullmap();
if (null_map) {
RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(), null_map, &ptr,
block->rows()));
} else {
RETURN_IF_ERROR(_add_data(column_name, writer_sign, field.get(), &ptr, block->rows()));
@ -526,6 +526,32 @@ Status IndexBuilder::_add_nullable(const std::string& column_name,
const std::pair<int64_t, int64_t>& index_writer_sign,
Field* field, const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
// TODO: need to process null data for inverted index
if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
DCHECK(field->get_sub_field_count() == 1);
// [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
// total number length
auto offset_data = *(data_ptr + 1);
const auto* offsets_ptr = (const uint8_t*)offset_data;
try {
auto data = *(data_ptr + 2);
auto nested_null_map = *(data_ptr + 3);
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values(
field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data),
reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", {
_CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error");
})
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_nulls(null_map,
num_rows));
} catch (const std::exception& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occured: {}", e.what());
}
return Status::OK();
}
size_t offset = 0;
auto next_run_step = [&]() {
size_t step = 1;
@ -538,37 +564,6 @@ Status IndexBuilder::_add_nullable(const std::string& column_name,
}
return step;
};
// TODO: need to process null data for inverted index
if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
DCHECK(field->get_sub_field_count() == 1);
// [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
// total number length
auto element_cnt = size_t((unsigned long)(*data_ptr));
auto offset_data = *(data_ptr + 1);
const auto* offsets_ptr = (const uint8_t*)offset_data;
try {
if (element_cnt > 0) {
auto data = *(data_ptr + 2);
auto nested_null_map = *(data_ptr + 3);
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values(
field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data),
reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
}
} catch (const std::exception& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occured: {}", e.what());
}
// we should refresh nullmap for array
for (int row_id = 0; row_id < num_rows; row_id++) {
if (null_map && null_map[row_id] == 1) {
RETURN_IF_ERROR(
_inverted_index_builders[index_writer_sign]->add_array_nulls(row_id));
}
}
return Status::OK();
}
try {
do {
auto step = next_run_step();

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
-- !sql --
-- !sql --
1 \N
2 \N
-- !sql --
-- !sql --
-- !sql --
1 \N
2 \N

View File

@ -106,9 +106,9 @@ suite("test_add_index_for_arr") {
// query without inverted index
// query rows with array_contains
def sql_query_name1 = sql "select id, name[1], description[1] from my_test_array where array_contains(name,'text7')"
def sql_query_name1 = sql "select id, name[1], description[1] from my_test_array where array_contains(name,'text7') order by id"
// query rows with !array_contains
def sql_query_name2 = sql "select id, name[1], description[1] from my_test_array where !array_contains(name,'text7')"
def sql_query_name2 = sql "select id, name[1], description[1] from my_test_array where !array_contains(name,'text7') order by id"
// add index for name
sql "ALTER TABLE my_test_array ADD INDEX name_idx (name) USING INVERTED;"
@ -122,9 +122,9 @@ suite("test_add_index_for_arr") {
// query with inverted index
sql "set enable_inverted_index_query=true"
// query rows with array_contains
def sql_query_name1_inverted = sql "select id, name[1], description[1] from my_test_array where array_contains(name,'text7')"
def sql_query_name1_inverted = sql "select id, name[1], description[1] from my_test_array where array_contains(name,'text7') order by id"
// query rows with !array_contains
def sql_query_name2_inverted = sql "select id, name[1], description[1] from my_test_array where !array_contains(name,'text7')"
def sql_query_name2_inverted = sql "select id, name[1], description[1] from my_test_array where !array_contains(name,'text7') order by id"
// check result for query without inverted index and with inverted index
def size1 = sql_query_name1.size();
@ -147,9 +147,38 @@ suite("test_add_index_for_arr") {
sql "drop index name_idx on my_test_array"
wait_for_latest_op_on_table_finish("my_test_array", timeout)
def sql_query_name1_without_inverted = sql "select id, name[1], description[1] from my_test_array where array_contains(name,'text7')"
def sql_query_name2_without_inverted = sql "select id, name[1], description[1] from my_test_array where !array_contains(name,'text7')"
def sql_query_name1_without_inverted = sql "select id, name[1], description[1] from my_test_array where array_contains(name,'text7') order by id"
def sql_query_name2_without_inverted = sql "select id, name[1], description[1] from my_test_array where !array_contains(name,'text7') order by id"
assertEquals(sql_query_name1.size(), sql_query_name1_without_inverted.size())
assertEquals(sql_query_name2.size(), sql_query_name2_without_inverted.size())
}
def table_name = "test_add_index_for_arr_all_null"
sql "DROP TABLE IF EXISTS ${table_name}"
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
`id` int(11) NULL,
`name` ARRAY<text> NULL,
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
properties("replication_num" = "1");
"""
sql "insert into ${table_name} values (1, null), (2, null)"
sql "ALTER TABLE ${table_name} ADD INDEX name_idx (name) USING INVERTED;"
wait_for_latest_op_on_table_finish("${table_name}", timeout)
// build index for name that name data can using inverted index
if (!isCloudMode()) {
sql "BUILD INDEX name_idx ON ${table_name}"
wait_for_build_index_on_partition_finish("${table_name}", timeout)
}
qt_sql "select /*+SET_VAR(enable_inverted_index_query=true)*/ * from ${table_name} where array_contains(name, 'text7') order by id"
qt_sql "select /*+SET_VAR(enable_inverted_index_query=true)*/ * from ${table_name} where !array_contains(name, 'text7') order by id"
qt_sql "select /*+SET_VAR(enable_inverted_index_query=true)*/ * from ${table_name} where name is null order by id"
qt_sql "select /*+SET_VAR(enable_inverted_index_query=false)*/ * from ${table_name} where array_contains(name, 'text7') order by id"
qt_sql "select /*+SET_VAR(enable_inverted_index_query=false)*/ * from ${table_name} where !array_contains(name, 'text7') order by id"
qt_sql "select /*+SET_VAR(enable_inverted_index_query=false)*/ * from ${table_name} where name is null order by id"
}