diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index 0863e68ec5..6b95b6e3e0 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -67,6 +67,12 @@ struct ProcessHashTableProbe { MutableBlock& mutable_block, Block* output_block, size_t probe_rows, bool is_mark_join); + void _process_splited_equal_matched_tuples(int start_row_idx, int row_count, + const ColumnPtr& other_hit_column, + std::vector& visited_map, int right_col_idx, + int right_col_len, UInt8* __restrict null_map_data, + UInt8* __restrict filter_map, Block* output_block); + // Process full outer join/ right join / right semi/anti join to output the join result // in hash table template diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index cde05d72ae..1e43152e1d 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -19,6 +19,7 @@ #include "common/status.h" #include "process_hash_table_probe.h" +#include "util/simd/bits.h" #include "vhash_join_node.h" namespace doris::vectorized { @@ -462,95 +463,162 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( bool all_match_one = true; int last_probe_index = probe_index; - while (probe_index < probe_rows) { - // ignore null rows - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - _items_counts[probe_index++] = (uint32_t)1; - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } else { - _items_counts[probe_index++] = (uint32_t)0; - } - all_match_one = false; - continue; - } - } - auto last_offset = current_offset; - auto find_result = - !need_null_map_for_probe - ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena) - : (*null_map)[probe_index] - ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, - *_arena)) {nullptr, false} - : key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena); - if (probe_index + PREFETCH_STEP < probe_rows) { - key_getter.template prefetch(hash_table_ctx.hash_table, - probe_index + PREFETCH_STEP, *_arena); - } - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - auto origin_offset = current_offset; - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); - } - ++current_offset; - visited_map.emplace_back(&mapped.visited); - } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - visited_map.emplace_back(&it->visited); - } - } - same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - origin_offset - 1; ++i) { - same_to_prev.emplace_back(true); - } - } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - // left anti use -1 use a default value + int row_count_from_last_probe = 0; + bool is_the_last_sub_block = false; + size_t probe_size = 0; + auto& probe_row_match_iter = + std::get>(_join_node->_probe_row_match_iter); + if (probe_row_match_iter.ok()) { + auto origin_offset = current_offset; + for (; probe_row_match_iter.ok() && current_offset < _batch_size; + ++probe_row_match_iter) { if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; + _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; + _build_block_rows[current_offset] = probe_row_match_iter->row_num; } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); + _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); + _build_block_rows.emplace_back(probe_row_match_iter->row_num); } ++current_offset; - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (is_mark_join) { + visited_map.emplace_back(&probe_row_match_iter->visited); + } + same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - origin_offset - 1; ++i) { + same_to_prev.emplace_back(true); + } + + row_count_from_last_probe = current_offset; + all_match_one &= (current_offset == 1); + _items_counts[probe_index] = current_offset; + if (!probe_row_match_iter.ok()) { + ++probe_index; + is_the_last_sub_block = true; + } + probe_size = 1; + } + int multi_matched_output_row_count = 0; + if (current_offset < _batch_size) { + while (probe_index < probe_rows) { + // ignore null rows + if constexpr (ignore_null && need_null_map_for_probe) { + if ((*null_map)[probe_index]) { + if constexpr (probe_all) { + _items_counts[probe_index++] = (uint32_t)1; + same_to_prev.emplace_back(false); + visited_map.emplace_back(nullptr); + // only full outer / left outer need insert the data of right table + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } else { + _items_counts[probe_index++] = (uint32_t)0; + } + all_match_one = false; + if constexpr (probe_all) { + if (current_offset >= _batch_size) { + break; + } + } + continue; + } + } + + auto last_offset = current_offset; + auto find_result = !need_null_map_for_probe + ? key_getter.find_key(hash_table_ctx.hash_table, + probe_index, *_arena) + : (*null_map)[probe_index] + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, + probe_index, + *_arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, + probe_index, *_arena); + if (probe_index + PREFETCH_STEP < probe_rows) { + key_getter.template prefetch(hash_table_ctx.hash_table, + probe_index + PREFETCH_STEP, *_arena); + } + + auto current_probe_index = probe_index; + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + auto origin_offset = current_offset; + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; + } else { + _build_block_offsets.emplace_back(mapped.block_offset); + _build_block_rows.emplace_back(mapped.row_num); + } + ++current_offset; + visited_map.emplace_back(&mapped.visited); + ++probe_index; + } else { + // For mark join, if euqual-matched tuple count for one probe row + // excceeds batch size, it's difficult to implement the logic to + // split them into multiple sub blocks and handle them, keep the original + // logic for now. + if (is_mark_join) { + for (auto it = mapped.begin(); it.ok(); ++it) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + visited_map.emplace_back(&it->visited); + } + } else { + auto multi_match_last_offset = current_offset; + auto it = mapped.begin(); + // breaks if row count exceeds batch_size + for (; it.ok() && current_offset < _batch_size; ++it) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + visited_map.emplace_back(&it->visited); + } + probe_row_match_iter = it; + // If all matched rows for the current probe row are handled, + // advance to next probe row. + if (!it.ok()) { + ++probe_index; + } else { + // If not(which means it excceed batch size), probe_index is not increased and + // remaining matched rows for the current probe row will be + // handled in the next call of this function + multi_matched_output_row_count = + current_offset - multi_match_last_offset; + } + } + } + same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - origin_offset - 1; ++i) { + same_to_prev.emplace_back(true); + } + } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { same_to_prev.emplace_back(false); visited_map.emplace_back(nullptr); + // only full outer / left outer need insert the data of right table + // left anti use -1 use a default value if (LIKELY(current_offset < _build_block_rows.size())) { _build_block_offsets[current_offset] = -1; _build_block_rows[current_offset] = -1; @@ -559,16 +627,33 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( _build_block_rows.emplace_back(-1); } ++current_offset; + ++probe_index; + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { + if (is_mark_join) { + same_to_prev.emplace_back(false); + visited_map.emplace_back(nullptr); + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } + ++probe_index; + } else { + // other join, no nothing + ++probe_index; + } + uint32_t count = (uint32_t)(current_offset - last_offset); + _items_counts[current_probe_index] = count; + all_match_one &= (count == 1); + if (current_offset >= _batch_size) { + break; } - } else { - // other join, no nothing - } - uint32_t count = (uint32_t)(current_offset - last_offset); - _items_counts[probe_index++] = count; - all_match_one &= (count == 1); - if (current_offset >= _batch_size && !all_match_one) { - break; } + probe_size = probe_index - last_probe_index + (probe_row_match_iter.ok() ? 1 : 0); } { @@ -579,14 +664,14 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( { SCOPED_TIMER(_probe_side_output_timer); probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, - last_probe_index, probe_index - last_probe_index, - all_match_one, true); + last_probe_index, probe_size, all_match_one, true); } auto num_cols = mutable_block.columns(); output_block->swap(mutable_block.to_block()); // dispose the other join conjunct exec - if (output_block->rows()) { + auto row_count = output_block->rows(); + if (row_count) { int result_column_id = -1; int orig_columns = output_block->columns(); RETURN_IF_ERROR((*_join_node->_vother_join_conjunct_ptr) @@ -595,13 +680,29 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( auto column = output_block->get_by_position(result_column_id).column; if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - auto new_filter_column = ColumnVector::create(); - auto& filter_map = new_filter_column->get_data(); + auto new_filter_column = ColumnVector::create(row_count); + auto* __restrict filter_map = new_filter_column->get_data().data(); - auto null_map_column = ColumnVector::create(column->size(), 0); + auto null_map_column = ColumnVector::create(row_count, 0); auto* __restrict null_map_data = null_map_column->get_data().data(); - for (int i = 0; i < column->size(); ++i) { + // It contains non-first sub block of splited equal-conjuncts-matched tuples from last probe row + if (row_count_from_last_probe > 0) { + _process_splited_equal_matched_tuples(0, row_count_from_last_probe, column, + visited_map, right_col_idx, right_col_len, + null_map_data, filter_map, output_block); + // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple + // is output in all sub blocks, need to output a tuple for this probe row + if (is_the_last_sub_block && !_join_node->_is_any_probe_match_row_output) { + filter_map[0] = true; + null_map_data[0] = true; + } + } + + int end_idx = row_count - multi_matched_output_row_count; + // process equal-conjuncts-matched tuples that are newly generated + // in this run if there are any. + for (size_t i = row_count_from_last_probe; i < end_idx; ++i) { auto join_hit = visited_map[i] != nullptr; auto other_hit = column->get_bool(i); @@ -617,21 +718,40 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } null_map_data[i] = !join_hit || !other_hit; + // For cases where one probe row matches multiple build rows for equal conjuncts, + // all the other-conjuncts-matched tuples should be output. + // + // Other-conjuncts-NOT-matched tuples fall into two categories: + // 1. The beginning consecutive one(s). + // For these tuples, only the last one is marked to output; + // If there are any following other-conjuncts-matched tuples, + // the last tuple is also marked NOT to output. + // 2. All the remaining other-conjuncts-NOT-matched tuples. + // All these tuples are marked not to output. if (join_hit) { *visited_map[i] |= other_hit; - filter_map.push_back(other_hit || !same_to_prev[i] || - (!column->get_bool(i - 1) && filter_map.back())); + filter_map[i] = other_hit || !same_to_prev[i] || + (!column->get_bool(i - 1) && filter_map[i - 1]); // Here to keep only hit join conjunct and other join conjunt is true need to be output. // if not, only some key must keep one row will output will null right table column - if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1)) { + if (same_to_prev[i] && filter_map[i] && !column->get_bool(i - 1)) { filter_map[i - 1] = false; } } else { - filter_map.push_back(true); + filter_map[i] = true; } } - for (int i = 0; i < column->size(); ++i) { + // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row + if (multi_matched_output_row_count > 0) { + _join_node->_is_any_probe_match_row_output = false; + _process_splited_equal_matched_tuples( + row_count - multi_matched_output_row_count, + multi_matched_output_row_count, column, visited_map, right_col_idx, + right_col_len, null_map_data, filter_map, output_block); + } + + for (size_t i = 0; i < row_count; ++i) { if (filter_map[i]) { _tuple_is_null_right_flags->emplace_back(null_map_data[i]); } @@ -639,13 +759,30 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( output_block->get_by_position(result_column_id).column = std::move(new_filter_column); } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { + // TODO: resize in advance auto new_filter_column = ColumnVector::create(); auto& filter_map = new_filter_column->get_data(); - if (!column->empty()) { + size_t start_row_idx = 1; + // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks + if (row_count_from_last_probe > 0) { + if (_join_node->_is_any_probe_match_row_output) { + // if any matched tuple for this probe row is output, + // ignore all the following tuples for this probe row. + for (int row_idx = 0; row_idx < row_count_from_last_probe; ++row_idx) { + filter_map.emplace_back(false); + } + start_row_idx += row_count_from_last_probe; + if (row_count_from_last_probe < row_count) { + filter_map.emplace_back(column->get_bool(row_count_from_last_probe)); + } + } else { + filter_map.emplace_back(column->get_bool(0)); + } + } else { filter_map.emplace_back(column->get_bool(0)); } - for (int i = 1; i < column->size(); ++i) { + for (size_t i = start_row_idx; i < row_count; ++i) { if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) { // Only last same element is true, output last one filter_map.push_back(true); @@ -654,6 +791,21 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( filter_map.push_back(false); } } + // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row + if (multi_matched_output_row_count > 0) { + // If a matched row is output, all the equal-matched tuples in + // the following sub blocks should be ignored + _join_node->_is_any_probe_match_row_output = filter_map[row_count - 1]; + } else if (row_count_from_last_probe > 0 && + !_join_node->_is_any_probe_match_row_output) { + // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks, + // and no matched tuple has been output in all previous run. + // If a tuple is output in this run, all the following mathced tuples should be ignored + if (filter_map[row_count_from_last_probe - 1]) { + _join_node->_is_any_probe_match_row_output = true; + } + } + if (is_mark_join) { auto& matched_map = assert_cast&>( *(output_block->get_by_position(num_cols - 1) @@ -662,7 +814,7 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( // For mark join, we only filter rows which have duplicate join keys. // And then, we set matched_map to the join result to do the mark join's filtering. - for (size_t i = 1; i < column->size(); ++i) { + for (size_t i = 1; i < row_count; ++i) { if (!same_to_prev[i]) { matched_map.push_back(filter_map[i - 1]); filter_map[i - 1] = true; @@ -676,14 +828,43 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( std::move(new_filter_column); } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - auto new_filter_column = ColumnVector::create(); - auto& filter_map = new_filter_column->get_data(); + auto new_filter_column = ColumnVector::create(row_count); + auto* __restrict filter_map = new_filter_column->get_data().data(); - if (!column->empty()) { + // for left anti join, the probe side is output only when + // there are no matched tuples for the probe row. + + // If multiple equal-conjuncts-matched tuples is splitted into several + // sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at first, + // and when processing the last sub block, check whether there are any + // equal-conjuncts-matched tuple is output in all sub blocks, + // if there are none, just pick a tuple and output. + + size_t start_row_idx = 1; + // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks + if (row_count_from_last_probe > 0) { + if (_join_node->_is_any_probe_match_row_output) { + // if any matched tuple for this probe row is output, + // ignore all the following tuples for this probe row. + for (int row_idx = 0; row_idx < row_count_from_last_probe; ++row_idx) { + filter_map[row_idx] = false; + } + start_row_idx += row_count_from_last_probe; + if (row_count_from_last_probe < row_count) { + filter_map[row_count_from_last_probe] = + column->get_bool(row_count_from_last_probe) && + visited_map[row_count_from_last_probe]; + } + } else { + // Both equal conjuncts and other conjuncts are true + filter_map[0] = column->get_bool(0) && visited_map[0]; + } + } else { // Both equal conjuncts and other conjuncts are true - filter_map.emplace_back(column->get_bool(0) && visited_map[0]); + filter_map[0] = column->get_bool(0) && visited_map[0]; } - for (int i = 1; i < column->size(); ++i) { + + for (size_t i = start_row_idx; i < row_count; ++i) { if ((visited_map[i] && column->get_bool(i)) || (same_to_prev[i] && filter_map[i - 1])) { // When either of two conditions is meet: @@ -691,10 +872,10 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( // 2. This row is joined from the same build side row as the previous row // Set filter_map[i] to true and filter_map[i - 1] to false if same_to_prev[i] // is true. - filter_map.push_back(true); + filter_map[i] = true; filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; } else { - filter_map.push_back(false); + filter_map[i] = false; } } @@ -703,22 +884,62 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( *(output_block->get_by_position(num_cols - 1) .column->assume_mutable())) .get_data(); - for (int i = 1; i < same_to_prev.size(); ++i) { + for (int i = 1; i < row_count; ++i) { if (!same_to_prev[i]) { matched_map.push_back(!filter_map[i - 1]); filter_map[i - 1] = true; } } - matched_map.push_back(!filter_map[filter_map.size() - 1]); - filter_map[filter_map.size() - 1] = true; + matched_map.push_back(!filter_map[row_count - 1]); + filter_map[row_count - 1] = true; } else { + int end_row_idx; + if (row_count_from_last_probe > 0) { + end_row_idx = row_count - multi_matched_output_row_count; + if (!_join_node->_is_any_probe_match_row_output) { + // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks, + // and no matched tuple has been output in all previous run. + // If a tuple is output in this run, all the following mathced tuples should be ignored + if (filter_map[row_count_from_last_probe - 1]) { + _join_node->_is_any_probe_match_row_output = true; + filter_map[row_count_from_last_probe - 1] = false; + } + if (is_the_last_sub_block && + !_join_node->_is_any_probe_match_row_output) { + // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple + // is output in all sub blocks, output a tuple for this probe row + filter_map[0] = true; + } + } + if (multi_matched_output_row_count > 0) { + // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row + // If a matched row is output, all the equal-matched tuples in + // the following sub blocks should be ignored + _join_node->_is_any_probe_match_row_output = filter_map[row_count - 1]; + filter_map[row_count - 1] = false; + } + } else if (multi_matched_output_row_count > 0) { + end_row_idx = row_count - multi_matched_output_row_count; + // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row + // If a matched row is output, all the equal-matched tuples in + // the following sub blocks should be ignored + _join_node->_is_any_probe_match_row_output = filter_map[row_count - 1]; + filter_map[row_count - 1] = false; + } else { + end_row_idx = row_count; + } + // Same to the semi join, but change the last value to opposite value - for (int i = 1; i < same_to_prev.size(); ++i) { + for (int i = 1 + row_count_from_last_probe; i < end_row_idx; ++i) { if (!same_to_prev[i]) { filter_map[i - 1] = !filter_map[i - 1]; } } - filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1]; + auto non_sub_blocks_matched_row_count = + row_count - row_count_from_last_probe - multi_matched_output_row_count; + if (non_sub_blocks_matched_row_count > 0) { + filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1]; + } } output_block->get_by_position(result_column_id).column = @@ -731,7 +952,7 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { auto filter_size = 0; - for (int i = 0; i < column->size(); ++i) { + for (int i = 0; i < row_count; ++i) { DCHECK(visited_map[i]); auto result = column->get_bool(i); *visited_map[i] |= result; @@ -766,6 +987,42 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } } +// For left or full outer join with other conjuncts. +// If multiple equal-conjuncts-matched tuples is splitted into several +// sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at first, +// and when processing the last sub block, check whether there are any +// equal-conjuncts-matched tuple is output in all sub blocks, +// if not, just pick a tuple and output. +template +void ProcessHashTableProbe::_process_splited_equal_matched_tuples( + int start_row_idx, int row_count, const ColumnPtr& other_hit_column, + std::vector& visited_map, int right_col_idx, int right_col_len, + UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block) { + int end_row_idx = start_row_idx + row_count; + for (int i = start_row_idx; i < end_row_idx; ++i) { + auto join_hit = visited_map[i] != nullptr; + auto other_hit = other_hit_column->get_bool(i); + + if (!other_hit) { + for (size_t j = 0; j < right_col_len; ++j) { + typeid_cast( + std::move(*output_block->get_by_position(j + right_col_idx).column) + .assume_mutable() + .get()) + ->get_null_map_data()[i] = true; + } + } + + null_map_data[i] = !join_hit || !other_hit; + filter_map[i] = other_hit; + + if (join_hit) { + *visited_map[i] |= other_hit; + } + } + _join_node->_is_any_probe_match_row_output |= simd::contain_byte(filter_map, row_count, 1); +} + template template Status ProcessHashTableProbe::process_data_in_hashtable(HashTableType& hash_table_ctx, @@ -786,7 +1043,8 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp auto& iter = hash_table_ctx.iter; auto block_size = 0; - auto& visited_iter = _join_node->_outer_join_pull_visited_iter; + auto& visited_iter = + std::get>(_join_node->_outer_join_pull_visited_iter); auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) { block_size++; @@ -797,7 +1055,6 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp }; if (visited_iter.ok()) { - DCHECK((std::is_same_v)); for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { insert_from_hash_table(visited_iter->block_offset, visited_iter->row_num); } @@ -810,11 +1067,17 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp auto& mapped = iter->get_second(); if constexpr (std::is_same_v) { if (mapped.visited) { - for (auto it = mapped.begin(); it.ok(); ++it) { + visited_iter = mapped.begin(); + for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - insert_from_hash_table(it->block_offset, it->row_num); + insert_from_hash_table(visited_iter->block_offset, + visited_iter->row_num); } } + if (visited_iter.ok()) { + // block_size >= _batch_size, quit for loop + break; + } } else { if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { visited_iter = mapped.begin(); @@ -829,17 +1092,24 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } } } else { - for (auto it = mapped.begin(); it.ok(); ++it) { + visited_iter = mapped.begin(); + for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - if (it->visited) { - insert_from_hash_table(it->block_offset, it->row_num); + if (visited_iter->visited) { + insert_from_hash_table(visited_iter->block_offset, + visited_iter->row_num); } } else { - if (!it->visited) { - insert_from_hash_table(it->block_offset, it->row_num); + if (!visited_iter->visited) { + insert_from_hash_table(visited_iter->block_offset, + visited_iter->row_num); } } } + if (visited_iter.ok()) { + // block_size >= _batch_size, quit for loop + break; + } } } @@ -862,6 +1132,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp *eos = iter == hash_table_ctx.hash_table.end(); output_block->swap( mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); + DCHECK(block_size <= _batch_size); return Status::OK(); } else { LOG(FATAL) << "Invalid RowRefList"; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 7352e6d693..75951be756 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -458,8 +458,7 @@ void HashJoinNode::prepare_for_next() { _prepare_probe_block(); } -Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block, - bool* eos) { +Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { SCOPED_TIMER(_probe_timer); if (_short_circuit_for_null_in_probe_side) { // If we use a short-circuit strategy for null value in build side (e.g. if join operator is @@ -538,6 +537,8 @@ Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* out if (_is_outer_join) { _add_tuple_is_null_column(&temp_block); } + auto output_rows = temp_block.rows(); + DCHECK(output_rows <= state->batch_size()); { SCOPED_TIMER(_join_filter_timer); RETURN_IF_ERROR( @@ -1027,6 +1028,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { JoinOpType::value == TJoinOp::FULL_OUTER_JOIN, RowRefListWithFlag, RowRefList>>; _probe_row_match_iter.emplace>(); + _outer_join_pull_visited_iter.emplace>(); if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) { // Single column optimization diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 9287cd6cfb..1fc592daf2 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -280,8 +280,7 @@ private: std::unique_ptr _process_hashtable_ctx_variants; // for full/right outer join - ForwardIterator _outer_join_pull_visited_iter; - + HashTableIteratorVariants _outer_join_pull_visited_iter; HashTableIteratorVariants _probe_row_match_iter; std::shared_ptr> _build_blocks; @@ -309,6 +308,8 @@ private: std::vector _left_output_slot_flags; std::vector _right_output_slot_flags; + // for cases when a probe row matches more than batch size build rows. + bool _is_any_probe_match_row_output = false; uint8_t _build_block_idx = 0; int64_t _build_side_mem_used = 0; int64_t _build_side_last_mem_used = 0; diff --git a/regression-test/data/correctness_p0/test_left_anti_join_batch_size.out b/regression-test/data/correctness_p0/test_left_anti_join_batch_size.out new file mode 100644 index 0000000000..daf635f191 --- /dev/null +++ b/regression-test/data/correctness_p0/test_left_anti_join_batch_size.out @@ -0,0 +1,606 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !test_left_anti_join_batch_size -- +2 138 +4 579 +36 978 +64 597 +66 350 +99 95 +128 944 +132 907 +160 982 +161 62 +162 466 +193 192 +196 237 +199 998 +227 213 +228 655 +257 624 +259 879 +288 723 +292 926 +293 642 +327 144 +352 378 +353 223 +355 381 +385 912 +389 199 +391 684 +416 871 +421 308 +422 962 +423 716 +452 465 +454 544 +485 479 +546 234 +550 301 +578 240 +609 811 +642 364 +674 398 +705 406 +706 933 +707 520 +742 262 +743 685 +769 775 +805 919 +806 506 +834 197 +835 994 +837 913 +838 635 +866 572 +867 881 +870 674 +934 313 +965 205 +992 757 +996 775 +997 745 +1030 209 +1063 24 +1088 445 +1091 725 +1095 873 +1123 872 +1186 247 +1187 983 +1189 69 +1190 154 +1191 84 +1217 426 +1219 123 +1222 130 +1223 947 +1249 327 +1313 903 +1314 452 +1344 526 +1348 824 +1349 327 +1351 724 +1376 120 +1380 609 +1383 98 +1414 485 +1415 825 +1440 761 +1442 13 +1443 622 +1446 188 +1479 147 +1507 223 +1511 495 +1541 595 +1570 19 +1602 282 +1603 778 +1605 856 +1639 573 +1664 254 +1665 404 +1669 839 +1670 139 +1699 242 +1729 696 +1734 35 +1765 58 +1796 21 +1799 372 +1824 751 +1830 360 +1831 705 +1860 296 +1862 216 +1863 227 +1892 540 +1894 831 +1895 819 +1921 84 +1922 479 +1952 697 +1953 549 +1959 869 +1984 512 +1987 256 +1989 739 +2016 37 +2017 9 +2019 697 +2021 246 +2048 55 +2054 100 +2080 437 +2085 275 +2112 11 +2114 45 +2119 697 +2147 858 +2148 378 +2182 595 +2212 596 +2243 700 +2244 520 +2247 461 +2274 574 +2308 753 +2310 44 +2336 487 +2337 735 +2338 617 +2340 569 +2341 552 +2343 991 +2369 141 +2401 724 +2402 929 +2405 374 +2432 181 +2436 408 +2471 386 +2501 446 +2502 217 +2535 342 +2564 123 +2592 339 +2596 748 +2597 896 +2626 901 +2627 849 +2631 429 +2660 534 +2663 107 +2693 186 +2726 567 +2727 624 +2755 505 +2787 8 +2822 343 +2848 998 +2849 231 +2851 11 +2852 140 +2882 864 +2887 416 +2912 670 +2916 510 +2918 257 +2947 997 +2948 172 +2951 410 +2977 746 +2981 105 +3011 30 +3042 243 +3047 401 +3074 171 +3075 373 +3076 48 +3078 974 +3137 269 +3139 185 +3141 687 +3142 446 +3173 241 +3206 321 +3237 324 +3238 269 +3266 831 +3267 234 +3268 569 +3299 54 +3301 167 +3302 792 +3329 53 +3330 413 +3332 387 +3361 537 +3395 295 +3398 297 +3424 892 +3431 725 +3456 563 +3493 770 +3495 861 +3524 824 +3525 698 +3526 15 +3527 266 +3559 961 +3589 412 +3616 925 +3620 191 +3622 477 +3651 564 +3680 757 +3681 785 +3718 725 +3744 15 +3746 403 +3776 934 +3779 337 +3875 534 +3877 786 +3879 94 +3908 444 +3938 368 +3972 810 +3974 615 +3975 371 +4005 717 +4037 678 +4098 943 +4100 364 +4103 989 +4132 4 +4133 366 +4160 430 +4163 288 +4164 489 +4194 167 +4225 761 +4226 976 +4256 305 +4259 769 +4260 97 +4289 563 +4294 281 +4323 37 +4325 757 +4326 274 +4352 840 +4353 141 +4358 28 +4385 623 +4391 367 +4417 610 +4419 418 +4420 483 +4423 717 +4449 329 +4451 230 +4480 570 +4481 645 +4482 38 +4487 808 +4517 255 +4519 277 +4546 774 +4549 62 +4550 169 +4576 792 +4581 477 +4612 230 +4615 390 +4641 785 +4643 299 +4679 451 +4709 435 +4737 822 +4772 684 +4775 656 +4800 55 +4802 946 +4838 181 +4867 231 +4898 675 +4903 760 +4928 290 +4963 317 +4993 119 +5025 262 +5026 796 +5027 39 +5029 361 +5058 514 +5091 504 +5123 576 +5125 909 +5154 247 +5155 658 +5156 231 +5187 852 +5188 16 +5219 491 +5220 790 +5222 844 +5248 800 +5250 962 +5251 424 +5255 916 +5283 168 +5287 334 +5312 854 +5351 111 +5378 622 +5379 149 +5412 819 +5440 933 +5447 114 +5508 479 +5510 684 +5539 210 +5542 826 +5602 335 +5607 418 +5638 785 +5639 170 +5667 237 +5700 506 +5701 605 +5728 675 +5730 970 +5764 64 +5795 839 +5796 309 +5825 88 +5830 723 +5860 783 +5861 79 +5862 469 +5863 203 +5888 899 +5890 224 +5891 473 +5893 161 +5952 35 +5953 98 +5955 465 +5985 831 +5988 733 +5991 30 +6052 782 +6054 499 +6080 182 +6114 902 +6149 546 +6151 458 +6176 472 +6177 904 +6178 433 +6180 716 +6212 989 +6213 174 +6214 744 +6245 892 +6247 552 +6273 257 +6277 468 +6306 19 +6308 807 +6336 467 +6368 267 +6400 223 +6405 315 +6407 763 +6434 14 +6436 163 +6466 289 +6467 221 +6468 670 +6497 793 +6530 573 +6534 745 +6595 533 +6597 313 +6598 602 +6624 275 +6658 209 +6692 474 +6694 474 +6720 888 +6752 558 +6758 258 +6785 232 +6787 47 +6791 516 +6816 172 +6853 118 +6855 489 +6880 785 +6912 573 +6914 494 +6946 680 +6950 26 +6951 963 +6979 701 +6983 922 +7008 470 +7009 362 +7072 753 +7074 520 +7076 680 +7104 387 +7108 999 +7110 162 +7111 171 +7143 528 +7172 505 +7174 369 +7175 216 +7201 78 +7203 83 +7206 986 +7232 564 +7233 482 +7234 607 +7239 234 +7264 44 +7301 679 +7302 887 +7360 20 +7397 9 +7424 335 +7426 518 +7428 914 +7430 999 +7431 755 +7456 530 +7457 703 +7459 137 +7460 246 +7489 247 +7495 334 +7525 3 +7556 580 +7557 694 +7587 878 +7616 342 +7619 839 +7620 470 +7648 85 +7650 681 +7651 620 +7652 630 +7655 227 +7681 294 +7713 308 +7716 635 +7748 754 +7749 174 +7809 281 +7815 687 +7875 237 +7876 434 +7909 212 +7937 324 +7941 144 +7970 292 +7975 81 +8039 186 +8067 683 +8069 900 +8161 544 +8192 737 +8194 309 +8197 815 +8228 71 +8289 282 +8325 842 +8326 14 +8327 860 +8352 342 +8356 662 +8386 302 +8420 882 +8483 148 +8485 595 +8544 134 +8580 287 +8613 542 +8615 530 +8640 523 +8647 779 +8675 970 +8677 863 +8679 648 +8772 377 +8773 191 +8775 593 +8800 693 +8832 438 +8839 237 +8864 315 +8870 677 +8871 621 +8897 203 +8898 389 +8901 370 +8902 4 +8903 46 +8928 1000 +8930 506 +8962 362 +8965 559 +8967 833 +8998 562 +9027 567 +9028 87 +9029 764 +9058 816 +9063 295 +9090 924 +9093 999 +9095 982 +9121 582 +9127 969 +9152 352 +9153 806 +9157 464 +9158 429 +9159 350 +9185 401 +9191 620 +9218 240 +9220 507 +9249 23 +9251 281 +9282 209 +9283 244 +9285 441 +9287 84 +9315 369 +9317 547 +9318 902 +9350 474 +9377 176 +9379 181 +9382 789 +9411 259 +9440 200 +9444 328 +9447 239 +9477 468 +9506 229 +9507 360 +9537 366 +9539 95 +9540 785 +9575 500 +9605 511 +9636 66 +9668 168 +9699 298 +9731 668 +9732 779 +9734 271 +9761 168 +9767 995 +9827 994 +9856 386 +9858 219 +9889 619 +9890 127 +9891 122 +9893 721 +9952 234 +9953 533 +9956 941 +9959 988 +9985 496 +9986 168 +9988 831 diff --git a/regression-test/suites/correctness_p0/test_left_anti_join_batch_size.sql b/regression-test/suites/correctness_p0/test_left_anti_join_batch_size.sql new file mode 100644 index 0000000000..94d719fbaa --- /dev/null +++ b/regression-test/suites/correctness_p0/test_left_anti_join_batch_size.sql @@ -0,0 +1,21 @@ +-- tables: supplier,lineitem,orders,nation +SELECT /*+SEV_VAR(batch_size=3)*/ + l1.l_orderkey okey, + l1.l_suppkey skey +FROM + regression_test_tpch_unique_sql_zstd_p0.lineitem l1 +WHERE + l1.l_receiptdate > l1.l_commitdate + AND l1.L_ORDERKEY < 10000 + AND NOT exists( + SELECT * + FROM + regression_test_tpch_unique_sql_zstd_p0.lineitem l3 + WHERE + l3.l_orderkey = l1.l_orderkey + AND l3.l_suppkey <> l1.l_suppkey + AND l3.l_receiptdate > l3.l_commitdate + AND l3.L_ORDERKEY < 10000 + ) +ORDER BY + okey, skey