[Performance](Join) Support all match one logic (#30019)

Support all match one logic
This commit is contained in:
HappenLee
2024-01-17 20:53:42 +08:00
committed by yiguolei
parent 8a0a6bf856
commit 7e821f3d66
2 changed files with 18 additions and 10 deletions

View File

@ -47,8 +47,8 @@ struct ProcessHashTableProbe {
int size, bool have_other_join_conjunct);
void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags,
int size, int last_probe_index, size_t probe_size,
bool all_match_one, bool have_other_join_conjunct);
int size, int last_probe_index, bool all_match_one,
bool have_other_join_conjunct);
template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,

View File

@ -97,15 +97,14 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::probe_side_output_column(
MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size,
int last_probe_index, size_t probe_size, bool all_match_one,
bool have_other_join_conjunct) {
int last_probe_index, bool all_match_one, bool have_other_join_conjunct) {
SCOPED_TIMER(_probe_side_output_timer);
auto& probe_block = _parent->_probe_block;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i]) {
auto& column = probe_block.get_by_position(i).column;
if (all_match_one) {
mcol[i]->insert_range_from(*column, last_probe_index, probe_size);
mcol[i]->insert_range_from(*column, last_probe_index, size);
} else {
column->replicate(_probe_indexs.data(), size, *mcol[i]);
}
@ -168,8 +167,6 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
auto& mcol = mutable_block.mutable_columns();
int current_offset = 0;
bool all_match_one = false;
size_t probe_size = 0;
std::unique_ptr<ColumnFilterHelper> mark_column;
if (is_mark_join) {
@ -188,16 +185,27 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
probe_size = probe_index - last_probe_index;
}
build_side_output_column(mcol, *_right_output_slot_flags, current_offset, with_other_conjuncts);
if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) {
auto check_all_match_one = [](const std::vector<uint32_t>& vecs, uint32_t probe_idx,
int size) {
if (size < 1 || vecs[0] != probe_idx) return false;
for (int i = 1; i < size; i++) {
if (vecs[i] - vecs[i - 1] != 1) {
return false;
}
}
return true;
};
RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
mcol, *_left_output_slot_flags, current_offset, last_probe_index, probe_size,
all_match_one, with_other_conjuncts));
mcol, *_left_output_slot_flags, current_offset, last_probe_index,
check_all_match_one(_probe_indexs, last_probe_index, current_offset),
with_other_conjuncts));
}
output_block->swap(mutable_block.to_block());