[fix](parquet) select vector not initialized when the number of nested values equals zero (#18953)
Fix a bug when reading array-typed columns from a parquet file:

```
ERROR 1105 (HY000): errCode = 2, detailMessage = [INTERNAL_ERROR]Read parquet file xxx failed, reason = [IO_ERROR]Decode too many values in current page
```

When reading ordinary columns, `ScalarColumnReader::_read_values` always calls `ColumnSelectVector::set_run_length_null_map` to initialize the select vector, but `ScalarColumnReader::_read_nested_column` skipped that call when there was nothing to decode, leaving the select vector with a wrong number of values. The failure only shows up in an extreme case: the column's pages still have values left to read, but all of them are null at an ancestor level, so no actual read happens and the reader merely skips those ancestor-level nulls.
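For context, here is a minimal sketch of the contract the fix restores. It mirrors the structure of the reader code in the diff below, but `SelectVector` and `read_nested_batch` are simplified placeholders, not the actual Doris classes:

```cpp
#include <cstddef>
#include <cstdint>

// Simplified stand-in for the reader's select vector; not the real Doris class.
struct SelectVector {
    size_t num_values = SIZE_MAX; // stale until initialized for the current batch
    void set_run_length_null_map(size_t n) { num_values = n; }
};

// Sketch of the nested-column read path. Before the fix, the call to
// set_run_length_null_map was guarded by `if (num_values > 0)`, so a batch that
// consisted only of ancestor-level nulls left the select vector holding the count
// from the previous batch, and the decoder later tried to decode too many values.
void read_nested_batch(SelectVector& sv, size_t parsed_values, size_t ancestor_nulls) {
    size_t num_values = parsed_values - ancestor_nulls;
    sv.set_run_length_null_map(num_values); // always initialize, even when num_values == 0
    if (sv.num_values == 0) {
        return; // nothing to decode for this batch
    }
    // ... decode num_values values from the current page ...
}
```

The same reasoning motivates the early return added to `ColumnChunkReader::decode_values`: an empty but properly initialized batch should be a no-op rather than an error.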
```diff
@@ -467,7 +467,8 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen

     map_field->name = map_schema.name;
     map_field->type.type = TYPE_MAP;
-    map_field->type.add_sub_type(map_kv_field->type);
+    map_field->type.add_sub_type(map_kv_field->type.children[0]);
+    map_field->type.add_sub_type(map_kv_field->type.children[1]);
     map_field->is_nullable = is_optional;

     return Status::OK();
```
```diff
@@ -492,7 +493,8 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
     struct_field->is_nullable = is_optional;
     struct_field->type.type = TYPE_STRUCT;
     for (int i = 0; i < num_children; ++i) {
-        struct_field->type.add_sub_type(struct_field->children[i].type);
+        struct_field->type.add_sub_type(struct_field->children[i].type,
+                                        struct_field->children[0].name);
     }
     return Status::OK();
 }
```
```diff
@@ -291,6 +291,9 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {

 Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                         ColumnSelectVector& select_vector, bool is_dict_filter) {
+    if (select_vector.num_values() == 0) {
+        return Status::OK();
+    }
     SCOPED_RAW_TIMER(&_statistics.decode_value_time);
     if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && !_has_dict)) {
         return Status::IOError("Not dictionary coded");
```
```diff
@@ -393,7 +393,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     }

     size_t num_values = parsed_values - ancestor_nulls;
-    if (num_values > 0) {
+    {
         SCOPED_RAW_TIMER(&_decode_null_map_time);
         select_vector.set_run_length_null_map(null_map, num_values, map_data_column);
     }
```
```diff
@@ -19,13 +19,19 @@ package org.apache.doris.tablefunction;

 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.constants.S3Properties;
```
```diff
@@ -62,6 +68,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;

+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
```
```diff
@@ -368,6 +375,43 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         return columns;
     }

+    /**
+     * Convert PTypeDesc into doris column type
+     * @param typeNodes list PTypeNodes in PTypeDesc
+     * @param start the start index of typeNode to parse
+     * @return column type and the number of parsed PTypeNodes
+     */
+    private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int start) {
+        PScalarType columnType = typeNodes.get(start).getScalarType();
+        TPrimitiveType tPrimitiveType = TPrimitiveType.findByValue(columnType.getType());
+        Type type;
+        int parsedNodes;
+        if (tPrimitiveType == TPrimitiveType.ARRAY) {
+            Pair<Type, Integer> itemType = getColumnType(typeNodes, start + 1);
+            type = ArrayType.create(itemType.key(), true);
+            parsedNodes = 1 + itemType.value();
+        } else if (tPrimitiveType == TPrimitiveType.MAP) {
+            Pair<Type, Integer> keyType = getColumnType(typeNodes, start + 1);
+            Pair<Type, Integer> valueType = getColumnType(typeNodes, start + 1 + keyType.value());
+            type = new MapType(keyType.key(), valueType.key());
+            parsedNodes = 1 + keyType.value() + valueType.value();
+        } else if (tPrimitiveType == TPrimitiveType.STRUCT) {
+            parsedNodes = 1;
+            ArrayList<StructField> fields = new ArrayList<>();
+            for (int i = 0; i < typeNodes.get(start).getStructFieldsCount(); ++i) {
+                Pair<Type, Integer> fieldType = getColumnType(typeNodes, start + parsedNodes);
+                fields.add(new StructField(typeNodes.get(start).getStructFields(i).getName(), fieldType.key()));
+                parsedNodes += fieldType.value();
+            }
+            type = new StructType(fields);
+        } else {
+            type = ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType),
+                    columnType.getLen(), columnType.getPrecision(), columnType.getScale());
+            parsedNodes = 1;
+        }
+        return Pair.of(type, parsedNodes);
+    }
+
     private void fillColumns(InternalService.PFetchTableSchemaResult result)
             throws AnalysisException {
         if (result.getColumnNums() == 0) {
```
```diff
@@ -376,14 +420,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         for (int idx = 0; idx < result.getColumnNums(); ++idx) {
             PTypeDesc type = result.getColumnTypes(idx);
             String colName = result.getColumnNames(idx);
-            for (PTypeNode typeNode : type.getTypesList()) {
-                // only support ScalarType.
-                PScalarType scalarType = typeNode.getScalarType();
-                TPrimitiveType tPrimitiveType = TPrimitiveType.findByValue(scalarType.getType());
-                columns.add(new Column(colName, PrimitiveType.fromThrift(tPrimitiveType),
-                        scalarType.getLen() <= 0 ? -1 : scalarType.getLen(), scalarType.getPrecision(),
-                        scalarType.getScale(), true));
-            }
+            columns.add(new Column(colName, getColumnType(type.getTypesList(), 0).key(), true));
         }
     }

```
```diff
@@ -30,3 +30,12 @@
 2451656 \N 200558 \N \N 1066 \N 4 \N 2381557 \N 79.81 \N 94.50 0.00 \N \N 12956.55 481.95 0.00 \N 8514.45 1248.65
 \N \N 203791 \N 1655274 6679 \N 4 \N 3379960 71 \N 96.34 45.27 \N 3214.17 3525.86 6840.14 160.70 \N \N \N \N

+-- !array_ancestor_null --
+500001
+
+-- !nested_types_parquet --
+20926 20928 20978 23258 20962 23258 23258
+
+-- !nested_types_orc --
+20926 20928 20978 23258 20962 23258 23258
+
```
```diff
@@ -26,5 +26,25 @@ suite("test_tvf_p2", "p2") {
             "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}",
             "format" = "parquet")
             where ss_store_sk = 4 and ss_addr_sk is null order by ss_item_sk"""
+
+        // array_ancestor_null.parquet is a parquet file whose values in the array column are all nulls in a page
+        qt_array_ancestor_null """select count(list_double_col) from hdfs(
+            "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/array_ancestor_null.parquet",
+            "format" = "parquet",
+            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+
+        // all_nested_types.parquet is a parquet file that contains all complex types
+        qt_nested_types_parquet """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0)
+            from hdfs(
+            "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/all_nested_types.parquet",
+            "format" = "parquet",
+            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+
+        // all_nested_types.orc is an orc file that contains all complex types
+        qt_nested_types_orc """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0)
+            from hdfs(
+            "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc",
+            "format" = "orc",
+            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
     }
 }
```