[Feature] Support exact percentile aggregate function (#6410)
Adds support for calculating the exact percentile value array of a numeric column `col` at the given percentage(s).
This commit is contained in:
@ -31,6 +31,7 @@
|
||||
#include "runtime/string_value.h"
|
||||
#include "util/debug_util.h"
|
||||
#include "util/tdigest.h"
|
||||
#include "util/counts.h"
|
||||
|
||||
// TODO: this file should be cross compiled and then all of the builtin
|
||||
// aggregate functions will have a codegen enabled path. Then we can remove
|
||||
@ -172,6 +173,76 @@ void AggregateFunctions::count_remove(FunctionContext*, const AnyVal& src, BigIn
|
||||
}
|
||||
}
|
||||
|
||||
struct PercentileState {
|
||||
Counts counts;
|
||||
double quantile = -1.0;
|
||||
};
|
||||
|
||||
// Creates a fresh PercentileState on the heap and publishes it through `dst`.
// The object is later deleted by percentile_serialize or percentile_finalize.
void AggregateFunctions::percentile_init(FunctionContext* ctx, StringVal* dst) {
    auto* state = new PercentileState();
    dst->ptr = reinterpret_cast<uint8_t*>(state);
    dst->len = sizeof(PercentileState);
    dst->is_null = false;
}
|
||||
|
||||
template <typename T>
|
||||
void AggregateFunctions::percentile_update(FunctionContext* ctx, const T& src,
|
||||
const DoubleVal& quantile, StringVal* dst) {
|
||||
if (src.is_null) {
|
||||
return;
|
||||
}
|
||||
|
||||
DCHECK(dst->ptr != nullptr);
|
||||
DCHECK_EQ(sizeof(PercentileState), dst->len);
|
||||
|
||||
PercentileState* percentile = reinterpret_cast<PercentileState*>(dst->ptr);
|
||||
percentile->counts.increment(src.val, 1);
|
||||
percentile->quantile = quantile.val;
|
||||
}
|
||||
|
||||
// Merges a serialized partial state into the live state held by `dst`.
//
// `src` is expected to use the layout written by percentile_serialize: a
// `double` quantile followed by the serialized counts. `dst` must point at a
// PercentileState created by percentile_init.
void AggregateFunctions::percentile_merge(FunctionContext* ctx, const StringVal& src,
                                          StringVal* dst) {
    DCHECK(dst->ptr != nullptr);
    DCHECK_EQ(sizeof(PercentileState), dst->len);

    // Copy the leading quantile out with memcpy so this does not depend on
    // src.ptr being suitably aligned for a double.
    double quantile;
    memcpy(&quantile, src.ptr, sizeof(double));

    // Scoped object instead of new/delete: the temporary never outlives this
    // call, so it needs no heap allocation and is destroyed on every exit path
    // (including an exception escaping unserialize() or merge()).
    PercentileState src_percentile;
    src_percentile.quantile = quantile;
    src_percentile.counts.unserialize(src.ptr + sizeof(double));

    PercentileState* dst_percentile = reinterpret_cast<PercentileState*>(dst->ptr);
    dst_percentile->counts.merge(&src_percentile.counts);
    // Adopt the incoming quantile only while dst still carries the -1.0
    // "unset" sentinel from its default initializer.
    if (dst_percentile->quantile == -1.0) {
        dst_percentile->quantile = quantile;
    }
}
|
||||
|
||||
// Flattens the PercentileState behind `src` into a new StringVal laid out as
// [double quantile][serialized counts], then deletes the state object.
StringVal AggregateFunctions::percentile_serialize(FunctionContext* ctx, const StringVal& src) {
    DCHECK(!src.is_null);

    auto* state = reinterpret_cast<PercentileState*>(src.ptr);
    const uint32_t counts_bytes = state->counts.serialized_size();

    StringVal result(ctx, sizeof(double) + counts_bytes);
    uint8_t* cursor = result.ptr;
    memcpy(cursor, &state->quantile, sizeof(double));
    cursor += sizeof(double);
    state->counts.serialize(cursor);

    // The state is consumed by serialization; release it here.
    delete state;
    return result;
}
|
||||
|
||||
// Produces the final result by asking the accumulated counts for the value at
// the stored quantile, then deletes the state object.
DoubleVal AggregateFunctions::percentile_finalize(FunctionContext* ctx, const StringVal& src) {
    DCHECK(!src.is_null);

    auto* state = reinterpret_cast<PercentileState*>(src.ptr);
    auto answer = state->counts.terminate(state->quantile);

    // Compute the answer before freeing: it is derived from the state.
    delete state;
    return answer;
}
|
||||
|
||||
struct PercentileApproxState {
|
||||
public:
|
||||
PercentileApproxState() : digest(new TDigest()) {}
|
||||
@ -2696,6 +2767,9 @@ template void AggregateFunctions::offset_fn_update<DecimalV2Val>(FunctionContext
|
||||
const DecimalV2Val&,
|
||||
DecimalV2Val* dst);
|
||||
|
||||
// Explicit instantiation of percentile_update for BigIntVal inputs, so the
// template definition in this translation unit is emitted for that type.
template void AggregateFunctions::percentile_update<BigIntVal>(FunctionContext* ctx,
                                                               const BigIntVal&,
                                                               const DoubleVal&,
                                                               StringVal*);
|
||||
|
||||
// Explicit instantiation of percentile_approx_update for doris_udf::DoubleVal inputs.
template void AggregateFunctions::percentile_approx_update<doris_udf::DoubleVal>(
        FunctionContext* ctx,
        const doris_udf::DoubleVal&,
        const doris_udf::DoubleVal&,
        doris_udf::StringVal*);
|
||||
|
||||
Reference in New Issue
Block a user