[cherry-pick] (branch-2.1)fix variant index (#36577)

pick from master #36163
This commit is contained in:
Sun Chenyang
2024-06-20 17:57:26 +08:00
committed by GitHub
parent 64a94e883d
commit fbcf63e1f5
9 changed files with 135 additions and 11 deletions

View File

@ -75,7 +75,7 @@ public:
// check if the column is valid for inverted index, some columns
// are generated from variant, but not all of them are supported
static bool check_column_valid(const TabletColumn& column) {
static bool check_support_inverted_index(const TabletColumn& column) {
// the types below are not supported in inverted index for extracted columns
static std::set<FieldType> invalid_types = {
FieldType::OLAP_FIELD_TYPE_DOUBLE,

View File

@ -218,9 +218,7 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
}
// indexes for this column
opts.indexes = std::move(_tablet_schema->get_indexes_for_column(column));
if (!InvertedIndexColumnWriter::check_column_valid(column)) {
// skip inverted index if invalid
opts.indexes.clear();
if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
opts.need_zone_map = false;
opts.need_bloom_filter = false;
opts.need_bitmap_index = false;

View File

@ -171,9 +171,7 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
}
// indexes for this column
opts.indexes = _tablet_schema->get_indexes_for_column(column);
if (!InvertedIndexColumnWriter::check_column_valid(column)) {
// skip inverted index if invalid
opts.indexes.clear();
if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
opts.need_zone_map = false;
opts.need_bloom_filter = false;
opts.need_bitmap_index = false;

View File

@ -1275,6 +1275,10 @@ const TabletColumn& TabletSchema::column(const std::string& field_name) const {
std::vector<const TabletIndex*> TabletSchema::get_indexes_for_column(
const TabletColumn& col) const {
std::vector<const TabletIndex*> indexes_for_column;
// Some columns (Float, Double, JSONB ...) from the variant do not support index, but they are listed in TabletIndex.
if (!segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(col)) {
return indexes_for_column;
}
int32_t col_unique_id = col.is_extracted_column() ? col.parent_unique_id() : col.unique_id();
const std::string& suffix_path =
col.has_path_info() ? escape_for_path_name(col.path_info_ptr()->get_path()) : "";
@ -1346,7 +1350,13 @@ const TabletIndex* TabletSchema::get_inverted_index(int32_t col_unique_id,
return nullptr;
}
const TabletIndex* TabletSchema::get_inverted_index(const TabletColumn& col) const {
const TabletIndex* TabletSchema::get_inverted_index(const TabletColumn& col,
bool check_valid) const {
// With check_valid set to true by default
// Some columns(Float, Double, JSONB ...) from the variant do not support inverted index
if (check_valid && !segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(col)) {
return nullptr;
}
// TODO use more efficient impl
// Use the parent id if a unique id is not assigned; this can happen when accessing subcolumns of variants
int32_t col_unique_id = col.is_extracted_column() ? col.parent_unique_id() : col.unique_id();

View File

@ -353,7 +353,10 @@ public:
bool has_inverted_index_with_index_id(int64_t index_id, const std::string& suffix_path) const;
const TabletIndex* get_inverted_index_with_index_id(int64_t index_id,
const std::string& suffix_name) const;
const TabletIndex* get_inverted_index(const TabletColumn& col) const;
// check_valid: check if this column supports inverted index
// Some columns (Float, Double, JSONB ...) from the variant do not support index, but they are listed in TabletIndex.
// If the index were returned for such a column, its index file would not be found.
const TabletIndex* get_inverted_index(const TabletColumn& col, bool check_valid = true) const;
const TabletIndex* get_inverted_index(int32_t col_unique_id,
const std::string& suffix_path) const;
bool has_ngram_bf_index(int32_t col_unique_id) const;

View File

@ -330,7 +330,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
continue;
}
auto column = output_rowset_schema->column(column_idx);
if (!InvertedIndexColumnWriter::check_column_valid(column)) {
if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
continue;
}
DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id, ""));

View File

@ -403,7 +403,7 @@ void inherit_root_attributes(TabletSchemaSPtr& schema) {
if (it == variants_index_meta.end()) {
continue;
}
auto index_meta = schema->get_inverted_index(col);
auto index_meta = schema->get_inverted_index(col, false);
// add index meta
TabletIndex index_info = it->second;
index_info.set_escaped_escaped_index_suffix_path(col.path_info_ptr()->get_path());

View File

@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
\N
\N
\N
4748
-- !sql --
4

View File

@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite: loads GitHub-archive JSON into a table with a VARIANT column that carries an
// inverted index stored in the V1 index format, then checks the tablet's file CRC endpoint and
// two queries on variant subcolumns.
suite("test_variant_index_format_v1", "p0") {
    // Calls the backend HTTP API that computes the file CRC for one tablet.
    // The result is destructured by the caller as (code, out, err).
    def calc_file_crc_on_tablet = { ip, port, tablet ->
        return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
    }

    // Updates one BE config entry on every backend known to the cluster, so the setting is in
    // effect regardless of which backend ends up hosting the tablet.
    def set_be_config = { key, value ->
        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

        for (String backend_id : backendId_to_backendIP.keySet()) {
            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
        }
    }

    // Stream-loads one line-delimited JSON file into table_name and fails the suite unless the
    // load reports success with at least one loaded row.
    def load_json_data = {table_name, file_name ->
        // load the json data
        streamLoad {
            table "${table_name}"

            // set http request header params
            set 'read_json_by_line', 'true'
            set 'format', 'json'
            set 'max_filter_ratio', '0.1'
            file file_name // import json file
            time 10000 // limit inflight 10s

            // if declared a check callback, the default check condition will ignore.
            // So you must check all condition
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                logger.info("Stream load ${file_name} result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
            }
        }
    }

    def table_name = "github_events"
    sql """DROP TABLE IF EXISTS ${table_name}"""
    sql """
        CREATE TABLE IF NOT EXISTS ${table_name} (
            k bigint,
            v variant,
            INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
        )
        DUPLICATE KEY(`k`)
        DISTRIBUTED BY HASH(k) BUCKETS 1
        properties("replication_num" = "1", "disable_auto_compaction" = "true", "inverted_index_storage_format" = "V1");
    """

    set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296")
    try {
        // Eight loads; with auto compaction disabled each load is expected to leave its own rowset,
        // which is what the version / rowset_count assertions below rely on.
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-10.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-22.json'}""")
        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-23.json'}""")

        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

        // Locate the single tablet (BUCKETS 1) and the backend that hosts it.
        def tablets = sql_return_maparray """ show tablets from ${table_name}; """
        String tablet_id = tablets[0].TabletId
        String backend_id = tablets[0].BackendId
        String ip = backendId_to_backendIP.get(backend_id)
        String port = backendId_to_backendHttpPort.get(backend_id)

        def (code_0, out_0, err_0) = calc_file_crc_on_tablet(ip, port, tablet_id)
        logger.info("Run calc_file_crc_on_tablet: code=" + code_0 + ", out=" + out_0 + ", err=" + err_0)
        assertTrue(code_0 == 0)
        assertTrue(out_0.contains("crc_value"))
        assertTrue(out_0.contains("used_time_ms"))

        // Parse the CRC response once and check the reported version range and rowset count.
        def crc_result = parseJson(out_0.trim())
        assertEquals("0", crc_result.start_version)
        assertEquals("9", crc_result.end_version)
        assertEquals("9", crc_result.rowset_count)

        qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1;"""
        qt_sql """select count() from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core'"""
    } finally {
        // Always put the BE config back, even when a load or assertion above fails, so the raised
        // limit does not leak into suites that run afterwards.
        set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648")
    }
}