[feature-wip](parquet-reader) fix string test and support decimal64 (#13184)

1. Refactor arguments list of parquet min max filter, pass parquet type for  min max value parsing
2. Fix the filter of string min max

Co-authored-by: jinzhe <jinzhe@selectdb.com>
This commit is contained in:
slothever
2022-10-12 16:52:28 +08:00
committed by GitHub
parent bb4414e303
commit 4fc7a048d2
4 changed files with 263 additions and 153 deletions

View File

@ -38,26 +38,6 @@ namespace doris::vectorized {
return true; \
}
#define _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) \
if (max <= conjunct_value) { \
return true; \
}
#define _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) \
if (max < conjunct_value) { \
return true; \
}
#define _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) \
if (min >= conjunct_value) { \
return true; \
}
#define _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) \
if (min > conjunct_value) { \
return true; \
}
#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
std::vector<T> in_values; \
for (auto val : in_pred_values) { \
@ -76,32 +56,80 @@ namespace doris::vectorized {
return true; \
}
static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
const char* min_bytes, const char* max_bytes) {
switch (conjunct_type) {
struct ColumnMinMaxParams {
PrimitiveType conjunct_type;
tparquet::Type::type parquet_type;
void* value;
// Use for decimal type
int32_t parquet_precision;
int32_t parquet_scale;
int32_t parquet_type_length;
// Use for in predicate
std::vector<void*> in_pred_values;
const char* min_bytes;
const char* max_bytes;
};
template <typename T>
static void _align_decimal_v2_scale(T* conjunct_value, int32_t value_scale, T* parquet_value,
int32_t parquet_scale) {
if (value_scale > parquet_scale) {
*parquet_value = *parquet_value * common::exp10_i32(value_scale - parquet_scale);
} else if (value_scale < parquet_scale) {
*conjunct_value = *conjunct_value * common::exp10_i32(parquet_scale - value_scale);
}
}
template <typename T>
static void _decode_decimal_v2_to_primary(const ColumnMinMaxParams& params,
const char* raw_parquet_val, T* out_value,
T* parquet_val) {
*parquet_val = reinterpret_cast<const T*>(raw_parquet_val)[0];
DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
*out_value = conjunct_value.value();
_align_decimal_v2_scale(out_value, conjunct_value.scale(), parquet_val, params.parquet_scale);
}
// todo: support decimal128 after the test passes
//static Int128 _decode_value_to_int128(const ColumnMinMaxParams& params,
// const char* raw_parquet_val) {
// const uint8_t* buf = reinterpret_cast<const uint8_t*>(raw_parquet_val);
// int32_t length = params.parquet_type_length;
// Int128 value = buf[0] & 0x80 ? -1 : 0;
// memcpy(reinterpret_cast<uint8_t*>(&value) + sizeof(value) - length, buf, length);
// return BigEndian::ToHost128(value);
//}
static bool _eval_in_val(const ColumnMinMaxParams& params) {
switch (params.conjunct_type) {
case TYPE_TINYINT: {
_FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
_FILTER_GROUP_BY_IN(int8_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
case TYPE_SMALLINT: {
_FILTER_GROUP_BY_IN(int16_t, in_pred_values, min_bytes, max_bytes)
_FILTER_GROUP_BY_IN(int16_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
case TYPE_DECIMAL32:
case TYPE_INT: {
_FILTER_GROUP_BY_IN(int32_t, in_pred_values, min_bytes, max_bytes)
_FILTER_GROUP_BY_IN(int32_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
case TYPE_DECIMAL64:
case TYPE_BIGINT: {
_FILTER_GROUP_BY_IN(int64_t, in_pred_values, min_bytes, max_bytes)
_FILTER_GROUP_BY_IN(int64_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
case TYPE_DECIMALV2: {
break;
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
std::vector<const char*> in_values;
for (auto val : in_pred_values) {
const char* value = ((std::string*)val)->data();
in_values.emplace_back(value);
for (auto val : params.in_pred_values) {
std::string value = ((StringValue*)val)->to_string();
in_values.emplace_back(value.data());
}
if (in_values.empty()) {
return false;
@ -109,7 +137,7 @@ static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred
auto result = std::minmax_element(in_values.begin(), in_values.end());
const char* in_min = *result.first;
const char* in_max = *result.second;
if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) {
if (strcmp(in_max, params.min_bytes) < 0 || strcmp(in_min, params.max_bytes) > 0) {
return true;
}
break;
@ -120,34 +148,77 @@ static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred
return false;
}
static bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
const char* max_bytes) {
switch (conjunct_type) {
static bool _eval_eq(const ColumnMinMaxParams& params) {
switch (params.conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
_PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
case TYPE_SMALLINT: {
_PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
_PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
case TYPE_DECIMAL32:
case TYPE_INT: {
_PLAIN_DECODE(int32_t, value, min_bytes, max_bytes, conjunct_value, min, max)
_PLAIN_DECODE(int32_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
case TYPE_DECIMAL64:
case TYPE_BIGINT: {
_PLAIN_DECODE(int64_t, value, min_bytes, max_bytes, conjunct_value, min, max)
_PLAIN_DECODE(int64_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
case TYPE_DECIMALV2: {
if (params.parquet_type == tparquet::Type::INT32) {
int32_t min_value = reinterpret_cast<const int32_t*>(params.min_bytes)[0];
int32_t max_value = reinterpret_cast<const int32_t*>(params.max_bytes)[0];
DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
int32_t conjunct_int_value = conjunct_value.value();
_align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value,
params.parquet_scale);
_align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value,
params.parquet_scale);
_FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
} else if (params.parquet_type == tparquet::Type::INT64) {
int64_t min_value = reinterpret_cast<const int64_t*>(params.min_bytes)[0];
int64_t max_value = reinterpret_cast<const int64_t*>(params.max_bytes)[0];
DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
int64_t conjunct_int_value = conjunct_value.value();
_align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value,
params.parquet_scale);
_align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value,
params.parquet_scale);
_FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
}
break;
// When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
// todo: support decimal128 after the test passes
// else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
// Int128 conjunct_int_value = conjunct_value.value();
// Int128 max = _decode_value_to_int128(params, params.max_bytes);
// _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max,
// params.parquet_scale);
// Int128 min = _decode_value_to_int128(params, params.min_bytes);
// _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min,
// params.parquet_scale);
// _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min, max)
// }
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) {
std::string conjunct_value = ((StringValue*)params.value)->to_string();
if (strcmp(conjunct_value.data(), params.min_bytes) < 0 ||
strcmp(conjunct_value.data(), params.max_bytes) > 0) {
return true;
}
break;
@ -158,33 +229,73 @@ static bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_b
return false;
}
static bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
switch (conjunct_type) {
template <typename T>
static bool _filter_group_by_gt_or_ge(T conjunct_value, T max, bool is_ge) {
if (!is_ge) {
if (max <= conjunct_value) {
return true;
}
} else {
if (max < conjunct_value) {
return true;
}
}
return false;
}
static bool _eval_gt(const ColumnMinMaxParams& params, bool is_eq) {
switch (params.conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
break;
_PLAIN_DECODE_SINGLE(int8_t, params.value, params.max_bytes, conjunct_value, max)
return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
case TYPE_SMALLINT: {
_PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
break;
_PLAIN_DECODE_SINGLE(int16_t, params.value, params.max_bytes, conjunct_value, max)
return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
case TYPE_DECIMAL32:
case TYPE_INT: {
_PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
break;
_PLAIN_DECODE_SINGLE(int32_t, params.value, params.max_bytes, conjunct_value, max)
return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
case TYPE_DECIMAL64:
case TYPE_BIGINT: {
_PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
_PLAIN_DECODE_SINGLE(int64_t, params.value, params.max_bytes, conjunct_value, max)
return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
case TYPE_DECIMALV2: {
if (params.parquet_type == tparquet::Type::INT32) {
int32_t conjunct_int_value = 0;
int32_t parquet_value = 0;
_decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value,
&parquet_value);
return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq);
} else if (params.parquet_type == tparquet::Type::INT64) {
int64_t conjunct_int_value = 0;
int64_t parquet_value = 0;
_decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value,
&parquet_value);
return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq);
}
break;
// When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
// todo: support decimal128 after the test passes
// else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
// Int128 conjunct_int_value = conjunct_value.value();
// Int128 max = _decode_value_to_int128(params, params.max_bytes);
// _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max,
// params.parquet_scale);
// return _filter_group_by_gt_or_ge(conjunct_int_value, max, is_eq);
// }
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(max_bytes, conjunct_value) <= 0) {
std::string conjunct_value = ((StringValue*)params.value)->to_string();
if (!is_eq && strcmp(params.max_bytes, conjunct_value.data()) <= 0) {
return true;
} else if (strcmp(params.max_bytes, conjunct_value.data()) < 0) {
return true;
}
break;
@ -195,111 +306,82 @@ static bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_b
return false;
}
static bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
break;
}
case TYPE_SMALLINT: {
_PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
break;
}
case TYPE_INT: {
_PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
break;
}
case TYPE_BIGINT: {
_PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
_FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
break;
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(max_bytes, conjunct_value) < 0) {
template <typename T>
static bool _filter_group_by_lt_or_le(T conjunct_value, T min, bool is_le) {
if (!is_le) {
if (min >= conjunct_value) {
return true;
}
} else {
if (min > conjunct_value) {
return true;
}
break;
}
default:
return false;
}
return false;
}
static bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
switch (conjunct_type) {
static bool _eval_lt(const ColumnMinMaxParams& params, bool is_eq) {
switch (params.conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
break;
_PLAIN_DECODE_SINGLE(int8_t, params.value, params.min_bytes, conjunct_value, min)
return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
case TYPE_SMALLINT: {
_PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
break;
_PLAIN_DECODE_SINGLE(int16_t, params.value, params.min_bytes, conjunct_value, min)
return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
case TYPE_DECIMAL32:
case TYPE_INT: {
_PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
break;
_PLAIN_DECODE_SINGLE(int32_t, params.value, params.min_bytes, conjunct_value, min)
return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
case TYPE_DECIMAL64:
case TYPE_BIGINT: {
_PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
break;
_PLAIN_DECODE_SINGLE(int64_t, params.value, params.min_bytes, conjunct_value, min)
return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(min_bytes, conjunct_value) >= 0) {
std::string conjunct_value = ((StringValue*)params.value)->to_string();
if (!is_eq && strcmp(params.min_bytes, conjunct_value.data()) >= 0) {
return true;
} else if (strcmp(params.min_bytes, conjunct_value.data()) > 0) {
return true;
}
break;
}
default:
case TYPE_DECIMALV2: {
if (params.parquet_type == tparquet::Type::INT32) {
int32_t conjunct_int_value = 0;
int32_t parquet_value = 0;
_decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value,
&parquet_value);
return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq);
} else if (params.parquet_type == tparquet::Type::INT64) {
int64_t conjunct_int_value = 0;
int64_t parquet_value = 0;
_decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value,
&parquet_value);
return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq);
}
break;
// When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
// todo: support decimal128 after the test passes
// else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
// Int128 conjunct_int_value = conjunct_value.value();
// Int128 min = _decode_value_to_int128(params, params.min_bytes);
// _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min,
// params.parquet_scale);
// return _filter_group_by_lt_or_le(conjunct_int_value, min, is_eq);
// }
}
case TYPE_DATE: {
// doris::DateTimeValue* min_date = (doris::DateTimeValue*)params.value;
// LOG(INFO) << min_date->debug_string();
return false;
}
return false;
}
static bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
break;
}
case TYPE_SMALLINT: {
_PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
break;
}
case TYPE_INT: {
_PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
break;
}
case TYPE_BIGINT: {
_PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
_FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
break;
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(min_bytes, conjunct_value) > 0) {
return true;
}
break;
}
default:
return false;
}
@ -386,42 +468,48 @@ static void to_filter(const ColumnValueRange<primitive_type>& col_val_range,
}
}
static void _eval_predicate(ScanPredicate filter, PrimitiveType col_type, const char* min_bytes,
const char* max_bytes, bool& need_filter) {
static void _eval_predicate(const ScanPredicate& filter, ColumnMinMaxParams* params,
bool* need_filter) {
if (filter._values.empty()) {
return;
}
if (filter._op == TExprOpcode::FILTER_NEW_IN) {
need_filter = _eval_in_val(col_type, filter._values, min_bytes, max_bytes);
if (filter._values.size() == 1) {
params->value = filter._values[0];
*need_filter = _eval_eq(*params);
return;
}
params->in_pred_values = filter._values;
*need_filter = _eval_in_val(*params);
return;
}
// preserve TExprOpcode::FILTER_NEW_NOT_IN
auto& value = filter._values[0];
params->value = filter._values[0];
switch (filter._op) {
case TExprOpcode::EQ:
need_filter = _eval_eq(col_type, value, min_bytes, max_bytes);
*need_filter = _eval_eq(*params);
break;
case TExprOpcode::NE:
break;
case TExprOpcode::GT:
need_filter = _eval_gt(col_type, value, max_bytes);
*need_filter = _eval_gt(*params, false);
break;
case TExprOpcode::GE:
need_filter = _eval_ge(col_type, value, max_bytes);
*need_filter = _eval_gt(*params, true);
break;
case TExprOpcode::LT:
need_filter = _eval_lt(col_type, value, min_bytes);
*need_filter = _eval_lt(*params, false);
break;
case TExprOpcode::LE:
need_filter = _eval_le(col_type, value, min_bytes);
*need_filter = _eval_lt(*params, true);
break;
default:
break;
}
}
static bool determine_filter_min_max(ColumnValueRangeType& col_val_range,
const std::string& encoded_min,
static bool determine_filter_min_max(const ColumnValueRangeType& col_val_range,
const FieldSchema* col_schema, const std::string& encoded_min,
const std::string& encoded_max) {
const char* min_bytes = encoded_min.data();
const char* max_bytes = encoded_max.data();
@ -434,10 +522,21 @@ static bool determine_filter_min_max(ColumnValueRangeType& col_val_range,
to_filter(range, filters);
},
col_val_range);
if (filters.empty()) {
return false;
}
ColumnMinMaxParams params;
params.conjunct_type = col_type;
params.parquet_type = col_schema->physical_type;
params.parquet_precision = col_schema->parquet_schema.precision;
params.parquet_scale = col_schema->parquet_schema.scale;
params.parquet_type_length = col_schema->parquet_schema.type_length;
params.min_bytes = min_bytes;
params.max_bytes = max_bytes;
for (int i = 0; i < filters.size(); i++) {
ScanPredicate filter = filters[i];
_eval_predicate(filter, col_type, min_bytes, max_bytes, need_filter);
_eval_predicate(filters[i], &params, &need_filter);
if (need_filter) {
break;
}

View File

@ -39,6 +39,7 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index,
ColumnValueRangeType& col_val_range,
const FieldSchema* col_schema,
std::vector<int>& skipped_ranges) {
const std::vector<std::string>& encoded_min_vals = column_index->min_values;
const std::vector<std::string>& encoded_max_vals = column_index->max_values;
@ -46,7 +47,7 @@ Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index
const int num_of_pages = column_index->null_pages.size();
for (int page_id = 0; page_id < num_of_pages; page_id++) {
if (determine_filter_min_max(col_val_range, encoded_min_vals[page_id],
if (determine_filter_min_max(col_val_range, col_schema, encoded_min_vals[page_id],
encoded_max_vals[page_id])) {
skipped_ranges.emplace_back(page_id);
}

View File

@ -32,6 +32,7 @@ public:
int page_idx, RowRange* row_range);
Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
ColumnValueRangeType& col_val_range,
const FieldSchema* col_schema,
std::vector<int>& skipped_ranges);
bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,

View File

@ -312,6 +312,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
RETURN_IF_ERROR(
_file_reader->readat(page_index._column_index_start, buffer_size, &bytes_read, buff));
auto& schema_desc = _file_metadata->schema();
std::vector<RowRange> skipped_row_ranges;
for (auto& read_col : _read_columns) {
auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
@ -320,6 +321,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
}
auto& chunk = row_group.columns[read_col._parquet_col_id];
tparquet::ColumnIndex column_index;
if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
return Status::OK();
}
RETURN_IF_ERROR(page_index.parse_column_index(chunk, buff, &column_index));
const int num_of_pages = column_index.null_pages.size();
if (num_of_pages <= 0) {
@ -327,7 +331,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
}
auto& conjuncts = conjunct_iter->second;
std::vector<int> skipped_page_range;
page_index.collect_skipped_page_range(&column_index, conjuncts, skipped_page_range);
const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name);
page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema,
skipped_page_range);
if (skipped_page_range.empty()) {
return Status::OK();
}
@ -387,6 +393,7 @@ Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_gr
Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& columns,
bool* filter_group) {
auto& schema_desc = _file_metadata->schema();
for (auto& col_name : _column_names) {
auto col_iter = _map_column.find(col_name);
if (col_iter == _map_column.end()) {
@ -401,8 +408,10 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
if (!statistic.__isset.max || !statistic.__isset.min) {
continue;
}
const FieldSchema* col_schema = schema_desc.get_column(col_name);
// Min-max of statistic is plain-encoded value
*filter_group = determine_filter_min_max(slot_iter->second, statistic.min, statistic.max);
*filter_group = determine_filter_min_max(slot_iter->second, col_schema, statistic.min,
statistic.max);
if (*filter_group) {
break;
}