diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index da262645bb..8dd84dfd27 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -139,8 +139,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { auto& p = _parent->cast(); _shared_state->short_circuit_for_probe = - (_short_circuit_for_null_in_probe_side && - p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || + (_has_null_in_build_side && p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::INNER_JOIN && !p._is_mark_join) || (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::LEFT_SEMI_JOIN && @@ -204,7 +203,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, has_null_value || short_circuit_for_null_in_build_side ? &null_map_val->get_data() : nullptr, - &_short_circuit_for_null_in_probe_side); + &_has_null_in_build_side); }}, *_shared_state->hash_table_variants, vectorized::make_bool_variant(_build_side_ignore_null), @@ -453,8 +452,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // make one block for each 4 gigabytes constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - if (local_state._short_circuit_for_null_in_probe_side) { - // TODO: if _short_circuit_for_null_in_probe_side is true we should finish current pipeline task. + if (local_state._has_null_in_build_side) { + // TODO: if _has_null_in_build_side is true we should finish current pipeline task. DCHECK(state->enable_pipeline_exec()); return Status::OK(); } @@ -539,7 +538,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _shared_hash_table_context->hash_table_variants = local_state._shared_state->hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = - local_state._short_circuit_for_null_in_probe_side; + local_state._has_null_in_build_side; if (local_state._runtime_filter_slots) { local_state._runtime_filter_slots->copy_to_shared_context( _shared_hash_table_context); @@ -557,7 +556,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.profile()->add_info_string( "SharedHashTableFrom", print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); - local_state._short_circuit_for_null_in_probe_side = + local_state._has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; local_state._shared_state->hash_table_variants = std::static_pointer_cast( diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index a67c724f69..2f7a3ec03e 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -40,7 +40,7 @@ protected: template friend class JoinBuildSinkOperatorX; - bool _short_circuit_for_null_in_probe_side = false; + bool _has_null_in_build_side = false; RuntimeProfile::Counter* _build_rows_counter; RuntimeProfile::Counter* _push_down_timer; @@ -73,8 +73,8 @@ protected: // For null aware left anti join, we apply a short circuit strategy. // 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join. - // 2. In build phase, we stop materialize build side when we meet the first null value and set _short_circuit_for_null_in_probe_side to true. - // 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join. + // 2. In build phase, we stop materialize build side when we meet the first null value and set _has_null_in_build_side to true. + // 3. In probe phase, if _has_null_in_build_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join. const bool _short_circuit_for_null_in_build_side; }; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index c69b49870d..575b305b01 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -504,7 +504,7 @@ private: struct JoinSharedState { // For some join case, we can apply a short circuit strategy - // 1. _short_circuit_for_null_in_probe_side = true + // 1. _has_null_in_build_side = true // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti bool short_circuit_for_probe = false; vectorized::JoinOpVariants join_op_variants; diff --git a/be/src/vec/columns/column_filter_helper.cpp b/be/src/vec/columns/column_filter_helper.cpp new file mode 100644 index 0000000000..f65bd8d864 --- /dev/null +++ b/be/src/vec/columns/column_filter_helper.cpp @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/columns/column_filter_helper.h" + +namespace doris::vectorized { +ColumnFilterHelper::ColumnFilterHelper(IColumn& column_) + : _column(assert_cast(column_)), + _value_column(assert_cast(_column.get_nested_column())), + _null_map_column(_column.get_null_map_column()) {} + +void ColumnFilterHelper::resize_fill(size_t size, doris::vectorized::UInt8 value) { + _value_column.get_data().resize_fill(size, value); + _null_map_column.get_data().resize_fill(size, 0); +} + +void ColumnFilterHelper::insert_value(doris::vectorized::UInt8 value) { + _value_column.get_data().push_back(value); + _null_map_column.get_data().push_back(0); +} + +void ColumnFilterHelper::insert_null() { + _value_column.insert_default(); + _null_map_column.get_data().push_back(1); +} + +void ColumnFilterHelper::reserve(size_t size) { + _value_column.reserve(size); + _null_map_column.reserve(size); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/columns/column_filter_helper.h b/be/src/vec/columns/column_filter_helper.h new file mode 100644 index 0000000000..2dc529ef3b --- /dev/null +++ b/be/src/vec/columns/column_filter_helper.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "column_nullable.h" + +namespace doris::vectorized { +class ColumnFilterHelper { +public: + ColumnFilterHelper(IColumn&); + + void resize_fill(size_t size, UInt8 value); + void insert_null(); + void insert_value(UInt8 value); + void reserve(size_t size); + + [[nodiscard]] size_t size() const { return _column.size(); } + +private: + ColumnNullable& _column; + ColumnUInt8& _value_column; + ColumnUInt8& _null_map_column; +}; +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 0cee4fe749..edaa705640 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -21,6 +21,7 @@ #include "process_hash_table_probe.h" #include "runtime/thread_context.h" // IWYU pragma: keep #include "util/simd/bits.h" +#include "vec/columns/column_filter_helper.h" #include "vec/exprs/vexpr_context.h" #include "vhash_join_node.h" @@ -330,6 +331,7 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c int current_offset = 0; bool all_match_one = true; size_t probe_size = 0; + auto& probe_row_match_iter = _probe_row_match( current_offset, probe_index, probe_size, all_match_one); @@ -353,6 +355,11 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c _probe_hash(keys, hash_table_ctx, null_map); + std::unique_ptr mark_column; + if (is_mark_join) { + mark_column = std::make_unique(*mcol[mcol.size() - 1]); + } + { SCOPED_TIMER(_search_hashtable_timer); using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 0, *_arena)); @@ -399,9 +406,14 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found(); if constexpr (is_mark_join) { ++current_offset; - assert_cast&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(need_go_ahead); + bool null_result = + (*null_map)[probe_index] || + (!need_go_ahead && _join_context->_has_null_value_in_build_side); + if (null_result) { + mark_column->insert_null(); + } else { + mark_column->insert_value(need_go_ahead); + } } else { current_offset += need_go_ahead; } @@ -650,21 +662,21 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( } } + /// FIXME: incorrect result of semi mark join with other conjuncts(null value missed). if (is_mark_join) { - auto& matched_map = assert_cast&>( - *(output_block->get_by_position(orig_columns - 1) - .column->assume_mutable())) - .get_data(); + auto mark_column = + output_block->get_by_position(orig_columns - 1).column->assume_mutable(); + ColumnFilterHelper helper(*mark_column); // For mark join, we only filter rows which have duplicate join keys. // And then, we set matched_map to the join result to do the mark join's filtering. for (size_t i = 1; i < row_count; ++i) { if (!_same_to_prev[i]) { - matched_map.push_back(filter_map[i - 1]); + helper.insert_value(filter_map[i - 1]); filter_map[i - 1] = true; } } - matched_map.push_back(filter_map[filter_map.size() - 1]); + helper.insert_value(filter_map[filter_map.size() - 1]); filter_map[filter_map.size() - 1] = true; } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index aa91846cc8..5f769e4caf 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -129,7 +129,8 @@ HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node) _probe_key_sz(join_node->_probe_key_sz), _left_output_slot_flags(&join_node->_left_output_slot_flags), _right_output_slot_flags(&join_node->_right_output_slot_flags), - _is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output) {} + _is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output), + _has_null_value_in_build_side(join_node->_has_null_in_build_side) {} HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state) : _have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct), @@ -435,10 +436,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ return Status::OK(); } - if (_short_circuit_for_null_in_probe_side && _is_mark_join) { - /// If `_short_circuit_for_null_in_probe_side` is true, this indicates no rows - /// match the join condition, and this is 'mark join', so we need to create a column as mark - /// with all rows set to 0. + /// `_has_null_in_build_side` means have null value in build side. + /// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join). + if (_has_null_in_build_side && _short_circuit_for_null_in_build_side && _is_mark_join) { + /// We need to create a column as mark with all rows set to NULL. auto block_rows = _probe_block.rows(); if (block_rows == 0) { *eos = _probe_eos; @@ -452,8 +453,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ temp_block.insert(_probe_block.get_by_position(i)); } } - auto mark_column = ColumnUInt8::create(block_rows, 0); - temp_block.insert({std::move(mark_column), std::make_shared(), ""}); + auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0), + ColumnUInt8::create(block_rows, 1)); + temp_block.insert( + {std::move(mark_column), make_nullable(std::make_shared()), ""}); { SCOPED_TIMER(_join_filter_timer); @@ -810,7 +813,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { Block block; // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from data. - while (!eos && !_short_circuit_for_null_in_probe_side && + while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) && (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) { block.clear_column_data(); RETURN_IF_CANCELLED(state); @@ -839,8 +842,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc // make one block for each 4 gigabytes constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - if (_short_circuit_for_null_in_probe_side) { - // TODO: if _short_circuit_for_null_in_probe_side is true we should finish current pipeline task. + if (_has_null_in_build_side) { + // TODO: if _has_null_in_build_side is true we should finish current pipeline task. DCHECK(state->enable_pipeline_exec()); return Status::OK(); } @@ -913,7 +916,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc _shared_hash_table_context->blocks = _build_blocks; _shared_hash_table_context->hash_table_variants = _hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = - _short_circuit_for_null_in_probe_side; + _has_null_in_build_side; if (_runtime_filter_slots) { _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); } @@ -930,8 +933,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc _build_phase_profile->add_info_string( "SharedHashTableFrom", print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); - _short_circuit_for_null_in_probe_side = - _shared_hash_table_context->short_circuit_for_null_in_probe_side; + _has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; _hash_table_variants = std::static_pointer_cast( _shared_hash_table_context->hash_table_variants); _build_blocks = _shared_hash_table_context->blocks; @@ -1117,7 +1119,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin has_null_value || short_circuit_for_null_in_build_side ? &null_map_val->get_data() : nullptr, - &_short_circuit_for_null_in_probe_side); + &_has_null_in_build_side); }}, *_hash_table_variants, make_bool_variant(_build_side_ignore_null), make_bool_variant(_short_circuit_for_null_in_build_side)); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index c75ab58357..c4a369e802 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -235,6 +235,9 @@ struct ProcessHashTableBuild { } if constexpr (ignore_null) { if ((*null_map)[k]) { + if (has_null_key) { + *has_null_key = true; + } continue; } } @@ -525,6 +528,7 @@ struct HashJoinProbeContext { // for cases when a probe row matches more than batch size build rows. bool* _is_any_probe_match_row_output; + bool _has_null_value_in_build_side {}; }; class HashJoinNode final : public VJoinNodeBase { @@ -576,8 +580,8 @@ private: void _init_short_circuit_for_probe() override { _short_circuit_for_probe = - (_short_circuit_for_null_in_probe_side && - _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join) || + (_has_null_in_build_side && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && + !_is_mark_join) || (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) || (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) || (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) || diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 2a3a06f404..4cdb7aed78 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -146,11 +146,9 @@ void VJoinNodeBase::_construct_mutable_join_block() { _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()}); } } - if (_is_mark_join) { - _join_block.replace_by_position( - _join_block.columns() - 1, - remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column)); - } + + DCHECK(!_is_mark_join || + _join_block.get_by_position(_join_block.columns() - 1).column->is_nullable()); } Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block, diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 0de7ae1106..9bb946bc0e 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -117,13 +117,13 @@ protected: // For null aware left anti join, we apply a short circuit strategy. // 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join. - // 2. In build phase, we stop materialize build side when we meet the first null value and set _short_circuit_for_null_in_probe_side to true. - // 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join. + // 2. In build phase, we stop materialize build side when we meet the first null value and set _has_null_in_build_side to true. + // 3. In probe phase, if _has_null_in_build_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join. const bool _short_circuit_for_null_in_build_side = false; - bool _short_circuit_for_null_in_probe_side = false; + bool _has_null_in_build_side = false; // For some join case, we can apply a short circuit strategy - // 1. _short_circuit_for_null_in_probe_side = true + // 1. _has_null_in_build_side = true // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti bool _short_circuit_for_probe = false; diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 49aa6970ba..7996333a40 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -48,6 +48,7 @@ #include "util/simd/bits.h" #include "util/telemetry/telemetry.h" #include "vec/columns/column_const.h" +#include "vec/columns/column_filter_helper.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" @@ -300,10 +301,9 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc for (size_t i = 0; i < _num_build_side_columns; ++i) { dst_columns[_num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count); } - IColumn::Filter& mark_data = assert_cast&>( - *dst_columns[dst_columns.size() - 1]) - .get_data(); - mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0); + + auto& mark_column = *dst_columns[dst_columns.size() - 1]; + ColumnFilterHelper(mark_column).resize_fill(mark_column.size() + _left_side_process_count, 0); } void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, @@ -363,12 +363,9 @@ void VNestedLoopJoinNode::_update_additional_flags(Block* block) { } } if (_is_mark_join) { - IColumn::Filter& mark_data = - assert_cast&>( - *block->get_by_position(block->columns() - 1).column->assume_mutable()) - .get_data(); - if (mark_data.size() < block->rows()) { - mark_data.resize_fill(block->rows(), 1); + auto mark_column = block->get_by_position(block->columns() - 1).column->assume_mutable(); + if (mark_column->size() < block->rows()) { + ColumnFilterHelper(*mark_column).resize_fill(block->rows(), 1); } } } @@ -490,14 +487,12 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s _resize_fill_tuple_is_null_column(new_size, 0, 1); } } else { - IColumn::Filter& mark_data = assert_cast&>( - *dst_columns[dst_columns.size() - 1]) - .get_data(); - mark_data.reserve(mark_data.size() + _left_side_process_count); + ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]); + mark_column.reserve(mark_column.size() + _left_side_process_count); DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { - mark_data.emplace_back(IsSemi == _cur_probe_row_visited_flags[j]); + mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]); } for (size_t i = 0; i < _num_probe_side_columns; ++i) { const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java index 099e64eb5d..021fcea1a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java @@ -29,17 +29,17 @@ public class MarkJoinSlotReference extends SlotReference implements SlotNotFromC final boolean existsHasAgg; public MarkJoinSlotReference(String name) { - super(name, BooleanType.INSTANCE, false); + super(name, BooleanType.INSTANCE, true); this.existsHasAgg = false; } public MarkJoinSlotReference(String name, boolean existsHasAgg) { - super(name, BooleanType.INSTANCE, false); + super(name, BooleanType.INSTANCE, true); this.existsHasAgg = existsHasAgg; } public MarkJoinSlotReference(ExprId exprId, String name, boolean existsHasAgg) { - super(exprId, name, BooleanType.INSTANCE, false, ImmutableList.of()); + super(exprId, name, BooleanType.INSTANCE, true, ImmutableList.of()); this.existsHasAgg = existsHasAgg; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index dc969418ce..b9e61e256b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -53,7 +53,8 @@ import java.util.stream.Collectors; public class JoinUtils { public static boolean couldShuffle(Join join) { // Cross-join and Null-Aware-Left-Anti-Join only can be broadcast join. - return !(join.getJoinType().isCrossJoin()) && !(join.getJoinType().isNullAwareLeftAntiJoin()); + // Because mark join would consider null value from both build and probe side, so must use broadcast join too. + return !(join.getJoinType().isCrossJoin() || join.getJoinType().isNullAwareLeftAntiJoin() || join.isMarkJoin()); } public static boolean couldBroadcast(Join join) { diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out index 732a72a390..647babc200 100644 --- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out +++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out @@ -450,3 +450,18 @@ 22 3 24 4 +-- !mark_join_nullable -- +\N +\N +\N +\N +\N +\N +true +true +true +true +\N +\N +\N + diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out index fcb2ade8da..f732357c25 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out @@ -19,11 +19,11 @@ PhysicalResultSink ----------------------------PhysicalProject ------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001)) --------------------------------PhysicalOlapScan[date_dim] ---------------------PhysicalProject -----------------------filter(($c$1 OR $c$2)) -------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=() ---------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=() -----------------------------PhysicalDistribute +--------------------PhysicalDistribute +----------------------PhysicalProject +------------------------filter(($c$1 OR $c$2)) +--------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=() +----------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=() ------------------------------PhysicalProject --------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_demographics.cd_demo_sk = c.c_current_cdemo_sk))otherCondition=() ----------------------------------PhysicalOlapScan[customer_demographics] @@ -36,22 +36,22 @@ PhysicalResultSink ------------------------------------------PhysicalProject --------------------------------------------filter(ca_county IN ('Cochran County', 'Kandiyohi County', 'Marquette County', 'Storey County', 'Warren County')) ----------------------------------------------PhysicalOlapScan[customer_address] +------------------------------PhysicalDistribute +--------------------------------PhysicalProject +----------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=() +------------------------------------PhysicalProject +--------------------------------------PhysicalOlapScan[web_sales] +------------------------------------PhysicalDistribute +--------------------------------------PhysicalProject +----------------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001)) +------------------------------------------PhysicalOlapScan[date_dim] ----------------------------PhysicalDistribute ------------------------------PhysicalProject ---------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=() +--------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=() ----------------------------------PhysicalProject -------------------------------------PhysicalOlapScan[web_sales] +------------------------------------PhysicalOlapScan[catalog_sales] ----------------------------------PhysicalDistribute ------------------------------------PhysicalProject --------------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001)) ----------------------------------------PhysicalOlapScan[date_dim] ---------------------------PhysicalDistribute -----------------------------PhysicalProject -------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=() ---------------------------------PhysicalProject -----------------------------------PhysicalOlapScan[catalog_sales] ---------------------------------PhysicalDistribute -----------------------------------PhysicalProject -------------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001)) ---------------------------------------PhysicalOlapScan[date_dim] diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out index 47b9f1061a..4bf6857ee7 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out @@ -19,11 +19,11 @@ PhysicalResultSink ----------------------------PhysicalProject ------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001)) --------------------------------PhysicalOlapScan[date_dim] ---------------------PhysicalProject -----------------------filter(($c$1 OR $c$2)) -------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=() ---------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=() -----------------------------PhysicalDistribute +--------------------PhysicalDistribute +----------------------PhysicalProject +------------------------filter(($c$1 OR $c$2)) +--------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=() +----------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=() ------------------------------PhysicalProject --------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_demographics.cd_demo_sk = c.c_current_cdemo_sk))otherCondition=() ----------------------------------PhysicalDistribute @@ -38,22 +38,22 @@ PhysicalResultSink ----------------------------------PhysicalDistribute ------------------------------------PhysicalProject --------------------------------------PhysicalOlapScan[customer_demographics] +------------------------------PhysicalDistribute +--------------------------------PhysicalProject +----------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=() +------------------------------------PhysicalProject +--------------------------------------PhysicalOlapScan[web_sales] +------------------------------------PhysicalDistribute +--------------------------------------PhysicalProject +----------------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001)) +------------------------------------------PhysicalOlapScan[date_dim] ----------------------------PhysicalDistribute ------------------------------PhysicalProject ---------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=() +--------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=() ----------------------------------PhysicalProject -------------------------------------PhysicalOlapScan[web_sales] +------------------------------------PhysicalOlapScan[catalog_sales] ----------------------------------PhysicalDistribute ------------------------------------PhysicalProject --------------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001)) ----------------------------------------PhysicalOlapScan[date_dim] ---------------------------PhysicalDistribute -----------------------------PhysicalProject -------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=() ---------------------------------PhysicalProject -----------------------------------PhysicalOlapScan[catalog_sales] ---------------------------------PhysicalDistribute -----------------------------------PhysicalProject -------------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001)) ---------------------------------------PhysicalOlapScan[date_dim] diff --git a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy index 482eab7a6a..6664ad0c6c 100644 --- a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy +++ b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy @@ -50,6 +50,14 @@ suite ("sub_query_correlated") { DROP TABLE IF EXISTS `sub_query_correlated_subquery7` """ + sql """ + DROP TABLE IF EXISTS `sub_query_correlated_subquery8` + """ + + sql """ + DROP TABLE IF EXISTS `sub_query_correlated_subquery9` + """ + sql """ create table if not exists sub_query_correlated_subquery1 (k1 bigint, k2 bigint) @@ -105,6 +113,21 @@ suite ("sub_query_correlated") { properties('replication_num' = '1'); """ + sql """ + create table if not exists sub_query_correlated_subquery8 + (k1 bigint, k2 bigint) + duplicate key(k1) + distributed by hash(k2) buckets 1 + properties('replication_num' = '1') + """ + + sql """ + create table if not exists sub_query_correlated_subquery9 + (k1 int, k2 varchar(128), k3 bigint, v1 bigint, v2 bigint) + distributed by hash(k2) buckets 1 + properties('replication_num' = '1'); + """ + sql """ insert into sub_query_correlated_subquery1 values (1,2), (1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4) """ @@ -126,7 +149,7 @@ suite ("sub_query_correlated") { insert into sub_query_correlated_subquery5 values (5,4), (5,2), (8,3), (5,4), (6,7), (8,9) """ - sql """ + sql """ insert into sub_query_correlated_subquery6 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null); """ @@ -135,6 +158,15 @@ suite ("sub_query_correlated") { (2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null); """ + sql """ + insert into sub_query_correlated_subquery8 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null); + """ + + sql """ + insert into sub_query_correlated_subquery9 values (1,"abc",2,3,4), (1,"abcd",3,3,4), + (2,"xyz",2,4,2),(2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null); + """ + sql "SET enable_fallback_to_original_planner=false" //------------------Correlated----------------- @@ -496,6 +528,10 @@ suite ("sub_query_correlated") { order by k1, k2; """ + qt_mark_join_nullable """ + select sub_query_correlated_subquery8.k1 in (select sub_query_correlated_subquery9.k3 from sub_query_correlated_subquery9) from sub_query_correlated_subquery8 order by k1, k2; + """ + // order_qt_doris_6937_2 """ // select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 not in (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 > sub_query_correlated_subquery1.k2) or k1 < 10 order by k1, k2; // """