[feature](multi-catalog) read parquet file by start/offset (#10843)

To avoid reading repeated row groups, we should align the offsets
This commit is contained in:
slothever
2022-07-18 20:51:08 +08:00
committed by GitHub
parent 60dd322aba
commit 8a366c9ba2
14 changed files with 169 additions and 66 deletions

View File

@ -769,7 +769,7 @@ CONF_Int32(object_pool_buffer_size, "100");
// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");
CONF_Bool(parquet_predicate_push_down, "false");
CONF_Bool(parquet_predicate_push_down, "true");
// When the rows number reached this limit, will check the filter rate the of bloomfilter
// if it is lower than a specific threshold, the predicate will be disabled.

View File

@ -37,11 +37,14 @@ namespace doris {
// Broker
ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file)
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size)
: ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
_rows_of_group(0),
_current_line_of_group(0),
_current_line_of_batch(0) {}
_current_line_of_batch(0),
_range_start_offset(range_start_offset),
_range_size(range_size) {}
ParquetReaderWrap::~ParquetReaderWrap() {
_closed = true;
@ -101,8 +104,12 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
if (config::parquet_predicate_push_down) {
_row_group_reader.reset(new RowGroupReader(conjunct_ctxs, _file_metadata, this));
_row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids);
int64_t file_size = 0;
size(&file_size);
_row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size,
conjunct_ctxs, _file_metadata, this));
_row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids,
file_size);
}
_thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
return Status::OK();

View File

@ -62,8 +62,8 @@ class RowGroupReader;
class ParquetReaderWrap final : public ArrowReaderWrap {
public:
// batch_size is not used here
ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file);
ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size);
~ParquetReaderWrap() override;
// Read
@ -100,6 +100,8 @@ private:
int _current_line_of_group;
int _current_line_of_batch;
std::string _timezone;
int64_t _range_start_offset;
int64_t _range_size;
private:
std::atomic<bool> _closed = false;

View File

@ -57,30 +57,37 @@
return true; \
}
#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
std::vector<T> in_values; \
for (auto val : in_pred_values) { \
T value = reinterpret_cast<T*>(val)[0]; \
in_values.emplace_back(value); \
} \
if (in_values.empty()) { \
return false; \
} \
std::sort(in_values.begin(), in_values.end()); \
T in_min = in_values.front(); \
T in_max = in_values.back(); \
const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \
const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \
if (in_max < group_min || in_min > group_max) { \
return true; \
// Row-group pruning helper for IN predicates, expanded inside the enclosing
// predicate-evaluation function.
// Collects the IN-list values (each `val` is assumed to point at a value of
// type T -- reinterpret_cast'ed), takes their min/max with a single
// std::minmax_element pass, and compares that range against the row group's
// column statistics (`min_bytes`/`max_bytes`, assumed to hold a raw T --
// TODO confirm the encoding matches the caller's column types).
// Expands to `return true;` (the whole group can be skipped) when the IN-list
// range and the group's [min, max] range are disjoint, and to `return false;`
// (cannot prune) when the IN-list is empty.
#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
std::vector<T> in_values; \
for (auto val : in_pred_values) { \
T value = reinterpret_cast<T*>(val)[0]; \
in_values.emplace_back(value); \
} \
if (in_values.empty()) { \
return false; \
} \
auto result = std::minmax_element(in_values.begin(), in_values.end()); \
T in_min = *result.first; \
T in_max = *result.second; \
const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \
const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \
if (in_max < group_min || in_min > group_max) { \
return true; \
}
#define PARQUET_HEAD 4
namespace doris {
RowGroupReader::RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs,
RowGroupReader::RowGroupReader(int64_t range_start_offset, int64_t range_size,
const std::vector<ExprContext*>& conjunct_ctxs,
std::shared_ptr<parquet::FileMetaData>& file_metadata,
ParquetReaderWrap* parent)
: _conjunct_ctxs(conjunct_ctxs), _file_metadata(file_metadata), _parent(parent) {}
: _range_start_offset(range_start_offset),
_range_size(range_size),
_conjunct_ctxs(conjunct_ctxs),
_file_metadata(file_metadata),
_parent(parent) {}
RowGroupReader::~RowGroupReader() {
_slot_conjuncts.clear();
@ -89,20 +96,67 @@ RowGroupReader::~RowGroupReader() {
Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& map_column,
const std::vector<int>& include_column_ids) {
const std::vector<int>& include_column_ids,
int64_t file_size) {
int total_group = _file_metadata->num_row_groups();
// It will not filter if head_group_offset equals tail_group_offset
int64_t head_group_offset = _range_start_offset;
int64_t tail_group_offset = _range_start_offset;
int64_t range_end_offset = _range_start_offset + _range_size;
if (_range_size > 0 && file_size > 0) {
// todo: extract to function
for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
int64_t cur_group_offset = _get_group_offset(row_group_id);
// when a whole file is in a split, range_end_offset is the EOF offset
if (row_group_id == total_group - 1) {
if (cur_group_offset < _range_start_offset) {
head_group_offset = cur_group_offset;
}
if (range_end_offset >= file_size) {
tail_group_offset = file_size;
} else {
tail_group_offset = cur_group_offset;
}
break;
}
int64_t next_group_offset = _get_group_offset(row_group_id + 1);
if (_range_start_offset >= cur_group_offset &&
_range_start_offset < next_group_offset) {
// Enter this branch only the first time, to find the head group
head_group_offset = cur_group_offset;
}
if (range_end_offset < next_group_offset) {
tail_group_offset = cur_group_offset;
// find tail, break
break;
}
}
if (tail_group_offset < head_group_offset) {
tail_group_offset = head_group_offset;
}
}
std::unordered_set<int> parquet_column_ids(include_column_ids.begin(),
include_column_ids.end());
_init_conjuncts(tuple_desc, map_column, parquet_column_ids);
int total_group = _file_metadata->num_row_groups();
_parent->statistics()->total_groups = total_group;
_parent->statistics()->total_rows = _file_metadata->num_rows();
int32_t filtered_num_row_groups = 0;
int64_t filtered_num_rows = 0;
int64_t filtered_total_byte_size = 0;
bool update_statistics = false;
for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
auto row_group_meta = _file_metadata->RowGroup(row_group_id);
if (_range_size > 0 && file_size > 0) {
int64_t start_offset = _get_group_offset(row_group_id);
int64_t end_offset = row_group_id == total_group - 1
? file_size
: _get_group_offset(row_group_id + 1);
if (start_offset >= tail_group_offset || end_offset <= head_group_offset) {
_filter_group.emplace(row_group_id);
VLOG_DEBUG << "Filter extra row group id: " << row_group_id;
continue;
}
}
// if head_read_offset <= start_offset < end_offset <= tail_read_offset
for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) {
const std::string& col_name = tuple_desc->slots()[slot_id]->col_name();
auto col_iter = map_column.find(col_name);
@ -129,26 +183,36 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
bool group_need_filter = _determine_filter_row_group(slot_iter->second, min, max);
if (group_need_filter) {
update_statistics = true;
filtered_num_row_groups++;
filtered_num_rows += row_group_meta->num_rows();
filtered_total_byte_size += row_group_meta->total_byte_size();
_add_filter_group(row_group_id, row_group_meta);
VLOG_DEBUG << "Filter row group id: " << row_group_id;
_filter_group.emplace(row_group_id);
break;
}
}
}
if (update_statistics) {
_parent->statistics()->filtered_row_groups = filtered_num_row_groups;
_parent->statistics()->filtered_rows = filtered_num_rows;
_parent->statistics()->filtered_total_bytes = filtered_total_byte_size;
_parent->statistics()->filtered_row_groups = _filtered_num_row_groups;
_parent->statistics()->filtered_rows = _filtered_num_rows;
_parent->statistics()->filtered_total_bytes = _filtered_total_byte_size;
VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name()
<< ", Num of read row group: " << total_group
<< ", and num of skip row group: " << filtered_num_row_groups;
<< ", and num of skip row group: " << _filtered_num_row_groups;
}
return Status::OK();
}
// Returns the byte offset of row group `row_group_id` within the parquet
// file, taken from the file_offset of its first column chunk.
// PARQUET_HEAD (4) is subtracted -- presumably to account for the 4-byte
// "PAR1" magic at the start of the file so that group offsets line up with
// scan-range [start, start + size) boundaries; TODO confirm.
// NOTE(review): parquet ColumnChunk file_offset() may point at the dictionary
// page rather than the first data page -- assumed close enough for aligning
// ranges to whole row groups; verify against the parquet metadata spec.
int64_t RowGroupReader::_get_group_offset(int row_group_id) {
return _file_metadata->RowGroup(row_group_id)->ColumnChunk(0)->file_offset() - PARQUET_HEAD;
}
// Records row group `row_group_id` as filtered out: bumps the skipped-group
// counters (group count, row count, byte size) that are later copied into the
// parent reader's statistics, and inserts the id into _filter_group so the
// read path skips it.
void RowGroupReader::_add_filter_group(int row_group_id,
std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta) {
_filtered_num_row_groups++;
_filtered_num_rows += row_group_meta->num_rows();
_filtered_total_byte_size += row_group_meta->total_byte_size();
_filter_group.emplace(row_group_id);
}
void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& map_column,
const std::unordered_set<int>& include_column_ids) {
@ -292,15 +356,15 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*
case TYPE_DATETIME: {
std::vector<const char*> in_values;
for (auto val : in_pred_values) {
const char* value = ((std::string*)val)->c_str();
const char* value = ((std::string*)val)->data();
in_values.emplace_back(value);
}
if (in_values.empty()) {
return false;
}
std::sort(in_values.begin(), in_values.end());
const char* in_min = in_values.front();
const char* in_max = in_values.back();
auto result = std::minmax_element(in_values.begin(), in_values.end());
const char* in_min = *result.first;
const char* in_max = *result.second;
if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) {
return true;
}
@ -350,7 +414,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch
case TYPE_CHAR:
case TYPE_DATE:
case TYPE_DATETIME: {
const char* conjunct_value = ((std::string*)value)->c_str();
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) {
return true;
}
@ -400,7 +464,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
const char* conjunct_value = ((std::string*)value)->c_str();
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(max_bytes, conjunct_value) <= 0) {
return true;
}
@ -450,7 +514,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
const char* conjunct_value = ((std::string*)value)->c_str();
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(max_bytes, conjunct_value) < 0) {
return true;
}
@ -500,7 +564,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
const char* conjunct_value = ((std::string*)value)->c_str();
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(min_bytes, conjunct_value) >= 0) {
return true;
}
@ -550,7 +614,7 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
const char* conjunct_value = ((std::string*)value)->c_str();
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(min_bytes, conjunct_value) > 0) {
return true;
}

View File

@ -37,18 +37,24 @@ class ParquetReaderWrap;
class RowGroupReader {
public:
RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs,
RowGroupReader(int64_t range_start_offset, int64_t range_size,
const std::vector<ExprContext*>& conjunct_ctxs,
std::shared_ptr<parquet::FileMetaData>& file_metadata,
ParquetReaderWrap* parent);
~RowGroupReader();
Status init_filter_groups(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& map_column,
const std::vector<int>& include_column_ids);
const std::vector<int>& include_column_ids, int64_t file_size);
std::unordered_set<int> filter_groups() { return _filter_group; };
private:
void _add_filter_group(int row_group_id,
std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta);
int64_t _get_group_offset(int row_group_id);
void _init_conjuncts(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& _map_column,
const std::unordered_set<int>& include_column_ids);
@ -78,11 +84,18 @@ private:
bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
private:
int64_t _range_start_offset;
int64_t _range_size;
int64_t _file_size;
std::map<int, std::vector<ExprContext*>> _slot_conjuncts;
std::unordered_set<int> _filter_group;
std::vector<ExprContext*> _conjunct_ctxs;
std::shared_ptr<parquet::FileMetaData> _file_metadata;
ParquetReaderWrap* _parent;
int32_t _filtered_num_row_groups = 0;
int64_t _filtered_num_rows = 0;
int64_t _filtered_total_byte_size = 0;
};
} // namespace doris

View File

@ -107,7 +107,7 @@ Status ParquetScanner::open_next_reader() {
num_of_columns_from_file = range.num_of_columns_from_file;
}
_cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
num_of_columns_from_file);
num_of_columns_from_file, 0, 0);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
_state->timezone());

View File

@ -66,8 +66,9 @@ Status FileArrowScanner::_open_next_reader() {
int32_t num_of_columns_from_file = _file_slot_descs.size();
_cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file);
_cur_file_reader =
_new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file, range.start_offset, range.size);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
@ -217,8 +218,11 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr
}
ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) {
return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
int32_t num_of_columns_from_file,
int64_t range_start_offset,
int64_t range_size) {
return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
range_start_offset, range_size);
}
void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
@ -237,7 +241,9 @@ VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
: FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) {
int32_t num_of_columns_from_file,
int64_t range_start_offset,
int64_t range_size) {
return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
}

View File

@ -53,7 +53,8 @@ public:
protected:
virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) = 0;
int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size) = 0;
virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {}
private:
@ -82,7 +83,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) override;
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size) override;
void _init_profiles(RuntimeProfile* profile) override;
void _update_profile(std::shared_ptr<Statistics>& statistics) override;
@ -105,7 +107,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) override;
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size) override;
void _init_profiles(RuntimeProfile* profile) override {};
};

View File

@ -76,8 +76,9 @@ Status VArrowScanner::_open_next_reader() {
if (range.__isset.num_of_columns_from_file) {
num_of_columns_from_file = range.num_of_columns_from_file;
}
_cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file);
_cur_file_reader =
_new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file, range.start_offset, range.size);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
_state->timezone());

View File

@ -64,7 +64,8 @@ public:
protected:
virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) = 0;
int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size) = 0;
private:
// Read next buffer from reader

View File

@ -30,7 +30,8 @@ VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile,
counter) {}
ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) {
int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size) {
return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
}

View File

@ -47,7 +47,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) override;
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size) override;
};
} // namespace doris::vectorized

View File

@ -31,8 +31,11 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
counter) {}
ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) {
return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
int32_t num_of_columns_from_file,
int64_t range_start_offset,
int64_t range_size) {
return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
range_start_offset, range_size);
}
} // namespace doris::vectorized

View File

@ -48,7 +48,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) override;
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size) override;
};
} // namespace doris::vectorized