Add profile for parquet reader. New counters: - ParquetFilteredGroups: Filtered row groups by `RowGroup` min-max statistics - ParquetReadGroups: The number of row groups to read - ParquetFilteredRowsByGroup: The number of filtered rows by `RowGroup` min-max statistics - ParquetFilteredRowsByPage: The number of filtered rows by page min-max statistics - ParquetFilteredBytes: The filtered bytes by `RowGroup` min-max statistics - ParquetReadBytes: The total bytes in `ParquetReadGroups`, may be further filtered If a page is skipped as a whole ## Result ``` ┌──────────────────────────────────────────────────────┐ │[0: VFILE_SCAN_NODE] │ │(Active: 1s29ms, non-child: 96.42) │ │ - Counters: │ │ - BytesRead: 0.00 │ │ - FileReadCalls: 1.826K (1826) │ │ - FileReadTime: 510.627ms │ │ - FileRemoteReadBytes: 65.23 MB │ │ - FileRemoteReadCalls: 1.146K (1146) │ │ - FileRemoteReadRate: 128.29331970214844 MB/sec │ │ - FileRemoteReadTime: 508.469ms │ │ - NumDiskAccess: 0 │ │ - NumScanners: 1 │ │ - ParquetFilteredBytes: 0.00 │ │ - ParquetFilteredGroups: 0 │ │ - ParquetFilteredRowsByGroup: 0 │ │ - ParquetFilteredRowsByPage: 6.600003M (6600003)│ │ - ParquetReadBytes: 2.13 GB │ │ - ParquetReadGroups: 20 │ │ - PeakMemoryUsage: 0.00 │ │ - PredicateFilteredRows: 3.399797M (3399797) │ │ - PredicateFilteredTime: 133.302ms │ │ - RowsRead: 3.399997M (3399997) │ │ - RowsReturned: 200 │ │ - RowsReturnedRate: 194 │ │ - TotalRawReadTime(*): 726.566ms │ │ - TotalReadThroughput: 0.0 /sec │ │ - WaitScannerTime: 1s27ms │ └──────────────────────────────────────────────────────┘ ```
251 lines
9.3 KiB
C++
251 lines
9.3 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include <glog/logging.h>
|
|
#include <gtest/gtest.h>
|
|
|
|
#include "io/local_file_reader.h"
|
|
#include "runtime/runtime_state.h"
|
|
#include "util/runtime_profile.h"
|
|
#include "vec/data_types/data_type_factory.hpp"
|
|
#include "vec/exec/file_hdfs_scanner.h"
|
|
#include "vec/exec/format/parquet/vparquet_reader.h"
|
|
|
|
namespace doris {
|
|
namespace vectorized {
|
|
|
|
class ParquetReaderTest : public testing::Test {
|
|
public:
|
|
ParquetReaderTest() {}
|
|
};
|
|
|
|
TEST_F(ParquetReaderTest, normal) {
|
|
TDescriptorTable t_desc_table;
|
|
TTableDescriptor t_table_desc;
|
|
|
|
t_table_desc.id = 0;
|
|
t_table_desc.tableType = TTableType::OLAP_TABLE;
|
|
t_table_desc.numCols = 0;
|
|
t_table_desc.numClusteringCols = 0;
|
|
t_desc_table.tableDescriptors.push_back(t_table_desc);
|
|
t_desc_table.__isset.tableDescriptors = true;
|
|
|
|
// init boolean and numeric slot
|
|
std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col",
|
|
"int_col", "bigint_col", "float_col",
|
|
"double_col"};
|
|
for (int i = 0; i < numeric_types.size(); i++) {
|
|
TSlotDescriptor tslot_desc;
|
|
{
|
|
tslot_desc.id = i;
|
|
tslot_desc.parent = 0;
|
|
TTypeDesc type;
|
|
{
|
|
TTypeNode node;
|
|
node.__set_type(TTypeNodeType::SCALAR);
|
|
TScalarType scalar_type;
|
|
scalar_type.__set_type(TPrimitiveType::type(i + 2));
|
|
node.__set_scalar_type(scalar_type);
|
|
type.types.push_back(node);
|
|
}
|
|
tslot_desc.slotType = type;
|
|
tslot_desc.columnPos = 0;
|
|
tslot_desc.byteOffset = 0;
|
|
tslot_desc.nullIndicatorByte = 0;
|
|
tslot_desc.nullIndicatorBit = -1;
|
|
tslot_desc.colName = numeric_types[i];
|
|
tslot_desc.slotIdx = 0;
|
|
tslot_desc.isMaterialized = true;
|
|
t_desc_table.slotDescriptors.push_back(tslot_desc);
|
|
}
|
|
}
|
|
|
|
t_desc_table.__isset.slotDescriptors = true;
|
|
{
|
|
// TTupleDescriptor dest
|
|
TTupleDescriptor t_tuple_desc;
|
|
t_tuple_desc.id = 0;
|
|
t_tuple_desc.byteSize = 16;
|
|
t_tuple_desc.numNullBytes = 0;
|
|
t_tuple_desc.tableId = 0;
|
|
t_tuple_desc.__isset.tableId = true;
|
|
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
|
|
}
|
|
DescriptorTbl* desc_tbl;
|
|
ObjectPool obj_pool;
|
|
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
|
|
|
|
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
|
|
LocalFileReader* reader =
|
|
new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
|
|
|
|
cctz::time_zone ctz;
|
|
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
|
|
auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
|
|
std::vector<std::string> column_names;
|
|
for (int i = 0; i < slot_descs.size(); i++) {
|
|
column_names.push_back(slot_descs[i]->col_name());
|
|
}
|
|
TFileScanRangeParams scan_params;
|
|
TFileRangeDesc scan_range;
|
|
{
|
|
scan_range.start_offset = 0;
|
|
scan_range.size = 1000;
|
|
}
|
|
auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz);
|
|
p_reader->set_file_reader(reader);
|
|
RuntimeState runtime_state((TQueryGlobals()));
|
|
runtime_state.set_desc_tbl(desc_tbl);
|
|
runtime_state.init_instance_mem_tracker();
|
|
|
|
std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
|
|
p_reader->init_reader(conjunct_ctxs);
|
|
Block* block = new Block();
|
|
for (const auto& slot_desc : tuple_desc->slots()) {
|
|
auto data_type =
|
|
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true);
|
|
MutableColumnPtr data_column = data_type->create_column();
|
|
block->insert(
|
|
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
|
|
}
|
|
bool eof = false;
|
|
p_reader->get_next_block(block, &eof);
|
|
for (auto& col : block->get_columns_with_type_and_name()) {
|
|
ASSERT_EQ(col.column->size(), 10);
|
|
}
|
|
EXPECT_TRUE(eof);
|
|
delete block;
|
|
delete p_reader;
|
|
}
|
|
|
|
TEST_F(ParquetReaderTest, scanner) {
|
|
TDescriptorTable t_desc_table;
|
|
TTableDescriptor t_table_desc;
|
|
|
|
t_table_desc.id = 0;
|
|
t_table_desc.tableType = TTableType::OLAP_TABLE;
|
|
t_table_desc.numCols = 7;
|
|
t_table_desc.numClusteringCols = 0;
|
|
t_desc_table.tableDescriptors.push_back(t_table_desc);
|
|
t_desc_table.__isset.tableDescriptors = true;
|
|
|
|
// init boolean and numeric slot
|
|
std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col",
|
|
"int_col", "bigint_col", "float_col",
|
|
"double_col"};
|
|
for (int i = 0; i < numeric_types.size(); i++) {
|
|
TSlotDescriptor tslot_desc;
|
|
{
|
|
tslot_desc.id = i;
|
|
tslot_desc.parent = 0;
|
|
TTypeDesc type;
|
|
{
|
|
TTypeNode node;
|
|
node.__set_type(TTypeNodeType::SCALAR);
|
|
TScalarType scalar_type;
|
|
scalar_type.__set_type(TPrimitiveType::type(i + 2));
|
|
node.__set_scalar_type(scalar_type);
|
|
type.types.push_back(node);
|
|
}
|
|
tslot_desc.slotType = type;
|
|
tslot_desc.columnPos = 0;
|
|
tslot_desc.byteOffset = 0;
|
|
tslot_desc.nullIndicatorByte = 1;
|
|
tslot_desc.nullIndicatorBit = 1;
|
|
tslot_desc.colName = numeric_types[i];
|
|
tslot_desc.slotIdx = 0;
|
|
tslot_desc.isMaterialized = true;
|
|
t_desc_table.slotDescriptors.push_back(tslot_desc);
|
|
}
|
|
}
|
|
|
|
t_desc_table.__isset.slotDescriptors = true;
|
|
{
|
|
TTupleDescriptor t_tuple_desc;
|
|
t_tuple_desc.id = 0;
|
|
t_tuple_desc.byteSize = 16;
|
|
t_tuple_desc.numNullBytes = 0;
|
|
t_tuple_desc.tableId = 0;
|
|
t_tuple_desc.__isset.tableId = true;
|
|
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
|
|
}
|
|
|
|
// set scan range
|
|
// std::vector<TScanRangeParams> scan_ranges;
|
|
TFileScanRange file_scan_range;
|
|
{
|
|
// TScanRangeParams scan_range_params;
|
|
// TFileScanRange file_scan_range;
|
|
TFileScanRangeParams params;
|
|
{
|
|
params.__set_src_tuple_id(0);
|
|
params.__set_num_of_columns_from_file(7);
|
|
params.file_type = TFileType::FILE_LOCAL;
|
|
params.format_type = TFileFormatType::FORMAT_PARQUET;
|
|
std::vector<TFileScanSlotInfo> file_slots;
|
|
for (int i = 0; i < numeric_types.size(); i++) {
|
|
TFileScanSlotInfo slot_info;
|
|
slot_info.slot_id = i;
|
|
slot_info.is_file_slot = true;
|
|
file_slots.emplace_back(slot_info);
|
|
}
|
|
params.__set_required_slots(file_slots);
|
|
}
|
|
file_scan_range.params = params;
|
|
TFileRangeDesc range;
|
|
{
|
|
range.start_offset = 0;
|
|
range.size = 1000;
|
|
range.path = "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
|
|
std::vector<std::string> columns_from_path {"value"};
|
|
range.__set_columns_from_path(columns_from_path);
|
|
}
|
|
file_scan_range.ranges.push_back(range);
|
|
// scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range);
|
|
// scan_ranges.push_back(scan_range_params);
|
|
}
|
|
|
|
std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>();
|
|
RuntimeState runtime_state((TQueryGlobals()));
|
|
runtime_state.init_instance_mem_tracker();
|
|
|
|
DescriptorTbl* desc_tbl;
|
|
ObjectPool obj_pool;
|
|
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
|
|
runtime_state.set_desc_tbl(desc_tbl);
|
|
ScannerCounter counter;
|
|
std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
|
|
auto scan = new ParquetFileHdfsScanner(&runtime_state, runtime_state.runtime_profile(),
|
|
file_scan_range.params, file_scan_range.ranges,
|
|
pre_filter_texprs, &counter);
|
|
scan->reg_conjunct_ctxs(0, conjunct_ctxs);
|
|
Status st = scan->open();
|
|
EXPECT_TRUE(st.ok());
|
|
|
|
bool eof = false;
|
|
Block* block = new Block();
|
|
scan->get_next(block, &eof);
|
|
for (auto& col : block->get_columns_with_type_and_name()) {
|
|
ASSERT_EQ(col.column->size(), 10);
|
|
}
|
|
delete block;
|
|
delete scan;
|
|
}
|
|
|
|
} // namespace vectorized
|
|
} // namespace doris
|