[fix](mark join) mark join column should be nullable (#24910)
This commit is contained in:
@ -139,8 +139,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
|
||||
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
|
||||
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
|
||||
_shared_state->short_circuit_for_probe =
|
||||
(_short_circuit_for_null_in_probe_side &&
|
||||
p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
|
||||
(_has_null_in_build_side && p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
|
||||
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::INNER_JOIN &&
|
||||
!p._is_mark_join) ||
|
||||
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::LEFT_SEMI_JOIN &&
|
||||
@ -204,7 +203,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
|
||||
has_null_value || short_circuit_for_null_in_build_side
|
||||
? &null_map_val->get_data()
|
||||
: nullptr,
|
||||
&_short_circuit_for_null_in_probe_side);
|
||||
&_has_null_in_build_side);
|
||||
}},
|
||||
*_shared_state->hash_table_variants,
|
||||
vectorized::make_bool_variant(_build_side_ignore_null),
|
||||
@ -453,8 +452,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
|
||||
// make one block for each 4 gigabytes
|
||||
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
|
||||
|
||||
if (local_state._short_circuit_for_null_in_probe_side) {
|
||||
// TODO: if _short_circuit_for_null_in_probe_side is true we should finish current pipeline task.
|
||||
if (local_state._has_null_in_build_side) {
|
||||
// TODO: if _has_null_in_build_side is true we should finish current pipeline task.
|
||||
DCHECK(state->enable_pipeline_exec());
|
||||
return Status::OK();
|
||||
}
|
||||
@ -539,7 +538,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
|
||||
_shared_hash_table_context->hash_table_variants =
|
||||
local_state._shared_state->hash_table_variants;
|
||||
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
|
||||
local_state._short_circuit_for_null_in_probe_side;
|
||||
local_state._has_null_in_build_side;
|
||||
if (local_state._runtime_filter_slots) {
|
||||
local_state._runtime_filter_slots->copy_to_shared_context(
|
||||
_shared_hash_table_context);
|
||||
@ -557,7 +556,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
|
||||
local_state.profile()->add_info_string(
|
||||
"SharedHashTableFrom",
|
||||
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
|
||||
local_state._short_circuit_for_null_in_probe_side =
|
||||
local_state._has_null_in_build_side =
|
||||
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
|
||||
local_state._shared_state->hash_table_variants =
|
||||
std::static_pointer_cast<vectorized::HashTableVariants>(
|
||||
|
||||
@ -40,7 +40,7 @@ protected:
|
||||
template <typename LocalStateType>
|
||||
friend class JoinBuildSinkOperatorX;
|
||||
|
||||
bool _short_circuit_for_null_in_probe_side = false;
|
||||
bool _has_null_in_build_side = false;
|
||||
|
||||
RuntimeProfile::Counter* _build_rows_counter;
|
||||
RuntimeProfile::Counter* _push_down_timer;
|
||||
@ -73,8 +73,8 @@ protected:
|
||||
|
||||
// For null aware left anti join, we apply a short circuit strategy.
|
||||
// 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join.
|
||||
// 2. In build phase, we stop materialize build side when we meet the first null value and set _short_circuit_for_null_in_probe_side to true.
|
||||
// 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join.
|
||||
// 2. In build phase, we stop materialize build side when we meet the first null value and set _has_null_in_build_side to true.
|
||||
// 3. In probe phase, if _has_null_in_build_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join.
|
||||
const bool _short_circuit_for_null_in_build_side;
|
||||
};
|
||||
|
||||
|
||||
@ -504,7 +504,7 @@ private:
|
||||
|
||||
struct JoinSharedState {
|
||||
// For some join case, we can apply a short circuit strategy
|
||||
// 1. _short_circuit_for_null_in_probe_side = true
|
||||
// 1. _has_null_in_build_side = true
|
||||
// 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
|
||||
bool short_circuit_for_probe = false;
|
||||
vectorized::JoinOpVariants join_op_variants;
|
||||
|
||||
46
be/src/vec/columns/column_filter_helper.cpp
Normal file
46
be/src/vec/columns/column_filter_helper.cpp
Normal file
@ -0,0 +1,46 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "vec/columns/column_filter_helper.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
ColumnFilterHelper::ColumnFilterHelper(IColumn& column_)
|
||||
: _column(assert_cast<ColumnNullable&>(column_)),
|
||||
_value_column(assert_cast<ColumnUInt8&>(_column.get_nested_column())),
|
||||
_null_map_column(_column.get_null_map_column()) {}
|
||||
|
||||
void ColumnFilterHelper::resize_fill(size_t size, doris::vectorized::UInt8 value) {
|
||||
_value_column.get_data().resize_fill(size, value);
|
||||
_null_map_column.get_data().resize_fill(size, 0);
|
||||
}
|
||||
|
||||
void ColumnFilterHelper::insert_value(doris::vectorized::UInt8 value) {
|
||||
_value_column.get_data().push_back(value);
|
||||
_null_map_column.get_data().push_back(0);
|
||||
}
|
||||
|
||||
void ColumnFilterHelper::insert_null() {
|
||||
_value_column.insert_default();
|
||||
_null_map_column.get_data().push_back(1);
|
||||
}
|
||||
|
||||
void ColumnFilterHelper::reserve(size_t size) {
|
||||
_value_column.reserve(size);
|
||||
_null_map_column.reserve(size);
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
39
be/src/vec/columns/column_filter_helper.h
Normal file
39
be/src/vec/columns/column_filter_helper.h
Normal file
@ -0,0 +1,39 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "column_nullable.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
class ColumnFilterHelper {
|
||||
public:
|
||||
ColumnFilterHelper(IColumn&);
|
||||
|
||||
void resize_fill(size_t size, UInt8 value);
|
||||
void insert_null();
|
||||
void insert_value(UInt8 value);
|
||||
void reserve(size_t size);
|
||||
|
||||
[[nodiscard]] size_t size() const { return _column.size(); }
|
||||
|
||||
private:
|
||||
ColumnNullable& _column;
|
||||
ColumnUInt8& _value_column;
|
||||
ColumnUInt8& _null_map_column;
|
||||
};
|
||||
} // namespace doris::vectorized
|
||||
@ -21,6 +21,7 @@
|
||||
#include "process_hash_table_probe.h"
|
||||
#include "runtime/thread_context.h" // IWYU pragma: keep
|
||||
#include "util/simd/bits.h"
|
||||
#include "vec/columns/column_filter_helper.h"
|
||||
#include "vec/exprs/vexpr_context.h"
|
||||
#include "vhash_join_node.h"
|
||||
|
||||
@ -330,6 +331,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
|
||||
int current_offset = 0;
|
||||
bool all_match_one = true;
|
||||
size_t probe_size = 0;
|
||||
|
||||
auto& probe_row_match_iter = _probe_row_match<Mapped, with_other_conjuncts>(
|
||||
current_offset, probe_index, probe_size, all_match_one);
|
||||
|
||||
@ -353,6 +355,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
|
||||
|
||||
_probe_hash<need_null_map_for_probe, HashTableType>(keys, hash_table_ctx, null_map);
|
||||
|
||||
std::unique_ptr<ColumnFilterHelper> mark_column;
|
||||
if (is_mark_join) {
|
||||
mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]);
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TIMER(_search_hashtable_timer);
|
||||
using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 0, *_arena));
|
||||
@ -399,9 +406,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
|
||||
(JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found();
|
||||
if constexpr (is_mark_join) {
|
||||
++current_offset;
|
||||
assert_cast<ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
|
||||
.get_data()
|
||||
.template push_back(need_go_ahead);
|
||||
bool null_result =
|
||||
(*null_map)[probe_index] ||
|
||||
(!need_go_ahead && _join_context->_has_null_value_in_build_side);
|
||||
if (null_result) {
|
||||
mark_column->insert_null();
|
||||
} else {
|
||||
mark_column->insert_value(need_go_ahead);
|
||||
}
|
||||
} else {
|
||||
current_offset += need_go_ahead;
|
||||
}
|
||||
@ -650,21 +662,21 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME: incorrect result of semi mark join with other conjuncts(null value missed).
|
||||
if (is_mark_join) {
|
||||
auto& matched_map = assert_cast<ColumnVector<UInt8>&>(
|
||||
*(output_block->get_by_position(orig_columns - 1)
|
||||
.column->assume_mutable()))
|
||||
.get_data();
|
||||
auto mark_column =
|
||||
output_block->get_by_position(orig_columns - 1).column->assume_mutable();
|
||||
ColumnFilterHelper helper(*mark_column);
|
||||
|
||||
// For mark join, we only filter rows which have duplicate join keys.
|
||||
// And then, we set matched_map to the join result to do the mark join's filtering.
|
||||
for (size_t i = 1; i < row_count; ++i) {
|
||||
if (!_same_to_prev[i]) {
|
||||
matched_map.push_back(filter_map[i - 1]);
|
||||
helper.insert_value(filter_map[i - 1]);
|
||||
filter_map[i - 1] = true;
|
||||
}
|
||||
}
|
||||
matched_map.push_back(filter_map[filter_map.size() - 1]);
|
||||
helper.insert_value(filter_map[filter_map.size() - 1]);
|
||||
filter_map[filter_map.size() - 1] = true;
|
||||
}
|
||||
|
||||
|
||||
@ -129,7 +129,8 @@ HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node)
|
||||
_probe_key_sz(join_node->_probe_key_sz),
|
||||
_left_output_slot_flags(&join_node->_left_output_slot_flags),
|
||||
_right_output_slot_flags(&join_node->_right_output_slot_flags),
|
||||
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output) {}
|
||||
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output),
|
||||
_has_null_value_in_build_side(join_node->_has_null_in_build_side) {}
|
||||
|
||||
HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state)
|
||||
: _have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct),
|
||||
@ -435,10 +436,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (_short_circuit_for_null_in_probe_side && _is_mark_join) {
|
||||
/// If `_short_circuit_for_null_in_probe_side` is true, this indicates no rows
|
||||
/// match the join condition, and this is 'mark join', so we need to create a column as mark
|
||||
/// with all rows set to 0.
|
||||
/// `_has_null_in_build_side` means have null value in build side.
|
||||
/// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join).
|
||||
if (_has_null_in_build_side && _short_circuit_for_null_in_build_side && _is_mark_join) {
|
||||
/// We need to create a column as mark with all rows set to NULL.
|
||||
auto block_rows = _probe_block.rows();
|
||||
if (block_rows == 0) {
|
||||
*eos = _probe_eos;
|
||||
@ -452,8 +453,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
|
||||
temp_block.insert(_probe_block.get_by_position(i));
|
||||
}
|
||||
}
|
||||
auto mark_column = ColumnUInt8::create(block_rows, 0);
|
||||
temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""});
|
||||
auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0),
|
||||
ColumnUInt8::create(block_rows, 1));
|
||||
temp_block.insert(
|
||||
{std::move(mark_column), make_nullable(std::make_shared<DataTypeUInt8>()), ""});
|
||||
|
||||
{
|
||||
SCOPED_TIMER(_join_filter_timer);
|
||||
@ -810,7 +813,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
|
||||
Block block;
|
||||
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
|
||||
// data from data.
|
||||
while (!eos && !_short_circuit_for_null_in_probe_side &&
|
||||
while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side) &&
|
||||
(!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) {
|
||||
block.clear_column_data();
|
||||
RETURN_IF_CANCELLED(state);
|
||||
@ -839,8 +842,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
|
||||
// make one block for each 4 gigabytes
|
||||
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
|
||||
|
||||
if (_short_circuit_for_null_in_probe_side) {
|
||||
// TODO: if _short_circuit_for_null_in_probe_side is true we should finish current pipeline task.
|
||||
if (_has_null_in_build_side) {
|
||||
// TODO: if _has_null_in_build_side is true we should finish current pipeline task.
|
||||
DCHECK(state->enable_pipeline_exec());
|
||||
return Status::OK();
|
||||
}
|
||||
@ -913,7 +916,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
|
||||
_shared_hash_table_context->blocks = _build_blocks;
|
||||
_shared_hash_table_context->hash_table_variants = _hash_table_variants;
|
||||
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
|
||||
_short_circuit_for_null_in_probe_side;
|
||||
_has_null_in_build_side;
|
||||
if (_runtime_filter_slots) {
|
||||
_runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
|
||||
}
|
||||
@ -930,8 +933,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
|
||||
_build_phase_profile->add_info_string(
|
||||
"SharedHashTableFrom",
|
||||
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
|
||||
_short_circuit_for_null_in_probe_side =
|
||||
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
|
||||
_has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side;
|
||||
_hash_table_variants = std::static_pointer_cast<HashTableVariants>(
|
||||
_shared_hash_table_context->hash_table_variants);
|
||||
_build_blocks = _shared_hash_table_context->blocks;
|
||||
@ -1117,7 +1119,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
|
||||
has_null_value || short_circuit_for_null_in_build_side
|
||||
? &null_map_val->get_data()
|
||||
: nullptr,
|
||||
&_short_circuit_for_null_in_probe_side);
|
||||
&_has_null_in_build_side);
|
||||
}},
|
||||
*_hash_table_variants, make_bool_variant(_build_side_ignore_null),
|
||||
make_bool_variant(_short_circuit_for_null_in_build_side));
|
||||
|
||||
@ -235,6 +235,9 @@ struct ProcessHashTableBuild {
|
||||
}
|
||||
if constexpr (ignore_null) {
|
||||
if ((*null_map)[k]) {
|
||||
if (has_null_key) {
|
||||
*has_null_key = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -525,6 +528,7 @@ struct HashJoinProbeContext {
|
||||
|
||||
// for cases when a probe row matches more than batch size build rows.
|
||||
bool* _is_any_probe_match_row_output;
|
||||
bool _has_null_value_in_build_side {};
|
||||
};
|
||||
|
||||
class HashJoinNode final : public VJoinNodeBase {
|
||||
@ -576,8 +580,8 @@ private:
|
||||
|
||||
void _init_short_circuit_for_probe() override {
|
||||
_short_circuit_for_probe =
|
||||
(_short_circuit_for_null_in_probe_side &&
|
||||
_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join) ||
|
||||
(_has_null_in_build_side && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
|
||||
!_is_mark_join) ||
|
||||
(_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
|
||||
(_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
|
||||
(_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
|
||||
|
||||
@ -146,11 +146,9 @@ void VJoinNodeBase::_construct_mutable_join_block() {
|
||||
_join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()});
|
||||
}
|
||||
}
|
||||
if (_is_mark_join) {
|
||||
_join_block.replace_by_position(
|
||||
_join_block.columns() - 1,
|
||||
remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column));
|
||||
}
|
||||
|
||||
DCHECK(!_is_mark_join ||
|
||||
_join_block.get_by_position(_join_block.columns() - 1).column->is_nullable());
|
||||
}
|
||||
|
||||
Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block,
|
||||
|
||||
@ -117,13 +117,13 @@ protected:
|
||||
|
||||
// For null aware left anti join, we apply a short circuit strategy.
|
||||
// 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join.
|
||||
// 2. In build phase, we stop materialize build side when we meet the first null value and set _short_circuit_for_null_in_probe_side to true.
|
||||
// 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join.
|
||||
// 2. In build phase, we stop materialize build side when we meet the first null value and set _has_null_in_build_side to true.
|
||||
// 3. In probe phase, if _has_null_in_build_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join.
|
||||
const bool _short_circuit_for_null_in_build_side = false;
|
||||
bool _short_circuit_for_null_in_probe_side = false;
|
||||
bool _has_null_in_build_side = false;
|
||||
|
||||
// For some join case, we can apply a short circuit strategy
|
||||
// 1. _short_circuit_for_null_in_probe_side = true
|
||||
// 1. _has_null_in_build_side = true
|
||||
// 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
|
||||
bool _short_circuit_for_probe = false;
|
||||
|
||||
|
||||
@ -48,6 +48,7 @@
|
||||
#include "util/simd/bits.h"
|
||||
#include "util/telemetry/telemetry.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_filter_helper.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/column_vector.h"
|
||||
#include "vec/columns/columns_number.h"
|
||||
@ -300,10 +301,9 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
|
||||
for (size_t i = 0; i < _num_build_side_columns; ++i) {
|
||||
dst_columns[_num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count);
|
||||
}
|
||||
IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
|
||||
*dst_columns[dst_columns.size() - 1])
|
||||
.get_data();
|
||||
mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0);
|
||||
|
||||
auto& mark_column = *dst_columns[dst_columns.size() - 1];
|
||||
ColumnFilterHelper(mark_column).resize_fill(mark_column.size() + _left_side_process_count, 0);
|
||||
}
|
||||
|
||||
void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
|
||||
@ -363,12 +363,9 @@ void VNestedLoopJoinNode::_update_additional_flags(Block* block) {
|
||||
}
|
||||
}
|
||||
if (_is_mark_join) {
|
||||
IColumn::Filter& mark_data =
|
||||
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
|
||||
*block->get_by_position(block->columns() - 1).column->assume_mutable())
|
||||
.get_data();
|
||||
if (mark_data.size() < block->rows()) {
|
||||
mark_data.resize_fill(block->rows(), 1);
|
||||
auto mark_column = block->get_by_position(block->columns() - 1).column->assume_mutable();
|
||||
if (mark_column->size() < block->rows()) {
|
||||
ColumnFilterHelper(*mark_column).resize_fill(block->rows(), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -490,14 +487,12 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
|
||||
_resize_fill_tuple_is_null_column(new_size, 0, 1);
|
||||
}
|
||||
} else {
|
||||
IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
|
||||
*dst_columns[dst_columns.size() - 1])
|
||||
.get_data();
|
||||
mark_data.reserve(mark_data.size() + _left_side_process_count);
|
||||
ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]);
|
||||
mark_column.reserve(mark_column.size() + _left_side_process_count);
|
||||
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
|
||||
for (int j = _left_block_start_pos;
|
||||
j < _left_block_start_pos + _left_side_process_count; ++j) {
|
||||
mark_data.emplace_back(IsSemi == _cur_probe_row_visited_flags[j]);
|
||||
mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]);
|
||||
}
|
||||
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
|
||||
const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
|
||||
|
||||
@ -29,17 +29,17 @@ public class MarkJoinSlotReference extends SlotReference implements SlotNotFromC
|
||||
final boolean existsHasAgg;
|
||||
|
||||
public MarkJoinSlotReference(String name) {
|
||||
super(name, BooleanType.INSTANCE, false);
|
||||
super(name, BooleanType.INSTANCE, true);
|
||||
this.existsHasAgg = false;
|
||||
}
|
||||
|
||||
public MarkJoinSlotReference(String name, boolean existsHasAgg) {
|
||||
super(name, BooleanType.INSTANCE, false);
|
||||
super(name, BooleanType.INSTANCE, true);
|
||||
this.existsHasAgg = existsHasAgg;
|
||||
}
|
||||
|
||||
public MarkJoinSlotReference(ExprId exprId, String name, boolean existsHasAgg) {
|
||||
super(exprId, name, BooleanType.INSTANCE, false, ImmutableList.of());
|
||||
super(exprId, name, BooleanType.INSTANCE, true, ImmutableList.of());
|
||||
this.existsHasAgg = existsHasAgg;
|
||||
}
|
||||
|
||||
|
||||
@ -53,7 +53,8 @@ import java.util.stream.Collectors;
|
||||
public class JoinUtils {
|
||||
public static boolean couldShuffle(Join join) {
|
||||
// Cross-join and Null-Aware-Left-Anti-Join only can be broadcast join.
|
||||
return !(join.getJoinType().isCrossJoin()) && !(join.getJoinType().isNullAwareLeftAntiJoin());
|
||||
// Because mark join would consider null value from both build and probe side, so must use broadcast join too.
|
||||
return !(join.getJoinType().isCrossJoin() || join.getJoinType().isNullAwareLeftAntiJoin() || join.isMarkJoin());
|
||||
}
|
||||
|
||||
public static boolean couldBroadcast(Join join) {
|
||||
|
||||
@ -450,3 +450,18 @@
|
||||
22 3
|
||||
24 4
|
||||
|
||||
-- !mark_join_nullable --
|
||||
\N
|
||||
\N
|
||||
\N
|
||||
\N
|
||||
\N
|
||||
\N
|
||||
true
|
||||
true
|
||||
true
|
||||
true
|
||||
\N
|
||||
\N
|
||||
\N
|
||||
|
||||
|
||||
@ -19,11 +19,11 @@ PhysicalResultSink
|
||||
----------------------------PhysicalProject
|
||||
------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
|
||||
--------------------------------PhysicalOlapScan[date_dim]
|
||||
--------------------PhysicalProject
|
||||
----------------------filter(($c$1 OR $c$2))
|
||||
------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=()
|
||||
--------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=()
|
||||
----------------------------PhysicalDistribute
|
||||
--------------------PhysicalDistribute
|
||||
----------------------PhysicalProject
|
||||
------------------------filter(($c$1 OR $c$2))
|
||||
--------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=()
|
||||
----------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=()
|
||||
------------------------------PhysicalProject
|
||||
--------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_demographics.cd_demo_sk = c.c_current_cdemo_sk))otherCondition=()
|
||||
----------------------------------PhysicalOlapScan[customer_demographics]
|
||||
@ -36,22 +36,22 @@ PhysicalResultSink
|
||||
------------------------------------------PhysicalProject
|
||||
--------------------------------------------filter(ca_county IN ('Cochran County', 'Kandiyohi County', 'Marquette County', 'Storey County', 'Warren County'))
|
||||
----------------------------------------------PhysicalOlapScan[customer_address]
|
||||
------------------------------PhysicalDistribute
|
||||
--------------------------------PhysicalProject
|
||||
----------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
------------------------------------PhysicalProject
|
||||
--------------------------------------PhysicalOlapScan[web_sales]
|
||||
------------------------------------PhysicalDistribute
|
||||
--------------------------------------PhysicalProject
|
||||
----------------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
|
||||
------------------------------------------PhysicalOlapScan[date_dim]
|
||||
----------------------------PhysicalDistribute
|
||||
------------------------------PhysicalProject
|
||||
--------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
--------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
----------------------------------PhysicalProject
|
||||
------------------------------------PhysicalOlapScan[web_sales]
|
||||
------------------------------------PhysicalOlapScan[catalog_sales]
|
||||
----------------------------------PhysicalDistribute
|
||||
------------------------------------PhysicalProject
|
||||
--------------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
|
||||
----------------------------------------PhysicalOlapScan[date_dim]
|
||||
--------------------------PhysicalDistribute
|
||||
----------------------------PhysicalProject
|
||||
------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
--------------------------------PhysicalProject
|
||||
----------------------------------PhysicalOlapScan[catalog_sales]
|
||||
--------------------------------PhysicalDistribute
|
||||
----------------------------------PhysicalProject
|
||||
------------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
|
||||
--------------------------------------PhysicalOlapScan[date_dim]
|
||||
|
||||
|
||||
@ -19,11 +19,11 @@ PhysicalResultSink
|
||||
----------------------------PhysicalProject
|
||||
------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001))
|
||||
--------------------------------PhysicalOlapScan[date_dim]
|
||||
--------------------PhysicalProject
|
||||
----------------------filter(($c$1 OR $c$2))
|
||||
------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=()
|
||||
--------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=()
|
||||
----------------------------PhysicalDistribute
|
||||
--------------------PhysicalDistribute
|
||||
----------------------PhysicalProject
|
||||
------------------------filter(($c$1 OR $c$2))
|
||||
--------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = catalog_sales.cs_ship_customer_sk))otherCondition=()
|
||||
----------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((c.c_customer_sk = web_sales.ws_bill_customer_sk))otherCondition=()
|
||||
------------------------------PhysicalProject
|
||||
--------------------------------hashJoin[INNER_JOIN] hashCondition=((customer_demographics.cd_demo_sk = c.c_current_cdemo_sk))otherCondition=()
|
||||
----------------------------------PhysicalDistribute
|
||||
@ -38,22 +38,22 @@ PhysicalResultSink
|
||||
----------------------------------PhysicalDistribute
|
||||
------------------------------------PhysicalProject
|
||||
--------------------------------------PhysicalOlapScan[customer_demographics]
|
||||
------------------------------PhysicalDistribute
|
||||
--------------------------------PhysicalProject
|
||||
----------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
------------------------------------PhysicalProject
|
||||
--------------------------------------PhysicalOlapScan[web_sales]
|
||||
------------------------------------PhysicalDistribute
|
||||
--------------------------------------PhysicalProject
|
||||
----------------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001))
|
||||
------------------------------------------PhysicalOlapScan[date_dim]
|
||||
----------------------------PhysicalDistribute
|
||||
------------------------------PhysicalProject
|
||||
--------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
--------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
----------------------------------PhysicalProject
|
||||
------------------------------------PhysicalOlapScan[web_sales]
|
||||
------------------------------------PhysicalOlapScan[catalog_sales]
|
||||
----------------------------------PhysicalDistribute
|
||||
------------------------------------PhysicalProject
|
||||
--------------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001))
|
||||
----------------------------------------PhysicalOlapScan[date_dim]
|
||||
--------------------------PhysicalDistribute
|
||||
----------------------------PhysicalProject
|
||||
------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk))otherCondition=()
|
||||
--------------------------------PhysicalProject
|
||||
----------------------------------PhysicalOlapScan[catalog_sales]
|
||||
--------------------------------PhysicalDistribute
|
||||
----------------------------------PhysicalProject
|
||||
------------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year = 2001))
|
||||
--------------------------------------PhysicalOlapScan[date_dim]
|
||||
|
||||
|
||||
@ -50,6 +50,14 @@ suite ("sub_query_correlated") {
|
||||
DROP TABLE IF EXISTS `sub_query_correlated_subquery7`
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP TABLE IF EXISTS `sub_query_correlated_subquery8`
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP TABLE IF EXISTS `sub_query_correlated_subquery9`
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table if not exists sub_query_correlated_subquery1
|
||||
(k1 bigint, k2 bigint)
|
||||
@ -105,6 +113,21 @@ suite ("sub_query_correlated") {
|
||||
properties('replication_num' = '1');
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table if not exists sub_query_correlated_subquery8
|
||||
(k1 bigint, k2 bigint)
|
||||
duplicate key(k1)
|
||||
distributed by hash(k2) buckets 1
|
||||
properties('replication_num' = '1')
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table if not exists sub_query_correlated_subquery9
|
||||
(k1 int, k2 varchar(128), k3 bigint, v1 bigint, v2 bigint)
|
||||
distributed by hash(k2) buckets 1
|
||||
properties('replication_num' = '1');
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into sub_query_correlated_subquery1 values (1,2), (1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4)
|
||||
"""
|
||||
@ -126,7 +149,7 @@ suite ("sub_query_correlated") {
|
||||
insert into sub_query_correlated_subquery5 values (5,4), (5,2), (8,3), (5,4), (6,7), (8,9)
|
||||
"""
|
||||
|
||||
sql """
|
||||
sql """
|
||||
insert into sub_query_correlated_subquery6 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null);
|
||||
"""
|
||||
|
||||
@ -135,6 +158,15 @@ suite ("sub_query_correlated") {
|
||||
(2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null);
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into sub_query_correlated_subquery8 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null);
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into sub_query_correlated_subquery9 values (1,"abc",2,3,4), (1,"abcd",3,3,4),
|
||||
(2,"xyz",2,4,2),(2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null);
|
||||
"""
|
||||
|
||||
sql "SET enable_fallback_to_original_planner=false"
|
||||
|
||||
//------------------Correlated-----------------
|
||||
@ -496,6 +528,10 @@ suite ("sub_query_correlated") {
|
||||
order by k1, k2;
|
||||
"""
|
||||
|
||||
qt_mark_join_nullable """
|
||||
select sub_query_correlated_subquery8.k1 in (select sub_query_correlated_subquery9.k3 from sub_query_correlated_subquery9) from sub_query_correlated_subquery8 order by k1, k2;
|
||||
"""
|
||||
|
||||
// order_qt_doris_6937_2 """
|
||||
// select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 not in (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 > sub_query_correlated_subquery1.k2) or k1 < 10 order by k1, k2;
|
||||
// """
|
||||
|
||||
Reference in New Issue
Block a user