[improvement][agg]Process aggregated results in the vectorized way (#11084)

This commit is contained in:
Jerry Hu
2022-07-22 22:04:43 +08:00
committed by GitHub
parent ad31b6c902
commit b7c9007776
14 changed files with 314 additions and 33 deletions

View File

@ -959,16 +959,27 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
auto& data = agg_method.data;
auto& iter = agg_method.iterator;
agg_method.init_once();
while (iter != data.end() && key_columns[0]->size() < state->batch_size()) {
const auto& key = iter->get_first();
auto& mapped = iter->get_second();
agg_method.insert_key_into_columns(key, key_columns, _probe_key_sz);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
_aggregate_evaluators[i]->insert_result_info(
mapped + _offsets_of_aggregate_states[i], value_columns[i].get());
const auto size = std::min(data.size(), size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(iter->get_first())>;
std::vector<KeyType> keys(size);
std::vector<AggregateDataPtr> values(size);
size_t num_rows = 0;
while (iter != data.end() && num_rows < state->batch_size()) {
keys[num_rows] = iter->get_first();
values[num_rows] = iter->get_second();
++iter;
++num_rows;
}
agg_method.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->insert_result_info_vec(
values, _offsets_of_aggregate_states[i], value_columns[i].get(),
num_rows);
}
if (iter == data.end()) {
if (agg_method.data.has_null_key_data()) {
// only one key of group by support wrap null key
@ -1043,19 +1054,26 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
agg_method.init_once();
auto& data = agg_method.data;
auto& iter = agg_method.iterator;
while (iter != data.end() && key_columns[0]->size() < state->batch_size()) {
const auto& key = iter->get_first();
auto& mapped = iter->get_second();
// insert keys
agg_method.insert_key_into_columns(key, key_columns, _probe_key_sz);
// serialize values
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
value_buffer_writers[i].commit();
}
const auto size = std::min(data.size(), size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(iter->get_first())>;
std::vector<KeyType> keys(size);
std::vector<AggregateDataPtr> values(size);
size_t num_rows = 0;
while (iter != data.end() && num_rows < state->batch_size()) {
keys[num_rows] = iter->get_first();
values[num_rows] = iter->get_second();
++iter;
++num_rows;
}
agg_method.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize_vec(
values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
num_rows);
}
if (iter == data.end()) {
@ -1166,8 +1184,6 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
},
_agg_data._aggregated_method_variant);
std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
@ -1179,21 +1195,16 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
column = ((ColumnNullable*)column.get())->get_nested_column_ptr();
}
for (int j = 0; j < rows; ++j) {
VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
_create_agg_status(deserialize_buffer.get());
std::unique_ptr<char> deserialize_buffer(
new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
_aggregate_evaluators[i]->function()->deserialize(
deserialize_buffer.get() + _offsets_of_aggregate_states[i], buffer_reader,
&_agg_arena_pool);
_aggregate_evaluators[i]->function()->deserialize_vec(deserialize_buffer.get(),
(ColumnString*)(column.get()),
&_agg_arena_pool, rows);
_aggregate_evaluators[i]->function()->merge_vec(
places.data(), _offsets_of_aggregate_states[i], deserialize_buffer.get(),
&_agg_arena_pool, rows);
_aggregate_evaluators[i]->function()->merge(
places.data()[j] + _offsets_of_aggregate_states[i],
deserialize_buffer.get() + _offsets_of_aggregate_states[i],
&_agg_arena_pool);
_destroy_agg_status(deserialize_buffer.get());
}
} else {
_aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i],
places.data(), &_agg_arena_pool);