[Feature](vec)(quantile_state): support quantile state in vectorized engine (#16562)

* [Feature](vectorized)(quantile_state): support vectorized quantile state functions
1. now quantile column only support not nullable
2. add up some regression test cases
3. set default enable_quantile_state_type = true
---------

Co-authored-by: spaces-x <weixiang06@meituan.com>
This commit is contained in:
spaces-x
2023-03-14 10:54:04 +08:00
committed by GitHub
parent 36a0d40ac3
commit 5b39fa9843
39 changed files with 1119 additions and 23 deletions

View File

@ -286,6 +286,24 @@ void convert_col_to_pvalue(const vectorized::ColumnPtr& column,
}
break;
}
case vectorized::TypeIndex::QuantileState: {
ptype->set_id(PGenericType::QUANTILE_STATE);
arg->mutable_bytes_value()->Reserve(row_count);
for (size_t row_num = start; row_num < end; ++row_num) {
if constexpr (nullable) {
if (column->is_null_at(row_num)) {
arg->add_bytes_value(nullptr);
} else {
StringRef data = column->get_data_at(row_num);
arg->add_bytes_value(data.data, data.size);
}
} else {
StringRef data = column->get_data_at(row_num);
arg->add_bytes_value(data.data, data.size);
}
}
break;
}
default:
LOG(INFO) << "unknown type: " << data_type->get_name();
ptype->set_id(PGenericType::UNKNOWN);
@ -438,6 +456,13 @@ void convert_to_column(vectorized::MutableColumnPtr& column, const PValues& resu
}
break;
}
case PGenericType::QUANTILE_STATE: {
column->reserve(result.bytes_value_size());
for (int i = 0; i < result.bytes_value_size(); ++i) {
column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size());
}
break;
}
default: {
LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString();
break;

View File

@ -113,22 +113,22 @@ bool QuantileState<T>::is_valid(const Slice& slice) {
}
template <typename T>
T QuantileState<T>::get_explicit_value_by_percentile(float percentile) {
T QuantileState<T>::get_explicit_value_by_percentile(float percentile) const {
DCHECK(_type == EXPLICIT);
int n = _explicit_data.size();
std::sort(_explicit_data.begin(), _explicit_data.end());
std::vector<T> sorted_data(_explicit_data.begin(), _explicit_data.end());
std::sort(sorted_data.begin(), sorted_data.end());
double index = (n - 1) * percentile;
int intIdx = (int)index;
if (intIdx == n - 1) {
return _explicit_data[intIdx];
return sorted_data[intIdx];
}
return _explicit_data[intIdx + 1] * (index - intIdx) +
_explicit_data[intIdx] * (intIdx + 1 - index);
return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index);
}
template <typename T>
T QuantileState<T>::get_value_by_percentile(float percentile) {
T QuantileState<T>::get_value_by_percentile(float percentile) const {
DCHECK(percentile >= 0 && percentile <= 1);
switch (_type) {
case EMPTY: {
@ -191,7 +191,7 @@ bool QuantileState<T>::deserialize(const Slice& slice) {
}
case TDIGEST: {
// 4: Tdigest object value
_tdigest_ptr = std::make_unique<TDigest>(0);
_tdigest_ptr = std::make_shared<TDigest>(0);
_tdigest_ptr->unserialize(ptr);
break;
}
@ -241,7 +241,7 @@ size_t QuantileState<T>::serialize(uint8_t* dst) const {
}
template <typename T>
void QuantileState<T>::merge(QuantileState<T>& other) {
void QuantileState<T>::merge(const QuantileState<T>& other) {
switch (other._type) {
case EMPTY:
break;
@ -263,7 +263,7 @@ void QuantileState<T>::merge(QuantileState<T>& other) {
case EXPLICIT:
if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) {
_type = TDIGEST;
_tdigest_ptr = std::make_unique<TDigest>(_compression);
_tdigest_ptr = std::make_shared<TDigest>(_compression);
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}
@ -330,7 +330,7 @@ void QuantileState<T>::add_value(const T& value) {
break;
case EXPLICIT:
if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
_tdigest_ptr = std::make_unique<TDigest>(_compression);
_tdigest_ptr = std::make_shared<TDigest>(_compression);
for (int i = 0; i < _explicit_data.size(); i++) {
_tdigest_ptr->add(_explicit_data[i]);
}

View File

@ -49,21 +49,22 @@ public:
void set_compression(float compression);
bool deserialize(const Slice& slice);
size_t serialize(uint8_t* dst) const;
void merge(QuantileState<T>& other);
void merge(const QuantileState<T>& other);
void add_value(const T& value);
void clear();
bool is_valid(const Slice& slice);
size_t get_serialized_size();
T get_value_by_percentile(float percentile);
T get_explicit_value_by_percentile(float percentile);
T get_value_by_percentile(float percentile) const;
T get_explicit_value_by_percentile(float percentile) const;
~QuantileState() = default;
private:
QuantileStateType _type = EMPTY;
std::unique_ptr<TDigest> _tdigest_ptr;
std::shared_ptr<TDigest> _tdigest_ptr;
T _single_data;
std::vector<T> _explicit_data;
float _compression;
};
using QuantileStateDouble = QuantileState<double>;
} // namespace doris

View File

@ -46,6 +46,7 @@ set(VEC_FILES
aggregate_functions/aggregate_function_orthogonal_bitmap.cpp
aggregate_functions/aggregate_function_avg_weighted.cpp
aggregate_functions/aggregate_function_histogram.cpp
aggregate_functions/aggregate_function_quantile_state.cpp
columns/column.cpp
columns/column_array.cpp
columns/column_struct.cpp
@ -95,6 +96,7 @@ set(VEC_FILES
data_types/data_type_string.cpp
data_types/data_type_decimal.cpp
data_types/data_type_map.cpp
data_types/data_type_quantilestate.cpp
data_types/get_least_supertype.cpp
data_types/convert_field_to_type.cpp
data_types/nested_utils.cpp
@ -249,6 +251,7 @@ set(VEC_FILES
functions/function_running_difference.cpp
functions/function_width_bucket.cpp
functions/match.cpp
functions/function_quantile_state.cpp
jsonb/serialize.cpp
olap/vgeneric_iterators.cpp

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/aggregate_functions/aggregate_function_quantile_state.h"
#include "vec/aggregate_functions//aggregate_function_simple_factory.h"
namespace doris::vectorized {
AggregateFunctionPtr create_aggregate_function_quantile_state_union(const std::string& name,
const DataTypes& argument_types,
const bool result_is_nullable) {
const bool arg_is_nullable = argument_types[0]->is_nullable();
if (arg_is_nullable) {
return std::make_shared<AggregateFunctionQuantileStateOp<
true, AggregateFunctionQuantileStateUnionOp, double>>(argument_types);
} else {
return std::make_shared<AggregateFunctionQuantileStateOp<
false, AggregateFunctionQuantileStateUnionOp, double>>(argument_types);
}
}
void register_aggregate_function_quantile_state(AggregateFunctionSimpleFactory& factory) {
factory.register_function("quantile_union", create_aggregate_function_quantile_state_union);
}
} // namespace doris::vectorized

View File

@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "util/quantile_state.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_quantilestate.h"
namespace doris::vectorized {
struct AggregateFunctionQuantileStateUnionOp {
static constexpr auto name = "quantile_union";
template <typename T>
static void add(QuantileState<T>& res, const T& data, bool& is_first) {
res.add_value(data);
}
template <typename T>
static void add(QuantileState<T>& res, const QuantileState<T>& data, bool& is_first) {
if (UNLIKELY(is_first)) {
res = data;
is_first = false;
} else {
res.merge(data);
}
}
template <typename T>
static void merge(QuantileState<T>& res, const QuantileState<T>& data, bool& is_first) {
if (UNLIKELY(is_first)) {
res = data;
is_first = false;
} else {
res.merge(data);
}
}
};
template <typename Op, typename InternalType>
struct AggregateFunctionQuantileStateData {
using DataType = QuantileState<InternalType>;
DataType value;
bool is_first = true;
template <typename T>
void add(const T& data) {
Op::add(value, data, is_first);
}
void merge(const DataType& data) { Op::merge(value, data, is_first); }
void write(BufferWritable& buf) const {
DataTypeQuantileState<InternalType>::serialize_as_stream(value, buf);
}
void read(BufferReadable& buf) {
DataTypeQuantileState<InternalType>::deserialize_as_stream(value, buf);
}
void reset() { is_first = true; }
DataType& get() { return value; }
};
template <bool arg_is_nullable, typename Op, typename InternalType>
class AggregateFunctionQuantileStateOp final
: public IAggregateFunctionDataHelper<
AggregateFunctionQuantileStateData<Op, InternalType>,
AggregateFunctionQuantileStateOp<arg_is_nullable, Op, InternalType>> {
public:
using ResultDataType = QuantileState<InternalType>;
using ColVecType = ColumnQuantileState<InternalType>;
using ColVecResult = ColumnQuantileState<InternalType>;
String get_name() const override { return Op::name; }
AggregateFunctionQuantileStateOp(const DataTypes& argument_types_)
: IAggregateFunctionDataHelper<
AggregateFunctionQuantileStateData<Op, InternalType>,
AggregateFunctionQuantileStateOp<arg_is_nullable, Op, InternalType>>(
argument_types_) {}
DataTypePtr get_return_type() const override {
return std::make_shared<DataTypeQuantileState<InternalType>>();
}
void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
Arena*) const override {
if constexpr (arg_is_nullable) {
auto& nullable_column = assert_cast<const ColumnNullable&>(*columns[0]);
if (!nullable_column.is_null_at(row_num)) {
const auto& column =
static_cast<const ColVecType&>(nullable_column.get_nested_column());
this->data(place).add(column.get_data()[row_num]);
}
} else {
const auto& column = static_cast<const ColVecType&>(*columns[0]);
this->data(place).add(column.get_data()[row_num]);
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Arena*) const override {
this->data(place).merge(
const_cast<AggregateFunctionQuantileStateData<Op, InternalType>&>(this->data(rhs))
.get());
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
this->data(place).read(buf);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
auto& column = static_cast<ColVecResult&>(to);
column.get_data().push_back(
const_cast<AggregateFunctionQuantileStateData<Op, InternalType>&>(this->data(place))
.get());
}
void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
};
AggregateFunctionPtr create_aggregate_function_quantile_state_union(const std::string& name,
const DataTypes& argument_types,
const bool result_is_nullable);
} // namespace doris::vectorized

View File

@ -34,6 +34,7 @@ void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& fac
register_function_both("bitmap_union", create_aggregate_function_bitmap_union);
register_function_both("hll_union",
create_aggregate_function_HLL<AggregateFunctionHLLUnionImpl>);
register_function_both("quantile_union", create_aggregate_function_quantile_state_union);
}
// only replace function in load/reader do different agg operation.

View File

@ -20,6 +20,7 @@
#include "vec/aggregate_functions/aggregate_function_bitmap.h"
#include "vec/aggregate_functions/aggregate_function_hll_union_agg.h"
#include "vec/aggregate_functions/aggregate_function_min_max.h"
#include "vec/aggregate_functions/aggregate_function_quantile_state.h"
#include "vec/aggregate_functions/aggregate_function_reader_first_last.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/aggregate_functions/aggregate_function_sum.h"

View File

@ -38,6 +38,7 @@ void register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory& f
void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_bitmap(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_quantile_state(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_window_rank(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_window_lead_lag_first_last(
AggregateFunctionSimpleFactory& factory);
@ -69,6 +70,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_bit(instance);
register_aggregate_function_bitmap(instance);
register_aggregate_function_group_concat(instance);
register_aggregate_function_quantile_state(instance);
register_aggregate_function_combinator_distinct(instance);
register_aggregate_function_reader_load(
instance); // register aggregate function for agg reader

View File

@ -578,6 +578,8 @@ public:
virtual bool is_hll() const { return false; }
virtual bool is_quantile_state() const { return false; }
// true if column has null element
virtual bool has_null() const { return false; }

View File

@ -24,6 +24,7 @@
#include "olap/hll.h"
#include "util/bitmap_value.h"
#include "util/quantile_state.h"
#include "vec/columns/column.h"
#include "vec/columns/column_impl.h"
#include "vec/columns/column_string.h"
@ -48,6 +49,7 @@ public:
bool is_bitmap() const override { return std::is_same_v<T, BitmapValue>; }
bool is_hll() const override { return std::is_same_v<T, HyperLogLog>; }
bool is_quantile_state() const override { return std::is_same_v<T, QuantileState<double>>; }
size_t size() const override { return data.size(); }
@ -75,6 +77,8 @@ public:
pvalue->deserialize(pos);
} else if constexpr (std::is_same_v<T, HyperLogLog>) {
pvalue->deserialize(Slice(pos, length));
} else if constexpr (std::is_same_v<T, QuantileStateDouble>) {
pvalue->deserialize(Slice(pos, length));
} else {
LOG(FATAL) << "Unexpected type in column complex";
}
@ -426,6 +430,13 @@ void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size,
using ColumnBitmap = ColumnComplexType<BitmapValue>;
using ColumnHLL = ColumnComplexType<HyperLogLog>;
template <typename T>
using ColumnQuantileState = ColumnComplexType<QuantileState<T>>;
using ColumnQuantileStateDouble = ColumnQuantileState<double>;
//template class ColumnQuantileState<double>;
template <typename T>
struct is_complex : std::false_type {};
@ -437,6 +448,10 @@ template <>
struct is_complex<HyperLogLog> : std::true_type {};
//DataTypeHLL::FieldType = HyperLogLog
template <>
struct is_complex<QuantileState<double>> : std::true_type {};
//DataTypeQuantileState::FieldType = QuantileState<double>
template <class T>
constexpr bool is_complex_v = is_complex<T>::value;

View File

@ -36,6 +36,9 @@ class HyperLogLog;
struct decimal12_t;
struct uint24_t;
template <typename T>
class QuantileState;
namespace vectorized {
/// Data types for representing elementary values from a database in RAM.
@ -85,6 +88,7 @@ enum class TypeIndex {
Map,
Struct,
VARIANT,
QuantileState,
};
struct Consted {
@ -206,6 +210,11 @@ struct TypeName<HyperLogLog> {
static const char* get() { return "HLL"; }
};
template <>
struct TypeName<QuantileState<double>> {
static const char* get() { return "QuantileState"; }
};
template <typename T>
struct TypeId;
template <>
@ -604,6 +613,8 @@ inline const char* getTypeName(TypeIndex idx) {
return "JSONB";
case TypeIndex::Struct:
return "Struct";
case TypeIndex::QuantileState:
return TypeName<QuantileState<double>>::get();
}
__builtin_unreachable();

View File

@ -145,6 +145,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const IDataType* data_type) {
return PGenericType::BITMAP;
case TypeIndex::HLL:
return PGenericType::HLL;
case TypeIndex::QuantileState:
return PGenericType::QUANTILE_STATE;
case TypeIndex::Array:
return PGenericType::LIST;
case TypeIndex::Struct:

View File

@ -155,6 +155,9 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo
case TYPE_DECIMALV2:
nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
break;
case TYPE_QUANTILE_STATE:
nested = std::make_shared<vectorized::DataTypeQuantileStateDouble>();
break;
case TYPE_DECIMAL32:
case TYPE_DECIMAL64:
case TYPE_DECIMAL128I:
@ -263,6 +266,9 @@ DataTypePtr DataTypeFactory::_create_primitive_data_type(const FieldType& type,
case OLAP_FIELD_TYPE_DECIMAL:
result = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
break;
case OLAP_FIELD_TYPE_QUANTILE_STATE:
result = std::make_shared<vectorized::DataTypeQuantileStateDouble>();
break;
case OLAP_FIELD_TYPE_DECIMAL32:
case OLAP_FIELD_TYPE_DECIMAL64:
case OLAP_FIELD_TYPE_DECIMAL128I:
@ -386,6 +392,10 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
nested = std::make_shared<DataTypeObject>("object", true);
break;
}
case PGenericType::QUANTILE_STATE: {
nested = std::make_shared<DataTypeQuantileStateDouble>();
break;
}
default: {
LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type());
return nullptr;

View File

@ -42,6 +42,7 @@
#include "vec/data_types/data_type_nothing.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_quantilestate.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_struct.h"
@ -85,6 +86,7 @@ public:
{"Jsonb", std::make_shared<DataTypeJsonb>()},
{"BitMap", std::make_shared<DataTypeBitMap>()},
{"Hll", std::make_shared<DataTypeHLL>()},
{"QuantileState", std::make_shared<DataTypeQuantileStateDouble>()},
};
for (auto const& [key, val] : base_type_map) {
instance.register_data_type(key, val);

View File

@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/data_types/data_type_quantilestate.h"
#include "vec/columns/column_complex.h"
#include "vec/common/assert_cast.h"
#include "vec/io/io_helper.h"
namespace doris::vectorized {
// binary: <size array> | <quantilestate array>
// <size array>: row num | quantilestate1 size | quantilestate2 size | ...
// <quantilestate array>: quantilestate1 | quantilestate2 | ...
template <typename T>
int64_t DataTypeQuantileState<T>::get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const {
auto ptr = column.convert_to_full_column_if_const();
auto& data_column = assert_cast<const ColumnQuantileState<T>&>(*ptr);
auto allocate_len_size = sizeof(size_t) * (column.size() + 1);
auto allocate_content_size = 0;
for (size_t i = 0; i < column.size(); ++i) {
auto& quantile_state = const_cast<QuantileState<T>&>(data_column.get_element(i));
allocate_content_size += quantile_state.get_serialized_size();
}
return allocate_len_size + allocate_content_size;
}
template <typename T>
char* DataTypeQuantileState<T>::serialize(const IColumn& column, char* buf,
int be_exec_version) const {
auto ptr = column.convert_to_full_column_if_const();
auto& data_column = assert_cast<const ColumnQuantileState<T>&>(*ptr);
// serialize the quantile_state size array, row num saves at index 0
size_t* meta_ptr = (size_t*)buf;
meta_ptr[0] = column.size();
for (size_t i = 0; i < meta_ptr[0]; ++i) {
auto& quantile_state = const_cast<QuantileState<T>&>(data_column.get_element(i));
meta_ptr[i + 1] = quantile_state.get_serialized_size();
}
// serialize each quantile_state
char* data_ptr = buf + sizeof(size_t) * (meta_ptr[0] + 1);
for (size_t i = 0; i < meta_ptr[0]; ++i) {
auto& quantile_state = const_cast<QuantileState<T>&>(data_column.get_element(i));
quantile_state.serialize((uint8_t*)data_ptr);
data_ptr += meta_ptr[i + 1];
}
return data_ptr;
}
template <typename T>
const char* DataTypeQuantileState<T>::deserialize(const char* buf, IColumn* column,
int be_exec_version) const {
auto& data_column = assert_cast<ColumnQuantileState<T>&>(*column);
auto& data = data_column.get_data();
// deserialize the quantile_state size array
const size_t* meta_ptr = reinterpret_cast<const size_t*>(buf);
// deserialize each quantile_state
data.resize(meta_ptr[0]);
const char* data_ptr = buf + sizeof(size_t) * (meta_ptr[0] + 1);
for (size_t i = 0; i < meta_ptr[0]; ++i) {
Slice slice(data_ptr, meta_ptr[i + 1]);
data[i].deserialize(slice);
data_ptr += meta_ptr[i + 1];
}
return data_ptr;
}
template <typename T>
MutableColumnPtr DataTypeQuantileState<T>::create_column() const {
return ColumnQuantileState<T>::create();
}
template <typename T>
void DataTypeQuantileState<T>::serialize_as_stream(const QuantileState<T>& cvalue,
BufferWritable& buf) {
auto& value = const_cast<QuantileState<T>&>(cvalue);
std::string memory_buffer;
int bytesize = value.get_serialized_size();
memory_buffer.resize(bytesize);
value.serialize(const_cast<uint8_t*>(reinterpret_cast<uint8_t*>(memory_buffer.data())));
write_string_binary(memory_buffer, buf);
}
template <typename T>
void DataTypeQuantileState<T>::deserialize_as_stream(QuantileState<T>& value, BufferReadable& buf) {
StringRef ref;
read_string_binary(ref, buf);
value.deserialize(ref.to_slice());
}
template <typename T>
void DataTypeQuantileState<T>::to_string(const class doris::vectorized::IColumn& column,
size_t row_num,
doris::vectorized::BufferWritable& ostr) const {
auto& data = const_cast<QuantileState<T>&>(
assert_cast<const ColumnQuantileState<T>&>(column).get_element(row_num));
std::string result(data.get_serialized_size(), '0');
data.serialize((uint8_t*)result.data());
ostr.write(result.data(), result.size());
}
template class DataTypeQuantileState<double>;
} // namespace doris::vectorized

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "util/quantile_state.h"
#include "vec/columns/column.h"
#include "vec/columns/column_complex.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
namespace doris::vectorized {
template <typename T>
class DataTypeQuantileState : public IDataType {
public:
DataTypeQuantileState() = default;
~DataTypeQuantileState() override = default;
using ColumnType = ColumnQuantileState<T>;
using FieldType = QuantileState<T>;
std::string do_get_name() const override { return get_family_name(); }
const char* get_family_name() const override { return "QuantileState"; }
TypeIndex get_type_id() const override { return TypeIndex::QuantileState; }
int64_t get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const override;
char* serialize(const IColumn& column, char* buf, int be_exec_version) const override;
const char* deserialize(const char* buf, IColumn* column, int be_exec_version) const override;
MutableColumnPtr create_column() const override;
bool get_is_parametric() const override { return false; }
bool have_subtypes() const override { return false; }
bool should_align_right_in_pretty_formats() const override { return false; }
bool text_can_contain_only_valid_utf8() const override { return true; }
bool is_comparable() const override { return false; }
bool is_value_represented_by_number() const override { return false; }
bool is_value_represented_by_integer() const override { return false; }
bool is_value_represented_by_unsigned_integer() const override { return false; }
// TODO:
bool is_value_unambiguously_represented_in_contiguous_memory_region() const override {
return true;
}
bool have_maximum_size_of_value() const override { return false; }
bool can_be_used_as_version() const override { return false; }
bool can_be_inside_nullable() const override { return true; }
bool equals(const IDataType& rhs) const override { return typeid(rhs) == typeid(*this); }
bool is_categorial() const override { return is_value_represented_by_integer(); }
bool can_be_inside_low_cardinality() const override { return false; }
std::string to_string(const IColumn& column, size_t row_num) const override {
return "QuantileState()";
}
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
[[noreturn]] virtual Field get_default() const override {
LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name();
__builtin_unreachable();
}
static void serialize_as_stream(const QuantileState<T>& value, BufferWritable& buf);
static void deserialize_as_stream(QuantileState<T>& value, BufferReadable& buf);
};
using DataTypeQuantileStateDouble = DataTypeQuantileState<double>;
} // namespace doris::vectorized

View File

@ -0,0 +1,277 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionBitmap.h
// and modified by Doris
#include "util/string_parser.hpp"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_quantilestate.h"
#include "vec/functions/function_always_not_nullable.h"
#include "vec/functions/function_const.h"
#include "vec/functions/function_string.h"
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
template <typename InternalType>
struct QuantileStateEmpty {
static constexpr auto name = "quantile_state_empty";
using ReturnColVec = ColumnQuantileState<InternalType>;
static DataTypePtr get_return_type() {
return std::make_shared<DataTypeQuantileState<InternalType>>();
}
static auto init_value() { return QuantileState<InternalType> {}; }
};
template <typename InternalType>
class FunctionToQuantileState : public IFunction {
public:
static constexpr auto name = "to_quantile_state";
String get_name() const override { return name; }
static FunctionPtr create() {
return std::make_shared<FunctionToQuantileState<InternalType>>();
}
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeQuantileState<InternalType>>();
}
size_t get_number_of_arguments() const override { return 2; }
bool use_default_implementation_for_nulls() const override { return false; }
bool use_default_implementation_for_constants() const override { return true; }
template <typename ColumnType, bool is_nullable>
Status execute_internal(const ColumnPtr& column, const DataTypePtr& data_type,
MutableColumnPtr& column_result) {
auto type_error = [&]() {
return Status::RuntimeError("Illegal column {} of argument of function {}",
column->get_name(), get_name());
};
const ColumnNullable* col_nullable = nullptr;
const ColumnUInt8* col_nullmap = nullptr;
const ColumnType* col = nullptr;
const NullMap* nullmap = nullptr;
if constexpr (is_nullable) {
col_nullable = check_and_get_column<ColumnNullable>(column.get());
col_nullmap = check_and_get_column<ColumnUInt8>(
col_nullable->get_null_map_column_ptr().get());
col = check_and_get_column<ColumnType>(col_nullable->get_nested_column_ptr().get());
if (col == nullptr || col_nullmap == nullptr) {
return type_error();
}
nullmap = &col_nullmap->get_data();
} else {
col = check_and_get_column<ColumnType>(column.get());
}
auto* res_column =
reinterpret_cast<ColumnQuantileState<InternalType>*>(column_result.get());
auto& res_data = res_column->get_data();
size_t size = col->size();
for (size_t i = 0; i < size; ++i) {
if constexpr (is_nullable) {
if ((*nullmap)[i]) {
continue;
}
}
if constexpr (std::is_same_v<ColumnType, ColumnString>) {
const ColumnString::Chars& data = col->get_chars();
const ColumnString::Offsets& offsets = col->get_offsets();
const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
size_t str_size = offsets[i] - offsets[i - 1];
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
InternalType value = StringParser::string_to_float<InternalType>(raw_str, str_size,
&parse_result);
if (LIKELY(parse_result == StringParser::PARSE_SUCCESS)) {
res_data[i].add_value(value);
} else {
std::stringstream ss;
ss << "The input column content: " << std::string(raw_str, str_size)
<< " is not valid in function: " << get_name();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
} else if constexpr (std::is_same_v<ColumnType, ColumnInt64> ||
std::is_same_v<ColumnType, ColumnFloat32> ||
std::is_same_v<ColumnType, ColumnFloat64>) {
// InternalType only can be double or float, so we can cast directly
InternalType value = (InternalType)col->get_data()[i];
res_data[i].set_compression(compression);
res_data[i].add_value(value);
} else {
type_error();
}
}
return Status::OK();
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
if constexpr (!(std::is_same_v<InternalType, float> ||
std::is_same_v<InternalType, double>)) {
std::stringstream ss;
ss << "The InternalType of quantile_state must be float or double";
return Status::InternalError(ss.str());
}
const ColumnPtr& column = block.get_by_position(arguments[0]).column;
const DataTypePtr& data_type = block.get_by_position(arguments[0]).type;
auto compression_arg = check_and_get_column_const<ColumnFloat32>(
block.get_by_position(arguments.back()).column);
if (compression_arg) {
auto compression_arg_val = compression_arg->get_value<Float32>();
if (compression_arg_val && compression_arg_val >= QUANTILE_STATE_COMPRESSION_MIN &&
compression_arg_val <= QUANTILE_STATE_COMPRESSION_MAX) {
this->compression = compression_arg_val;
}
}
WhichDataType which(data_type);
MutableColumnPtr column_result = get_return_type_impl({})->create_column();
column_result->resize(input_rows_count);
auto type_error = [&]() {
return Status::RuntimeError("Illegal column {} of argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
get_name());
};
Status status = Status::OK();
if (which.is_nullable()) {
const DataTypePtr& nested_data_type =
static_cast<const DataTypeNullable*>(data_type.get())->get_nested_type();
WhichDataType nested_which(nested_data_type);
if (nested_which.is_string_or_fixed_string()) {
status = execute_internal<ColumnString, true>(column, data_type, column_result);
} else if (nested_which.is_int64()) {
status = execute_internal<ColumnInt64, true>(column, data_type, column_result);
} else if (which.is_float32()) {
status = execute_internal<ColumnFloat32, true>(column, data_type, column_result);
} else if (which.is_float64()) {
status = execute_internal<ColumnFloat64, true>(column, data_type, column_result);
} else {
return type_error();
}
} else {
if (which.is_string_or_fixed_string()) {
status = execute_internal<ColumnString, false>(column, data_type, column_result);
} else if (which.is_int64()) {
status = execute_internal<ColumnInt64, false>(column, data_type, column_result);
} else if (which.is_float32()) {
status = execute_internal<ColumnFloat32, false>(column, data_type, column_result);
} else if (which.is_float64()) {
status = execute_internal<ColumnFloat64, false>(column, data_type, column_result);
} else {
return type_error();
}
}
if (status.ok()) {
block.replace_by_position(result, std::move(column_result));
}
return status;
}
private:
float compression = 2048;
};
template <typename InternalType>
class FunctionQuantileStatePercent : public IFunction {
public:
static constexpr auto name = "quantile_percent";
String get_name() const override { return name; }
static FunctionPtr create() {
return std::make_shared<FunctionQuantileStatePercent<InternalType>>();
}
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeFloat64>();
}
size_t get_number_of_arguments() const override { return 2; }
bool use_default_implementation_for_nulls() const override { return false; }
bool use_default_implementation_for_constants() const override { return true; }
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
auto res_data_column = ColumnFloat64::create();
auto& res = res_data_column->get_data();
auto data_null_map = ColumnUInt8::create(input_rows_count, 0);
auto& null_map = data_null_map->get_data();
auto column = block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
if (auto* nullable = check_and_get_column<const ColumnNullable>(*column)) {
VectorizedUtils::update_null_map(null_map, nullable->get_null_map_data());
column = nullable->get_nested_column_ptr();
}
auto str_col = assert_cast<const ColumnQuantileState<InternalType>*>(column.get());
auto& col_data = str_col->get_data();
auto percent_arg = check_and_get_column_const<ColumnFloat32>(
block.get_by_position(arguments.back()).column);
if (!percent_arg) {
LOG(FATAL) << fmt::format(
"Second argument to {} must be a constant string describing type", get_name());
}
float percent_arg_value = percent_arg->get_value<Float32>();
if (percent_arg_value < 0 || percent_arg_value > 1) {
std::stringstream ss;
ss << "the input argument of percentage: " << percent_arg_value
<< " is not valid, must be in range [0,1] ";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
res.reserve(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i) {
if (null_map[i]) {
// if null push_back meaningless result to make sure idxs can be matched
res.push_back(0);
continue;
}
res.push_back(col_data[i].get_value_by_percentile(percent_arg_value));
}
block.replace_by_position(result, std::move(res_data_column));
return Status::OK();
}
};
using FunctionQuantileStateEmpty = FunctionConst<QuantileStateEmpty<double>, false>;
using FunctionQuantileStatePercentDouble = FunctionQuantileStatePercent<double>;
using FunctionToQuantileStateDouble = FunctionToQuantileState<double>;
void register_function_quantile_state(SimpleFunctionFactory& factory) {
factory.register_function<FunctionQuantileStateEmpty>();
factory.register_function<FunctionQuantileStatePercentDouble>();
factory.register_function<FunctionToQuantileStateDouble>();
}
} // namespace doris::vectorized

View File

@ -291,6 +291,24 @@ void RPCFnImpl::_convert_col_to_pvalue(const ColumnPtr& column, const DataTypePt
}
break;
}
case TypeIndex::QuantileState: {
ptype->set_id(PGenericType::QUANTILE_STATE);
arg->mutable_bytes_value()->Reserve(row_count);
for (size_t row_num = start; row_num < end; ++row_num) {
if constexpr (nullable) {
if (column->is_null_at(row_num)) {
arg->add_bytes_value(nullptr);
} else {
StringRef data = column->get_data_at(row_num);
arg->add_bytes_value(data.data, data.size);
}
} else {
StringRef data = column->get_data_at(row_num);
arg->add_bytes_value(data.data, data.size);
}
}
break;
}
default:
LOG(INFO) << "unknown type: " << data_type->get_name();
ptype->set_id(PGenericType::UNKNOWN);
@ -443,6 +461,13 @@ void RPCFnImpl::_convert_to_column(MutableColumnPtr& column, const PValues& resu
}
break;
}
case PGenericType::QUANTILE_STATE: {
column->reserve(result.bytes_value_size());
for (int i = 0; i < result.bytes_value_size(); ++i) {
column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size());
}
break;
}
default: {
LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString();
break;

View File

@ -49,6 +49,7 @@ void register_function_math(SimpleFunctionFactory& factory);
void register_function_modulo(SimpleFunctionFactory& factory);
void register_function_bitmap(SimpleFunctionFactory& factory);
void register_function_bitmap_variadic(SimpleFunctionFactory& factory);
void register_function_quantile_state(SimpleFunctionFactory& factory);
void register_function_is_null(SimpleFunctionFactory& factory);
void register_function_is_not_null(SimpleFunctionFactory& factory);
void register_function_non_nullable(SimpleFunctionFactory& factory);
@ -176,6 +177,7 @@ public:
static SimpleFunctionFactory instance;
std::call_once(oc, []() {
register_function_bitmap(instance);
register_function_quantile_state(instance);
register_function_bitmap_variadic(instance);
register_function_hll_cardinality(instance);
register_function_hll_empty(instance);

View File

@ -54,6 +54,9 @@ OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
case FieldType::OLAP_FIELD_TYPE_OBJECT: {
return std::make_unique<OlapColumnDataConvertorBitMap>();
}
case FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE: {
return std::make_unique<OlapColumnDataConvertorQuantileState>();
}
case FieldType::OLAP_FIELD_TYPE_HLL: {
return std::make_unique<OlapColumnDataConvertorHLL>();
}
@ -298,6 +301,85 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorBitMap::convert_to_olap()
return Status::OK();
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorQuantileState::convert_to_olap() {
assert(_typed_column.column);
const vectorized::ColumnQuantileStateDouble* column_quantile_state = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_quantile_state = assert_cast<const vectorized::ColumnQuantileStateDouble*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_quantile_state = assert_cast<const vectorized::ColumnQuantileStateDouble*>(
_typed_column.column.get());
}
assert(column_quantile_state);
QuantileStateDouble* quantile_state =
const_cast<QuantileStateDouble*>(column_quantile_state->get_data().data() + _row_pos);
QuantileStateDouble* quantile_state_cur = quantile_state;
QuantileStateDouble* quantile_state_end = quantile_state_cur + _num_rows;
size_t total_size = 0;
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (quantile_state_cur != quantile_state_end) {
if (!*nullmap_cur) {
total_size += quantile_state_cur->get_serialized_size();
}
++nullmap_cur;
++quantile_state_cur;
}
} else {
while (quantile_state_cur != quantile_state_end) {
total_size += quantile_state_cur->get_serialized_size();
++quantile_state_cur;
}
}
_raw_data.resize(total_size);
quantile_state_cur = quantile_state;
size_t slice_size;
char* raw_data = _raw_data.data();
Slice* slice = _slice.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (quantile_state_cur != quantile_state_end) {
if (!*nullmap_cur) {
slice_size = quantile_state_cur->get_serialized_size();
quantile_state_cur->serialize((uint8_t*)raw_data);
slice->data = raw_data;
slice->size = slice_size;
raw_data += slice_size;
} else {
// TODO: this may not be necessary, check and remove later
slice->data = nullptr;
slice->size = 0;
}
++slice;
++nullmap_cur;
++quantile_state_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows && slice == _slice.get_end_ptr());
} else {
while (quantile_state_cur != quantile_state_end) {
slice_size = quantile_state_cur->get_serialized_size();
quantile_state_cur->serialize((uint8_t*)raw_data);
slice->data = raw_data;
slice->size = slice_size;
raw_data += slice_size;
++slice;
++quantile_state_cur;
}
assert(slice == _slice.get_end_ptr());
}
return Status::OK();
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorHLL::convert_to_olap() {
assert(_typed_column.column);
const vectorized::ColumnHLL* column_hll = nullptr;

View File

@ -112,6 +112,11 @@ private:
Status convert_to_olap() override;
};
class OlapColumnDataConvertorQuantileState final : public OlapColumnDataConvertorObject {
public:
Status convert_to_olap() override;
};
class OlapColumnDataConvertorChar : public OlapColumnDataConvertorBase {
public:
OlapColumnDataConvertorChar(size_t length);

View File

@ -85,7 +85,8 @@ Status VMysqlResultWriter<is_binary_format>::_add_one_column(
int buf_ret = 0;
if constexpr (type == TYPE_OBJECT || type == TYPE_VARCHAR || type == TYPE_JSONB) {
if constexpr (type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type == TYPE_VARCHAR ||
type == TYPE_JSONB) {
for (int i = 0; i < row_size; ++i) {
if (0 != buf_ret) {
return Status::InternalError("pack mysql buffer failed.");
@ -117,6 +118,16 @@ Status VMysqlResultWriter<is_binary_format>::_add_one_column(
std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
hyperLogLog.serialize((uint8*)buf.get());
buf_ret = rows_buffer[i].push_string(buf.get(), size);
} else if (column->is_quantile_state() && output_object_data()) {
const vectorized::ColumnComplexType<QuantileStateDouble>* pColumnComplexType =
assert_cast<const vectorized::ColumnComplexType<QuantileStateDouble>*>(
column.get());
QuantileStateDouble quantileValue = pColumnComplexType->get_element(i);
size_t size = quantileValue.get_serialized_size();
std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
quantileValue.serialize((uint8_t*)buf.get());
buf_ret = rows_buffer[i].push_string(buf.get(), size);
} else {
buf_ret = rows_buffer[i].push_null();
}
@ -728,6 +739,7 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
break;
}
case TYPE_HLL:
case TYPE_QUANTILE_STATE:
case TYPE_OBJECT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_OBJECT, true>(column_ptr, result,

View File

@ -26,6 +26,7 @@
#include "agent/heartbeat_server.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_bitmap.h"
#include "vec/data_types/data_type_quantilestate.h"
namespace doris::vectorized {
TEST(ColumnComplexTest, BasicTest) {
using ColumnSTLString = ColumnComplexType<std::string>;
@ -83,7 +84,42 @@ private:
DataTypeBitMap _bitmap_type;
};
TEST_F(ColumnBitmapTest, SerializeAndDeserialize) {
class ColumnQuantileStateTest : public testing::Test {
public:
virtual void SetUp() override {}
virtual void TearDown() override {}
void check_bitmap_column(const IColumn& l, const IColumn& r) {
ASSERT_EQ(l.size(), r.size());
const auto& l_col = assert_cast<const ColumnQuantileStateDouble&>(l);
const auto& r_col = assert_cast<const ColumnQuantileStateDouble&>(r);
for (size_t i = 0; i < l_col.size(); ++i) {
auto& l_value = const_cast<QuantileStateDouble&>(l_col.get_element(i));
auto& r_value = const_cast<QuantileStateDouble&>(r_col.get_element(i));
ASSERT_EQ(l_value.get_serialized_size(), r_value.get_serialized_size());
}
}
void check_serialize_and_deserialize(MutableColumnPtr& col) {
auto column = assert_cast<ColumnQuantileStateDouble*>(col.get());
auto size = _quantile_state_type.get_uncompressed_serialized_bytes(
*column, BeExecVersionManager::get_newest_version());
std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
auto result = _quantile_state_type.serialize(*column, buf.get(),
BeExecVersionManager::get_newest_version());
ASSERT_EQ(result, buf.get() + size);
auto column2 = _quantile_state_type.create_column();
_quantile_state_type.deserialize(buf.get(), column2.get(),
BeExecVersionManager::get_newest_version());
check_bitmap_column(*column, *column2.get());
}
private:
DataTypeQuantileStateDouble _quantile_state_type;
};
TEST_F(ColumnBitmapTest, ColumnBitmapReadWrite) {
auto column = _bitmap_type.create_column();
// empty column
@ -106,4 +142,31 @@ TEST_F(ColumnBitmapTest, SerializeAndDeserialize) {
check_serialize_and_deserialize(column);
}
TEST_F(ColumnQuantileStateTest, ColumnQuantileStateReadWrite) {
auto column = _quantile_state_type.create_column();
// empty column
check_serialize_and_deserialize(column);
// quantile column with lots of rows
const size_t row_size = 20000;
auto& data = assert_cast<ColumnQuantileStateDouble&>(*column.get()).get_data();
data.resize(row_size);
// EMPTY type
check_serialize_and_deserialize(column);
// SINGLE type
for (size_t i = 0; i < row_size; ++i) {
data[i].add_value(i);
}
check_serialize_and_deserialize(column);
// EXPLICIT type
for (size_t i = 0; i < row_size; ++i) {
data[i].add_value(i + 1);
}
// TDIGEST type
for (size_t i = 0; i < QUANTILE_STATE_EXPLICIT_NUM; ++i) {
data[0].add_value(i);
}
check_serialize_and_deserialize(column);
}
} // namespace doris::vectorized

View File

@ -1697,7 +1697,7 @@ public class Config extends ConfigBase {
* Default is false.
* */
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_quantile_state_type = false;
public static boolean enable_quantile_state_type = true;
@ConfField
public static boolean enable_vectorized_load = true;

View File

@ -638,6 +638,9 @@ public class CreateFunctionStmt extends DdlStmt {
case BITMAP:
typeBuilder.setId(Types.PGenericType.TypeId.BITMAP);
break;
case QUANTILE_STATE:
typeBuilder.setId(Types.PGenericType.TypeId.QUANTILE_STATE);
break;
case DATE:
typeBuilder.setId(Types.PGenericType.TypeId.DATE);
break;

View File

@ -872,7 +872,6 @@ public class FunctionCallExpr extends Expr {
if (!getChild(1).isConstant()) {
throw new AnalysisException(fnName + "function's second argument should be constant");
}
throw new AnalysisException(fnName + "not support on vectorized engine now.");
}
if ((fnName.getFunction().equalsIgnoreCase("HLL_UNION_AGG")

View File

@ -106,6 +106,7 @@ message PGenericType {
JSONB = 31;
DECIMAL128I = 32;
VARIANT = 33;
QUANTILE_STATE = 34;
UNKNOWN = 999;
}
required TypeId id = 2;

View File

@ -1542,10 +1542,13 @@ visible_functions = [
[['bitmap_or_count'], 'BIGINT', ['BITMAP','BITMAP'], ''],
[['sub_bitmap'], 'BITMAP', ['BITMAP', 'BIGINT', 'BIGINT'], 'ALWAYS_NULLABLE'],
[['bitmap_to_array'], 'ARRAY_BIGINT', ['BITMAP'], ''],
# quantile_function
[['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'], ''],
[['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'], ''],
# quantile_function
[['to_quantile_state'], 'QUANTILE_STATE', ['VARCHAR', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'],
[['to_quantile_state'], 'QUANTILE_STATE', ['DOUBLE', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'],
[['to_quantile_state'], 'QUANTILE_STATE', ['FLOAT', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'],
[['to_quantile_state'], 'QUANTILE_STATE', ['BIGINT', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'],
[['quantile_percent'], 'DOUBLE', ['QUANTILE_STATE', 'FLOAT'], 'ALWAYS_NOT_NULLABLE'],
# hash functions

View File

@ -0,0 +1,4 @@
insert into quantile_state_basic_agg values
(1,to_quantile_state(-1, 2048)),
(2,to_quantile_state(0, 2048)),(2,to_quantile_state(1, 2048)),
(3,to_quantile_state(0, 2048)),(3,to_quantile_state(1, 2048)),(3,to_quantile_state(2, 2048));

View File

@ -0,0 +1,6 @@
create TABLE if not exists `quantile_state_basic_agg` (
`k1` int(11) NULL,
`k2` QUANTILE_STATE QUANTILE_UNION NOT NULL
)AGGREGATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES("replication_num" = "1");

View File

@ -14,3 +14,12 @@
2 1
3 2
-- !sql_quantile_state --
1 \N
2 \N
3 \N
-- !sql_quantile_state_percent --
1 -1.0
2 0.5
3 1.0

View File

@ -242,3 +242,18 @@ beijing chengdu shanghai
-- !select47 --
6
-- !select48 --
20220201 0 1.0
20220201 1 -1.0
20220202 2 0.0
-- !select49 --
20220201 0 1.0
20220201 1 1.0
20220202 2 2500.0
-- !select50 --
20220201 0 1.0
20220201 1 3.0
20220202 2 4999.0

View File

@ -255,3 +255,17 @@ beijing chengdu shanghai
-- !select47 --
6
-- !select48 --
20220201 0 1.0
20220201 1 -1.0
20220202 2 0.0
-- !select49 --
20220201 0 1.0
20220201 1 1.0
20220202 2 2500.0
-- !select50 --
20220201 0 1.0
20220201 1 3.0
20220202 2 4999.0

View File

@ -14,3 +14,12 @@
2 1
3 2
-- !sql_quantile_state --
1 \N
2 \N
3 \N
-- !sql_quantile_state_percent --
1 -1.0
2 0.5
3 1.0

View File

@ -16,7 +16,7 @@
// under the License.
suite("basic_agg_test") {
def tables=["bitmap_basic_agg","hll_basic_agg"]
def tables=["bitmap_basic_agg","hll_basic_agg","quantile_state_basic_agg"]
for (String table in tables) {
sql """drop table if exists ${table};"""
@ -29,4 +29,8 @@ suite("basic_agg_test") {
qt_sql_hll """select * from hll_basic_agg;"""
qt_sql_hll_cardinality """select k1, hll_cardinality(hll_union(k2)) from hll_basic_agg group by k1 order by k1;"""
qt_sql_quantile_state """select * from quantile_state_basic_agg;"""
qt_sql_quantile_state_percent """select k1, quantile_percent(quantile_union(k2), 0.5) from quantile_state_basic_agg group by k1 order by k1;"""
}

View File

@ -498,4 +498,43 @@ suite("test_aggregate_all_functions") {
qt_select46 """select * from ${tableName_12} where id>=5 and id <=5 and level >10 order by id,level;"""
qt_select47 """select count(*) from ${tableName_12}"""
def tableName_21 = "quantile_state_agg_test"
sql "DROP TABLE IF EXISTS ${tableName_21}"
sql """
CREATE TABLE IF NOT EXISTS ${tableName_21} (
`dt` int(11) NULL COMMENT "",
`id` int(11) NULL COMMENT "",
`price` quantile_state QUANTILE_UNION NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`dt`, `id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """INSERT INTO ${tableName_21} values(20220201,0, to_quantile_state(1, 2048))"""
sql """INSERT INTO ${tableName_21} values(20220201,1, to_quantile_state(-1, 2048)),
(20220201,1, to_quantile_state(0, 2048)),(20220201,1, to_quantile_state(1, 2048)),
(20220201,1, to_quantile_state(2, 2048)),(20220201,1, to_quantile_state(3, 2048))
"""
List rows = new ArrayList()
for (int i = 0; i < 5000; ++i) {
rows.add([20220202, 2 , i])
}
streamLoad {
table "${tableName_21}"
set 'label', UUID.randomUUID().toString()
set 'columns', 'dt, id, price, price=to_quantile_state(price, 2048)'
inputIterator rows.iterator()
}
qt_select48 """select dt, id, quantile_percent(quantile_union(price), 0) from ${tableName_21} group by dt, id order by dt, id"""
qt_select49 """select dt, id, quantile_percent(quantile_union(price), 0.5) from ${tableName_21} group by dt, id order by dt, id"""
qt_select50 """select dt, id, quantile_percent(quantile_union(price), 1) from ${tableName_21} group by dt, id order by dt, id"""
}

View File

@ -495,4 +495,45 @@ suite("test_aggregate_all_functions") {
qt_select46 """select * from ${tableName_12} where id>=5 and id <=5 and level >10 order by id,level;"""
qt_select47 """select count(*) from ${tableName_12}"""
def tableName_21 = "quantile_state_agg_test"
sql "DROP TABLE IF EXISTS ${tableName_21}"
sql """
CREATE TABLE IF NOT EXISTS ${tableName_21} (
`dt` int(11) NULL COMMENT "",
`id` int(11) NULL COMMENT "",
`price` quantile_state QUANTILE_UNION NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`dt`, `id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """INSERT INTO ${tableName_21} values(20220201,0, to_quantile_state(1, 2048))"""
sql """INSERT INTO ${tableName_21} values(20220201,1, to_quantile_state(-1, 2048)),
(20220201,1, to_quantile_state(0, 2048)),(20220201,1, to_quantile_state(1, 2048)),
(20220201,1, to_quantile_state(2, 2048)),(20220201,1, to_quantile_state(3, 2048))
"""
List rows = new ArrayList()
for (int i = 0; i < 5000; ++i) {
rows.add([20220202, 2 , i])
}
streamLoad {
table "${tableName_21}"
set 'label', UUID.randomUUID().toString()
set 'columns', 'dt, id, price, price=to_quantile_state(price, 2048)'
inputIterator rows.iterator()
}
qt_select48 """select dt, id, quantile_percent(quantile_union(price), 0) from ${tableName_21} group by dt, id order by dt, id"""
qt_select49 """select dt, id, quantile_percent(quantile_union(price), 0.5) from ${tableName_21} group by dt, id order by dt, id"""
qt_select50 """select dt, id, quantile_percent(quantile_union(price), 1) from ${tableName_21} group by dt, id order by dt, id"""
}

View File

@ -16,7 +16,7 @@
// under the License.
suite("basic_agg_test", "types") {
def tables=["bitmap_basic_agg","hll_basic_agg"]
def tables=["bitmap_basic_agg","hll_basic_agg", "quantile_state_basic_agg"]
for (String table in tables) {
sql """drop table if exists ${table};"""
@ -29,4 +29,8 @@ suite("basic_agg_test", "types") {
qt_sql_hll """select * from hll_basic_agg;"""
qt_sql_hll_cardinality """select k1, hll_cardinality(hll_union(k2)) from hll_basic_agg group by k1 order by k1;"""
qt_sql_quantile_state """select * from quantile_state_basic_agg;"""
qt_sql_quantile_state_percent """select k1, quantile_percent(quantile_union(k2), 0.5) from quantile_state_basic_agg group by k1 order by k1;"""
}