[fix](hashjoin) join produce blocks with rows larger than batch size: handle join with other conjuncts (#16402)

This commit is contained in:
TengJianPing
2023-02-08 14:26:35 +08:00
committed by GitHub
parent 2883f67042
commit f6a20f844b
6 changed files with 1038 additions and 131 deletions

View File

@ -67,6 +67,12 @@ struct ProcessHashTableProbe {
MutableBlock& mutable_block, Block* output_block,
size_t probe_rows, bool is_mark_join);
void _process_splited_equal_matched_tuples(int start_row_idx, int row_count,
const ColumnPtr& other_hit_column,
std::vector<bool*>& visited_map, int right_col_idx,
int right_col_len, UInt8* __restrict null_map_data,
UInt8* __restrict filter_map, Block* output_block);
// Process full outer join/ right join / right semi/anti join to output the join result
// in hash table
template <typename HashTableType>

View File

@ -19,6 +19,7 @@
#include "common/status.h"
#include "process_hash_table_probe.h"
#include "util/simd/bits.h"
#include "vhash_join_node.h"
namespace doris::vectorized {
@ -462,95 +463,162 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
bool all_match_one = true;
int last_probe_index = probe_index;
while (probe_index < probe_rows) {
// ignore null rows
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
if constexpr (probe_all) {
_items_counts[probe_index++] = (uint32_t)1;
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the data of right table
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
}
++current_offset;
} else {
_items_counts[probe_index++] = (uint32_t)0;
}
all_match_one = false;
continue;
}
}
auto last_offset = current_offset;
auto find_result =
!need_null_map_for_probe
? key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena)
: (*null_map)[probe_index]
? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
*_arena)) {nullptr, false}
: key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena);
if (probe_index + PREFETCH_STEP < probe_rows) {
key_getter.template prefetch<true>(hash_table_ctx.hash_table,
probe_index + PREFETCH_STEP, *_arena);
}
if (find_result.is_found()) {
auto& mapped = find_result.get_mapped();
auto origin_offset = current_offset;
// TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
// We should rethink whether to use this iterator mode in the future. Now just opt the one row case
if (mapped.get_row_count() == 1) {
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = mapped.block_offset;
_build_block_rows[current_offset] = mapped.row_num;
} else {
_build_block_offsets.emplace_back(mapped.block_offset);
_build_block_rows.emplace_back(mapped.row_num);
}
++current_offset;
visited_map.emplace_back(&mapped.visited);
} else {
for (auto it = mapped.begin(); it.ok(); ++it) {
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = it->block_offset;
_build_block_rows[current_offset] = it->row_num;
} else {
_build_block_offsets.emplace_back(it->block_offset);
_build_block_rows.emplace_back(it->row_num);
}
++current_offset;
visited_map.emplace_back(&it->visited);
}
}
same_to_prev.emplace_back(false);
for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
same_to_prev.emplace_back(true);
}
} else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the data of right table
// left anti use -1 use a default value
int row_count_from_last_probe = 0;
bool is_the_last_sub_block = false;
size_t probe_size = 0;
auto& probe_row_match_iter =
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
if (probe_row_match_iter.ok()) {
auto origin_offset = current_offset;
for (; probe_row_match_iter.ok() && current_offset < _batch_size;
++probe_row_match_iter) {
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
_build_block_offsets[current_offset] = probe_row_match_iter->block_offset;
_build_block_rows[current_offset] = probe_row_match_iter->row_num;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
}
++current_offset;
} else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
if (is_mark_join) {
visited_map.emplace_back(&probe_row_match_iter->visited);
}
same_to_prev.emplace_back(false);
for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
same_to_prev.emplace_back(true);
}
row_count_from_last_probe = current_offset;
all_match_one &= (current_offset == 1);
_items_counts[probe_index] = current_offset;
if (!probe_row_match_iter.ok()) {
++probe_index;
is_the_last_sub_block = true;
}
probe_size = 1;
}
int multi_matched_output_row_count = 0;
if (current_offset < _batch_size) {
while (probe_index < probe_rows) {
// ignore null rows
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
if constexpr (probe_all) {
_items_counts[probe_index++] = (uint32_t)1;
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the data of right table
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
}
++current_offset;
} else {
_items_counts[probe_index++] = (uint32_t)0;
}
all_match_one = false;
if constexpr (probe_all) {
if (current_offset >= _batch_size) {
break;
}
}
continue;
}
}
auto last_offset = current_offset;
auto find_result = !need_null_map_for_probe
? key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena)
: (*null_map)[probe_index]
? decltype(key_getter.find_key(hash_table_ctx.hash_table,
probe_index,
*_arena)) {nullptr, false}
: key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena);
if (probe_index + PREFETCH_STEP < probe_rows) {
key_getter.template prefetch<true>(hash_table_ctx.hash_table,
probe_index + PREFETCH_STEP, *_arena);
}
auto current_probe_index = probe_index;
if (find_result.is_found()) {
auto& mapped = find_result.get_mapped();
auto origin_offset = current_offset;
// TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
// We should rethink whether to use this iterator mode in the future. Now just opt the one row case
if (mapped.get_row_count() == 1) {
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = mapped.block_offset;
_build_block_rows[current_offset] = mapped.row_num;
} else {
_build_block_offsets.emplace_back(mapped.block_offset);
_build_block_rows.emplace_back(mapped.row_num);
}
++current_offset;
visited_map.emplace_back(&mapped.visited);
++probe_index;
} else {
// For mark join, if euqual-matched tuple count for one probe row
// excceeds batch size, it's difficult to implement the logic to
// split them into multiple sub blocks and handle them, keep the original
// logic for now.
if (is_mark_join) {
for (auto it = mapped.begin(); it.ok(); ++it) {
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = it->block_offset;
_build_block_rows[current_offset] = it->row_num;
} else {
_build_block_offsets.emplace_back(it->block_offset);
_build_block_rows.emplace_back(it->row_num);
}
++current_offset;
visited_map.emplace_back(&it->visited);
}
} else {
auto multi_match_last_offset = current_offset;
auto it = mapped.begin();
// breaks if row count exceeds batch_size
for (; it.ok() && current_offset < _batch_size; ++it) {
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = it->block_offset;
_build_block_rows[current_offset] = it->row_num;
} else {
_build_block_offsets.emplace_back(it->block_offset);
_build_block_rows.emplace_back(it->row_num);
}
++current_offset;
visited_map.emplace_back(&it->visited);
}
probe_row_match_iter = it;
// If all matched rows for the current probe row are handled,
// advance to next probe row.
if (!it.ok()) {
++probe_index;
} else {
// If not(which means it excceed batch size), probe_index is not increased and
// remaining matched rows for the current probe row will be
// handled in the next call of this function
multi_matched_output_row_count =
current_offset - multi_match_last_offset;
}
}
}
same_to_prev.emplace_back(false);
for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
same_to_prev.emplace_back(true);
}
} else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the data of right table
// left anti use -1 use a default value
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
@ -559,16 +627,33 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
_build_block_rows.emplace_back(-1);
}
++current_offset;
++probe_index;
} else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
if (is_mark_join) {
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
}
++current_offset;
}
++probe_index;
} else {
// other join, no nothing
++probe_index;
}
uint32_t count = (uint32_t)(current_offset - last_offset);
_items_counts[current_probe_index] = count;
all_match_one &= (count == 1);
if (current_offset >= _batch_size) {
break;
}
} else {
// other join, no nothing
}
uint32_t count = (uint32_t)(current_offset - last_offset);
_items_counts[probe_index++] = count;
all_match_one &= (count == 1);
if (current_offset >= _batch_size && !all_match_one) {
break;
}
probe_size = probe_index - last_probe_index + (probe_row_match_iter.ok() ? 1 : 0);
}
{
@ -579,14 +664,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
{
SCOPED_TIMER(_probe_side_output_timer);
probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset,
last_probe_index, probe_index - last_probe_index,
all_match_one, true);
last_probe_index, probe_size, all_match_one, true);
}
auto num_cols = mutable_block.columns();
output_block->swap(mutable_block.to_block());
// dispose the other join conjunct exec
if (output_block->rows()) {
auto row_count = output_block->rows();
if (row_count) {
int result_column_id = -1;
int orig_columns = output_block->columns();
RETURN_IF_ERROR((*_join_node->_vother_join_conjunct_ptr)
@ -595,13 +680,29 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
auto column = output_block->get_by_position(result_column_id).column;
if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
auto new_filter_column = ColumnVector<UInt8>::create();
auto& filter_map = new_filter_column->get_data();
auto new_filter_column = ColumnVector<UInt8>::create(row_count);
auto* __restrict filter_map = new_filter_column->get_data().data();
auto null_map_column = ColumnVector<UInt8>::create(column->size(), 0);
auto null_map_column = ColumnVector<UInt8>::create(row_count, 0);
auto* __restrict null_map_data = null_map_column->get_data().data();
for (int i = 0; i < column->size(); ++i) {
// It contains non-first sub block of splited equal-conjuncts-matched tuples from last probe row
if (row_count_from_last_probe > 0) {
_process_splited_equal_matched_tuples(0, row_count_from_last_probe, column,
visited_map, right_col_idx, right_col_len,
null_map_data, filter_map, output_block);
// This is the last sub block of splitted block, and no equal-conjuncts-matched tuple
// is output in all sub blocks, need to output a tuple for this probe row
if (is_the_last_sub_block && !_join_node->_is_any_probe_match_row_output) {
filter_map[0] = true;
null_map_data[0] = true;
}
}
int end_idx = row_count - multi_matched_output_row_count;
// process equal-conjuncts-matched tuples that are newly generated
// in this run if there are any.
for (size_t i = row_count_from_last_probe; i < end_idx; ++i) {
auto join_hit = visited_map[i] != nullptr;
auto other_hit = column->get_bool(i);
@ -617,21 +718,40 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
null_map_data[i] = !join_hit || !other_hit;
// For cases where one probe row matches multiple build rows for equal conjuncts,
// all the other-conjuncts-matched tuples should be output.
//
// Other-conjuncts-NOT-matched tuples fall into two categories:
// 1. The beginning consecutive one(s).
// For these tuples, only the last one is marked to output;
// If there are any following other-conjuncts-matched tuples,
// the last tuple is also marked NOT to output.
// 2. All the remaining other-conjuncts-NOT-matched tuples.
// All these tuples are marked not to output.
if (join_hit) {
*visited_map[i] |= other_hit;
filter_map.push_back(other_hit || !same_to_prev[i] ||
(!column->get_bool(i - 1) && filter_map.back()));
filter_map[i] = other_hit || !same_to_prev[i] ||
(!column->get_bool(i - 1) && filter_map[i - 1]);
// Here to keep only hit join conjunct and other join conjunt is true need to be output.
// if not, only some key must keep one row will output will null right table column
if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1)) {
if (same_to_prev[i] && filter_map[i] && !column->get_bool(i - 1)) {
filter_map[i - 1] = false;
}
} else {
filter_map.push_back(true);
filter_map[i] = true;
}
}
for (int i = 0; i < column->size(); ++i) {
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
if (multi_matched_output_row_count > 0) {
_join_node->_is_any_probe_match_row_output = false;
_process_splited_equal_matched_tuples(
row_count - multi_matched_output_row_count,
multi_matched_output_row_count, column, visited_map, right_col_idx,
right_col_len, null_map_data, filter_map, output_block);
}
for (size_t i = 0; i < row_count; ++i) {
if (filter_map[i]) {
_tuple_is_null_right_flags->emplace_back(null_map_data[i]);
}
@ -639,13 +759,30 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
output_block->get_by_position(result_column_id).column =
std::move(new_filter_column);
} else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
// TODO: resize in advance
auto new_filter_column = ColumnVector<UInt8>::create();
auto& filter_map = new_filter_column->get_data();
if (!column->empty()) {
size_t start_row_idx = 1;
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks
if (row_count_from_last_probe > 0) {
if (_join_node->_is_any_probe_match_row_output) {
// if any matched tuple for this probe row is output,
// ignore all the following tuples for this probe row.
for (int row_idx = 0; row_idx < row_count_from_last_probe; ++row_idx) {
filter_map.emplace_back(false);
}
start_row_idx += row_count_from_last_probe;
if (row_count_from_last_probe < row_count) {
filter_map.emplace_back(column->get_bool(row_count_from_last_probe));
}
} else {
filter_map.emplace_back(column->get_bool(0));
}
} else {
filter_map.emplace_back(column->get_bool(0));
}
for (int i = 1; i < column->size(); ++i) {
for (size_t i = start_row_idx; i < row_count; ++i) {
if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) {
// Only last same element is true, output last one
filter_map.push_back(true);
@ -654,6 +791,21 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
filter_map.push_back(false);
}
}
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
if (multi_matched_output_row_count > 0) {
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
_join_node->_is_any_probe_match_row_output = filter_map[row_count - 1];
} else if (row_count_from_last_probe > 0 &&
!_join_node->_is_any_probe_match_row_output) {
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks,
// and no matched tuple has been output in all previous run.
// If a tuple is output in this run, all the following mathced tuples should be ignored
if (filter_map[row_count_from_last_probe - 1]) {
_join_node->_is_any_probe_match_row_output = true;
}
}
if (is_mark_join) {
auto& matched_map = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
*(output_block->get_by_position(num_cols - 1)
@ -662,7 +814,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
// For mark join, we only filter rows which have duplicate join keys.
// And then, we set matched_map to the join result to do the mark join's filtering.
for (size_t i = 1; i < column->size(); ++i) {
for (size_t i = 1; i < row_count; ++i) {
if (!same_to_prev[i]) {
matched_map.push_back(filter_map[i - 1]);
filter_map[i - 1] = true;
@ -676,14 +828,43 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
std::move(new_filter_column);
} else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
auto new_filter_column = ColumnVector<UInt8>::create();
auto& filter_map = new_filter_column->get_data();
auto new_filter_column = ColumnVector<UInt8>::create(row_count);
auto* __restrict filter_map = new_filter_column->get_data().data();
if (!column->empty()) {
// for left anti join, the probe side is output only when
// there are no matched tuples for the probe row.
// If multiple equal-conjuncts-matched tuples is splitted into several
// sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at first,
// and when processing the last sub block, check whether there are any
// equal-conjuncts-matched tuple is output in all sub blocks,
// if there are none, just pick a tuple and output.
size_t start_row_idx = 1;
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks
if (row_count_from_last_probe > 0) {
if (_join_node->_is_any_probe_match_row_output) {
// if any matched tuple for this probe row is output,
// ignore all the following tuples for this probe row.
for (int row_idx = 0; row_idx < row_count_from_last_probe; ++row_idx) {
filter_map[row_idx] = false;
}
start_row_idx += row_count_from_last_probe;
if (row_count_from_last_probe < row_count) {
filter_map[row_count_from_last_probe] =
column->get_bool(row_count_from_last_probe) &&
visited_map[row_count_from_last_probe];
}
} else {
// Both equal conjuncts and other conjuncts are true
filter_map[0] = column->get_bool(0) && visited_map[0];
}
} else {
// Both equal conjuncts and other conjuncts are true
filter_map.emplace_back(column->get_bool(0) && visited_map[0]);
filter_map[0] = column->get_bool(0) && visited_map[0];
}
for (int i = 1; i < column->size(); ++i) {
for (size_t i = start_row_idx; i < row_count; ++i) {
if ((visited_map[i] && column->get_bool(i)) ||
(same_to_prev[i] && filter_map[i - 1])) {
// When either of two conditions is meet:
@ -691,10 +872,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
// 2. This row is joined from the same build side row as the previous row
// Set filter_map[i] to true and filter_map[i - 1] to false if same_to_prev[i]
// is true.
filter_map.push_back(true);
filter_map[i] = true;
filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1];
} else {
filter_map.push_back(false);
filter_map[i] = false;
}
}
@ -703,22 +884,62 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
*(output_block->get_by_position(num_cols - 1)
.column->assume_mutable()))
.get_data();
for (int i = 1; i < same_to_prev.size(); ++i) {
for (int i = 1; i < row_count; ++i) {
if (!same_to_prev[i]) {
matched_map.push_back(!filter_map[i - 1]);
filter_map[i - 1] = true;
}
}
matched_map.push_back(!filter_map[filter_map.size() - 1]);
filter_map[filter_map.size() - 1] = true;
matched_map.push_back(!filter_map[row_count - 1]);
filter_map[row_count - 1] = true;
} else {
int end_row_idx;
if (row_count_from_last_probe > 0) {
end_row_idx = row_count - multi_matched_output_row_count;
if (!_join_node->_is_any_probe_match_row_output) {
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks,
// and no matched tuple has been output in all previous run.
// If a tuple is output in this run, all the following mathced tuples should be ignored
if (filter_map[row_count_from_last_probe - 1]) {
_join_node->_is_any_probe_match_row_output = true;
filter_map[row_count_from_last_probe - 1] = false;
}
if (is_the_last_sub_block &&
!_join_node->_is_any_probe_match_row_output) {
// This is the last sub block of splitted block, and no equal-conjuncts-matched tuple
// is output in all sub blocks, output a tuple for this probe row
filter_map[0] = true;
}
}
if (multi_matched_output_row_count > 0) {
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
_join_node->_is_any_probe_match_row_output = filter_map[row_count - 1];
filter_map[row_count - 1] = false;
}
} else if (multi_matched_output_row_count > 0) {
end_row_idx = row_count - multi_matched_output_row_count;
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
_join_node->_is_any_probe_match_row_output = filter_map[row_count - 1];
filter_map[row_count - 1] = false;
} else {
end_row_idx = row_count;
}
// Same to the semi join, but change the last value to opposite value
for (int i = 1; i < same_to_prev.size(); ++i) {
for (int i = 1 + row_count_from_last_probe; i < end_row_idx; ++i) {
if (!same_to_prev[i]) {
filter_map[i - 1] = !filter_map[i - 1];
}
}
filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1];
auto non_sub_blocks_matched_row_count =
row_count - row_count_from_last_probe - multi_matched_output_row_count;
if (non_sub_blocks_matched_row_count > 0) {
filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1];
}
}
output_block->get_by_position(result_column_id).column =
@ -731,7 +952,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
} else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
auto filter_size = 0;
for (int i = 0; i < column->size(); ++i) {
for (int i = 0; i < row_count; ++i) {
DCHECK(visited_map[i]);
auto result = column->get_bool(i);
*visited_map[i] |= result;
@ -766,6 +987,42 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
}
// For left or full outer join with other conjuncts.
// If multiple equal-conjuncts-matched tuples is splitted into several
// sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at first,
// and when processing the last sub block, check whether there are any
// equal-conjuncts-matched tuple is output in all sub blocks,
// if not, just pick a tuple and output.
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
int start_row_idx, int row_count, const ColumnPtr& other_hit_column,
std::vector<bool*>& visited_map, int right_col_idx, int right_col_len,
UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block) {
int end_row_idx = start_row_idx + row_count;
for (int i = start_row_idx; i < end_row_idx; ++i) {
auto join_hit = visited_map[i] != nullptr;
auto other_hit = other_hit_column->get_bool(i);
if (!other_hit) {
for (size_t j = 0; j < right_col_len; ++j) {
typeid_cast<ColumnNullable*>(
std::move(*output_block->get_by_position(j + right_col_idx).column)
.assume_mutable()
.get())
->get_null_map_data()[i] = true;
}
}
null_map_data[i] = !join_hit || !other_hit;
filter_map[i] = other_hit;
if (join_hit) {
*visited_map[i] |= other_hit;
}
}
_join_node->_is_any_probe_match_row_output |= simd::contain_byte(filter_map, row_count, 1);
}
template <int JoinOpType>
template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& hash_table_ctx,
@ -786,7 +1043,8 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto& iter = hash_table_ctx.iter;
auto block_size = 0;
auto& visited_iter = _join_node->_outer_join_pull_visited_iter;
auto& visited_iter =
std::get<ForwardIterator<Mapped>>(_join_node->_outer_join_pull_visited_iter);
auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
block_size++;
@ -797,7 +1055,6 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
};
if (visited_iter.ok()) {
DCHECK((std::is_same_v<Mapped, RowRefListWithFlag>));
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
insert_from_hash_table(visited_iter->block_offset, visited_iter->row_num);
}
@ -810,11 +1067,17 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto& mapped = iter->get_second();
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
if (mapped.visited) {
for (auto it = mapped.begin(); it.ok(); ++it) {
visited_iter = mapped.begin();
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
insert_from_hash_table(it->block_offset, it->row_num);
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
}
}
if (visited_iter.ok()) {
// block_size >= _batch_size, quit for loop
break;
}
} else {
if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
visited_iter = mapped.begin();
@ -829,17 +1092,24 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
}
} else {
for (auto it = mapped.begin(); it.ok(); ++it) {
visited_iter = mapped.begin();
for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) {
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
if (it->visited) {
insert_from_hash_table(it->block_offset, it->row_num);
if (visited_iter->visited) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
}
} else {
if (!it->visited) {
insert_from_hash_table(it->block_offset, it->row_num);
if (!visited_iter->visited) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
}
}
}
if (visited_iter.ok()) {
// block_size >= _batch_size, quit for loop
break;
}
}
}
@ -862,6 +1132,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
*eos = iter == hash_table_ctx.hash_table.end();
output_block->swap(
mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0));
DCHECK(block_size <= _batch_size);
return Status::OK();
} else {
LOG(FATAL) << "Invalid RowRefList";

View File

@ -458,8 +458,7 @@ void HashJoinNode::prepare_for_next() {
_prepare_probe_block();
}
Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block,
bool* eos) {
Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
SCOPED_TIMER(_probe_timer);
if (_short_circuit_for_null_in_probe_side) {
// If we use a short-circuit strategy for null value in build side (e.g. if join operator is
@ -538,6 +537,8 @@ Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* out
if (_is_outer_join) {
_add_tuple_is_null_column(&temp_block);
}
auto output_rows = temp_block.rows();
DCHECK(output_rows <= state->batch_size());
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
@ -1027,6 +1028,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN,
RowRefListWithFlag, RowRefList>>;
_probe_row_match_iter.emplace<ForwardIterator<RowRefListType>>();
_outer_join_pull_visited_iter.emplace<ForwardIterator<RowRefListType>>();
if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) {
// Single column optimization

View File

@ -280,8 +280,7 @@ private:
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
// for full/right outer join
ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
HashTableIteratorVariants _outer_join_pull_visited_iter;
HashTableIteratorVariants _probe_row_match_iter;
std::shared_ptr<std::vector<Block>> _build_blocks;
@ -309,6 +308,8 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
// for cases when a probe row matches more than batch size build rows.
bool _is_any_probe_match_row_output = false;
uint8_t _build_block_idx = 0;
int64_t _build_side_mem_used = 0;
int64_t _build_side_last_mem_used = 0;

View File

@ -0,0 +1,606 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test_left_anti_join_batch_size --
2 138
4 579
36 978
64 597
66 350
99 95
128 944
132 907
160 982
161 62
162 466
193 192
196 237
199 998
227 213
228 655
257 624
259 879
288 723
292 926
293 642
327 144
352 378
353 223
355 381
385 912
389 199
391 684
416 871
421 308
422 962
423 716
452 465
454 544
485 479
546 234
550 301
578 240
609 811
642 364
674 398
705 406
706 933
707 520
742 262
743 685
769 775
805 919
806 506
834 197
835 994
837 913
838 635
866 572
867 881
870 674
934 313
965 205
992 757
996 775
997 745
1030 209
1063 24
1088 445
1091 725
1095 873
1123 872
1186 247
1187 983
1189 69
1190 154
1191 84
1217 426
1219 123
1222 130
1223 947
1249 327
1313 903
1314 452
1344 526
1348 824
1349 327
1351 724
1376 120
1380 609
1383 98
1414 485
1415 825
1440 761
1442 13
1443 622
1446 188
1479 147
1507 223
1511 495
1541 595
1570 19
1602 282
1603 778
1605 856
1639 573
1664 254
1665 404
1669 839
1670 139
1699 242
1729 696
1734 35
1765 58
1796 21
1799 372
1824 751
1830 360
1831 705
1860 296
1862 216
1863 227
1892 540
1894 831
1895 819
1921 84
1922 479
1952 697
1953 549
1959 869
1984 512
1987 256
1989 739
2016 37
2017 9
2019 697
2021 246
2048 55
2054 100
2080 437
2085 275
2112 11
2114 45
2119 697
2147 858
2148 378
2182 595
2212 596
2243 700
2244 520
2247 461
2274 574
2308 753
2310 44
2336 487
2337 735
2338 617
2340 569
2341 552
2343 991
2369 141
2401 724
2402 929
2405 374
2432 181
2436 408
2471 386
2501 446
2502 217
2535 342
2564 123
2592 339
2596 748
2597 896
2626 901
2627 849
2631 429
2660 534
2663 107
2693 186
2726 567
2727 624
2755 505
2787 8
2822 343
2848 998
2849 231
2851 11
2852 140
2882 864
2887 416
2912 670
2916 510
2918 257
2947 997
2948 172
2951 410
2977 746
2981 105
3011 30
3042 243
3047 401
3074 171
3075 373
3076 48
3078 974
3137 269
3139 185
3141 687
3142 446
3173 241
3206 321
3237 324
3238 269
3266 831
3267 234
3268 569
3299 54
3301 167
3302 792
3329 53
3330 413
3332 387
3361 537
3395 295
3398 297
3424 892
3431 725
3456 563
3493 770
3495 861
3524 824
3525 698
3526 15
3527 266
3559 961
3589 412
3616 925
3620 191
3622 477
3651 564
3680 757
3681 785
3718 725
3744 15
3746 403
3776 934
3779 337
3875 534
3877 786
3879 94
3908 444
3938 368
3972 810
3974 615
3975 371
4005 717
4037 678
4098 943
4100 364
4103 989
4132 4
4133 366
4160 430
4163 288
4164 489
4194 167
4225 761
4226 976
4256 305
4259 769
4260 97
4289 563
4294 281
4323 37
4325 757
4326 274
4352 840
4353 141
4358 28
4385 623
4391 367
4417 610
4419 418
4420 483
4423 717
4449 329
4451 230
4480 570
4481 645
4482 38
4487 808
4517 255
4519 277
4546 774
4549 62
4550 169
4576 792
4581 477
4612 230
4615 390
4641 785
4643 299
4679 451
4709 435
4737 822
4772 684
4775 656
4800 55
4802 946
4838 181
4867 231
4898 675
4903 760
4928 290
4963 317
4993 119
5025 262
5026 796
5027 39
5029 361
5058 514
5091 504
5123 576
5125 909
5154 247
5155 658
5156 231
5187 852
5188 16
5219 491
5220 790
5222 844
5248 800
5250 962
5251 424
5255 916
5283 168
5287 334
5312 854
5351 111
5378 622
5379 149
5412 819
5440 933
5447 114
5508 479
5510 684
5539 210
5542 826
5602 335
5607 418
5638 785
5639 170
5667 237
5700 506
5701 605
5728 675
5730 970
5764 64
5795 839
5796 309
5825 88
5830 723
5860 783
5861 79
5862 469
5863 203
5888 899
5890 224
5891 473
5893 161
5952 35
5953 98
5955 465
5985 831
5988 733
5991 30
6052 782
6054 499
6080 182
6114 902
6149 546
6151 458
6176 472
6177 904
6178 433
6180 716
6212 989
6213 174
6214 744
6245 892
6247 552
6273 257
6277 468
6306 19
6308 807
6336 467
6368 267
6400 223
6405 315
6407 763
6434 14
6436 163
6466 289
6467 221
6468 670
6497 793
6530 573
6534 745
6595 533
6597 313
6598 602
6624 275
6658 209
6692 474
6694 474
6720 888
6752 558
6758 258
6785 232
6787 47
6791 516
6816 172
6853 118
6855 489
6880 785
6912 573
6914 494
6946 680
6950 26
6951 963
6979 701
6983 922
7008 470
7009 362
7072 753
7074 520
7076 680
7104 387
7108 999
7110 162
7111 171
7143 528
7172 505
7174 369
7175 216
7201 78
7203 83
7206 986
7232 564
7233 482
7234 607
7239 234
7264 44
7301 679
7302 887
7360 20
7397 9
7424 335
7426 518
7428 914
7430 999
7431 755
7456 530
7457 703
7459 137
7460 246
7489 247
7495 334
7525 3
7556 580
7557 694
7587 878
7616 342
7619 839
7620 470
7648 85
7650 681
7651 620
7652 630
7655 227
7681 294
7713 308
7716 635
7748 754
7749 174
7809 281
7815 687
7875 237
7876 434
7909 212
7937 324
7941 144
7970 292
7975 81
8039 186
8067 683
8069 900
8161 544
8192 737
8194 309
8197 815
8228 71
8289 282
8325 842
8326 14
8327 860
8352 342
8356 662
8386 302
8420 882
8483 148
8485 595
8544 134
8580 287
8613 542
8615 530
8640 523
8647 779
8675 970
8677 863
8679 648
8772 377
8773 191
8775 593
8800 693
8832 438
8839 237
8864 315
8870 677
8871 621
8897 203
8898 389
8901 370
8902 4
8903 46
8928 1000
8930 506
8962 362
8965 559
8967 833
8998 562
9027 567
9028 87
9029 764
9058 816
9063 295
9090 924
9093 999
9095 982
9121 582
9127 969
9152 352
9153 806
9157 464
9158 429
9159 350
9185 401
9191 620
9218 240
9220 507
9249 23
9251 281
9282 209
9283 244
9285 441
9287 84
9315 369
9317 547
9318 902
9350 474
9377 176
9379 181
9382 789
9411 259
9440 200
9444 328
9447 239
9477 468
9506 229
9507 360
9537 366
9539 95
9540 785
9575 500
9605 511
9636 66
9668 168
9699 298
9731 668
9732 779
9734 271
9761 168
9767 995
9827 994
9856 386
9858 219
9889 619
9890 127
9891 122
9893 721
9952 234
9953 533
9956 941
9959 988
9985 496
9986 168
9988 831

View File

@ -0,0 +1,21 @@
-- tables: supplier,lineitem,orders,nation
SELECT /*+SEV_VAR(batch_size=3)*/
l1.l_orderkey okey,
l1.l_suppkey skey
FROM
regression_test_tpch_unique_sql_zstd_p0.lineitem l1
WHERE
l1.l_receiptdate > l1.l_commitdate
AND l1.L_ORDERKEY < 10000
AND NOT exists(
SELECT *
FROM
regression_test_tpch_unique_sql_zstd_p0.lineitem l3
WHERE
l3.l_orderkey = l1.l_orderkey
AND l3.l_suppkey <> l1.l_suppkey
AND l3.l_receiptdate > l3.l_commitdate
AND l3.L_ORDERKEY < 10000
)
ORDER BY
okey, skey