[feature-wip](parquet-reader) row group reader ut finish (#11887)

Author: slothever
Date: 2022-08-18 17:18:14 +08:00
Committed by: GitHub
Co-authored-by: jinzhe <jinzhe@selectdb.com>
Parent: 4fa53b4cdb
Commit: 124b4f7694
8 changed files with 215 additions and 42 deletions

View File

@@ -34,7 +34,7 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr int64_t PARQUET_FOOTER_READ_SIZE = 64 * 1024;
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) {
static Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) {
// try with buffer on stack
uint8_t buff[PARQUET_FOOTER_READ_SIZE];
int64_t file_size = file->size();
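
The hunk above only changes linkage: parse_thrift_footer becomes static, presumably so each translation unit that includes the header (the new unit tests below call it directly) gets its own copy without violating the one-definition rule. For context, a minimal sketch of what the footer constants are for, assuming nothing beyond the standard Parquet layout: a file ends with the thrift-serialized FileMetaData, a 4-byte little-endian metadata length, and the magic bytes "PAR1".

    #include <cstdint>
    #include <cstring>

    // Hypothetical helper, not the committed code: given the last
    // PARQUET_FOOTER_SIZE (8) bytes of a file, validate the magic and
    // recover the length of the serialized FileMetaData that precedes it.
    // Assumes a little-endian host for the memcpy of the length field.
    static bool parse_footer_tail(const uint8_t tail[8], uint32_t* metadata_len) {
        static const uint8_t kMagic[4] = {'P', 'A', 'R', '1'};
        if (std::memcmp(tail + 4, kMagic, 4) != 0) {
            return false; // not a Parquet file
        }
        std::memcpy(metadata_len, tail, 4); // 4-byte little-endian length
        return true;
    }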

View File

@@ -32,19 +32,14 @@ Status ColumnChunkReader::init() {
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
VLOG_DEBUG << "create _page_reader";
_page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
if (_metadata.__isset.dictionary_page_offset) {
RETURN_IF_ERROR(_decode_dict_page());
}
// seek to the first data page
_page_reader->seek_to_page(_metadata.data_page_offset);
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
VLOG_DEBUG << "initColumnChunkReader finish";
return Status::OK();
}
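
The rewritten init() drops the debug logging and the explicit seek to the first data page, decodes the dictionary page up front when dictionary_page_offset is set, and wraps the fallible steps in RETURN_IF_ERROR. For readers unfamiliar with the macro, this is the usual Status short-circuit idiom; a minimal sketch (Doris's actual macro may do more, e.g. logging):

    // Sketch of the RETURN_IF_ERROR idiom used throughout this commit.
    #define RETURN_IF_ERROR(stmt)            \
        do {                                 \
            Status _status_ = (stmt);        \
            if (!_status_.ok()) {            \
                return _status_;             \
            }                                \
        } while (false)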

View File

@@ -37,7 +37,6 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
if (field->type.type == TYPE_ARRAY) {
return Status::Corruption("not supported array type yet");
} else {
VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index;
tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
scalar_reader->init_column_metadata(chunk);
@@ -60,23 +59,27 @@ void ParquetColumnReader::_skipped_pages() {}
Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges) {
BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size());
_row_ranges.reset(&row_ranges);
_chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
_chunk_reader->init();
_stream_reader =
new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size());
_row_ranges = &row_ranges;
_chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field));
RETURN_IF_ERROR(_chunk_reader->init());
RETURN_IF_ERROR(_chunk_reader->next_page());
if (_row_ranges->size() != 0) {
_skipped_pages();
}
RETURN_IF_ERROR(_chunk_reader->load_page_data());
return Status::OK();
}
Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
size_t batch_size, size_t* read_rows, bool* eof) {
if (_chunk_reader->remaining_num_values() <= 0) {
// seek to next page header
_chunk_reader->next_page();
RETURN_IF_ERROR(_chunk_reader->next_page());
if (_row_ranges->size() != 0) {
_skipped_pages();
}
// load data to decoder
_chunk_reader->load_page_data();
RETURN_IF_ERROR(_chunk_reader->load_page_data());
}
size_t read_values = _chunk_reader->remaining_num_values() < batch_size
? _chunk_reader->remaining_num_values()
@@ -84,14 +87,14 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
*read_rows = read_values;
WhichDataType which_type(type);
switch (_metadata->t_metadata().type) {
case tparquet::Type::INT32: {
case tparquet::Type::INT32:
case tparquet::Type::INT64:
case tparquet::Type::FLOAT:
case tparquet::Type::DOUBLE:
case tparquet::Type::BOOLEAN: {
_chunk_reader->decode_values(doris_column, type, read_values);
return Status::OK();
}
case tparquet::Type::INT64: {
// todo: test int64
return Status::OK();
}
default:
return Status::Corruption("unsupported parquet data type");
}
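
With these changes, all fixed-width physical types (INT32, INT64, FLOAT, DOUBLE, BOOLEAN) share one decode path, read_column_data clamps every batch to the values left in the current page, and page advancement now propagates errors instead of ignoring the returned Status. A caller drains a column by looping until eof, roughly like this (a sketch reusing the names from the tests below, inside a function that returns Status):

    size_t total_rows = 0;
    bool eof = false;
    while (!eof) {
        size_t rows = 0;
        RETURN_IF_ERROR(reader->read_column_data(column_ptr, column_type,
                                                 /*batch_size*/ 1024, &rows, &eof));
        total_rows += rows;
    }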

View File

@@ -50,7 +50,12 @@ private:
class ParquetColumnReader {
public:
ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {};
virtual ~ParquetColumnReader() = default;
virtual ~ParquetColumnReader() {
if (_stream_reader != nullptr) {
delete _stream_reader;
_stream_reader = nullptr;
}
};
virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
size_t* read_rows, bool* eof) = 0;
static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
@@ -64,14 +69,15 @@ protected:
protected:
const ParquetReadColumn& _column;
BufferedFileStreamReader* _stream_reader;
std::unique_ptr<ParquetColumnMetadata> _metadata;
std::unique_ptr<std::vector<RowRange>> _row_ranges;
std::vector<RowRange>* _row_ranges;
};
class ScalarColumnReader : public ParquetColumnReader {
public:
ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {};
~ScalarColumnReader() override = default;
~ScalarColumnReader() override { close(); };
Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
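
This header change makes the reader own its stream. Previously, init() built the BufferedFileStreamReader on the stack and handed its address to ColumnChunkReader, which left a dangling pointer as soon as init() returned, and _row_ranges was a unique_ptr that took ownership of a caller-owned vector via reset(&row_ranges), risking a double free. Now _stream_reader is heap-allocated and released in the virtual destructor, while _row_ranges is a plain non-owning pointer. A sketch of the same ownership expressed with a smart pointer instead of the manual delete (not the committed code):

    // Member: unique_ptr would make the delete in ~ParquetColumnReader() unnecessary.
    std::unique_ptr<BufferedFileStreamReader> _stream_reader;

    // In ScalarColumnReader::init(...):
    _stream_reader = std::make_unique<BufferedFileStreamReader>(
            file, _metadata->start_offset(), _metadata->size());
    _chunk_reader.reset(new ColumnChunkReader(_stream_reader.get(), chunk, field));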

View File

@@ -45,10 +45,9 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
std::vector<RowRange>& row_ranges) {
for (auto& read_col : _read_columns) {
SlotDescriptor* slot_desc = read_col.slot_desc;
SlotDescriptor* slot_desc = read_col._slot_desc;
TypeDescriptor col_type = slot_desc->type();
auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
VLOG_DEBUG << "field: " << field->debug_string();
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
row_ranges, reader));
@@ -62,20 +61,18 @@ Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
}
Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
if (_read_rows >= _total_rows) {
*_batch_eof = true;
}
for (auto& read_col : _read_columns) {
auto slot_desc = read_col.slot_desc;
auto slot_desc = read_col._slot_desc;
auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
auto column_ptr = column_with_type_and_name.column;
auto column_type = column_with_type_and_name.type;
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
size_t batch_read_rows = 0;
RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof));
_read_rows += batch_read_rows;
VLOG_DEBUG << "read column: " << column_with_type_and_name.name;
VLOG_DEBUG << "read rows in column: " << batch_read_rows;
if (_read_rows >= _total_rows) {
*_batch_eof = true;
}
}
// use data fill utils to read column data into the column ptr
return Status::OK();
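
Two fixes land in next_batch: the column and type are now bound by reference (auto&), so a ColumnPtr reassigned inside read_column_data actually reaches the block, and the eof check runs inside the loop right after _read_rows is updated, so the call that returns the final rows also reports eof. A hedged sketch of driving next_batch from a scanner, assuming the names used in the group_reader test below:

    bool batch_eof = false;
    while (!batch_eof) {
        RETURN_IF_ERROR(row_group_reader->next_batch(block.get(), 1024, &batch_eof));
        // hand the filled block to the consumer, then reset it for the next batch
    }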

View File

@@ -62,7 +62,6 @@ Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
}
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < _file_metadata->num_columns(); ++i) {
// for test
VLOG_DEBUG << schema_desc.debug_string();
// Get the Column Reader for the boolean column
_map_column.emplace(schema_desc.get_column(i)->name, i);
@@ -89,11 +88,7 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
VLOG_DEBUG << str_error.str();
return Status::InvalidArgument(str_error.str());
}
ParquetReadColumn column;
column.slot_desc = slot_desc;
column.parquet_column_id = parquet_col_id;
auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type;
column.parquet_type = physical_type;
ParquetReadColumn column(slot_desc);
_read_columns.emplace_back(column);
VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
}

View File

@@ -53,11 +53,11 @@ class ParquetReadColumn {
public:
friend class ParquetReader;
friend class RowGroupReader;
ParquetReadColumn(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) {};
~ParquetReadColumn() = default;
private:
SlotDescriptor* slot_desc;
int parquet_column_id;
tparquet::Type::type parquet_type;
SlotDescriptor* _slot_desc;
// int64_t start_offset;
// int64_t chunk_size;
};
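
ParquetReadColumn shrinks to a thin wrapper over the slot descriptor; the parquet column id and physical type it used to cache are resolved from the file metadata when needed. The friend declarations let the two readers reach the private member directly; a sketch of the access pattern, matching _init_column_readers above:

    // Inside RowGroupReader (a friend of ParquetReadColumn):
    for (auto& read_col : _read_columns) {
        SlotDescriptor* slot_desc = read_col._slot_desc; // private, but legal for a friend
        auto field = schema.get_column(slot_desc->col_name());
        // ... create the matching ParquetColumnReader for this field
    }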

View File

@@ -33,6 +33,7 @@
#include "vec/data_types/data_type_factory.hpp"
#include "vec/exec/format/parquet/parquet_thrift_util.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/parquet/vparquet_column_reader.h"
#include "vec/exec/format/parquet/vparquet_file_metadata.h"
namespace doris {
@@ -353,6 +354,182 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
}
}
TEST_F(ParquetThriftReaderTest, column_reader) {
LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
auto st = file_reader.open();
EXPECT_TRUE(st.ok());
// prepare metadata
std::shared_ptr<FileMetaData> meta_data;
parse_thrift_footer(&file_reader, meta_data);
tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
FieldDescriptor schema_descriptor;
// todo use schema of meta_data
schema_descriptor.parse_from_thrift(t_metadata.schema);
// create scalar column reader
std::unique_ptr<ParquetColumnReader> reader;
auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
// create read model
TDescriptorTable t_desc_table;
// table descriptors
TTableDescriptor t_table_desc;
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::OLAP_TABLE;
t_table_desc.numCols = 0;
t_table_desc.numClusteringCols = 0;
t_desc_table.tableDescriptors.push_back(t_table_desc);
t_desc_table.__isset.tableDescriptors = true;
TSlotDescriptor tslot_desc;
{
tslot_desc.id = 0;
tslot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::TINYINT);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
tslot_desc.slotType = type;
tslot_desc.columnPos = 0;
tslot_desc.byteOffset = 0;
tslot_desc.nullIndicatorByte = 0;
tslot_desc.nullIndicatorBit = -1;
tslot_desc.colName = "tinyint_col";
tslot_desc.slotIdx = 0;
tslot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(tslot_desc);
}
t_desc_table.__isset.slotDescriptors = true;
{
// TTupleDescriptor dest
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 0;
t_tuple_desc.byteSize = 16;
t_tuple_desc.numNullBytes = 0;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
}
DescriptorTbl* desc_tbl;
ObjectPool obj_pool;
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
auto slot_desc = desc_tbl->get_slot_descriptor(0);
ParquetReadColumn column(slot_desc);
std::vector<RowRange> row_ranges = std::vector<RowRange>();
ParquetColumnReader::create(&file_reader, field, column, t_metadata.row_groups[0], row_ranges,
reader);
std::unique_ptr<vectorized::Block> block;
create_block(block);
auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
size_t batch_read_rows = 0;
bool batch_eof = false;
ASSERT_EQ(column_ptr->size(), 0);
reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows, &batch_eof);
EXPECT_TRUE(!batch_eof);
ASSERT_EQ(batch_read_rows, 10);
ASSERT_EQ(column_ptr->size(), 10);
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
int int_sum = 0;
for (int i = 0; i < column_ptr->size(); i++) {
int_sum += (int8_t)column_ptr->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
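
The checksum above reads each value through get64 and narrows it to a signed byte, so the fixture's tinyint_col must sum to 5 across its 10 rows. If the column contained nulls, a null-aware variant would consult the null map first; a sketch assuming the usual ColumnNullable interface:

    int null_aware_sum = 0;
    for (size_t i = 0; i < column_ptr->size(); ++i) {
        if (!nullable_column->is_null_at(i)) { // skip null slots
            null_aware_sum += (int8_t)column_ptr->get64(i);
        }
    }
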
TEST_F(ParquetThriftReaderTest, group_reader) {
TDescriptorTable t_desc_table;
TTableDescriptor t_table_desc;
std::vector<std::string> int_types = {"boolean_col", "tinyint_col", "smallint_col", "int_col",
"bigint_col", "float_col", "double_col"};
// "string_col"
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::OLAP_TABLE;
t_table_desc.numCols = 0;
t_table_desc.numClusteringCols = 0;
t_desc_table.tableDescriptors.push_back(t_table_desc);
t_desc_table.__isset.tableDescriptors = true;
for (int i = 0; i < int_types.size(); i++) {
TSlotDescriptor tslot_desc;
{
tslot_desc.id = i;
tslot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
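// assumes TPrimitiveType numbers BOOLEAN, TINYINT, SMALLINT, INT,
// BIGINT, FLOAT and DOUBLE consecutively starting at 2, so i + 2
// follows the order of int_types above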
scalar_type.__set_type(TPrimitiveType::type(i + 2));
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
tslot_desc.slotType = type;
tslot_desc.columnPos = 0;
tslot_desc.byteOffset = 0;
tslot_desc.nullIndicatorByte = 0;
tslot_desc.nullIndicatorBit = -1;
tslot_desc.colName = int_types[i];
tslot_desc.slotIdx = 0;
tslot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(tslot_desc);
}
}
t_desc_table.__isset.slotDescriptors = true;
{
// TTupleDescriptor dest
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 0;
t_tuple_desc.byteSize = 16;
t_tuple_desc.numNullBytes = 0;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
}
DescriptorTbl* desc_tbl;
ObjectPool obj_pool;
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
std::vector<ParquetReadColumn> read_columns;
for (int i = 0; i < int_types.size(); i++) {
auto slot_desc = desc_tbl->get_slot_descriptor(i);
ParquetReadColumn column(slot_desc);
read_columns.emplace_back(column);
}
LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
auto st = file_reader.open();
EXPECT_TRUE(st.ok());
// prepare metadata
std::shared_ptr<FileMetaData> meta_data;
parse_thrift_footer(&file_reader, meta_data);
tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
auto row_group = t_metadata.row_groups[0];
std::shared_ptr<RowGroupReader> row_group_reader;
row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group));
std::vector<RowRange> row_ranges = std::vector<RowRange>();
auto stg = row_group_reader->init(meta_data->schema(), row_ranges);
EXPECT_TRUE(stg.ok());
std::unique_ptr<vectorized::Block> block;
create_block(block);
bool batch_eof = false;
auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof);
EXPECT_TRUE(stb.ok());
LOG(WARNING) << "block data: " << block->dump_structure();
}
} // namespace vectorized
} // namespace doris