[improvement](agg) Serialize the fixed-length aggregation results with corresponding columns instead of ColumnString (#11801)

This commit is contained in:
Jerry Hu
2022-08-22 10:12:06 +08:00
committed by GitHub
parent 915d8989c5
commit dc8f64b3e3
23 changed files with 969 additions and 99 deletions

View File

@ -103,6 +103,9 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
}
_is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
_use_fixed_length_serialization_opt =
tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
tnode.agg_node.use_fixed_length_serialization_opt;
}
AggregationNode::~AggregationNode() = default;
@ -275,11 +278,14 @@ Status AggregationNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
_serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime");
_deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeDataTime");
_data_mem_tracker = std::make_unique<MemTracker>("AggregationNode:Data");
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
@ -565,20 +571,34 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
MutableColumns value_columns(agg_size);
std::vector<DataTypePtr> data_types(agg_size);
// will serialize data to string column
std::vector<VectorBufferWriter> value_buffer_writers;
auto serialize_string_type = std::make_shared<DataTypeString>();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
data_types[i] = serialize_string_type;
value_columns[i] = serialize_string_type->create_column();
value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
if (_use_fixed_length_serialization_opt) {
auto serialize_string_type = std::make_shared<DataTypeString>();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type();
value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column();
}
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
_agg_data.without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]);
value_buffer_writers[i].commit();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
_agg_data.without_key + _offsets_of_aggregate_states[i], value_columns[i]);
}
} else {
std::vector<VectorBufferWriter> value_buffer_writers;
auto serialize_string_type = std::make_shared<DataTypeString>();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
data_types[i] = serialize_string_type;
value_columns[i] = serialize_string_type->create_column();
value_buffer_writers.emplace_back(
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
_agg_data.without_key + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
value_buffer_writers[i].commit();
}
}
{
ColumnsWithTypeAndName data_with_schema;
@ -607,8 +627,6 @@ Status AggregationNode::_execute_without_key(Block* block) {
Status AggregationNode::_merge_without_key(Block* block) {
SCOPED_TIMER(_merge_timer);
DCHECK(_agg_data.without_key != nullptr);
std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]);
int rows = block->rows();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (_aggregate_evaluators[i]->is_merge()) {
int col_id = _get_slot_column_id(_aggregate_evaluators[i]);
@ -617,12 +635,21 @@ Status AggregationNode::_merge_without_key(Block* block) {
column = ((ColumnNullable*)column.get())->get_nested_column_ptr();
}
for (int j = 0; j < rows; ++j) {
VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
_aggregate_evaluators[i]->function()->deserialize_and_merge(
_agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader,
SCOPED_TIMER(_deserialize_data_timer);
if (_use_fixed_length_serialization_opt) {
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
_agg_data.without_key + _offsets_of_aggregate_states[i], *column,
&_agg_arena_pool);
} else {
const int rows = block->rows();
for (int j = 0; j < rows; ++j) {
VectorBufferReader buffer_reader(
((ColumnString*)(column.get()))->get_data_at(j));
_aggregate_evaluators[i]->function()->deserialize_and_merge(
_agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader,
&_agg_arena_pool);
}
}
} else {
_aggregate_evaluators[i]->execute_single_add(
@ -872,53 +899,59 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
// do not try to do agg, just init and serialize directly return the out_block
if (!_should_expand_preagg_hash_tables()) {
ret_flag = true;
if (_streaming_pre_places.size() < rows) {
_streaming_pre_places.reserve(rows);
for (size_t i = _streaming_pre_places.size(); i < rows; ++i) {
_streaming_pre_places.emplace_back(_agg_arena_pool.aligned_alloc(
_total_size_of_aggregate_states, _align_aggregate_states));
}
}
for (size_t i = 0; i < rows; ++i) {
_create_agg_status(_streaming_pre_places[i]);
}
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(
in_block, _offsets_of_aggregate_states[i],
_streaming_pre_places.data(), &_agg_arena_pool, false);
}
// will serialize value data to string column
std::vector<VectorBufferWriter> value_buffer_writers;
bool mem_reuse = out_block->mem_reuse();
auto serialize_string_type = std::make_shared<DataTypeString>();
std::vector<DataTypePtr> data_types;
MutableColumns value_columns;
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (mem_reuse) {
value_columns.emplace_back(
std::move(*out_block->get_by_position(i + key_size).column)
.mutate());
} else {
// slot type of value it should always be string type
value_columns.emplace_back(serialize_string_type->create_column());
if (_use_fixed_length_serialization_opt) {
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
auto data_type =
_aggregate_evaluators[i]->function()->get_serialized_type();
if (mem_reuse) {
value_columns.emplace_back(
std::move(*out_block->get_by_position(i + key_size)
.column)
.mutate());
} else {
// slot type of value it should always be string type
value_columns.emplace_back(_aggregate_evaluators[i]
->function()
->create_serialize_column());
}
data_types.emplace_back(data_type);
}
value_buffer_writers.emplace_back(
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
for (size_t j = 0; j < rows; ++j) {
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
_streaming_pre_places[j] + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
value_buffer_writers[i].commit();
for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
SCOPED_TIMER(_serialize_data_timer);
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
in_block, value_columns[i], rows, &_agg_arena_pool);
}
} else {
std::vector<VectorBufferWriter> value_buffer_writers;
auto serialize_string_type = std::make_shared<DataTypeString>();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (mem_reuse) {
value_columns.emplace_back(
std::move(*out_block->get_by_position(i + key_size)
.column)
.mutate());
} else {
// slot type of value it should always be string type
value_columns.emplace_back(
serialize_string_type->create_column());
}
data_types.emplace_back(serialize_string_type);
value_buffer_writers.emplace_back(
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
}
for (size_t i = 0; i < rows; ++i) {
_destroy_agg_status(_streaming_pre_places[i]);
for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
SCOPED_TIMER(_serialize_data_timer);
_aggregate_evaluators[i]->streaming_agg_serialize(
in_block, value_buffer_writers[i], rows, &_agg_arena_pool);
}
}
if (!mem_reuse) {
@ -931,7 +964,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
for (int i = 0; i < value_columns.size(); ++i) {
columns_with_schema.emplace_back(std::move(value_columns[i]),
serialize_string_type, "");
data_types[i], "");
}
out_block->swap(Block(columns_with_schema));
} else {
@ -1073,19 +1106,6 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
}
}
// will serialize data to string column
std::vector<VectorBufferWriter> value_buffer_writers;
auto serialize_string_type = std::make_shared<DataTypeString>();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
value_data_types[i] = serialize_string_type;
if (mem_reuse) {
value_columns[i] = std::move(*block->get_by_position(i + key_size).column).mutate();
} else {
value_columns[i] = serialize_string_type->create_column();
}
value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
std::visit(
[&](auto&& agg_method) -> void {
agg_method.init_once();
@ -1095,7 +1115,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
const auto size = std::min(data.size(), size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(iter->get_first())>;
std::vector<KeyType> keys(size);
std::vector<AggregateDataPtr> values(size);
std::vector<AggregateDataPtr> values(size + 1);
size_t num_rows = 0;
while (iter != data.end() && num_rows < state->batch_size()) {
@ -1107,31 +1127,60 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
agg_method.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize_vec(
values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
num_rows);
}
if (iter == data.end()) {
if (agg_method.data.has_null_key_data()) {
DCHECK(key_columns.size() == 1);
DCHECK(key_columns[0]->is_nullable());
if (agg_method.data.has_null_key_data()) {
key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.data.get_null_key_data();
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
mapped + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
value_buffer_writers[i].commit();
}
values[num_rows] = agg_method.data.get_null_key_data();
++num_rows;
*eos = true;
}
} else {
*eos = true;
}
}
if (_use_fixed_length_serialization_opt) {
SCOPED_TIMER(_serialize_data_timer);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
value_data_types[i] =
_aggregate_evaluators[i]->function()->get_serialized_type();
if (mem_reuse) {
value_columns[i] =
std::move(*block->get_by_position(i + key_size).column)
.mutate();
} else {
value_columns[i] =
_aggregate_evaluators[i]->function()->create_serialize_column();
}
_aggregate_evaluators[i]->function()->serialize_to_column(
values, _offsets_of_aggregate_states[i], value_columns[i],
num_rows);
}
} else {
SCOPED_TIMER(_serialize_data_timer);
std::vector<VectorBufferWriter> value_buffer_writers;
auto serialize_string_type = std::make_shared<DataTypeString>();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
value_data_types[i] = serialize_string_type;
if (mem_reuse) {
value_columns[i] =
std::move(*block->get_by_position(i + key_size).column)
.mutate();
} else {
value_columns[i] = serialize_string_type->create_column();
}
value_buffer_writers.emplace_back(
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize_vec(
values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
num_rows);
}
}
},
_agg_data._aggregated_method_variant);