diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index bea1fd79fc..959dda59ca 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -177,7 +177,7 @@ protected: } bool is_allocated_from_stack() const { - constexpr size_t stack_threshold = TAllocator::getStackThreshold(); + constexpr size_t stack_threshold = TAllocator::get_stack_threshold(); return (stack_threshold > 0) && (allocated_bytes() <= stack_threshold); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f2712542f3..bd76e5f89c 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -95,7 +95,7 @@ struct ProcessHashTableBuild { } _build_side_hash_values.resize(_rows); - auto& arena = _join_node->_arena; + auto& arena = *(_join_node->_arena); { SCOPED_TIMER(_build_side_compute_hash_timer); for (size_t k = 0; k < _rows; ++k) { @@ -152,7 +152,7 @@ struct ProcessHashTableBuild { new (&emplace_result.get_mapped()) Mapped({k, _offset}); inserted_rows.push_back(k); } else { - emplace_result.get_mapped().insert({k, _offset}, _join_node->_arena); + emplace_result.get_mapped().insert({k, _offset}, *(_join_node->_arena)); inserted_rows.push_back(k); }); } else if (!has_runtime_filter && build_unique) { @@ -165,7 +165,7 @@ struct ProcessHashTableBuild { if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); } else { - emplace_result.get_mapped().insert({k, _offset}, _join_node->_arena); + emplace_result.get_mapped().insert({k, _offset}, *(_join_node->_arena)); }); } #undef EMPLACE_IMPL @@ -230,6 +230,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}) { _runtime_filter_descs = tnode.runtime_filters; + _arena = std::make_unique(); + _hash_table_variants = std::make_unique(); + _process_hashtable_ctx_variants = std::make_unique(); _init_join_op(); // avoid vector expand change block address. @@ -395,6 +398,7 @@ Status HashJoinNode::close(RuntimeState* state) { VExpr::close(_probe_expr_ctxs, state); if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state); + _release_mem(); return VJoinNodeBase::close(state); } @@ -493,7 +497,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo LOG(FATAL) << "FATAL: uninited hash table probe"; } }, - _hash_table_variants, _process_hashtable_ctx_variants, + *_hash_table_variants, *_process_hashtable_ctx_variants, make_bool_variant(_need_null_map_for_probe), make_bool_variant(_probe_ignore_null)); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { @@ -512,7 +516,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo LOG(FATAL) << "FATAL: uninited hash table probe"; } }, - _hash_table_variants, _process_hashtable_ctx_variants); + *_hash_table_variants, *_process_hashtable_ctx_variants); } else { *eos = true; return Status::OK(); @@ -691,7 +695,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { return ret; } }}, - _hash_table_variants); + *_hash_table_variants); } template @@ -825,7 +829,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin : nullptr, &_short_circuit_for_null_in_probe_side); }}, - _hash_table_variants, make_bool_variant(_build_side_ignore_null), + *_hash_table_variants, make_bool_variant(_build_side_ignore_null), make_bool_variant(_short_circuit_for_null_in_build_side)); return st; @@ -847,22 +851,22 @@ void HashJoinNode::_hash_table_init() { switch (_build_expr_ctxs[0]->root()->result_type()) { case TYPE_BOOLEAN: case TYPE_TINYINT: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_SMALLINT: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_BIGINT: case TYPE_DOUBLE: case TYPE_DATETIME: case TYPE_DATE: case TYPE_DATETIMEV2: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_LARGEINT: case TYPE_DECIMALV2: @@ -877,16 +881,16 @@ void HashJoinNode::_hash_table_init() { : type_ptr->get_type_id(); WhichDataType which(idx); if (which.is_decimal32()) { - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } else if (which.is_decimal64()) { - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } else { - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } break; } default: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } return; } @@ -926,41 +930,41 @@ void HashJoinNode::_hash_table_init() { if (std::tuple_size>::value + key_byte_size <= sizeof(UInt64)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else if (std::tuple_size>::value + key_byte_size <= sizeof(UInt128)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else { _hash_table_variants - .emplace>(); + ->emplace>(); } } else { if (key_byte_size <= sizeof(UInt64)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else if (key_byte_size <= sizeof(UInt128)) { - _hash_table_variants - .emplace>(); + _hash_table_variants->emplace< + I128FixedKeyHashTableContext>(); } else { - _hash_table_variants - .emplace>(); + _hash_table_variants->emplace< + I256FixedKeyHashTableContext>(); } } } else { - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } }, _join_op_variants, make_bool_variant(_have_other_join_conjunct)); - DCHECK(!std::holds_alternative(_hash_table_variants)); + DCHECK(!std::holds_alternative(*_hash_table_variants)); } void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) { std::visit( [&](auto&& join_op_variants) { using JoinOpType = std::decay_t; - _process_hashtable_ctx_variants.emplace>( + _process_hashtable_ctx_variants->emplace>( this, state->batch_size()); }, _join_op_variants); @@ -1011,4 +1015,17 @@ void HashJoinNode::_reset_tuple_is_null_column() { } } +void HashJoinNode::_release_mem() { + _arena = nullptr; + _hash_table_variants = nullptr; + _process_hashtable_ctx_variants = nullptr; + _null_map_column = nullptr; + _tuple_is_null_left_flag_column = nullptr; + _tuple_is_null_right_flag_column = nullptr; + _probe_block.clear(); + + std::vector tmp_build_blocks; + _build_blocks.swap(tmp_build_blocks); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index f2774979c3..c2c62d193f 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -233,10 +233,10 @@ private: int64_t _mem_used; - Arena _arena; - HashTableVariants _hash_table_variants; + std::unique_ptr _arena; + std::unique_ptr _hash_table_variants; - HashTableCtxVariants _process_hashtable_ctx_variants; + std::unique_ptr _process_hashtable_ctx_variants; std::vector _build_blocks; Block _probe_block; @@ -302,6 +302,8 @@ private: static std::vector _convert_block_to_null(Block& block); + void _release_mem(); + template friend struct ProcessHashTableBuild; diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 9a73de0c5b..3c06e694ab 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -66,6 +66,7 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des Status VJoinNodeBase::close(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::close"); VExpr::close(_output_expr_ctxs, state); + _join_block.clear(); return ExecNode::close(state); } diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index c4c58b38f0..156a8c4def 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -307,6 +307,7 @@ Status VScanNode::close(RuntimeState* state) { for (auto& ctx : _stale_vexpr_ctxs) { (*ctx)->close(state); } + _scanner_pool.clear(); RETURN_IF_ERROR(ExecNode::close(state)); return Status::OK(); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index bfbd399c5a..3bb955fe1e 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -116,6 +116,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, _use_fixed_length_serialization_opt = tnode.agg_node.__isset.use_fixed_length_serialization_opt && tnode.agg_node.use_fixed_length_serialization_opt; + _agg_data = std::make_unique(); + _agg_arena_pool = std::make_unique(); } AggregationNode::~AggregationNode() = default; @@ -152,18 +154,18 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) switch (probe_exprs[0]->root()->result_type()) { case TYPE_TINYINT: case TYPE_BOOLEAN: - _agg_data.init(AggregatedDataVariants::Type::int8_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int8_key, is_nullable); return; case TYPE_SMALLINT: - _agg_data.init(AggregatedDataVariants::Type::int16_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int16_key, is_nullable); return; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int32_key, is_nullable); else - _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); return; case TYPE_BIGINT: case TYPE_DOUBLE: @@ -171,15 +173,15 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) case TYPE_DATETIME: case TYPE_DATETIMEV2: if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int64_key, is_nullable); else - _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); return; case TYPE_LARGEINT: { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int128_key, is_nullable); else - _agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); return; } case TYPE_DECIMALV2: @@ -194,30 +196,30 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) WhichDataType which(idx); if (which.is_decimal32()) { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int32_key, is_nullable); else - _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); } else if (which.is_decimal64()) { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int64_key, is_nullable); else - _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); } else { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int128_key, is_nullable); else - _agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); } return; } case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_STRING: { - _agg_data.init(AggregatedDataVariants::Type::string_key, is_nullable); + _agg_data->init(AggregatedDataVariants::Type::string_key, is_nullable); break; } default: - _agg_data.init(AggregatedDataVariants::Type::serialized); + _agg_data->init(AggregatedDataVariants::Type::serialized); } } else { bool use_fixed_key = true; @@ -248,41 +250,41 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) if (has_null) { if (std::tuple_size>::value + key_byte_size <= sizeof(UInt64)) { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null); + _agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null); else - _agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); + _agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); } else if (std::tuple_size>::value + key_byte_size <= sizeof(UInt128)) { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null); + _agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null); else - _agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); + _agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); } else { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null); + _agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null); else - _agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); + _agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); } } else { if (key_byte_size <= sizeof(UInt64)) { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null); + _agg_data->init(AggregatedDataVariants::Type::int64_keys, has_null); else - _agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); + _agg_data->init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); } else if (key_byte_size <= sizeof(UInt128)) { if (_is_first_phase) - _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null); + _agg_data->init(AggregatedDataVariants::Type::int128_keys, has_null); else - _agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); + _agg_data->init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); } else { if (_is_merge) - _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null); + _agg_data->init(AggregatedDataVariants::Type::int256_keys, has_null); else - _agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); + _agg_data->init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); } } } else { - _agg_data.init(AggregatedDataVariants::Type::serialized); + _agg_data->init(AggregatedDataVariants::Type::serialized); } } } // namespace doris::vectorized @@ -364,9 +366,9 @@ Status AggregationNode::prepare(RuntimeState* state) { } if (_probe_expr_ctxs.empty()) { - _agg_data.init(AggregatedDataVariants::Type::without_key); + _agg_data->init(AggregatedDataVariants::Type::without_key); - _agg_data.without_key = reinterpret_cast( + _agg_data->without_key = reinterpret_cast( _mem_pool->allocate(_total_size_of_aggregate_states)); if (_is_merge) { @@ -405,7 +407,7 @@ Status AggregationNode::prepare(RuntimeState* state) { _align_aggregate_states) * _align_aggregate_states)); }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); if (_is_merge) { _executor.execute = std::bind(&AggregationNode::_merge_with_serialized_key, this, std::placeholders::_1); @@ -462,7 +464,7 @@ Status AggregationNode::open(RuntimeState* state) { // because during prepare and open thread is not the same one, // this could cause unable to get JVM if (_probe_expr_ctxs.empty()) { - _create_agg_status(_agg_data.without_key); + _create_agg_status(_agg_data->without_key); _agg_data_created_without_key = true; } bool eos = false; @@ -538,8 +540,9 @@ Status AggregationNode::close(RuntimeState* state) { [&](auto&& agg_method) { COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.data.size())); }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); } + _release_mem(); return ExecNode::close(state); } @@ -559,7 +562,7 @@ Status AggregationNode::_destroy_agg_status(AggregateDataPtr data) { } Status AggregationNode::_get_without_key_result(RuntimeState* state, Block* block, bool* eos) { - DCHECK(_agg_data.without_key != nullptr); + DCHECK(_agg_data->without_key != nullptr); block->clear(); *block = VectorizedUtils::create_empty_columnswithtypename(_row_descriptor); @@ -575,7 +578,7 @@ Status AggregationNode::_get_without_key_result(RuntimeState* state, Block* bloc for (int i = 0; i < _aggregate_evaluators.size(); ++i) { auto column = columns[i].get(); _aggregate_evaluators[i]->insert_result_info( - _agg_data.without_key + _offsets_of_aggregate_states[i], column); + _agg_data->without_key + _offsets_of_aggregate_states[i], column); } const auto& block_schema = block->get_columns_with_type_and_name(); @@ -613,7 +616,7 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block } block->clear(); - DCHECK(_agg_data.without_key != nullptr); + DCHECK(_agg_data->without_key != nullptr); int agg_size = _aggregate_evaluators.size(); MutableColumns value_columns(agg_size); @@ -628,7 +631,7 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->function()->serialize_without_key_to_column( - _agg_data.without_key + _offsets_of_aggregate_states[i], value_columns[i]); + _agg_data->without_key + _offsets_of_aggregate_states[i], value_columns[i]); } } else { std::vector value_buffer_writers; @@ -642,7 +645,7 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->function()->serialize( - _agg_data.without_key + _offsets_of_aggregate_states[i], + _agg_data->without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]); value_buffer_writers[i].commit(); } @@ -662,18 +665,19 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block } Status AggregationNode::_execute_without_key(Block* block) { - DCHECK(_agg_data.without_key != nullptr); + DCHECK(_agg_data->without_key != nullptr); SCOPED_TIMER(_build_timer); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_single_add( - block, _agg_data.without_key + _offsets_of_aggregate_states[i], &_agg_arena_pool); + block, _agg_data->without_key + _offsets_of_aggregate_states[i], + _agg_arena_pool.get()); } return Status::OK(); } Status AggregationNode::_merge_without_key(Block* block) { SCOPED_TIMER(_merge_timer); - DCHECK(_agg_data.without_key != nullptr); + DCHECK(_agg_data->without_key != nullptr); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { if (_aggregate_evaluators[i]->is_merge()) { int col_id = _get_slot_column_id(_aggregate_evaluators[i]); @@ -685,8 +689,8 @@ Status AggregationNode::_merge_without_key(Block* block) { SCOPED_TIMER(_deserialize_data_timer); if (_use_fixed_length_serialization_opt) { _aggregate_evaluators[i]->function()->deserialize_and_merge_from_column( - _agg_data.without_key + _offsets_of_aggregate_states[i], *column, - &_agg_arena_pool); + _agg_data->without_key + _offsets_of_aggregate_states[i], *column, + _agg_arena_pool.get()); } else { const int rows = block->rows(); for (int j = 0; j < rows; ++j) { @@ -694,22 +698,22 @@ Status AggregationNode::_merge_without_key(Block* block) { ((ColumnString*)(column.get()))->get_data_at(j)); _aggregate_evaluators[i]->function()->deserialize_and_merge( - _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader, - &_agg_arena_pool); + _agg_data->without_key + _offsets_of_aggregate_states[i], buffer_reader, + _agg_arena_pool.get()); } } } else { _aggregate_evaluators[i]->execute_single_add( - block, _agg_data.without_key + _offsets_of_aggregate_states[i], - &_agg_arena_pool); + block, _agg_data->without_key + _offsets_of_aggregate_states[i], + _agg_arena_pool.get()); } } return Status::OK(); } void AggregationNode::_update_memusage_without_key() { - _data_mem_tracker->consume(_agg_arena_pool.size() - _mem_usage_record.used_in_arena); - _mem_usage_record.used_in_arena = _agg_arena_pool.size(); + _data_mem_tracker->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena); + _mem_usage_record.used_in_arena = _agg_arena_pool->size(); } void AggregationNode::_close_without_key() { @@ -717,7 +721,7 @@ void AggregationNode::_close_without_key() { //but finally call close to destory agg data, if agg data has bitmapValue //will be core dump, it's not initialized if (_agg_data_created_without_key) { - _destroy_agg_status(_agg_data.without_key); + _destroy_agg_status(_agg_data->without_key); _agg_data_created_without_key = false; } release_tracker(); @@ -782,12 +786,12 @@ bool AggregationNode::_should_expand_preagg_hash_tables() { _should_expand_hash_table = current_reduction > min_reduction; return _should_expand_hash_table; }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); } size_t AggregationNode::_get_hash_table_size() { return std::visit([&](auto&& agg_method) { return agg_method.data.size(); }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); } void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, @@ -812,7 +816,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR } else { for (size_t i = 0; i < num_rows; ++i) { _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); + agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); } } } @@ -822,7 +826,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR if constexpr (HashTableTraits::is_string_hash_table && !std::is_same_v) { StringRef string_ref = to_string_ref(key); - ArenaKeyHolder key_holder {string_ref, _agg_arena_pool}; + ArenaKeyHolder key_holder {string_ref, *_agg_arena_pool}; key_holder_persist_key(key_holder); auto mapped = _aggregate_data_container->append_data(key_holder.key); _create_agg_status(mapped); @@ -835,8 +839,8 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR }; auto creator_for_null_key = [this](auto& mapped) { - mapped = _agg_arena_pool.aligned_alloc(_total_size_of_aggregate_states, - _align_aggregate_states); + mapped = _agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states, + _align_aggregate_states); _create_agg_status(mapped); }; @@ -848,7 +852,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { if constexpr (HashTableTraits::is_parallel_phmap) { agg_method.data.prefetch_by_key(state.get_key_holder( - i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); + i + HASH_MAP_PREFETCH_DIST, *_agg_arena_pool)); } else agg_method.data.prefetch_by_hash( _hash_values[i + HASH_MAP_PREFETCH_DIST]); @@ -857,19 +861,19 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR if constexpr (ColumnsHashing::IsSingleNullableColumnMethod< AggState>::value) { mapped = state.lazy_emplace_key(agg_method.data, _hash_values[i], i, - _agg_arena_pool, creator, + *_agg_arena_pool, creator, creator_for_null_key); } else { mapped = state.lazy_emplace_key(agg_method.data, _hash_values[i], i, - _agg_arena_pool, creator); + *_agg_arena_pool, creator); } } else { if constexpr (ColumnsHashing::IsSingleNullableColumnMethod< AggState>::value) { - mapped = state.lazy_emplace_key(agg_method.data, i, _agg_arena_pool, + mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, creator, creator_for_null_key); } else { - mapped = state.lazy_emplace_key(agg_method.data, i, _agg_arena_pool, + mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, creator); } } @@ -878,7 +882,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR assert(places[i] != nullptr); } }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); } void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, @@ -902,7 +906,7 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr } else { for (size_t i = 0; i < rows; ++i) { _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); + agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); } } } @@ -914,16 +918,16 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { if constexpr (HashTableTraits::is_parallel_phmap) { agg_method.data.prefetch_by_key(state.get_key_holder( - i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); + i + HASH_MAP_PREFETCH_DIST, *_agg_arena_pool)); } else agg_method.data.prefetch_by_hash( _hash_values[i + HASH_MAP_PREFETCH_DIST]); } return state.find_key(agg_method.data, _hash_values[i], i, - _agg_arena_pool); + *_agg_arena_pool); } else { - return state.find_key(agg_method.data, i, _agg_arena_pool); + return state.find_key(agg_method.data, i, *_agg_arena_pool); } }(); @@ -933,7 +937,7 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr places[i] = nullptr; } }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); } Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block, @@ -1001,7 +1005,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i for (int i = 0; i != _aggregate_evaluators.size(); ++i) { SCOPED_TIMER(_serialize_data_timer); _aggregate_evaluators[i]->streaming_agg_serialize_to_column( - in_block, value_columns[i], rows, &_agg_arena_pool); + in_block, value_columns[i], rows, _agg_arena_pool.get()); } } else { std::vector value_buffer_writers; @@ -1025,7 +1029,8 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i for (int i = 0; i != _aggregate_evaluators.size(); ++i) { SCOPED_TIMER(_serialize_data_timer); _aggregate_evaluators[i]->streaming_agg_serialize( - in_block, value_buffer_writers[i], rows, &_agg_arena_pool); + in_block, value_buffer_writers[i], rows, + _agg_arena_pool.get()); } } @@ -1052,14 +1057,14 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i } } }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); if (!ret_flag) { RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i], - _places.data(), &_agg_arena_pool, + _places.data(), _agg_arena_pool.get(), _should_expand_hash_table); } } @@ -1156,7 +1161,7 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo } } }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); if (!mem_reuse) { *block = column_withschema; @@ -1281,7 +1286,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat } } }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); if (!mem_reuse) { ColumnsWithTypeAndName columns_with_schema; @@ -1311,14 +1316,14 @@ void AggregationNode::_update_memusage_with_serialized_key() { std::visit( [&](auto&& agg_method) -> void { auto& data = agg_method.data; - _data_mem_tracker->consume(_agg_arena_pool.size() - + _data_mem_tracker->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena); _data_mem_tracker->consume(data.get_buffer_size_in_bytes() - _mem_usage_record.used_in_state); _mem_usage_record.used_in_state = data.get_buffer_size_in_bytes(); - _mem_usage_record.used_in_arena = _agg_arena_pool.size(); + _mem_usage_record.used_in_arena = _agg_arena_pool->size(); }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); } void AggregationNode::_close_with_serialized_key() { @@ -1332,7 +1337,7 @@ void AggregationNode::_close_with_serialized_key() { } }); }, - _agg_data._aggregated_method_variant); + _agg_data->_aggregated_method_variant); release_tracker(); } @@ -1340,4 +1345,23 @@ void AggregationNode::release_tracker() { _data_mem_tracker->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); } +void AggregationNode::_release_mem() { + _agg_data = nullptr; + _aggregate_data_container = nullptr; + _mem_pool = nullptr; + _preagg_block.clear(); + + PODArray tmp_places; + _places.swap(tmp_places); + + std::vector tmp_deserialize_buffer; + _deserialize_buffer.swap(tmp_deserialize_buffer); + + std::vector tmp_hash_values; + _hash_values.swap(tmp_hash_values); + + std::vector tmp_values; + _values.swap(tmp_values); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 8223ae4232..a58e3c524f 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -614,7 +614,8 @@ struct AggregatedDataVariants { } }; -using AggregatedDataVariantsPtr = std::shared_ptr; +using AggregatedDataVariantsUPtr = std::unique_ptr; +using ArenaUPtr = std::unique_ptr; struct AggregateDataContainer { public: @@ -780,9 +781,9 @@ private: /// The total size of the row from the aggregate functions. size_t _total_size_of_aggregate_states = 0; - AggregatedDataVariants _agg_data; + AggregatedDataVariantsUPtr _agg_data; - Arena _agg_arena_pool; + ArenaUPtr _agg_arena_pool; RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _serialize_key_timer; @@ -881,14 +882,15 @@ private: for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_batch_add_selected( - block, _offsets_of_aggregate_states[i], _places.data(), &_agg_arena_pool); + block, _offsets_of_aggregate_states[i], _places.data(), + _agg_arena_pool.get()); } } else { _emplace_into_hash_table(_places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i], - _places.data(), &_agg_arena_pool); + _places.data(), _agg_arena_pool.get()); } if (_should_limit_output) { @@ -948,16 +950,16 @@ private: if (_use_fixed_length_serialization_opt) { SCOPED_TIMER(_deserialize_data_timer); _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, &_agg_arena_pool, rows); + _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); } else { SCOPED_TIMER(_deserialize_data_timer); _aggregate_evaluators[i]->function()->deserialize_vec( _deserialize_buffer.data(), (ColumnString*)(column.get()), - &_agg_arena_pool, rows); + _agg_arena_pool.get(), rows); } _aggregate_evaluators[i]->function()->merge_vec_selected( _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), &_agg_arena_pool, rows); + _deserialize_buffer.data(), _agg_arena_pool.get(), rows); _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(), rows); @@ -965,7 +967,7 @@ private: } else { _aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], _places.data(), - &_agg_arena_pool); + _agg_arena_pool.get()); } } } else { @@ -988,24 +990,24 @@ private: if (_use_fixed_length_serialization_opt) { SCOPED_TIMER(_deserialize_data_timer); _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, &_agg_arena_pool, rows); + _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); } else { SCOPED_TIMER(_deserialize_data_timer); _aggregate_evaluators[i]->function()->deserialize_vec( _deserialize_buffer.data(), (ColumnString*)(column.get()), - &_agg_arena_pool, rows); + _agg_arena_pool.get(), rows); } _aggregate_evaluators[i]->function()->merge_vec( _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), &_agg_arena_pool, rows); + _deserialize_buffer.data(), _agg_arena_pool.get(), rows); _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(), rows); } else { - _aggregate_evaluators[i]->execute_batch_add(block, - _offsets_of_aggregate_states[i], - _places.data(), &_agg_arena_pool); + _aggregate_evaluators[i]->execute_batch_add( + block, _offsets_of_aggregate_states[i], _places.data(), + _agg_arena_pool.get()); } } @@ -1024,6 +1026,8 @@ private: void release_tracker(); + void _release_mem(); + using vectorized_execute = std::function; using vectorized_pre_agg = std::function; using vectorized_get_result = diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 17bf9a2865..8aa1f8708a 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -101,6 +101,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, std::placeholders::_3); } } + _agg_arena_pool = std::make_unique(); VLOG_ROW << "tnode=" << apache::thrift::ThriftDebugString(tnode) << " AnalyticFnScope: " << _fn_scope; } @@ -184,8 +185,8 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { alignment_of_next_state * alignment_of_next_state; } } - _fn_place_ptr = - _agg_arena_pool.aligned_alloc(_total_size_of_aggregate_states, _align_aggregate_states); + _fn_place_ptr = _agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states, + _align_aggregate_states); _create_agg_status(); _executor.insert_result = std::bind(&VAnalyticEvalNode::_insert_result_info, this, std::placeholders::_1); @@ -241,6 +242,7 @@ Status VAnalyticEvalNode::close(RuntimeState* state) { for (auto* agg_function : _agg_functions) agg_function->close(state); _destroy_agg_status(); + _release_mem(); return ExecNode::close(state); } @@ -701,4 +703,18 @@ std::string VAnalyticEvalNode::debug_window_bound_string(TAnalyticWindowBoundary return ss.str(); } +void VAnalyticEvalNode::_release_mem() { + _agg_arena_pool = nullptr; + _mem_pool = nullptr; + + std::vector tmp_input_blocks; + _input_blocks.swap(tmp_input_blocks); + + std::vector> tmp_agg_intput_columns; + _agg_intput_columns.swap(tmp_agg_intput_columns); + + std::vector tmp_result_window_columns; + _result_window_columns.swap(tmp_result_window_columns); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index 0afc3be361..9597606f6c 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -96,6 +96,8 @@ private: executor _executor; + void _release_mem(); + private: enum AnalyticFnScope { PARTITION, RANGE, ROWS }; std::vector _input_blocks; @@ -131,7 +133,7 @@ private: size_t _total_size_of_aggregate_states = 0; /// The max align size for functions size_t _align_aggregate_states = 1; - Arena _agg_arena_pool; + std::unique_ptr _agg_arena_pool; AggregateDataPtr _fn_place_ptr; TTupleId _buffered_tuple_id = 0; diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp index ee38702fbf..8cf391f72f 100644 --- a/be/src/vec/exec/vexcept_node.cpp +++ b/be/src/vec/exec/vexcept_node.cpp @@ -72,7 +72,7 @@ Status VExceptNode::open(RuntimeState* state) { LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants); + *_hash_table_variants); } } return st; @@ -96,7 +96,7 @@ Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool* eos LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants); + *_hash_table_variants); RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); diff --git a/be/src/vec/exec/vintersect_node.cpp b/be/src/vec/exec/vintersect_node.cpp index 5fcc5f10fa..b232708533 100644 --- a/be/src/vec/exec/vintersect_node.cpp +++ b/be/src/vec/exec/vintersect_node.cpp @@ -73,7 +73,7 @@ Status VIntersectNode::open(RuntimeState* state) { LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants); + *_hash_table_variants); } } return st; @@ -98,7 +98,7 @@ Status VIntersectNode::get_next(RuntimeState* state, Block* output_block, bool* LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants); + *_hash_table_variants); RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index a2001cff15..cb79fcb317 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -214,7 +214,7 @@ Status VRepeatNode::close(RuntimeState* state) { } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::close"); VExpr::close(_expr_ctxs, state); - RETURN_IF_ERROR(child(0)->close(state)); + _release_mem(); return ExecNode::close(state); } @@ -231,4 +231,10 @@ void VRepeatNode::debug_string(int indentation_level, std::stringstream* out) co ExecNode::debug_string(indentation_level, out); *out << ")"; } + +void VRepeatNode::_release_mem() { + _child_block = nullptr; + _intermediate_block = nullptr; +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index cc857dc33a..1bf047a196 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -48,6 +48,8 @@ private: using RepeatNode::get_next; Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block); + void _release_mem(); + std::unique_ptr _child_block {}; std::unique_ptr _intermediate_block {}; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 736c9786d9..d4be9f4eb4 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -46,11 +46,11 @@ struct HashTableBuild { KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz, nullptr); for (size_t k = 0; k < _rows; ++k) { - auto emplace_result = - key_getter.emplace_key(hash_table_ctx.hash_table, k, _operation_node->_arena); + auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, k, + *(_operation_node->_arena)); if (k + 1 < _rows) { - key_getter.prefetch(hash_table_ctx.hash_table, k + 1, _operation_node->_arena); + key_getter.prefetch(hash_table_ctx.hash_table, k + 1, *(_operation_node->_arena)); } if (emplace_result.is_inserted()) { //only inserted once as the same key, others skip @@ -75,7 +75,10 @@ VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, _valid_element_in_hash_tbl(0), _mem_used(0), _probe_index(-1), - _probe_rows(0) {} + _probe_rows(0) { + _hash_table_variants = std::make_unique(); + _arena = std::make_unique(); +} Status VSetOperationNode::close(RuntimeState* state) { if (is_closed()) { @@ -85,6 +88,7 @@ Status VSetOperationNode::close(RuntimeState* state) { for (auto& exprs : _child_expr_lists) { VExpr::close(exprs, state); } + release_mem(); return ExecNode::close(state); } @@ -150,16 +154,16 @@ void VSetOperationNode::hash_table_init() { switch (_child_expr_lists[0][0]->root()->result_type()) { case TYPE_BOOLEAN: case TYPE_TINYINT: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_SMALLINT: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: case TYPE_DECIMAL32: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_BIGINT: case TYPE_DOUBLE: @@ -167,15 +171,15 @@ void VSetOperationNode::hash_table_init() { case TYPE_DATE: case TYPE_DECIMAL64: case TYPE_DATETIMEV2: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_LARGEINT: case TYPE_DECIMALV2: case TYPE_DECIMAL128: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); break; default: - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } return; } @@ -209,29 +213,29 @@ void VSetOperationNode::hash_table_init() { if (has_null) { if (std::tuple_size>::value + key_byte_size <= sizeof(UInt64)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else if (std::tuple_size>::value + key_byte_size <= sizeof(UInt128)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else { _hash_table_variants - .emplace>(); + ->emplace>(); } } else { if (key_byte_size <= sizeof(UInt64)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else if (key_byte_size <= sizeof(UInt128)) { _hash_table_variants - .emplace>(); + ->emplace>(); } else { _hash_table_variants - .emplace>(); + ->emplace>(); } } } else { - _hash_table_variants.emplace>(); + _hash_table_variants->emplace>(); } } @@ -272,6 +276,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { } _build_blocks.emplace_back(mutable_block.to_block()); + child(0)->close(state); RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); return Status::OK(); } @@ -297,7 +302,7 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants); + *_hash_table_variants); return Status::OK(); } @@ -409,5 +414,15 @@ void VSetOperationNode::debug_string(int indentation_level, std::stringstream* o *out << ")" << std::endl; } +void VSetOperationNode::release_mem() { + _hash_table_variants = nullptr; + _arena = nullptr; + + std::vector tmp_build_blocks; + _build_blocks.swap(tmp_build_blocks); + + _probe_block.clear(); +} + } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index 8c20c1447e..0c3b6fec3e 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -56,16 +56,16 @@ protected: void refresh_hash_table(); Status process_probe_block(RuntimeState* state, int child_id, bool* eos); void create_mutable_cols(Block* output_block); + void release_mem(); protected: - HashTableVariants _hash_table_variants; + std::unique_ptr _hash_table_variants; std::vector _probe_key_sz; std::vector _build_key_sz; std::vector _build_not_ignore_null; - Arena _arena; - AcquireList _acquire_list; + std::unique_ptr _arena; //record element size in hashtable int64_t _valid_element_in_hash_tbl; @@ -157,7 +157,7 @@ void VSetOperationNode::refresh_hash_table() { LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants); + *_hash_table_variants); } template @@ -172,7 +172,7 @@ struct HashTableProbe { _probe_index(operation_node->_probe_index), _num_rows_returned(operation_node->_num_rows_returned), _probe_raw_ptrs(operation_node->_probe_columns), - _arena(operation_node->_arena), + _arena(*(operation_node->_arena)), _rows_returned_counter(operation_node->_rows_returned_counter), _build_col_idx(operation_node->_build_col_idx), _mutable_cols(operation_node->_mutable_cols) {} diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index cb1eb699ab..37e120b015 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -100,7 +100,7 @@ Status VSortNode::open(RuntimeState* state) { } } } while (!eos); - + child(0)->close(state); RETURN_IF_ERROR(_sorter->prepare_for_read()); return Status::OK(); } @@ -130,6 +130,7 @@ Status VSortNode::close(RuntimeState* state) { } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::close"); _vsort_exec_exprs.close(state); + _sorter = nullptr; return ExecNode::close(state); }