[improvement](vectorized) Deserialized elements of count distinct aggregation directly inserted into target hashset (#21888)

The original logic is to first deserialize the ColumnString into a HashSet (insert the deserialized elements into the hashset), and then traverse all the HashSet elements into the target HashSet during the merge phase.
After optimization, when deserializing, elements are directly inserted into the target HashSet, thereby reducing unnecessary hashset insert overhead.

In one of our internal query tests, 30 hashsets were merged in second phase aggregation(the average cardinality is 1,400,000), and the cardinality after merging is 42,000,000. After optimization, the MergeTime dropped from 5s965ms to 3s375ms.
This commit is contained in:
ZenoYang
2023-08-02 21:19:56 +08:00
committed by GitHub
parent 781c1d5238
commit 9d3f1dcf44
11 changed files with 195 additions and 53 deletions

View File

@ -143,11 +143,21 @@ public:
virtual void deserialize_vec(AggregateDataPtr places, const ColumnString* column, Arena* arena,
size_t num_rows) const = 0;
virtual void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const = 0;
virtual void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs,
const ColumnString* column, Arena* arena,
const size_t num_rows) const = 0;
virtual void deserialize_from_column(AggregateDataPtr places, const IColumn& column,
Arena* arena, size_t num_rows) const = 0;
/// Deserializes state and merge it with current aggregation function.
virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
virtual void deserialize_and_merge(AggregateDataPtr __restrict place,
AggregateDataPtr __restrict rhs, BufferReadable& buf,
Arena* arena) const = 0;
virtual void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
@ -361,6 +371,51 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
assert_cast<const Derived*>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
} catch (...) {
for (int j = 0; j < i; ++j) {
auto place = rhs + size_of_data * j;
assert_cast<const Derived*>(this)->destroy(place);
}
throw;
}
}
assert_cast<const Derived*>(this)->destroy_vec(rhs, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
if (places[i])
assert_cast<const Derived*>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
} catch (...) {
for (int j = 0; j < i; ++j) {
auto place = rhs + size_of_data * j;
assert_cast<const Derived*>(this)->destroy(place);
}
throw;
}
}
assert_cast<const Derived*>(this)->destroy_vec(rhs, num_rows);
}
void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena,
size_t num_rows) const override {
deserialize_vec(places, assert_cast<const ColumnString*>(&column), arena, num_rows);
@ -395,7 +450,12 @@ public:
for (size_t i = begin; i <= end; ++i) {
VectorBufferReader buffer_reader(
(assert_cast<const ColumnString&>(column)).get_data_at(i));
deserialize_and_merge(place, buffer_reader, arena);
char deserialized_data[size_of_data()];
AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data;
assert_cast<const Derived*>(this)->create(deserialized_place);
DEFER({ assert_cast<const Derived*>(this)->destroy(deserialized_place); });
assert_cast<const Derived*>(this)->deserialize_and_merge(place, deserialized_place,
buffer_reader, arena);
}
}
@ -407,16 +467,10 @@ public:
deserialize_and_merge_from_column_range(place, column, 0, column.size() - 1, arena);
}
void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena* arena) const override {
char deserialized_data[size_of_data()];
AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data;
auto derived = static_cast<const Derived*>(this);
derived->create(deserialized_place);
derived->deserialize(deserialized_place, buf, arena);
derived->merge(place, deserialized_place, arena);
derived->destroy(deserialized_place);
void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena* arena) const override {
assert_cast<const Derived*>(this)->deserialize(rhs, buf, arena);
assert_cast<const Derived*>(this)->merge(place, rhs, arena);
}
};
@ -451,16 +505,10 @@ public:
create(place);
}
void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena* arena) const override {
char deserialized_data[size_of_data()];
AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data;
auto derived = assert_cast<const Derived*>(this);
derived->create(deserialized_place);
DEFER({ derived->destroy(deserialized_place); });
derived->deserialize(deserialized_place, buf, arena);
derived->merge(place, deserialized_place, arena);
void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena* arena) const override {
assert_cast<const Derived*>(this)->deserialize(rhs, buf, arena);
assert_cast<const Derived*>(this)->merge(place, rhs, arena);
}
};

View File

@ -233,6 +233,22 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
auto& col = assert_cast<ColumnFixedLengthObject&>(to);

View File

@ -145,6 +145,22 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
auto& col = assert_cast<ColumnFixedLengthObject&>(to);
@ -267,6 +283,22 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
auto& col = assert_cast<ColumnFixedLengthObject&>(to);

View File

@ -130,6 +130,22 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
auto& col = assert_cast<ColumnUInt64&>(to);

View File

@ -624,6 +624,22 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
if constexpr (Data::IsFixedLength) {

View File

@ -65,8 +65,8 @@ public:
to.insert_default();
}
void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena* arena) const override {}
void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena* arena) const override {}
void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
Arena* arena) const override {}

View File

@ -143,15 +143,17 @@ public:
}
}
void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena* arena) const override {
void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena* arena) const override {
bool flag = true;
if (result_is_nullable) {
read_binary(flag, buf);
}
if (flag) {
set_flag(rhs);
set_flag(place);
nested_function->deserialize_and_merge(nested_place(place), buf, arena);
nested_function->deserialize_and_merge(nested_place(place), nested_place(rhs), buf,
arena);
}
}

View File

@ -177,6 +177,22 @@ public:
}
}
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
auto& col = assert_cast<ColumnFixedLengthObject&>(to);

View File

@ -193,8 +193,8 @@ public:
}
}
void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena* arena) const override {
void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena* arena) const override {
auto& set = this->data(place).set;
UInt64 size;
read_var_uint(size, buf);
@ -210,7 +210,17 @@ public:
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena* arena) const override {
deserialize_and_merge(place, buf, arena);
auto& set = this->data(place).set;
UInt64 size;
read_var_uint(size, buf);
set.rehash(size + set.size());
for (size_t i = 0; i < size; ++i) {
KeyType ref;
read_pod_binary(ref, buf);
set.insert(ref);
}
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {

View File

@ -338,7 +338,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
_serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime");
_serialize_result_timer = ADD_TIMER(runtime_profile(), "SerializeResultTime");
_deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeDataTime");
_deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(runtime_profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(runtime_profile(), "HashTableEmplaceTime");
_hash_table_iterate_timer = ADD_TIMER(runtime_profile(), "HashTableIterateTime");

View File

@ -1091,18 +1091,11 @@ private:
{
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
_deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows);
_aggregate_evaluators[i]->function()->deserialize_and_merge_vec_selected(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
}
DEFER({
_aggregate_evaluators[i]->function()->destroy_vec(
_deserialize_buffer.data(), rows);
});
_aggregate_evaluators[i]->function()->merge_vec_selected(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), _agg_arena_pool.get(), rows);
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
block, _offsets_of_aggregate_states[i], _places.data(),
@ -1133,18 +1126,11 @@ private:
{
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
_deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows);
_aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
}
DEFER({
_aggregate_evaluators[i]->function()->destroy_vec(
_deserialize_buffer.data(), rows);
});
_aggregate_evaluators[i]->function()->merge_vec(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), _agg_arena_pool.get(), rows);
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
block, _offsets_of_aggregate_states[i], _places.data(),