[Fix](parquet-reader) Fix parquet string column min/max statistics issue which caused incorrect query results. (#21675)

In parquet, the legacy min and max statistics may not handle UTF8 string values correctly.
The current approach uses the min_value and max_value statistics introduced by PARQUET-1025 when they are present.
When they are absent, min/max filtering for string columns is temporarily skipped. A better approach would be to
fall back to the legacy min and max statistics when they contain only ASCII characters; I will improve this in a future PR.
This commit is contained in:
Qi Chen
2023-07-14 00:09:41 +08:00
committed by GitHub
parent 4158253799
commit 6fd8f5cd2f
4 changed files with 231 additions and 7 deletions

View File

@ -120,7 +120,7 @@ private:
static bool _filter_by_min_max(const ColumnValueRange<primitive_type>& col_val_range,
const ScanPredicate& predicate, const FieldSchema* col_schema,
const std::string& encoded_min, const std::string& encoded_max,
const cctz::time_zone& ctz) {
const cctz::time_zone& ctz, bool use_min_max_value = false) {
using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
std::vector<CppType> predicate_values;
for (const void* v : predicate.values) {
@ -144,6 +144,13 @@ private:
case TYPE_CHAR:
[[fallthrough]];
case TYPE_STRING:
// TODO: In parquet, min and max statistics may not be able to handle UTF8 correctly.
// Current processing method is using min_value and max_value statistics introduced by PARQUET-1025 if they are used.
// If not, current processing method is temporarily ignored. A better way is try to read min and max statistics
// if it contains only ASCII characters.
if (!use_min_max_value) {
return false;
}
if constexpr (std::is_same_v<CppType, StringRef>) {
min_value = StringRef(encoded_min);
max_value = StringRef(encoded_max);
@ -372,7 +379,8 @@ public:
static bool filter_by_stats(const ColumnValueRangeType& col_val_range,
const FieldSchema* col_schema, bool is_set_min_max,
const std::string& encoded_min, const std::string& encoded_max,
bool is_all_null, const cctz::time_zone& ctz) {
bool is_all_null, const cctz::time_zone& ctz,
bool use_min_max_value = false) {
bool need_filter = false;
std::visit(
[&](auto&& range) {
@ -387,7 +395,7 @@ public:
}
for (auto& filter : filters) {
need_filter |= _filter_by_min_max(range, filter, col_schema, encoded_min,
encoded_max, ctz);
encoded_max, ctz, use_min_max_value);
if (need_filter) {
break;
}

View File

@ -836,15 +836,22 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
auto& statistic = meta_data.statistics;
bool is_all_null =
(statistic.__isset.null_count && statistic.null_count == meta_data.num_values);
bool is_set_min_max = (statistic.__isset.max && statistic.__isset.min);
bool is_set_min_max = (statistic.__isset.max && statistic.__isset.min) ||
(statistic.__isset.max_value && statistic.__isset.min_value);
if ((!is_set_min_max) && (!is_all_null)) {
continue;
}
const FieldSchema* col_schema = schema_desc.get_column(col_name);
// Min-max of statistic is plain-encoded value
*filter_group =
ParquetPredicate::filter_by_stats(slot_iter->second, col_schema, is_set_min_max,
statistic.min, statistic.max, is_all_null, *_ctz);
if (statistic.__isset.min_value) {
*filter_group = ParquetPredicate::filter_by_stats(
slot_iter->second, col_schema, is_set_min_max, statistic.min_value,
statistic.max_value, is_all_null, *_ctz, true);
} else {
*filter_group = ParquetPredicate::filter_by_stats(
slot_iter->second, col_schema, is_set_min_max, statistic.min, statistic.max,
is_all_null, *_ctz, false);
}
if (*filter_group) {
break;
}

View File

@ -0,0 +1,148 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !01 --
2 是
-- !02 --
1
2 是
3 III类户
-- !03 --
2 1
-- !04 --
5 ありがとう
-- !05 --
1 你好
2 谢谢
3 再见
4 こんにちは
5 ありがとう
6 さようなら
7 안녕하세요
8 감사합니다
9 안녕히 가세요
10 Hola
11 Gracias
12 Adiós
13 Hallo
14 Danke
15 Auf Wiedersehen
16 مرحبا
17 شكرًا
18 مع السلامة
19 Bonjour
20 Merci
21 Au revoir
22 Ciao
23 Grazie
24 Arrivederci
25 Olá
26 Obrigado
27 Adeus
28 Hello
29 Thank you
30 Goodbye
-- !06 --
5 1
-- !01 --
2 是
-- !02 --
1
2 是
3 III类户
-- !03 --
2 1
-- !04 --
5 ありがとう
-- !05 --
1 你好
2 谢谢
3 再见
4 こんにちは
5 ありがとう
6 さようなら
7 안녕하세요
8 감사합니다
9 안녕히 가세요
10 Hola
11 Gracias
12 Adiós
13 Hallo
14 Danke
15 Auf Wiedersehen
16 مرحبا
17 شكرًا
18 مع السلامة
19 Bonjour
20 Merci
21 Au revoir
22 Ciao
23 Grazie
24 Arrivederci
25 Olá
26 Obrigado
27 Adeus
28 Hello
29 Thank you
30 Goodbye
-- !06 --
5 1
-- !01 --
2 是
-- !02 --
1
2 是
3 III类户
-- !03 --
2 1
-- !04 --
5 ありがとう
-- !05 --
1 你好
2 谢谢
3 再见
4 こんにちは
5 ありがとう
6 さようなら
7 안녕하세요
8 감사합니다
9 안녕히 가세요
10 Hola
11 Gracias
12 Adiós
13 Hallo
14 Danke
15 Auf Wiedersehen
16 مرحبا
17 شكرًا
18 مع السلامة
19 Bonjour
20 Merci
21 Au revoir
22 Ciao
23 Grazie
24 Arrivederci
25 Olá
26 Obrigado
27 Adeus
28 Hello
29 Thank you
30 Goodbye
-- !06 --
5 1

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression test for parquet string column min/max statistics with non-ASCII
// values: predicates on UTF8 (CJK, Hangul, Arabic, accented Latin) string
// columns must not be incorrectly pruned by row-group min/max filtering.
// The same queries are run against parquet, orc and text variants of the
// tables so results can be cross-checked between formats.
suite("test_multi_langs", "p2") {
    def formats = ["_parquet", "_orc", "_text"]
    // Queries are templated with SUFFIX so each file-format variant of the
    // same table is exercised with identical predicates.
    def q1 = """select * from test_chineseSUFFIX where col1='是' order by id"""
    def q2 = """select * from test_chineseSUFFIX order by id"""
    def q3 = """select id, count(col1) from test_chineseSUFFIX where col1='是' group by id order by id"""
    def q4 = """select * from test_multi_langsSUFFIX where col1='ありがとう' order by id"""
    def q5 = """select * from test_multi_langsSUFFIX order by id"""
    def q6 = """select id, count(col1) from test_multi_langsSUFFIX where col1='ありがとう' group by id order by id"""
    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // Declared before the try block so the finally clause can reference it.
        String catalog_name = "test_multi_langs"
        try {
            String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
            String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
            sql """drop catalog if exists ${catalog_name};"""
            sql """
                create catalog if not exists ${catalog_name} properties (
                    'type'='hms',
                    'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
                );
            """
            logger.info("catalog " + catalog_name + " created")
            sql """switch ${catalog_name};"""
            logger.info("switched to catalog " + catalog_name)
            sql """use multi_catalog;"""
            logger.info("use multi_catalog")
            for (String format in formats) {
                logger.info("Process format " + format)
                qt_01 q1.replace("SUFFIX", format)
                qt_02 q2.replace("SUFFIX", format)
                qt_03 q3.replace("SUFFIX", format)
                qt_04 q4.replace("SUFFIX", format)
                qt_05 q5.replace("SUFFIX", format)
                qt_06 q6.replace("SUFFIX", format)
            }
        } finally {
            // Clean up the catalog even when a query above fails; previously
            // the drop lived at the end of the try block and was skipped on
            // failure, leaking the catalog into subsequent test runs.
            sql """drop catalog if exists ${catalog_name}"""
        }
    }
}