From cd3450bd9d0f04b33af8dff736941915a2c1bc32 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 18 Oct 2022 12:37:17 +0800 Subject: [PATCH] [Improvement](join) optimize join probing phase (#13357) --- be/src/vec/columns/column.h | 8 +- be/src/vec/columns/column_array.cpp | 10 +- be/src/vec/columns/column_array.h | 3 +- be/src/vec/columns/column_complex.h | 12 +- be/src/vec/columns/column_const.cpp | 3 +- be/src/vec/columns/column_const.h | 3 +- be/src/vec/columns/column_decimal.cpp | 9 +- be/src/vec/columns/column_decimal.h | 3 +- be/src/vec/columns/column_jsonb.cpp | 11 +- be/src/vec/columns/column_jsonb.h | 3 +- be/src/vec/columns/column_nullable.cpp | 8 +- be/src/vec/columns/column_nullable.h | 3 +- be/src/vec/columns/column_string.cpp | 11 +- be/src/vec/columns/column_string.h | 3 +- be/src/vec/columns/column_vector.cpp | 8 +- be/src/vec/columns/column_vector.h | 3 +- be/src/vec/exec/join/vhash_join_node.cpp | 1089 +++++++++++----------- be/src/vec/exec/join/vhash_join_node.h | 107 ++- 18 files changed, 722 insertions(+), 575 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index ab6c103e86..46b93f6e2d 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -428,7 +428,13 @@ public: */ virtual Ptr replicate(const Offsets& offsets) const = 0; - virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + /** Copies each element according offsets parameter. + * (i-th element should be copied counts[i] times.) + * If `begin` and `count_sz` specified, it means elements in range [`begin`, `begin` + `count_sz`) will be replicated. + * If `count_sz` is -1, `begin` must be 0. + */ + virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin = 0, int count_sz = -1) const { LOG(FATAL) << "not support"; }; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 9d57a8f003..413c490b96 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -528,17 +528,19 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) cons return replicate_generic(replicate_offsets); } -void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { - size_t col_size = size(); +void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { + size_t col_size = count_sz < 0 ? size() : count_sz; if (col_size == 0) { return; } IColumn::Offsets replicate_offsets(col_size); size_t cur_offset = 0; - for (size_t i = 0; i < col_size; ++i) { + size_t end = begin + col_size; + for (size_t i = begin; i < end; ++i) { cur_offset += counts[i]; - replicate_offsets[i] = cur_offset; + replicate_offsets[i - begin] = cur_offset; } if (cur_offset != target_size) { LOG(WARNING) << "ColumnArray replicate input target_size:" << target_size diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 2fc0194e69..043cf5f629 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -120,7 +120,8 @@ public: size_t allocated_bytes() const override; void protect() override; ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; ColumnPtr convert_to_full_column_if_const() const override; void get_extremes(Field& min, Field& max) const override { LOG(FATAL) << "get_extremes not implemented"; diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index ed79fc1cdd..0b950713fb 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -236,7 +236,8 @@ public: ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector& selector) const override { @@ -348,16 +349,17 @@ ColumnPtr ColumnComplexType::replicate(const IColumn::Offsets& offsets) const } template -void ColumnComplexType::replicate(const uint32_t* counts, size_t target_size, - IColumn& column) const { - size_t size = data.size(); +void ColumnComplexType::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { + size_t size = count_sz < 0 ? data.size() : count_sz; if (0 == size) return; auto& res = reinterpret_cast&>(column); typename Self::Container& res_data = res.get_data(); res_data.reserve(target_size); - for (size_t i = 0; i < size; ++i) { + size_t end = size + begin; + for (size_t i = begin; i < end; ++i) { size_t size_to_replicate = counts[i]; for (size_t j = 0; j < size_to_replicate; ++j) { res_data.push_back(data[i]); diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index dc7142693e..4720b4cd02 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -67,7 +67,8 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) const { return ColumnConst::create(data, replicated_size); } -void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { +void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { if (s == 0) return; auto& res = reinterpret_cast(column); res.s = s; diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index f001150bee..9637a0943f 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -138,7 +138,8 @@ public: ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; ColumnPtr replicate(const Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; // ColumnPtr index(const IColumn & indexes, size_t limit) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index e9a7dcc7d2..e74d9972be 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -347,16 +347,17 @@ ColumnPtr ColumnDecimal::replicate(const IColumn::Offsets& offsets) const { } template -void ColumnDecimal::replicate(const uint32_t* counts, size_t target_size, - IColumn& column) const { - size_t size = data.size(); +void ColumnDecimal::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { + size_t size = count_sz < 0 ? data.size() : count_sz; if (0 == size) return; auto& res = reinterpret_cast&>(column); typename Self::Container& res_data = res.get_data(); res_data.reserve(target_size); - for (size_t i = 0; i < size; ++i) { + size_t end = size + begin; + for (size_t i = begin; i < end; ++i) { res_data.add_num_element_without_reserve(data[i], counts[i]); } } diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 124a7e7ba2..8f6520e92c 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -191,7 +191,8 @@ public: ColumnPtr replicate(const IColumn::Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; void get_extremes(Field& min, Field& max) const override; diff --git a/be/src/vec/columns/column_jsonb.cpp b/be/src/vec/columns/column_jsonb.cpp index 040d0977e9..bc904dcb80 100644 --- a/be/src/vec/columns/column_jsonb.cpp +++ b/be/src/vec/columns/column_jsonb.cpp @@ -332,8 +332,9 @@ ColumnPtr ColumnJsonb::replicate(const Offsets& replicate_offsets) const { return res; } -void ColumnJsonb::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { - size_t col_size = size(); +void ColumnJsonb::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { + size_t col_size = count_sz < 0 ? size() : count_sz; if (0 == col_size) return; auto& res = reinterpret_cast(column); @@ -343,10 +344,12 @@ void ColumnJsonb::replicate(const uint32_t* counts, size_t target_size, IColumn& res_chars.reserve(chars.size() / col_size * target_size); res_offsets.reserve(target_size); - Offset prev_json_offset = 0; + size_t base = begin > 0 ? offset_at(begin - 1) : 0; + Offset prev_json_offset = 0 + base; Offset current_new_offset = 0; - for (size_t i = 0; i < col_size; ++i) { + size_t end = begin + col_size; + for (size_t i = begin; i < end; ++i) { size_t size_to_replicate = counts[i]; size_t json_size = offsets[i] - prev_json_offset; diff --git a/be/src/vec/columns/column_jsonb.h b/be/src/vec/columns/column_jsonb.h index e6cc7961bc..58789d0783 100644 --- a/be/src/vec/columns/column_jsonb.h +++ b/be/src/vec/columns/column_jsonb.h @@ -252,7 +252,8 @@ public: ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 401ee02a9e..5328131605 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -530,10 +530,12 @@ ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const { return ColumnNullable::create(replicated_data, replicated_null_map); } -void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { +void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { auto& res = reinterpret_cast(column); - get_nested_column().replicate(counts, target_size, res.get_nested_column()); - get_null_map_column().replicate(counts, target_size, res.get_null_map_column()); + get_nested_column().replicate(counts, target_size, res.get_nested_column(), begin, count_sz); + get_null_map_column().replicate(counts, target_size, res.get_null_map_column(), begin, + count_sz); } template diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 906686676d..2a51aef0d7 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -162,7 +162,8 @@ public: size_t allocated_bytes() const override; void protect() override; ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector& hashes, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 276fb4fcec..0d702ef89c 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -402,8 +402,9 @@ ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const { return res; } -void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { - size_t col_size = size(); +void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { + size_t col_size = count_sz < 0 ? size() : count_sz; if (0 == col_size) { return; } @@ -415,10 +416,12 @@ void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn res_chars.reserve(chars.size() / col_size * target_size); res_offsets.reserve(target_size); - Offset prev_string_offset = 0; + size_t base = begin > 0 ? offsets[begin - 1] : 0; + Offset prev_string_offset = 0 + base; Offset current_new_offset = 0; - for (size_t i = 0; i < col_size; ++i) { + size_t end = begin + col_size; + for (size_t i = begin; i < end; ++i) { size_t size_to_replicate = counts[i]; size_t string_size = offsets[i] - prev_string_offset; diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index a2065b4837..cd70e228b6 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -377,7 +377,8 @@ public: ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 4f95fa2593..4bd84f8856 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -478,15 +478,17 @@ ColumnPtr ColumnVector::replicate(const IColumn::Offsets& offsets) const { } template -void ColumnVector::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { - size_t size = data.size(); +void ColumnVector::replicate(const uint32_t* counts, size_t target_size, IColumn& column, + size_t begin, int count_sz) const { + size_t size = count_sz < 0 ? data.size() : count_sz; if (size == 0) return; auto& res = reinterpret_cast&>(column); typename Self::Container& res_data = res.get_data(); res_data.reserve(target_size); - for (size_t i = 0; i < size; ++i) { + size_t end = begin + size; + for (size_t i = begin; i < end; ++i) { res_data.add_num_element_without_reserve(data[i], counts[i]); } } diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 3163fd8bb0..211e225e07 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -339,7 +339,8 @@ public: ColumnPtr replicate(const IColumn::Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, + int count_sz = -1) const override; void get_extremes(Field& min, Field& max) const override; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 2993fbe889..41332b6850 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -186,556 +186,568 @@ private: HashJoinNode* _join_node; }; -template -struct ProcessHashTableProbe { - ProcessHashTableProbe(HashJoinNode* join_node, int batch_size, int probe_rows) - : _join_node(join_node), - _batch_size(batch_size), - _probe_rows(probe_rows), - _build_blocks(join_node->_build_blocks), - _probe_block(join_node->_probe_block), - _probe_index(join_node->_probe_index), - _probe_raw_ptrs(join_node->_probe_columns), - _items_counts(join_node->_items_counts), - _build_block_offsets(join_node->_build_block_offsets), - _build_block_rows(join_node->_build_block_rows), - _tuple_is_null_left_flags( - reinterpret_cast(*join_node->_tuple_is_null_left_flag_column) - .get_data()), - _tuple_is_null_right_flags( - reinterpret_cast(*join_node->_tuple_is_null_right_flag_column) - .get_data()), - _rows_returned_counter(join_node->_rows_returned_counter), - _search_hashtable_timer(join_node->_search_hashtable_timer), - _build_side_output_timer(join_node->_build_side_output_timer), - _probe_side_output_timer(join_node->_probe_side_output_timer) {} +template +ProcessHashTableProbe::ProcessHashTableProbe(HashJoinNode* join_node, + int batch_size) + : _join_node(join_node), + _batch_size(batch_size), + _build_blocks(join_node->_build_blocks), + _tuple_is_null_left_flags( + reinterpret_cast(*join_node->_tuple_is_null_left_flag_column) + .get_data()), + _tuple_is_null_right_flags( + reinterpret_cast(*join_node->_tuple_is_null_right_flag_column) + .get_data()), + _rows_returned_counter(join_node->_rows_returned_counter), + _search_hashtable_timer(join_node->_search_hashtable_timer), + _build_side_output_timer(join_node->_build_side_output_timer), + _probe_side_output_timer(join_node->_probe_side_output_timer) {} - // output build side result column - template - void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, - const std::vector& output_slot_flags, int size) { - constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN; +template +template +void ProcessHashTableProbe::build_side_output_column( + MutableColumns& mcol, int column_offset, int column_length, + const std::vector& output_slot_flags, int size) { + constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN; - constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; - if constexpr (!is_semi_anti_join || have_other_join_conjunct) { - if (_build_blocks.size() == 1) { - for (int i = 0; i < column_length; i++) { - auto& column = *_build_blocks[0].get_by_position(i).column; - if (output_slot_flags[i]) { - mcol[i + column_offset]->insert_indices_from( - column, _build_block_rows.data(), _build_block_rows.data() + size); - } else { - mcol[i + column_offset]->resize(size); - } - } - } else { - for (int i = 0; i < column_length; i++) { - if (output_slot_flags[i]) { - for (int j = 0; j < size; j++) { - if constexpr (probe_all) { - if (_build_block_offsets[j] == -1) { - DCHECK(mcol[i + column_offset]->is_nullable()); - assert_cast(mcol[i + column_offset].get()) - ->insert_default(); - } else { - auto& column = *_build_blocks[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + column_offset]->insert_from(column, - _build_block_rows[j]); - } - } else { - if (_build_block_offsets[j] == -1) { - // the only case to reach here: - // 1. left anti join with other conjuncts, and - // 2. equal conjuncts does not match - // since nullptr is emplaced back to visited_map, - // the output value of the build side does not matter, - // just insert default value - mcol[i + column_offset]->insert_default(); - } else { - auto& column = *_build_blocks[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + column_offset]->insert_from(column, - _build_block_rows[j]); - } - } - } - } else { - mcol[i + column_offset]->resize(size); - } - } - } - } - - // Dispose right tuple is null flags columns - if constexpr (probe_all && !have_other_join_conjunct) { - _tuple_is_null_right_flags.resize(size); - auto* __restrict null_data = _tuple_is_null_right_flags.data(); - for (int i = 0; i < size; ++i) { - null_data[i] = _build_block_rows[i] == -1; - } - } - } - - // output probe side result column - template - void probe_side_output_column(MutableColumns& mcol, const std::vector& output_slot_flags, - int size) { - for (int i = 0; i < output_slot_flags.size(); ++i) { - if (output_slot_flags[i]) { - auto& column = _probe_block.get_by_position(i).column; - column->replicate(&_items_counts[0], size, *mcol[i]); - } else { - mcol[i]->resize(size); - } - } - - if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN && !have_other_join_conjunct) { - _tuple_is_null_left_flags.resize_fill(size, 0); - } - } - // Only process the join with no other join conjunt, because of no other join conjunt - // the output block struct is same with mutable block. we can do more opt on it and simplify - // the logic of probe - // TODO: opt the visited here to reduce the size of hash table - Status do_process(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block) { - using KeyGetter = typename HashTableContext::State; - using Mapped = typename HashTableContext::Mapped; - - int right_col_idx = - _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size(); - int right_col_len = _join_node->_right_table_data_types.size(); - - KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - auto& mcol = mutable_block.mutable_columns(); - int current_offset = 0; - - _items_counts.resize(_probe_rows); - _build_block_offsets.resize(_batch_size); - _build_block_rows.resize(_batch_size); - memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows); - - constexpr auto need_to_set_visited = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; - - constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN; - - constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; - - { - SCOPED_TIMER(_search_hashtable_timer); - while (_probe_index < _probe_rows) { - if constexpr (ignore_null) { - if ((*null_map)[_probe_index]) { - _items_counts[_probe_index++] = (uint32_t)0; - continue; - } - } - int last_offset = current_offset; - auto find_result = (*null_map)[_probe_index] - ? decltype(key_getter.find_key(hash_table_ctx.hash_table, - _probe_index, - _arena)) {nullptr, false} - : key_getter.find_key(hash_table_ctx.hash_table, - _probe_index, _arena); - if (_probe_index + PREFETCH_STEP < _probe_rows) - key_getter.template prefetch(hash_table_ctx.hash_table, - _probe_index + PREFETCH_STEP, _arena); - - if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { - if (!find_result.is_found()) { - ++current_offset; - } - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { - if (find_result.is_found()) { - ++current_offset; - } + if constexpr (!is_semi_anti_join || have_other_join_conjunct) { + if (_build_blocks.size() == 1) { + for (int i = 0; i < column_length; i++) { + auto& column = *_build_blocks[0].get_by_position(i).column; + if (output_slot_flags[i]) { + mcol[i + column_offset]->insert_indices_from(column, _build_block_rows.data(), + _build_block_rows.data() + size); } else { - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - if constexpr (need_to_set_visited) mapped.visited = true; - - if constexpr (!is_right_semi_anti_join) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - ++current_offset; + mcol[i + column_offset]->resize(size); + } + } + } else { + for (int i = 0; i < column_length; i++) { + if (output_slot_flags[i]) { + for (int j = 0; j < size; j++) { + if constexpr (probe_all) { + if (_build_block_offsets[j] == -1) { + DCHECK(mcol[i + column_offset]->is_nullable()); + assert_cast(mcol[i + column_offset].get()) + ->insert_default(); + } else { + auto& column = *_build_blocks[_build_block_offsets[j]] + .get_by_position(i) + .column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); } } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (!is_right_semi_anti_join) { - if (current_offset < _batch_size) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - } - if constexpr (need_to_set_visited) it->visited = true; + if (_build_block_offsets[j] == -1) { + // the only case to reach here: + // 1. left anti join with other conjuncts, and + // 2. equal conjuncts does not match + // since nullptr is emplaced back to visited_map, + // the output value of the build side does not matter, + // just insert default value + mcol[i + column_offset]->insert_default(); + } else { + auto& column = *_build_blocks[_build_block_offsets[j]] + .get_by_position(i) + .column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); } } - } else { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - ++current_offset; - } } - } - - _items_counts[_probe_index++] = (uint32_t)(current_offset - last_offset); - if (current_offset >= _batch_size) { - break; + } else { + mcol[i + column_offset]->resize(size); } } } - - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, - _join_node->_right_output_slot_flags, current_offset); - } - - if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN && - JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN) { - SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset); - } - - output_block->swap(mutable_block.to_block()); - - return Status::OK(); } - // In the presence of other join conjunt, the process of join become more complicated. - // each matching join column need to be processed by other join conjunt. so the sturct of mutable block - // and output block may be different - // The output result is determined by the other join conjunt result and same_to_prev struct - Status do_process_with_other_join_conjunts(HashTableContext& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block) { - using KeyGetter = typename HashTableContext::State; - using Mapped = typename HashTableContext::Mapped; - KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - int right_col_idx = _join_node->_left_table_data_types.size(); - int right_col_len = _join_node->_right_table_data_types.size(); + // Dispose right tuple is null flags columns + if constexpr (probe_all && !have_other_join_conjunct) { + _tuple_is_null_right_flags.resize(size); + auto* __restrict null_data = _tuple_is_null_right_flags.data(); + for (int i = 0; i < size; ++i) { + null_data[i] = _build_block_rows[i] == -1; + } + } +} - auto& mcol = mutable_block.mutable_columns(); - // use in right join to change visited state after - // exec the vother join conjunt - std::vector visited_map; - visited_map.reserve(1.2 * _batch_size); +template +template +void ProcessHashTableProbe::probe_side_output_column( + MutableColumns& mcol, const std::vector& output_slot_flags, int size, + int last_probe_index, size_t probe_size, bool all_match_one) { + auto& probe_block = _join_node->_probe_block; + for (int i = 0; i < output_slot_flags.size(); ++i) { + if (output_slot_flags[i]) { + auto& column = probe_block.get_by_position(i).column; + if (all_match_one) { + DCHECK_EQ(probe_size, column->size() - last_probe_index); + mcol[i]->insert_range_from(*column, last_probe_index, probe_size); + } else { + DCHECK_GE(_items_counts.size(), last_probe_index + probe_size); + column->replicate(&_items_counts[0], size, *mcol[i], last_probe_index, probe_size); + } + } else { + mcol[i]->resize(size); + } + } - std::vector same_to_prev; - same_to_prev.reserve(1.2 * _batch_size); + if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN && !have_other_join_conjunct) { + _tuple_is_null_left_flags.resize_fill(size, 0); + } +} - _items_counts.resize(_probe_rows); - _build_block_offsets.resize(_batch_size); - _build_block_rows.resize(_batch_size); - memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows); +template +template +Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, + Block* output_block, + size_t probe_rows) { + auto& probe_index = _join_node->_probe_index; + auto& probe_raw_ptrs = _join_node->_probe_columns; + if (probe_index == 0 && _items_counts.size() < probe_rows) { + _items_counts.resize(probe_rows); + } - int current_offset = 0; + if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { + _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + } + using KeyGetter = typename HashTableType::State; + using Mapped = typename HashTableType::Mapped; - while (_probe_index < _probe_rows) { - // ignore null rows + int right_col_idx = + _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size(); + int right_col_len = _join_node->_right_table_data_types.size(); + + KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); + auto& mcol = mutable_block.mutable_columns(); + int current_offset = 0; + + constexpr auto need_to_set_visited = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + + constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN; + + constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + + bool all_match_one = true; + int last_probe_index = probe_index; + { + SCOPED_TIMER(_search_hashtable_timer); + while (probe_index < probe_rows) { if constexpr (ignore_null) { - if ((*null_map)[_probe_index]) { - _items_counts[_probe_index++] = (uint32_t)0; + if ((*null_map)[probe_index]) { + _items_counts[probe_index++] = (uint32_t)0; + all_match_one = false; continue; } } - - auto last_offset = current_offset; + int last_offset = current_offset; auto find_result = - (*null_map)[_probe_index] - ? decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index, + (*null_map)[probe_index] + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena)) {nullptr, false} - : key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena); - if (_probe_index + PREFETCH_STEP < _probe_rows) + : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena); + if (probe_index + PREFETCH_STEP < probe_rows) key_getter.template prefetch(hash_table_ctx.hash_table, - _probe_index + PREFETCH_STEP, _arena); - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - auto origin_offset = current_offset; - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; + probe_index + PREFETCH_STEP, _arena); + + if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + if (!find_result.is_found()) { ++current_offset; - visited_map.emplace_back(&mapped.visited); + } + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { + if (find_result.is_found()) { + ++current_offset; + } + } else { + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if constexpr (need_to_set_visited) mapped.visited = true; + + if constexpr (!is_right_semi_anti_join) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; + } else { + _build_block_offsets.emplace_back(mapped.block_offset); + _build_block_rows.emplace_back(mapped.row_num); + } + ++current_offset; + } + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (!is_right_semi_anti_join) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + } + if constexpr (need_to_set_visited) it->visited = true; + } + } } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if (current_offset < _batch_size) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; + if constexpr (probe_all) { + // only full outer / left outer need insert the data of right table + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); } ++current_offset; - visited_map.emplace_back(&it->visited); } } - same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - origin_offset - 1; ++i) { - same_to_prev.emplace_back(true); - } - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - // left anti use -1 use a default value - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - ++current_offset; - } else { - // other join, no nothing } - _items_counts[_probe_index++] = (uint32_t)(current_offset - last_offset); - if (current_offset >= _batch_size) { + uint32_t count = (uint32_t)(current_offset - last_offset); + _items_counts[probe_index++] = count; + all_match_one &= (count == 1); + if (current_offset >= _batch_size && !all_match_one) { break; } } - - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, - _join_node->_right_output_slot_flags, current_offset); - } - { - SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, _join_node->_left_output_slot_flags, - current_offset); - } - output_block->swap(mutable_block.to_block()); - - // dispose the other join conjunt exec - if (output_block->rows()) { - int result_column_id = -1; - int orig_columns = output_block->columns(); - (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); - - auto column = output_block->get_by_position(result_column_id).column; - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { - auto new_filter_column = ColumnVector::create(); - auto& filter_map = new_filter_column->get_data(); - - auto null_map_column = ColumnVector::create(column->size(), 0); - auto* __restrict null_map_data = null_map_column->get_data().data(); - - for (int i = 0; i < column->size(); ++i) { - auto join_hit = visited_map[i] != nullptr; - auto other_hit = column->get_bool(i); - - if (!other_hit) { - for (size_t j = 0; j < right_col_len; ++j) { - typeid_cast( - std::move(*output_block->get_by_position(j + right_col_idx) - .column) - .assume_mutable() - .get()) - ->get_null_map_data()[i] = true; - } - } - null_map_data[i] = !join_hit || !other_hit; - - if (join_hit) { - *visited_map[i] |= other_hit; - filter_map.push_back(other_hit || !same_to_prev[i] || - (!column->get_bool(i - 1) && filter_map.back())); - // Here to keep only hit join conjunt and other join conjunt is true need to be output. - // if not, only some key must keep one row will output will null right table column - if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1)) - filter_map[i - 1] = false; - } else { - filter_map.push_back(true); - } - } - - for (int i = 0; i < column->size(); ++i) { - if (filter_map[i]) { - _tuple_is_null_right_flags.emplace_back(null_map_data[i]); - } - } - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { - auto new_filter_column = ColumnVector::create(); - auto& filter_map = new_filter_column->get_data(); - - if (!column->empty()) filter_map.emplace_back(column->get_bool(0)); - for (int i = 1; i < column->size(); ++i) { - if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) { - // Only last same element is true, output last one - filter_map.push_back(true); - filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; - } else { - filter_map.push_back(false); - } - } - - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { - auto new_filter_column = ColumnVector::create(); - auto& filter_map = new_filter_column->get_data(); - - if (!column->empty()) - filter_map.emplace_back(column->get_bool(0) && visited_map[0]); - for (int i = 1; i < column->size(); ++i) { - if ((visited_map[i] && column->get_bool(i)) || - (same_to_prev[i] && filter_map[i - 1])) { - filter_map.push_back(true); - filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; - } else { - filter_map.push_back(false); - } - } - - // Same to the semi join, but change the last value to opposite value - for (int i = 1; i < same_to_prev.size(); ++i) { - if (!same_to_prev[i]) filter_map[i - 1] = !filter_map[i - 1]; - } - filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1]; - - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { - for (int i = 0; i < column->size(); ++i) { - DCHECK(visited_map[i]); - *visited_map[i] |= column->get_bool(i); - } - } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN) { - auto filter_size = 0; - for (int i = 0; i < column->size(); ++i) { - DCHECK(visited_map[i]); - auto result = column->get_bool(i); - *visited_map[i] |= result; - filter_size += result; - } - _tuple_is_null_left_flags.resize_fill(filter_size, 0); - } else { - // inner join do nothing - } - - if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { - output_block->clear(); - } else { - if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) - orig_columns = right_col_idx; - Block::filter_block(output_block, result_column_id, orig_columns); - } - } - - return Status::OK(); } - // Process full outer join/ right join / right semi/anti join to output the join result - // in hash table - Status process_data_in_hashtable(HashTableContext& hash_table_ctx, MutableBlock& mutable_block, - Block* output_block, bool* eos) { - hash_table_ctx.init_once(); - auto& mcol = mutable_block.mutable_columns(); + { + SCOPED_TIMER(_build_side_output_timer); + build_side_output_column(mcol, right_col_idx, right_col_len, + _join_node->_right_output_slot_flags, current_offset); + } - bool right_semi_anti_without_other = - _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct; - int right_col_idx = - right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size(); - int right_col_len = _join_node->_right_table_data_types.size(); + if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN && + JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN) { + SCOPED_TIMER(_probe_side_output_timer); + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, + last_probe_index, probe_index - last_probe_index, all_match_one); + } - auto& iter = hash_table_ctx.iter; - auto block_size = 0; + output_block->swap(mutable_block.to_block()); - auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) { - block_size++; - for (size_t j = 0; j < right_col_len; ++j) { - auto& column = *_build_blocks[offset].get_by_position(j).column; - mcol[j + right_col_idx]->insert_from(column, row_num); + return Status::OK(); +} + +template +template +Status ProcessHashTableProbe::do_process_with_other_join_conjunts( + HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, + Block* output_block, size_t probe_rows) { + auto& probe_index = _join_node->_probe_index; + auto& probe_raw_ptrs = _join_node->_probe_columns; + if (probe_index == 0 && _items_counts.size() < probe_rows) { + _items_counts.resize(probe_rows); + } + if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { + _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + } + + using KeyGetter = typename HashTableType::State; + using Mapped = typename HashTableType::Mapped; + KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); + + int right_col_idx = _join_node->_left_table_data_types.size(); + int right_col_len = _join_node->_right_table_data_types.size(); + + auto& mcol = mutable_block.mutable_columns(); + // use in right join to change visited state after + // exec the vother join conjunt + std::vector visited_map; + visited_map.reserve(1.2 * _batch_size); + + std::vector same_to_prev; + same_to_prev.reserve(1.2 * _batch_size); + + int current_offset = 0; + + bool all_match_one = true; + int last_probe_index = probe_index; + while (probe_index < probe_rows) { + // ignore null rows + if constexpr (ignore_null) { + if ((*null_map)[probe_index]) { + _items_counts[probe_index++] = (uint32_t)0; + continue; } - }; + } - for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) { - auto& mapped = iter->get_second(); - for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) { - if (it->visited) insert_from_hash_table(it->block_offset, it->row_num); + auto last_offset = current_offset; + auto find_result = + (*null_map)[probe_index] + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, + _arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena); + if (probe_index + PREFETCH_STEP < probe_rows) + key_getter.template prefetch(hash_table_ctx.hash_table, + probe_index + PREFETCH_STEP, _arena); + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + auto origin_offset = current_offset; + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; } else { - if (!it->visited) insert_from_hash_table(it->block_offset, it->row_num); + _build_block_offsets.emplace_back(mapped.block_offset); + _build_block_rows.emplace_back(mapped.row_num); + } + ++current_offset; + visited_map.emplace_back(&mapped.visited); + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + visited_map.emplace_back(&it->visited); } } - } - - // just resize the left table column in case with other conjunct to make block size is not zero - if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) { - auto target_size = mcol[right_col_idx]->size(); - for (int i = 0; i < right_col_idx; ++i) { - mcol[i]->resize(target_size); + same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - origin_offset - 1; ++i) { + same_to_prev.emplace_back(true); } - } - - // right outer join / full join need insert data of left table - if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { - for (int i = 0; i < right_col_idx; ++i) { - assert_cast(mcol[i].get())->insert_many_defaults(block_size); + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || + JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + same_to_prev.emplace_back(false); + visited_map.emplace_back(nullptr); + // only full outer / left outer need insert the data of right table + // left anti use -1 use a default value + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); } - _tuple_is_null_left_flags.resize_fill(block_size, 1); + ++current_offset; + } else { + // other join, no nothing } - *eos = iter == hash_table_ctx.hash_table.end(); - output_block->swap( - mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); - return Status::OK(); + uint32_t count = (uint32_t)(current_offset - last_offset); + _items_counts[probe_index++] = count; + all_match_one &= (count == 1); + if (current_offset >= _batch_size && !all_match_one) { + break; + } } -private: - HashJoinNode* _join_node; - const int _batch_size; - const size_t _probe_rows; - const std::vector& _build_blocks; - const Block& _probe_block; - int& _probe_index; - ColumnRawPtrs& _probe_raw_ptrs; - Arena _arena; + { + SCOPED_TIMER(_build_side_output_timer); + build_side_output_column(mcol, right_col_idx, right_col_len, + _join_node->_right_output_slot_flags, current_offset); + } + { + SCOPED_TIMER(_probe_side_output_timer); + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, + last_probe_index, probe_index - last_probe_index, + all_match_one); + } + output_block->swap(mutable_block.to_block()); - std::vector& _items_counts; - std::vector& _build_block_offsets; - std::vector& _build_block_rows; - // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN - ColumnUInt8::Container& _tuple_is_null_left_flags; - // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN - ColumnUInt8::Container& _tuple_is_null_right_flags; + // dispose the other join conjunt exec + if (output_block->rows()) { + int result_column_id = -1; + int orig_columns = output_block->columns(); + (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); - ProfileCounter* _rows_returned_counter; - ProfileCounter* _search_hashtable_timer; - ProfileCounter* _build_side_output_timer; - ProfileCounter* _probe_side_output_timer; -}; + auto column = output_block->get_by_position(result_column_id).column; + if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { + auto new_filter_column = ColumnVector::create(); + auto& filter_map = new_filter_column->get_data(); + + auto null_map_column = ColumnVector::create(column->size(), 0); + auto* __restrict null_map_data = null_map_column->get_data().data(); + + for (int i = 0; i < column->size(); ++i) { + auto join_hit = visited_map[i] != nullptr; + auto other_hit = column->get_bool(i); + + if (!other_hit) { + for (size_t j = 0; j < right_col_len; ++j) { + typeid_cast( + std::move(*output_block->get_by_position(j + right_col_idx).column) + .assume_mutable() + .get()) + ->get_null_map_data()[i] = true; + } + } + null_map_data[i] = !join_hit || !other_hit; + + if (join_hit) { + *visited_map[i] |= other_hit; + filter_map.push_back(other_hit || !same_to_prev[i] || + (!column->get_bool(i - 1) && filter_map.back())); + // Here to keep only hit join conjunt and other join conjunt is true need to be output. + // if not, only some key must keep one row will output will null right table column + if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1)) + filter_map[i - 1] = false; + } else { + filter_map.push_back(true); + } + } + + for (int i = 0; i < column->size(); ++i) { + if (filter_map[i]) { + _tuple_is_null_right_flags.emplace_back(null_map_data[i]); + } + } + output_block->get_by_position(result_column_id).column = std::move(new_filter_column); + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { + auto new_filter_column = ColumnVector::create(); + auto& filter_map = new_filter_column->get_data(); + + if (!column->empty()) filter_map.emplace_back(column->get_bool(0)); + for (int i = 1; i < column->size(); ++i) { + if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) { + // Only last same element is true, output last one + filter_map.push_back(true); + filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; + } else { + filter_map.push_back(false); + } + } + + output_block->get_by_position(result_column_id).column = std::move(new_filter_column); + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + auto new_filter_column = ColumnVector::create(); + auto& filter_map = new_filter_column->get_data(); + + if (!column->empty()) filter_map.emplace_back(column->get_bool(0) && visited_map[0]); + for (int i = 1; i < column->size(); ++i) { + if ((visited_map[i] && column->get_bool(i)) || + (same_to_prev[i] && filter_map[i - 1])) { + filter_map.push_back(true); + filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; + } else { + filter_map.push_back(false); + } + } + + // Same to the semi join, but change the last value to opposite value + for (int i = 1; i < same_to_prev.size(); ++i) { + if (!same_to_prev[i]) filter_map[i - 1] = !filter_map[i - 1]; + } + filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1]; + + output_block->get_by_position(result_column_id).column = std::move(new_filter_column); + } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { + for (int i = 0; i < column->size(); ++i) { + DCHECK(visited_map[i]); + *visited_map[i] |= column->get_bool(i); + } + } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN) { + auto filter_size = 0; + for (int i = 0; i < column->size(); ++i) { + DCHECK(visited_map[i]); + auto result = column->get_bool(i); + *visited_map[i] |= result; + filter_size += result; + } + _tuple_is_null_left_flags.resize_fill(filter_size, 0); + } else { + // inner join do nothing + } + + if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { + output_block->clear(); + } else { + if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) + orig_columns = right_col_idx; + Block::filter_block(output_block, result_column_id, orig_columns); + } + } + + return Status::OK(); +} + +template +template +Status ProcessHashTableProbe::process_data_in_hashtable( + HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block, + bool* eos) { + hash_table_ctx.init_once(); + auto& mcol = mutable_block.mutable_columns(); + + bool right_semi_anti_without_other = + _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct; + int right_col_idx = + right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size(); + int right_col_len = _join_node->_right_table_data_types.size(); + + auto& iter = hash_table_ctx.iter; + auto block_size = 0; + + auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) { + block_size++; + for (size_t j = 0; j < right_col_len; ++j) { + auto& column = *_build_blocks[offset].get_by_position(j).column; + mcol[j + right_col_idx]->insert_from(column, row_num); + } + }; + + for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) { + auto& mapped = iter->get_second(); + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) { + if (it->visited) insert_from_hash_table(it->block_offset, it->row_num); + } else { + if (!it->visited) insert_from_hash_table(it->block_offset, it->row_num); + } + } + } + + // just resize the left table column in case with other conjunct to make block size is not zero + if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) { + auto target_size = mcol[right_col_idx]->size(); + for (int i = 0; i < right_col_idx; ++i) { + mcol[i]->resize(target_size); + } + } + + // right outer join / full join need insert data of left table + if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { + for (int i = 0; i < right_col_idx; ++i) { + assert_cast(mcol[i].get())->insert_many_defaults(block_size); + } + _tuple_is_null_left_flags.resize_fill(block_size, 1); + } + *eos = iter == hash_table_ctx.hash_table.end(); + + output_block->swap(mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); + return Status::OK(); +} HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), @@ -816,6 +828,10 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { null_aware || (_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null)); } + for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { + _probe_ignore_null |= !_probe_not_ignore_null[i]; + } + _probe_column_disguise_null.reserve(eq_join_conjuncts.size()); if (tnode.hash_join_node.__isset.vother_join_conjunct) { @@ -931,10 +947,9 @@ Status HashJoinNode::prepare(RuntimeState* state) { // Hash Table Init _hash_table_init(); + _process_hashtable_ctx_variants_init(state); _construct_mutable_join_block(); - _build_block_offsets.resize(state->batch_size()); - _build_block_rows.resize(state->batch_size()); return Status::OK(); } @@ -994,7 +1009,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo if constexpr (!std::is_same_v) { auto& null_map_val = _null_map_column->get_data(); return _extract_probe_join_column(_probe_block, null_map_val, - _probe_columns, _probe_ignore_null, + _probe_columns, *_probe_expr_call_timer); } else { LOG(FATAL) << "FATAL: uninited hash table"; @@ -1014,51 +1029,51 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo if (_probe_index < _probe_block.rows()) { std::visit( - [&](auto&& arg, auto&& join_op_variants, auto have_other_join_conjunct, - auto probe_ignore_null) { - using HashTableCtxType = std::decay_t; - using JoinOpType = std::decay_t; - if constexpr (have_other_join_conjunct) { - if constexpr (!std::is_same_v) { - ProcessHashTableProbe - process_hashtable_ctx(this, state->batch_size(), probe_rows); - st = process_hashtable_ctx.do_process_with_other_join_conjunts( - arg, &_null_map_column->get_data(), mutable_join_block, - &temp_block); + [&](auto&& arg, auto&& process_hashtable_ctx, auto have_other_join_conjunct) { + using HashTableProbeType = std::decay_t; + if constexpr (!std::is_same_v) { + using HashTableCtxType = std::decay_t; + if constexpr (have_other_join_conjunct) { + if constexpr (!std::is_same_v) { + st = process_hashtable_ctx.do_process_with_other_join_conjunts( + arg, &_null_map_column->get_data(), mutable_join_block, + &temp_block, probe_rows); + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + } } else { - LOG(FATAL) << "FATAL: uninited hash table"; + if constexpr (!std::is_same_v) { + st = process_hashtable_ctx.do_process( + arg, &_null_map_column->get_data(), mutable_join_block, + &temp_block, probe_rows); + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + } } } else { - if constexpr (!std::is_same_v) { - ProcessHashTableProbe - process_hashtable_ctx(this, state->batch_size(), probe_rows); - st = process_hashtable_ctx.do_process(arg, - &_null_map_column->get_data(), - mutable_join_block, &temp_block); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } + LOG(FATAL) << "FATAL: uninited hash probe"; } }, - _hash_table_variants, _join_op_variants, - make_bool_variant(_have_other_join_conjunct), - make_bool_variant(_probe_ignore_null)); + _hash_table_variants, _process_hashtable_ctx_variants, + make_bool_variant(_have_other_join_conjunct)); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { std::visit( - [&](auto&& arg, auto&& join_op_variants) { - using JoinOpType = std::decay_t; - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - ProcessHashTableProbe - process_hashtable_ctx(this, state->batch_size(), probe_rows); - st = process_hashtable_ctx.process_data_in_hashtable( - arg, mutable_join_block, &temp_block, eos); + [&](auto&& arg, auto&& process_hashtable_ctx) { + using HashTableProbeType = std::decay_t; + if constexpr (!std::is_same_v) { + using HashTableCtxType = std::decay_t; + if constexpr (!std::is_same_v) { + st = process_hashtable_ctx.process_data_in_hashtable( + arg, mutable_join_block, &temp_block, eos); + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + } } else { LOG(FATAL) << "FATAL: uninited hash table"; } }, - _hash_table_variants, _join_op_variants); + _hash_table_variants, _process_hashtable_ctx_variants); } else { *eos = true; return Status::OK(); @@ -1257,7 +1272,7 @@ Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map, } Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map, - ColumnRawPtrs& raw_ptrs, bool& ignore_null, + ColumnRawPtrs& raw_ptrs, RuntimeProfile::Counter& expr_call_timer) { for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { int result_col_id = -1; @@ -1279,7 +1294,6 @@ Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map, auto& col_nested = nullable->get_nested_column(); auto& col_nullmap = nullable->get_null_map_data(); - ignore_null |= !_probe_not_ignore_null[i]; VectorizedUtils::update_null_map(null_map, col_nullmap); if (_build_not_ignore_null[i]) { raw_ptrs[i] = nullable; @@ -1458,6 +1472,17 @@ void HashJoinNode::_hash_table_init() { } } +void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) { + std::visit( + [&](auto&& join_op_variants, auto probe_ignore_null) { + using JoinOpType = std::decay_t; + _process_hashtable_ctx_variants + .emplace>( + this, state->batch_size()); + }, + _join_op_variants, make_bool_variant(_probe_ignore_null)); +} + std::vector HashJoinNode::_convert_block_to_null(Block& block) { std::vector results; for (int i = 0; i < block.columns(); ++i) { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 923999626d..011bc244a5 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -151,6 +151,101 @@ using JoinOpVariants = M(NULL_AWARE_LEFT_ANTI_JOIN) class VExprContext; +class HashJoinNode; + +template +struct ProcessHashTableProbe { + ProcessHashTableProbe(HashJoinNode* join_node, int batch_size); + + // output build side result column + template + void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, + const std::vector& output_slot_flags, int size); + + template + void probe_side_output_column(MutableColumns& mcol, const std::vector& output_slot_flags, + int size, int last_probe_index, size_t probe_size, + bool all_match_one); + // Only process the join with no other join conjunt, because of no other join conjunt + // the output block struct is same with mutable block. we can do more opt on it and simplify + // the logic of probe + // TODO: opt the visited here to reduce the size of hash table + template + Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, + MutableBlock& mutable_block, Block* output_block, size_t probe_rows); + // In the presence of other join conjunt, the process of join become more complicated. + // each matching join column need to be processed by other join conjunt. so the sturct of mutable block + // and output block may be different + // The output result is determined by the other join conjunt result and same_to_prev struct + template + Status do_process_with_other_join_conjunts(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, Block* output_block, + size_t probe_rows); + + // Process full outer join/ right join / right semi/anti join to output the join result + // in hash table + template + Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block, + Block* output_block, bool* eos); + + vectorized::HashJoinNode* _join_node; + const int _batch_size; + const std::vector& _build_blocks; + Arena _arena; + + std::vector _items_counts; + std::vector _build_block_offsets; + std::vector _build_block_rows; + // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN + ColumnUInt8::Container& _tuple_is_null_left_flags; + // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN + ColumnUInt8::Container& _tuple_is_null_right_flags; + + RuntimeProfile::Counter* _rows_returned_counter; + RuntimeProfile::Counter* _search_hashtable_timer; + RuntimeProfile::Counter* _build_side_output_timer; + RuntimeProfile::Counter* _probe_side_output_timer; + + static constexpr int PROBE_SIDE_EXPLODE_RATE = 3; +}; + +using HashTableCtxVariants = std::variant< + std::monostate, + ProcessHashTableProbe, true>, + ProcessHashTableProbe, true>, + ProcessHashTableProbe, true>, + ProcessHashTableProbe, + true>, + ProcessHashTableProbe, + true>, + ProcessHashTableProbe, + true>, + ProcessHashTableProbe, true>, + ProcessHashTableProbe, + true>, + ProcessHashTableProbe, + true>, + ProcessHashTableProbe< + std::integral_constant, true>, + ProcessHashTableProbe, false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe, false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe, + false>, + ProcessHashTableProbe< + std::integral_constant, false>>; class HashJoinNode : public ::doris::ExecNode { public: @@ -224,6 +319,8 @@ private: Arena _arena; HashTableVariants _hash_table_variants; + HashTableCtxVariants _process_hashtable_ctx_variants; + std::vector _build_blocks; Block _probe_block; ColumnRawPtrs _probe_columns; @@ -246,10 +343,6 @@ private: Block _join_block; - std::vector _items_counts; - std::vector _build_block_offsets; - std::vector _build_block_rows; - std::vector _hash_output_slot_ids; std::vector _left_output_slot_flags; std::vector _right_output_slot_flags; @@ -260,7 +353,6 @@ private: MutableColumnPtr _tuple_is_null_left_flag_column; MutableColumnPtr _tuple_is_null_right_flag_column; -private: void _hash_table_build_thread(RuntimeState* state, std::promise* status); Status _hash_table_build(RuntimeState* state); @@ -271,9 +363,10 @@ private: bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, - bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); + RuntimeProfile::Counter& expr_call_timer); void _hash_table_init(); + void _process_hashtable_ctx_variants_init(RuntimeState* state); static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128; @@ -294,7 +387,7 @@ private: template friend struct ProcessHashTableBuild; - template + template friend struct ProcessHashTableProbe; template