From 118a7dff073c5733541d1cbca55024aaefbdedca Mon Sep 17 00:00:00 2001 From: Adonis Ling Date: Fri, 11 Nov 2022 12:09:54 +0800 Subject: [PATCH] [chore](build) Optimize the compilation time (#14170) Currently, it takes too much time to build BE from source in workflow environments (P0/P1) which affects the efficiency of daily development. We can measure the time by executing the following command. time EXTRA_CXX_FLAGS='-O3' BUILD_TYPE=ASAN ./build.sh --be --fe --clean -j "$(nproc)" This PR optimizes the compilation time by exploiting the following methods. Reduce the codegen by removing some useless std::visit. Disable the optimization for some template functions which are instantiated by std::visit conditionally (except for the RELEASE build). --- be/src/vec/exec/join/vhash_join_node.cpp | 328 +++++++++++----------- be/src/vec/exec/join/vhash_join_node.h | 43 ++- be/src/vec/runtime/vdata_stream_recvr.cpp | 2 + be/src/vec/runtime/vdata_stream_recvr.h | 2 + 4 files changed, 183 insertions(+), 192 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 5bbf0905cb..86e919a6ed 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -24,11 +24,25 @@ #include "vec/data_types/data_type_number.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/runtime/shared_hash_table_controller.h" #include "vec/utils/template_helpers.hpp" #include "vec/utils/util.hpp" namespace doris::vectorized { +#define DISABLE_OPTIMIZATION + +#ifndef NDEBUG + +#undef DISABLE_OPTIMIZATION +#if defined(__GNUC__) +#define DISABLE_OPTIMIZATION __attribute__((optimize("O0"))) +#elif defined(__clang__) +#define DISABLE_OPTIMIZATION __attribute__((optnone)) +#endif + +#endif + // TODO: Best prefetch step is decided by machine. We should also provide a // SQL hint to allow users to tune by hand. static constexpr int PREFETCH_STEP = 64; @@ -44,6 +58,15 @@ template Status HashJoinNode::_extract_join_column( std::vector> const&); using ProfileCounter = RuntimeProfile::Counter; + +template +struct Overload : Callables... { + using Callables::operator()...; +}; + +template +Overload(Callables&&... callables) -> Overload; + template struct ProcessHashTableBuild { ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, @@ -212,7 +235,7 @@ private: HashJoinNode* _join_node; }; -template +template ProcessHashTableProbe::ProcessHashTableProbe(HashJoinNode* join_node, int batch_size) : _join_node(join_node), _batch_size(batch_size), @@ -233,19 +256,19 @@ ProcessHashTableProbe::ProcessHashTableProbe(HashJoinNode* join_node _build_side_output_timer(join_node->_build_side_output_timer), _probe_side_output_timer(join_node->_probe_side_output_timer) {} -template +template template void ProcessHashTableProbe::build_side_output_column( MutableColumns& mcol, int column_offset, int column_length, const std::vector& output_slot_flags, int size) { - constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN; + constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::LEFT_SEMI_JOIN; - constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + constexpr auto probe_all = + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; if constexpr (!is_semi_anti_join || have_other_join_conjunct) { if (_build_blocks.size() == 1) { @@ -307,7 +330,7 @@ void ProcessHashTableProbe::build_side_output_column( } } -template +template void ProcessHashTableProbe::probe_side_output_column( MutableColumns& mcol, const std::vector& output_slot_flags, int size, int last_probe_index, size_t probe_size, bool all_match_one, @@ -328,19 +351,18 @@ void ProcessHashTableProbe::probe_side_output_column( } } - if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN) { + if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { if (!have_other_join_conjunct) { _tuple_is_null_left_flags->resize_fill(size, 0); } } } -template +template template -Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, - Block* output_block, size_t probe_rows) { +DISABLE_OPTIMIZATION Status ProcessHashTableProbe::do_process( + HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, + Block* output_block, size_t probe_rows) { auto& probe_index = _join_node->_probe_index; auto& probe_raw_ptrs = _join_node->_probe_columns; if (probe_index == 0 && _items_counts.size() < probe_rows) { @@ -362,11 +384,11 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c auto& mcol = mutable_block.mutable_columns(); int current_offset = 0; - constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN; + constexpr auto is_right_semi_anti_join = + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN; - constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + constexpr auto probe_all = + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; bool all_match_one = true; int last_probe_index = probe_index; @@ -406,12 +428,12 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c key_getter.template prefetch(*hash_table_ctx.hash_table_ptr, probe_index + PREFETCH_STEP, _arena); - if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { if (!find_result.is_found()) { ++current_offset; } - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { if (find_result.is_found()) { ++current_offset; } @@ -482,8 +504,8 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c _join_node->_right_output_slot_flags, current_offset); } - if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN && - JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN) { + if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && + JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { SCOPED_TIMER(_probe_side_output_timer); probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, last_probe_index, probe_index - last_probe_index, all_match_one, @@ -495,9 +517,9 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c return Status::OK(); } -template +template template -Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( +DISABLE_OPTIMIZATION Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, Block* output_block, size_t probe_rows) { auto& probe_index = _join_node->_probe_index; @@ -513,8 +535,8 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( using KeyGetter = typename HashTableType::State; using Mapped = typename HashTableType::Mapped; if constexpr (std::is_same_v) { - constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + constexpr auto probe_all = + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); int right_col_idx = _join_node->_left_table_data_types.size(); @@ -602,10 +624,10 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( for (int i = 0; i < current_offset - origin_offset - 1; ++i) { same_to_prev.emplace_back(true); } - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { same_to_prev.emplace_back(false); visited_map.emplace_back(nullptr); // only full outer / left outer need insert the data of right table @@ -649,8 +671,8 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); auto column = output_block->get_by_position(result_column_id).column; - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { + if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { auto new_filter_column = ColumnVector::create(); auto& filter_map = new_filter_column->get_data(); @@ -693,7 +715,7 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } output_block->get_by_position(result_column_id).column = std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { auto new_filter_column = ColumnVector::create(); auto& filter_map = new_filter_column->get_data(); @@ -712,8 +734,8 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( output_block->get_by_position(result_column_id).column = std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { auto new_filter_column = ColumnVector::create(); auto& filter_map = new_filter_column->get_data(); @@ -744,13 +766,13 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( output_block->get_by_position(result_column_id).column = std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { + } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { for (int i = 0; i < column->size(); ++i) { DCHECK(visited_map[i]); *visited_map[i] |= column->get_bool(i); } - } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN) { + } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { auto filter_size = 0; for (int i = 0; i < column->size(); ++i) { DCHECK(visited_map[i]); @@ -763,13 +785,13 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( // inner join do nothing } - if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { output_block->clear(); } else { - if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { orig_columns = right_col_idx; } Block::filter_block(output_block, result_column_id, orig_columns); @@ -783,12 +805,11 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } } -template +template template -Status ProcessHashTableProbe::process_data_in_hashtable(HashTableType& hash_table_ctx, - MutableBlock& mutable_block, - Block* output_block, - bool* eos) { +DISABLE_OPTIMIZATION Status ProcessHashTableProbe::process_data_in_hashtable( + HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block, + bool* eos) { using Mapped = typename HashTableType::Mapped; if constexpr (std::is_same_v || std::is_same_v) { @@ -817,20 +838,20 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp if constexpr (std::is_same_v) { if (mapped.visited) { for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) { + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { insert_from_hash_table(it->block_offset, it->row_num); } } } else { for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN) { + if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { insert_from_hash_table(it->block_offset, it->row_num); } } } } else { for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) { + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { if (it->visited) insert_from_hash_table(it->block_offset, it->row_num); } else { if (!it->visited) insert_from_hash_table(it->block_offset, it->row_num); @@ -848,8 +869,8 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } // right outer join / full join need insert data of left table - if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { + if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { for (int i = 0; i < right_col_idx; ++i) { assert_cast(mcol[i].get())->insert_many_defaults(block_size); } @@ -1096,20 +1117,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo _null_map_column->get_data().assign(probe_rows, (uint8_t)0); } - Status st = std::visit( - [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - return _extract_join_column(_probe_block, _null_map_column, - _probe_columns, res_col_ids); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - __builtin_unreachable(); - }, - _hash_table_variants); - - RETURN_IF_ERROR(st); + RETURN_IF_ERROR(_extract_join_column(_probe_block, _null_map_column, + _probe_columns, res_col_ids)); } } @@ -1120,14 +1129,14 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo if (_probe_index < _probe_block.rows()) { DCHECK(_has_set_need_null_map_for_probe); - if (_have_other_join_conjunct) { - std::visit( - [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe, - auto ignore_null) { - using HashTableProbeType = std::decay_t; - if constexpr (!std::is_same_v) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { + std::visit( + [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe, + auto ignore_null) { + using HashTableProbeType = std::decay_t; + if constexpr (!std::is_same_v) { + using HashTableCtxType = std::decay_t; + if constexpr (!std::is_same_v) { + if (_have_other_join_conjunct) { st = process_hashtable_ctx .template do_process_with_other_join_conjuncts< need_null_map_for_probe, ignore_null>( @@ -1137,40 +1146,22 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo : nullptr, mutable_join_block, &temp_block, probe_rows); } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - } else { - LOG(FATAL) << "FATAL: uninited hash probe"; - } - }, - _hash_table_variants, _process_hashtable_ctx_variants, - make_bool_variant(_need_null_map_for_probe), - make_bool_variant(_probe_ignore_null)); - } else { - std::visit( - [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe, - auto ignore_null) { - using HashTableProbeType = std::decay_t; - if constexpr (!std::is_same_v) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { st = process_hashtable_ctx.template do_process< need_null_map_for_probe, ignore_null>( arg, need_null_map_for_probe ? &_null_map_column->get_data() : nullptr, mutable_join_block, &temp_block, probe_rows); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; } } else { - LOG(FATAL) << "FATAL: uninited hash probe"; + LOG(FATAL) << "FATAL: uninited hash table"; } - }, - _hash_table_variants, _process_hashtable_ctx_variants, - make_bool_variant(_need_null_map_for_probe), - make_bool_variant(_probe_ignore_null)); - } + } else { + LOG(FATAL) << "FATAL: uninited hash table probe"; + } + }, + _hash_table_variants, _process_hashtable_ctx_variants, + make_bool_variant(_need_null_map_for_probe), make_bool_variant(_probe_ignore_null)); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { std::visit( @@ -1185,7 +1176,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo LOG(FATAL) << "FATAL: uninited hash table"; } } else { - LOG(FATAL) << "FATAL: uninited hash table"; + LOG(FATAL) << "FATAL: uninited hash table probe"; } }, _hash_table_variants, _process_hashtable_ctx_variants); @@ -1320,44 +1311,46 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { } return std::visit( - [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - using HashTableType = typename HashTableCtxType::HashTable; - if (!should_build_hash_table) { - auto& ret = _shared_hashtable_controller->wait_for_hash_table(id()); - if (!ret.status.ok()) { - return ret.status; - } - arg.hash_table_ptr = reinterpret_cast(ret.hash_table_ptr); - _build_blocks = *ret.blocks; - _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( - _probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs)); - RETURN_IF_ERROR( - _runtime_filter_slots->init(state, arg.hash_table_ptr->get_size())); - RETURN_IF_ERROR( - _runtime_filter_slots->apply_from_other(ret.runtime_filter_slots)); - { - SCOPED_TIMER(_push_down_timer); - _runtime_filter_slots->publish(); - } - return Status::OK(); - } else { - arg.hash_table_ptr = &arg.hash_table; - ProcessRuntimeFilterBuild runtime_filter_build_process( - this); - auto ret = runtime_filter_build_process(state, arg); - if (_shared_hashtable_controller) { - SharedHashTableEntry entry(ret, arg.hash_table_ptr, &_build_blocks, - _runtime_filter_slots); - _shared_hashtable_controller->put_hash_table(std::move(entry), id()); - } - return ret; - } - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, + Overload {[&](std::monostate& arg) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg) -> Status { + using HashTableCtxType = std::decay_t; + using HashTableType = typename HashTableCtxType::HashTable; + if (!should_build_hash_table) { + auto& ret = _shared_hashtable_controller->wait_for_hash_table(id()); + if (!ret.status.ok()) { + return ret.status; + } + arg.hash_table_ptr = + reinterpret_cast(ret.hash_table_ptr); + _build_blocks = *ret.blocks; + _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( + _probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs)); + RETURN_IF_ERROR(_runtime_filter_slots->init( + state, arg.hash_table_ptr->get_size())); + RETURN_IF_ERROR(_runtime_filter_slots->apply_from_other( + ret.runtime_filter_slots)); + { + SCOPED_TIMER(_push_down_timer); + _runtime_filter_slots->publish(); + } + return Status::OK(); + } else { + arg.hash_table_ptr = &arg.hash_table; + ProcessRuntimeFilterBuild + runtime_filter_build_process(this); + auto ret = runtime_filter_build_process(state, arg); + if (_shared_hashtable_controller) { + SharedHashTableEntry entry(ret, arg.hash_table_ptr, + &_build_blocks, _runtime_filter_slots); + _shared_hashtable_controller->put_hash_table(std::move(entry), + id()); + } + return ret; + } + }}, _hash_table_variants); } @@ -1469,35 +1462,28 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin } // Get the key column that needs to be built - Status st = std::visit( - [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - return _extract_join_column(block, null_map_val, raw_ptrs, res_col_ids); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - __builtin_unreachable(); - }, - _hash_table_variants); + Status st = _extract_join_column(block, null_map_val, raw_ptrs, res_col_ids); std::visit( - [&](auto&& arg, auto has_null_value, auto short_circuit_for_null_in_build_side) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - ProcessHashTableBuild hash_table_build_process( - rows, block, raw_ptrs, this, state->batch_size(), offset); - hash_table_build_process - .template run( - arg, - has_null_value || short_circuit_for_null_in_build_side - ? &null_map_val->get_data() - : nullptr, - &_short_circuit_for_null_in_probe_side); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, + Overload { + [&](std::monostate& arg, auto has_null_value, + auto short_circuit_for_null_in_build_side) { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg, auto has_null_value, + auto short_circuit_for_null_in_build_side) { + using HashTableCtxType = std::decay_t; + ProcessHashTableBuild hash_table_build_process( + rows, block, raw_ptrs, this, state->batch_size(), offset); + hash_table_build_process + .template run( + arg, + has_null_value || short_circuit_for_null_in_build_side + ? &null_map_val->get_data() + : nullptr, + &_short_circuit_for_null_in_probe_side); + }}, _hash_table_variants, make_bool_variant(_build_side_ignore_null), make_bool_variant(_short_circuit_for_null_in_build_side)); @@ -1625,13 +1611,15 @@ void HashJoinNode::_hash_table_init() { } }, _join_op_variants, make_bool_variant(_have_other_join_conjunct)); + + DCHECK(!std::holds_alternative(_hash_table_variants)); } void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) { std::visit( [&](auto&& join_op_variants) { using JoinOpType = std::decay_t; - _process_hashtable_ctx_variants.emplace>( + _process_hashtable_ctx_variants.emplace>( this, state->batch_size()); }, _join_op_variants); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index ca930ccda8..a190b2e713 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -16,24 +16,25 @@ // under the License. #pragma once + #include #include -#include "common/object_pool.h" #include "exprs/runtime_filter_slots.h" +#include "join_op.h" #include "vec/common/columns_hashing.h" #include "vec/common/hash_table/hash_map.h" -#include "vec/common/hash_table/hash_table.h" -#include "vec/exec/join/join_op.h" -#include "vec/exec/join/vacquire_list.hpp" -#include "vec/exec/join/vjoin_node_base.h" -#include "vec/functions/function.h" -#include "vec/runtime/shared_hash_table_controller.h" +#include "vjoin_node_base.h" namespace doris { + +class ObjectPool; class IRuntimeFilter; + namespace vectorized { +class SharedHashTableController; + template struct SerializedHashTableContext { using Mapped = RowRefListType; @@ -164,7 +165,7 @@ using HashTableVariants = std::variant< class VExprContext; class HashJoinNode; -template +template struct ProcessHashTableProbe { ProcessHashTableProbe(HashJoinNode* join_node, int batch_size); @@ -220,19 +221,17 @@ struct ProcessHashTableProbe { static constexpr int PROBE_SIDE_EXPLODE_RATE = 3; }; -using HashTableCtxVariants = std::variant< - std::monostate, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe>, - ProcessHashTableProbe< - std::integral_constant>>; +using HashTableCtxVariants = + std::variant, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe>; class HashJoinNode final : public VJoinNodeBase { public: @@ -358,7 +357,7 @@ private: template friend struct ProcessHashTableBuild; - template + template friend struct ProcessHashTableProbe; template diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 7a9a55ebc1..a9c741969a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -256,7 +256,9 @@ VDataStreamRecvr::VDataStreamRecvr( bool is_merging, int total_buffer_limit, RuntimeProfile* profile, std::shared_ptr sub_plan_query_statistics_recvr) : _mgr(stream_mgr), +#ifdef USE_MEM_TRACKER _state(state), +#endif _fragment_instance_id(fragment_instance_id), _dest_node_id(dest_node_id), _total_buffer_limit(total_buffer_limit), diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 254d85185c..6d38ecb955 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -98,7 +98,9 @@ private: // DataStreamMgr instance used to create this recvr. (Not owned) VDataStreamMgr* _mgr; +#ifdef USE_MEM_TRACKER RuntimeState* _state; +#endif // Fragment and node id of the destination exchange node this receiver is used by. TUniqueId _fragment_instance_id;