[feature-wip](parquet-reader) add predicate filter and column reader (#11488)
file_hdfs_scanner.cpp
@@ -17,6 +17,8 @@
 #include "file_hdfs_scanner.h"
 
+#include "io/file_factory.h"
+
 namespace doris::vectorized {
 
 Status ParquetFileHdfsScanner::open() {
@@ -24,7 +26,31 @@ Status ParquetFileHdfsScanner::open() {
 }
 
 Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
-    return Status();
-    // todo: get block from queue
+    auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+    if (_next_range >= _ranges.size()) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
+    const TFileRangeDesc& range = _ranges[_next_range++];
+    std::unique_ptr<FileReader> file_reader;
+    RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
+                                                    file_reader));
+    _reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(),
+                                    range.start_offset, range.size));
+    Status status =
+            _reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone());
+    if (!status.ok()) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
+    while (_reader->has_next()) {
+        Status st = _reader->read_next_batch(block);
+        if (st.is_end_of_file()) {
+            break;
+        }
+        RETURN_IF_ERROR(st); // propagate real errors instead of silently dropping them
+    }
+    return Status::OK();
 }
 
 void ParquetFileHdfsScanner::close() {}
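get_next above opens one file range per call and drains its row groups into the block; note that this WIP version records end-of-stream in _scanner_eof rather than in the eof out-parameter. A minimal sketch of the caller-side lifecycle implied by the open/get_next/close interface — illustrative only, not code from this commit:

// Illustrative driver for the scanner lifecycle (open -> get_next -> close).
// ParquetFileHdfsScanner, Block and Status come from the patch itself.
Status scan_file(ParquetFileHdfsScanner* scanner, vectorized::Block* block) {
    RETURN_IF_ERROR(scanner->open());
    bool eof = false;
    while (!eof) {
        RETURN_IF_ERROR(scanner->get_next(block, &eof));
        // consume the filled block here before the next iteration
    }
    scanner->close();
    return Status::OK();
}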
file_hdfs_scanner.h
@@ -18,18 +18,13 @@
 #pragma once
 
 #include "common/status.h"
+#include "file_scanner.h"
 #include "vec/core/block.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 
 namespace doris::vectorized {
 
-class HdfsFileScanner {
-public:
-    virtual Status open() = 0;
-
-    virtual Status get_next(vectorized::Block* block, bool* eof) = 0;
-
-    virtual void close() = 0;
-};
+class HdfsFileScanner : public FileScanner {};
 
 class ParquetFileHdfsScanner : public HdfsFileScanner {
 public:
@@ -41,6 +36,9 @@ public:
 
 private:
     void _prefetch_batch();
 
+private:
+    std::shared_ptr<ParquetReader> _reader;
 };
 
 } // namespace doris::vectorized
vparquet_column_reader.cpp
@@ -17,4 +17,53 @@
 
 #include "vparquet_column_reader.h"
 
-namespace doris::vectorized {}
+#include <common/status.h>
+#include <gen_cpp/parquet_types.h>
+
+#include "schema_desc.h"
+#include "vparquet_column_chunk_reader.h"
+
+namespace doris::vectorized {
+
+Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field,
+                                const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
+                                int64_t chunk_size) {
+    // todo1: init column chunk reader
+    // BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
+    // _chunk_reader(&stream_reader, chunk, field);
+    // _chunk_reader.init();
+    return Status::OK();
+}
+
+Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
+                                   const FieldSchema* field, const ParquetReadColumn& column,
+                                   const TypeDescriptor& col_type,
+                                   const tparquet::RowGroup& row_group,
+                                   ParquetColumnReader*& reader) {
+    if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
+        return Status::Corruption("not supported type");
+    }
+    if (field->type.type == TYPE_ARRAY) {
+        return Status::Corruption("not supported array type yet");
+    } else {
+        ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
+        RETURN_IF_ERROR(scalar_reader->init(file, field,
+                                            &row_group.columns[field->physical_column_index],
+                                            col_type, chunk_size));
+        // assign through the reference so the caller actually receives the new reader
+        reader = scalar_reader;
+    }
+    return Status::OK();
+}
+
+Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta,
+                                            ColumnPtr* data) {
+    // todo2: read data with chunk reader to load page data
+    // while (_chunk_reader.has_next) {
+    //     _chunk_reader.next_page();
+    //     _chunk_reader.load_page_data();
+    // }
+    return Status::OK();
+}
+
+void ScalarColumnReader::close() {}
+} // namespace doris::vectorized
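todo2 above sketches the intended page loop for ScalarColumnReader::read_column_data. Assuming ColumnChunkReader exposes has_next()/next_page()/load_page_data() returning Status, as the commented pseudo-code implies, the shape would be roughly the following — a hypothetical completion, not the committed implementation; the real API lives in vparquet_column_chunk_reader.h and may differ:

Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta,
                                            ColumnPtr* data) {
    while (_chunk_reader.has_next()) {
        RETURN_IF_ERROR(_chunk_reader.next_page());      // advance to the next page header
        RETURN_IF_ERROR(_chunk_reader.load_page_data()); // decompress and decode the page
        // append decoded values to *data here
    }
    return Status::OK();
}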
vparquet_column_reader.h
@@ -17,5 +17,39 @@
 
 #pragma once
+
+#include <common/status.h>
+#include <gen_cpp/parquet_types.h>
 
-namespace doris::vectorized {}
+#include "schema_desc.h"
+#include "vparquet_reader.h"
+//#include "vparquet_column_chunk_reader.h"
+
+namespace doris::vectorized {
+
+class ParquetReadColumn;
+
+class ParquetColumnReader {
+public:
+    ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {}
+    // must not be pure without a definition: derived destructors link against it
+    virtual ~ParquetColumnReader() = default;
+    virtual Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) = 0;
+    static Status create(const FileReader* file, int64_t chunk_size, const FieldSchema* field,
+                         const ParquetReadColumn& column, const TypeDescriptor& col_type,
+                         const tparquet::RowGroup& row_group, ParquetColumnReader*& reader);
+    virtual void close() = 0;
+
+protected:
+    const ParquetReadColumn& _column;
+    // const ColumnChunkReader& _chunk_reader;
+};
+
+class ScalarColumnReader : public ParquetColumnReader {
+public:
+    ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {}
+    ~ScalarColumnReader() override = default;
+    Status init(const FileReader* file, const FieldSchema* field,
+                const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
+                int64_t chunk_size);
+    Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) override;
+    void close() override;
+};
+} // namespace doris::vectorized
vparquet_group_reader.cpp
@@ -17,55 +17,226 @@
 
 #include "vparquet_group_reader.h"
 
+#include "parquet_pred_cmp.h"
+#include "schema_desc.h"
 #include "vparquet_column_reader.h"
 
 namespace doris::vectorized {
 
 RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
-                               std::shared_ptr<FileMetaData> file_metadata,
-                               const std::vector<int>& column_ids)
+                               const std::shared_ptr<FileMetaData>& file_metadata,
+                               const std::vector<ParquetReadColumn>& read_columns,
+                               const std::map<std::string, int>& map_column,
+                               const std::vector<ExprContext*>& conjunct_ctxs)
         : _file_reader(file_reader),
           _file_metadata(file_metadata),
-          _column_ids(column_ids),
-          _current_row_group(0) {
-    DCHECK_LE(column_ids.size(), file_metadata->num_columns());
-    _init_column_readers(column_ids);
-}
+          _read_columns(read_columns),
+          _map_column(map_column),
+          _conjunct_ctxs(conjunct_ctxs),
+          _current_row_group(-1) {}
+
+RowGroupReader::~RowGroupReader() {
+    for (auto& column_reader : _column_readers) {
+        auto reader = column_reader.second;
+        reader->close();
+        delete reader;
+        reader = nullptr;
+    }
+    _column_readers.clear();
+}
 
-void RowGroupReader::_init_column_readers(const std::vector<int>& column_ids) {
-    // for (int col_id: column_ids) {
-    //
-    // }
+Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t split_start_offset,
+                            int64_t split_size) {
+    _tuple_desc = tuple_desc;
+    _split_start_offset = split_start_offset;
+    _split_size = split_size;
+    _init_conjuncts(tuple_desc, _conjunct_ctxs);
+    RETURN_IF_ERROR(_init_column_readers());
+    return Status::OK();
 }
 
-Status RowGroupReader::read_next_row_group(const int32_t* group_id) {
-    int32_t total_group = _file_metadata->num_row_groups();
-    while (_current_row_group < total_group) {
-        bool filter_group = false;
-        _process_row_group_filter(&filter_group);
-        if (!filter_group) {
-            group_id = &_current_row_group;
-        _current_row_group++;
+void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
+                                     const std::vector<ExprContext*>& conjunct_ctxs) {
+    if (tuple_desc->slots().empty()) {
+        return;
+    }
+    for (auto& read_col : _read_columns) {
+        _parquet_column_ids.emplace(read_col.parquet_column_id);
+    }
+
+    for (int i = 0; i < tuple_desc->slots().size(); i++) {
+        auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
+        if (col_iter == _map_column.end()) {
+            continue;
+        }
+        int parquet_col_id = col_iter->second;
+        if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
+            continue;
+        }
+        for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
+            Expr* conjunct = conjunct_ctxs[conj_idx]->root();
+            if (conjunct->get_num_children() == 0) {
+                continue;
+            }
+            Expr* raw_slot = conjunct->get_child(0);
+            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
+                continue;
+            }
+            SlotRef* slot_ref = (SlotRef*)raw_slot;
+            SlotId conjunct_slot_id = slot_ref->slot_id();
+            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
+                // Get conjuncts by conjunct_slot_id
+                auto iter = _slot_conjuncts.find(conjunct_slot_id);
+                if (_slot_conjuncts.end() == iter) {
+                    std::vector<ExprContext*> conjuncts;
+                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
+                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
+                } else {
+                    // take a reference: appending to a copy would never reach the map
+                    std::vector<ExprContext*>& conjuncts = iter->second;
+                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
+                }
+            }
+        }
+    }
+}
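_init_conjuncts above buckets the pushed-down predicates by the slot they reference, so the statistics filter can later fetch every predicate for a column in one lookup. On a toy query the resulting map has this shape (types simplified; the real code keys SlotId to ExprContext*):

#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Toy model of _slot_conjuncts for `WHERE c1 > 10 AND c1 < 20 AND c2 = 'x'`:
// both c1 predicates land in one bucket keyed by c1's slot id.
int main() {
    std::unordered_map<int, std::vector<std::string>> slot_conjuncts;
    slot_conjuncts[0].push_back("c1 > 10");
    slot_conjuncts[0].push_back("c1 < 20");
    slot_conjuncts[1].push_back("c2 = 'x'");
    assert(slot_conjuncts.at(0).size() == 2); // both c1 predicates grouped together
    return 0;
}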
+Status RowGroupReader::_init_column_readers() {
+    for (auto& read_col : _read_columns) {
+        SlotDescriptor* slot_desc = read_col.slot_desc;
+        FieldDescriptor schema = _file_metadata->schema();
+        TypeDescriptor col_type = slot_desc->type();
+        const auto& field = schema.get_column(slot_desc->col_name());
+        const tparquet::RowGroup row_group =
+                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
+        ParquetColumnReader* reader = nullptr;
+        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, MAX_PARQUET_BLOCK_SIZE, field,
+                                                    read_col, slot_desc->type(), row_group,
+                                                    reader));
+        if (reader == nullptr) {
+            return Status::Corruption("Init row group reader failed");
+        }
+        _column_readers[slot_desc->id()] = reader;
+    }
+    return Status::OK();
+}
 
-Status RowGroupReader::_process_row_group_filter(bool* filter_group) {
+Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) {
+    // get ColumnWithTypeAndName from src_block
+    for (auto& read_col : _read_columns) {
+        const tparquet::RowGroup row_group =
+                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
+        auto& column_with_type_and_name = block->get_by_name(read_col.slot_desc->col_name());
+        RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data(
+                row_group, &column_with_type_and_name.column));
+        VLOG_DEBUG << column_with_type_and_name.name;
+    }
+    // use data fill utils read column data to column ptr
+    return Status::OK();
+}
+
+Status RowGroupReader::get_next_row_group(int32_t* group_id) {
+    int32_t total_group = _file_metadata->num_row_groups();
+    if (total_group == 0 || _file_metadata->num_rows() == 0 || _split_size < 0) {
+        return Status::EndOfFile("No row group need read");
+    }
+    // "+ 1" keeps the index in bounds after the increment below
+    while (_current_row_group + 1 < total_group) {
+        _current_row_group++;
+        const tparquet::RowGroup& row_group =
+                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
+        if (_is_misaligned_range_group(row_group)) {
+            // this split does not own the group; another split will read it
+            continue;
+        }
+        bool filter_group = false;
+        RETURN_IF_ERROR(_process_row_group_filter(row_group, _conjunct_ctxs, &filter_group));
+        if (!filter_group) {
+            // write through the out-pointer so the caller sees the chosen group
+            *group_id = _current_row_group;
+            break;
+        }
+    }
+    return Status::OK();
+}
+
+bool RowGroupReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) {
+    int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data);
+
+    auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data;
+    int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size;
+
+    int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
+    if (!(row_group_mid >= _split_start_offset &&
+          row_group_mid < _split_start_offset + _split_size)) {
+        return true;
+    }
+    return false;
+}
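_is_misaligned_range_group implements midpoint ownership: a row group belongs to the split whose byte range contains the group's midpoint, so a group that straddles a split boundary is read exactly once across all splits. A standalone illustration with the same arithmetic (hypothetical helper, not part of the patch):

#include <cstdint>

// A split owns a row group iff the group's byte midpoint falls inside it.
bool owns_row_group(int64_t group_start, int64_t group_end,
                    int64_t split_start, int64_t split_size) {
    int64_t mid = group_start + (group_end - group_start) / 2;
    return mid >= split_start && mid < split_start + split_size;
}
// e.g. a group spanning bytes [0, 200) has mid = 100; with splits [0, 128)
// and [128, 256), only the first split reads it.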
+Status RowGroupReader::_process_row_group_filter(const tparquet::RowGroup& row_group,
+                                                 const std::vector<ExprContext*>& conjunct_ctxs,
+                                                 bool* filter_group) {
+    _process_column_stat_filter(row_group, conjunct_ctxs, filter_group);
     _init_chunk_dicts();
-    RETURN_IF_ERROR(_process_dict_filter());
+    RETURN_IF_ERROR(_process_dict_filter(filter_group));
     _init_bloom_filter();
-    RETURN_IF_ERROR(_process_bloom_filter());
+    RETURN_IF_ERROR(_process_bloom_filter(filter_group));
     return Status::OK();
 }
 
+Status RowGroupReader::_process_column_stat_filter(const tparquet::RowGroup& row_group,
+                                                   const std::vector<ExprContext*>& conjunct_ctxs,
+                                                   bool* filter_group) {
+    int total_group = _file_metadata->num_row_groups();
+    // It will not filter if head_group_offset equals tail_group_offset
+    int64_t total_rows = 0;
+    int64_t total_bytes = 0;
+    for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
+        total_rows += row_group.num_rows;
+        total_bytes += row_group.total_byte_size;
+        for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) {
+            const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name();
+            auto col_iter = _map_column.find(col_name);
+            if (col_iter == _map_column.end()) {
+                continue;
+            }
+            int parquet_col_id = col_iter->second;
+            if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
+                // Column not exist in parquet file
+                continue;
+            }
+            auto slot_iter = _slot_conjuncts.find(slot_id);
+            if (slot_iter == _slot_conjuncts.end()) {
+                continue;
+            }
+            auto statistic = row_group.columns[parquet_col_id].meta_data.statistics;
+            if (!statistic.__isset.max || !statistic.__isset.min) {
+                continue;
+            }
+            // Min-max of statistic is plain-encoded value
+            *filter_group =
+                    _determine_filter_row_group(slot_iter->second, statistic.min, statistic.max);
+            if (*filter_group) {
+                _filtered_num_row_groups++;
+                VLOG_DEBUG << "Filter row group id: " << row_group_id;
+                break;
+            }
+        }
+    }
+    VLOG_DEBUG << "DEBUG total_rows: " << total_rows;
+    VLOG_DEBUG << "DEBUG total_bytes: " << total_bytes;
+    VLOG_DEBUG << "Parquet file: " << _file_metadata->schema().debug_string()
+               << ", Num of read row group: " << total_group
+               << ", and num of skip row group: " << _filtered_num_row_groups;
+    return Status::OK();
+}
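As the comment above notes, the min/max statistics arrive as plain-encoded byte strings, so _determine_filter_row_group (declared in the header) has to decode them before comparing against the conjuncts. A minimal sketch for one fixed-width case, assuming INT32 — Parquet plain encoding stores the value as little-endian bytes; the helper name is hypothetical, not from this patch:

#include <cstdint>
#include <cstring>
#include <string>

// Decode a plain-encoded INT32 statistic (4 little-endian bytes).
static int32_t decode_plain_int32(const std::string& encoded) {
    int32_t value = 0;
    if (encoded.size() >= sizeof(value)) {
        memcpy(&value, encoded.data(), sizeof(value)); // assumes a little-endian host
    }
    return value;
}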
 
 void RowGroupReader::_init_chunk_dicts() {}
 
-Status RowGroupReader::_process_dict_filter() {
+Status RowGroupReader::_process_dict_filter(bool* filter_group) {
     return Status::OK();
 }
 
 void RowGroupReader::_init_bloom_filter() {}
 
-Status RowGroupReader::_process_bloom_filter() {
+Status RowGroupReader::_process_bloom_filter(bool* filter_group) {
     RETURN_IF_ERROR(_file_reader->seek(0));
     return Status::OK();
 }
@@ -76,4 +247,12 @@ int64_t RowGroupReader::_get_row_group_start_offset(const tparquet::RowGroup& row_group)
     }
     return row_group.columns[0].meta_data.data_page_offset;
 }
 
+int64_t RowGroupReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
+    if (column.__isset.dictionary_page_offset) {
+        DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
+        return column.dictionary_page_offset;
+    }
+    return column.data_page_offset;
+}
 } // namespace doris::vectorized
vparquet_group_reader.h
@@ -19,32 +19,54 @@
 
+#include "exprs/expr_context.h"
 #include "io/file_reader.h"
+#include "vec/core/block.h"
+#include "vparquet_column_reader.h"
 #include "vparquet_file_metadata.h"
+#include "vparquet_reader.h"
 
 #define MAX_PARQUET_BLOCK_SIZE 1024
 
 namespace doris::vectorized {
+class ParquetReadColumn;
+class ParquetColumnReader;
 class RowGroupReader {
 public:
-    RowGroupReader(doris::FileReader* file_reader, std::shared_ptr<FileMetaData> file_metadata,
-                   const std::vector<int>& column_ids);
-
-    ~RowGroupReader() = default;
-
-    Status read_next_row_group(const int32_t* group_id);
+    RowGroupReader(doris::FileReader* file_reader,
+                   const std::shared_ptr<FileMetaData>& file_metadata,
+                   const std::vector<ParquetReadColumn>& read_columns,
+                   const std::map<std::string, int>& map_column,
+                   const std::vector<ExprContext*>& conjunct_ctxs);
+    ~RowGroupReader();
+    Status init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, int64_t split_size);
+    Status get_next_row_group(int32_t* group_id);
+    Status fill_columns_data(Block* block, const int32_t group_id);
 
 private:
-    void _init_column_readers(const std::vector<int>& column_ids);
+    bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
 
-    Status _process_row_group_filter(bool* filter_group);
+    Status _process_column_stat_filter(const tparquet::RowGroup& row_group,
+                                       const std::vector<ExprContext*>& conjunct_ctxs,
+                                       bool* filter_group);
+
+    void _init_conjuncts(const TupleDescriptor* tuple_desc,
+                         const std::vector<ExprContext*>& conjunct_ctxs);
+
+    Status _init_column_readers();
+
+    Status _process_row_group_filter(const tparquet::RowGroup& row_group,
+                                     const std::vector<ExprContext*>& conjunct_ctxs,
+                                     bool* filter_group);
 
     void _init_chunk_dicts();
 
-    Status _process_dict_filter();
+    Status _process_dict_filter(bool* filter_group);
 
     void _init_bloom_filter();
 
-    Status _process_bloom_filter();
+    Status _process_bloom_filter(bool* filter_group);
 
     int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group);
+    int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column);
 
+    bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
+                                     const std::string& encoded_min,
@@ -72,8 +94,17 @@ private:
 
 private:
     doris::FileReader* _file_reader;
-    std::shared_ptr<FileMetaData> _file_metadata;
-    std::vector<int> _column_ids;
+    const std::shared_ptr<FileMetaData>& _file_metadata;
+    std::unordered_map<int32_t, ParquetColumnReader*> _column_readers;
+    const TupleDescriptor* _tuple_desc; // get all slot info
+    const std::vector<ParquetReadColumn>& _read_columns;
+    const std::map<std::string, int>& _map_column;
+    std::unordered_set<int> _parquet_column_ids;
+    const std::vector<ExprContext*>& _conjunct_ctxs;
+    std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
+    int64_t _split_start_offset;
+    int64_t _split_size;
     int32_t _current_row_group;
+    int32_t _filtered_num_row_groups = 0;
 };
 } // namespace doris::vectorized
vparquet_reader.cpp
@@ -22,7 +22,9 @@
 namespace doris::vectorized {
 ParquetReader::ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
                              int64_t range_start_offset, int64_t range_size)
-        : _num_of_columns_from_file(num_of_columns_from_file) {
+        : _num_of_columns_from_file(num_of_columns_from_file),
+          _range_start_offset(range_start_offset),
+          _range_size(range_size) {
     _file_reader = file_reader;
     _total_groups = 0;
     // _current_group = 0;
@@ -53,44 +55,48 @@ Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
     if (_total_groups == 0) {
         return Status::EndOfFile("Empty Parquet File");
     }
-    auto schemaDescriptor = _file_metadata->schema();
+    auto schema_desc = _file_metadata->schema();
     for (int i = 0; i < _file_metadata->num_columns(); ++i) {
-        LOG(WARNING) << schemaDescriptor.debug_string();
-        // // Get the Column Reader for the boolean column
-        // if (schemaDescriptor->(i)->max_definition_level() > 1) {
-        //     _map_column.emplace(schemaDescriptor->(i)->path()->ToDotVector()[0], i);
-        // } else {
-        //     _map_column.emplace(schemaDescriptor->(i)->name(), i);
-        // }
+        LOG(WARNING) << schema_desc.debug_string();
+        // Get the Column Reader for the boolean column
+        _map_column.emplace(schema_desc.get_column(i)->name, i);
     }
     LOG(WARNING) << "";
-    RETURN_IF_ERROR(_column_indices(tuple_slot_descs));
-    _init_row_group_reader();
+    RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs));
+    RETURN_IF_ERROR(
+            _init_row_group_reader(tuple_desc, _range_start_offset, _range_size, conjunct_ctxs));
     return Status::OK();
 }
 
-Status ParquetReader::_column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
+Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
     DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size());
-    _include_column_ids.clear();
     for (int i = 0; i < _num_of_columns_from_file; i++) {
         auto slot_desc = tuple_slot_descs.at(i);
         // Get the Column Reader for the boolean column
         auto iter = _map_column.find(slot_desc->col_name());
-        if (iter != _map_column.end()) {
-            _include_column_ids.emplace_back(iter->second);
-        } else {
+        if (iter == _map_column.end()) {
             std::stringstream str_error;
             str_error << "Invalid Column Name:" << slot_desc->col_name();
             LOG(WARNING) << str_error.str();
             return Status::InvalidArgument(str_error.str());
         }
+        // dereference the iterator only after the end() check above
+        auto parquet_col_id = iter->second;
+        _include_column_ids.emplace_back(parquet_col_id);
+        ParquetReadColumn column;
+        column.slot_desc = slot_desc;
+        column.parquet_column_id = parquet_col_id;
+        auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type;
+        column.parquet_type = physical_type;
+        _read_columns.emplace_back(column);
     }
     return Status::OK();
 }
 
 Status ParquetReader::read_next_batch(Block* block) {
     int32_t group_id = 0;
-    RETURN_IF_ERROR(_row_group_reader->read_next_row_group(&group_id));
+    RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id));
     auto metadata = _file_metadata->to_thrift_metadata();
     auto column_chunks = metadata.row_groups[group_id].columns;
     if (_has_page_index(column_chunks)) {
@@ -104,19 +110,26 @@ Status ParquetReader::read_next_batch(Block* block) {
         }
     }
-    // metadata has been processed, fill parquet data to block
-    _fill_block_data(column_chunks);
-    block = _batch;
+    // block is the batch data of a row group. a row group has N batch
+    // push to scanner queue
+    _fill_block_data(block, group_id);
     return Status::OK();
 }
 
-void ParquetReader::_fill_block_data(std::vector<tparquet::ColumnChunk> columns) {
-    // read column chunk
-    // _batch =
+void ParquetReader::_fill_block_data(Block* block, int group_id) {
+    // make and init src block here
+    _row_group_reader->fill_columns_data(block, group_id);
 }
 
-void ParquetReader::_init_row_group_reader() {
-    _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, _include_column_ids));
+Status ParquetReader::_init_row_group_reader(const TupleDescriptor* tuple_desc,
+                                             int64_t range_start_offset, int64_t range_size,
+                                             const std::vector<ExprContext*>& conjunct_ctxs) {
+    // todo: extract as create()
+    _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, _read_columns,
+                                               _map_column, conjunct_ctxs));
+    RETURN_IF_ERROR(_row_group_reader->init(tuple_desc, range_start_offset, range_size));
+    return Status::OK();
 }
 
 bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk> columns) {
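Both _has_page_index and _process_page_index take the row group's column chunks; their bodies fall outside these hunks. For orientation, a Parquet column chunk advertises a page index through optional thrift fields, so a presence check typically looks like this sketch (hypothetical helper, not the patch's implementation):

#include <vector>

#include <gen_cpp/parquet_types.h>

// A column chunk carries a page index when its optional
// column_index_offset / offset_index_offset thrift fields are set.
static bool has_page_index(const std::vector<tparquet::ColumnChunk>& columns) {
    for (const auto& chunk : columns) {
        if (!chunk.__isset.column_index_offset || !chunk.__isset.offset_index_offset) {
            return false;
        }
    }
    return true;
}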
vparquet_reader.h
@@ -41,6 +41,20 @@ namespace doris::vectorized {
 //    int64_t filtered_total_bytes = 0;
 //    int64_t total_bytes = 0;
 // };
+class RowGroupReader;
+
+class ParquetReadColumn {
+public:
+    friend class ParquetReader;
+    friend class RowGroupReader;
+
+private:
+    SlotDescriptor* slot_desc;
+    int parquet_column_id;
+    tparquet::Type::type parquet_type;
+    // int64_t start_offset;
+    // int64_t chunk_size;
+};
 
 class ParquetReader {
 public:
@@ -63,9 +77,11 @@ public:
     int64_t size() const { return _file_reader->size(); }
 
 private:
-    Status _column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
-    void _init_row_group_reader();
-    void _fill_block_data(std::vector<tparquet::ColumnChunk> columns);
+    Status _init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs);
+    Status _init_row_group_reader(const TupleDescriptor* tuple_desc, int64_t range_start_offset,
+                                  int64_t range_size,
+                                  const std::vector<ExprContext*>& conjunct_ctxs);
+    void _fill_block_data(Block* block, int group_id);
     bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
     Status _process_page_index(std::vector<tparquet::ColumnChunk> columns);
 
@@ -78,12 +94,13 @@ private:
     // int _current_group; // current group(stripe)
     // std::shared_ptr<Statistics> _statistics;
     const int32_t _num_of_columns_from_file;
 
     std::map<std::string, int> _map_column;  // column-name <---> column-index
     std::vector<int> _include_column_ids;    // columns that need to get from file
+    std::vector<ParquetReadColumn> _read_columns;
     // parquet file reader object
     Block* _batch;
     bool* _batch_eof;
-    // int64_t _range_start_offset;
-    // int64_t _range_size;
+    int64_t _range_start_offset;
+    int64_t _range_size;
 };
 } // namespace doris::vectorized
be/test/CMakeLists.txt
@@ -60,6 +60,7 @@ set(EXEC_TEST_FILES
     exec/multi_bytes_separator_test.cpp
     exec/hdfs_file_reader_test.cpp
     vec/exec/parquet/parquet_thrift_test.cpp
+    vec/exec/parquet/parquet_reader_test.cpp
     # exec/new_olap_scan_node_test.cpp
    # exec/pre_aggregation_node_test.cpp
    # exec/partitioned_hash_table_test.cpp
be/test/vec/exec/parquet/parquet_reader_test.cpp (new file, 48 lines)
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+//#include "io/buffered_reader.h"
+//#include "io/file_reader.h"
+//#include "io/local_file_reader.h"
+//#include "util/runtime_profile.h"
+//#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+
+namespace doris {
+namespace vectorized {
+
+class ParquetReaderTest : public testing::Test {
+public:
+    ParquetReaderTest() {}
+};
+
+//TEST_F(ParquetReaderTest, normal) {
+//    LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0);
+//    auto st = reader.open();
+//    EXPECT_TRUE(st.ok());
+//    std::shared_ptr<FileMetaData> metaData;
+//    parse_thrift_footer(&reader, metaData);
+//    tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
+//    for (auto value : t_metadata.row_groups) {
+//        LOG(WARNING) << "row group num_rows: " << value.num_rows;
+//    }
+//}
+
+} // namespace vectorized
+} // namespace doris