Files
doris/be/test/vec/exec/parquet/parquet_reader_test.cpp
2022-08-26 09:43:46 +08:00

239 lines
9.0 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "io/local_file_reader.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/exec/file_hdfs_scanner.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
namespace doris {
namespace vectorized {
class ParquetReaderTest : public testing::Test {
public:
ParquetReaderTest() {}
};
TEST_F(ParquetReaderTest, normal) {
TDescriptorTable t_desc_table;
TTableDescriptor t_table_desc;
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::OLAP_TABLE;
t_table_desc.numCols = 0;
t_table_desc.numClusteringCols = 0;
t_desc_table.tableDescriptors.push_back(t_table_desc);
t_desc_table.__isset.tableDescriptors = true;
// init boolean and numeric slot
std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col",
"int_col", "bigint_col", "float_col",
"double_col"};
for (int i = 0; i < numeric_types.size(); i++) {
TSlotDescriptor tslot_desc;
{
tslot_desc.id = i;
tslot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::type(i + 2));
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
tslot_desc.slotType = type;
tslot_desc.columnPos = 0;
tslot_desc.byteOffset = 0;
tslot_desc.nullIndicatorByte = 0;
tslot_desc.nullIndicatorBit = -1;
tslot_desc.colName = numeric_types[i];
tslot_desc.slotIdx = 0;
tslot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(tslot_desc);
}
}
t_desc_table.__isset.slotDescriptors = true;
{
// TTupleDescriptor dest
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 0;
t_tuple_desc.byteSize = 16;
t_tuple_desc.numNullBytes = 0;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
}
DescriptorTbl* desc_tbl;
ObjectPool obj_pool;
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
LocalFileReader* reader =
new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
auto p_reader = new ParquetReader(reader, slot_descs.size(), 1024, 0, 1000, &ctz);
RuntimeState runtime_state((TQueryGlobals()));
runtime_state.set_desc_tbl(desc_tbl);
runtime_state.init_instance_mem_tracker();
auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
p_reader->init_reader(tuple_desc, slot_descs, conjunct_ctxs, runtime_state.timezone());
Block* block = new Block();
for (const auto& slot_desc : tuple_desc->slots()) {
auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true);
MutableColumnPtr data_column = data_type->create_column();
block->insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
}
bool eof = false;
p_reader->read_next_batch(block, &eof);
for (auto& col : block->get_columns_with_type_and_name()) {
ASSERT_EQ(col.column->size(), 10);
}
EXPECT_TRUE(eof);
delete block;
delete p_reader;
}
TEST_F(ParquetReaderTest, scanner) {
TDescriptorTable t_desc_table;
TTableDescriptor t_table_desc;
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::OLAP_TABLE;
t_table_desc.numCols = 7;
t_table_desc.numClusteringCols = 0;
t_desc_table.tableDescriptors.push_back(t_table_desc);
t_desc_table.__isset.tableDescriptors = true;
// init boolean and numeric slot
std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col",
"int_col", "bigint_col", "float_col",
"double_col"};
for (int i = 0; i < numeric_types.size(); i++) {
TSlotDescriptor tslot_desc;
{
tslot_desc.id = i;
tslot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::type(i + 2));
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
tslot_desc.slotType = type;
tslot_desc.columnPos = 0;
tslot_desc.byteOffset = 0;
tslot_desc.nullIndicatorByte = 1;
tslot_desc.nullIndicatorBit = 1;
tslot_desc.colName = numeric_types[i];
tslot_desc.slotIdx = 0;
tslot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(tslot_desc);
}
}
t_desc_table.__isset.slotDescriptors = true;
{
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 0;
t_tuple_desc.byteSize = 16;
t_tuple_desc.numNullBytes = 0;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
}
// set scan range
// std::vector<TScanRangeParams> scan_ranges;
TFileScanRange file_scan_range;
{
// TScanRangeParams scan_range_params;
// TFileScanRange file_scan_range;
TFileScanRangeParams params;
{
params.__set_src_tuple_id(0);
params.__set_num_of_columns_from_file(7);
params.file_type = TFileType::FILE_LOCAL;
params.format_type = TFileFormatType::FORMAT_PARQUET;
std::vector<TFileScanSlotInfo> file_slots;
for (int i = 0; i < numeric_types.size(); i++) {
TFileScanSlotInfo slot_info;
slot_info.slot_id = i;
slot_info.is_file_slot = true;
file_slots.emplace_back(slot_info);
}
params.__set_required_slots(file_slots);
}
file_scan_range.params = params;
TFileRangeDesc range;
{
range.start_offset = 0;
range.size = 1000;
range.path = "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
std::vector<std::string> columns_from_path {"value"};
range.__set_columns_from_path(columns_from_path);
}
file_scan_range.ranges.push_back(range);
// scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range);
// scan_ranges.push_back(scan_range_params);
}
std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>();
RuntimeState runtime_state((TQueryGlobals()));
runtime_state.init_instance_mem_tracker();
DescriptorTbl* desc_tbl;
ObjectPool obj_pool;
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
runtime_state.set_desc_tbl(desc_tbl);
ScannerCounter counter;
std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
auto scan = new ParquetFileHdfsScanner(&runtime_state, runtime_state.runtime_profile(),
file_scan_range.params, file_scan_range.ranges,
pre_filter_texprs, &counter);
scan->reg_conjunct_ctxs(0, conjunct_ctxs);
Status st = scan->open();
EXPECT_TRUE(st.ok());
bool eof = false;
Block* block = new Block();
scan->get_next(block, &eof);
for (auto& col : block->get_columns_with_type_and_name()) {
ASSERT_EQ(col.column->size(), 10);
}
delete block;
delete scan;
}
} // namespace vectorized
} // namespace doris