cherry-pick #45484
This commit is contained in:
@ -143,7 +143,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
|
||||
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
|
||||
const TFileScanRangeParams& params, const TFileRangeDesc& range,
|
||||
size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
|
||||
bool enable_lazy_mat, std::vector<orc::TypeKind>* unsupported_pushdown_types)
|
||||
bool enable_lazy_mat)
|
||||
: _profile(profile),
|
||||
_state(state),
|
||||
_scan_params(params),
|
||||
@ -156,8 +156,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
|
||||
_enable_lazy_mat(enable_lazy_mat),
|
||||
_enable_filter_by_min_max(
|
||||
state == nullptr ? true : state->query_options().enable_orc_filter_by_min_max),
|
||||
_dict_cols_has_converted(false),
|
||||
_unsupported_pushdown_types(unsupported_pushdown_types) {
|
||||
_dict_cols_has_converted(false) {
|
||||
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
|
||||
VecDateTimeValue t;
|
||||
t.from_unixtime(0, ctz);
|
||||
@ -453,7 +452,8 @@ static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PRE
|
||||
{orc::TypeKind::DOUBLE, orc::PredicateDataType::FLOAT},
|
||||
{orc::TypeKind::STRING, orc::PredicateDataType::STRING},
|
||||
{orc::TypeKind::BINARY, orc::PredicateDataType::STRING},
|
||||
{orc::TypeKind::CHAR, orc::PredicateDataType::STRING},
|
||||
// should not pust down CHAR type, because CHAR type is fixed length and will be padded
|
||||
// {orc::TypeKind::CHAR, orc::PredicateDataType::STRING},
|
||||
{orc::TypeKind::VARCHAR, orc::PredicateDataType::STRING},
|
||||
{orc::TypeKind::DATE, orc::PredicateDataType::DATE},
|
||||
{orc::TypeKind::DECIMAL, orc::PredicateDataType::DECIMAL},
|
||||
@ -483,8 +483,9 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type, con
|
||||
[[fallthrough]];
|
||||
case orc::TypeKind::BINARY:
|
||||
[[fallthrough]];
|
||||
case orc::TypeKind::CHAR:
|
||||
[[fallthrough]];
|
||||
// should not pust down CHAR type, because CHAR type is fixed length and will be padded
|
||||
// case orc::TypeKind::CHAR:
|
||||
// [[fallthrough]];
|
||||
case orc::TypeKind::VARCHAR: {
|
||||
StringRef* string_value = (StringRef*)value;
|
||||
return std::make_tuple(true, orc::Literal(string_value->data, string_value->size));
|
||||
@ -560,8 +561,7 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type, con
|
||||
|
||||
template <PrimitiveType primitive_type>
|
||||
std::vector<OrcPredicate> value_range_to_predicate(
|
||||
const ColumnValueRange<primitive_type>& col_val_range, const orc::Type* type,
|
||||
std::vector<orc::TypeKind>* unsupported_pushdown_types) {
|
||||
const ColumnValueRange<primitive_type>& col_val_range, const orc::Type* type) {
|
||||
std::vector<OrcPredicate> predicates;
|
||||
|
||||
PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
|
||||
@ -572,16 +572,6 @@ std::vector<OrcPredicate> value_range_to_predicate(
|
||||
}
|
||||
}
|
||||
|
||||
if (unsupported_pushdown_types != nullptr) {
|
||||
for (vector<orc::TypeKind>::iterator it = unsupported_pushdown_types->begin();
|
||||
it != unsupported_pushdown_types->end(); ++it) {
|
||||
if (*it == type->getKind()) {
|
||||
// Unsupported type
|
||||
return predicates;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
orc::PredicateDataType predicate_data_type;
|
||||
auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind());
|
||||
if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
|
||||
@ -723,8 +713,8 @@ bool OrcReader::_init_search_argument(
|
||||
}
|
||||
std::visit(
|
||||
[&](auto& range) {
|
||||
std::vector<OrcPredicate> value_predicates = value_range_to_predicate(
|
||||
range, type_it->second, _unsupported_pushdown_types);
|
||||
std::vector<OrcPredicate> value_predicates =
|
||||
value_range_to_predicate(range, type_it->second);
|
||||
for (auto& range_predicate : value_predicates) {
|
||||
predicates.emplace_back(range_predicate);
|
||||
}
|
||||
|
||||
@ -133,8 +133,7 @@ public:
|
||||
|
||||
OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
|
||||
const TFileRangeDesc& range, size_t batch_size, const std::string& ctz,
|
||||
io::IOContext* io_ctx, bool enable_lazy_mat = true,
|
||||
std::vector<orc::TypeKind>* unsupported_pushdown_types = nullptr);
|
||||
io::IOContext* io_ctx, bool enable_lazy_mat = true);
|
||||
|
||||
OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
|
||||
const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);
|
||||
@ -619,7 +618,6 @@ private:
|
||||
std::unique_ptr<StringDictFilterImpl> _string_dict_filter;
|
||||
bool _dict_cols_has_converted = false;
|
||||
bool _has_complex_type = false;
|
||||
std::vector<orc::TypeKind>* _unsupported_pushdown_types;
|
||||
|
||||
// resolve schema change
|
||||
std::unordered_map<std::string, std::unique_ptr<converter::ColumnTypeConverter>> _converters;
|
||||
|
||||
@ -886,17 +886,9 @@ Status VFileScanner::_get_next_reader() {
|
||||
break;
|
||||
}
|
||||
case TFileFormatType::FORMAT_ORC: {
|
||||
std::vector<orc::TypeKind>* unsupported_pushdown_types = nullptr;
|
||||
if (range.__isset.table_format_params &&
|
||||
range.table_format_params.table_format_type == "paimon") {
|
||||
static std::vector<orc::TypeKind> paimon_unsupport_type =
|
||||
std::vector<orc::TypeKind> {orc::TypeKind::CHAR};
|
||||
unsupported_pushdown_types = &paimon_unsupport_type;
|
||||
}
|
||||
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
|
||||
_profile, _state, *_params, range, _state->query_options().batch_size,
|
||||
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
|
||||
unsupported_pushdown_types);
|
||||
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
|
||||
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
|
||||
if (push_down_predicates) {
|
||||
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
CREATE DATABASE IF NOT EXISTS multi_catalog;
|
||||
USE multi_catalog;
|
||||
|
||||
create table fixed_char_table (
|
||||
i int,
|
||||
c char(2)
|
||||
) stored as orc;
|
||||
|
||||
insert into fixed_char_table values(1,'a'),(2,'b '), (3,'cd');
|
||||
|
||||
create table type_changed_table (
|
||||
id int,
|
||||
name string
|
||||
) stored as orc;
|
||||
insert into type_changed_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');
|
||||
ALTER TABLE type_changed_table CHANGE COLUMN id id STRING;
|
||||
@ -0,0 +1,9 @@
|
||||
#!/bin/bash
|
||||
set -x
|
||||
|
||||
CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
|
||||
|
||||
# create table
|
||||
hive -f "${CUR_DIR}"/orc_predicate_table.hql
|
||||
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !predicate_fixed_char1 --
|
||||
1 a
|
||||
|
||||
-- !predicate_fixed_char2 --
|
||||
|
||||
-- !predicate_changed_type1 --
|
||||
1 Alice
|
||||
|
||||
-- !predicate_changed_type2 --
|
||||
2 Bob
|
||||
|
||||
-- !predicate_changed_type3 --
|
||||
3 Charlie
|
||||
|
||||
-- !predicate_fixed_char1 --
|
||||
1 a
|
||||
|
||||
-- !predicate_fixed_char2 --
|
||||
|
||||
-- !predicate_changed_type1 --
|
||||
1 Alice
|
||||
|
||||
-- !predicate_changed_type2 --
|
||||
2 Bob
|
||||
|
||||
-- !predicate_changed_type3 --
|
||||
3 Charlie
|
||||
|
||||
@ -0,0 +1,50 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_hive_orc_predicate", "p0,external,hive,external_docker,external_docker_hive") {
|
||||
|
||||
String enabled = context.config.otherConfigs.get("enableHiveTest")
|
||||
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
|
||||
logger.info("disable Hive test.")
|
||||
return;
|
||||
}
|
||||
|
||||
for (String hivePrefix : ["hive2", "hive3"]) {
|
||||
try {
|
||||
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
|
||||
String catalog_name = "${hivePrefix}_test_predicate"
|
||||
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
|
||||
|
||||
sql """drop catalog if exists ${catalog_name}"""
|
||||
sql """create catalog if not exists ${catalog_name} properties (
|
||||
"type"="hms",
|
||||
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
|
||||
);"""
|
||||
sql """use `${catalog_name}`.`multi_catalog`"""
|
||||
|
||||
qt_predicate_fixed_char1 """ select * from fixed_char_table where c = 'a';"""
|
||||
qt_predicate_fixed_char2 """ select * from fixed_char_table where c = 'a ';"""
|
||||
|
||||
qt_predicate_changed_type1 """ select * from type_changed_table where id = '1';"""
|
||||
qt_predicate_changed_type2 """ select * from type_changed_table where id = '2';"""
|
||||
qt_predicate_changed_type3 """ select * from type_changed_table where id = '3';"""
|
||||
|
||||
sql """drop catalog if exists ${catalog_name}"""
|
||||
} finally {
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user