[feature-wip](parquet-reader) update column read model and add page index (#11601)

This commit is contained in:
slothever
2022-08-16 15:04:07 +08:00
committed by GitHub
parent 01383c3217
commit f39f57636b
21 changed files with 555 additions and 471 deletions

View File

@ -27,7 +27,7 @@
#include "exprs/expr_value.h"
#include "exprs/slot_ref.h"
#include "udf/udf.h"
#include "vec/exec/format/parquet/vparquet_group_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#undef USING_DORIS_UDF
#define USING_DORIS_UDF using namespace doris_udf
@ -38,7 +38,7 @@ namespace doris {
namespace vectorized {
class VOlapScanNode;
class RowGroupReader;
class ParquetReader;
} // namespace vectorized
class Expr;
@ -166,7 +166,7 @@ private:
friend class OlapScanNode;
friend class EsPredicate;
friend class RowGroupReader;
friend class vectorized::RowGroupReader;
friend class vectorized::ParquetReader;
friend class vectorized::VOlapScanNode;
/// FunctionContexts for each registered expression. The FunctionContexts are created

View File

@ -21,43 +21,62 @@
namespace doris::vectorized {
ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params,
const std::vector<TFileRangeDesc>& ranges,
const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter)
: HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
Status ParquetFileHdfsScanner::open() {
RETURN_IF_ERROR(FileScanner::open());
if (_ranges.empty()) {
return Status::OK();
}
RETURN_IF_ERROR(_get_next_reader(_next_range));
return Status();
}
void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {}
Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
// todo: get block from queue
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
if (_next_range >= _ranges.size()) {
_scanner_eof = true;
if (_next_range >= _ranges.size() || _scanner_eof) {
*eof = true;
return Status::OK();
}
const TFileRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
file_reader));
_reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(),
range.start_offset, range.size));
Status status =
_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone());
if (!status.ok()) {
_scanner_eof = true;
return Status::OK();
}
while (_reader->has_next()) {
Status st = _reader->read_next_batch(block);
if (st.is_end_of_file()) {
break;
}
RETURN_IF_ERROR(init_block(block));
bool range_eof = false;
RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
if (range_eof) {
RETURN_IF_ERROR(_get_next_reader(_next_range++));
}
return Status::OK();
}
void ParquetFileHdfsScanner::close() {}
Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
const TFileRangeDesc& range = _ranges[_next_range];
_current_range_offset = range.start_offset;
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
file_reader));
_reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(),
_state->query_options().batch_size, range.start_offset,
range.size));
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status =
_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone());
if (!status.ok()) {
if (status.is_end_of_file()) {
_scanner_eof = true;
return Status::OK();
}
return status;
}
return Status::OK();
}
void ParquetFileHdfsScanner::_prefetch_batch() {
// 1. call file reader next batch
// 2. push batch to queue, when get_next is called, pop batch
void ParquetFileHdfsScanner::close() {
FileScanner::close();
}
} // namespace doris::vectorized

View File

@ -24,21 +24,34 @@
namespace doris::vectorized {
class HdfsFileScanner : public FileScanner {};
class HdfsFileScanner : public FileScanner {
public:
HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: FileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {};
};
class ParquetFileHdfsScanner : public HdfsFileScanner {
public:
ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params,
const std::vector<TFileRangeDesc>& ranges,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
Status open() override;
Status get_next(vectorized::Block* block, bool* eof) override;
void close() override;
protected:
void _init_profiles(RuntimeProfile* profile) override;
private:
void _prefetch_batch();
Status _get_next_reader(int _next_range);
private:
std::shared_ptr<ParquetReader> _reader;
int64_t _current_range_offset;
};
} // namespace doris::vectorized

View File

@ -30,6 +30,7 @@
#include "util/thread.h"
#include "util/types.h"
#include "vec/exec/file_arrow_scanner.h"
#include "vec/exec/file_hdfs_scanner.h"
#include "vec/exec/file_text_scanner.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vexpr.h"
@ -471,6 +472,8 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
case TFileFormatType::FORMAT_PARQUET:
scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, _pre_filter_texprs, counter);
// scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params,
// scan_range.ranges, _pre_filter_texprs, counter);
break;
case TFileFormatType::FORMAT_ORC:
scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params,

View File

@ -79,8 +79,8 @@ namespace doris::vectorized {
return true; \
}
bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
const char* min_bytes, const char* max_bytes) {
bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
const char* min_bytes, const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
@ -125,8 +125,8 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*
return false;
}
void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
const char* max_bytes, bool& need_filter) {
void ParquetReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
const char* max_bytes, bool& need_filter) {
Expr* conjunct = ctx->root();
std::vector<void*> in_pred_values;
const InPredicate* pred = static_cast<const InPredicate*>(conjunct);
@ -150,8 +150,8 @@ void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
}
}
bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
const char* max_bytes) {
bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
@ -200,7 +200,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch
return false;
}
bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
@ -250,7 +250,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch
return false;
}
bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
@ -300,7 +300,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch
return false;
}
bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
@ -350,7 +350,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch
return false;
}
bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
@ -400,8 +400,8 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch
return false;
}
void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes,
const char* max_bytes, bool& need_filter) {
void ParquetReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes,
const char* max_bytes, bool& need_filter) {
Expr* conjunct = ctx->root();
Expr* expr = conjunct->get_child(1);
if (expr == nullptr) {
@ -433,9 +433,9 @@ void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_by
}
}
bool RowGroupReader::_determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
const std::string& encoded_min,
const std::string& encoded_max) {
bool ParquetReader::_determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
const std::string& encoded_min,
const std::string& encoded_max) {
const char* min_bytes = encoded_min.data();
const char* max_bytes = encoded_max.data();
bool need_filter = false;

View File

@ -67,9 +67,4 @@ Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file
RETURN_IF_ERROR(file_metadata->init_schema());
return Status::OK();
}
// Status parse_page_header() {
// uint8_t* page_buf;
//
// }
} // namespace doris::vectorized

View File

@ -32,6 +32,7 @@ Status ColumnChunkReader::init() {
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
VLOG_DEBUG << "create _page_reader";
_page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
if (_metadata.__isset.dictionary_page_offset) {
@ -43,12 +44,13 @@ Status ColumnChunkReader::init() {
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
VLOG_DEBUG << "initColumnChunkReader finish";
return Status::OK();
}
Status ColumnChunkReader::next_page() {
RETURN_IF_ERROR(_page_reader->next_page());
_num_values = _page_reader->get_page_header()->data_page_header.num_values;
RETURN_IF_ERROR(_page_reader->next_page_header());
_remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values;
return Status::OK();
}
@ -72,12 +74,12 @@ Status ColumnChunkReader::load_page_data() {
if (_max_rep_level > 0) {
RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
header.data_page_header.repetition_level_encoding,
_max_rep_level, _num_values));
_max_rep_level, _remaining_num_values));
}
if (_max_def_level > 0) {
RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
header.data_page_header.definition_level_encoding,
_max_def_level, _num_values));
_max_def_level, _remaining_num_values));
}
auto encoding = header.data_page_header.encoding;
@ -85,6 +87,7 @@ Status ColumnChunkReader::load_page_data() {
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
encoding = tparquet::Encoding::RLE_DICTIONARY;
}
// Reuse page decoder
if (_decoders.find(static_cast<int>(encoding)) != _decoders.end()) {
_page_decoder = _decoders[static_cast<int>(encoding)].get();
@ -104,7 +107,7 @@ Status ColumnChunkReader::load_page_data() {
Status ColumnChunkReader::_decode_dict_page() {
int64_t dict_offset = _metadata.dictionary_page_offset;
_page_reader->seek_to_page(dict_offset);
_page_reader->next_page();
_page_reader->next_page_header();
const tparquet::PageHeader& header = *_page_reader->get_page_header();
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
// TODO(gaoxin): decode dictionary page
@ -119,10 +122,10 @@ void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
}
Status ColumnChunkReader::skip_values(size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Skip too many values in current page");
}
_num_values -= num_values;
_remaining_num_values -= num_values;
return _page_decoder->skip_values(num_values);
}
@ -138,27 +141,27 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
_num_values -= num_values;
_remaining_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
_num_values -= num_values;
_remaining_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
_num_values -= num_values;
_remaining_num_values -= num_values;
return _page_decoder->decode_values(slice, num_values);
}

View File

@ -85,10 +85,10 @@ public:
// and initialize the repetition and definition level decoder for current page data.
Status load_page_data();
// The remaining number of values in current page(including null values). Decreased when reading or skipping.
uint32_t num_values() const { return _num_values; };
uint32_t remaining_num_values() const { return _remaining_num_values; };
// null values are not analyzing from definition levels
// the caller should maintain the consistency after analyzing null values from definition levels.
void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; };
// Get the raw data of current page.
Slice& get_page_data() { return _page_data; }
@ -116,6 +116,7 @@ private:
FieldSchema* _field_schema;
level_t _max_rep_level;
level_t _max_def_level;
tparquet::LogicalType _parquet_logical_type;
BufferedStreamReader* _stream_reader;
// tparquet::ColumnChunk* _column_chunk;
@ -127,7 +128,7 @@ private:
LevelDecoder _rep_level_decoder;
LevelDecoder _def_level_decoder;
uint32_t _num_values = 0;
uint32_t _remaining_num_values = 0;
Slice _page_data;
std::unique_ptr<uint8_t[]> _decompress_buf;
size_t _decompress_buf_size = 0;

View File

@ -19,50 +19,83 @@
#include <common/status.h>
#include <gen_cpp/parquet_types.h>
#include <vec/columns/columns_number.h>
#include "schema_desc.h"
#include "vparquet_column_chunk_reader.h"
namespace doris::vectorized {
Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field,
const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
int64_t chunk_size) {
// todo1: init column chunk reader
// BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
// _chunk_reader(&stream_reader, chunk, field);
// _chunk_reader.init();
return Status();
}
Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
const FieldSchema* field, const ParquetReadColumn& column,
const TypeDescriptor& col_type,
Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
const ParquetReadColumn& column,
const tparquet::RowGroup& row_group,
const ParquetColumnReader* reader) {
std::vector<RowRange>& row_ranges,
std::unique_ptr<ParquetColumnReader>& reader) {
if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
return Status::Corruption("not supported type");
}
if (field->type.type == TYPE_ARRAY) {
return Status::Corruption("not supported array type yet");
} else {
VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index;
tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
RETURN_IF_ERROR(scalar_reader->init(file, field,
&row_group.columns[field->physical_column_index],
col_type, chunk_size));
reader = scalar_reader;
scalar_reader->init_column_metadata(chunk);
RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
reader.reset(scalar_reader);
}
return Status::OK();
}
Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta,
ColumnPtr* data) {
// todo2: read data with chunk reader to load page data
// while (_chunk_reader.has_next) {
// _chunk_reader.next_page();
// _chunk_reader.load_page_data();
// }
return Status();
void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chunk) {
auto chunk_meta = chunk.meta_data;
int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_len = chunk_meta.total_compressed_size;
_metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta));
}
void ParquetColumnReader::_skipped_pages() {}
Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges) {
BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size());
_row_ranges.reset(&row_ranges);
_chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
_chunk_reader->init();
return Status::OK();
}
Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
size_t batch_size, size_t* read_rows, bool* eof) {
if (_chunk_reader->remaining_num_values() <= 0) {
// seek to next page header
_chunk_reader->next_page();
if (_row_ranges->size() != 0) {
_skipped_pages();
}
// load data to decoder
_chunk_reader->load_page_data();
}
size_t read_values = _chunk_reader->remaining_num_values() < batch_size
? _chunk_reader->remaining_num_values()
: batch_size;
*read_rows = read_values;
WhichDataType which_type(type);
switch (_metadata->t_metadata().type) {
case tparquet::Type::INT32: {
_chunk_reader->decode_values(doris_column, type, read_values);
return Status::OK();
}
case tparquet::Type::INT64: {
// todo: test int64
return Status::OK();
}
default:
return Status::Corruption("unsupported parquet data type");
}
return Status::OK();
}
void ScalarColumnReader::close() {}

View File

@ -20,36 +20,78 @@
#include <gen_cpp/parquet_types.h>
#include "schema_desc.h"
#include "vparquet_column_chunk_reader.h"
#include "vparquet_reader.h"
//#include "vparquet_column_chunk_reader.h"
namespace doris::vectorized {
struct RowRange;
class ParquetReadColumn;
class ParquetColumnMetadata {
public:
ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length,
tparquet::ColumnMetaData metadata)
: _chunk_start_offset(chunk_start_offset),
_chunk_length(chunk_length),
_metadata(metadata) {};
~ParquetColumnMetadata() = default;
int64_t start_offset() const { return _chunk_start_offset; };
int64_t size() const { return _chunk_length; };
tparquet::ColumnMetaData t_metadata() { return _metadata; };
private:
int64_t _chunk_start_offset;
int64_t _chunk_length;
tparquet::ColumnMetaData _metadata;
};
class ParquetColumnReader {
public:
ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {};
virtual ~ParquetColumnReader() = 0;
virtual Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) = 0;
static Status create(const FileReader* file, int64_t chunk_size, const FieldSchema* field,
const ParquetReadColumn& column, const TypeDescriptor& col_type,
const tparquet::RowGroup& row_group, const ParquetColumnReader* reader);
virtual ~ParquetColumnReader() = default;
virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
size_t* read_rows, bool* eof) = 0;
static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
const tparquet::RowGroup& row_group, std::vector<RowRange>& row_ranges,
std::unique_ptr<ParquetColumnReader>& reader);
void init_column_metadata(const tparquet::ColumnChunk& chunk);
virtual void close() = 0;
protected:
void _skipped_pages();
protected:
const ParquetReadColumn& _column;
// const ColumnChunkReader& _chunk_reader;
std::unique_ptr<ParquetColumnMetadata> _metadata;
std::unique_ptr<std::vector<RowRange>> _row_ranges;
};
class ScalarColumnReader : public ParquetColumnReader {
public:
ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {};
~ScalarColumnReader() override = default;
Status init(const FileReader* file, const FieldSchema* field,
const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
int64_t chunk_size);
Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) override;
Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
size_t* read_rows, bool* eof) override;
void close() override;
private:
std::unique_ptr<ColumnChunkReader> _chunk_reader;
};
//class ArrayColumnReader : public ParquetColumnReader {
//public:
// ArrayColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {};
// ~ArrayColumnReader() override = default;
// Status init(FileReader* file, FieldSchema* field,
// tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
// int64_t chunk_size);
// Status read_column_data(ColumnPtr* data) override;
// void close() override;
//private:
// std::unique_ptr<ColumnChunkReader> _chunk_reader;
//};
}; // namespace doris::vectorized

View File

@ -39,7 +39,7 @@ Status FileMetaData::init_schema() {
return Status();
}
const tparquet::FileMetaData& FileMetaData::to_thrift_metadata() {
tparquet::FileMetaData& FileMetaData::to_thrift_metadata() {
return _metadata;
}

View File

@ -27,7 +27,7 @@ public:
FileMetaData(tparquet::FileMetaData& metadata);
~FileMetaData() = default;
Status init_schema();
const tparquet::FileMetaData& to_thrift_metadata();
tparquet::FileMetaData& to_thrift_metadata();
int32_t num_row_groups() const { return _num_groups; }
int32_t num_columns() const { return _num_columns; };
int32_t num_rows() const { return _num_rows; };

View File

@ -24,235 +24,61 @@
namespace doris::vectorized {
RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
const std::shared_ptr<FileMetaData>& file_metadata,
const std::vector<ParquetReadColumn>& read_columns,
const std::map<std::string, int>& map_column,
const std::vector<ExprContext*>& conjunct_ctxs)
const int32_t row_group_id, tparquet::RowGroup& row_group)
: _file_reader(file_reader),
_file_metadata(file_metadata),
_read_columns(read_columns),
_map_column(map_column),
_conjunct_ctxs(conjunct_ctxs),
_current_row_group(-1) {}
_row_group_id(row_group_id),
_row_group_meta(row_group),
_total_rows(row_group.num_rows) {}
RowGroupReader::~RowGroupReader() {
for (auto& column_reader : _column_readers) {
auto reader = column_reader.second;
reader->close();
delete reader;
reader = nullptr;
}
_column_readers.clear();
}
Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t split_start_offset,
int64_t split_size) {
_tuple_desc = tuple_desc;
_split_start_offset = split_start_offset;
_split_size = split_size;
_init_conjuncts(tuple_desc, _conjunct_ctxs);
RETURN_IF_ERROR(_init_column_readers());
Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges) {
VLOG_DEBUG << "Row group id: " << _row_group_id;
RETURN_IF_ERROR(_init_column_readers(schema, row_ranges));
return Status::OK();
}
void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
const std::vector<ExprContext*>& conjunct_ctxs) {
if (tuple_desc->slots().empty()) {
return;
}
for (auto& read_col : _read_columns) {
_parquet_column_ids.emplace(read_col.parquet_column_id);
}
for (int i = 0; i < tuple_desc->slots().size(); i++) {
auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
if (col_iter == _map_column.end()) {
continue;
}
int parquet_col_id = col_iter->second;
if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
continue;
}
for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
Expr* conjunct = conjunct_ctxs[conj_idx]->root();
if (conjunct->get_num_children() == 0) {
continue;
}
Expr* raw_slot = conjunct->get_child(0);
if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
continue;
}
SlotRef* slot_ref = (SlotRef*)raw_slot;
SlotId conjunct_slot_id = slot_ref->slot_id();
if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
// Get conjuncts by conjunct_slot_id
auto iter = _slot_conjuncts.find(conjunct_slot_id);
if (_slot_conjuncts.end() == iter) {
std::vector<ExprContext*> conjuncts;
conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
_slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
} else {
std::vector<ExprContext*> conjuncts = iter->second;
conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
}
}
}
}
}
Status RowGroupReader::_init_column_readers() {
Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
std::vector<RowRange>& row_ranges) {
for (auto& read_col : _read_columns) {
SlotDescriptor* slot_desc = read_col.slot_desc;
FieldDescriptor schema = _file_metadata->schema();
TypeDescriptor col_type = slot_desc->type();
const auto& field = schema.get_column(slot_desc->col_name());
const tparquet::RowGroup row_group =
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
ParquetColumnReader* reader = nullptr;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, MAX_PARQUET_BLOCK_SIZE, field,
read_col, slot_desc->type(), row_group,
reader));
auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
VLOG_DEBUG << "field: " << field->debug_string();
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
row_ranges, reader));
if (reader == nullptr) {
VLOG_DEBUG << "Init row group reader failed";
return Status::Corruption("Init row group reader failed");
}
_column_readers[slot_desc->id()] = reader;
_column_readers[slot_desc->id()] = std::move(reader);
}
return Status::OK();
}
Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) {
// get ColumnWithTypeAndName from src_block
Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
if (_read_rows >= _total_rows) {
*_batch_eof = true;
}
for (auto& read_col : _read_columns) {
const tparquet::RowGroup row_group =
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
auto& column_with_type_and_name = block->get_by_name(read_col.slot_desc->col_name());
RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data(
row_group, &column_with_type_and_name.column));
VLOG_DEBUG << column_with_type_and_name.name;
auto slot_desc = read_col.slot_desc;
auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
auto column_ptr = column_with_type_and_name.column;
auto column_type = column_with_type_and_name.type;
size_t batch_read_rows = 0;
RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof));
_read_rows += batch_read_rows;
VLOG_DEBUG << "read column: " << column_with_type_and_name.name;
VLOG_DEBUG << "read rows in column: " << batch_read_rows;
}
// use data fill utils read column data to column ptr
return Status::OK();
}
Status RowGroupReader::get_next_row_group(const int32_t* group_id) {
int32_t total_group = _file_metadata->num_row_groups();
if (total_group == 0 || _file_metadata->num_rows() == 0 || _split_size < 0) {
return Status::EndOfFile("No row group need read");
}
while (_current_row_group < total_group) {
_current_row_group++;
const tparquet::RowGroup& row_group =
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
if (!_is_misaligned_range_group(row_group)) {
continue;
}
bool filter_group = false;
RETURN_IF_ERROR(_process_row_group_filter(row_group, _conjunct_ctxs, &filter_group));
if (!filter_group) {
group_id = &_current_row_group;
break;
}
}
return Status::OK();
}
bool RowGroupReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) {
int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data);
auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data;
int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size;
int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
if (!(row_group_mid >= _split_start_offset &&
row_group_mid < _split_start_offset + _split_size)) {
return true;
}
return false;
}
Status RowGroupReader::_process_row_group_filter(const tparquet::RowGroup& row_group,
const std::vector<ExprContext*>& conjunct_ctxs,
bool* filter_group) {
_process_column_stat_filter(row_group, conjunct_ctxs, filter_group);
_init_chunk_dicts();
RETURN_IF_ERROR(_process_dict_filter(filter_group));
_init_bloom_filter();
RETURN_IF_ERROR(_process_bloom_filter(filter_group));
return Status::OK();
}
Status RowGroupReader::_process_column_stat_filter(const tparquet::RowGroup& row_group,
const std::vector<ExprContext*>& conjunct_ctxs,
bool* filter_group) {
int total_group = _file_metadata->num_row_groups();
// It will not filter if head_group_offset equals tail_group_offset
int64_t total_rows = 0;
int64_t total_bytes = 0;
for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
total_rows += row_group.num_rows;
total_bytes += row_group.total_byte_size;
for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) {
const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name();
auto col_iter = _map_column.find(col_name);
if (col_iter == _map_column.end()) {
continue;
}
int parquet_col_id = col_iter->second;
if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
// Column not exist in parquet file
continue;
}
auto slot_iter = _slot_conjuncts.find(slot_id);
if (slot_iter == _slot_conjuncts.end()) {
continue;
}
auto statistic = row_group.columns[parquet_col_id].meta_data.statistics;
if (!statistic.__isset.max || !statistic.__isset.min) {
continue;
}
// Min-max of statistic is plain-encoded value
*filter_group =
_determine_filter_row_group(slot_iter->second, statistic.min, statistic.max);
if (*filter_group) {
_filtered_num_row_groups++;
VLOG_DEBUG << "Filter row group id: " << row_group_id;
break;
}
}
}
VLOG_DEBUG << "DEBUG total_rows: " << total_rows;
VLOG_DEBUG << "DEBUG total_bytes: " << total_bytes;
VLOG_DEBUG << "Parquet file: " << _file_metadata->schema().debug_string()
<< ", Num of read row group: " << total_group
<< ", and num of skip row group: " << _filtered_num_row_groups;
return Status::OK();
}
void RowGroupReader::_init_chunk_dicts() {}
Status RowGroupReader::_process_dict_filter(bool* filter_group) {
return Status();
}
void RowGroupReader::_init_bloom_filter() {}
Status RowGroupReader::_process_bloom_filter(bool* filter_group) {
RETURN_IF_ERROR(_file_reader->seek(0));
return Status();
}
int64_t RowGroupReader::_get_row_group_start_offset(const tparquet::RowGroup& row_group) {
if (row_group.__isset.file_offset) {
return row_group.file_offset;
}
return row_group.columns[0].meta_data.data_page_offset;
}
int64_t RowGroupReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
if (column.__isset.dictionary_page_offset) {
DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
return column.dictionary_page_offset;
}
return column.data_page_offset;
}
} // namespace doris::vectorized

View File

@ -24,87 +24,30 @@
#include "vparquet_file_metadata.h"
#include "vparquet_reader.h"
#define MAX_PARQUET_BLOCK_SIZE 1024
namespace doris::vectorized {
class ParquetReadColumn;
class ParquetColumnReader;
struct RowRange;
class RowGroupReader {
public:
RowGroupReader(doris::FileReader* file_reader,
const std::shared_ptr<FileMetaData>& file_metadata,
const std::vector<ParquetReadColumn>& read_columns,
const std::map<std::string, int>& map_column,
const std::vector<ExprContext*>& conjunct_ctxs);
const std::vector<ParquetReadColumn>& read_columns, const int32_t _row_group_id,
tparquet::RowGroup& row_group);
~RowGroupReader();
Status init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, int64_t split_size);
Status get_next_row_group(const int32_t* group_id);
Status fill_columns_data(Block* block, const int32_t group_id);
Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges);
Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
private:
bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
Status _process_column_stat_filter(const tparquet::RowGroup& row_group,
const std::vector<ExprContext*>& conjunct_ctxs,
bool* filter_group);
void _init_conjuncts(const TupleDescriptor* tuple_desc,
const std::vector<ExprContext*>& conjunct_ctxs);
Status _init_column_readers();
Status _process_row_group_filter(const tparquet::RowGroup& row_group,
const std::vector<ExprContext*>& conjunct_ctxs,
bool* filter_group);
void _init_chunk_dicts();
Status _process_dict_filter(bool* filter_group);
void _init_bloom_filter();
Status _process_bloom_filter(bool* filter_group);
int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group);
int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers);
bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
const std::string& encoded_min,
const std::string& encoded_max);
void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
bool& need_filter);
void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
bool& need_filter);
bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
const char* min_bytes, const char* max_bytes);
bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
const char* max_bytes);
bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes);
bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes);
bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes);
bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges);
private:
doris::FileReader* _file_reader;
const std::shared_ptr<FileMetaData>& _file_metadata;
std::unordered_map<int32_t, ParquetColumnReader*> _column_readers;
const TupleDescriptor* _tuple_desc; // get all slot info
std::unordered_map<int32_t, std::unique_ptr<ParquetColumnReader>> _column_readers;
const std::vector<ParquetReadColumn>& _read_columns;
const std::map<std::string, int>& _map_column;
std::unordered_set<int> _parquet_column_ids;
const std::vector<ExprContext*>& _conjunct_ctxs;
std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
int64_t _split_start_offset;
int64_t _split_size;
int32_t _current_row_group;
int32_t _filtered_num_row_groups = 0;
const int32_t _row_group_id;
tparquet::RowGroup& _row_group_meta;
int64_t _read_rows = 0;
int64_t _total_rows;
};
} // namespace doris::vectorized

View File

@ -21,22 +21,22 @@
namespace doris::vectorized {
PageIndex::~PageIndex() {
if (_column_index != nullptr) {
delete _column_index;
_column_index = nullptr;
}
if (_offset_index != nullptr) {
delete _offset_index;
_offset_index = nullptr;
Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
int total_rows_of_group, int page_idx,
RowRange* row_range) {
const auto& page_locations = offset_index.page_locations;
DCHECK_LT(page_idx, page_locations.size());
row_range->first_row = page_locations[page_idx].first_row_index;
if (page_idx == page_locations.size() - 1) {
row_range->last_row = total_rows_of_group - 1;
} else {
row_range->last_row = page_locations[page_idx + 1].first_row_index - 1;
}
return Status::OK();
}
Status PageIndex::get_row_range_for_page() {
return Status();
}
Status PageIndex::collect_skipped_page_range() {
Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
std::vector<int> page_range) {
return Status();
}
@ -67,20 +67,21 @@ bool PageIndex::check_and_get_page_index_ranges(const std::vector<tparquet::Colu
return has_page_index;
}
Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff) {
Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
tparquet::ColumnIndex* column_index) {
int64_t buffer_offset = chunk.column_index_offset - _column_index_start;
uint32_t length = chunk.column_index_length;
DCHECK_LE(buffer_offset + length, _column_index_size);
RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, _column_index));
RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, column_index));
return Status::OK();
}
Status PageIndex::parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
int64_t buffer_size) {
int64_t buffer_size, tparquet::OffsetIndex* offset_index) {
int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start + _column_index_size;
uint32_t length = chunk.offset_index_length;
DCHECK_LE(buffer_offset + length, buffer_size);
RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, _offset_index));
RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, offset_index));
return Status::OK();
}

View File

@ -19,30 +19,32 @@
#include <common/status.h>
#include <gen_cpp/parquet_types.h>
#include "exprs/expr_context.h"
namespace doris::vectorized {
class ParquetReader;
struct RowRange;
class PageIndex {
public:
PageIndex() = default;
~PageIndex();
Status get_row_range_for_page();
Status collect_skipped_page_range();
~PageIndex() = default;
Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int total_rows_of_group,
int page_idx, RowRange* row_range);
Status collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
std::vector<int> page_range);
bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff);
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
tparquet::ColumnIndex* _column_index);
Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
int64_t buffer_size);
int64_t buffer_size, tparquet::OffsetIndex* _offset_index);
private:
private:
friend class ParquetReader;
int64_t _column_index_start;
int64_t _column_index_size;
int64_t _offset_index_start;
int64_t _offset_index_size;
tparquet::OffsetIndex* _offset_index;
tparquet::ColumnIndex* _column_index;
// row range define
};
} // namespace doris::vectorized

View File

@ -28,7 +28,7 @@ static constexpr size_t initPageHeaderSize = 1024;
PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length)
: _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
Status PageReader::next_page() {
Status PageReader::next_page_header() {
if (_offset < _start_offset || _offset >= _end_offset) {
return Status::IOError("Out-of-bounds Access");
}

View File

@ -34,7 +34,7 @@ public:
bool has_next_page() const { return _offset < _end_offset; }
Status next_page();
Status next_page_header();
Status skip_page();

View File

@ -21,13 +21,14 @@
namespace doris::vectorized {
ParquetReader::ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size)
size_t batch_size, int64_t range_start_offset, int64_t range_size)
: _num_of_columns_from_file(num_of_columns_from_file),
_batch_size(batch_size),
_range_start_offset(range_start_offset),
_range_size(range_size) {
_file_reader = file_reader;
_total_groups = 0;
// _current_group = 0;
_current_row_group_id = 0;
// _statistics = std::make_shared<Statistics>();
}
@ -36,6 +37,10 @@ ParquetReader::~ParquetReader() {
}
void ParquetReader::close() {
for (auto& conjuncts : _slot_conjuncts) {
conjuncts.second.clear();
}
_slot_conjuncts.clear();
if (_file_reader != nullptr) {
_file_reader->close();
delete _file_reader;
@ -45,26 +50,26 @@ void ParquetReader::close() {
Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::vector<ExprContext*>& conjunct_ctxs,
std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) {
_file_reader->open();
_conjunct_ctxs.reset(&conjunct_ctxs);
RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
auto metadata = _file_metadata->to_thrift_metadata();
_total_groups = metadata.row_groups.size();
_t_metadata.reset(&_file_metadata->to_thrift_metadata());
_total_groups = _file_metadata->num_row_groups();
if (_total_groups == 0) {
return Status::EndOfFile("Empty Parquet File");
}
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < _file_metadata->num_columns(); ++i) {
LOG(WARNING) << schema_desc.debug_string();
// for test
VLOG_DEBUG << schema_desc.debug_string();
// Get the Column Reader for the boolean column
_map_column.emplace(schema_desc.get_column(i)->name, i);
}
LOG(WARNING) << "";
RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs));
RETURN_IF_ERROR(
_init_row_group_reader(tuple_desc, _range_start_offset, _range_size, conjunct_ctxs));
_init_row_group_readers(tuple_desc, _range_start_offset, _range_size, conjunct_ctxs));
return Status::OK();
}
@ -81,7 +86,7 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
} else {
std::stringstream str_error;
str_error << "Invalid Column Name:" << slot_desc->col_name();
LOG(WARNING) << str_error.str();
VLOG_DEBUG << str_error.str();
return Status::InvalidArgument(str_error.str());
}
ParquetReadColumn column;
@ -90,63 +95,231 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type;
column.parquet_type = physical_type;
_read_columns.emplace_back(column);
VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
}
return Status::OK();
}
Status ParquetReader::read_next_batch(Block* block) {
int32_t group_id = 0;
RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id));
auto metadata = _file_metadata->to_thrift_metadata();
auto column_chunks = metadata.row_groups[group_id].columns;
if (_has_page_index(column_chunks)) {
Status st = _process_page_index(column_chunks);
if (st.ok()) {
// todo: process filter page
return Status::OK();
} else {
// todo: record profile
LOG(WARNING) << "";
Status ParquetReader::read_next_batch(Block* block, bool* eof) {
DCHECK(_total_groups == _row_group_readers.size());
if (_total_groups == 0) {
*eof = true;
}
bool _batch_eof = false;
auto row_group_reader = _row_group_readers[_current_row_group_id];
RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, &_batch_eof));
if (_batch_eof) {
_current_row_group_id++;
if (_current_row_group_id > _total_groups) {
*eof = true;
}
}
// metadata has been processed, fill parquet data to block
// block is the batch data of a row group. a row group has N batch
// push to scanner queue
_fill_block_data(block, group_id);
return Status::OK();
}
void ParquetReader::_fill_block_data(Block* block, int group_id) {
// make and init src block here
// read column chunk
_row_group_reader->fill_columns_data(block, group_id);
Status ParquetReader::_init_row_group_readers(const TupleDescriptor* tuple_desc,
int64_t range_start_offset, int64_t range_size,
const std::vector<ExprContext*>& conjunct_ctxs) {
std::vector<int32_t> read_row_groups;
RETURN_IF_ERROR(_filter_row_groups(&read_row_groups));
_init_conjuncts(tuple_desc, conjunct_ctxs);
for (auto row_group_id : read_row_groups) {
VLOG_DEBUG << "_has_page_index";
auto row_group = _t_metadata->row_groups[row_group_id];
auto column_chunks = row_group.columns;
std::vector<RowRange> skipped_row_ranges;
if (_has_page_index(column_chunks)) {
VLOG_DEBUG << "_process_page_index";
RETURN_IF_ERROR(_process_page_index(row_group, skipped_row_ranges));
}
std::shared_ptr<RowGroupReader> row_group_reader;
row_group_reader.reset(
new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group));
// todo: can filter row with candidate ranges rather than skipped ranges
RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), skipped_row_ranges));
_row_group_readers.emplace_back(row_group_reader);
}
VLOG_DEBUG << "_init_row_group_reader finished";
return Status::OK();
}
Status ParquetReader::_init_row_group_reader(const TupleDescriptor* tuple_desc,
int64_t range_start_offset, int64_t range_size,
const std::vector<ExprContext*>& conjunct_ctxs) {
// todo: extract as create()
_row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, _read_columns,
_map_column, conjunct_ctxs));
RETURN_IF_ERROR(_row_group_reader->init(tuple_desc, range_start_offset, range_size));
void ParquetReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
const std::vector<ExprContext*>& conjunct_ctxs) {
if (tuple_desc->slots().empty()) {
return;
}
std::unordered_set<int> parquet_col_ids(_include_column_ids.begin(), _include_column_ids.end());
for (int i = 0; i < tuple_desc->slots().size(); i++) {
auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
if (col_iter == _map_column.end()) {
continue;
}
int parquet_col_id = col_iter->second;
if (parquet_col_ids.end() == parquet_col_ids.find(parquet_col_id)) {
continue;
}
for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
Expr* conjunct = conjunct_ctxs[conj_idx]->root();
if (conjunct->get_num_children() == 0) {
continue;
}
Expr* raw_slot = conjunct->get_child(0);
if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
continue;
}
SlotRef* slot_ref = (SlotRef*)raw_slot;
SlotId conjunct_slot_id = slot_ref->slot_id();
if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
// Get conjuncts by conjunct_slot_id
auto iter = _slot_conjuncts.find(conjunct_slot_id);
if (_slot_conjuncts.end() == iter) {
std::vector<ExprContext*> conjuncts;
conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
_slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
} else {
std::vector<ExprContext*> conjuncts = iter->second;
conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
}
}
}
}
}
Status ParquetReader::_filter_row_groups(std::vector<int32_t>* read_row_group_ids) {
if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size < 0) {
return Status::EndOfFile("No row group need read");
}
int32_t row_group_idx = -1;
while (row_group_idx < _total_groups) {
row_group_idx++;
const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx];
if (_is_misaligned_range_group(row_group)) {
continue;
}
bool filter_group = false;
RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
if (!filter_group) {
read_row_group_ids->emplace_back(row_group_idx);
break;
}
}
return Status::OK();
}
bool ParquetReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) {
int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data);
auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data;
int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size;
int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
if (!(row_group_mid >= _range_start_offset &&
row_group_mid < _range_start_offset + _range_size)) {
return true;
}
return false;
}
bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk> columns) {
_page_index.reset(new PageIndex());
return _page_index->check_and_get_page_index_ranges(columns);
}
Status ParquetReader::_process_page_index(std::vector<tparquet::ColumnChunk> columns) {
Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
std::vector<RowRange>& skipped_row_ranges) {
int64_t buffer_size = _page_index->_column_index_size + _page_index->_offset_index_size;
uint8_t buff[buffer_size];
for (auto col_id : _include_column_ids) {
auto chunk = columns[col_id];
RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff));
// todo: use page index filter min/max val
RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size));
// todo: calculate row range
uint8_t buff[buffer_size];
auto chunk = row_group.columns[col_id];
tparquet::ColumnIndex column_index;
RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, &column_index));
VLOG_DEBUG << "_column_index_size : " << _page_index->_column_index_size;
VLOG_DEBUG << "_page_index 0 max_values : " << column_index.max_values[0];
const int num_of_page = column_index.null_pages.size();
if (num_of_page <= 1) {
break;
}
auto conjunct_iter = _slot_conjuncts.find(col_id);
if (_slot_conjuncts.end() == conjunct_iter) {
continue;
}
auto conjuncts = conjunct_iter->second;
std::vector<int> candidate_page_range;
_page_index->collect_skipped_page_range(conjuncts, candidate_page_range);
tparquet::OffsetIndex offset_index;
RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size, &offset_index));
VLOG_DEBUG << "page_locations size : " << offset_index.page_locations.size();
for (int page_id : candidate_page_range) {
RowRange skipped_row_range;
_page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id,
&skipped_row_range);
skipped_row_ranges.emplace_back(skipped_row_range);
}
}
return Status::OK();
}
Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_group,
bool* filter_group) {
_process_column_stat_filter(row_group.columns, filter_group);
_init_chunk_dicts();
RETURN_IF_ERROR(_process_dict_filter(filter_group));
_init_bloom_filter();
RETURN_IF_ERROR(_process_bloom_filter(filter_group));
return Status::OK();
}
Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& columns,
bool* filter_group) {
// It will not filter if head_group_offset equals tail_group_offset
std::unordered_set<int> _parquet_column_ids(_include_column_ids.begin(),
_include_column_ids.end());
for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) {
auto slot_iter = _slot_conjuncts.find(slot_id);
if (slot_iter == _slot_conjuncts.end()) {
continue;
}
const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name();
auto col_iter = _map_column.find(col_name);
if (col_iter == _map_column.end()) {
continue;
}
int parquet_col_id = col_iter->second;
if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
// Column not exist in parquet file
continue;
}
auto statistic = columns[parquet_col_id].meta_data.statistics;
if (!statistic.__isset.max || !statistic.__isset.min) {
continue;
}
// Min-max of statistic is plain-encoded value
*filter_group = _determine_filter_min_max(slot_iter->second, statistic.min, statistic.max);
if (*filter_group) {
break;
}
}
return Status::OK();
}
void ParquetReader::_init_chunk_dicts() {}
Status ParquetReader::_process_dict_filter(bool* filter_group) {
return Status();
}
void ParquetReader::_init_bloom_filter() {}
Status ParquetReader::_process_bloom_filter(bool* filter_group) {
RETURN_IF_ERROR(_file_reader->seek(0));
return Status();
}
int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
if (column.__isset.dictionary_page_offset) {
DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
return column.dictionary_page_offset;
}
return column.data_page_offset;
}
} // namespace doris::vectorized

View File

@ -42,6 +42,12 @@ namespace doris::vectorized {
// int64_t total_bytes = 0;
// };
class RowGroupReader;
class PageIndex;
struct RowRange {
int64_t first_row;
int64_t last_row;
};
class ParquetReadColumn {
public:
@ -58,49 +64,73 @@ private:
class ParquetReader {
public:
ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, size_t batch_size,
int64_t range_start_offset, int64_t range_size);
~ParquetReader();
Status init_reader(const TupleDescriptor* tuple_desc,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone);
std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone);
Status read_next_batch(Block* block);
Status read_next_batch(Block* block, bool* eof);
bool has_next() const { return !_batch_eof; };
// std::shared_ptr<Statistics>& statistics() { return _statistics; }
// std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
int64_t size() const { return _file_reader->size(); }
private:
Status _init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs);
Status _init_row_group_reader(const TupleDescriptor* tuple_desc, int64_t range_start_offset,
int64_t range_size,
const std::vector<ExprContext*>& conjunct_ctxs);
void _fill_block_data(Block* block, int group_id);
Status _init_row_group_readers(const TupleDescriptor* tuple_desc, int64_t range_start_offset,
int64_t range_size,
const std::vector<ExprContext*>& conjunct_ctxs);
void _init_conjuncts(const TupleDescriptor* tuple_desc,
const std::vector<ExprContext*>& conjunct_ctxs);
// Page Index Filter
bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
Status _process_page_index(std::vector<tparquet::ColumnChunk> columns);
Status _process_page_index(tparquet::RowGroup& row_group,
std::vector<RowRange>& skipped_row_ranges);
// Row Group Filter
bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
Status _process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& column_meta,
bool* filter_group);
Status _process_row_group_filter(const tparquet::RowGroup& row_group, bool* filter_group);
void _init_chunk_dicts();
Status _process_dict_filter(bool* filter_group);
void _init_bloom_filter();
Status _process_bloom_filter(bool* filter_group);
Status _filter_row_groups(std::vector<int32_t>* read_row_group_ids);
int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers);
bool _determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
const std::string& encoded_min, const std::string& encoded_max);
void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
bool& need_filter);
void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
bool& need_filter);
private:
FileReader* _file_reader;
std::shared_ptr<FileMetaData> _file_metadata;
std::shared_ptr<RowGroupReader> _row_group_reader;
std::unique_ptr<tparquet::FileMetaData> _t_metadata;
std::shared_ptr<PageIndex> _page_index;
int _total_groups; // num of groups(stripes) of a parquet(orc) file
// int _current_group; // current group(stripe)
std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
int32_t _current_row_group_id;
// std::shared_ptr<Statistics> _statistics;
const int32_t _num_of_columns_from_file;
std::map<std::string, int> _map_column; // column-name <---> column-index
std::vector<int> _include_column_ids; // columns that need to get from file
std::shared_ptr<std::vector<ExprContext*>> _conjunct_ctxs;
std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
std::vector<int> _include_column_ids; // columns that need to get from file
std::vector<ParquetReadColumn> _read_columns;
bool* _file_eof;
// parquet file reader object
bool* _batch_eof;
size_t _batch_size;
int64_t _range_start_offset;
int64_t _range_size;
const TupleDescriptor* _tuple_desc; // get all slot info
};
} // namespace doris::vectorized

View File

@ -147,7 +147,7 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
// load page data into underlying container
chunk_reader.load_page_data();
// decode page data
return chunk_reader.decode_values(doris_column, data_type, chunk_reader.num_values());
return chunk_reader.decode_values(doris_column, data_type, chunk_reader.remaining_num_values());
}
static void create_block(std::unique_ptr<vectorized::Block>& block) {