diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h index 297530dbd8..e7d82c9523 100644 --- a/be/src/exprs/minmax_predicate.h +++ b/be/src/exprs/minmax_predicate.h @@ -28,7 +28,7 @@ namespace doris { // only used in Runtime Filter -class MinMaxFuncBase { +class MinMaxFuncBase : public RuntimeFilterFuncBase { public: virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) = 0; virtual void* get_max() = 0; @@ -38,6 +38,13 @@ public: // merge from other minmax_func virtual Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) = 0; virtual ~MinMaxFuncBase() = default; + + bool contain_null() const { return _null_aware && _contain_null; } + + void set_contain_null() { _contain_null = true; } + +protected: + bool _contain_null = false; }; template @@ -47,15 +54,13 @@ public: ~MinMaxNumFunc() override = default; void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { - if (column->empty()) { - return; - } if (column->is_nullable()) { const auto* nullable = assert_cast(column.get()); const auto& col = nullable->get_nested_column_ptr(); const auto& nullmap = nullable->get_null_map_data(); if (nullable->has_null()) { update_batch(col, nullmap, start); + _contain_null = true; } else { update_batch(col, start); } @@ -152,6 +157,7 @@ public: } } + _contain_null |= minmax_func->contain_null(); return Status::OK(); } diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 963bcae7b0..83100b3129 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -218,7 +218,8 @@ Status create_literal(const TypeDescriptor& type, const void* data, vectorized:: } Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcode, - vectorized::VExprSPtr& expr, TExprNode* tnode) { + vectorized::VExprSPtr& expr, TExprNode* tnode, + bool contain_null = false) { TExprNode node; TScalarType tscalar_type; tscalar_type.__set_type(TPrimitiveType::BOOLEAN); @@ -232,7 +233,8 @@ Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcod node.__set_child_type(to_thrift(type.type)); node.__set_num_children(2); node.__set_output_scale(type.scale); - node.__set_node_type(TExprNodeType::BINARY_PRED); + node.__set_node_type(contain_null ? TExprNodeType::NULL_AWARE_BINARY_PRED + : TExprNodeType::BINARY_PRED); TFunction fn; TFunctionName fn_name; fn_name.__set_db_name(""); @@ -747,9 +749,15 @@ public: // used by shuffle runtime filter // assign this filter by protobuf - Status assign(const PMinMaxFilter* minmax_filter) { + Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) { PrimitiveType type = to_primitive_type(minmax_filter->column_type()); _context.minmax_func.reset(create_minmax_filter(type)); + + if (contain_null) { + _context.minmax_func->set_null_aware(true); + _context.minmax_func->set_contain_null(); + } + switch (type) { case TYPE_BOOLEAN: { bool min_val = minmax_filter->min_val().boolval(); @@ -900,6 +908,9 @@ public: DCHECK(get_real_type() == RuntimeFilterType::IN_FILTER); return _context.hybrid_set->contain_null(); } + if (_context.minmax_func) { + return _context.minmax_func->contain_null(); + } return false; } @@ -1345,7 +1356,7 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, case PFilterType::MAX_FILTER: case PFilterType::MINMAX_FILTER: { DCHECK(param->request->has_minmax_filter()); - return (*wrapper)->assign(¶m->request->minmax_filter()); + return (*wrapper)->assign(¶m->request->minmax_filter(), param->request->contain_null()); } default: return Status::InvalidArgument("unknown filter type"); @@ -1388,7 +1399,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, case PFilterType::MAX_FILTER: case PFilterType::MINMAX_FILTER: { DCHECK(param->request->has_minmax_filter()); - return (*wrapper)->assign(¶m->request->minmax_filter()); + return (*wrapper)->assign(¶m->request->minmax_filter(), param->request->contain_null()); } default: return Status::InvalidArgument("unknown filter type"); @@ -1773,10 +1784,9 @@ Status RuntimePredicateWrapper::get_push_exprs( node.in_predicate.__set_is_not_in(false); node.__set_opcode(TExprOpcode::FILTER_IN); node.__set_is_nullable(false); - auto in_pred = vectorized::VDirectInPredicate::create_shared(node, null_aware); - in_pred->set_filter(_context.hybrid_set); + auto in_pred = vectorized::VDirectInPredicate::create_shared(node, _context.hybrid_set); in_pred->add_child(probe_ctx->root()); - auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred); + auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred, null_aware); container.push_back(wrapper); break; } @@ -1821,8 +1831,8 @@ Status RuntimePredicateWrapper::get_push_exprs( max_literal)); max_pred->add_child(probe_ctx->root()); max_pred->add_child(max_literal); - container.push_back( - vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred)); + container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, + max_pred, null_aware)); vectorized::VExprContextSPtr new_probe_ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); @@ -1838,8 +1848,8 @@ Status RuntimePredicateWrapper::get_push_exprs( _context.minmax_func->get_min(), min_literal)); min_pred->add_child(new_probe_ctx->root()); min_pred->add_child(min_literal); - container.push_back( - vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred)); + container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, + min_pred, null_aware)); break; } case RuntimeFilterType::BLOOM_FILTER: { diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 95160c9403..6ce0b6f13b 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -2075,13 +2075,12 @@ Status OrcReader::_rewrite_dict_conjuncts(std::vector& dict_codes, int // VdirectInPredicate assume is_nullable = false. node.__set_is_nullable(false); - root = vectorized::VDirectInPredicate::create_shared(node); std::shared_ptr hybrid_set( create_set(PrimitiveType::TYPE_INT, dict_codes.size())); for (int j = 0; j < dict_codes.size(); ++j) { hybrid_set->insert(&dict_codes[j]); } - static_cast(root.get())->set_filter(hybrid_set); + root = vectorized::VDirectInPredicate::create_shared(node, hybrid_set); } { SlotDescriptor* slot = nullptr; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 1aea6a52c4..3f8000c317 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -945,13 +945,12 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, // VdirectInPredicate assume is_nullable = false. node.__set_is_nullable(false); - root = vectorized::VDirectInPredicate::create_shared(node); std::shared_ptr hybrid_set( create_set(PrimitiveType::TYPE_INT, dict_codes.size())); for (int j = 0; j < dict_codes.size(); ++j) { hybrid_set->insert(&dict_codes[j]); } - static_cast(root.get())->set_filter(hybrid_set); + root = vectorized::VDirectInPredicate::create_shared(node, hybrid_set); } { SlotDescriptor* slot = nullptr; diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h index 9b3d861b3b..7abd43a5e0 100644 --- a/be/src/vec/exprs/vdirect_in_predicate.h +++ b/be/src/vec/exprs/vdirect_in_predicate.h @@ -26,11 +26,8 @@ class VDirectInPredicate final : public VExpr { ENABLE_FACTORY_CREATOR(VDirectInPredicate); public: - VDirectInPredicate(const TExprNode& node, bool null_aware = false) - : VExpr(node), - _filter(nullptr), - _expr_name("direct_in_predicate"), - _null_aware(null_aware) {} + VDirectInPredicate(const TExprNode& node, const std::shared_ptr& filter) + : VExpr(node), _filter(filter), _expr_name("direct_in_predicate") {} ~VDirectInPredicate() override = default; Status prepare(RuntimeState* state, const RowDescriptor& row_desc, @@ -49,8 +46,25 @@ public: } Status execute(VExprContext* context, Block* block, int* result_column_id) override { + ColumnNumbers arguments; + return _do_execute(context, block, result_column_id, arguments); + } + + Status execute_runtime_fitler(doris::vectorized::VExprContext* context, + doris::vectorized::Block* block, int* result_column_id, + std::vector& args) override { + return _do_execute(context, block, result_column_id, args); + } + + const std::string& expr_name() const override { return _expr_name; } + + std::shared_ptr get_set_func() const override { return _filter; } + +private: + Status _do_execute(VExprContext* context, Block* block, int* result_column_id, + std::vector& arguments) { DCHECK(_open_finished || _getting_const_col); - ColumnNumbers arguments(_children.size()); + arguments.resize(_children.size()); for (int i = 0; i < _children.size(); ++i) { int column_id = -1; RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id)); @@ -70,12 +84,6 @@ public: const auto& null_map = static_cast(argument_column.get())->get_null_map_data(); _filter->find_batch_nullable(*column_nested, sz, null_map, res_data_column->get_data()); - if (_null_aware) { - auto* __restrict res_data = res_data_column->get_data().data(); - for (size_t i = 0; i < sz; ++i) { - res_data[i] |= null_map[i]; - } - } } else { _filter->find_batch(*argument_column, sz, res_data_column->get_data()); } @@ -88,15 +96,7 @@ public: return Status::OK(); } - const std::string& expr_name() const override { return _expr_name; } - - void set_filter(std::shared_ptr& filter) { _filter = filter; } - - std::shared_ptr get_set_func() const override { return _filter; } - -private: std::shared_ptr _filter; std::string _expr_name; - bool _null_aware; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index bf38185f7d..d039a4e3a3 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -136,43 +136,54 @@ void VectorizedFnCall::close(VExprContext* context, FunctionContext::FunctionSta VExpr::close(context, scope); } -Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* block, - int* result_column_id) { +Status VectorizedFnCall::_do_execute(doris::vectorized::VExprContext* context, + doris::vectorized::Block* block, int* result_column_id, + std::vector& args) { if (is_const_and_have_executed()) { // const have execute in open function return get_result_from_const(block, _expr_name, result_column_id); } DCHECK(_open_finished || _getting_const_col) << debug_string(); // TODO: not execute const expr again, but use the const column in function context - vectorized::ColumnNumbers arguments(_children.size()); + args.resize(_children.size()); for (int i = 0; i < _children.size(); ++i) { int column_id = -1; RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id)); - arguments[i] = column_id; + args[i] = column_id; } - RETURN_IF_ERROR(check_constant(*block, arguments)); + RETURN_IF_ERROR(check_constant(*block, args)); // call function size_t num_columns_without_result = block->columns(); // prepare a column to save result block->insert({nullptr, _data_type, _expr_name}); if (_can_fast_execute) { // if not find fast execute result column, means do not need check fast execute again - _can_fast_execute = fast_execute(context->fn_context(_fn_context_index), *block, arguments, + _can_fast_execute = fast_execute(context->fn_context(_fn_context_index), *block, args, num_columns_without_result, block->rows()); if (_can_fast_execute) { *result_column_id = num_columns_without_result; return Status::OK(); } } - - RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), *block, arguments, + RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), *block, args, num_columns_without_result, block->rows(), false)); *result_column_id = num_columns_without_result; - return Status::OK(); } +Status VectorizedFnCall::execute_runtime_fitler(doris::vectorized::VExprContext* context, + doris::vectorized::Block* block, + int* result_column_id, std::vector& args) { + return _do_execute(context, block, result_column_id, args); +} + +Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* block, + int* result_column_id) { + std::vector arguments; + return _do_execute(context, block, result_column_id, arguments); +} + // fast_execute can direct copy expr filter result which build by apply index in segment_iterator bool VectorizedFnCall::fast_execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, diff --git a/be/src/vec/exprs/vectorized_fn_call.h b/be/src/vec/exprs/vectorized_fn_call.h index 8c9ed6d939..24cab0c94b 100644 --- a/be/src/vec/exprs/vectorized_fn_call.h +++ b/be/src/vec/exprs/vectorized_fn_call.h @@ -47,6 +47,9 @@ class VectorizedFnCall : public VExpr { public: VectorizedFnCall(const TExprNode& node); Status execute(VExprContext* context, Block* block, int* result_column_id) override; + Status execute_runtime_fitler(doris::vectorized::VExprContext* context, + doris::vectorized::Block* block, int* result_column_id, + std::vector& args) override; Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; Status open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) override; @@ -71,5 +74,9 @@ protected: bool _can_fast_execute = false; std::string _expr_name; std::string _function_name; + +private: + Status _do_execute(doris::vectorized::VExprContext* context, doris::vectorized::Block* block, + int* result_column_id, std::vector& args); }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 4f6b984e8f..007d2b9991 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -279,6 +279,7 @@ Status VExpr::create_expr(const TExprNode& expr_node, VExprSPtr& expr) { } case TExprNodeType::ARITHMETIC_EXPR: case TExprNodeType::BINARY_PRED: + case TExprNodeType::NULL_AWARE_BINARY_PRED: case TExprNodeType::FUNCTION_CALL: case TExprNodeType::COMPUTE_FUNCTION_CALL: { expr = VectorizedFnCall::create_shared(expr_node); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index a852afeb2d..9a6b514d03 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -114,6 +114,13 @@ public: virtual Status execute(VExprContext* context, Block* block, int* result_column_id) = 0; + // Only the 4th parameter is used in the runtime filter. In and MinMax need overwrite the + // interface + virtual Status execute_runtime_fitler(VExprContext* context, Block* block, + int* result_column_id, std::vector& args) { + return execute(context, block, result_column_id); + }; + /// Subclasses overriding this function should call VExpr::Close(). // /// If scope if FRAGMENT_LOCAL, both fragment- and thread-local state should be torn diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index dcf3cba72e..8589463664 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -48,8 +48,14 @@ class VExprContext; namespace doris::vectorized { -VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, const VExprSPtr& impl) - : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {} +VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, const VExprSPtr& impl, + bool null_aware) + : VExpr(node), + _impl(impl), + _always_true(false), + _filtered_rows(0), + _scan_rows(0), + _null_aware(null_aware) {} Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) { @@ -104,31 +110,75 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* if (_getting_const_col) { _impl->set_getting_const_col(true); } - RETURN_IF_ERROR(_impl->execute(context, block, result_column_id)); + std::vector args; + RETURN_IF_ERROR(_impl->execute_runtime_fitler(context, block, result_column_id, args)); if (_getting_const_col) { _impl->set_getting_const_col(false); } - uint8_t* data = nullptr; - const ColumnWithTypeAndName& result_column = block->get_by_position(*result_column_id); + const auto rows = block->rows(); + ColumnWithTypeAndName& result_column = block->get_by_position(*result_column_id); if (is_column_const(*result_column.column)) { auto* constant_val = const_cast(result_column.column->get_data_at(0).data); - if (constant_val == nullptr || !*reinterpret_cast(constant_val)) { - filter_rows += block->rows(); + auto filter = constant_val == nullptr && reinterpret_cast(constant_val); + // if _null_aware is true, we should check the first args column is nullable. if value in + // column is null. we should set it to true + if (_null_aware) { + DCHECK(!args.empty()); + DCHECK(is_column_const(*block->get_by_position(args[0]).column)); + if (filter && + block->get_by_position(args[0]).column->get_data_at(0).data == nullptr) { + auto res_col = ColumnVector::create(1, 1); + if (result_column.type->is_nullable()) { + result_column.column = make_nullable(std::move(res_col), false); + } else { + result_column.column = std::move(res_col); + } + filter = false; + } + } + if (filter) { + filter_rows += rows; + } + } else if (auto* nullable = check_and_get_column(*result_column.column)) { + auto* __restrict data = ((ColumnVector*)nullable->get_nested_column_ptr().get()) + ->get_data() + .data(); + auto* __restrict null_map = const_cast(nullable->get_null_map_data().data()); + + if (_null_aware && block->get_by_position(args[0]).column->is_nullable()) { + auto* __restrict null_map_args = + ((ColumnNullable*)block->get_by_position(args[0]).column.get()) + ->get_null_map_data() + .data(); + // TODO: try to simd the code + for (int i = 0; i < rows; ++i) { + if (null_map_args[i]) { + null_map[i] = 0; + data[i] = 1; + } + filter_rows += (!null_map[i]) && (data[i] == 1); + } + } else { + filter_rows += doris::simd::count_zero_num( + reinterpret_cast(data), null_map, rows); } - } else if (const auto* nullable = - check_and_get_column(*result_column.column)) { - data = ((ColumnVector*)nullable->get_nested_column_ptr().get()) - ->get_data() - .data(); - filter_rows += doris::simd::count_zero_num(reinterpret_cast(data), - nullable->get_null_map_data().data(), - block->rows()); } else if (const auto* res_col = check_and_get_column>(*result_column.column)) { - data = const_cast(res_col->get_data().data()); - filter_rows += doris::simd::count_zero_num(reinterpret_cast(data), - block->rows()); + auto* __restrict data = const_cast(res_col->get_data().data()); + if (_null_aware && block->get_by_position(args[0]).column->is_nullable()) { + auto* __restrict null_map_args = + ((ColumnNullable*)block->get_by_position(args[0]).column.get()) + ->get_null_map_data() + .data(); + for (int i = 0; i < rows; ++i) { + data[i] |= null_map_args[i]; + filter_rows += data[i]; + } + } else { + filter_rows += + doris::simd::count_zero_num(reinterpret_cast(data), rows); + } } else { return Status::InternalError( "Invalid type for runtime filters!, and _expr_name is: {}. _data_type is: {}. " diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 3001785002..a3b96166ee 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -44,7 +44,7 @@ class VRuntimeFilterWrapper final : public VExpr { ENABLE_FACTORY_CREATOR(VRuntimeFilterWrapper); public: - VRuntimeFilterWrapper(const TExprNode& node, const VExprSPtr& impl); + VRuntimeFilterWrapper(const TExprNode& node, const VExprSPtr& impl, bool null_aware = false); ~VRuntimeFilterWrapper() override = default; Status execute(VExprContext* context, Block* block, int* result_column_id) override; Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; @@ -91,6 +91,7 @@ private: bool _has_calculate_filter = false; std::string _expr_name; + bool _null_aware; }; using VRuntimeFilterPtr = std::shared_ptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 03dc70e653..1e9135d600 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -24,9 +24,7 @@ import org.apache.doris.nereids.processor.post.RuntimeFilterContext; import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; -import org.apache.doris.nereids.trees.expressions.EqualPredicate; import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NullSafeEqual; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Explainable; @@ -133,12 +131,6 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi ctx.setTargetsOnScanNode(ctx.getAliasTransferPair(probeSlot).first, scanSlot); } } else { - // null safe equal runtime filter only support bloom filter - EqualPredicate eq = (EqualPredicate) builderNode.getHashJoinConjuncts().get(exprOrder); - if (eq instanceof NullSafeEqual && type == TRuntimeFilterType.MIN_MAX - || type == TRuntimeFilterType.BITMAP) { - return false; - } filter = new RuntimeFilter(generator.getNextId(), src, ImmutableList.of(scanSlot), ImmutableList.of(probeExpr), type, exprOrder, builderNode, buildSideNdv, diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift index e3503f3a77..9894c1539e 100644 --- a/gensrc/thrift/Exprs.thrift +++ b/gensrc/thrift/Exprs.thrift @@ -79,7 +79,9 @@ enum TExprNodeType { IPV6_LITERAL // only used in runtime filter + // to prevent push to storage layer NULL_AWARE_IN_PRED, + NULL_AWARE_BINARY_PRED, } //enum TAggregationOp {