diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index dd9dd2ed88..825dd207d7 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -27,6 +27,7 @@
 #include "runtime/datetime_value.h"
 #include "exprs/anyval_util.h"
 #include "exprs/hybird_set.h"
+#include "util/tdigest.h"
 #include "util/debug_util.h"
 
 // TODO: this file should be cross compiled and then all of the builtin
@@ -231,6 +232,79 @@ void AggregateFunctions::avg_update(FunctionContext* ctx, const T& src, StringVa
     ++avg->count;
 }
 
+struct PercentileApproxState {
+public:
+    PercentileApproxState() : digest(new TDigest()) {
+    }
+    ~PercentileApproxState() {
+        delete digest;
+    }
+
+    TDigest* digest = nullptr;
+    double targetQuantile = -1.0;
+};
+
+void AggregateFunctions::percentile_approx_init(FunctionContext* ctx, StringVal* dst) {
+    dst->is_null = false;
+    dst->len = sizeof(PercentileApproxState);
+    dst->ptr = (uint8_t*) new PercentileApproxState();
+};
+
+template <typename T>
+void AggregateFunctions::percentile_approx_update(FunctionContext* ctx, const T& src,
+        const DoubleVal& quantile, StringVal* dst) {
+    if (src.is_null) {
+        return;
+    }
+    DCHECK(dst->ptr != NULL);
+    DCHECK_EQ(sizeof(PercentileApproxState), dst->len);
+
+    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(dst->ptr);
+    percentile->digest->add(src.val);
+    percentile->targetQuantile = quantile.val;
+}
+
+StringVal AggregateFunctions::percentile_approx_serialize(FunctionContext* ctx, const StringVal& src) {
+    DCHECK(!src.is_null);
+
+    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(src.ptr);
+    uint32_t serialized_size = percentile->digest->serialized_size();
+    StringVal result(ctx, sizeof(double) + serialized_size);
+    memcpy(result.ptr, &percentile->targetQuantile, sizeof(double));
+    percentile->digest->serialize(result.ptr + sizeof(double));
+
+    delete percentile;
+    return result;
+}
+
+void AggregateFunctions::percentile_approx_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+    DCHECK(dst->ptr != NULL);
+    DCHECK_EQ(sizeof(PercentileApproxState), dst->len);
+
+    double quantile;
+    memcpy(&quantile, src.ptr, sizeof(double));
+
+    PercentileApproxState* src_percentile = new PercentileApproxState();
+    src_percentile->targetQuantile = quantile;
+    src_percentile->digest->unserialize(src.ptr + sizeof(double));
+
+    PercentileApproxState* dst_percentile = reinterpret_cast<PercentileApproxState*>(dst->ptr);
+    dst_percentile->digest->merge(src_percentile->digest);
+    dst_percentile->targetQuantile = quantile;
+
+    delete src_percentile;
+}
+
+DoubleVal AggregateFunctions::percentile_approx_finalize(FunctionContext* ctx, const StringVal& src) {
+    DCHECK(!src.is_null);
+
+    PercentileApproxState* percentile = reinterpret_cast<PercentileApproxState*>(src.ptr);
+    double quantile = percentile->targetQuantile;
+    double result = percentile->digest->quantile(quantile);
+
+    delete percentile;
+    return DoubleVal(result);
+}
+
 void AggregateFunctions::decimal_avg_update(FunctionContext* ctx,
                                             const DecimalVal& src,
                                             StringVal* dst) {
@@ -2732,4 +2806,7 @@ template void AggregateFunctions::offset_fn_update(
 template void AggregateFunctions::offset_fn_update<DecimalV2Val>(
     FunctionContext*, const DecimalV2Val& src, const BigIntVal&, const DecimalV2Val&, DecimalV2Val* dst);
+
+template void AggregateFunctions::percentile_approx_update<doris_udf::DoubleVal>(
+    FunctionContext* ctx, const doris_udf::DoubleVal&, const doris_udf::DoubleVal&, doris_udf::StringVal*);
 }
diff --git a/be/src/exprs/aggregate_functions.h b/be/src/exprs/aggregate_functions.h
index df3e8ce740..76c1775e55 100644
--- a/be/src/exprs/aggregate_functions.h
+++ b/be/src/exprs/aggregate_functions.h
@@ -64,6 +64,19 @@ public:
     static void count_star_update(doris_udf::FunctionContext*, doris_udf::BigIntVal* dst);
     static void count_star_remove(FunctionContext*, BigIntVal* dst);
 
+    // Implementation of percentile_approx
+    static void percentile_approx_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst);
+
+    template <typename T>
+    static void percentile_approx_update(FunctionContext* ctx, const T& src, const DoubleVal& quantile, StringVal* dst);
+
+    static void percentile_approx_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+
+    static DoubleVal percentile_approx_finalize(FunctionContext* ctx, const StringVal& src);
+
+    static StringVal percentile_approx_serialize(FunctionContext* ctx, const StringVal& state_sv);
+
     // Implementation of Avg.
     // TODO: Change this to use a fixed-sized BufferVal as intermediate type.
     static void avg_init(doris_udf::FunctionContext* ctx, doris_udf::StringVal* dst);
diff --git a/be/src/util/tdigest.h b/be/src/util/tdigest.h
new file mode 100644
index 0000000000..22654539e4
--- /dev/null
+++ b/be/src/util/tdigest.h
@@ -0,0 +1,750 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*
+ * Licensed to Derrick R. Burns under one or more
+ * contributor license agreements.  See the NOTICES file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// T-Digest : Percentile and Quantile Estimation of Big Data
+// A new data structure for accurate on-line accumulation of rank-based statistics
+// such as quantiles and trimmed means.
+// See original paper: "Computing extremely accurate quantiles using t-digest"
+// by Ted Dunning and Otmar Ertl for more details
+// https://github.com/tdunning/t-digest/blob/07b8f2ca2be8d0a9f04df2feadad5ddc1bb73c88/docs/t-digest-paper/histo.pdf.
+// https://github.com/derrickburns/tdigest
+
+#ifndef TDIGEST2_TDIGEST_H_
+#define TDIGEST2_TDIGEST_H_
+
+#include <algorithm>
+#include <cfloat>
+#include <cmath>
+#include <cstring>
+#include <iostream>
+#include <limits>
+#include <queue>
+#include <vector>
+
+#include "common/logging.h"
+#include "util/debug_util.h"
+#include "udf/udf.h"
+
+namespace doris {
+
+using Value = double;
+using Weight = double;
+using Index = size_t;
+
+const size_t kHighWater = 40000;
+
+class Centroid {
+public:
+    Centroid() : Centroid(0.0, 0.0) {}
+
+    Centroid(Value mean, Weight weight) : _mean(mean), _weight(weight) {}
+
+    inline Value mean() const noexcept { return _mean; }
+
+    inline Weight weight() const noexcept { return _weight; }
+
+    inline void add(const Centroid& c) {
+        DCHECK_GT(c._weight, 0);
+        if (_weight != 0.0) {
+            _weight += c._weight;
+            _mean += c._weight * (c._mean - _mean) / _weight;
+        } else {
+            _weight = c._weight;
+            _mean = c._mean;
+        }
+    }
+
+private:
+    Value _mean = 0;
+    Weight _weight = 0;
+};
+
+struct CentroidList {
+    CentroidList(const std::vector<Centroid>& s) : iter(s.cbegin()), end(s.cend()) {}
+    std::vector<Centroid>::const_iterator iter;
+    std::vector<Centroid>::const_iterator end;
+
+    bool advance() { return ++iter != end; }
+};
+
+class CentroidListComparator {
+public:
+    CentroidListComparator() {}
+
+    bool operator()(const CentroidList& left, const CentroidList& right) const {
+        return left.iter->mean() > right.iter->mean();
+    }
+};
+
+using CentroidListQueue = std::priority_queue<CentroidList, std::vector<CentroidList>, CentroidListComparator>;
+
+struct CentroidComparator {
+    bool operator()(const Centroid& a, const Centroid& b) const { return a.mean() < b.mean(); }
+};
+
+class TDigest {
+
+    class TDigestComparator {
+    public:
+        TDigestComparator() {}
+
+        bool operator()(const TDigest* left, const TDigest* right) const {
+            return left->totalSize() > right->totalSize();
+        }
+    };
+    using TDigestQueue = std::priority_queue<const TDigest*, std::vector<const TDigest*>, TDigestComparator>;
+
+public:
+    TDigest() : TDigest(1000) {}
+
+    explicit TDigest(Value compression) : TDigest(compression, 0) {}
+
+    TDigest(Value compression, Index bufferSize) : TDigest(compression, bufferSize, 0) {}
+
+    TDigest(Value compression, Index unmergedSize, Index mergedSize)
+        : _compression(compression),
+          _max_processed(processedSize(mergedSize, compression)),
+          _max_unprocessed(unprocessedSize(unmergedSize, compression)) {
+        _processed.reserve(_max_processed);
+        _unprocessed.reserve(_max_unprocessed + 1);
+    }
+
+    TDigest(std::vector<Centroid>&& processed, std::vector<Centroid>&& unprocessed, Value compression,
+            Index unmergedSize, Index mergedSize)
+        : TDigest(compression, unmergedSize, mergedSize) {
+        _processed = std::move(processed);
+        _unprocessed = std::move(unprocessed);
+
+        _processed_weight = weight(_processed);
+        _unprocessed_weight = weight(_unprocessed);
+        if (_processed.size() > 0) {
+            _min = std::min(_min, _processed[0].mean());
+            _max = std::max(_max, (_processed.cend() - 1)->mean());
+        }
+        updateCumulative();
+    }
+
+    static Weight weight(std::vector<Centroid>& centroids) noexcept {
+        Weight w = 0.0;
+        for (auto centroid : centroids) {
+            w += centroid.weight();
+        }
+        return w;
+    }
+
+    TDigest& operator=(TDigest&& o) {
+        _compression = o._compression;
+        _max_processed = o._max_processed;
+        _max_unprocessed = o._max_unprocessed;
+        _processed_weight = o._processed_weight;
+        _unprocessed_weight = o._unprocessed_weight;
+        _processed = std::move(o._processed);
+        _unprocessed = std::move(o._unprocessed);
+        _cumulative = std::move(o._cumulative);
+        _min = o._min;
+        _max = o._max;
+        return *this;
+    }
+
+    TDigest(TDigest&& o)
+        : TDigest(std::move(o._processed), std::move(o._unprocessed),
o._compression, o._max_unprocessed, + o._max_processed) {} + + static inline Index processedSize(Index size, Value compression) noexcept { + return (size == 0) ? static_cast(2 * std::ceil(compression)) : size; + } + + static inline Index unprocessedSize(Index size, Value compression) noexcept { + return (size == 0) ? static_cast(8 * std::ceil(compression)) : size; + } + + // merge in another t-digest + inline void merge(const TDigest* other) { + std::vector others{other}; + add(others.cbegin(), others.cend()); + } + + const std::vector& processed() const { return _processed; } + + const std::vector& unprocessed() const { return _unprocessed; } + + Index maxUnprocessed() const { return _max_unprocessed; } + + Index maxProcessed() const { return _max_processed; } + + inline void add(std::vector digests) { add(digests.cbegin(), digests.cend()); } + + // merge in a vector of tdigests in the most efficient manner possible + // in constant space + // works for any value of kHighWater + void add(std::vector::const_iterator iter, std::vector::const_iterator end) { + if (iter != end) { + auto size = std::distance(iter, end); + TDigestQueue pq(TDigestComparator{}); + for (; iter != end; iter++) { + pq.push((*iter)); + } + std::vector batch; + batch.reserve(size); + + size_t totalSize = 0; + while (!pq.empty()) { + auto td = pq.top(); + batch.push_back(td); + pq.pop(); + totalSize += td->totalSize(); + if (totalSize >= kHighWater || pq.empty()) { + mergeProcessed(batch); + mergeUnprocessed(batch); + processIfNecessary(); + batch.clear(); + totalSize = 0; + } + } + updateCumulative(); + } + } + + Weight processedWeight() const { return _processed_weight; } + + Weight unprocessedWeight() const { return _unprocessed_weight; } + + bool haveUnprocessed() const { return _unprocessed.size() > 0; } + + size_t totalSize() const { return _processed.size() + _unprocessed.size(); } + + long totalWeight() const { return static_cast(_processed_weight + _unprocessed_weight); } + + // return the cdf on the t-digest + Value cdf(Value x) { + if (haveUnprocessed() || isDirty()) process(); + return cdfProcessed(x); + } + + bool isDirty() { return _processed.size() > _max_processed || _unprocessed.size() > _max_unprocessed; } + + // return the cdf on the processed values + Value cdfProcessed(Value x) const { + VLOG(1) << "cdf value " << x; + VLOG(1) << "processed size " << _processed.size(); + if (_processed.size() == 0) { + // no data to examin_e + VLOG(1) << "no processed values"; + + return 0.0; + } else if (_processed.size() == 1) { + VLOG(1) << "one processed value " + << " _min " << _min << " _max " << _max; + // exactly one centroid, should have _max==_min + auto width = _max - _min; + if (x < _min) { + return 0.0; + } else if (x > _max) { + return 1.0; + } else if (x - _min <= width) { + // _min and _max are too close together to do any viable interpolation + return 0.5; + } else { + // interpolate if somehow we have weight > 0 and _max != _min + return (x - _min) / (_max - _min); + } + } else { + auto n = _processed.size(); + if (x <= _min) { + VLOG(1) << "below _min " + << " _min " << _min << " x " << x; + return 0; + } + + if (x >= _max) { + VLOG(1) << "above _max " + << " _max " << _max << " x " << x; + return 1; + } + + // check for the left tail + if (x <= mean(0)) { + VLOG(1) << "left tail " + << " _min " << _min << " mean(0) " << mean(0) << " x " << x; + + // note that this is different than mean(0) > _min ... 
this guarantees interpolation works + if (mean(0) - _min > 0) { + return (x - _min) / (mean(0) - _min) * weight(0) / _processed_weight / 2.0; + } else { + return 0; + } + } + + // and the right tail + if (x >= mean(n - 1)) { + VLOG(1) << "right tail" + << " _max " << _max << " mean(n - 1) " << mean(n - 1) << " x " << x; + + if (_max - mean(n - 1) > 0) { + return 1.0 - (_max - x) / (_max - mean(n - 1)) * weight(n - 1) / _processed_weight / 2.0; + } else { + return 1; + } + } + + CentroidComparator cc; + auto iter = std::upper_bound(_processed.cbegin(), _processed.cend(), Centroid(x, 0), cc); + + auto i = std::distance(_processed.cbegin(), iter); + auto z1 = x - (iter - 1)->mean(); + auto z2 = (iter)->mean() - x; + DCHECK_LE(0.0, z1); + DCHECK_LE(0.0, z2); + VLOG(1) << "middle " + << " z1 " << z1 << " z2 " << z2 << " x " << x; + + return weightedAverage(_cumulative[i - 1], z2, _cumulative[i], z1) / _processed_weight; + } + } + + // this returns a quantile on the t-digest + Value quantile(Value q) { + if (haveUnprocessed() || isDirty()) process(); + return quantileProcessed(q); + } + + // this returns a quantile on the currently processed values without changing the t-digest + // the value will not represent the unprocessed values + Value quantileProcessed(Value q) const { + if (q < 0 || q > 1) { + VLOG(1) << "q should be in [0,1], got " << q; + return NAN; + } + + if (_processed.size() == 0) { + // no sorted means no data, no way to get a quantile + return NAN; + } else if (_processed.size() == 1) { + // with one data point, all quantiles lead to Rome + + return mean(0); + } + + // we know that there are at least two sorted now + auto n = _processed.size(); + + // if values were stored in a sorted array, index would be the offset we are Weighterested in + const auto index = q * _processed_weight; + + // at the boundaries, we return _min or _max + if (index <= weight(0) / 2.0) { + DCHECK_GT(weight(0), 0); + return _min + 2.0 * index / weight(0) * (mean(0) - _min); + } + + auto iter = std::lower_bound(_cumulative.cbegin(), _cumulative.cend(), index); + + if (iter + 1 != _cumulative.cend()) { + auto i = std::distance(_cumulative.cbegin(), iter); + auto z1 = index - *(iter - 1); + auto z2 = *(iter) - index; + // VLOG(1) << "z2 " << z2 << " index " << index << " z1 " << z1; + return weightedAverage(mean(i - 1), z2, mean(i), z1); + } + + DCHECK_LE(index, _processed_weight); + DCHECK_GE(index, _processed_weight - weight(n - 1) / 2.0); + + auto z1 = index - _processed_weight - weight(n - 1) / 2.0; + auto z2 = weight(n - 1) / 2 - z1; + return weightedAverage(mean(n - 1), z1, _max, z2); + } + + Value compression() const { return _compression; } + + void add(Value x) { add(x, 1); } + + inline void compress() { process(); } + + // add a single centroid to the unprocessed vector, processing previously unprocessed sorted if our limit has + // been reached. 
+ inline bool add(Value x, Weight w) { + if (std::isnan(x)) { + return false; + } + _unprocessed.push_back(Centroid(x, w)); + _unprocessed_weight += w; + processIfNecessary(); + return true; + } + + inline void add(std::vector::const_iterator iter, std::vector::const_iterator end) { + while (iter != end) { + const size_t diff = std::distance(iter, end); + const size_t room = _max_unprocessed - _unprocessed.size(); + auto mid = iter + std::min(diff, room); + while (iter != mid) _unprocessed.push_back(*(iter++)); + if (_unprocessed.size() >= _max_unprocessed) { + process(); + } + } + } + + uint32_t serialized_size() { + return sizeof(Value) * 5 + sizeof(Index) * 2 + sizeof(size_t) * 3 + + _processed.size() * sizeof(Centroid) + + _unprocessed.size() * sizeof(Centroid) + + _cumulative.size() * sizeof(Weight); + } + + void serialize(uint8_t* writer) { + memcpy(writer, &_compression, sizeof(Value)); + writer += sizeof(Value); + memcpy(writer, &_min, sizeof(Value)); + writer += sizeof(Value); + memcpy(writer, &_max, sizeof(Value)); + writer += sizeof(Value); + memcpy(writer, &_max_processed, sizeof(Index)); + writer += sizeof(Index); + memcpy(writer, &_max_unprocessed, sizeof(Index)); + writer += sizeof(Index); + memcpy(writer, &_processed_weight, sizeof(Value)); + writer += sizeof(Value); + memcpy(writer, &_unprocessed_weight, sizeof(Value)); + writer += sizeof(Value); + + uint32_t size = _processed.size(); + memcpy(writer, &size, sizeof(uint32_t)); + writer += sizeof(uint32_t); + for (int i = 0; i < size; i++) { + memcpy(writer, &_processed[i], sizeof(Centroid)); + writer += sizeof(Centroid); + } + + size = _unprocessed.size(); + memcpy(writer, &size, sizeof(uint32_t)); + writer += sizeof(uint32_t); + for (int i = 0; i < size; i++) { + memcpy(writer, &_unprocessed[i], sizeof(Centroid)); + writer += sizeof(Centroid); + } + + size = _cumulative.size(); + memcpy(writer, &size, sizeof(uint32_t)); + writer += sizeof(uint32_t); + for (int i = 0; i < size; i++) { + memcpy(writer, &_cumulative[i], sizeof(Weight)); + writer += sizeof(Weight); + } + } + + void unserialize(const uint8_t* type_reader) { + memcpy(&_compression, type_reader, sizeof(Value)); + type_reader += sizeof(Value); + memcpy(&_min, type_reader, sizeof(Value)); + type_reader += sizeof(Value); + memcpy(&_max, type_reader, sizeof(Value)); + type_reader += sizeof(Value); + + memcpy(&_max_processed, type_reader, sizeof(Index)); + type_reader += sizeof(Index); + memcpy(&_max_unprocessed, type_reader, sizeof(Index)); + type_reader += sizeof(Index); + memcpy(&_processed_weight, type_reader, sizeof(Value)); + type_reader += sizeof(Value); + memcpy(&_unprocessed_weight, type_reader, sizeof(Value)); + type_reader += sizeof(Value); + + uint32_t size; + memcpy(&size, type_reader, sizeof(uint32_t)); + type_reader += sizeof(uint32_t); + _processed.resize(size); + for (int i = 0; i < size; i++) { + memcpy(&_processed[i], type_reader, sizeof(Centroid)); + type_reader += sizeof(Centroid); + } + memcpy(&size, type_reader, sizeof(uint32_t)); + type_reader += sizeof(uint32_t); + _unprocessed.resize(size); + for (int i = 0; i < size; i++) { + memcpy(&_unprocessed[i], type_reader, sizeof(Centroid)); + type_reader += sizeof(Centroid); + } + memcpy(&size, type_reader, sizeof(uint32_t)); + type_reader += sizeof(uint32_t); + _cumulative.resize(size); + for (int i = 0; i < size; i++) { + memcpy(&_cumulative[i], type_reader, sizeof(Weight)); + type_reader += sizeof(Weight); + } + } + +private: + Value _compression; + + Value _min = std::numeric_limits::max(); + + 
Value _max = std::numeric_limits::min(); + + Index _max_processed; + + Index _max_unprocessed; + + Value _processed_weight = 0.0; + + Value _unprocessed_weight = 0.0; + + std::vector _processed; + + std::vector _unprocessed; + + std::vector _cumulative; + + // return mean of i-th centroid + inline Value mean(int i) const noexcept { return _processed[i].mean(); } + + // return weight of i-th centroid + inline Weight weight(int i) const noexcept { return _processed[i].weight(); } + + // append all unprocessed centroids into current unprocessed vector + void mergeUnprocessed(const std::vector& tdigests) { + if (tdigests.size() == 0) return; + + size_t total = _unprocessed.size(); + for (auto& td : tdigests) { + total += td->_unprocessed.size(); + } + + _unprocessed.reserve(total); + for (auto& td : tdigests) { + _unprocessed.insert(_unprocessed.end(), td->_unprocessed.cbegin(), td->_unprocessed.cend()); + _unprocessed_weight += td->_unprocessed_weight; + } + } + + // merge all processed centroids together into a single sorted vector + void mergeProcessed(const std::vector& tdigests) { + if (tdigests.size() == 0) return; + + size_t + total = 0; + CentroidListQueue pq(CentroidListComparator{}); + for (auto &td : tdigests) { + auto &sorted = td->_processed; + auto size = sorted.size(); + if (size > 0) { + pq.push(CentroidList(sorted)); + total += size; + _processed_weight += td->_processed_weight; + } + } + if (total == 0) return; + + if (_processed.size() > 0) { + pq.push(CentroidList(_processed)); + total += _processed.size(); + } + + std::vector sorted; + VLOG(1) << "total " << total; + sorted.reserve(total); + + while (!pq.empty()) { + auto best = pq.top(); + pq.pop(); + sorted.push_back(*(best.iter)); + if (best.advance()) pq.push(best); + } + _processed = std::move(sorted); + if (_processed.size() > 0) { + _min = std::min(_min, _processed[0].mean()); + _max = std::max(_max, (_processed.cend() - 1)->mean()); + } + } + + inline void processIfNecessary() { + if (isDirty()) { + process(); + } + } + + void updateCumulative() { + const auto n = _processed.size(); + _cumulative.clear(); + _cumulative.reserve(n + 1); + auto previous = 0.0; + for (Index i = 0; i < n; i++) { + auto current = weight(i); + auto halfCurrent = current / 2.0; + _cumulative.push_back(previous + halfCurrent); + previous = previous + current; + } + _cumulative.push_back(previous); + } + + // merges _unprocessed centroids and _processed centroids together and processes them + // when complete, _unprocessed will be empty and _processed will have at most _max_processed centroids + inline void process() { + CentroidComparator cc; + std::sort(_unprocessed.begin(), _unprocessed.end(), cc); + auto count = _unprocessed.size(); + _unprocessed.insert(_unprocessed.end(), _processed.cbegin(), _processed.cend()); + std::inplace_merge(_unprocessed.begin(), _unprocessed.begin() + count, _unprocessed.end(), cc); + + _processed_weight += _unprocessed_weight; + _unprocessed_weight = 0; + _processed.clear(); + + _processed.push_back(_unprocessed[0]); + Weight wSoFar = _unprocessed[0].weight(); + Weight wLimit = _processed_weight * integratedQ(1.0); + + auto end = _unprocessed.end(); + for (auto iter = _unprocessed.cbegin() + 1; iter < end; iter++) { + auto& centroid = *iter; + Weight projectedW = wSoFar + centroid.weight(); + if (projectedW <= wLimit) { + wSoFar = projectedW; + (_processed.end() - 1)->add(centroid); + } else { + auto k1 = integratedLocation(wSoFar / _processed_weight); + wLimit = _processed_weight * integratedQ(k1 + 1.0); + 
wSoFar += centroid.weight(); + _processed.emplace_back(centroid); + } + } + _unprocessed.clear(); + _min = std::min(_min, _processed[0].mean()); + VLOG(1) << "new _min " << _min; + _max = std::max(_max, (_processed.cend() - 1)->mean()); + VLOG(1) << "new _max " << _max; + updateCumulative(); + } + + inline int checkWeights() { return checkWeights(_processed, _processed_weight); } + + size_t checkWeights(const std::vector& sorted, Value total) { + size_t + badWeight = 0; + auto k1 = 0.0; + auto q = 0.0; + for (auto iter = sorted.cbegin(); iter != sorted.cend(); iter++) { + auto w = iter->weight(); + auto dq = w / total; + auto k2 = integratedLocation(q + dq); + if (k2 - k1 > 1 && w != 1) { + VLOG(1) << "Oversize centroid at " << std::distance(sorted.cbegin(), iter) << " k1 " << k1 << " k2 " + << k2 + << " dk " << (k2 - k1) << " w " << w << " q " << q; + badWeight++; + } + if (k2 - k1 > 1.5 && w != 1) { + VLOG(1) << "Egregiously Oversize centroid at " << std::distance(sorted.cbegin(), iter) << " k1 " << k1 + << " k2 " << k2 << " dk " << (k2 - k1) << " w " << w << " q " << q; + badWeight++; + } + q += dq; + k1 = k2; + } + + return badWeight; + } + + /** + * Converts a quantile into a centroid scale value. The centroid scale is nomin_ally + * the number k of the centroid that a quantile point q should belong to. Due to + * round-offs, however, we can't align things perfectly without splitting points + * and sorted. We don't want to do that, so we have to allow for offsets. + * In the end, the criterion is that any quantile range that spans a centroid + * scale range more than one should be split across more than one centroid if + * possible. This won't be possible if the quantile range refers to a single point + * or an already existing centroid. + *

+ * This mapping is steep near q=0 or q=1 so each centroid there will correspond to + * less q range. Near q=0.5, the mapping is flatter so that sorted there will + * represent a larger chunk of quantiles. + * + * @param q The quantile scale value to be mapped. + * @return The centroid scale value corresponding to q. + */ + inline Value integratedLocation(Value q) const { + return _compression * (std::asin(2.0 * q - 1.0) + M_PI / 2) / M_PI; + } + + inline Value integratedQ(Value k) const { + return (std::sin(std::min(k, _compression) * M_PI / _compression - M_PI / 2) + 1) / 2; + } + + /** + * Same as {@link #weightedAverageSorted(Value, Value, Value, Value)} but flips + * the order of the variables if x2 is greater than + * x1. + */ + static Value weightedAverage(Value x1, Value w1, Value x2, Value w2) { + return (x1 <= x2) ? weightedAverageSorted(x1, w1, x2, w2) : weightedAverageSorted(x2, w2, x1, w1); + } + + /** + * Compute the weighted average between x1 with a weight of + * w1 and x2 with a weight of w2. + * This expects x1 to be less than or equal to x2 + * and is guaranteed to return a number between x1 and + * x2. + */ + static Value weightedAverageSorted(Value x1, Value w1, Value x2, Value w2) { + DCHECK_LE(x1, x2); + const Value x = (x1 * w1 + x2 * w2) / (w1 + w2); + return std::max(x1, std::min(x, x2)); + } + + static Value interpolate(Value x, Value x0, Value x1) { return (x - x0) / (x1 - x0); } + + /** + * Computes an interpolated value of a quantile that is between two sorted. + * + * Index is the quantile desired multiplied by the total number of samples - 1. + * + * @param index Denormalized quantile desired + * @param previousIndex The denormalized quantile corresponding to the center of the previous centroid. + * @param nextIndex The denormalized quantile corresponding to the center of the following centroid. + * @param previousMean The mean of the previous centroid. + * @param nextMean The mean of the following centroid. + * @return The interpolated mean. + */ + static Value quantile(Value index, Value previousIndex, Value nextIndex, Value previousMean, Value nextMean) { + const auto delta = nextIndex - previousIndex; + const auto previousWeight = (nextIndex - index) / delta; + const auto nextWeight = (index - previousIndex) / delta; + return previousMean * previousWeight + nextMean * nextWeight; + } +}; + +} // namespace doris + +#endif // TDIGEST2_TDIGEST_H_ diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt index c9cab10d78..25c95aeb42 100644 --- a/be/test/exprs/CMakeLists.txt +++ b/be/test/exprs/CMakeLists.txt @@ -28,4 +28,5 @@ ADD_BE_TEST(json_function_test) ADD_BE_TEST(hybird_set_test) ADD_BE_TEST(string_functions_test) ADD_BE_TEST(timestamp_functions_test) +ADD_BE_TEST(percentile_approx_test) #ADD_BE_TEST(in-predicate-test) diff --git a/be/test/exprs/percentile_approx_test.cpp b/be/test/exprs/percentile_approx_test.cpp new file mode 100644 index 0000000000..5f3df0832b --- /dev/null +++ b/be/test/exprs/percentile_approx_test.cpp @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/aggregate_functions.h" +#include "testutil/function_utils.h" +#include + +namespace doris { + +class PercentileApproxTest : public testing::Test { +public: + PercentileApproxTest() {} +}; + +TEST_F(PercentileApproxTest, testSample) { + FunctionUtils* futil = new FunctionUtils(); + doris_udf::FunctionContext *context = futil->get_fn_ctx(); + + DoubleVal doubleQ(0.9); + + StringVal stringVal1; + DoubleVal int1(1); + AggregateFunctions::percentile_approx_init(context, &stringVal1); + AggregateFunctions::percentile_approx_update(context, int1, doubleQ, &stringVal1); + DoubleVal int2(2); + AggregateFunctions::percentile_approx_update(context, int2, doubleQ, &stringVal1); + + StringVal s = AggregateFunctions::percentile_approx_serialize(context, stringVal1); + + StringVal stringVal2; + AggregateFunctions::percentile_approx_init(context, &stringVal2); + AggregateFunctions::percentile_approx_merge(context, s, &stringVal2); + DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal2); + ASSERT_EQ(v.val, 2); +} + +TEST_F(PercentileApproxTest, testNoMerge) { + FunctionUtils* futil = new FunctionUtils(); + doris_udf::FunctionContext *context = futil->get_fn_ctx(); + + DoubleVal doubleQ(0.9); + + StringVal stringVal1; + DoubleVal val(1); + AggregateFunctions::percentile_approx_init(context, &stringVal1); + AggregateFunctions::percentile_approx_update(context, val, doubleQ, &stringVal1); + DoubleVal val2(2); + AggregateFunctions::percentile_approx_update(context, val2, doubleQ, &stringVal1); + + DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal1); + ASSERT_EQ(v.val, 2); +} + +TEST_F(PercentileApproxTest, testSerialize) { + FunctionUtils* futil = new FunctionUtils(); + doris_udf::FunctionContext *context = futil->get_fn_ctx(); + + DoubleVal doubleQ(0.999); + StringVal stringVal; + AggregateFunctions::percentile_approx_init(context, &stringVal); + + for (int i = 1 ;i <= 100000 ; i++) { + DoubleVal val(i); + AggregateFunctions::percentile_approx_update(context, val, doubleQ, &stringVal); + } + StringVal serialized = AggregateFunctions::percentile_approx_serialize(context, stringVal); + + // mock serialize + StringVal stringVal2; + AggregateFunctions::percentile_approx_init(context, &stringVal2); + AggregateFunctions::percentile_approx_merge(context, serialized, &stringVal2); + DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal2); + ASSERT_DOUBLE_EQ(v.val, 99900.5); +} + +TEST_F(PercentileApproxTest, testNullVale) { + FunctionUtils* futil = new FunctionUtils(); + doris_udf::FunctionContext *context = futil->get_fn_ctx(); + + DoubleVal doubleQ(0.999); + StringVal stringVal; + AggregateFunctions::percentile_approx_init(context, &stringVal); + + for (int i = 1 ;i <= 100000 ; i++) { + if (i % 3 == 0) { + AggregateFunctions::percentile_approx_update(context, DoubleVal::null(), doubleQ, &stringVal); + } else { + AggregateFunctions::percentile_approx_update(context, DoubleVal(i), doubleQ, &stringVal); + } + } + StringVal serialized = AggregateFunctions::percentile_approx_serialize(context, 
stringVal); + + // mock serialize + StringVal stringVal2; + AggregateFunctions::percentile_approx_init(context, &stringVal2); + AggregateFunctions::percentile_approx_merge(context, serialized, &stringVal2); + DoubleVal v = AggregateFunctions::percentile_approx_finalize(context, stringVal2); + //ASSERT_EQ(v.val, 99900.5); + ASSERT_DOUBLE_EQ(v.val, 99900.499499999991); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 3d5db0fc4c..36cdc5b72d 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -42,3 +42,4 @@ ADD_BE_TEST(md5_test) ADD_BE_TEST(bitmap_test) ADD_BE_TEST(faststring_test) ADD_BE_TEST(rle_encoding_test) +ADD_BE_TEST(tdigest_test) diff --git a/be/test/util/tdigest_test.cpp b/be/test/util/tdigest_test.cpp new file mode 100644 index 0000000000..b0aef3e2b9 --- /dev/null +++ b/be/test/util/tdigest_test.cpp @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include "util/tdigest.h" + +namespace doris { + +class TDigestTest : public ::testing::Test { +protected: + // You can remove any or all of the following functions if its body + // is empty. + TDigestTest() { + // You can do set-up work for each test here. + } + + virtual ~TDigestTest() { + // You can do clean-up work that doesn't throw exceptions here. + } + + // If the constructor and destructor are not enough for setting up + // and cleaning up each test, you can define the following methods: + + virtual void SetUp() { + // Code here will be called immediately after the constructor (right + // before each test). + } + + virtual void TearDown() { + // Code here will be called immediately after each test (right + // before the destructor). + } + + static void SetUpTestCase() { + static bool initialized = false; + if (!initialized) { + FLAGS_logtostderr = true; + google::InstallFailureSignalHandler(); + google::InitGoogleLogging("testing::TDigestTest"); + initialized = true; + } + } + + // Objects declared here can be used by all tests in the test case for Foo. +}; + +static double cdf(const double x, const std::vector& data) { + int n1 = 0; + int n2 = 0; + for (auto v : data) { + n1 += (v < x) ? 1 : 0; + n2 += (v <= x) ? 
1 : 0; + } + return (n1 + n2) / 2.0 / data.size(); +} + +static double quantile(const double q, const std::vector& values) { + double q1; + if (values.size() == 0) { + q1 = NAN; + } else if (q == 1 || values.size() == 1) { + q1 = values[values.size() - 1]; + } else { + auto index = q * values.size(); + if (index < 0.5) { + q1 = values[0]; + } else if (values.size() - index < 0.5) { + q1 = values[values.size() - 1]; + } else { + index -= 0.5; + const int intIndex = static_cast(index); + q1 = values[intIndex + 1] * (index - intIndex) + values[intIndex] * (intIndex + 1 - index); + } + } + return q1; +} + +TEST_F(TDigestTest, CrashAfterMerge) { + TDigest digest(1000); + std::uniform_real_distribution<> reals(0.0, 1.0); + std::random_device gen; + for (int i = 0; i < 100000; i++) { + digest.add(reals(gen)); + } + digest.compress(); + + TDigest digest2(1000); + digest2.merge(&digest); + digest2.quantile(0.5); +} + +TEST_F(TDigestTest, EmptyDigest) { + TDigest digest(100); + EXPECT_EQ(0, digest.processed().size()); +} + +TEST_F(TDigestTest, SingleValue) { + TDigest digest(100); + std::random_device gen; + std::uniform_real_distribution<> dist(0, 1000); + const auto value = dist(gen); + digest.add(value); + std::uniform_real_distribution<> dist2(0, 1.0); + const double q = dist2(gen); + EXPECT_NEAR(value, digest.quantile(0.0), 0.001f); + EXPECT_NEAR(value, digest.quantile(q), 0.001f); + EXPECT_NEAR(value, digest.quantile(1.0), 0.001f); +} + +TEST_F(TDigestTest, FewValues) { + // When there are few values in the tree, quantiles should be exact + TDigest digest(1000); + + std::random_device gen; + std::uniform_real_distribution<> reals(0.0, 100.0); + std::uniform_int_distribution<> dist(0, 10); + std::uniform_int_distribution<> bools(0, 1); + std::uniform_real_distribution<> qvalue(0.0, 1.0); + + const auto length = 10;//dist(gen); + + std::vector values; + values.reserve(length); + for (int i = 0; i < length; ++i) { + auto const value = (i == 0 || bools(gen)) ? 
reals(gen) : values[i - 1]; + digest.add(value); + values.push_back(value); + } + std::sort(values.begin(), values.end()); + digest.compress(); + + EXPECT_EQ(digest.processed().size(), values.size()); + + std::vector testValues{0.0, 1.0e-10, qvalue(gen), 0.5, 1.0 - 1e-10, 1.0}; + for (auto q : testValues) { + double q1 = quantile(q, values); + auto q2 = digest.quantile(q); + if (std::isnan(q1)) { + EXPECT_TRUE(std::isnan(q2)); + } else { + EXPECT_NEAR(q1, q2, 0.03) << "q = " << q; + } + } +} + +TEST_F(TDigestTest, MoreThan2BValues) { + TDigest digest(1000); + + std::random_device gen; + std::uniform_real_distribution<> reals(0.0, 1.0); + for (int i = 0; i < 1000; ++i) { + const double next = reals(gen); + digest.add(next); + } + for (int i = 0; i < 10; ++i) { + const double next = reals(gen); + const auto count = 1L << 28; + digest.add(next, count); + } + EXPECT_EQ(1000 + 10L * (1 << 28), digest.totalWeight()); + EXPECT_GT(digest.totalWeight(), std::numeric_limits::max()); + std::vector quantiles{0, 0.1, 0.5, 0.9, 1, reals(gen)}; + std::sort(quantiles.begin(), quantiles.end()); + auto prev = std::numeric_limits::min(); + for (double q : quantiles) { + const double v = digest.quantile(q); + EXPECT_GE(v, prev) << "q = " << q; + prev = v; + } +} + +TEST_F(TDigestTest, MergeTest) { + + TDigest digest1(1000); + TDigest digest2(1000); + + digest2.add(std::vector {&digest1}); +} + +TEST_F(TDigestTest, TestSorted) { + TDigest digest(1000); + std::uniform_real_distribution<> reals(0.0, 1.0); + std::uniform_int_distribution<> ints(0, 10); + + std::random_device gen; + for (int i = 0; i < 10000; ++i) { + digest.add(reals(gen), 1 + ints(gen)); + } + digest.compress(); + Centroid previous(0, 0); + for (auto centroid : digest.processed()) { + if (previous.weight() != 0) { + CHECK_LE(previous.mean(), centroid.mean()); + } + previous = centroid; + } +} + +TEST_F(TDigestTest, ExtremeQuantiles) { + TDigest digest(1000); + // t-digest shouldn't merge extreme nodes, but let's still test how it would + // answer to extreme quantiles in that case ('extreme' in the sense that the + // quantile is either before the first node or after the last one) + + digest.add(10, 3); + digest.add(20, 1); + digest.add(40, 5); + // this group tree is roughly equivalent to the following sorted array: + // [ ?, 10, ?, 20, ?, ?, 50, ?, ? 
]
+    // and we expect it to compute approximate missing values:
+    // [ 5, 10, 15, 20, 30, 40, 50, 60, 70]
+    std::vector<double> values{5.0, 10.0, 15.0, 20.0, 30.0, 35.0, 40.0, 45.0, 50.0};
+    std::vector<double> quantiles{1.5 / 9.0, 3.5 / 9.0, 6.5 / 9.0};
+    for (auto q : quantiles) {
+        EXPECT_NEAR(quantile(q, values), digest.quantile(q), 0.01) << "q = " << q;
+    }
+}
+
+TEST_F(TDigestTest, Monotonicity) {
+    TDigest digest(1000);
+    std::uniform_real_distribution<> reals(0.0, 1.0);
+    std::random_device gen;
+    for (int i = 0; i < 100000; i++) {
+        digest.add(reals(gen));
+    }
+
+    double lastQuantile = -1;
+    double lastX = -1;
+    for (double z = 0; z <= 1; z += 1e-5) {
+        double x = digest.quantile(z);
+        EXPECT_GE(x, lastX);
+        lastX = x;
+
+        double q = digest.cdf(z);
+        EXPECT_GE(q, lastQuantile);
+        lastQuantile = q;
+    }
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 0591c65220..6e3cc297a3 100644
--- a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -397,6 +397,16 @@ public class FunctionCallExpr extends Expr {
                 || fnName.getFunction().equalsIgnoreCase("HLL_UNION_AGG")) {
             fnParams.setIsDistinct(false);
         }
+
+        if (fnName.getFunction().equalsIgnoreCase("percentile_approx")) {
+            if (children.size() != 2) {
+                throw new AnalysisException("percentile_approx(expr, DOUBLE) requires two parameters");
+            }
+            if (!getChild(1).isConstant()) {
+                throw new AnalysisException("percentile_approx requires the second parameter to be a constant: "
+                        + this.toSql());
+            }
+        }
         return;
     }
diff --git a/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java
index 6d175a50cf..264339ad49 100644
--- a/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -957,6 +957,16 @@ public class FunctionSet {
                 null, false, true, false));
         }
 
+        // PercentileApprox
+        addBuiltin(AggregateFunction.createBuiltin("percentile_approx",
+                Lists.newArrayList(Type.DOUBLE, Type.DOUBLE), Type.DOUBLE, Type.VARCHAR,
+                prefix + "22percentile_approx_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
+                prefix + "24percentile_approx_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKS3_PNS2_9StringValE",
+                prefix + "23percentile_approx_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
+                prefix + "27percentile_approx_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+                prefix + "26percentile_approx_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+                false, false, false));
+
         // Avg
         // TODO: switch to CHAR(sizeof(AvgIntermediateType) when that becomes available
diff --git a/run-ut.sh b/run-ut.sh
index b839bfb573..eda8bf8931 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -153,6 +153,7 @@ ${DORIS_TEST_BINARY_DIR}/util/aes_util_test
 ${DORIS_TEST_BINARY_DIR}/util/string_util_test
 ${DORIS_TEST_BINARY_DIR}/util/coding_test
 ${DORIS_TEST_BINARY_DIR}/util/faststring_test
+${DORIS_TEST_BINARY_DIR}/util/tdigest_test
 
 ## Running common Unittest
 ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test
@@ -161,6 +162,7 @@ ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test
 ${DORIS_TEST_BINARY_DIR}/exprs/string_functions_test
 ${DORIS_TEST_BINARY_DIR}/exprs/json_function_test
 ${DORIS_TEST_BINARY_DIR}/exprs/timestamp_functions_test
+${DORIS_TEST_BINARY_DIR}/exprs/percentile_approx_test
 
 ## Running geo unit test
 ${DORIS_TEST_BINARY_DIR}/geo/geo_functions_test
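
Note (not part of the patch): a minimal standalone sketch of how the TDigest added in be/src/util/tdigest.h is driven by the aggregate functions above — accumulate per-row values, serialize the state, rebuild and merge it into a fresh digest, then read the target quantile. The default compression (1000) matches PercentileApproxState, and the expected p999 value for 1..100000 follows the testSerialize case; the main() harness itself is illustrative only.

// percentile_approx_usage_example.cpp -- illustrative only, not part of this patch.
#include <cstdint>
#include <iostream>
#include <vector>

#include "util/tdigest.h"

int main() {
    // Accumulate per-row values, as percentile_approx_update() does.
    doris::TDigest digest;  // default compression = 1000, same as PercentileApproxState
    for (int i = 1; i <= 100000; ++i) {
        digest.add(i);
    }

    // Hand the state to another node: serialize, then rebuild and merge,
    // mirroring percentile_approx_serialize() / percentile_approx_merge().
    std::vector<uint8_t> buf(digest.serialized_size());
    digest.serialize(buf.data());

    doris::TDigest restored;
    restored.unserialize(buf.data());

    doris::TDigest merged;
    merged.merge(&restored);

    // Finalize: query the target quantile (p999 here), as percentile_approx_finalize() does.
    std::cout << merged.quantile(0.999) << std::endl;  // roughly 99900.5, per testSerialize
    return 0;
}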