diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index 0fa99d2673..275d151ba6 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -254,6 +254,8 @@ public: _contain_null = contain_null; }; + /* Expose the stored _precision and _scale values to callers. */int precision() const { return _precision; } + int scale() const { return _scale; } static void add_fixed_value_range(ColumnValueRange& range, CppType* value) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 85026d232c..77375831f6 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -17,6 +17,10 @@ #include "vorc_reader.h" +#include + +#include "cctz/civil_time.h" +#include "cctz/time_zone.h" #include "gutil/strings/substitute.h" #include "io/file_factory.h" #include "vec/columns/column_array.h" @@ -145,7 +149,7 @@ Status OrcReader::init_reader( _row_reader_options.range(_range_start_offset, _range_size); _row_reader_options.setTimezoneName(_ctz); RETURN_IF_ERROR(_init_read_columns()); - // _init_search_argument(colname_to_value_range); + _init_search_argument(colname_to_value_range); _row_reader_options.include(_read_cols); try { _row_reader = _reader->createRowReader(_row_reader_options); @@ -181,11 +185,282 @@ Status OrcReader::_init_read_columns() { return Status::OK(); } +/* One comparison to push down to ORC: the column name, the ORC predicate data type, the literal operand(s) and the filter operator. */struct OrcPredicate { + std::string col_name; + orc::PredicateDataType data_type; + std::vector literals; + SQLFilterOp op; +}; + +// ORC predicate push-down only supports LONG, FLOAT, STRING, DATE, DECIMAL, TIMESTAMP and BOOLEAN literals; this table maps each supported ORC column kind to the predicate type used for it +static std::unordered_map TYPEKIND_TO_PREDICATE_TYPE = { + {orc::TypeKind::BYTE, orc::PredicateDataType::LONG}, + {orc::TypeKind::SHORT, orc::PredicateDataType::LONG}, + {orc::TypeKind::INT, orc::PredicateDataType::LONG}, + {orc::TypeKind::LONG, orc::PredicateDataType::LONG}, + {orc::TypeKind::FLOAT, orc::PredicateDataType::FLOAT}, + {orc::TypeKind::DOUBLE, orc::PredicateDataType::FLOAT}, + {orc::TypeKind::STRING, 
orc::PredicateDataType::STRING}, + {orc::TypeKind::BINARY, orc::PredicateDataType::STRING}, + {orc::TypeKind::CHAR, orc::PredicateDataType::STRING}, + {orc::TypeKind::VARCHAR, orc::PredicateDataType::STRING}, + {orc::TypeKind::DATE, orc::PredicateDataType::DATE}, + {orc::TypeKind::DECIMAL, orc::PredicateDataType::DECIMAL}, + {orc::TypeKind::TIMESTAMP, orc::PredicateDataType::TIMESTAMP}, + {orc::TypeKind::BOOLEAN, orc::PredicateDataType::BOOLEAN}}; + +/* Builds an orc::Literal from the Doris value behind `value`, choosing the conversion by type->getKind(). Returns {true, literal} for the kinds handled in the switch and {false, orc::Literal(false)} for any other kind or when an Exception is caught. `precision` and `scale` are only used by the DECIMAL case, whose first if-constexpr branch overwrites them with DecimalV2Value::PRECISION and DecimalV2Value::SCALE. */template +static std::tuple convert_to_orc_literal(const orc::Type* type, + const void* value, int precision, + int scale) { + try { + switch (type->getKind()) { + case orc::TypeKind::BOOLEAN: + return std::make_tuple(true, orc::Literal(bool(*((uint8_t*)value)))); + case orc::TypeKind::BYTE: + return std::make_tuple(true, orc::Literal(int64_t(*((int8_t*)value)))); + case orc::TypeKind::SHORT: + return std::make_tuple(true, orc::Literal(int64_t(*((int16_t*)value)))); + case orc::TypeKind::INT: + return std::make_tuple(true, orc::Literal(int64_t(*((int32_t*)value)))); + case orc::TypeKind::LONG: + return std::make_tuple(true, orc::Literal(*((int64_t*)value))); + case orc::TypeKind::FLOAT: + return std::make_tuple(true, orc::Literal(double(*((float*)value)))); + case orc::TypeKind::DOUBLE: + return std::make_tuple(true, orc::Literal(*((double*)value))); + case orc::TypeKind::STRING: + case orc::TypeKind::BINARY: + case orc::TypeKind::CHAR: + case orc::TypeKind::VARCHAR: { + StringValue* string_value = (StringValue*)value; + return std::make_tuple(true, orc::Literal(string_value->ptr, string_value->len)); + } + case orc::TypeKind::DECIMAL: { + int128_t decimal_value; + if constexpr (std::is_same_v) { + decimal_value = *reinterpret_cast(value); + precision = DecimalV2Value::PRECISION; + scale = DecimalV2Value::SCALE; + } else if constexpr (std::is_same_v) { + decimal_value = *((int32_t*)value); + } else if constexpr (std::is_same_v) { + decimal_value = *((int64_t*)value); + } else { + decimal_value = *((int128_t*)value); + 
} + /* Pass the 128-bit value to orc::Int128 as its upper and lower 64-bit halves. */return std::make_tuple(true, orc::Literal(orc::Int128(uint64_t(decimal_value >> 64), + uint64_t(decimal_value)), + precision, scale)); + } + case orc::TypeKind::DATE: { + /* Epoch offset of the calendar date, taken at utc0 and divided by 24*60*60 to obtain a day count. */int64_t day_offset; + static const cctz::time_zone utc0 = cctz::utc_time_zone(); + if constexpr (std::is_same_v) { + const DateTimeValue date_v1 = *reinterpret_cast(value); + cctz::civil_day civil_date(date_v1.year(), date_v1.month(), date_v1.day()); + day_offset = + cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60); + } else { + const DateV2Value date_v2 = + *reinterpret_cast*>(value); + cctz::civil_day civil_date(date_v2.year(), date_v2.month(), date_v2.day()); + day_offset = + cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60); + } + return std::make_tuple(true, orc::Literal(orc::PredicateDataType::DATE, day_offset)); + } + case orc::TypeKind::TIMESTAMP: { + /* Epoch offset of the date-time fields taken at utc0; the sub-second part below is microsecond() multiplied by 1000. */int64_t seconds; + int32_t nanos; + static const cctz::time_zone utc0 = cctz::utc_time_zone(); + // TODO: ColumnValueRange has lost the precision of microsecond + if constexpr (std::is_same_v) { + const DateTimeValue datetime_v1 = *reinterpret_cast(value); + cctz::civil_second civil_seconds(datetime_v1.year(), datetime_v1.month(), + datetime_v1.day(), datetime_v1.hour(), + datetime_v1.minute(), datetime_v1.second()); + seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count(); + nanos = datetime_v1.microsecond() * 1000; + } else { + const DateV2Value datetime_v2 = + *reinterpret_cast*>(value); + cctz::civil_second civil_seconds(datetime_v2.year(), datetime_v2.month(), + datetime_v2.day(), datetime_v2.hour(), + datetime_v2.minute(), datetime_v2.second()); + seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count(); + nanos = datetime_v2.microsecond() * 1000; + } + return std::make_tuple(true, orc::Literal(seconds, nanos)); + } + default: + return std::make_tuple(false, orc::Literal(false)); + } + } catch (Exception& e) { + // When table schema changed, and 
using new schema to read old data. + LOG(WARNING) << "Failed to convert doris value to orc predicate literal, error = " + << e.what(); + return std::make_tuple(false, orc::Literal(false)); + } +} + +/* Turns one ColumnValueRange into zero or more OrcPredicate entries for the ORC column `type`. Returns an empty vector when type->getKind() is missing from TYPEKIND_TO_PREDICATE_TYPE or when both bounds are the inclusive extremes. A fixed-value range produces at most one FILTER_IN predicate; otherwise, when low_value is less than high_value, a predicate is added for each bound that is not the inclusive extreme. NOTE(review): fixed values whose conversion reports !valid are silently left out of the IN list, which would narrow the filter - confirm that conversion cannot fail for only part of a value set. */template +static std::vector value_range_to_predicate( + const ColumnValueRange& col_val_range, const orc::Type* type) { + using CppType = typename PrimitiveTypeTraits::CppType; + std::vector predicates; + orc::PredicateDataType predicate_data_type; + auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind()); + if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) { + // Unsupported type + return predicates; + } else { + predicate_data_type = type_it->second; + } + + if (col_val_range.is_fixed_value_range()) { + OrcPredicate in_predicate; + in_predicate.col_name = col_val_range.column_name(); + in_predicate.data_type = predicate_data_type; + in_predicate.op = SQLFilterOp::FILTER_IN; + for (const auto& value : col_val_range.get_fixed_value_set()) { + auto [valid, literal] = convert_to_orc_literal( + type, &value, col_val_range.precision(), col_val_range.scale()); + if (valid) { + in_predicate.literals.push_back(literal); + } + } + if (!in_predicate.literals.empty()) { + predicates.emplace_back(in_predicate); + } + return predicates; + } + + const auto& high_value = col_val_range.get_range_max_value(); + const auto& low_value = col_val_range.get_range_min_value(); + const auto& high_op = col_val_range.get_range_high_op(); + const auto& low_op = col_val_range.get_range_low_op(); + + // orc can only push down is_null. 
When col_value_range._contain_null = true, only indicating that + // value can be null, not equals null, so ignore _contain_null in col_value_range + if (col_val_range.is_high_value_maximum() && high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL && + col_val_range.is_low_value_mininum() && low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL) { + return predicates; + } + + if (low_value < high_value) { + if (!col_val_range.is_low_value_mininum() || + SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) { + auto [valid, low_literal] = convert_to_orc_literal( + type, &low_value, col_val_range.precision(), col_val_range.scale()); + if (valid) { + OrcPredicate low_predicate; + low_predicate.col_name = col_val_range.column_name(); + low_predicate.data_type = predicate_data_type; + low_predicate.op = low_op; + low_predicate.literals.emplace_back(low_literal); + predicates.emplace_back(low_predicate); + } + } + if (!col_val_range.is_high_value_maximum() || + SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) { + auto [valid, high_literal] = convert_to_orc_literal( + type, &high_value, col_val_range.precision(), col_val_range.scale()); + if (valid) { + OrcPredicate high_predicate; + high_predicate.col_name = col_val_range.column_name(); + high_predicate.data_type = predicate_data_type; + high_predicate.op = high_op; + high_predicate.literals.emplace_back(high_literal); + predicates.emplace_back(high_predicate); + } + } + } + return predicates; +} + +/* Recursively appends predicates[index] and everything after it to `builder`. Every predicate except the last opens a startAnd() group that is closed after the recursive call, so the result is a right-nested AND chain. A single-literal FILTER_IN becomes equals(); FILTER_LARGER and FILTER_LARGER_OR_EQUAL are written as NOT(lessThanEquals) and NOT(lessThan). Returns false when index is past the end or the operator has no case in the switch; in that situation any startAnd() group already opened is left unclosed. */bool static build_search_argument(std::vector& predicates, int index, + std::unique_ptr& builder) { + if (index >= predicates.size()) { + return false; + } + if (index < predicates.size() - 1) { + builder->startAnd(); + } + OrcPredicate& predicate = predicates[index]; + switch (predicate.op) { + case SQLFilterOp::FILTER_IN: { + if (predicate.literals.size() == 1) { + builder->equals(predicate.col_name, predicate.data_type, predicate.literals[0]); + } else { + builder->in(predicate.col_name, predicate.data_type, predicate.literals); + } + break; + } + 
case SQLFilterOp::FILTER_LESS: + builder->lessThan(predicate.col_name, predicate.data_type, predicate.literals[0]); + break; + case SQLFilterOp::FILTER_LESS_OR_EQUAL: + builder->lessThanEquals(predicate.col_name, predicate.data_type, predicate.literals[0]); + break; + case SQLFilterOp::FILTER_LARGER: { + builder->startNot(); + builder->lessThanEquals(predicate.col_name, predicate.data_type, predicate.literals[0]); + builder->end(); + break; + } + case SQLFilterOp::FILTER_LARGER_OR_EQUAL: { + builder->startNot(); + builder->lessThan(predicate.col_name, predicate.data_type, predicate.literals[0]); + builder->end(); + break; + } + default: + return false; + } + if (index < predicates.size() - 1) { + bool can_build = build_search_argument(predicates, index + 1, builder); + if (!can_build) { + return false; + } + builder->end(); + } + return true; +} + void OrcReader::_init_search_argument( std::unordered_map* colname_to_value_range) { + /* No ranges means nothing to push down; the row reader options are left untouched. */if (colname_to_value_range->empty()) { + return; + } + std::vector predicates; + auto& root_type = _reader->getType(); + /* Index the direct children of the file's root type by field name so that ranges can be matched to ORC columns; ranges whose name is not found are skipped. */std::unordered_map type_map; + for (int i = 0; i < root_type.getSubtypeCount(); ++i) { + type_map.emplace(root_type.getFieldName(i), root_type.getSubtype(i)); + } + for (auto it = colname_to_value_range->begin(); it != colname_to_value_range->end(); ++it) { + auto type_it = type_map.find(it->first); + if (type_it != type_map.end()) { + std::visit( + [&](auto& range) { + std::vector value_predicates = + value_range_to_predicate(range, type_it->second); + for (auto& range_predicate : value_predicates) { + predicates.emplace_back(range_predicate); + } + }, + it->second); + } + } + if (predicates.empty()) { + return; + } std::unique_ptr builder = orc::SearchArgumentFactory::newBuilder(); - // generate min-max search argument - _row_reader_options.searchArgument(builder->build()); + /* Install the search argument only if the whole predicate chain could be built. */if (build_search_argument(predicates, 0, builder)) { + std::unique_ptr sargs = builder->build(); + 
_row_reader_options.searchArgument(std::move(sargs)); + } } void OrcReader::_init_bloom_filter( @@ -217,6 +492,7 @@ TypeDescriptor OrcReader::_convert_to_doris_type(const orc::Type* orc_type) { case orc::TypeKind::TIMESTAMP: return TypeDescriptor(PrimitiveType::TYPE_DATETIMEV2); case orc::TypeKind::DECIMAL: + // TODO: using decimal v3 instead return TypeDescriptor(PrimitiveType::TYPE_DECIMALV2); case orc::TypeKind::DATE: return TypeDescriptor(PrimitiveType::TYPE_DATEV2); diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 075a7876f8..21131379d2 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -266,6 +266,7 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, Type if constexpr (std::is_same_v>) { // nanoseconds will be ignored. v.set_microsecond((date_value % _decode_params->second_mask) * scale_to_micro); + // TODO: the precision of datetime v1 } _FIXED_SHIFT_DATA_OFFSET(); }