[Feature](function) support group concat with distinct and order by (#38851)
pick from #38744 and #38776
This commit is contained in:
@ -43,6 +43,8 @@ class AggregateFunctionBitmapCount;
|
||||
template <typename Op>
|
||||
class AggregateFunctionBitmapOp;
|
||||
struct AggregateFunctionBitmapUnionOp;
|
||||
class IAggregateFunction;
|
||||
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
|
||||
|
||||
using DataTypePtr = std::shared_ptr<const IDataType>;
|
||||
using DataTypes = std::vector<DataTypePtr>;
|
||||
@ -178,11 +180,6 @@ public:
|
||||
const size_t offset, IColumn& to,
|
||||
const size_t num_rows) const = 0;
|
||||
|
||||
/** Returns true for aggregate functions of type -State.
|
||||
* They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another).
|
||||
*/
|
||||
virtual bool is_state() const { return false; }
|
||||
|
||||
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
|
||||
* and do a single call to "add_batch" for devirtualization and inlining.
|
||||
*/
|
||||
@ -223,6 +220,8 @@ public:
|
||||
|
||||
virtual void set_version(const int version_) { version = version_; }
|
||||
|
||||
virtual AggregateFunctionPtr transmit_to_stable() { return nullptr; }
|
||||
|
||||
protected:
|
||||
DataTypes argument_types;
|
||||
int version {};
|
||||
@ -519,8 +518,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
|
||||
|
||||
class AggregateFunctionGuard {
|
||||
public:
|
||||
using AggregateData = std::remove_pointer_t<AggregateDataPtr>;
|
||||
|
||||
@ -29,6 +29,16 @@
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
template <typename T>
|
||||
struct Reducer {
|
||||
template <bool stable>
|
||||
using Output = AggregateFunctionDistinctSingleNumericData<T, stable>;
|
||||
using AggregateFunctionDistinctNormal = AggregateFunctionDistinct<Output, false>;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
using AggregateFunctionDistinctNumeric = Reducer<T>::AggregateFunctionDistinctNormal;
|
||||
|
||||
class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator {
|
||||
public:
|
||||
String get_name() const override { return "Distinct"; }
|
||||
@ -51,22 +61,15 @@ public:
|
||||
|
||||
if (arguments.size() == 1) {
|
||||
AggregateFunctionPtr res(
|
||||
creator_with_numeric_type::create<AggregateFunctionDistinct,
|
||||
AggregateFunctionDistinctSingleNumericData>(
|
||||
creator_with_numeric_type::create<AggregateFunctionDistinctNumeric>(
|
||||
arguments, result_is_nullable, nested_function));
|
||||
if (res) {
|
||||
return res;
|
||||
}
|
||||
|
||||
if (arguments[0]->is_value_unambiguously_represented_in_contiguous_memory_region()) {
|
||||
res = creator_without_type::create<AggregateFunctionDistinct<
|
||||
AggregateFunctionDistinctSingleGenericData<true>>>(
|
||||
arguments, result_is_nullable, nested_function);
|
||||
} else {
|
||||
res = creator_without_type::create<AggregateFunctionDistinct<
|
||||
AggregateFunctionDistinctSingleGenericData<false>>>(
|
||||
arguments, result_is_nullable, nested_function);
|
||||
}
|
||||
res = creator_without_type::create<
|
||||
AggregateFunctionDistinct<AggregateFunctionDistinctSingleGenericData>>(
|
||||
arguments, result_is_nullable, nested_function);
|
||||
return res;
|
||||
}
|
||||
return creator_without_type::create<
|
||||
|
||||
@ -28,6 +28,8 @@
|
||||
#include <memory>
|
||||
#include <new>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "vec/aggregate_functions/aggregate_function.h"
|
||||
@ -54,105 +56,170 @@ struct DefaultHash;
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
template <typename T>
|
||||
template <typename T, bool stable>
|
||||
struct AggregateFunctionDistinctSingleNumericData {
|
||||
/// When creating, the hash table must be small.
|
||||
using Set = HashSetWithStackMemory<T, DefaultHash<T>, 4>;
|
||||
using Self = AggregateFunctionDistinctSingleNumericData<T>;
|
||||
Set set;
|
||||
using Container = std::conditional_t<stable, phmap::flat_hash_map<T, uint32_t>,
|
||||
HashSetWithStackMemory<T, DefaultHash<T>, 4>>;
|
||||
using Self = AggregateFunctionDistinctSingleNumericData<T, stable>;
|
||||
Container data;
|
||||
|
||||
void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena*) {
|
||||
const auto& vec = assert_cast<const ColumnVector<T>&>(*columns[0]).get_data();
|
||||
set.insert(vec[row_num]);
|
||||
}
|
||||
|
||||
void merge(const Self& rhs, Arena*) { set.merge(rhs.set); }
|
||||
|
||||
void serialize(BufferWritable& buf) const { set.write(buf); }
|
||||
|
||||
void deserialize(BufferReadable& buf, Arena*) { set.read(buf); }
|
||||
|
||||
MutableColumns get_arguments(const DataTypes& argument_types) const {
|
||||
MutableColumns argument_columns;
|
||||
argument_columns.emplace_back(argument_types[0]->create_column());
|
||||
for (const auto& elem : set) {
|
||||
argument_columns[0]->insert(elem.get_value());
|
||||
if constexpr (stable) {
|
||||
data.emplace(vec[row_num], data.size());
|
||||
} else {
|
||||
data.insert(vec[row_num]);
|
||||
}
|
||||
|
||||
return argument_columns;
|
||||
}
|
||||
};
|
||||
|
||||
struct AggregateFunctionDistinctGenericData {
|
||||
/// When creating, the hash table must be small.
|
||||
using Set = HashSetWithStackMemory<StringRef, StringRefHash, 4>;
|
||||
using Self = AggregateFunctionDistinctGenericData;
|
||||
Set set;
|
||||
|
||||
void merge(const Self& rhs, Arena* arena) {
|
||||
Set::LookupResult it;
|
||||
bool inserted;
|
||||
for (const auto& elem : rhs.set) {
|
||||
StringRef key = elem.get_value();
|
||||
key.data = arena->insert(key.data, key.size);
|
||||
set.emplace(key, it, inserted);
|
||||
void merge(const Self& rhs, Arena*) {
|
||||
DCHECK(!stable);
|
||||
if constexpr (!stable) {
|
||||
data.merge(rhs.data);
|
||||
}
|
||||
}
|
||||
|
||||
void serialize(BufferWritable& buf) const {
|
||||
write_var_uint(set.size(), buf);
|
||||
for (const auto& elem : set) {
|
||||
write_string_binary(elem.get_value(), buf);
|
||||
DCHECK(!stable);
|
||||
if constexpr (!stable) {
|
||||
data.write(buf);
|
||||
}
|
||||
}
|
||||
|
||||
void deserialize(BufferReadable& buf, Arena* arena) {
|
||||
UInt64 size;
|
||||
read_var_uint(size, buf);
|
||||
|
||||
StringRef ref;
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
read_string_binary(ref, buf);
|
||||
set.insert(ref);
|
||||
void deserialize(BufferReadable& buf, Arena*) {
|
||||
DCHECK(!stable);
|
||||
if constexpr (!stable) {
|
||||
data.read(buf);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <bool is_plain_column>
|
||||
struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDistinctGenericData {
|
||||
void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena* arena) {
|
||||
Set::LookupResult it;
|
||||
bool inserted;
|
||||
auto key = columns[0]->get_data_at(row_num);
|
||||
key.data = arena->insert(key.data, key.size);
|
||||
set.emplace(key, it, inserted);
|
||||
}
|
||||
|
||||
MutableColumns get_arguments(const DataTypes& argument_types) const {
|
||||
MutableColumns argument_columns;
|
||||
argument_columns.emplace_back(argument_types[0]->create_column());
|
||||
for (const auto& elem : set) {
|
||||
argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size);
|
||||
|
||||
if constexpr (stable) {
|
||||
argument_columns[0]->resize(data.size());
|
||||
auto ptr = (T*)const_cast<char*>(argument_columns[0]->get_raw_data().data);
|
||||
for (auto it : data) {
|
||||
ptr[it.second] = it.first;
|
||||
}
|
||||
} else {
|
||||
for (const auto& elem : data) {
|
||||
argument_columns[0]->insert(elem.get_value());
|
||||
}
|
||||
}
|
||||
|
||||
return argument_columns;
|
||||
}
|
||||
};
|
||||
|
||||
struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDistinctGenericData {
|
||||
void add(const IColumn** columns, size_t columns_num, size_t row_num, Arena* arena) {
|
||||
const char* begin = nullptr;
|
||||
StringRef value(begin, 0);
|
||||
for (size_t i = 0; i < columns_num; ++i) {
|
||||
auto cur_ref = columns[i]->serialize_value_into_arena(row_num, *arena, begin);
|
||||
value.data = cur_ref.data - value.size;
|
||||
value.size += cur_ref.size;
|
||||
template <bool stable>
|
||||
struct AggregateFunctionDistinctGenericData {
|
||||
/// When creating, the hash table must be small.
|
||||
using Container = std::conditional_t<stable, phmap::flat_hash_map<StringRef, uint32_t>,
|
||||
HashSetWithStackMemory<StringRef, StringRefHash, 4>>;
|
||||
using Self = AggregateFunctionDistinctGenericData;
|
||||
Container data;
|
||||
|
||||
void merge(const Self& rhs, Arena* arena) {
|
||||
DCHECK(!stable);
|
||||
if constexpr (!stable) {
|
||||
typename Container::LookupResult it;
|
||||
bool inserted;
|
||||
for (const auto& elem : rhs.data) {
|
||||
StringRef key = elem.get_value();
|
||||
key.data = arena->insert(key.data, key.size);
|
||||
data.emplace(key, it, inserted);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void serialize(BufferWritable& buf) const {
|
||||
DCHECK(!stable);
|
||||
if constexpr (!stable) {
|
||||
write_var_uint(data.size(), buf);
|
||||
for (const auto& elem : data) {
|
||||
write_string_binary(elem.get_value(), buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void deserialize(BufferReadable& buf, Arena* arena) {
|
||||
DCHECK(!stable);
|
||||
if constexpr (!stable) {
|
||||
UInt64 size;
|
||||
read_var_uint(size, buf);
|
||||
|
||||
StringRef ref;
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
read_string_binary(ref, buf);
|
||||
data.insert(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <bool stable>
|
||||
struct AggregateFunctionDistinctSingleGenericData
|
||||
: public AggregateFunctionDistinctGenericData<stable> {
|
||||
using Base = AggregateFunctionDistinctGenericData<stable>;
|
||||
using Base::data;
|
||||
void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena* arena) {
|
||||
auto key = columns[0]->get_data_at(row_num);
|
||||
key.data = arena->insert(key.data, key.size);
|
||||
|
||||
if constexpr (stable) {
|
||||
data.emplace(key, data.size());
|
||||
} else {
|
||||
typename Base::Container::LookupResult it;
|
||||
bool inserted;
|
||||
data.emplace(key, it, inserted);
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumns get_arguments(const DataTypes& argument_types) const {
|
||||
MutableColumns argument_columns;
|
||||
argument_columns.emplace_back(argument_types[0]->create_column());
|
||||
if constexpr (stable) {
|
||||
std::vector<StringRef> tmp(data.size());
|
||||
for (auto it : data) {
|
||||
tmp[it.second] = it.first;
|
||||
}
|
||||
for (int i = 0; i < data.size(); i++) {
|
||||
argument_columns[0]->insert_data(tmp[i].data, tmp[i].size);
|
||||
}
|
||||
} else {
|
||||
for (const auto& elem : data) {
|
||||
argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size);
|
||||
}
|
||||
}
|
||||
|
||||
Set::LookupResult it;
|
||||
bool inserted;
|
||||
value.data = arena->insert(value.data, value.size);
|
||||
set.emplace(value, it, inserted);
|
||||
return argument_columns;
|
||||
}
|
||||
};
|
||||
|
||||
template <bool stable>
|
||||
struct AggregateFunctionDistinctMultipleGenericData
|
||||
: public AggregateFunctionDistinctGenericData<stable> {
|
||||
using Base = AggregateFunctionDistinctGenericData<stable>;
|
||||
using Base::data;
|
||||
void add(const IColumn** columns, size_t columns_num, size_t row_num, Arena* arena) {
|
||||
const char* begin = nullptr;
|
||||
StringRef key(begin, 0);
|
||||
for (size_t i = 0; i < columns_num; ++i) {
|
||||
auto cur_ref = columns[i]->serialize_value_into_arena(row_num, *arena, begin);
|
||||
key.data = cur_ref.data - key.size;
|
||||
key.size += cur_ref.size;
|
||||
}
|
||||
|
||||
if constexpr (stable) {
|
||||
data.emplace(key, data.size());
|
||||
} else {
|
||||
typename Base::Container::LookupResult it;
|
||||
bool inserted;
|
||||
data.emplace(key, it, inserted);
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumns get_arguments(const DataTypes& argument_types) const {
|
||||
@ -161,10 +228,23 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
|
||||
argument_columns[i] = argument_types[i]->create_column();
|
||||
}
|
||||
|
||||
for (const auto& elem : set) {
|
||||
const char* begin = elem.get_value().data;
|
||||
for (auto& column : argument_columns) {
|
||||
begin = column->deserialize_and_insert_from_arena(begin);
|
||||
if constexpr (stable) {
|
||||
std::vector<StringRef> tmp(data.size());
|
||||
for (auto it : data) {
|
||||
tmp[it.second] = it.first;
|
||||
}
|
||||
for (int i = 0; i < data.size(); i++) {
|
||||
const char* begin = tmp[i].data;
|
||||
for (auto& column : argument_columns) {
|
||||
begin = column->deserialize_and_insert_from_arena(begin);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const auto& elem : data) {
|
||||
const char* begin = elem.get_value().data;
|
||||
for (auto& column : argument_columns) {
|
||||
begin = column->deserialize_and_insert_from_arena(begin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -175,9 +255,10 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
|
||||
/** Adaptor for aggregate functions.
|
||||
* Adding -Distinct suffix to aggregate function
|
||||
**/
|
||||
template <typename Data>
|
||||
template <template <bool stable> typename Data, bool stable = false>
|
||||
class AggregateFunctionDistinct
|
||||
: public IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct<Data>> {
|
||||
: public IAggregateFunctionDataHelper<Data<stable>,
|
||||
AggregateFunctionDistinct<Data, stable>> {
|
||||
private:
|
||||
size_t prefix_size;
|
||||
AggregateFunctionPtr nested_func;
|
||||
@ -193,12 +274,13 @@ private:
|
||||
|
||||
public:
|
||||
AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const DataTypes& arguments)
|
||||
: IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct>(arguments),
|
||||
nested_func(nested_func_),
|
||||
: IAggregateFunctionDataHelper<Data<stable>, AggregateFunctionDistinct<Data, stable>>(
|
||||
arguments),
|
||||
nested_func(std::move(nested_func_)),
|
||||
arguments_num(arguments.size()) {
|
||||
size_t nested_size = nested_func->align_of_data();
|
||||
CHECK_GT(nested_size, 0);
|
||||
prefix_size = (sizeof(Data) + nested_size - 1) / nested_size * nested_size;
|
||||
prefix_size = (sizeof(Data<stable>) + nested_size - 1) / nested_size * nested_size;
|
||||
}
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
|
||||
@ -221,7 +303,7 @@ public:
|
||||
}
|
||||
|
||||
void insert_result_into(ConstAggregateDataPtr targetplace, IColumn& to) const override {
|
||||
auto place = const_cast<AggregateDataPtr>(targetplace);
|
||||
auto* place = const_cast<AggregateDataPtr>(targetplace);
|
||||
auto arguments = this->data(place).get_arguments(this->argument_types);
|
||||
ColumnRawPtrs arguments_raw(arguments.size());
|
||||
for (size_t i = 0; i < arguments.size(); ++i) {
|
||||
@ -229,11 +311,9 @@ public:
|
||||
}
|
||||
|
||||
assert(!arguments.empty());
|
||||
// nested_func->add_batch_single_place(arguments[0]->size(), get_nested_place(place), arguments_raw.data(), arena);
|
||||
// nested_func->insert_result_into(get_nested_place(place), to, arena);
|
||||
|
||||
Arena arena;
|
||||
nested_func->add_batch_single_place(arguments[0]->size(), get_nested_place(place),
|
||||
arguments_raw.data(), nullptr);
|
||||
arguments_raw.data(), &arena);
|
||||
nested_func->insert_result_into(get_nested_place(place), to);
|
||||
}
|
||||
|
||||
@ -242,12 +322,13 @@ public:
|
||||
size_t align_of_data() const override { return nested_func->align_of_data(); }
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override {
|
||||
new (place) Data;
|
||||
SAFE_CREATE(nested_func->create(get_nested_place(place)), this->data(place).~Data());
|
||||
new (place) Data<stable>;
|
||||
SAFE_CREATE(nested_func->create(get_nested_place(place)),
|
||||
this->data(place).~Data<stable>());
|
||||
}
|
||||
|
||||
void destroy(AggregateDataPtr __restrict place) const noexcept override {
|
||||
this->data(place).~Data();
|
||||
this->data(place).~Data<stable>();
|
||||
nested_func->destroy(get_nested_place(place));
|
||||
}
|
||||
|
||||
@ -256,6 +337,11 @@ public:
|
||||
DataTypePtr get_return_type() const override { return nested_func->get_return_type(); }
|
||||
|
||||
bool allocates_memory_in_arena() const override { return true; }
|
||||
|
||||
AggregateFunctionPtr transmit_to_stable() override {
|
||||
return AggregateFunctionPtr(new AggregateFunctionDistinct<Data, true>(
|
||||
nested_func, IAggregateFunction::argument_types));
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -223,8 +223,6 @@ public:
|
||||
return nested_function->allocates_memory_in_arena();
|
||||
}
|
||||
|
||||
bool is_state() const override { return nested_function->is_state(); }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
|
||||
Arena* arena) const override {
|
||||
const IColumn* nested[num_arguments];
|
||||
|
||||
@ -180,8 +180,6 @@ public:
|
||||
bool allocates_memory_in_arena() const override {
|
||||
return nested_function->allocates_memory_in_arena();
|
||||
}
|
||||
|
||||
bool is_state() const override { return nested_function->is_state(); }
|
||||
};
|
||||
|
||||
/** There are two cases: for single argument and variadic.
|
||||
|
||||
@ -26,7 +26,6 @@
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
void register_aggregate_function_combinator_sort(AggregateFunctionSimpleFactory& factory);
|
||||
void register_aggregate_function_combinator_distinct(AggregateFunctionSimpleFactory& factory);
|
||||
void register_aggregate_function_combinator_foreach(AggregateFunctionSimpleFactory& factory);
|
||||
|
||||
|
||||
@ -126,13 +126,16 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
AggregateFunctionSort(const AggregateFunctionPtr& nested_func, const DataTypes& arguments,
|
||||
AggregateFunctionSort(AggregateFunctionPtr nested_func, const DataTypes& arguments,
|
||||
const SortDescription& sort_desc, const RuntimeState* state)
|
||||
: IAggregateFunctionDataHelper<Data, AggregateFunctionSort>(arguments),
|
||||
_nested_func(nested_func),
|
||||
_nested_func(std::move(nested_func)),
|
||||
_arguments(arguments),
|
||||
_sort_desc(sort_desc),
|
||||
_state(state) {
|
||||
if (auto f = _nested_func->transmit_to_stable(); f) {
|
||||
_nested_func = f;
|
||||
}
|
||||
for (const auto& type : _arguments) {
|
||||
_block.insert({type, ""});
|
||||
}
|
||||
@ -158,7 +161,8 @@ public:
|
||||
}
|
||||
|
||||
void insert_result_into(ConstAggregateDataPtr targetplace, IColumn& to) const override {
|
||||
auto place = const_cast<AggregateDataPtr>(targetplace);
|
||||
auto* place = const_cast<AggregateDataPtr>(targetplace);
|
||||
Arena arena;
|
||||
if (!this->data(place).block.is_empty_column()) {
|
||||
this->data(place).sort();
|
||||
|
||||
@ -167,9 +171,10 @@ public:
|
||||
arguments_nested.emplace_back(
|
||||
this->data(place).block.get_by_position(i).column.get());
|
||||
}
|
||||
|
||||
_nested_func->add_batch_single_place(arguments_nested[0]->size(),
|
||||
get_nested_place(place), arguments_nested.data(),
|
||||
nullptr);
|
||||
&arena);
|
||||
}
|
||||
|
||||
_nested_func->insert_result_into(get_nested_place(place), to);
|
||||
|
||||
Reference in New Issue
Block a user