branch-2.1: [fix](parquet)Fix data column and null map column not equal when reading Parquet complex type cross-page data #47734 (#48039)
Cherry-picked from #47734
Co-authored-by: daidai <changyuwei@selectdb.com>
parent a7f9188f79
commit bc6af178b3
@@ -323,6 +323,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         // just read the remaining values of the last row in previous page,
         // so there's no a new row should be read.
         batch_size = 0;
+        /*
+         * Since the function is repeatedly called to fetch data for the batch size,
+         * it causes `_rep_levels.resize(0); _def_levels.resize(0);`, resulting in the
+         * definition and repetition levels of the reader only containing the latter
+         * part of the batch (i.e., missing some parts). Therefore, when using the
+         * definition and repetition levels to fill the null_map for structs and maps,
+         * the function should not be called multiple times before filling.
+         * todo:
+         * We may need to consider reading the entire batch of data at once, as this approach
+         * would be more user-friendly in terms of function usage. However, we must consider that if the
+         * data spans multiple pages, memory usage may increase significantly.
+         */
     } else {
         _rep_levels.resize(0);
         _def_levels.resize(0);
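To make the comment above concrete, here is a minimal, self-contained C++ sketch of the failure mode it describes. `ToyNestedReader` and `read_chunk` are hypothetical stand-ins, not Doris types; only the resize-to-zero-per-call behaviour mirrors `_read_nested_column`.

// Minimal sketch (hypothetical types): why repeatedly calling the nested read
// for one batch loses definition levels.
#include <cstdint>
#include <iostream>
#include <vector>

using level_t = int16_t;

struct ToyNestedReader {
    std::vector<level_t> def_levels;

    // Each call clears the levels and keeps only the rows read by this call,
    // mirroring `_def_levels.resize(0)` at the start of a nested read.
    void read_chunk(const std::vector<level_t>& page_levels) {
        def_levels.resize(0); // levels of earlier chunks are dropped
        def_levels.insert(def_levels.end(), page_levels.begin(), page_levels.end());
    }
};

int main() {
    ToyNestedReader reader;
    // A batch of 6 rows that happens to span two pages (two internal calls).
    reader.read_chunk({1, 1, 0}); // first part of the batch
    reader.read_chunk({1, 0, 1}); // second part of the batch

    // A null map built afterwards only sees the last 3 levels instead of 6,
    // so the data column (6 rows) and the null map (3 rows) no longer match.
    std::cout << "levels kept: " << reader.def_levels.size() << " (expected 6)\n";
    return 0;
}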
@@ -746,7 +758,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
         continue;
     }

-    _read_column_names.insert(doris_name);
+    _read_column_names.emplace_back(doris_name);

     select_vector.reset();
     size_t field_rows = 0;
@@ -758,6 +770,15 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
                                             is_dict_filter));
         *read_rows = field_rows;
         *eof = field_eof;
+        /*
+         * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
+         * to missing definition and repetition levels, when filling the null_map of the struct later, it is
+         * crucial to use the definition and repetition levels from the first read column
+         * (since `_read_nested_column` is not called repeatedly).
+         *
+         * It is worth mentioning that, theoretically, any sub-column can be chosen to fill the null_map,
+         * and selecting the shortest one will offer better performance
+         */
     } else {
         while (field_rows < *read_rows && !field_eof) {
             size_t loop_rows = 0;

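For context on why complete levels matter here, below is a rough sketch of how a struct's null map could be derived from definition levels. `fill_struct_null_map` and `struct_def_level` are illustrative names and the threshold rule is a simplification, not the exact Doris logic; the point is only that the null map must cover the same number of rows as the data column.

// Minimal sketch (hypothetical, not Doris code): deriving a struct null map
// from definition levels, which must cover the whole batch.
#include <cstdint>
#include <iostream>
#include <vector>

using level_t = int16_t;

// A row is null at the struct level when its definition level is below the
// level at which the struct itself is defined.
std::vector<uint8_t> fill_struct_null_map(const std::vector<level_t>& def_levels,
                                          level_t struct_def_level) {
    std::vector<uint8_t> null_map;
    null_map.reserve(def_levels.size());
    for (level_t def : def_levels) {
        null_map.push_back(def < struct_def_level ? 1 : 0);
    }
    return null_map;
}

int main() {
    // Levels for the whole batch (6 rows); if a repeated nested read had dropped
    // the first chunk, this vector would be shorter than the data column.
    std::vector<level_t> def_levels = {2, 1, 0, 2, 2, 1};
    std::vector<uint8_t> null_map = fill_struct_null_map(def_levels, /*struct_def_level=*/1);
    std::cout << "null map size: " << null_map.size() << "\n"; // must equal the row count
    return 0;
}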
@@ -280,24 +280,30 @@ public:
         if (!_read_column_names.empty()) {
-            // can't use _child_readers[*_read_column_names.begin()]
-            // because the operator[] of std::unordered_map is not const :(
-            return _child_readers.find(*_read_column_names.begin())->second->get_rep_level();
+            /*
+             * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
+             * to missing definition and repetition levels, when filling the null_map of the struct later, it is
+             * crucial to use the definition and repetition levels from the first read column,
+             * that is `_read_column_names.front()`.
+             */
+            return _child_readers.find(_read_column_names.front())->second->get_rep_level();
         }
         return _child_readers.begin()->second->get_rep_level();
     }

     const std::vector<level_t>& get_def_level() const override {
         if (!_read_column_names.empty()) {
-            return _child_readers.find(*_read_column_names.begin())->second->get_def_level();
+            return _child_readers.find(_read_column_names.front())->second->get_def_level();
         }
         return _child_readers.begin()->second->get_def_level();
     }

     Statistics statistics() override {
         Statistics st;
-        for (const auto& reader : _child_readers) {
-            // make sure the field is read
-            if (_read_column_names.find(reader.first) != _read_column_names.end()) {
-                Statistics cst = reader.second->statistics();
+        for (const auto& column_name : _read_column_names) {
+            auto reader = _child_readers.find(column_name);
+            if (reader != _child_readers.end()) {
+                Statistics cst = reader->second->statistics();
                 st.merge(cst);
             }
         }
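The switch from dereferencing `*_read_column_names.begin()` on a set to `_read_column_names.front()` on a vector is easy to see with a small standalone example; the column names below are made up and this is not Doris code. A std::set iterates in lexicographic order, while a vector preserves insertion order, so only the vector can answer "which column was read first".

// Minimal sketch (hypothetical names): set iteration order vs. read order.
#include <iostream>
#include <set>
#include <string>
#include <vector>

int main() {
    std::set<std::string> as_set;
    std::vector<std::string> as_vector;

    // Suppose the struct reader happens to read column "b" before column "a".
    for (const std::string& name : {std::string("b"), std::string("a")}) {
        as_set.insert(name);
        as_vector.emplace_back(name);
    }

    std::cout << "*set.begin():   " << *as_set.begin() << "\n";   // "a" -- not the first read
    std::cout << "vector.front(): " << as_vector.front() << "\n"; // "b" -- the first read
    return 0;
}

This is why the member declaration below also changes from std::set to std::vector, as the added comment in the header notes.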
@@ -308,7 +314,8 @@ public:

 private:
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
-    std::set<std::string> _read_column_names;
+    std::vector<std::string> _read_column_names;
+    //Need to use vector instead of set,see `get_rep_level()` for the reason.
 };

 }; // namespace doris::vectorized

@@ -65,3 +65,12 @@
 -- !viewfs --
 25001 25001 25001

+-- !row_cross_pages_2 --
+149923 149923
+
+-- !row_cross_pages_3 --
+74815 74815
+
+-- !row_cross_pages_4 --
+457 457
+
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1 --
+1
+
+-- !2 --
+5000
+
+-- !3 --
+5000
+
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
+suite("test_tvf_p0", "p0,external,tvf,external_docker,hive") {
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String nameNodeHost = context.config.otherConfigs.get("externalEnvIp")
@@ -46,7 +46,7 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
                 "format" = "orc");
             """

-        // a row of complex type may be stored across more pages
+        // (1): a row of complex type may be stored across more pages
         qt_row_cross_pages """select count(id), count(m1), count(m2)
                 from hdfs(
                 "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
@@ -73,5 +73,25 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
                 "format" = "parquet",
                 "fs.viewfs.mounttable.my-cluster.link./ns1" = "hdfs://${nameNodeHost}:${hdfsPort}/",
                 "fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""
+
+        // (2): a row of complex type may be stored across more pages
+        qt_row_cross_pages_2 """select count(id), count(experiment)
+                from hdfs(
+                "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+                "format" = "parquet");
+            """ //149923
+
+        qt_row_cross_pages_3 """select count(id), count(experiment)
+                from hdfs(
+                "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+                "format" = "parquet") where id > 49923 ;
+            """ // 74815
+
+        qt_row_cross_pages_4 """select count(id), count(experiment)
+                from hdfs(
+                "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+                "format" = "parquet") where id < 300 ;
+            """ //457
+
     }
 }
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_parquet_complex_cross_page", "p2,external,hive,external_remote,external_remote_hive") {
+
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    //hudi hive use same catalog in p2.
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable test")
+        return;
+    }
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+    String hms_catalog_name = "test_parquet_complex_cross_page"
+
+    sql """drop catalog if exists ${hms_catalog_name};"""
+    sql """
+        CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+        PROPERTIES (
+            ${props}
+            ,'hive.version' = '3.1.3'
+        );
+    """
+
+    logger.info("catalog " + hms_catalog_name + " created")
+    sql """switch ${hms_catalog_name};"""
+    logger.info("switched to catalog " + hms_catalog_name)
+    sql """ use regression;"""
+
+    sql """ set dry_run_query=true; """
+
+    qt_1 """ SELECT * FROM test_parquet_complex_cross_page WHERE device_id='DZ692' and format_time between 1737693770300 and 1737693770500
+             and date between '20250124' and '20250124' and project='GA20230001' ; """
+    qt_2 """ SELECT functions_pnc_ssm_road_di_objects from test_parquet_complex_cross_page ; """
+    qt_3 """ select * from test_parquet_complex_cross_page ; """
+
+    sql """drop catalog ${hms_catalog_name};"""
+}