[Bug](function) catch function calculation error on aggregate node to avoid core dump (#15903)

This commit is contained in:
Pxl
2023-01-16 11:21:28 +08:00
committed by GitHub
parent 3cb7b2ea50
commit 81bab55d43
6 changed files with 91 additions and 57 deletions

View File

@ -80,12 +80,11 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(NULL),
_intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_output_tuple_desc(NULL),
_output_tuple_desc(nullptr),
_needs_finalize(tnode.agg_node.need_finalize),
_is_merge(false),
_agg_data(),
_build_timer(nullptr),
_serialize_key_timer(nullptr),
_exec_timer(nullptr),
@ -714,9 +713,9 @@ Status AggregationNode::_execute_without_key(Block* block) {
DCHECK(_agg_data->without_key != nullptr);
SCOPED_TIMER(_build_timer);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_single_add(
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key + _offsets_of_aggregate_states[i],
_agg_arena_pool.get());
_agg_arena_pool.get()));
}
return Status::OK();
}
@ -749,9 +748,9 @@ Status AggregationNode::_merge_without_key(Block* block) {
}
}
} else {
_aggregate_evaluators[i]->execute_single_add(
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key + _offsets_of_aggregate_states[i],
_agg_arena_pool.get());
_agg_arena_pool.get()));
}
}
return Status::OK();
@ -1019,8 +1018,8 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
// to avoid wasting memory.
// But for fixed hash map, it never need to expand
bool ret_flag = false;
std::visit(
[&](auto&& agg_method) -> void {
RETURN_IF_ERROR(std::visit(
[&](auto&& agg_method) -> Status {
if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) {
// do not try to do agg, just init and serialize directly return the out_block
if (!_should_expand_preagg_hash_tables()) {
@ -1052,8 +1051,10 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
SCOPED_TIMER(_serialize_data_timer);
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
in_block, value_columns[i], rows, _agg_arena_pool.get());
RETURN_IF_ERROR(
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
in_block, value_columns[i], rows,
_agg_arena_pool.get()));
}
} else {
std::vector<VectorBufferWriter> value_buffer_writers;
@ -1076,9 +1077,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
SCOPED_TIMER(_serialize_data_timer);
_aggregate_evaluators[i]->streaming_agg_serialize(
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize(
in_block, value_buffer_writers[i], rows,
_agg_arena_pool.get());
_agg_arena_pool.get()));
}
}
@ -1104,16 +1105,17 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
}
}
return Status::OK();
},
_agg_data->_aggregated_method_variant);
_agg_data->_aggregated_method_variant));
if (!ret_flag) {
RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows));
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool.get(),
_should_expand_hash_table);
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
in_block, _offsets_of_aggregate_states[i], _places.data(),
_agg_arena_pool.get(), _should_expand_hash_table));
}
}