diff --git a/be/src/runtime/define_primitive_type.h b/be/src/runtime/define_primitive_type.h index e7a246c1e7..0ecacb9234 100644 --- a/be/src/runtime/define_primitive_type.h +++ b/be/src/runtime/define_primitive_type.h @@ -17,8 +17,14 @@ #pragma once +#include + +#include "gutil/integral_types.h" namespace doris { -enum PrimitiveType { + +using PrimitiveNative = uint8_t; + +enum PrimitiveType : PrimitiveNative { INVALID_TYPE = 0, TYPE_NULL, /* 1 */ TYPE_BOOLEAN, /* 2 */ @@ -59,4 +65,6 @@ enum PrimitiveType { TYPE_AGG_STATE, /* 34 */ }; -} +constexpr PrimitiveNative BEGIN_OF_PRIMITIVE_TYPE = INVALID_TYPE; +constexpr PrimitiveNative END_OF_PRIMITIVE_TYPE = TYPE_AGG_STATE; +} // namespace doris diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index b5a9326183..a3745f8ce8 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -23,6 +23,7 @@ #include #include +#include #include "olap/decimal12.h" #include "runtime/define_primitive_type.h" @@ -31,6 +32,7 @@ #include "vec/columns/columns_number.h" #include "vec/core/types.h" #include "vec/runtime/vdatetime_value.h" +#include "vec/utils/template_helpers.hpp" namespace doris { @@ -98,11 +100,6 @@ constexpr bool is_int_or_bool(PrimitiveType type) { type == TYPE_INT || type == TYPE_BIGINT || type == TYPE_LARGEINT; } -constexpr bool has_variable_type(PrimitiveType type) { - return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_OBJECT || - type == TYPE_QUANTILE_STATE || type == TYPE_STRING; -} - bool is_type_compatible(PrimitiveType lhs, PrimitiveType rhs); PrimitiveType thrift_to_type(TPrimitiveType::type ttype); @@ -287,20 +284,28 @@ struct VecPrimitiveTypeTraits { using ColumnType = vectorized::ColumnVector; }; -template <> -struct VecPrimitiveTypeTraits { - using CppType = vectorized::Decimal32; - using ColumnType = vectorized::ColumnDecimal; -}; -template <> -struct VecPrimitiveTypeTraits { - using CppType = vectorized::Decimal64; - using ColumnType = vectorized::ColumnDecimal; -}; -template <> -struct VecPrimitiveTypeTraits { - using CppType = vectorized::Decimal128I; - using ColumnType = vectorized::ColumnDecimal; +template +concept HaveCppType = requires() { sizeof(typename Traits::CppType); }; + +template +struct PrimitiveTypeSizeReducer { + template + static size_t get_size() { + return sizeof(typename Traits::CppType); + } + template + static size_t get_size() { + return 0; + } + + static void run(size_t& size) { size = get_size>(); } }; +inline size_t get_primitive_type_size(PrimitiveType t) { + size_t size = 0; + vectorized::constexpr_loop_match::run(t, size); + return size; +} + } // namespace doris diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp index 3580ba72e8..8add4627f2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp @@ -26,7 +26,7 @@ #include "vec/columns/column_struct.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nullable.h" -#include "vec/utils/template_helpers.hpp" +#include "vec/functions/function.h" namespace doris::vectorized { diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h index 54d146ca37..e3c8105684 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h @@ -20,14 +20,17 @@ #include "factory_helpers.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/aggregate_functions/helpers.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_map.h" #include "vec/columns/column_nullable.h" +#include "vec/columns/column_struct.h" #include "vec/columns/column_vector.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" #include "vec/io/io_helper.h" -#include "vec/utils/template_helpers.hpp" namespace doris::vectorized { diff --git a/be/src/vec/common/hash_table/hash_map_util.h b/be/src/vec/common/hash_table/hash_map_util.h new file mode 100644 index 0000000000..7d48026699 --- /dev/null +++ b/be/src/vec/common/hash_table/hash_map_util.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/aggregate_functions/aggregate_function.h" + +namespace doris::vectorized { + +enum class HashKeyType { + EMPTY = 0, + without_key, + serialized, + int8_key, + int16_key, + int32_key, + int32_key_phase2, + int64_key, + int64_key_phase2, + int128_key, + int128_key_phase2, + int64_keys, + int64_keys_phase2, + int128_keys, + int128_keys_phase2, + int256_keys, + int256_keys_phase2, + string_key, + int136_keys, + int136_keys_phase2, +}; + +inline HashKeyType get_hash_key_type_with_phase(HashKeyType t, bool phase2) { + if (!phase2) { + return t; + } + if (t == HashKeyType::int32_key) { + return HashKeyType::int32_key_phase2; + } + if (t == HashKeyType::int64_key) { + return HashKeyType::int64_key_phase2; + } + if (t == HashKeyType::int128_keys) { + return HashKeyType::int128_keys_phase2; + } + if (t == HashKeyType::int136_keys) { + return HashKeyType::int136_keys_phase2; + } + if (t == HashKeyType::int256_keys) { + return HashKeyType::int256_keys_phase2; + } + return t; +} + +template typename MethodNullable, + template typename MethodOneNumber, + template typename MethodFixed, template typename DataNullable> +struct DataVariants { + DataVariants() = default; + DataVariants(const DataVariants&) = delete; + DataVariants& operator=(const DataVariants&) = delete; + MethodVariants method_variant; + + using Type = HashKeyType; + + Type _type = Type::EMPTY; + + template + void emplace_single() { + if (nullable) { + method_variant.template emplace>>>(); + } else { + method_variant.template emplace>(); + } + } + + template + void emplace_fixed() { + method_variant.template emplace>(); + } +}; +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index adabedb440..55edd72c13 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -52,7 +52,6 @@ #include "vec/core/column_numbers.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/field.h" -#include "vec/core/names.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_array.h" @@ -305,7 +304,7 @@ Status unfold_object(size_t dynamic_col_position, Block& block, bool cast_to_ori CHECK(column_object_ptr->is_finalized()); Columns subcolumns; DataTypes types; - Names names; + std::vector names; std::unordered_set static_column_names; // extract columns from dynamic column diff --git a/be/src/vec/common/uint128.h b/be/src/vec/common/uint128.h index 60324d7e35..597ad2f775 100644 --- a/be/src/vec/common/uint128.h +++ b/be/src/vec/common/uint128.h @@ -105,31 +105,6 @@ struct UInt128 { } }; -template -bool inline operator==(T a, const UInt128 b) { - return UInt128(a) == b; -} -template -bool inline operator!=(T a, const UInt128 b) { - return UInt128(a) != b; -} -template -bool inline operator>=(T a, const UInt128 b) { - return UInt128(a) >= b; -} -template -bool inline operator>(T a, const UInt128 b) { - return UInt128(a) > b; -} -template -bool inline operator<=(T a, const UInt128 b) { - return UInt128(a) <= b; -} -template -bool inline operator<(T a, const UInt128 b) { - return UInt128(a) < b; -} - template <> inline constexpr bool IsNumber = true; template <> @@ -179,10 +154,7 @@ struct UInt256 { return a == rhs.a && b == rhs.b && c == rhs.c && d == rhs.d; } - bool operator!=(const UInt256 rhs) const { return !operator==(rhs); } - bool operator==(const UInt64 rhs) const { return a == rhs && b == 0 && c == 0 && d == 0; } - bool operator!=(const UInt64 rhs) const { return !operator==(rhs); } UInt256& operator=(const UInt64 rhs) { a = rhs; @@ -200,8 +172,6 @@ struct UInt136 { UInt64 c; bool operator==(const UInt136 rhs) const { return a == rhs.a && b == rhs.b && c == rhs.c; } - - bool operator!=(const UInt136 rhs) const { return !operator==(rhs); } }; #pragma pack() diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 5b89077bd3..8342ba98f2 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -624,8 +624,8 @@ const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const { return data; } -Names Block::get_names() const { - Names res; +std::vector Block::get_names() const { + std::vector res; res.reserve(columns()); for (const auto& elem : data) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index bc794a8ff8..900969bda3 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -41,7 +41,6 @@ #include "vec/columns/column_nullable.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/columns_with_type_and_name.h" -#include "vec/core/names.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nullable.h" @@ -197,7 +196,7 @@ public: const ColumnsWithTypeAndName& get_columns_with_type_and_name() const; - Names get_names() const; + std::vector get_names() const; DataTypes get_data_types() const; DataTypePtr get_data_type(size_t index) const { @@ -410,7 +409,7 @@ class MutableBlock { private: MutableColumns _columns; DataTypes _data_types; - Names _names; + std::vector _names; using IndexByName = phmap::flat_hash_map; IndexByName index_by_name; @@ -609,7 +608,7 @@ public: return res; } - Names& get_names() { return _names; } + std::vector& get_names() { return _names; } bool has(const std::string& name) const; diff --git a/be/src/vec/core/names.h b/be/src/vec/core/names.h deleted file mode 100644 index 33d8306845..0000000000 --- a/be/src/vec/core/names.h +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Names.h -// and modified by Doris - -#pragma once - -#include -#include -#include -#include -#include - -namespace doris::vectorized { - -using Names = std::vector; -using NameSet = std::unordered_set; -using NameOrderedSet = std::set; -using NameToNameMap = std::unordered_map; -using NameToNameSetMap = std::unordered_map; - -using NameWithAlias = std::pair; -using NamesWithAliases = std::vector; - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp index f44b0c6e36..e0dca18d39 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.cpp +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -125,7 +125,7 @@ void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele } COUNTER_UPDATE(_hash_table_input_counter, num_rows); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index e1f91ab63a..c127c754f1 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -28,11 +28,13 @@ #include #include +#include "common/status.h" #include "exec/exec_node.h" #include "runtime/block_spill_manager.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/primitive_type.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/telemetry/telemetry.h" @@ -182,78 +184,59 @@ Status AggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { void AggregationNode::_init_hash_method(const VExprContextSPtrs& probe_exprs) { DCHECK(probe_exprs.size() >= 1); + + using Type = AggregatedDataVariants::Type; + Type t(Type::serialized); + if (probe_exprs.size() == 1) { auto is_nullable = probe_exprs[0]->root()->is_nullable(); - switch (probe_exprs[0]->root()->result_type()) { + PrimitiveType type = probe_exprs[0]->root()->result_type(); + switch (type) { case TYPE_TINYINT: case TYPE_BOOLEAN: - _agg_data->init(AggregatedDataVariants::Type::int8_key, is_nullable); - return; case TYPE_SMALLINT: - _agg_data->init(AggregatedDataVariants::Type::int16_key, is_nullable); - return; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: - if (_is_first_phase) - _agg_data->init(AggregatedDataVariants::Type::int32_key, is_nullable); - else - _agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); - return; case TYPE_BIGINT: case TYPE_DOUBLE: case TYPE_DATE: case TYPE_DATETIME: case TYPE_DATETIMEV2: - if (_is_first_phase) - _agg_data->init(AggregatedDataVariants::Type::int64_key, is_nullable); - else - _agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); - return; - case TYPE_LARGEINT: { - if (_is_first_phase) - _agg_data->init(AggregatedDataVariants::Type::int128_key, is_nullable); - else - _agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); - return; - } + case TYPE_LARGEINT: case TYPE_DECIMALV2: case TYPE_DECIMAL32: case TYPE_DECIMAL64: case TYPE_DECIMAL128I: { - DataTypePtr& type_ptr = probe_exprs[0]->root()->data_type(); - TypeIndex idx = is_nullable ? assert_cast(*type_ptr) - .get_nested_type() - ->get_type_id() - : type_ptr->get_type_id(); - WhichDataType which(idx); - if (which.is_decimal32()) { - if (_is_first_phase) - _agg_data->init(AggregatedDataVariants::Type::int32_key, is_nullable); - else - _agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); - } else if (which.is_decimal64()) { - if (_is_first_phase) - _agg_data->init(AggregatedDataVariants::Type::int64_key, is_nullable); - else - _agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); + size_t size = get_primitive_type_size(type); + if (size == 1) { + t = Type::int8_key; + } else if (size == 2) { + t = Type::int16_key; + } else if (size == 4) { + t = Type::int32_key; + } else if (size == 8) { + t = Type::int64_key; + } else if (size == 16) { + t = Type::int128_key; } else { - if (_is_first_phase) - _agg_data->init(AggregatedDataVariants::Type::int128_key, is_nullable); - else - _agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); + throw Exception(ErrorCode::INTERNAL_ERROR, + "meet invalid type size, size={}, type={}", size, + type_to_string(type)); } - return; + break; } case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_STRING: { - _agg_data->init(AggregatedDataVariants::Type::string_key, is_nullable); + t = Type::string_key; break; } default: - _agg_data->init(AggregatedDataVariants::Type::serialized); + t = Type::serialized; } + + _agg_data->init(get_hash_key_type_with_phase(t, !_is_first_phase), is_nullable); } else { bool use_fixed_key = true; bool has_null = false; @@ -286,33 +269,17 @@ void AggregationNode::_init_hash_method(const VExprContextSPtrs& probe_exprs) { if (use_fixed_key) { if (bitmap_size + key_byte_size <= sizeof(UInt64)) { - if (_is_first_phase) { - _agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null); - } else { - _agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); - } + t = Type::int64_keys; } else if (bitmap_size + key_byte_size <= sizeof(UInt128)) { - if (_is_first_phase) { - _agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null); - } else { - _agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); - } + t = Type::int128_keys; } else if (bitmap_size + key_byte_size <= sizeof(UInt136)) { - if (_is_first_phase) { - _agg_data->init(AggregatedDataVariants::Type::int136_keys, has_null); - } else { - _agg_data->init(AggregatedDataVariants::Type::int136_keys_phase2, has_null); - } + t = Type::int136_keys; } else { - if (_is_first_phase) { - _agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null); - } else { - _agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); - } + t = Type::int256_keys; } - + _agg_data->init(get_hash_key_type_with_phase(t, !_is_first_phase), has_null); } else { - _agg_data->init(AggregatedDataVariants::Type::serialized); + _agg_data->init(Type::serialized); } } } @@ -439,7 +406,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _align_aggregate_states) * _align_aggregate_states)); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); if (_is_merge) { _executor.execute = std::bind(&AggregationNode::_merge_with_serialized_key, this, std::placeholders::_1); @@ -518,7 +485,9 @@ Status AggregationNode::open(RuntimeState* state) { RETURN_IF_ERROR(_children[0]->open(state)); // Streaming preaggregations do all processing in GetNext(). - if (_is_streaming_preagg) return Status::OK(); + if (_is_streaming_preagg) { + return Status::OK(); + } bool eos = false; Block block; while (!eos) { @@ -603,8 +572,12 @@ Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_b } void AggregationNode::release_resource(RuntimeState* state) { - for (auto* aggregate_evaluator : _aggregate_evaluators) aggregate_evaluator->close(state); - if (_executor.close) _executor.close(); + for (auto* aggregate_evaluator : _aggregate_evaluators) { + aggregate_evaluator->close(state); + } + if (_executor.close) { + _executor.close(); + } /// _hash_table_size_counter may be null if prepare failed. if (_hash_table_size_counter) { @@ -612,14 +585,16 @@ void AggregationNode::release_resource(RuntimeState* state) { [&](auto&& agg_method) { COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.data.size())); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } _release_mem(); ExecNode::release_resource(state); } Status AggregationNode::close(RuntimeState* state) { - if (is_closed()) return Status::OK(); + if (is_closed()) { + return Status::OK(); + } return ExecNode::close(state); } @@ -797,7 +772,9 @@ void AggregationNode::_make_nullable_output_key(Block* block) { } bool AggregationNode::_should_expand_preagg_hash_tables() { - if (!_should_expand_hash_table) return false; + if (!_should_expand_hash_table) { + return false; + } return std::visit( [&](auto&& agg_method) -> bool { @@ -806,7 +783,9 @@ bool AggregationNode::_should_expand_preagg_hash_tables() { std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; // Need some rows in tables to have valid statistics. - if (ht_rows == 0) return true; + if (ht_rows == 0) { + return true; + } // Find the appropriate reduction factor in our table for the current hash table sizes. int cache_level = 0; @@ -826,7 +805,9 @@ bool AggregationNode::_should_expand_preagg_hash_tables() { // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be // inaccurate, which could lead to a divide by zero below. - if (aggregated_input_rows <= 0) return true; + if (aggregated_input_rows <= 0) { + return true; + } // Extrapolate the current reduction factor (r) using the formula // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data @@ -846,7 +827,7 @@ bool AggregationNode::_should_expand_preagg_hash_tables() { _should_expand_hash_table = current_reduction > min_reduction; return _should_expand_hash_table; }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } size_t AggregationNode::_memory_usage() const { @@ -860,7 +841,7 @@ size_t AggregationNode::_memory_usage() const { } usage += agg_method.data.get_buffer_size_in_bytes(); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); if (_agg_arena_pool) { usage += _agg_arena_pool->size(); @@ -901,12 +882,12 @@ Status AggregationNode::_reset_hash_table() { _agg_arena_pool.reset(new Arena); return Status::OK(); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } size_t AggregationNode::_get_hash_table_size() { return std::visit([&](auto&& agg_method) { return agg_method.data.size(); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, @@ -970,6 +951,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR places); } } else { + SCOPED_TIMER(_hash_table_emplace_timer); for (size_t i = 0; i < num_rows; ++i) { AggregateDataPtr mapped = nullptr; if constexpr (ColumnsHashing::IsSingleNullableColumnMethod< @@ -986,7 +968,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR COUNTER_UPDATE(_hash_table_input_counter, num_rows); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, @@ -1001,7 +983,9 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); if constexpr (HashTableTraits::is_phmap) { - if (_hash_values.size() < num_rows) _hash_values.resize(num_rows); + if (_hash_values.size() < num_rows) { + _hash_values.resize(num_rows); + } if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< AggState>::value) { for (size_t i = 0; i < num_rows; ++i) { @@ -1033,11 +1017,12 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr if (find_result.is_found()) { places[i] = find_result.get_mapped(); - } else + } else { places[i] = nullptr; + } } }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block, @@ -1139,7 +1124,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i } return Status::OK(); }, - _agg_data->_aggregated_method_variant)); + _agg_data->method_variant)); if (!ret_flag) { RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows)); @@ -1331,7 +1316,7 @@ Status AggregationNode::_try_spill_disk(bool eos) { RETURN_IF_ERROR(_spill_hash_table(agg_method, hash_table)); return _reset_hash_table(); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } Status AggregationNode::_execute_with_serialized_key(Block* block) { @@ -1464,10 +1449,11 @@ Status AggregationNode::_get_result_with_serialized_key_non_spill(RuntimeState* if (key_columns[0]->size() < state->batch_size()) { key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.data.get_null_key_data(); - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->insert_result_info( mapped + _offsets_of_aggregate_states[i], value_columns[i].get()); + } *eos = true; } } else { @@ -1475,7 +1461,7 @@ Status AggregationNode::_get_result_with_serialized_key_non_spill(RuntimeState* } } }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); if (!mem_reuse) { *block = columns_with_schema; @@ -1612,7 +1598,7 @@ Status AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS } } }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); if (!mem_reuse) { ColumnsWithTypeAndName columns_with_schema; @@ -1655,7 +1641,7 @@ void AggregationNode::_update_memusage_with_serialized_key() { _mem_usage_record.used_in_arena = _agg_arena_pool->size() + _aggregate_data_container->memory_usage(); }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); } void AggregationNode::_close_with_serialized_key() { @@ -1672,7 +1658,7 @@ void AggregationNode::_close_with_serialized_key() { _destroy_agg_status(data.get_null_key_data()); } }, - _agg_data->_aggregated_method_variant); + _agg_data->method_variant); release_tracker(); } diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index ef4d910039..a388218c59 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -51,6 +51,7 @@ #include "vec/common/columns_hashing.h" #include "vec/common/hash_table/fixed_hash_map.h" #include "vec/common/hash_table/hash.h" +#include "vec/common/hash_table/hash_map_util.h" #include "vec/common/hash_table/partitioned_hash_map.h" #include "vec/common/hash_table/ph_hash_map.h" #include "vec/common/hash_table/string_hash_map.h" @@ -222,7 +223,7 @@ struct AggregationMethodStringNoCache { /// For the case where there is one numeric key. /// FieldType is UInt8/16/32/64 for any type with corresponding bit width. -template +template struct AggregationMethodOneNumber { using Data = TData; using Key = typename Data::key_type; @@ -240,7 +241,7 @@ struct AggregationMethodOneNumber { /// To use one `Method` in different threads, use different `State`. using State = ColumnsHashing::HashMethodOneNumber; + false>; static void insert_keys_into_columns(std::vector& keys, MutableColumns& key_columns, const size_t num_rows, const Sizes&) { @@ -482,219 +483,89 @@ using AggregatedMethodVariants = std::variant< AggregationMethodKeysFixed, AggregationMethodKeysFixed>; -struct AggregatedDataVariants { - AggregatedDataVariants() = default; - AggregatedDataVariants(const AggregatedDataVariants&) = delete; - AggregatedDataVariants& operator=(const AggregatedDataVariants&) = delete; +struct AggregatedDataVariants + : public DataVariants { AggregatedDataWithoutKey without_key = nullptr; - AggregatedMethodVariants _aggregated_method_variant; - // TODO: may we should support uint256 in the future - enum class Type { - EMPTY = 0, - without_key, - serialized, - int8_key, - int16_key, - int32_key, - int32_key_phase2, - int64_key, - int64_key_phase2, - int128_key, - int128_key_phase2, - int64_keys, - int64_keys_phase2, - int128_keys, - int128_keys_phase2, - int256_keys, - int256_keys_phase2, - string_key, - int136_keys, - int136_keys_phase2, - }; - - Type _type = Type::EMPTY; - - void init(Type type, bool is_nullable = false) { + template + void init(Type type) { _type = type; switch (_type) { case Type::without_key: break; case Type::serialized: - _aggregated_method_variant - .emplace>(); + method_variant.emplace>(); break; case Type::int8_key: - if (is_nullable) { - _aggregated_method_variant.emplace>>(); - } else { - _aggregated_method_variant - .emplace>(); - } + emplace_single(); break; case Type::int16_key: - if (is_nullable) { - _aggregated_method_variant.emplace>>(); - } else { - _aggregated_method_variant - .emplace>(); - } + emplace_single(); break; case Type::int32_key: - if (is_nullable) { - _aggregated_method_variant.emplace>>(); - } else { - _aggregated_method_variant - .emplace>(); - } + emplace_single(); break; case Type::int32_key_phase2: - if (is_nullable) { - _aggregated_method_variant - .emplace>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + emplace_single(); break; case Type::int64_key: - if (is_nullable) { - _aggregated_method_variant.emplace>>(); - } else { - _aggregated_method_variant - .emplace>(); - } + emplace_single(); break; case Type::int64_key_phase2: - if (is_nullable) { - _aggregated_method_variant - .emplace>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + emplace_single(); break; case Type::int128_key: - if (is_nullable) { - _aggregated_method_variant - .emplace>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + emplace_single(); break; case Type::int128_key_phase2: - if (is_nullable) { - _aggregated_method_variant - .emplace>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + emplace_single(); break; case Type::int64_keys: - if (is_nullable) { - _aggregated_method_variant - .emplace>(); - } else { - _aggregated_method_variant - .emplace>(); - } + emplace_fixed(); break; case Type::int64_keys_phase2: - - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } - + emplace_fixed(); break; case Type::int128_keys: - - if (is_nullable) { - _aggregated_method_variant - .emplace>(); - } else { - _aggregated_method_variant - .emplace>(); - } + emplace_fixed(); break; case Type::int128_keys_phase2: - - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } - + emplace_fixed(); + break; + case Type::int136_keys: + emplace_fixed(); + break; + case Type::int136_keys_phase2: + emplace_fixed(); break; case Type::int256_keys: - - if (is_nullable) { - _aggregated_method_variant - .emplace>(); - } else { - _aggregated_method_variant - .emplace>(); - } - + emplace_fixed(); break; case Type::int256_keys_phase2: - - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } + emplace_fixed(); break; case Type::string_key: - if (is_nullable) { - _aggregated_method_variant.emplace< + if (nullable) { + method_variant.emplace< AggregationMethodSingleNullableColumn>>(); } else { - _aggregated_method_variant.emplace< + method_variant.emplace< AggregationMethodStringNoCache>(); } break; - case Type::int136_keys: - - if (is_nullable) { - _aggregated_method_variant - .emplace>(); - } else { - _aggregated_method_variant - .emplace>(); - } - - break; - case Type::int136_keys_phase2: - - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } - break; default: - DCHECK(false) << "Do not have a rigth agg data type"; + throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, type={}", type); + } + } + + void init(Type type, bool is_nullable = false) { + if (is_nullable) { + init(type); + } else { + init(type); } } }; @@ -744,7 +615,7 @@ public: IteratorBase(Container* container_, uint32_t index_) : container(container_), index(index_) { sub_container_index = index / SUB_CONTAINER_CAPACITY; - index_in_sub_container = index % SUB_CONTAINER_CAPACITY; + index_in_sub_container = index - sub_container_index * SUB_CONTAINER_CAPACITY; } bool operator==(const IteratorBase& rhs) const { return index == rhs.index; } @@ -752,8 +623,11 @@ public: Derived& operator++() { index++; - sub_container_index = index / SUB_CONTAINER_CAPACITY; - index_in_sub_container = index % SUB_CONTAINER_CAPACITY; + index_in_sub_container++; + if (index_in_sub_container == SUB_CONTAINER_CAPACITY) { + index_in_sub_container = 0; + sub_container_index++; + } return static_cast(*this); } diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index eb807ad4b4..f9c767276e 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -166,7 +166,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum batch_size); } }, - _partitioned_data->_partition_method_variant); + _partitioned_data->method_variant); } Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 6ff6b980c6..a583d99b1f 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -33,6 +33,7 @@ #include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/vsort_exec_exprs.h" #include "vec/core/block.h" +#include "vec/exec/vaggregation_node.h" namespace doris { namespace vectorized { @@ -192,7 +193,7 @@ struct PartitionMethodStringNoCache { /// For the case where there is one numeric key. /// FieldType is UInt8/16/32/64 for any type with corresponding bit width. -template +template struct PartitionMethodOneNumber { using Data = TData; using Key = typename Data::key_type; @@ -210,7 +211,7 @@ struct PartitionMethodOneNumber { /// To use one `Method` in different threads, use different `State`. using State = ColumnsHashing::HashMethodOneNumber; + false>; }; template @@ -304,134 +305,70 @@ using PartitionedMethodVariants = PartitionMethodSingleNullableColumn>>>; -struct PartitionedHashMapVariants { - PartitionedHashMapVariants() = default; - PartitionedHashMapVariants(const PartitionedHashMapVariants&) = delete; - PartitionedHashMapVariants& operator=(const PartitionedHashMapVariants&) = delete; - PartitionedMethodVariants _partition_method_variant; - - enum class Type { - EMPTY = 0, - serialized, - int8_key, - int16_key, - int32_key, - int64_key, - int128_key, - int64_keys, - int128_keys, - int256_keys, - string_key, - }; - - Type _type = Type::EMPTY; - - void init(Type type, bool is_nullable = false) { +struct PartitionedHashMapVariants + : public DataVariants { + template + void init(Type type) { _type = type; switch (_type) { case Type::serialized: { - _partition_method_variant - .emplace>(); + method_variant.emplace>(); break; } case Type::int8_key: { - if (is_nullable) { - _partition_method_variant - .emplace>>>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_single(); break; } case Type::int16_key: { - if (is_nullable) { - _partition_method_variant - .emplace>>>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_single(); break; } case Type::int32_key: { - if (is_nullable) { - _partition_method_variant - .emplace>>>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_single(); break; } case Type::int64_key: { - if (is_nullable) { - _partition_method_variant - .emplace>>>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_single(); break; } case Type::int128_key: { - if (is_nullable) { - _partition_method_variant - .emplace>>>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_single(); break; } case Type::int64_keys: { - if (is_nullable) { - _partition_method_variant - .emplace>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_fixed(); break; } case Type::int128_keys: { - if (is_nullable) { - _partition_method_variant - .emplace>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_fixed(); break; } case Type::int256_keys: { - if (is_nullable) { - _partition_method_variant - .emplace>(); - } else { - _partition_method_variant - .emplace>(); - } + emplace_fixed(); break; } case Type::string_key: { - if (is_nullable) { - _partition_method_variant + if (nullable) { + method_variant .emplace>>>(); } else { - _partition_method_variant + method_variant .emplace>(); } break; } default: - DCHECK(false) << "Do not have a rigth partition by data type: "; + throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, type={}", type); + } + } + void init(Type type, bool is_nullable = false) { + if (is_nullable) { + init(type); + } else { + init(type); } } }; diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h index 246cfe1ee8..e1b9016be6 100644 --- a/be/src/vec/functions/function.h +++ b/be/src/vec/functions/function.h @@ -667,4 +667,47 @@ using FunctionPtr = std::shared_ptr; ColumnPtr wrap_in_nullable(const ColumnPtr& src, const Block& block, const ColumnNumbers& args, size_t result, size_t input_rows_count); +#define NUMERIC_TYPE_TO_COLUMN_TYPE(M) \ + M(UInt8, ColumnUInt8) \ + M(Int8, ColumnInt8) \ + M(Int16, ColumnInt16) \ + M(Int32, ColumnInt32) \ + M(Int64, ColumnInt64) \ + M(Int128, ColumnInt128) \ + M(Float32, ColumnFloat32) \ + M(Float64, ColumnFloat64) + +#define DECIMAL_TYPE_TO_COLUMN_TYPE(M) \ + M(Decimal32, ColumnDecimal) \ + M(Decimal64, ColumnDecimal) \ + M(Decimal128, ColumnDecimal) \ + M(Decimal128I, ColumnDecimal) + +#define STRING_TYPE_TO_COLUMN_TYPE(M) \ + M(String, ColumnString) \ + M(JSONB, ColumnString) + +#define TIME_TYPE_TO_COLUMN_TYPE(M) \ + M(Date, ColumnInt64) \ + M(DateTime, ColumnInt64) \ + M(DateV2, ColumnUInt32) \ + M(DateTimeV2, ColumnUInt64) + +#define COMPLEX_TYPE_TO_COLUMN_TYPE(M) \ + M(Array, ColumnArray) \ + M(Map, ColumnMap) \ + M(Struct, ColumnStruct) \ + M(BitMap, ColumnBitmap) \ + M(HLL, ColumnHLL) + +#define TYPE_TO_BASIC_COLUMN_TYPE(M) \ + NUMERIC_TYPE_TO_COLUMN_TYPE(M) \ + DECIMAL_TYPE_TO_COLUMN_TYPE(M) \ + STRING_TYPE_TO_COLUMN_TYPE(M) \ + TIME_TYPE_TO_COLUMN_TYPE(M) + +#define TYPE_TO_COLUMN_TYPE(M) \ + TYPE_TO_BASIC_COLUMN_TYPE(M) \ + COMPLEX_TYPE_TO_COLUMN_TYPE(M) + } // namespace doris::vectorized diff --git a/be/src/vec/utils/template_helpers.hpp b/be/src/vec/utils/template_helpers.hpp index e0e3cd1d21..fa94180cde 100644 --- a/be/src/vec/utils/template_helpers.hpp +++ b/be/src/vec/utils/template_helpers.hpp @@ -20,59 +20,6 @@ #include #include -#include "http/http_status.h" -#include "vec/aggregate_functions/aggregate_function.h" -#include "vec/columns/column_array.h" -#include "vec/columns/column_complex.h" -#include "vec/columns/column_map.h" -#include "vec/columns/column_struct.h" -#include "vec/columns/columns_number.h" -#include "vec/data_types/data_type.h" -#include "vec/functions/function.h" - -#define NUMERIC_TYPE_TO_COLUMN_TYPE(M) \ - M(UInt8, ColumnUInt8) \ - M(Int8, ColumnInt8) \ - M(Int16, ColumnInt16) \ - M(Int32, ColumnInt32) \ - M(Int64, ColumnInt64) \ - M(Int128, ColumnInt128) \ - M(Float32, ColumnFloat32) \ - M(Float64, ColumnFloat64) - -#define DECIMAL_TYPE_TO_COLUMN_TYPE(M) \ - M(Decimal32, ColumnDecimal) \ - M(Decimal64, ColumnDecimal) \ - M(Decimal128, ColumnDecimal) \ - M(Decimal128I, ColumnDecimal) - -#define STRING_TYPE_TO_COLUMN_TYPE(M) \ - M(String, ColumnString) \ - M(JSONB, ColumnString) - -#define TIME_TYPE_TO_COLUMN_TYPE(M) \ - M(Date, ColumnInt64) \ - M(DateTime, ColumnInt64) \ - M(DateV2, ColumnUInt32) \ - M(DateTimeV2, ColumnUInt64) - -#define COMPLEX_TYPE_TO_COLUMN_TYPE(M) \ - M(Array, ColumnArray) \ - M(Map, ColumnMap) \ - M(Struct, ColumnStruct) \ - M(BitMap, ColumnBitmap) \ - M(HLL, ColumnHLL) - -#define TYPE_TO_BASIC_COLUMN_TYPE(M) \ - NUMERIC_TYPE_TO_COLUMN_TYPE(M) \ - DECIMAL_TYPE_TO_COLUMN_TYPE(M) \ - STRING_TYPE_TO_COLUMN_TYPE(M) \ - TIME_TYPE_TO_COLUMN_TYPE(M) - -#define TYPE_TO_COLUMN_TYPE(M) \ - TYPE_TO_BASIC_COLUMN_TYPE(M) \ - COMPLEX_TYPE_TO_COLUMN_TYPE(M) - namespace doris::vectorized { template typename Reducer> @@ -83,10 +30,8 @@ struct constexpr_loop_match { if (start == target) { Reducer::run(std::forward(args)...); } else { - if constexpr (start < std::numeric_limits::max()) { - constexpr_loop_match::run( - target, std::forward(args)...); - } + constexpr_loop_match::run( + target, std::forward(args)...); } } } @@ -95,55 +40,6 @@ struct constexpr_loop_match { template typename Reducer> using constexpr_int_match = constexpr_loop_match; -template