From bea9a7ba4fa34d06404edb9d4da0697f9e2ac7a2 Mon Sep 17 00:00:00 2001 From: spaces-x Date: Thu, 24 Mar 2022 09:11:34 +0800 Subject: [PATCH] [feature] Support pre-aggregation for quantile type (#8234) Add a new column-type to speed up the approximation of quantiles. 1. The new column-type is named `quantile_state` with fixed aggregation function `quantile_union`, which stores the intermediate results of pre-aggregated approximation calculations for quantiles. 2. support pre-aggregation of new column-type and quantile_state related functions. --- be/src/common/daemon.cpp | 2 + be/src/exec/olap_scanner.cpp | 38 +- be/src/exprs/CMakeLists.txt | 1 + be/src/exprs/agg_fn_evaluator.cpp | 4 + be/src/exprs/anyval_util.cpp | 7 +- be/src/exprs/anyval_util.h | 3 + be/src/exprs/case_expr.cpp | 2 + be/src/exprs/expr.cpp | 3 + be/src/exprs/expr_context.cpp | 3 +- be/src/exprs/new_agg_fn_evaluator.cc | 3 + be/src/exprs/quantile_function.cpp | 164 ++++++++ be/src/exprs/quantile_function.h | 36 ++ be/src/olap/aggregate_func.cpp | 2 + be/src/olap/aggregate_func.h | 46 +++ be/src/olap/column_vector.cpp | 19 +- be/src/olap/field.h | 26 ++ be/src/olap/olap_common.h | 18 +- be/src/olap/row_block.cpp | 4 +- .../olap/rowset/segment_v2/encoding_info.cpp | 2 + be/src/olap/tablet_schema.cpp | 13 +- be/src/olap/tablet_schema.h | 8 +- be/src/olap/types.cpp | 12 +- be/src/olap/types.h | 25 +- be/src/olap/wrapper_field.cpp | 7 +- be/src/runtime/mysql_result_writer.cpp | 8 +- be/src/runtime/primitive_type.cpp | 12 + be/src/runtime/primitive_type.h | 21 +- be/src/runtime/raw_value.cpp | 2 + be/src/runtime/types.h | 6 +- be/src/runtime/vectorized_row_batch.cpp | 6 +- be/src/udf/udf.h | 3 +- be/src/util/CMakeLists.txt | 1 + be/src/util/quantile_state.cpp | 362 ++++++++++++++++++ be/src/util/quantile_state.h | 71 ++++ be/src/util/symbols_util.cpp | 1 + be/src/util/tdigest.h | 40 +- be/test/exprs/CMakeLists.txt | 1 + be/test/exprs/quantile_function_test.cpp | 135 +++++++ be/test/util/CMakeLists.txt | 1 + be/test/util/quantile_state_test.cpp | 54 +++ .../Data Definition/CREATE TABLE.md | 45 ++- .../Data Manipulation/STREAM LOAD.md | 14 +- .../Data Types/QUANTILE_STATE.md | 62 +++ .../Data Definition/CREATE TABLE.md | 39 +- .../Data Manipulation/STREAM LOAD.md | 15 +- .../Data Types/QUANTILE_STATE.md | 58 +++ fe/fe-core/src/main/cup/sql_parser.cup | 14 +- .../doris/analysis/CreateTableStmt.java | 21 +- .../java/org/apache/doris/analysis/Expr.java | 5 + .../doris/analysis/FunctionCallExpr.java | 19 + .../org/apache/doris/analysis/InsertStmt.java | 7 +- .../org/apache/doris/analysis/SelectStmt.java | 8 +- .../apache/doris/catalog/AggregateType.java | 25 +- .../org/apache/doris/catalog/Function.java | 2 + .../org/apache/doris/catalog/FunctionSet.java | 28 +- .../apache/doris/catalog/PrimitiveType.java | 24 +- .../apache/doris/catalog/ScalarFunction.java | 1 + .../org/apache/doris/catalog/ScalarType.java | 7 + .../java/org/apache/doris/catalog/Type.java | 38 +- .../org/apache/doris/common/util/Util.java | 1 + .../apache/doris/planner/BrokerScanNode.java | 1 + .../apache/doris/planner/LoadScanNode.java | 14 + .../doris/planner/SingleNodePlanner.java | 10 +- .../doris/task/HadoopLoadPendingTask.java | 4 + fe/fe-core/src/main/jflex/sql_scanner.flex | 2 + gensrc/script/doris_builtins_functions.py | 9 + gensrc/thrift/Types.thrift | 6 +- 67 files changed, 1498 insertions(+), 153 deletions(-) create mode 100644 be/src/exprs/quantile_function.cpp create mode 100644 be/src/exprs/quantile_function.h create mode 100644 be/src/util/quantile_state.cpp create mode 100644 be/src/util/quantile_state.h create mode 100644 be/test/exprs/quantile_function_test.cpp create mode 100644 be/test/util/quantile_state_test.cpp create mode 100644 docs/en/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md create mode 100644 docs/zh-CN/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index f806989a42..ccfb4999e8 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -39,6 +39,7 @@ #include "exprs/math_functions.h" #include "exprs/new_in_predicate.h" #include "exprs/operators.h" +#include "exprs/quantile_function.h" #include "exprs/string_functions.h" #include "exprs/table_function/dummy_table_functions.h" #include "exprs/time_operators.h" @@ -254,6 +255,7 @@ void Daemon::init(int argc, char** argv, const std::vector& paths) { GroupingSetsFunctions::init(); BitmapFunctions::init(); HllFunctions::init(); + QuantileStateFunctions::init(); HashFunctions::init(); TopNFunctions::init(); DummyTableFunctions::init(); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 7f2911070a..3a675d04f4 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -30,8 +30,8 @@ #include "runtime/descriptors.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" -#include "runtime/thread_context.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "service/backend_options.h" #include "util/doris_metrics.h" #include "util/mem_util.hpp" @@ -62,8 +62,11 @@ Status OlapScanner::prepare( bloom_filters) { set_tablet_reader(); // set limit to reduce end of rowset and segment mem use - _tablet_reader->set_batch_size(_parent->limit() == -1 ? _parent->_runtime_state->batch_size() : std::min( - static_cast(_parent->_runtime_state->batch_size()), _parent->limit())); + _tablet_reader->set_batch_size( + _parent->limit() == -1 + ? _parent->_runtime_state->batch_size() + : std::min(static_cast(_parent->_runtime_state->batch_size()), + _parent->limit())); // Get olap table TTabletId tablet_id = scan_range.tablet_id; @@ -128,8 +131,9 @@ Status OlapScanner::open() { if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init reader.[res=%d]", res); std::stringstream ss; - ss << "failed to initialize storage reader. tablet=" << _tablet_reader_params.tablet->full_name() - << ", res=" << res << ", backend=" << BackendOptions::get_localhost(); + ss << "failed to initialize storage reader. tablet=" + << _tablet_reader_params.tablet->full_name() << ", res=" << res + << ", backend=" << BackendOptions::get_localhost(); return Status::InternalError(ss.str().c_str()); } return Status::OK(); @@ -152,7 +156,8 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.conditions.push_back(filter); } std::copy(bloom_filters.cbegin(), bloom_filters.cend(), - std::inserter(_tablet_reader_params.bloom_filters, _tablet_reader_params.bloom_filters.begin())); + std::inserter(_tablet_reader_params.bloom_filters, + _tablet_reader_params.bloom_filters.begin())); // Range for (auto key_range : key_ranges) { @@ -175,11 +180,17 @@ Status OlapScanner::_init_tablet_reader_params( bool single_version = (_tablet_reader_params.rs_readers.size() == 1 && _tablet_reader_params.rs_readers[0]->rowset()->start_version() == 0 && - !_tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) || + !_tablet_reader_params.rs_readers[0] + ->rowset() + ->rowset_meta() + ->is_segments_overlapping()) || (_tablet_reader_params.rs_readers.size() == 2 && _tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 && _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 && - !_tablet_reader_params.rs_readers[1]->rowset()->rowset_meta()->is_segments_overlapping()); + !_tablet_reader_params.rs_readers[1] + ->rowset() + ->rowset_meta() + ->is_segments_overlapping()); _tablet_reader_params.origin_return_columns = &_return_columns; _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set; @@ -202,7 +213,8 @@ Status OlapScanner::_init_tablet_reader_params( } // use _tablet_reader_params.return_columns, because reader use this to merge sort - OLAPStatus res = _read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns); + OLAPStatus res = + _read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor.[res=%d]", res); return Status::InternalError("failed to initialize storage read row cursor"); @@ -406,7 +418,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { } } } while (false); - } } @@ -434,6 +445,7 @@ void OlapScanner::_convert_row_to_tuple(Tuple* tuple) { } case TYPE_VARCHAR: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_HLL: case TYPE_STRING: { Slice* slice = reinterpret_cast(ptr); @@ -521,8 +533,7 @@ void OlapScanner::update_counter() { COUNTER_UPDATE(_parent->_del_filtered_counter, stats.rows_del_filtered); COUNTER_UPDATE(_parent->_del_filtered_counter, stats.rows_vec_del_cond_filtered); - COUNTER_UPDATE(_parent->_conditions_filtered_counter, - stats.rows_conditions_filtered); + COUNTER_UPDATE(_parent->_conditions_filtered_counter, stats.rows_conditions_filtered); COUNTER_UPDATE(_parent->_key_range_filtered_counter, stats.rows_key_range_filtered); COUNTER_UPDATE(_parent->_index_load_timer, stats.index_load_ns); @@ -535,8 +546,7 @@ void OlapScanner::update_counter() { COUNTER_UPDATE(_parent->_total_pages_num_counter, stats.total_pages_num); COUNTER_UPDATE(_parent->_cached_pages_num_counter, stats.cached_pages_num); - COUNTER_UPDATE(_parent->_bitmap_index_filter_counter, - stats.rows_bitmap_index_filtered); + COUNTER_UPDATE(_parent->_bitmap_index_filter_counter, stats.rows_bitmap_index_filtered); COUNTER_UPDATE(_parent->_bitmap_index_filter_timer, stats.bitmap_index_filter_timer); COUNTER_UPDATE(_parent->_block_seek_counter, stats.block_seek_num); diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index c71f23ec32..7ac1af85df 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -68,6 +68,7 @@ add_library(Exprs new_agg_fn_evaluator.cc bitmap_function.cpp hll_function.cpp + quantile_function.cpp grouping_sets_functions.cpp topn_function.cpp table_function/explode_split.cpp diff --git a/be/src/exprs/agg_fn_evaluator.cpp b/be/src/exprs/agg_fn_evaluator.cpp index b458c6f801..157cb113d8 100644 --- a/be/src/exprs/agg_fn_evaluator.cpp +++ b/be/src/exprs/agg_fn_evaluator.cpp @@ -319,6 +319,7 @@ inline void AggFnEvaluator::set_any_val(const void* slot, const TypeDescriptor& case TYPE_HLL: case TYPE_OBJECT: case TYPE_STRING: + case TYPE_QUANTILE_STATE: reinterpret_cast(slot)->to_string_val( reinterpret_cast(dst)); return; @@ -389,6 +390,7 @@ inline void AggFnEvaluator::set_output_slot(const AnyVal* src, const SlotDescrip case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: *reinterpret_cast(slot) = StringValue::from_string_val(*reinterpret_cast(src)); @@ -571,6 +573,7 @@ bool AggFnEvaluator::count_distinct_data_filter(TupleRow* row, Tuple* dst) { case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: { StringVal* value = reinterpret_cast(_staging_input_vals[i]); memcpy(begin, value->ptr, value->len); @@ -899,6 +902,7 @@ void AggFnEvaluator::serialize_or_finalize(FunctionContext* agg_fn_ctx, Tuple* s case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: { typedef StringVal (*Fn)(FunctionContext*, AnyVal*); StringVal v = reinterpret_cast(fn)(agg_fn_ctx, _staging_intermediate_val); diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp index 7cf49edbf4..da0d8691ab 100644 --- a/be/src/exprs/anyval_util.cpp +++ b/be/src/exprs/anyval_util.cpp @@ -39,7 +39,8 @@ Status allocate_any_val(RuntimeState* state, MemPool* pool, const TypeDescriptor const int anyval_size = AnyValUtil::any_val_size(type); const int anyval_alignment = AnyValUtil::any_val_alignment(type); Status rst; - *result = reinterpret_cast(pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst)); + *result = reinterpret_cast( + pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst)); if (*result == nullptr) { RETURN_LIMIT_EXCEEDED(pool->mem_tracker(), state, mem_limit_exceeded_msg, anyval_size, rst); } @@ -81,6 +82,7 @@ AnyVal* create_any_val(ObjectPool* pool, const TypeDescriptor& type) { case TYPE_HLL: case TYPE_VARCHAR: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: return pool->add(new StringVal); @@ -147,6 +149,9 @@ FunctionContext::TypeDesc AnyValUtil::column_type_to_type_desc(const TypeDescrip case TYPE_OBJECT: out.type = FunctionContext::TYPE_OBJECT; // FIXME(cmy): is this fallthrough meaningful? + case TYPE_QUANTILE_STATE: + out.type = FunctionContext::TYPE_QUANTILE_STATE; + break; case TYPE_CHAR: out.type = FunctionContext::TYPE_CHAR; out.len = type.len; diff --git a/be/src/exprs/anyval_util.h b/be/src/exprs/anyval_util.h index f2874776c9..5c605b6a70 100644 --- a/be/src/exprs/anyval_util.h +++ b/be/src/exprs/anyval_util.h @@ -237,6 +237,7 @@ public: return sizeof(doris_udf::DoubleVal); case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_HLL: case TYPE_CHAR: case TYPE_VARCHAR: @@ -279,6 +280,7 @@ public: case TYPE_DOUBLE: return alignof(DoubleVal); case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_HLL: case TYPE_VARCHAR: case TYPE_CHAR: @@ -379,6 +381,7 @@ public: case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: reinterpret_cast(slot)->to_string_val( reinterpret_cast(dst)); diff --git a/be/src/exprs/case_expr.cpp b/be/src/exprs/case_expr.cpp index e1290fc7b0..df3bc64518 100644 --- a/be/src/exprs/case_expr.cpp +++ b/be/src/exprs/case_expr.cpp @@ -110,6 +110,7 @@ void CaseExpr::get_child_val(int child_idx, ExprContext* ctx, TupleRow* row, Any case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: *reinterpret_cast(dst) = _children[child_idx]->get_string_val(ctx, row); break; @@ -155,6 +156,7 @@ bool CaseExpr::any_val_eq(const TypeDescriptor& type, const AnyVal* v1, const An case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: return AnyValUtil::equals(type, *reinterpret_cast(v1), *reinterpret_cast(v2)); diff --git a/be/src/exprs/expr.cpp b/be/src/exprs/expr.cpp index 1bf229b4e9..c43866ff01 100644 --- a/be/src/exprs/expr.cpp +++ b/be/src/exprs/expr.cpp @@ -142,6 +142,7 @@ Expr::Expr(const TypeDescriptor& type) case TYPE_HLL: case TYPE_OBJECT: case TYPE_STRING: + case TYPE_QUANTILE_STATE: _node_type = (TExprNodeType::STRING_LITERAL); break; @@ -199,6 +200,7 @@ Expr::Expr(const TypeDescriptor& type, bool is_slotref) case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: _node_type = (TExprNodeType::STRING_LITERAL); break; @@ -687,6 +689,7 @@ doris_udf::AnyVal* Expr::get_const_val(ExprContext* context) { case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: { _constant_val.reset(new StringVal(get_string_val(context, nullptr))); break; diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp index 46aa9c1585..b632e1b333 100644 --- a/be/src/exprs/expr_context.cpp +++ b/be/src/exprs/expr_context.cpp @@ -246,6 +246,7 @@ void* ExprContext::get_value(Expr* e, TupleRow* row) { case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: { doris_udf::StringVal v = e->get_string_val(this, row); if (v.is_null) { @@ -375,7 +376,7 @@ Status ExprContext::get_const_value(RuntimeState* state, Expr& expr, AnyVal** co char* ptr_copy = reinterpret_cast(_pool->try_allocate(sv->len, &rst)); if (ptr_copy == nullptr) { RETURN_LIMIT_EXCEEDED(_pool->mem_tracker(), state, - "Could not allocate constant string value", sv->len, rst); + "Could not allocate constant string value", sv->len, rst); } memcpy(ptr_copy, sv->ptr, sv->len); sv->ptr = reinterpret_cast(ptr_copy); diff --git a/be/src/exprs/new_agg_fn_evaluator.cc b/be/src/exprs/new_agg_fn_evaluator.cc index f5e0ec0e65..7e3738b631 100644 --- a/be/src/exprs/new_agg_fn_evaluator.cc +++ b/be/src/exprs/new_agg_fn_evaluator.cc @@ -259,6 +259,7 @@ void NewAggFnEvaluator::SetDstSlot(const AnyVal* src, const SlotDescriptor& dst_ case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: *reinterpret_cast(slot) = StringValue::from_string_val(*reinterpret_cast(src)); @@ -356,6 +357,7 @@ inline void NewAggFnEvaluator::set_any_val(const void* slot, const TypeDescripto case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: reinterpret_cast(slot)->to_string_val( reinterpret_cast(dst)); @@ -598,6 +600,7 @@ void NewAggFnEvaluator::SerializeOrFinalize(Tuple* src, const SlotDescriptor& ds case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: { typedef StringVal (*Fn)(FunctionContext*, AnyVal*); StringVal v = reinterpret_cast(fn)(agg_fn_ctx_.get(), staging_intermediate_val_); diff --git a/be/src/exprs/quantile_function.cpp b/be/src/exprs/quantile_function.cpp new file mode 100644 index 0000000000..d925b251e6 --- /dev/null +++ b/be/src/exprs/quantile_function.cpp @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/quantile_function.h" + +#include "exprs/anyval_util.h" +#include "gutil/strings/numbers.h" +#include "gutil/strings/split.h" +#include "util/quantile_state.h" +#include "util/slice.h" +#include "util/string_parser.hpp" + +namespace doris { + +using doris_udf::DoubleVal; +using doris_udf::StringVal; +using doris_udf::FloatVal; + +void QuantileStateFunctions::init() {} + +void QuantileStateFunctions::quantile_state_init(FunctionContext* ctx, StringVal* dst) { + dst->is_null = false; + dst->len = sizeof(QuantileState); + dst->ptr = (uint8_t*)new QuantileState(); +} + +void QuantileStateFunctions::quantile_percent_prepare(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope) { + if (scope != FunctionContext::FRAGMENT_LOCAL) { + return; + } + if (!ctx->is_arg_constant(1)) { + std::stringstream ss; + ss << "quantile_percent function's second arg must be constant."; + ctx->set_error(ss.str().c_str()); + return; + } + float percentile_value = reinterpret_cast(ctx->get_constant_arg(1))->val; + if (percentile_value > 1 || percentile_value < 0) { + std::stringstream error_msg; + error_msg << "The percentile must between 0 and 1, but input is:" + << std::to_string(percentile_value); + ctx->set_error(error_msg.str().c_str()); + return; + } +} + +void QuantileStateFunctions::to_quantile_state_prepare(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope) { + if (scope != FunctionContext::FRAGMENT_LOCAL) { + return; + } + if (!ctx->is_arg_constant(1)) { + // use default value, just return is ok. + return; + } + float compression = reinterpret_cast(ctx->get_constant_arg(1))->val; + if (compression > QUANTILE_STATE_COMPRESSION_MAX || + compression < QUANTILE_STATE_COMPRESSION_MIN) { + std::stringstream error_msg; + error_msg << "The compression of to_quantile_state must between " + << QUANTILE_STATE_COMPRESSION_MIN << " and " << QUANTILE_STATE_COMPRESSION_MAX + << std::endl + << "but input is:" << std::to_string(compression); + ctx->set_error(error_msg.str().c_str()); + return; + } +} + +static StringVal serialize(FunctionContext* ctx, QuantileState* value) { + StringVal result(ctx, value->get_serialized_size()); + value->serialize(result.ptr); + return result; +} + +StringVal QuantileStateFunctions::to_quantile_state(FunctionContext* ctx, const StringVal& src) { + QuantileState quantile_state; + quantile_state.set_compression(QUANTILE_STATE_COMPRESSION_MIN); + const AnyVal* digest_compression = ctx->get_constant_arg(1); + if (digest_compression != nullptr) { + // compression will be between 2048 and 10000, promised by `to_quantile_state_prepare` + float compression = reinterpret_cast(digest_compression)->val; + quantile_state.set_compression(compression); + } + + if (!src.is_null) { + StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; + double double_value = StringParser::string_to_float( + reinterpret_cast(src.ptr), src.len, &parse_result); + if (UNLIKELY(parse_result != StringParser::PARSE_SUCCESS)) { + std::stringstream error_msg; + error_msg << "The input: " << std::string(reinterpret_cast(src.ptr), src.len) + << " is not valid, to_quantile_state only support bigint value from 0 to " + "18446744073709551615 currently"; + ctx->set_error(error_msg.str().c_str()); + return StringVal::null(); + } + quantile_state.add_value(double_value); + } + return serialize(ctx, &quantile_state); +} + +void QuantileStateFunctions::quantile_union(FunctionContext* ctx, const StringVal& src, + StringVal* dst) { + if (src.is_null) { + return; + } + auto dst_quantile = reinterpret_cast*>(dst->ptr); + if (src.len == 0) { + dst_quantile->merge(*reinterpret_cast*>(src.ptr)); + } else { + QuantileState state(Slice(src.ptr, src.len)); + dst_quantile->merge(state); + } +} + +DoubleVal QuantileStateFunctions::quantile_percent(FunctionContext* ctx, StringVal& src) { + const AnyVal* percentile = ctx->get_constant_arg(1); + if (percentile != nullptr) { + // percentile_value will be between 0 and 1, promised by `quantile_percent_prepare` + float percentile_value = reinterpret_cast(percentile)->val; + if (src.len == 0) { + auto quantile_state = reinterpret_cast*>(src.ptr); + return {static_cast(quantile_state->get_value_by_percentile(percentile_value))}; + } else { + QuantileState quantile_state(Slice(src.ptr, src.len)); + return {static_cast(quantile_state.get_value_by_percentile(percentile_value))}; + } + + } else { + std::stringstream error_msg; + error_msg << "quantile_percent function's second argument must be constant. eg: " + "quantile_percent(col, 0.95)"; + ctx->set_error(error_msg.str().c_str()); + } + return DoubleVal::null(); +} + +StringVal QuantileStateFunctions::quantile_state_serialize(FunctionContext* ctx, + const StringVal& src) { + if (src.is_null) { + return src; + } + auto tmp_ptr = reinterpret_cast*>(src.ptr); + StringVal result = serialize(ctx, tmp_ptr); + delete tmp_ptr; + return result; +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exprs/quantile_function.h b/be/src/exprs/quantile_function.h new file mode 100644 index 0000000000..93d317cf93 --- /dev/null +++ b/be/src/exprs/quantile_function.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "udf/udf.h" +namespace doris { +class QuantileStateFunctions { +public: + static void init(); + static void quantile_state_init(FunctionContext* ctx, StringVal* dst); + static void to_quantile_state_prepare(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope); + static void quantile_percent_prepare(FunctionContext* ctx, + FunctionContext::FunctionStateScope scope); + static StringVal to_quantile_state(FunctionContext* ctx, const StringVal& src); + static void quantile_union(FunctionContext* ctx, const StringVal& src, StringVal* dst); + static DoubleVal quantile_percent(FunctionContext* ctx, StringVal& src); + static StringVal quantile_state_serialize(FunctionContext* ctx, const StringVal& src); +}; + +} // namespace doris diff --git a/be/src/olap/aggregate_func.cpp b/be/src/olap/aggregate_func.cpp index 6e844e786c..4d583d476b 100644 --- a/be/src/olap/aggregate_func.cpp +++ b/be/src/olap/aggregate_func.cpp @@ -201,6 +201,8 @@ AggregateFuncResolver::AggregateFuncResolver() { add_aggregate_mapping(); add_aggregate_mapping(); //for backward compatibility + // quantile_state Aggregate Function + add_aggregate_mapping(); } AggregateFuncResolver::~AggregateFuncResolver() { diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index 953d7d52d3..15302dcac3 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -26,6 +26,7 @@ #include "runtime/mem_pool.h" #include "runtime/string_value.h" #include "util/bitmap_value.h" +#include "util/quantile_state.h" namespace doris { @@ -576,6 +577,51 @@ struct AggregateFuncTraits { }; +template <> +struct AggregateFuncTraits { + static void init(RowCursorCell* dst, const char* src, bool src_null, MemPool* mem_pool, + ObjectPool* agg_pool) { + DCHECK_EQ(src_null, false); + dst->set_not_null(); + + auto* src_slice = reinterpret_cast(src); + auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); + + // we use zero size represent this slice is a agg object + dst_slice->size = 0; + auto* dst_quantile_state = new QuantileState(*src_slice); + + dst_slice->data = reinterpret_cast(dst_quantile_state); + + agg_pool->add(dst_quantile_state); + } + + static void update(RowCursorCell* dst, const RowCursorCell& src, MemPool* mem_pool) { + DCHECK_EQ(src.is_null(), false); + + auto* dst_slice = reinterpret_cast(dst->mutable_cell_ptr()); + auto* src_slice = reinterpret_cast(src.cell_ptr()); + auto* dst_quantile_state = reinterpret_cast*>(dst_slice->data); + + if (mem_pool == nullptr) { // for query + QuantileState src_state(*src_slice); + dst_quantile_state->merge(src_state); + } else { // for stream load + auto* src_state = reinterpret_cast*>(src_slice->data); + dst_quantile_state->merge(*src_state); + } + } + + // The quantile_state object memory will be released by ObjectPool + static void finalize(RowCursorCell* src, MemPool* mem_pool) { + auto* slice = reinterpret_cast(src->mutable_cell_ptr()); + auto* quantile_state = reinterpret_cast*>(slice->data); + + slice->data = (char*)mem_pool->allocate(quantile_state->get_serialized_size()); + slice->size = quantile_state->serialize((uint8_t*)slice->data); + quantile_state->clear(); + } +}; template struct AggregateTraits : public AggregateFuncTraits { diff --git a/be/src/olap/column_vector.cpp b/be/src/olap/column_vector.cpp index 0237b62e14..e9f5c5dc4f 100644 --- a/be/src/olap/column_vector.cpp +++ b/be/src/olap/column_vector.cpp @@ -31,8 +31,8 @@ Status ColumnVectorBatch::resize(size_t new_cap) { return Status::OK(); } -Status ColumnVectorBatch::create(size_t init_capacity, bool is_nullable, std::shared_ptr type_info, - Field* field, +Status ColumnVectorBatch::create(size_t init_capacity, bool is_nullable, + std::shared_ptr type_info, Field* field, std::unique_ptr* column_vector_batch) { if (is_scalar_type(type_info->type())) { std::unique_ptr local; @@ -117,6 +117,11 @@ Status ColumnVectorBatch::create(size_t init_capacity, bool is_nullable, std::sh local.reset(new ScalarColumnVectorBatch::CppType>( type_info, is_nullable)); break; + case OLAP_FIELD_TYPE_QUANTILE_STATE: + local.reset(new ScalarColumnVectorBatch< + CppTypeTraits::CppType>(type_info, + is_nullable)); + break; default: return Status::NotSupported("unsupported type for ColumnVectorBatch: " + std::to_string(type_info->type())); @@ -139,8 +144,7 @@ Status ColumnVectorBatch::create(size_t init_capacity, bool is_nullable, std::sh array_type_info->item_type_info(), field->get_sub_field(0), &elements)); std::unique_ptr offsets; - auto offsets_type_info = - get_scalar_type_info(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT); + auto offsets_type_info = get_scalar_type_info(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT); RETURN_IF_ERROR(ColumnVectorBatch::create(init_capacity + 1, false, offsets_type_info, nullptr, &offsets)); @@ -160,8 +164,8 @@ Status ColumnVectorBatch::create(size_t init_capacity, bool is_nullable, std::sh } template -ScalarColumnVectorBatch::ScalarColumnVectorBatch(std::shared_ptr type_info, - bool is_nullable) +ScalarColumnVectorBatch::ScalarColumnVectorBatch( + std::shared_ptr type_info, bool is_nullable) : ColumnVectorBatch(type_info, is_nullable), _data(0) {} template @@ -176,7 +180,8 @@ Status ScalarColumnVectorBatch::resize(size_t new_cap) { return Status::OK(); } -ArrayColumnVectorBatch::ArrayColumnVectorBatch(std::shared_ptr type_info, bool is_nullable, +ArrayColumnVectorBatch::ArrayColumnVectorBatch(std::shared_ptr type_info, + bool is_nullable, ScalarColumnVectorBatch* offsets, ColumnVectorBatch* elements) : ColumnVectorBatch(type_info, is_nullable), _data(0) { diff --git a/be/src/olap/field.h b/be/src/olap/field.h index 4abcb4d2bf..456ff9770f 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -675,6 +675,30 @@ public: } }; +class QuantileStateAggField : public Field { +public: + explicit QuantileStateAggField() : Field() {} + explicit QuantileStateAggField(const TabletColumn& column) : Field(column) {} + + // quantile_state storage data always not null + void agg_init(RowCursorCell* dst, const RowCursorCell& src, MemPool* mem_pool, + ObjectPool* agg_pool) const override { + _agg_info->init(dst, (const char*)src.cell_ptr(), false, mem_pool, agg_pool); + } + + char* allocate_memory(char* cell_ptr, char* variable_ptr) const override { + auto slice = (Slice*)cell_ptr; + slice->data = nullptr; + return variable_ptr; + } + + QuantileStateAggField* clone() const override { + auto* local = new QuantileStateAggField(); + Field::clone(local); + return local; + } +}; + class HllAggField : public Field { public: explicit HllAggField() : Field() {} @@ -750,6 +774,8 @@ public: return new HllAggField(column); case OLAP_FIELD_AGGREGATION_BITMAP_UNION: return new BitmapAggField(column); + case OLAP_FIELD_AGGREGATION_QUANTILE_UNION: + return new QuantileStateAggField(column); case OLAP_FIELD_AGGREGATION_UNKNOWN: LOG(WARNING) << "WOW! value column agg type is unknown"; return nullptr; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 470efee1f8..56c23fb7e6 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -143,7 +143,8 @@ enum FieldType { OLAP_FIELD_TYPE_HLL = 23, OLAP_FIELD_TYPE_BOOL = 24, OLAP_FIELD_TYPE_OBJECT = 25, - OLAP_FIELD_TYPE_STRING = 26 + OLAP_FIELD_TYPE_STRING = 26, + OLAP_FIELD_TYPE_QUANTILE_STATE = 27 }; // Define all aggregation methods supported by Field @@ -161,16 +162,17 @@ enum FieldAggregationMethod { OLAP_FIELD_AGGREGATION_BITMAP_UNION = 7, // Replace if and only if added value is not null OLAP_FIELD_AGGREGATION_REPLACE_IF_NOT_NULL = 8, + OLAP_FIELD_AGGREGATION_QUANTILE_UNION = 9 }; // Compression algorithm type enum OLAPCompressionType { // Compression algorithm used for network transmission, low compression rate, low cpu overhead OLAP_COMP_TRANSPORT = 1, - // Compression algorithm used for hard disk data, with high compression rate and high CPU overhead - OLAP_COMP_STORAGE = 2, - // The compression algorithm used for storage, the compression rate is low, and the cpu overhead is low - OLAP_COMP_LZ4 = 3, + // Compression algorithm used for hard disk data, with high compression rate and high CPU overhead + OLAP_COMP_STORAGE = 2, + // The compression algorithm used for storage, the compression rate is low, and the cpu overhead is low + OLAP_COMP_LZ4 = 3, }; enum PushType { @@ -289,12 +291,12 @@ struct OlapReaderStatistics { // general_debug_ns is designed for the purpose of DEBUG, to record any infomations of debugging or profiling. // different from specific meaningful timer such as index_load_ns, general_debug_ns can be used flexibly. // general_debug_ns has associated with OlapScanNode's _general_debug_timer already. - // so general_debug_ns' values will update to _general_debug_timer automaticly, + // so general_debug_ns' values will update to _general_debug_timer automaticly, // the timer result can be checked through QueryProfile web page easily. - // when search general_debug_ns, you can find that general_debug_ns has not been used, + // when search general_debug_ns, you can find that general_debug_ns has not been used, // this is because such codes added for debug purpose should not commit, it's just for debuging. // so, please do not delete general_debug_ns defined here - // usage example: + // usage example: // SCOPED_RAW_TIMER(&_stats->general_debug_ns[1]); int64_t general_debug_ns[GENERAL_DEBUG_COUNT] = {}; }; diff --git a/be/src/olap/row_block.cpp b/be/src/olap/row_block.cpp index 061972edd3..92c95cbd12 100644 --- a/be/src/olap/row_block.cpp +++ b/be/src/olap/row_block.cpp @@ -87,9 +87,7 @@ void RowBlock::_compute_layout() { _field_offset_in_memory.push_back(memory_size); // All field has a nullbyte in memory - if (column.type() == OLAP_FIELD_TYPE_VARCHAR || column.type() == OLAP_FIELD_TYPE_HLL || - column.type() == OLAP_FIELD_TYPE_CHAR || column.type() == OLAP_FIELD_TYPE_OBJECT || - column.type() == OLAP_FIELD_TYPE_STRING) { + if (column.is_length_variable_type()) { // 变长部分额外计算下实际最大的字符串长度(此处length已经包括记录Length的2个字节) memory_size += sizeof(Slice) + sizeof(char); } else { diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp index bcad01f200..57a7eb586e 100644 --- a/be/src/olap/rowset/segment_v2/encoding_info.cpp +++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp @@ -278,6 +278,8 @@ EncodingInfoResolver::EncodingInfoResolver() { _add_map(); _add_map(); + + _add_map(); } EncodingInfoResolver::~EncodingInfoResolver() { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 63d4e833aa..815bd1e6b4 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -79,6 +79,8 @@ FieldType TabletColumn::get_field_type_by_string(const std::string& type_str) { type = OLAP_FIELD_TYPE_OBJECT; } else if (0 == upper_type_str.compare("ARRAY")) { type = OLAP_FIELD_TYPE_ARRAY; + } else if (0 == upper_type_str.compare("QUANTILE_STATE")) { + type = OLAP_FIELD_TYPE_QUANTILE_STATE; } else { LOG(WARNING) << "invalid type string. [type='" << type_str << "']"; type = OLAP_FIELD_TYPE_UNKNOWN; @@ -108,6 +110,8 @@ FieldAggregationMethod TabletColumn::get_aggregation_type_by_string(const std::s aggregation_type = OLAP_FIELD_AGGREGATION_HLL_UNION; } else if (0 == upper_str.compare("BITMAP_UNION")) { aggregation_type = OLAP_FIELD_AGGREGATION_BITMAP_UNION; + } else if (0 == upper_str.compare("QUANTILE_UNION")) { + aggregation_type = OLAP_FIELD_AGGREGATION_QUANTILE_UNION; } else { LOG(WARNING) << "invalid aggregation type string. [aggregation='" << str << "']"; aggregation_type = OLAP_FIELD_AGGREGATION_UNKNOWN; @@ -189,6 +193,8 @@ std::string TabletColumn::get_string_by_field_type(FieldType type) { case OLAP_FIELD_TYPE_OBJECT: return "OBJECT"; + case OLAP_FIELD_TYPE_QUANTILE_STATE: + return "QUANTILE_STATE"; default: return "UNKNOWN"; @@ -221,6 +227,9 @@ std::string TabletColumn::get_string_by_aggregation_type(FieldAggregationMethod case OLAP_FIELD_AGGREGATION_BITMAP_UNION: return "BITMAP_UNION"; + case OLAP_FIELD_AGGREGATION_QUANTILE_UNION: + return "QUANTILE_UNION"; + default: return "UNKNOWN"; } @@ -247,6 +256,7 @@ uint32_t TabletColumn::get_field_length_by_type(TPrimitiveType::type type, uint3 return 4; case TPrimitiveType::DOUBLE: return 8; + case TPrimitiveType::QUANTILE_STATE: case TPrimitiveType::OBJECT: return 16; case TPrimitiveType::CHAR: @@ -493,7 +503,8 @@ void TabletSchema::init_field_index_for_test() { } } -vectorized::Block TabletSchema::create_block(const std::vector& return_columns, +vectorized::Block TabletSchema::create_block( + const std::vector& return_columns, const std::unordered_set* tablet_columns_need_convert_null) const { vectorized::Block block; for (int i = 0; i < return_columns.size(); ++i) { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index f9bd519cc1..eda9c2d3dc 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -48,6 +48,11 @@ public: inline bool is_nullable() const { return _is_nullable; } inline bool is_bf_column() const { return _is_bf_column; } inline bool has_bitmap_index() const { return _has_bitmap_index; } + inline bool is_length_variable_type() const { + return _type == OLAP_FIELD_TYPE_CHAR || _type == OLAP_FIELD_TYPE_VARCHAR || + _type == OLAP_FIELD_TYPE_STRING || _type == OLAP_FIELD_TYPE_HLL || + _type == OLAP_FIELD_TYPE_OBJECT || _type == OLAP_FIELD_TYPE_QUANTILE_STATE; + } bool has_default_value() const { return _has_default_value; } std::string default_value() const { return _default_value; } bool has_reference_column() const { return _has_referenced_column; } @@ -143,7 +148,8 @@ public: inline void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; } inline bool has_sequence_col() const { return _sequence_col_idx != -1; } inline int32_t sequence_col_idx() const { return _sequence_col_idx; } - vectorized::Block create_block(const std::vector& return_columns, + vectorized::Block create_block( + const std::vector& return_columns, const std::unordered_set* tablet_columns_need_convert_null = nullptr) const; private: diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp index 69761ad9a0..d3f1f3b980 100644 --- a/be/src/olap/types.cpp +++ b/be/src/olap/types.cpp @@ -16,6 +16,7 @@ // under the License. #include "olap/types.h" + #include namespace doris { @@ -83,6 +84,7 @@ ScalarTypeInfoResolver::ScalarTypeInfoResolver() { add_mapping(); add_mapping(); add_mapping(); + add_mapping(); } ScalarTypeInfoResolver::~ScalarTypeInfoResolver() {} @@ -127,7 +129,7 @@ public: std::shared_ptr get_type_info(const TabletColumn& column) { DCHECK(column.get_subtype_count() == 1) << "more than 1 child type."; - const auto &sub_column = column.get_sub_column(0); + const auto& sub_column = column.get_sub_column(0); if (is_scalar_type(sub_column.type())) { return get_type_info(sub_column.type()); } else { @@ -136,7 +138,8 @@ public: } std::shared_ptr get_type_info(const segment_v2::ColumnMetaPB& column_meta_pb) { - DCHECK(column_meta_pb.children_columns_size() >= 1 && column_meta_pb.children_columns_size() <= 3) + DCHECK(column_meta_pb.children_columns_size() >= 1 && + column_meta_pb.children_columns_size() <= 3) << "more than 3 children or no children."; const auto& child_type = column_meta_pb.children_columns(0); if (is_scalar_type((FieldType)child_type.type())) { @@ -149,9 +152,8 @@ public: private: template void add_mapping() { - _type_mapping.emplace( - item_type, - std::shared_ptr(new ArrayTypeInfo(get_scalar_type_info(item_type)))); + _type_mapping.emplace(item_type, std::shared_ptr(new ArrayTypeInfo( + get_scalar_type_info(item_type)))); } // item_type_info -> list_type_info diff --git a/be/src/olap/types.h b/be/src/olap/types.h index e007df90ef..c86b5f241c 100644 --- a/be/src/olap/types.h +++ b/be/src/olap/types.h @@ -503,11 +503,15 @@ template <> struct CppTypeTraits { using CppType = Slice; }; + +template <> +struct CppTypeTraits { + using CppType = Slice; +}; template <> struct CppTypeTraits { using CppType = CollectionValue; }; - template struct BaseFieldtypeTraits : public CppTypeTraits { using CppType = typename CppTypeTraits::CppType; @@ -1219,6 +1223,25 @@ struct FieldTypeTraits : public FieldTypeTraits +struct FieldTypeTraits + : public FieldTypeTraits { + /* + * quantile_state type only used as value, so + * cmp/from_string/set_to_max/set_to_min function + * in this struct has no significance + */ + + // See copy_row_in_memtable() in olap/row.h, will be removed in future. + static void copy_object(void* dest, const void* src, MemPool* mem_pool) { + auto dst_slice = reinterpret_cast(dest); + auto src_slice = reinterpret_cast(src); + DCHECK_EQ(src_slice->size, 0); + dst_slice->data = src_slice->data; + dst_slice->size = 0; + } +}; + // Instantiate this template to get static access to the type traits. template struct TypeTraits : public FieldTypeTraits { diff --git a/be/src/olap/wrapper_field.cpp b/be/src/olap/wrapper_field.cpp index be908a2725..467e209bc8 100644 --- a/be/src/olap/wrapper_field.cpp +++ b/be/src/olap/wrapper_field.cpp @@ -69,9 +69,10 @@ WrapperField* WrapperField::create_by_type(const FieldType& type, int32_t var_le if (rep == nullptr) { return nullptr; } - bool is_string_type = (type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_VARCHAR || - type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT || - type == OLAP_FIELD_TYPE_STRING); + bool is_string_type = + (type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_VARCHAR || + type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT || + type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_QUANTILE_STATE); auto wrapper = new WrapperField(rep, var_length, is_string_type); return wrapper; } diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp index 1cdfdc4376..6920db6870 100644 --- a/be/src/runtime/mysql_result_writer.cpp +++ b/be/src/runtime/mysql_result_writer.cpp @@ -28,11 +28,10 @@ #include "util/date_func.h" #include "util/mysql_row_buffer.h" #include "util/types.h" - -#include "vec/core/block.h" -#include "vec/columns/column_vector.h" #include "vec/columns/column_nullable.h" +#include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" +#include "vec/core/block.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" @@ -121,7 +120,8 @@ int MysqlResultWriter::_add_row_value(int index, const TypeDescriptor& type, voi } case TYPE_HLL: - case TYPE_OBJECT: { + case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: { if (_output_object_data) { const StringValue* string_val = (const StringValue*)(item); diff --git a/be/src/runtime/primitive_type.cpp b/be/src/runtime/primitive_type.cpp index 53df75bc03..1fb4d96b78 100644 --- a/be/src/runtime/primitive_type.cpp +++ b/be/src/runtime/primitive_type.cpp @@ -96,6 +96,9 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) { case TPrimitiveType::OBJECT: return TYPE_OBJECT; + case TPrimitiveType::QUANTILE_STATE: + return TYPE_QUANTILE_STATE; + case TPrimitiveType::ARRAY: return TYPE_ARRAY; @@ -166,6 +169,9 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) { case TYPE_OBJECT: return TPrimitiveType::OBJECT; + case TYPE_QUANTILE_STATE: + return TPrimitiveType::QUANTILE_STATE; + case TYPE_ARRAY: return TPrimitiveType::ARRAY; @@ -236,6 +242,9 @@ std::string type_to_string(PrimitiveType t) { case TYPE_OBJECT: return "OBJECT"; + case TYPE_QUANTILE_STATE: + return "QUANTILE_STATE"; + case TYPE_ARRAY: return "ARRAY"; @@ -306,6 +315,8 @@ std::string type_to_odbc_string(PrimitiveType t) { case TYPE_OBJECT: return "object"; + case TYPE_QUANTILE_STATE: + return "quantile_state"; }; return "unknown"; @@ -346,6 +357,7 @@ int get_slot_size(PrimitiveType type) { switch (type) { case TYPE_OBJECT: case TYPE_HLL: + case TYPE_QUANTILE_STATE: case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_STRING: diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index 224957f0de..a95e00e27e 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -28,7 +28,6 @@ #include "runtime/large_int_value.h" #include "runtime/string_value.h" #include "udf/udf.h" - #include "vec/columns/column_decimal.h" #include "vec/columns/column_string.h" #include "vec/columns/columns_number.h" @@ -62,9 +61,10 @@ enum PrimitiveType { TYPE_HLL, /* 19 */ TYPE_DECIMALV2, /* 20 */ - TYPE_TIME, /* 21 */ - TYPE_OBJECT, /* 22 */ - TYPE_STRING, /* 23 */ + TYPE_TIME, /* 21 */ + TYPE_OBJECT, /* 22 */ + TYPE_STRING, /* 23 */ + TYPE_QUANTILE_STATE /* 24 */ }; inline PrimitiveType convert_type_to_primitive(FunctionContext::Type type) { @@ -93,6 +93,8 @@ inline PrimitiveType convert_type_to_primitive(FunctionContext::Type type) { return PrimitiveType::TYPE_OBJECT; case FunctionContext::Type::TYPE_HLL: return PrimitiveType::TYPE_HLL; + case FunctionContext::Type::TYPE_QUANTILE_STATE: + return PrimitiveType::TYPE_QUANTILE_STATE; case FunctionContext::Type::TYPE_TINYINT: return PrimitiveType::TYPE_TINYINT; case FunctionContext::Type::TYPE_SMALLINT: @@ -151,13 +153,15 @@ inline bool is_string_type(PrimitiveType type) { } inline bool has_variable_type(PrimitiveType type) { - return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_OBJECT || type == TYPE_STRING; + return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_OBJECT || + type == TYPE_QUANTILE_STATE || type == TYPE_STRING; } // Returns the byte size of 'type' Returns 0 for variable length types. inline int get_byte_size(PrimitiveType type) { switch (type) { case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_HLL: case TYPE_VARCHAR: case TYPE_STRING: @@ -198,6 +202,7 @@ inline int get_byte_size(PrimitiveType type) { inline int get_real_byte_size(PrimitiveType type) { switch (type) { case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_HLL: case TYPE_VARCHAR: case TYPE_STRING: @@ -242,7 +247,7 @@ int get_slot_size(PrimitiveType type); inline bool is_type_compatible(PrimitiveType lhs, PrimitiveType rhs) { if (lhs == TYPE_VARCHAR) { return rhs == TYPE_CHAR || rhs == TYPE_VARCHAR || rhs == TYPE_HLL || rhs == TYPE_OBJECT || - rhs == TYPE_STRING; + rhs == TYPE_QUANTILE_STATE || rhs == TYPE_STRING; } if (lhs == TYPE_OBJECT) { @@ -258,6 +263,10 @@ inline bool is_type_compatible(PrimitiveType lhs, PrimitiveType rhs) { rhs == TYPE_STRING; } + if (lhs == TYPE_QUANTILE_STATE) { + return rhs == TYPE_VARCHAR || rhs == TYPE_QUANTILE_STATE || rhs == TYPE_STRING; + } + return lhs == rhs; } diff --git a/be/src/runtime/raw_value.cpp b/be/src/runtime/raw_value.cpp index 6d8641e18b..8fa5d737d9 100644 --- a/be/src/runtime/raw_value.cpp +++ b/be/src/runtime/raw_value.cpp @@ -216,6 +216,7 @@ void RawValue::print_value(const void* value, const TypeDescriptor& type, int sc case TYPE_VARCHAR: case TYPE_OBJECT: case TYPE_HLL: + case TYPE_QUANTILE_STATE: case TYPE_STRING: { string_val = reinterpret_cast(value); std::stringstream ss; @@ -296,6 +297,7 @@ void RawValue::write(const void* value, void* dst, const TypeDescriptor& type, M case TYPE_OBJECT: case TYPE_HLL: + case TYPE_QUANTILE_STATE: case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h index 152e87e048..f47cafa798 100644 --- a/be/src/runtime/types.h +++ b/be/src/runtime/types.h @@ -165,7 +165,7 @@ struct TypeDescriptor { inline bool is_string_type() const { return type == TYPE_VARCHAR || type == TYPE_CHAR || type == TYPE_HLL || - type == TYPE_OBJECT || type == TYPE_STRING; + type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type == TYPE_STRING; } inline bool is_date_type() const { return type == TYPE_DATE || type == TYPE_DATETIME; } @@ -176,7 +176,7 @@ struct TypeDescriptor { inline bool is_var_len_string_type() const { return type == TYPE_VARCHAR || type == TYPE_HLL || type == TYPE_CHAR || - type == TYPE_OBJECT || type == TYPE_STRING; + type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type == TYPE_STRING; } inline bool is_complex_type() const { @@ -193,6 +193,7 @@ struct TypeDescriptor { case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: return 0; @@ -232,6 +233,7 @@ struct TypeDescriptor { case TYPE_VARCHAR: case TYPE_HLL: case TYPE_OBJECT: + case TYPE_QUANTILE_STATE: case TYPE_STRING: return sizeof(StringValue); diff --git a/be/src/runtime/vectorized_row_batch.cpp b/be/src/runtime/vectorized_row_batch.cpp index 06a19fcd9e..7e1d6449f1 100644 --- a/be/src/runtime/vectorized_row_batch.cpp +++ b/be/src/runtime/vectorized_row_batch.cpp @@ -51,8 +51,7 @@ void VectorizedRowBatch::dump_to_row_block(RowBlock* row_block) { row_block->_mem_buf + row_block->_field_offset_in_memory[column_id]; const TabletColumn& column = _schema->column(column_id); size_t field_size = 0; - if (column.type() == OLAP_FIELD_TYPE_CHAR || column.type() == OLAP_FIELD_TYPE_VARCHAR || column.type() == OLAP_FIELD_TYPE_STRING || - column.type() == OLAP_FIELD_TYPE_HLL || column.type() == OLAP_FIELD_TYPE_OBJECT) { + if (column.is_length_variable_type()) { field_size = sizeof(Slice); } else { field_size = column.length(); @@ -94,8 +93,7 @@ void VectorizedRowBatch::dump_to_row_block(RowBlock* row_block) { const TabletColumn& column = _schema->column(column_id); size_t field_size = 0; - if (column.type() == OLAP_FIELD_TYPE_CHAR || column.type() == OLAP_FIELD_TYPE_VARCHAR || column.type() == OLAP_FIELD_TYPE_STRING|| - column.type() == OLAP_FIELD_TYPE_HLL || column.type() == OLAP_FIELD_TYPE_OBJECT) { + if (column.is_length_variable_type()) { field_size = sizeof(Slice); } else { field_size = column.length(); diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 481f19a14e..fa02b844fd 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -84,7 +84,8 @@ public: TYPE_FIXED_BUFFER, TYPE_DECIMALV2, TYPE_OBJECT, - TYPE_ARRAY + TYPE_ARRAY, + TYPE_QUANTILE_STATE }; struct TypeDesc { diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 2385b80ee6..600882efbc 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -108,6 +108,7 @@ set(UTIL_FILES s3_util.cpp topn_counter.cpp tuple_row_zorder_compare.cpp + quantile_state.cpp jni-util.cpp ) diff --git a/be/src/util/quantile_state.cpp b/be/src/util/quantile_state.cpp new file mode 100644 index 0000000000..a6b2cf7b22 --- /dev/null +++ b/be/src/util/quantile_state.cpp @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "util/quantile_state.h" + +#include + +#include + +#include "common/logging.h" +#include "util/coding.h" + +namespace doris { + +template +QuantileState::QuantileState() : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {} + +template +QuantileState::QuantileState(float compression) : _type(EMPTY), _compression(compression) {} + +template +QuantileState::QuantileState(const Slice& slice) { + DCHECK(deserialize(slice)); + if (!deserialize(slice)) { + _type = EMPTY; + } +} + +template +size_t QuantileState::get_serialized_size() { + size_t size = 1 + sizeof(float); // type(QuantileStateType) + compression(float) + switch (_type) { + case EMPTY: + break; + case SINGLE: + size += sizeof(T); + break; + case EXPLICIT: + size += sizeof(uint16_t) + sizeof(T) * _explicit_data.size(); + break; + case TDIGEST: + size += _tdigest_ptr->serialized_size(); + break; + } + return size; +} + +template +void QuantileState::set_compression(float compression) { + DCHECK(compression >= QUANTILE_STATE_COMPRESSION_MIN && + compression <= QUANTILE_STATE_COMPRESSION_MAX); + this->_compression = compression; +} + +template +bool QuantileState::is_valid(const Slice& slice) { + if (slice.size < 1) { + return false; + } + const uint8_t* ptr = (uint8_t*)slice.data; + const uint8_t* end = (uint8_t*)slice.data + slice.size; + float compress_value = *reinterpret_cast(ptr); + if (compress_value < QUANTILE_STATE_COMPRESSION_MIN || + compress_value > QUANTILE_STATE_COMPRESSION_MAX) { + return false; + } + ptr += sizeof(float); + + auto type = (QuantileStateType)*ptr++; + switch (type) { + case EMPTY: + break; + case SINGLE: { + if ((ptr + sizeof(T)) > end) { + return false; + } + ptr += sizeof(T); + break; + } + case EXPLICIT: { + if ((ptr + sizeof(uint16_t)) > end) { + return false; + } + uint16_t num_explicits = decode_fixed16_le(ptr); + ptr += sizeof(uint16_t); + ptr += num_explicits * sizeof(T); + break; + } + case TDIGEST: { + if ((ptr + sizeof(uint32_t)) > end) { + return false; + } + uint32_t tdigest_serialized_length = decode_fixed32_le(ptr); + ptr += tdigest_serialized_length; + break; + } + default: + return false; + } + return ptr == end; +} + +template +T QuantileState::get_explicit_value_by_percentile(float percentile) { + DCHECK(_type == EXPLICIT); + int n = _explicit_data.size(); + std::sort(_explicit_data.begin(), _explicit_data.end()); + + double index = (n - 1) * percentile; + int intIdx = (int)index; + if (intIdx == n - 1) { + return _explicit_data[intIdx]; + } + return _explicit_data[intIdx + 1] * (index - intIdx) + + _explicit_data[intIdx] * (intIdx + 1 - index); +} + +template +T QuantileState::get_value_by_percentile(float percentile) { + DCHECK(percentile >= 0 && percentile <= 1); + switch (_type) { + case EMPTY: { + return NAN; + } + case SINGLE: { + return _single_data; + } + case EXPLICIT: { + return get_explicit_value_by_percentile(percentile); + } + case TDIGEST: { + return _tdigest_ptr->quantile(percentile); + } + default: + break; + } + return NAN; +} + +template +bool QuantileState::deserialize(const Slice& slice) { + DCHECK(_type == EMPTY); + + // in case of insert error data caused be crashed + if (slice.data == nullptr || slice.size <= 0) { + return false; + } + // check input is valid + if (!is_valid(slice)) { + LOG(WARNING) << "QuantileState deserialize failed: slice is invalid"; + return false; + } + + const uint8_t* ptr = (uint8_t*)slice.data; + _compression = *reinterpret_cast(ptr); + ptr += sizeof(float); + // first byte : type + _type = (QuantileStateType)*ptr++; + switch (_type) { + case EMPTY: + // 1: empty + break; + case SINGLE: { + // 2: single_data value + _single_data = *reinterpret_cast(ptr); + ptr += sizeof(T); + break; + } + case EXPLICIT: { + // 3: number of explicit values + // make sure that num_explicit is positive + uint16_t num_explicits = decode_fixed16_le(ptr); + ptr += sizeof(uint16_t); + _explicit_data.reserve(std::min(num_explicits * 2, QUANTILE_STATE_EXPLICIT_NUM)); + _explicit_data.resize(num_explicits); + memcpy(&_explicit_data[0], ptr, num_explicits * sizeof(T)); + ptr += num_explicits * sizeof(T); + break; + } + case TDIGEST: { + // 4: Tdigest object value + _tdigest_ptr = std::make_unique(0); + _tdigest_ptr->unserialize(ptr); + break; + } + default: + // revert type to EMPTY + _type = EMPTY; + return false; + } + return true; +} + +template +size_t QuantileState::serialize(uint8_t* dst) const { + uint8_t* ptr = dst; + *reinterpret_cast(ptr) = _compression; + ptr += sizeof(float); + switch (_type) { + case EMPTY: { + *ptr++ = EMPTY; + break; + } + case SINGLE: { + *ptr++ = SINGLE; + *reinterpret_cast(ptr) = _single_data; + ptr += sizeof(T); + break; + } + case EXPLICIT: { + *ptr++ = EXPLICIT; + uint16_t size = _explicit_data.size(); + *reinterpret_cast(ptr) = size; + ptr += sizeof(uint16_t); + memcpy(ptr, &_explicit_data[0], size * sizeof(T)); + ptr += size * sizeof(T); + break; + } + case TDIGEST: { + *ptr++ = TDIGEST; + size_t tdigest_size = _tdigest_ptr->serialize(ptr); + ptr += tdigest_size; + break; + } + default: + break; + } + return ptr - dst; +} + +template +void QuantileState::merge(QuantileState& other) { + switch (other._type) { + case EMPTY: + break; + case SINGLE: { + add_value(other._single_data); + break; + } + case EXPLICIT: { + switch (_type) { + case EMPTY: + _type = EXPLICIT; + _explicit_data = other._explicit_data; + break; + case SINGLE: + _type = EXPLICIT; + _explicit_data = other._explicit_data; + add_value(_single_data); + break; + case EXPLICIT: + if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) { + _type = TDIGEST; + _tdigest_ptr = std::make_unique(_compression); + for (int i = 0; i < _explicit_data.size(); i++) { + _tdigest_ptr->add(_explicit_data[i]); + } + for (int i = 0; i < other._explicit_data.size(); i++) { + _tdigest_ptr->add(other._explicit_data[i]); + } + } else { + _explicit_data.insert(_explicit_data.end(), other._explicit_data.begin(), + other._explicit_data.end()); + } + break; + case TDIGEST: + for (int i = 0; i < other._explicit_data.size(); i++) { + _tdigest_ptr->add(other._explicit_data[i]); + } + break; + default: + break; + } + break; + } + case TDIGEST: { + switch (_type) { + case EMPTY: + _type = TDIGEST; + _tdigest_ptr = std::move(other._tdigest_ptr); + break; + case SINGLE: + _type = TDIGEST; + _tdigest_ptr = std::move(other._tdigest_ptr); + _tdigest_ptr->add(_single_data); + break; + case EXPLICIT: + _type = TDIGEST; + _tdigest_ptr = std::move(other._tdigest_ptr); + for (int i = 0; i < _explicit_data.size(); i++) { + _tdigest_ptr->add(_explicit_data[i]); + } + break; + case TDIGEST: + _tdigest_ptr->merge(other._tdigest_ptr.get()); + break; + default: + break; + } + break; + } + default: + return; + } +} + +template +void QuantileState::add_value(const T& value) { + switch (_type) { + case EMPTY: + _single_data = value; + _type = SINGLE; + break; + case SINGLE: + _explicit_data.emplace_back(_single_data); + _explicit_data.emplace_back(value); + _type = EXPLICIT; + break; + case EXPLICIT: + if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) { + _tdigest_ptr = std::make_unique(_compression); + for (int i = 0; i < _explicit_data.size(); i++) { + _tdigest_ptr->add(_explicit_data[i]); + } + _explicit_data.clear(); + _explicit_data.shrink_to_fit(); + _type = TDIGEST; + + } else { + _explicit_data.emplace_back(value); + } + break; + case TDIGEST: + _tdigest_ptr->add(value); + break; + } +} + +template +void QuantileState::clear() { + _type = EMPTY; + _tdigest_ptr.reset(); + _explicit_data.clear(); + _explicit_data.shrink_to_fit(); +} + +template class QuantileState; + +} // namespace doris diff --git a/be/src/util/quantile_state.h b/be/src/util/quantile_state.h new file mode 100644 index 0000000000..db618d5dc2 --- /dev/null +++ b/be/src/util/quantile_state.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef DORIS_BE_SRC_OLAP_QUANTILE_STATE_H +#define DORIS_BE_SRC_OLAP_QUANTILE_STATE_H + +#include +#include +#include + +#include "slice.h" +#include "tdigest.h" + +namespace doris { + +class Slice; +class TDigest; + +const static int QUANTILE_STATE_EXPLICIT_NUM = 2048; +const static int QUANTILE_STATE_COMPRESSION_MIN = 2048; +const static int QUANTILE_STATE_COMPRESSION_MAX = 10000; + +enum QuantileStateType { + EMPTY = 0, + SINGLE = 1, // single element + EXPLICIT = 2, // more than one elements,stored in vector + TDIGEST = 3 // TDIGEST object +}; + +template +class QuantileState { +public: + QuantileState(); + explicit QuantileState(float compression); + explicit QuantileState(const Slice& slice); + void set_compression(float compression); + bool deserialize(const Slice& slice); + size_t serialize(uint8_t* dst) const; + void merge(QuantileState& other); + void add_value(const T& value); + void clear(); + bool is_valid(const Slice& slice); + size_t get_serialized_size(); + T get_value_by_percentile(float percentile); + T get_explicit_value_by_percentile(float percentile); + ~QuantileState() = default; + +private: + QuantileStateType _type = EMPTY; + std::unique_ptr _tdigest_ptr; + T _single_data; + std::vector _explicit_data; + float _compression; +}; + +} // namespace doris + +#endif // DORIS_BE_SRC_OLAP_QUANTILE_STATE_H \ No newline at end of file diff --git a/be/src/util/symbols_util.cpp b/be/src/util/symbols_util.cpp index 428917ec18..5a7f74ae53 100644 --- a/be/src/util/symbols_util.cpp +++ b/be/src/util/symbols_util.cpp @@ -151,6 +151,7 @@ static void append_any_val_type(int namespace_id, const TypeDescriptor& type, case TYPE_HLL: case TYPE_OBJECT: case TYPE_STRING: + case TYPE_QUANTILE_STATE: append_mangled_token("StringVal", s); break; case TYPE_DATE: diff --git a/be/src/util/tdigest.h b/be/src/util/tdigest.h index 66ff7ccda8..28569bf0a9 100644 --- a/be/src/util/tdigest.h +++ b/be/src/util/tdigest.h @@ -211,7 +211,7 @@ public: // merge in another t-digest inline void merge(const TDigest* other) { - std::vector others{other}; + std::vector others {other}; add(others.cbegin(), others.cend()); } @@ -232,7 +232,7 @@ public: std::vector::const_iterator end) { if (iter != end) { auto size = std::distance(iter, end); - TDigestQueue pq(TDigestComparator{}); + TDigestQueue pq(TDigestComparator {}); for (; iter != end; iter++) { pq.push((*iter)); } @@ -288,7 +288,7 @@ public: return 0.0; } else if (_processed.size() == 1) { VLOG_CRITICAL << "one processed value " - << " _min " << _min << " _max " << _max; + << " _min " << _min << " _max " << _max; // exactly one centroid, should have _max==_min auto width = _max - _min; if (x < _min) { @@ -306,20 +306,20 @@ public: auto n = _processed.size(); if (x <= _min) { VLOG_CRITICAL << "below _min " - << " _min " << _min << " x " << x; + << " _min " << _min << " x " << x; return 0; } if (x >= _max) { VLOG_CRITICAL << "above _max " - << " _max " << _max << " x " << x; + << " _max " << _max << " x " << x; return 1; } // check for the left tail if (x <= mean(0)) { VLOG_CRITICAL << "left tail " - << " _min " << _min << " mean(0) " << mean(0) << " x " << x; + << " _min " << _min << " mean(0) " << mean(0) << " x " << x; // note that this is different than mean(0) > _min ... this guarantees interpolation works if (mean(0) - _min > 0) { @@ -332,7 +332,7 @@ public: // and the right tail if (x >= mean(n - 1)) { VLOG_CRITICAL << "right tail" - << " _max " << _max << " mean(n - 1) " << mean(n - 1) << " x " << x; + << " _max " << _max << " mean(n - 1) " << mean(n - 1) << " x " << x; if (_max - mean(n - 1) > 0) { return 1.0 - (_max - x) / (_max - mean(n - 1)) * weight(n - 1) / @@ -352,7 +352,7 @@ public: DCHECK_LE(0.0, z1); DCHECK_LE(0.0, z2); VLOG_CRITICAL << "middle " - << " z1 " << z1 << " z2 " << z2 << " x " << x; + << " z1 " << z1 << " z2 " << z2 << " x " << x; return weightedAverage(_cumulative[i - 1], z2, _cumulative[i], z1) / _processed_weight; } @@ -443,12 +443,16 @@ public: } uint32_t serialized_size() { - return sizeof(Value) * 5 + sizeof(Index) * 2 + sizeof(uint32_t) * 3 + + return sizeof(uint32_t) + sizeof(Value) * 5 + sizeof(Index) * 2 + sizeof(uint32_t) * 3 + _processed.size() * sizeof(Centroid) + _unprocessed.size() * sizeof(Centroid) + _cumulative.size() * sizeof(Weight); } - void serialize(uint8_t* writer) { + size_t serialize(uint8_t* writer) { + uint8_t* dst = writer; + uint32_t total_size = serialized_size(); + memcpy(writer, &total_size, sizeof(uint32_t)); + writer += sizeof(uint32_t); memcpy(writer, &_compression, sizeof(Value)); writer += sizeof(Value); memcpy(writer, &_min, sizeof(Value)); @@ -475,6 +479,7 @@ public: size = _unprocessed.size(); memcpy(writer, &size, sizeof(uint32_t)); writer += sizeof(uint32_t); + //TODO(weixiang): may be once memcpy is enough! for (int i = 0; i < size; i++) { memcpy(writer, &_unprocessed[i], sizeof(Centroid)); writer += sizeof(Centroid); @@ -487,9 +492,13 @@ public: memcpy(writer, &_cumulative[i], sizeof(Weight)); writer += sizeof(Weight); } + return writer - dst; } void unserialize(const uint8_t* type_reader) { + uint32_t total_length = 0; + memcpy(&total_length, type_reader, sizeof(uint32_t)); + type_reader += sizeof(uint32_t); memcpy(&_compression, type_reader, sizeof(Value)); type_reader += sizeof(Value); memcpy(&_min, type_reader, sizeof(Value)); @@ -579,7 +588,7 @@ private: if (tdigests.size() == 0) return; size_t total = 0; - CentroidListQueue pq(CentroidListComparator{}); + CentroidListQueue pq(CentroidListComparator {}); for (auto& td : tdigests) { auto& sorted = td->_processed; auto size = sorted.size(); @@ -684,14 +693,15 @@ private: auto dq = w / total; auto k2 = integratedLocation(q + dq); if (k2 - k1 > 1 && w != 1) { - VLOG_CRITICAL << "Oversize centroid at " << std::distance(sorted.cbegin(), iter) << " k1 " - << k1 << " k2 " << k2 << " dk " << (k2 - k1) << " w " << w << " q " << q; + VLOG_CRITICAL << "Oversize centroid at " << std::distance(sorted.cbegin(), iter) + << " k1 " << k1 << " k2 " << k2 << " dk " << (k2 - k1) << " w " << w + << " q " << q; badWeight++; } if (k2 - k1 > 1.5 && w != 1) { VLOG_CRITICAL << "Egregiously Oversize centroid at " - << std::distance(sorted.cbegin(), iter) << " k1 " << k1 << " k2 " << k2 - << " dk " << (k2 - k1) << " w " << w << " q " << q; + << std::distance(sorted.cbegin(), iter) << " k1 " << k1 << " k2 " + << k2 << " dk " << (k2 - k1) << " w " << w << " q " << q; badWeight++; } q += dq; diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt index 9f4702bb8f..4205be2202 100644 --- a/be/test/exprs/CMakeLists.txt +++ b/be/test/exprs/CMakeLists.txt @@ -39,4 +39,5 @@ ADD_BE_TEST(topn_function_test) ADD_BE_TEST(runtime_filter_test) ADD_BE_TEST(bloom_filter_predicate_test) ADD_BE_TEST(array_functions_test) +ADD_BE_TEST(quantile_function_test) diff --git a/be/test/exprs/quantile_function_test.cpp b/be/test/exprs/quantile_function_test.cpp new file mode 100644 index 0000000000..3ad6d44cda --- /dev/null +++ b/be/test/exprs/quantile_function_test.cpp @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/quantile_function.h" + +#include + +#include +#include +#include + +#include "exprs/anyval_util.h" +#include "testutil/function_utils.h" +#include "udf/udf_internal.h" +#include "util/quantile_state.h" + +namespace doris { +using DoubleQuantileState = QuantileState; + +StringVal convert_quantile_state_to_string(FunctionContext* ctx, DoubleQuantileState& state) { + StringVal result(ctx, state.get_serialized_size()); + state.serialize(result.ptr); + return result; +} + +class QuantileStateFunctionsTest : public testing::Test { +public: + QuantileStateFunctionsTest() = default; + void SetUp() { + utils = new FunctionUtils(); + ctx = utils->get_fn_ctx(); + } + void TearDown() { delete utils; } + +private: + FunctionUtils* utils; + FunctionContext* ctx; +}; + +TEST_F(QuantileStateFunctionsTest, to_quantile_state) { + FloatVal percentile = FloatVal(2048); + std::vector constant_args; + constant_args.push_back(nullptr); + constant_args.push_back(&percentile); + + ctx->impl()->set_constant_args(constant_args); + StringVal input = AnyValUtil::from_string_temp(ctx, std::to_string(5000)); + StringVal result = QuantileStateFunctions::to_quantile_state(ctx, input); + float compression = 2048; + DoubleQuantileState state(compression); + state.add_value(5000); + StringVal expected = convert_quantile_state_to_string(ctx, state); + ASSERT_EQ(expected, result); +} + +TEST_F(QuantileStateFunctionsTest, quantile_union) { + StringVal dst; + QuantileStateFunctions::quantile_state_init(ctx, &dst); + DoubleQuantileState state1; + state1.add_value(1); + StringVal src1 = convert_quantile_state_to_string(ctx, state1); + QuantileStateFunctions::quantile_union(ctx, src1, &dst); + + DoubleQuantileState state2; + state2.add_value(2); + StringVal src2 = convert_quantile_state_to_string(ctx, state2); + QuantileStateFunctions::quantile_union(ctx, src2, &dst); + + DoubleQuantileState state3; + state3.add_value(3); + StringVal src3 = convert_quantile_state_to_string(ctx, state3); + QuantileStateFunctions::quantile_union(ctx, src3, &dst); + + DoubleQuantileState state4; + state4.add_value(4); + StringVal src4 = convert_quantile_state_to_string(ctx, state4); + QuantileStateFunctions::quantile_union(ctx, src4, &dst); + + DoubleQuantileState state5; + state5.add_value(5); + StringVal src5 = convert_quantile_state_to_string(ctx, state5); + QuantileStateFunctions::quantile_union(ctx, src5, &dst); + + DoubleQuantileState expect; + expect.add_value(1); + expect.add_value(2); + expect.add_value(3); + expect.add_value(4); + expect.add_value(5); + + StringVal result = QuantileStateFunctions::quantile_state_serialize(ctx, dst); + StringVal expected = convert_quantile_state_to_string(ctx, expect); + + ASSERT_EQ(result, expected); +} + +TEST_F(QuantileStateFunctionsTest, quantile_percent) { + FloatVal percentile = FloatVal(0.5); + std::vector constant_args; + constant_args.push_back(nullptr); + constant_args.push_back(&percentile); + ctx->impl()->set_constant_args(constant_args); + + DoubleQuantileState state; + state.add_value(1); + state.add_value(2); + state.add_value(3); + state.add_value(4); + state.add_value(5); + StringVal input = convert_quantile_state_to_string(ctx, state); + DoubleVal result = QuantileStateFunctions::quantile_percent(ctx, input); + DoubleVal expected(2.5); + ASSERT_EQ(result, expected); +} + +} // namespace doris + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 12daf3839c..07d8e9ed75 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -76,5 +76,6 @@ ADD_BE_TEST(counts_test) ADD_BE_TEST(date_func_test) ADD_BE_TEST(tuple_row_zorder_compare_test) ADD_BE_TEST(array_parser_test) +ADD_BE_TEST(quantile_state_test) target_link_libraries(Test_util Common Util Gutil ${Boost_LIBRARIES} glog gflags fmt protobuf) diff --git a/be/test/util/quantile_state_test.cpp b/be/test/util/quantile_state_test.cpp new file mode 100644 index 0000000000..05d3e1ee3f --- /dev/null +++ b/be/test/util/quantile_state_test.cpp @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/quantile_state.h" + +#include + +namespace doris { +using DoubleQuantileState = QuantileState; + +TEST(QuantileStateTest, merge) { + DoubleQuantileState empty; + ASSERT_EQ(EMPTY, empty._type); + empty.add_value(1); + ASSERT_EQ(SINGLE, empty._type); + empty.add_value(2); + empty.add_value(3); + empty.add_value(4); + empty.add_value(5); + ASSERT_EQ(1, empty.get_value_by_percentile(0)); + ASSERT_EQ(2.5, empty.get_value_by_percentile(0.5)); + ASSERT_EQ(5, empty.get_value_by_percentile(1)); + + DoubleQuantileState another; + another.add_value(6); + another.add_value(7); + another.add_value(8); + another.add_value(9); + another.add_value(10); + ASSERT_EQ(6, empty.get_value_by_percentile(0)); + ASSERT_EQ(7.5, empty.get_value_by_percentile(0.5)); + ASSERT_EQ(10, empty.get_value_by_percentile(1)); + + another.merge(empty); + ASSERT_EQ(1, another.get_value_by_percentile(0)); + ASSERT_EQ(5.5, another.get_value_by_percentile(0.5)); + ASSERT_EQ(10, another.get_value_by_percentile(1)); +} + +} // namespace doris \ No newline at end of file diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 9d68e9a4a9..e89cbb7d2e 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -88,6 +88,11 @@ Syntax: This type can only be queried by hll_union_agg, hll_cardinality, hll_hash functions. BITMAP BITMAP type, No need to specify length. Represent a set of unsigned bigint numbers, the largest element could be 2^64 - 1 + QUANTILE_STATE + QUANTILE_STATE type, No need to specify length. Represents the quantile pre-aggregation result. Currently, only numerical raw data types are supported such as `int`,`float`,`double`, etc. + If the number of elements is less than 2048, the explict data is stored. + If the number of elements is greater than 2048, the intermediate result of the pre-aggregation of the TDigest algorithm is stored. + ``` agg_type: Aggregation type. If not specified, the column is key column. Otherwise, the column is value column. @@ -95,8 +100,14 @@ Syntax: * HLL_UNION: Only for HLL type * REPLACE_IF_NOT_NULL: The meaning of this aggregation type is that substitution will occur if and only if the newly imported data is a non-null value. If the newly imported data is null, Doris will still retain the original value. Note: if NOT NULL is specified in the REPLACE_IF_NOT_NULL column when the user creates the table, Doris will convert it to NULL and will not report an error to the user. Users can leverage this aggregate type to achieve importing some of columns .**It should be noted here that the default value should be NULL, not an empty string. If it is an empty string, you should replace it with an empty string**. * BITMAP_UNION: Only for BITMAP type + * QUANTILE_UNION: Only for QUANTILE_STATE type Allow NULL: Default is NOT NULL. NULL value should be represented as `\N` in load source file. - Notice: The origin value of BITMAP_UNION column should be TINYINT, SMALLINT, INT, BIGINT. + + Notice: + + The origin value of BITMAP_UNION column should be TINYINT, SMALLINT, INT, BIGINT. + + The origin value of QUANTILE_UNION column should be a numeric type such as TINYINT, INT, FLOAT, DOUBLE, DECIMAL, etc. 2. index_definition Syntax: `INDEX index_name (col_name[, col_name, ...]) [USING BITMAP] COMMENT 'xxxxxx'` @@ -125,6 +136,7 @@ Syntax: table_name in CREATE TABLE stmt is table is Doris. They can be different or same. MySQL table created in Doris is for accessing data in MySQL database. Doris does not maintain and store any data from MySQL table. + 2) For broker, properties should include: ``` @@ -633,8 +645,21 @@ Syntax: AGGREGATE KEY(k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 32; ``` - -9. Create 2 colocate join table. +9. Create a table with QUANTILE_UNION column (the origin value of **v1** and **v2** columns must be **numeric** types) + + ``` + CREATE TABLE example_db.example_table + ( + k1 TINYINT, + k2 DECIMAL(10, 2) DEFAULT "10.5", + v1 QUANTILE_STATE QUANTILE_UNION, + v2 QUANTILE_STATE QUANTILE_UNION + ) + ENGINE=olap + AGGREGATE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 32; + ``` +10. Create 2 colocate join table. ``` CREATE TABLE `t1` ( @@ -657,7 +682,7 @@ Syntax: ); ``` -10. Create a broker table, with file on BOS. +11. Create a broker table, with file on BOS. ``` CREATE EXTERNAL TABLE example_db.table_broker ( @@ -675,7 +700,7 @@ Syntax: ); ``` -11. Create a table with a bitmap index +12. Create a table with a bitmap index ``` CREATE TABLE example_db.table_hash @@ -692,7 +717,7 @@ Syntax: DISTRIBUTED BY HASH(k1) BUCKETS 32; ``` -12. Create a dynamic partitioning table (dynamic partitioning needs to be enabled in FE configuration), which creates partitions 3 days in advance every day. For example, if today is' 2020-01-08 ', partitions named 'p20200108', 'p20200109', 'p20200110', 'p20200111' will be created. +13. Create a dynamic partitioning table (dynamic partitioning needs to be enabled in FE configuration), which creates partitions 3 days in advance every day. For example, if today is' 2020-01-08 ', partitions named 'p20200108', 'p20200109', 'p20200110', 'p20200111' will be created. ``` [types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; ) @@ -722,7 +747,7 @@ Syntax: "dynamic_partition.buckets" = "32" ); ``` -13. Create a table with rollup index +14. Create a table with rollup index ``` CREATE TABLE example_db.rolup_index_table ( @@ -742,7 +767,7 @@ Syntax: PROPERTIES("replication_num" = "3"); ``` -14. Create a inmemory table: +15. Create a inmemory table: ``` CREATE TABLE example_db.table_hash @@ -760,7 +785,7 @@ Syntax: PROPERTIES ("in_memory"="true"); ``` -15. Create a hive external table +16. Create a hive external table ``` CREATE TABLE example_db.table_hive ( @@ -777,7 +802,7 @@ Syntax: ); ``` -16. Specify the replica distribution of the table through replication_allocation +17. Specify the replica distribution of the table through replication_allocation ``` CREATE TABLE example_db.table_hash diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index fac74a7db1..17b328bac4 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -232,7 +232,11 @@ Where url is the url given by ErrorURL. ```Curl --location-trusted -u root -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load``` -10. a simple load json +10. load a table with QUANTILE_STATE columns, which can be columns in the table or a column in the data used to generate QUANTILE_STATE columns, you can also use TO_QUANTILE_STATE to transfer numberical data to QUANTILE_STATE. 2048 is an optional parameter representing the precision of the TDigest algorithm, the valid value is [2048, 10000], the larger the value, the higher the precision, default is 2048 + + ```Curl --location-trusted -u root -H "columns: k1, k2, v1, v2, v1=to_quantile_state(v1, 2048)" -T testData http://host:port/api/testDb/testTbl/_stream_load``` + +11. a simple load json table schema: `category` varchar(512) NULL COMMENT "", `author` varchar(512) NULL COMMENT "", @@ -247,7 +251,7 @@ Where url is the url given by ErrorURL. {"category":"Java","author":"avc","title":"Effective Java","price":95} {"category":"Linux","author":"avc","title":"Linux kernel","price":195} -11. Matched load json by jsonpaths +12. Matched load json by jsonpaths For example json data: [ {"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895}, @@ -260,7 +264,7 @@ Where url is the url given by ErrorURL. 1)If the json data starts as an array and each object in the array is a record, you need to set the strip_outer_array to true to represent the flat array. 2)If the json data starts with an array, and each object in the array is a record, our ROOT node is actually an object in the array when we set jsonpath. -12. User specifies the json_root node +13. User specifies the json_root node For example json data: { "RECORDS":[ @@ -272,9 +276,9 @@ Where url is the url given by ErrorURL. Matched imports are made by specifying jsonpath parameter, such as `category`, `author`, and `price`, for example: curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load -13. delete all data which key columns match the load data +14. delete all data which key columns match the load data curl --location-trusted -u root -H "merge_type: DELETE" -T testData http://host:port/api/testDb/testTbl/_stream_load -14. delete all data which key columns match the load data where flag is true, others append +15. delete all data which key columns match the load data where flag is true, others append curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/testDb/testTbl/_stream_load ## keyword diff --git a/docs/en/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md b/docs/en/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md new file mode 100644 index 0000000000..83c07ec017 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md @@ -0,0 +1,62 @@ +--- +{ + "title": "QUANTILE_STATE", + "language": "zh-CN" +} +--- + + + +# QUANTILE_STATE +## description + +QUANTILE_STATE + + QUANTILE_STATE cannot be used as a key column, and the aggregation type is QUANTILE_UNION when building the table. + The user does not need to specify the length and default value. The length is controlled within the system according to the degree of data aggregation. + And the QUANTILE_STATE column can only be queried or used through the supporting QUANTILE_PERCENT, QUANTILE_UNION and TO_QUANTILE_STATE functions. + QUANTILE_STATE is a type for calculating the approximate value of quantiles. Different values with the same key are pre-aggregated during loading process. When the number of aggregated values does not exceed 2048, all data are recorded in detail. When the number of aggregated values is greater than 2048, [TDigest] is used. (https://github.com/tdunning/t-digest/blob/main/docs/t-digest-paper/histo.pdf) algorithm to aggregate (cluster) the data and save the centroid points after clustering. + +related functions: + + QUANTILE_UNION(QUANTILE_STATE): + + This function is an aggregation function, which is used to aggregate the intermediate results of different quantile calculations. The result returned by this function is still QUANTILE_STATE + + + TO_QUANTILE_STATE(INT/FLOAT/DOUBLE raw_data [,FLOAT compression]): + + This function converts a numeric type to a QUANTILE_STATE type + The compression parameter is optional and can be set in the range [2048, 10000]. + The larger the value, the higher the precision of quantile approximation calculations, the greater the memory consumption, and the longer the calculation time. + An unspecified or set value for the compression parameter is outside the range [2048, 10000], run with the default value of 2048 + + QUANTILE_PERCENT(QUANTILE_STATE): + This function converts the intermediate result variable (QUANTILE_STATE) of the quantile calculation into a specific quantile value + + + +## example + select QUANTILE_PERCENT(QUANTILE_UNION(v1)) from test_table group by k1, k2, k3; + + +## keyword + + QUANTILE_STATE, QUANTILE_UNION, TO_QUANTILE_STATE, QUANTILE_PERCENT diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 872a0a2c9c..ae51ab323a 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -89,6 +89,8 @@ under the License. 程度系统内控制,并且HLL列只能通过配套的hll_union_agg、Hll_cardinality、hll_hash进行查询或使用 BITMAP bitmap列类型,不需要指定长度和默认值。表示整型的集合,元素最大支持到2^64 - 1 + QUANTILE_STATE + QUANTILE_STATE列类型,不需要指定长度和默认值,表示分位数预聚合结果。目前仅支持原始数据为数值类型如:TINYINT、INT、FLOAT、DOUBLE、DECIMAL。当元素个数小于2048时存储明细数据,当元素个数大于2048时存储 [TDigest](https://github.com/tdunning/t-digest/blob/main/docs/t-digest-paper/histo.pdf) 算法预聚合的中间结果 ``` agg_type:聚合类型,如果不指定,则该列为 key 列。否则,该列为 value 列 @@ -96,13 +98,17 @@ under the License. * SUM、MAX、MIN、REPLACE * HLL_UNION(仅用于HLL列,为HLL独有的聚合方式)、 * BITMAP_UNION(仅用于 BITMAP 列,为 BITMAP 独有的聚合方式)、 + * QUANTILE_UNION(仅用于 QUANTILE_STATE 列,为 QUANTILE_STATE 独有的聚合方式) * REPLACE_IF_NOT_NULL:这个聚合类型的含义是当且仅当新导入数据是非NULL值时会发生替换行为,如果新导入的数据是NULL,那么Doris仍然会保留原值。注意:如果用在建表时REPLACE_IF_NOT_NULL列指定了NOT NULL,那么Doris仍然会将其转化NULL,不会向用户报错。用户可以借助这个类型完成部分列导入的功能。**这里要注意的是字段默认值要给NULL,而不能是空字符串,如果是空字符串,会给你替换成空字符串**。 * 该类型只对聚合模型(key_desc的type为AGGREGATE KEY)有用,其它模型不需要指这个。 是否允许为NULL: 默认允许为 NULL。NULL 值在导入数据中用 \N 来表示 注意: - BITMAP_UNION聚合类型列在导入时的原始数据类型必须是TINYINT,SMALLINT,INT,BIGINT。 + + BITMAP_UNION聚合类型列在导入时的原始数据类型必须是TINYINT,SMALLINT,INT,BIGINT。 + + QUANTILE_UNION聚合类型列在导入时的原始数据类型必须是数值类型如:TINYINT、INT、FLOAT、DOUBLE、DECIMAL 2. index_definition 语法: @@ -668,7 +674,22 @@ under the License. DISTRIBUTED BY HASH(k1) BUCKETS 32; ``` -9. 创建两张支持Colocate Join的表t1 和t2 +1. 创建一张含有QUANTILE_UNION聚合类型的表(v1和v2列的原始数据类型必须是数值类型) + +``` + CREATE TABLE example_db.example_table + ( + k1 TINYINT, + k2 DECIMAL(10, 2) DEFAULT "10.5", + v1 QUANTILE_STATE QUANTILE_UNION, + v2 QUANTILE_STATE QUANTILE_UNION + ) + ENGINE=olap + AGGREGATE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 32; +``` + +10. 创建两张支持Colocate Join的表t1 和t2 ``` CREATE TABLE `t1` ( @@ -692,7 +713,7 @@ under the License. ); ``` -10. 创建一个数据文件存储在BOS上的 broker 外部表 +11. 创建一个数据文件存储在BOS上的 broker 外部表 ``` CREATE EXTERNAL TABLE example_db.table_broker ( @@ -710,7 +731,7 @@ under the License. ) ``` -11. 创建一个带有bitmap 索引的表 +12. 创建一个带有bitmap 索引的表 ``` CREATE TABLE example_db.table_hash @@ -727,7 +748,7 @@ under the License. DISTRIBUTED BY HASH(k1) BUCKETS 32; ``` -12. 创建一个动态分区表(需要在FE配置中开启动态分区功能),该表每天提前创建3天的分区,并删除3天前的分区。例如今天为`2020-01-08`,则会创建分区名为`p20200108`, `p20200109`, `p20200110`, `p20200111`的分区. 分区范围分别为: +13. 创建一个动态分区表(需要在FE配置中开启动态分区功能),该表每天提前创建3天的分区,并删除3天前的分区。例如今天为`2020-01-08`,则会创建分区名为`p20200108`, `p20200109`, `p20200110`, `p20200111`的分区. 分区范围分别为: ``` [types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; ) @@ -759,7 +780,7 @@ under the License. ); ``` -13. 创建一个带有rollup索引的表 +14. 创建一个带有rollup索引的表 ``` CREATE TABLE example_db.rollup_index_table ( @@ -778,7 +799,7 @@ under the License. ) PROPERTIES("replication_num" = "3"); ``` -14. 创建一个内存表 +15. 创建一个内存表 ``` CREATE TABLE example_db.table_hash @@ -796,7 +817,7 @@ under the License. PROPERTIES ("in_memory"="true"); ``` -15. 创建一个hive外部表 +16. 创建一个hive外部表 ``` CREATE TABLE example_db.table_hive @@ -814,7 +835,7 @@ under the License. ); ``` -16. 通过 replication_allocation 指定表的副本分布 +17. 通过 replication_allocation 指定表的副本分布 ``` CREATE TABLE example_db.table_hash diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 9edcd46c2e..f89363f38c 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -169,7 +169,10 @@ under the License. 9. 导入含有BITMAP列的表,可以是表中的列或者数据中的列用于生成BITMAP列,也可以使用bitmap_empty填充空的Bitmap curl --location-trusted -u root -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load - 10. 简单模式,导入json数据 + 10. 导入含有QUANTILE_STATE列的表,可以是表中的列或者数据中的列用于生成QUANTILE_STATE列,也可以使用to_quantile_state 函数将**数值类型**的原始数据转化为**QUANTILE_STATE**类型, 其中2048是可以选参数 代表 TDigest 算法的精度,有效值为[2048,10000],数值越大精度越高,默认2048 + curl --location-trusted -u root -H "columns: k1, k2, v1, v2, v1=to_quantile_state(v1, 2048)" -T testData http://host:port/api/testDb/testTbl/_stream_load + + 11. 简单模式,导入json数据 表结构: `category` varchar(512) NULL COMMENT "", @@ -185,7 +188,7 @@ under the License. {"category":"Java","author":"avc","title":"Effective Java","price":95} {"category":"Linux","author":"avc","title":"Linux kernel","price":195} - 11. 匹配模式,导入json数据 + 12. 匹配模式,导入json数据 json数据格式: [ {"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895}, @@ -198,7 +201,7 @@ under the License. 1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。 2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。 - 12. 用户指定json根节点 + 13. 用户指定json根节点 json数据格式: { "RECORDS":[ @@ -210,12 +213,12 @@ under the License. 通过指定jsonpath进行精准导入,例如只导入category、author、price三个属性 curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load - 13. 删除与这批导入key 相同的数据 + 14. 删除与这批导入key 相同的数据 curl --location-trusted -u root -H "merge_type: DELETE" -T testData http://host:port/api/testDb/testTbl/_stream_load - 14. 将这批数据中与flag 列为ture 的数据相匹配的列删除,其他行正常追加 + 15. 将这批数据中与flag 列为ture 的数据相匹配的列删除,其他行正常追加 curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/testDb/testTbl/_stream_load - 15. 导入数据到含有sequence列的UNIQUE_KEYS表中 + 16. 导入数据到含有sequence列的UNIQUE_KEYS表中 curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load ## keyword diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md b/docs/zh-CN/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md new file mode 100644 index 0000000000..6b7af804f0 --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Types/QUANTILE_STATE.md @@ -0,0 +1,58 @@ +--- +{ + "title": "QUANTILE_STATE", + "language": "zh-CN" +} +--- + + + +# QUANTILE_STATE +## description + QUANTILE_STATE + QUANTILE_STATE不能作为key列使用,建表时配合聚合类型为QUANTILE_UNION。 + 用户不需要指定长度和默认值。长度根据数据的聚合程度系统内控制。 + 并且QUANTILE_STATE列只能通过配套的QUANTILE_PERCENT、QUANTILE_UNION、TO_QUANTILE_STATE等函数进行查询或使用。 + + QUANTILE_STATE 是一种计算分位数近似值的类型,在导入时会对相同的key,不同 value 进行预聚合,当value数量不超过2048时采用明细记录所有数据,当 value 数量大于2048时采用 [TDigest](https://github.com/tdunning/t-digest/blob/main/docs/t-digest-paper/histo.pdf) 算法,对数据进行聚合(聚类)保存聚类后的质心点。 + + 相关函数: + + QUANTILE_UNION(QUANTILE_STATE): + 此函数为聚合函数,用于将不同的分位数计算中间结果进行聚合操作。此函数返回的结果仍是QUANTILE_STATE + + + TO_QUANTILE_STATE(INT/FLOAT/DOUBLE raw_data [,FLOAT compression]): + 此函数将数值类型转化成QUANTILE_STATE类型 + compression参数是可选项,可设置范围是[2048, 10000],值越大,后续分位数近似计算的精度越高,内存消耗越大,计算耗时越长。 + compression参数未指定或设置的值在[2048, 10000]范围外,以2048的默认值运行 + + QUANTILE_PERCENT(QUANTILE_STATE): + 此函数将分位数计算的中间结果变量(QUANTILE_STATE)转化为具体的分位数数值 + + + +## example + select QUANTILE_PERCENT(QUANTILE_UNION(v1)) from test_table group by k1, k2, k3; + + +## keyword + + QUANTILE_STATE, QUANTILE_UNION, TO_QUANTILE_STATE, QUANTILE_PERCENT diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 1fd8904569..81bf09d718 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -236,7 +236,7 @@ parser code {: // Total keywords of doris terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_APPEND, KW_AS, KW_ASC, KW_AUTHORS, KW_ARRAY, - KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_BLOB, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN, + KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_QUANTILE_STATE, KW_QUANTILE_UNION, KW_BLOB, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS, KW_CLEAN, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_COMPACT, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER, @@ -2100,6 +2100,12 @@ opt_agg_type ::= {: RESULT = AggregateType.BITMAP_UNION; :} + + + | KW_QUANTILE_UNION + {: + RESULT = AggregateType.QUANTILE_UNION; + :} ; opt_partition ::= @@ -4566,6 +4572,8 @@ type ::= {: RESULT = Type.TIME; :} | KW_BITMAP {: RESULT = Type.BITMAP; :} + | KW_QUANTILE_STATE + {: RESULT = Type.QUANTILE_STATE; :} | KW_STRING {: RESULT = ScalarType.createStringType(); :} | KW_TEXT @@ -5418,8 +5426,12 @@ keyword ::= {: RESULT = id; :} | KW_BITMAP:id {: RESULT = id; :} + | KW_QUANTILE_STATE:id + {: RESULT = id; :} | KW_BITMAP_UNION:id {: RESULT = id; :} + | KW_QUANTILE_UNION:id + {: RESULT = id; :} | KW_BLOB:id {: RESULT = id; :} | KW_BOOLEAN:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 16e0609c40..f370efbd78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -364,8 +364,8 @@ public class CreateTableStmt extends DdlStmt { && keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) { columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE)); } - boolean hasHll = false; - boolean hasBitmap = false; + boolean hasObjectStored = false; + String objectStoredColumn = ""; Set columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); for (ColumnDef columnDef : columnDefs) { columnDef.analyze(engineName.equals("olap")); @@ -380,12 +380,9 @@ public class CreateTableStmt extends DdlStmt { } } - if (columnDef.getType().isHllType()) { - hasHll = true; - } - - if (columnDef.getAggregateType() == AggregateType.BITMAP_UNION) { - hasBitmap = columnDef.getType().isBitmapType(); + if (columnDef.getType().isObjectStored()) { + hasObjectStored = true; + objectStoredColumn = columnDef.getName(); } if (!columnSet.add(columnDef.getName())) { @@ -393,12 +390,8 @@ public class CreateTableStmt extends DdlStmt { } } - if (hasHll && keysDesc.getKeysType() != KeysType.AGG_KEYS) { - throw new AnalysisException("HLL must be used in AGG_KEYS"); - } - - if (hasBitmap && keysDesc.getKeysType() != KeysType.AGG_KEYS) { - throw new AnalysisException("BITMAP_UNION must be used in AGG_KEYS"); + if (hasObjectStored && keysDesc.getKeysType() != KeysType.AGG_KEYS) { + throw new AnalysisException("column:" + objectStoredColumn + " must be used in AGG_KEYS."); } if (engineName.equals("olap")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index a4a9b0d493..750c42f95a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1261,6 +1261,11 @@ abstract public class Expr extends TreeNode implements ParseNode, Cloneabl if (targetType.getPrimitiveType() == PrimitiveType.BITMAP) { throw new AnalysisException("bitmap column require the function return type is BITMAP"); } + // TODO(weixiang): why bitmap is so strict but hll is not strict, may be bitmap can be same to hll + // here `quantile_state` is also strict now. may be can be same to hll too. + if (targetType.getPrimitiveType() == PrimitiveType.QUANTILE_STATE) { + throw new AnalysisException("quantile_state column require the function return type is QUANTILE_STATE"); + } // TargetTable's hll column must be hll_hash's result if (targetType.getPrimitiveType() == PrimitiveType.HLL) { checkHllCompatibility(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index eb94fcdfec..c0e7922953 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -532,6 +532,25 @@ public class FunctionCallExpr extends Expr { return; } + if (fnName.getFunction().equalsIgnoreCase(FunctionSet.QUANTILE_UNION)) { + if (children.size() != 1) { + throw new AnalysisException(fnName + "function could only have one child"); + } + Type inputType = getChild(0).getType(); + if (!inputType.isQuantileStateType()) { + throw new AnalysisException(fnName + " function's argument should be of QUANTILE_STATE type, but was" + inputType); + } + } + + if (fnName.getFunction().equalsIgnoreCase(FunctionSet.TO_QUANTILE_STATE)) { + if (children.size() != 2) { + throw new AnalysisException(fnName + "function must have two children"); + } + if (!getChild(1).isConstant()) { + throw new AnalysisException(fnName + "function's second argument should be constant"); + } + } + if ((fnName.getFunction().equalsIgnoreCase("HLL_UNION_AGG") || fnName.getFunction().equalsIgnoreCase("HLL_CARDINALITY") || fnName.getFunction().equalsIgnoreCase("HLL_RAW_AGG")) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index af1b2921cc..5d34004fcf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -419,11 +419,8 @@ public class InsertStmt extends DdlStmt { } // hll column mush in mentionedColumns for (Column col : targetTable.getBaseSchema()) { - if (col.getType().isHllType() && !mentionedColumns.contains(col.getName())) { - throw new AnalysisException (" hll column " + col.getName() + " mush in insert into columns"); - } - if (col.getType().isBitmapType() && !mentionedColumns.contains(col.getName())) { - throw new AnalysisException (" object column " + col.getName() + " mush in insert into columns"); + if (col.getType().isObjectStored() && !mentionedColumns.contains(col.getName())) { + throw new AnalysisException (" object-stored column " + col.getName() + " mush in insert into columns"); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 779800c3ed..376fefb781 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -1132,12 +1132,8 @@ public class SelectStmt extends QueryStmt { "GROUP BY clause?): " + orderByElements.get(i).getExpr().toSql()); } - if (sortInfo.getOrderingExprs().get(i).type.isHllType()) { - throw new AnalysisException("ORDER BY expression could not contain hll column."); - } - - if (sortInfo.getOrderingExprs().get(i).type.isBitmapType()) { - throw new AnalysisException("ORDER BY expression could not contain bitmap column."); + if (sortInfo.getOrderingExprs().get(i).type.isObjectStored()) { + throw new AnalysisException("ORDER BY expression could not contain object-stored columnx."); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java index bcd9e50ba6..df11d151b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java @@ -33,7 +33,9 @@ public enum AggregateType { REPLACE_IF_NOT_NULL("REPLACE_IF_NOT_NULL"), HLL_UNION("HLL_UNION"), NONE("NONE"), - BITMAP_UNION("BITMAP_UNION"); + BITMAP_UNION("BITMAP_UNION"), + QUANTILE_UNION("QUANTILE_UNION"); + private static EnumMap> compatibilityMap; @@ -84,13 +86,14 @@ public enum AggregateType { compatibilityMap.put(MAX, EnumSet.copyOf(primitiveTypeList)); primitiveTypeList.clear(); - // all types except bitmap and hll. - EnumSet exc_bitmap_hll = EnumSet.allOf(PrimitiveType.class); - exc_bitmap_hll.remove(PrimitiveType.BITMAP); - exc_bitmap_hll.remove(PrimitiveType.HLL); - compatibilityMap.put(REPLACE, EnumSet.copyOf(exc_bitmap_hll)); + // all types except object stored column type, such as bitmap hll quantile_state. + EnumSet exc_object_stored = EnumSet.allOf(PrimitiveType.class); + exc_object_stored.remove(PrimitiveType.BITMAP); + exc_object_stored.remove(PrimitiveType.HLL); + exc_object_stored.remove(PrimitiveType.QUANTILE_STATE); + compatibilityMap.put(REPLACE, EnumSet.copyOf(exc_object_stored)); - compatibilityMap.put(REPLACE_IF_NOT_NULL, EnumSet.copyOf(exc_bitmap_hll)); + compatibilityMap.put(REPLACE_IF_NOT_NULL, EnumSet.copyOf(exc_object_stored)); primitiveTypeList.clear(); primitiveTypeList.add(PrimitiveType.HLL); @@ -100,7 +103,11 @@ public enum AggregateType { primitiveTypeList.add(PrimitiveType.BITMAP); compatibilityMap.put(BITMAP_UNION, EnumSet.copyOf(primitiveTypeList)); - compatibilityMap.put(NONE, EnumSet.copyOf(exc_bitmap_hll)); + primitiveTypeList.clear(); + primitiveTypeList.add(PrimitiveType.QUANTILE_STATE); + compatibilityMap.put(QUANTILE_UNION, EnumSet.copyOf(primitiveTypeList)); + + compatibilityMap.put(NONE, EnumSet.copyOf(exc_object_stored)); } private final String sqlName; @@ -153,6 +160,8 @@ public enum AggregateType { return TAggregationType.HLL_UNION; case BITMAP_UNION: return TAggregationType.BITMAP_UNION; + case QUANTILE_UNION: + return TAggregationType.QUANTILE_UNION; default: return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java index bfc29d7dc2..17f03a3cba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java @@ -492,6 +492,7 @@ public class Function implements Writable { case CHAR: case HLL: case BITMAP: + case QUANTILE_STATE: case STRING: return "string_val"; case DATE: @@ -530,6 +531,7 @@ public class Function implements Writable { case CHAR: case HLL: case BITMAP: + case QUANTILE_STATE: case STRING: return "StringVal"; case DATE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index 9d49cc534b..cf3da2eee2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -760,6 +760,11 @@ public class FunctionSet ORTHOGONAL_BITMAP_INTERSECT_INIT_SYMBOL = ImmutableMap.builder() .put(Type.TINYINT, @@ -1966,8 +1971,27 @@ public class FunctionSet getIntegerTypes() { @@ -392,7 +401,8 @@ public enum PrimitiveType { compatibilityMatrix[NULL_TYPE.ordinal()][STRING.ordinal()] = STRING; compatibilityMatrix[NULL_TYPE.ordinal()][DECIMALV2.ordinal()] = DECIMALV2; compatibilityMatrix[NULL_TYPE.ordinal()][TIME.ordinal()] = TIME; - compatibilityMatrix[NULL_TYPE.ordinal()][BITMAP.ordinal()] = BITMAP; + compatibilityMatrix[NULL_TYPE.ordinal()][BITMAP.ordinal()] = BITMAP; //TODO(weixiang): bitmap can be null? + compatibilityMatrix[NULL_TYPE.ordinal()][QUANTILE_STATE.ordinal()] = QUANTILE_STATE; //TODO(weixiang): QUANTILE_STATE can be null? compatibilityMatrix[BOOLEAN.ordinal()][BOOLEAN.ordinal()] = BOOLEAN; compatibilityMatrix[BOOLEAN.ordinal()][TINYINT.ordinal()] = TINYINT; @@ -533,6 +543,8 @@ public enum PrimitiveType { compatibilityMatrix[BITMAP.ordinal()][BITMAP.ordinal()] = BITMAP; compatibilityMatrix[TIME.ordinal()][TIME.ordinal()] = TIME; + + compatibilityMatrix[QUANTILE_STATE.ordinal()][QUANTILE_STATE.ordinal()] = QUANTILE_STATE; } static { @@ -606,6 +618,8 @@ public enum PrimitiveType { return HLL; case OBJECT: return BITMAP; + case QUANTILE_STATE: + return QUANTILE_STATE; case ARRAY: return ARRAY; case MAP: diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java index 66f0a6252b..e1f09fb248 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java @@ -162,6 +162,7 @@ public class ScalarFunction extends Function { case HLL: case BITMAP: case STRING: + case QUANTILE_STATE: beFn += "_string_val"; break; case DATE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarType.java index 9484d7a7bd..a3ec20168b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarType.java @@ -151,6 +151,8 @@ public class ScalarType extends Type { return createHllType(); case BITMAP: return BITMAP; + case QUANTILE_STATE: + return QUANTILE_STATE; case DATE: return DATE; case DATETIME: @@ -202,6 +204,8 @@ public class ScalarType extends Type { return createHllType(); case "BITMAP": return BITMAP; + case "QUANTILE_STATE": + return QUANTILE_STATE; case "DATE": return DATE; case "DATETIME": @@ -382,6 +386,7 @@ public class ScalarType extends Type { case DATETIME: case HLL: case BITMAP: + case QUANTILE_STATE: stringBuilder.append(type.toString().toLowerCase()); break; case STRING: @@ -756,6 +761,8 @@ public class ScalarType extends Type { return 16385; case BITMAP: return 1024; // this is a estimated value + case QUANTILE_STATE: + return 1024; // TODO(weixiang): no used in FE, figure out whether can delete this funcion? case STRING: return 1024; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java index 3f0b8d2d9c..be3f69f992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java @@ -78,6 +78,7 @@ public abstract class Type { public static final ScalarType HLL = ScalarType.createHllType(); public static final ScalarType CHAR = (ScalarType) ScalarType.createCharType(-1); public static final ScalarType BITMAP = new ScalarType(PrimitiveType.BITMAP); + public static final ScalarType QUANTILE_STATE = new ScalarType(PrimitiveType.QUANTILE_STATE); // Only used for alias function, to represent any type in function args public static final ScalarType ALL = new ScalarType(PrimitiveType.ALL); public static final MapType Map = new MapType(); @@ -118,6 +119,7 @@ public abstract class Type { supportedTypes.add(VARCHAR); supportedTypes.add(HLL); supportedTypes.add(BITMAP); + supportedTypes.add(QUANTILE_STATE); supportedTypes.add(CHAR); supportedTypes.add(DATE); supportedTypes.add(DATETIME); @@ -195,7 +197,9 @@ public abstract class Type { // 3. don't support group by // 4. don't support index public boolean isOnlyMetricType() { - return isScalarType(PrimitiveType.HLL) || isScalarType(PrimitiveType.BITMAP); + // now only_metric_type is the same to object_stored_type + // but actually they are not same in semantics. + return isObjectStored(); } public static final String OnlyMetricTypeErrorMsg = @@ -210,6 +214,12 @@ public abstract class Type { return isScalarType(PrimitiveType.BITMAP); } + public boolean isQuantileStateType() { return isScalarType(PrimitiveType.QUANTILE_STATE); } + + public boolean isObjectStored() { + return isHllType() || isBitmapType() || isQuantileStateType(); + } + public boolean isScalarType() { return this instanceof ScalarType; } @@ -528,6 +538,8 @@ public abstract class Type { return new StructType(); case BITMAP: return Type.BITMAP; + case QUANTILE_STATE: + return Type.QUANTILE_STATE; default: return null; } @@ -824,6 +836,7 @@ public abstract class Type { compatibilityMatrix[BOOLEAN.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[BOOLEAN.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[BOOLEAN.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[BOOLEAN.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // TINYINT @@ -843,6 +856,7 @@ public abstract class Type { compatibilityMatrix[TINYINT.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[TINYINT.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[TINYINT.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[TINYINT.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // SMALLINT compatibilityMatrix[SMALLINT.ordinal()][INT.ordinal()] = PrimitiveType.INT; @@ -860,6 +874,7 @@ public abstract class Type { compatibilityMatrix[SMALLINT.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[SMALLINT.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[SMALLINT.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[SMALLINT.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // INT compatibilityMatrix[INT.ordinal()][BIGINT.ordinal()] = PrimitiveType.BIGINT; @@ -880,6 +895,8 @@ public abstract class Type { compatibilityMatrix[INT.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[INT.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[INT.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[INT.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; + // BIGINT // 64 bit integer does not fit in mantissa of double or float. @@ -901,6 +918,7 @@ public abstract class Type { compatibilityMatrix[BIGINT.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[BIGINT.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[BIGINT.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[BIGINT.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // LARGEINT compatibilityMatrix[LARGEINT.ordinal()][FLOAT.ordinal()] = PrimitiveType.DOUBLE; @@ -914,6 +932,7 @@ public abstract class Type { compatibilityMatrix[LARGEINT.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[LARGEINT.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[LARGEINT.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[LARGEINT.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // FLOAT compatibilityMatrix[FLOAT.ordinal()][DOUBLE.ordinal()] = PrimitiveType.DOUBLE; @@ -926,6 +945,7 @@ public abstract class Type { compatibilityMatrix[FLOAT.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[FLOAT.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[FLOAT.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[FLOAT.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // DOUBLE compatibilityMatrix[DOUBLE.ordinal()][DATE.ordinal()] = PrimitiveType.INVALID_TYPE; @@ -937,6 +957,7 @@ public abstract class Type { compatibilityMatrix[DOUBLE.ordinal()][TIME.ordinal()] = PrimitiveType.DOUBLE; compatibilityMatrix[DOUBLE.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DOUBLE.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[DOUBLE.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // DATE compatibilityMatrix[DATE.ordinal()][DATETIME.ordinal()] = PrimitiveType.DATETIME; @@ -947,6 +968,7 @@ public abstract class Type { compatibilityMatrix[DATE.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DATE.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DATE.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[DATE.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // DATETIME compatibilityMatrix[DATETIME.ordinal()][CHAR.ordinal()] = PrimitiveType.INVALID_TYPE; @@ -956,6 +978,7 @@ public abstract class Type { compatibilityMatrix[DATETIME.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DATETIME.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DATETIME.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[DATETIME.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // We can convert some but not all string values to timestamps. // CHAR @@ -965,6 +988,7 @@ public abstract class Type { compatibilityMatrix[CHAR.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[CHAR.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[CHAR.ordinal()][STRING.ordinal()] = PrimitiveType.STRING; + compatibilityMatrix[CHAR.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // VARCHAR compatibilityMatrix[VARCHAR.ordinal()][DECIMALV2.ordinal()] = PrimitiveType.INVALID_TYPE; @@ -972,11 +996,13 @@ public abstract class Type { compatibilityMatrix[VARCHAR.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[VARCHAR.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[VARCHAR.ordinal()][STRING.ordinal()] = PrimitiveType.STRING; + compatibilityMatrix[VARCHAR.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; //String compatibilityMatrix[STRING.ordinal()][HLL.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[STRING.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[STRING.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[STRING.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // DECIMALV2 @@ -984,19 +1010,26 @@ public abstract class Type { compatibilityMatrix[DECIMALV2.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DECIMALV2.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[DECIMALV2.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[DECIMALV2.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // HLL compatibilityMatrix[HLL.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[HLL.ordinal()][BITMAP.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[HLL.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[HLL.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; // BITMAP compatibilityMatrix[BITMAP.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; compatibilityMatrix[BITMAP.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + compatibilityMatrix[BITMAP.ordinal()][QUANTILE_STATE.ordinal()] = PrimitiveType.INVALID_TYPE; - // TIME + //QUANTILE_STATE + compatibilityMatrix[QUANTILE_STATE.ordinal()][STRING.ordinal()] = PrimitiveType.INVALID_TYPE; + + + // TIME why here not??? compatibilityMatrix[TIME.ordinal()][TIME.ordinal()] = PrimitiveType.INVALID_TYPE; // Check all of the necessary entries that should be filled. @@ -1040,6 +1073,7 @@ public abstract class Type { case VARCHAR: case HLL: case BITMAP: + case QUANTILE_STATE: return VARCHAR; case DECIMALV2: return DECIMALV2; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 7cbc66c5a6..1ef89510a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -73,6 +73,7 @@ public class Util { TYPE_STRING_MAP.put(PrimitiveType.HLL, "varchar(%d)"); TYPE_STRING_MAP.put(PrimitiveType.BOOLEAN, "bool"); TYPE_STRING_MAP.put(PrimitiveType.BITMAP, "bitmap"); + TYPE_STRING_MAP.put(PrimitiveType.QUANTILE_STATE,"quantile_state"); TYPE_STRING_MAP.put(PrimitiveType.ARRAY, "Array<%s>"); TYPE_STRING_MAP.put(PrimitiveType.NULL_TYPE, "null"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index bfab5124a9..31a679aeda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -544,6 +544,7 @@ public class BrokerScanNode extends LoadScanNode { return rangeDesc; } + //TODO(wx):support quantile state column or forbidden it. @Override public void finalize(Analyzer analyzer) throws UserException { locationsList = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java index 7805f8b2d5..b6dbb4f782 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java @@ -116,6 +116,16 @@ public abstract class LoadScanNode extends ScanNode { } } + protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.QUANTILE_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isQuantileStateType()) { + String errorMsg = String.format("quantile_state column %s require the function return type is QUANTILE_STATE"); + throw new AnalysisException(errorMsg); + } + } + } + protected void finalizeParams(Map slotDescByName, Map exprMap, TBrokerScanRangeParams params, @@ -173,6 +183,10 @@ public abstract class LoadScanNode extends ScanNode { checkBitmapCompatibility(analyzer, destSlotDesc, expr); + checkQuantileStateCompatibility(analyzer, destSlotDesc, expr); + + // check quantile_state + if (negative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); expr.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index b4ab81b17d..4e1369e9fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -598,7 +598,15 @@ public class SingleNodePlanner { returnColumnValidate = false; break; } - } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase("multi_distinct_count")) { + } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.QUANTILE_UNION)) { + if (col.getAggregationType() != AggregateType.QUANTILE_UNION) { + turnOffReason = + "Aggregate Operator not match: QUANTILE_UNION <---> " + col.getAggregationType(); + returnColumnValidate = false; + break; + } + } + else if (aggExpr.getFnName().getFunction().equalsIgnoreCase("multi_distinct_count")) { // count(distinct k1), count(distinct k2) / count(distinct k1,k2) can turn on pre aggregation if ((!col.isKey())) { turnOffReason = "Multi count or sum distinct with non-key column: " + col.getName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java index e1944eb6bd..d3beba479a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java @@ -544,6 +544,10 @@ public class HadoopLoadPendingTask extends LoadPendingTask { case BITMAP: columnType = "BITMAP"; break; + // TODO(weixiang): not support in broker load. + case QUANTILE_STATE: + columnType = "QUANTILE_STATE"; + break; case DECIMALV2: columnType = "DECIMAL"; break; diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 6f356569ce..a2eeba952c 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -111,6 +111,8 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("binlog", new Integer(SqlParserSymbols.KW_BINLOG)); keywordMap.put("bitmap", new Integer(SqlParserSymbols.KW_BITMAP)); keywordMap.put("bitmap_union", new Integer(SqlParserSymbols.KW_BITMAP_UNION)); + keywordMap.put("quantile_state", new Integer(SqlParserSymbols.KW_QUANTILE_STATE)); + keywordMap.put("quantile_union", new Integer(SqlParserSymbols.KW_QUANTILE_UNION)); keywordMap.put("blob", new Integer(SqlParserSymbols.KW_BLOB)); keywordMap.put("boolean", new Integer(SqlParserSymbols.KW_BOOLEAN)); keywordMap.put("broker", new Integer(SqlParserSymbols.KW_BROKER)); diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index a79a51350f..316e7b9a7a 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -1271,6 +1271,15 @@ visible_functions = [ [['sub_bitmap'], 'BITMAP', ['BITMAP', 'BIGINT', 'BIGINT'], '_ZN5doris15BitmapFunctions10sub_bitmapEPN9doris_udf15FunctionContextERKNS1_9StringValERKNS1_9BigIntValES9_', '', '', 'vec', 'ALWAYS_NULLABLE'], + # quantile_function + [['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'], + '_ZN5doris22QuantileStateFunctions17to_quantile_stateEPN9doris_udf15FunctionContextERKNS1_9StringValE', + '_ZN5doris22QuantileStateFunctions25to_quantile_state_prepareEPN9doris_udf15FunctionContextENS2_18FunctionStateScopeE', '', 'vec', ''], + + [['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'], + '_ZN5doris22QuantileStateFunctions16quantile_percentEPN9doris_udf15FunctionContextERNS1_9StringValE', + '_ZN5doris22QuantileStateFunctions24quantile_percent_prepareEPN9doris_udf15FunctionContextENS2_18FunctionStateScopeE', '', 'vec', ''], + # hash functions [['murmur_hash3_32'], 'INT', ['VARCHAR', '...'], diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index a9f9a7d1de..033c5a2ca5 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -80,7 +80,8 @@ enum TPrimitiveType { MAP, STRUCT, STRING, - ALL + ALL, + QUANTILE_STATE } enum TTypeNodeType { @@ -145,7 +146,8 @@ enum TAggregationType { HLL_UNION, NONE, BITMAP_UNION, - REPLACE_IF_NOT_NULL + REPLACE_IF_NOT_NULL, + QUANTILE_UNION } enum TPushType {