[Feature](join) Support null aware left anti join (#13871)
This commit is contained in:
@ -59,8 +59,8 @@ struct ProcessHashTableBuild {
|
||||
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {}
|
||||
|
||||
template <bool need_null_map_for_build, bool ignore_null, bool build_unique,
|
||||
bool has_runtime_filter>
|
||||
void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map) {
|
||||
bool has_runtime_filter, bool short_circuit_for_null>
|
||||
void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
|
||||
using KeyGetter = typename HashTableContext::State;
|
||||
using Mapped = typename HashTableContext::Mapped;
|
||||
int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes();
|
||||
@ -97,6 +97,15 @@ struct ProcessHashTableBuild {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// If apply short circuit strategy for null value (e.g. join operator is
|
||||
// NULL_AWARE_LEFT_ANTI_JOIN), we build hash table until we meet a null value.
|
||||
if constexpr (short_circuit_for_null && need_null_map_for_build) {
|
||||
if ((*null_map)[k]) {
|
||||
DCHECK(has_null_key);
|
||||
*has_null_key = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
|
||||
_build_side_hash_values[k] =
|
||||
hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key);
|
||||
@ -218,6 +227,7 @@ void ProcessHashTableProbe<JoinOpType, ignore_null>::build_side_output_column(
|
||||
constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN;
|
||||
|
||||
constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
@ -380,7 +390,8 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
|
||||
key_getter.template prefetch<true>(hash_table_ctx.hash_table,
|
||||
probe_index + PREFETCH_STEP, _arena);
|
||||
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
if (!find_result.is_found()) {
|
||||
++current_offset;
|
||||
}
|
||||
@ -575,7 +586,8 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi
|
||||
}
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
|
||||
JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
same_to_prev.emplace_back(false);
|
||||
visited_map.emplace_back(nullptr);
|
||||
// only full outer / left outer need insert the data of right table
|
||||
@ -682,16 +694,23 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi
|
||||
|
||||
output_block->get_by_position(result_column_id).column =
|
||||
std::move(new_filter_column);
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
auto new_filter_column = ColumnVector<UInt8>::create();
|
||||
auto& filter_map = new_filter_column->get_data();
|
||||
|
||||
if (!column->empty()) {
|
||||
// Both equal conjuncts and other conjuncts are true
|
||||
filter_map.emplace_back(column->get_bool(0) && visited_map[0]);
|
||||
}
|
||||
for (int i = 1; i < column->size(); ++i) {
|
||||
if ((visited_map[i] && column->get_bool(i)) ||
|
||||
(same_to_prev[i] && filter_map[i - 1])) {
|
||||
// When either of two conditions is meet:
|
||||
// 1. Both equal conjuncts and other conjuncts are true or same_to_prev
|
||||
// 2. This row is joined from the same build side row as the previous row
|
||||
// Set filter_map[i] to true and filter_map[i - 1] to false if same_to_prev[i]
|
||||
// is true.
|
||||
filter_map.push_back(true);
|
||||
filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1];
|
||||
} else {
|
||||
@ -731,8 +750,10 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi
|
||||
output_block->clear();
|
||||
} else {
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
|
||||
JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
orig_columns = right_col_idx;
|
||||
}
|
||||
Block::filter_block(output_block, result_column_id, orig_columns);
|
||||
}
|
||||
}
|
||||
@ -828,14 +849,16 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::process_data_in_hashtable
|
||||
HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
||||
: ExecNode(pool, tnode, descs),
|
||||
_join_op(tnode.hash_join_node.join_op),
|
||||
_hash_table_rows(0),
|
||||
_mem_used(0),
|
||||
_have_other_join_conjunct(tnode.hash_join_node.__isset.vother_join_conjunct),
|
||||
_match_all_probe(_join_op == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
_join_op == TJoinOp::FULL_OUTER_JOIN),
|
||||
_match_one_build(_join_op == TJoinOp::LEFT_SEMI_JOIN),
|
||||
_match_all_build(_join_op == TJoinOp::RIGHT_OUTER_JOIN ||
|
||||
_join_op == TJoinOp::FULL_OUTER_JOIN),
|
||||
_build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN),
|
||||
_build_unique(!_have_other_join_conjunct &&
|
||||
(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
|
||||
_join_op == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
_join_op == TJoinOp::LEFT_SEMI_JOIN)),
|
||||
_is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
|
||||
_join_op == TJoinOp::RIGHT_SEMI_JOIN),
|
||||
_is_outer_join(_match_all_build || _match_all_probe),
|
||||
@ -874,17 +897,17 @@ void HashJoinNode::init_join_op() {
|
||||
Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
||||
DCHECK(tnode.__isset.hash_join_node);
|
||||
if (tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return Status::InternalError("Do not support null aware left anti join");
|
||||
}
|
||||
|
||||
const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
|
||||
_join_op == TJoinOp::FULL_OUTER_JOIN ||
|
||||
_join_op == TJoinOp::RIGHT_ANTI_JOIN;
|
||||
const bool probe_dispose_null =
|
||||
_match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
_match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
|
||||
_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN;
|
||||
|
||||
const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts;
|
||||
std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
|
||||
size_t conjuncts_index = 0;
|
||||
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
|
||||
VExprContext* ctx = nullptr;
|
||||
RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, &ctx));
|
||||
@ -897,17 +920,20 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
_is_null_safe_eq_join.push_back(null_aware);
|
||||
|
||||
// if is null aware, build join column and probe join column both need dispose null value
|
||||
_build_not_ignore_null.emplace_back(
|
||||
_store_null_in_hash_table.emplace_back(
|
||||
null_aware ||
|
||||
(_build_expr_ctxs.back()->root()->is_nullable() && build_stores_null));
|
||||
_probe_not_ignore_null.emplace_back(
|
||||
probe_not_ignore_null[conjuncts_index] =
|
||||
null_aware ||
|
||||
(_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null));
|
||||
_build_side_ignore_null |= !_build_not_ignore_null.back();
|
||||
(_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null);
|
||||
_build_side_ignore_null |= (_join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
|
||||
!_store_null_in_hash_table.back());
|
||||
conjuncts_index++;
|
||||
}
|
||||
for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
|
||||
_probe_ignore_null |= !_probe_not_ignore_null[i];
|
||||
_probe_ignore_null |= !probe_not_ignore_null[i];
|
||||
}
|
||||
_short_circuit_for_null_in_build_side = _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
|
||||
_probe_column_disguise_null.reserve(eq_join_conjuncts.size());
|
||||
|
||||
@ -918,8 +944,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
|
||||
// If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate,
|
||||
// build table should not be deduplicated.
|
||||
_build_unique = false;
|
||||
_have_other_join_conjunct = true;
|
||||
DCHECK(!_build_unique);
|
||||
DCHECK(_have_other_join_conjunct);
|
||||
}
|
||||
|
||||
const auto& output_exprs = tnode.hash_join_node.srcExprList;
|
||||
@ -1057,6 +1083,12 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
SCOPED_TIMER(_probe_timer);
|
||||
|
||||
if (_short_circuit_for_null_in_probe_side) {
|
||||
// If we use a short-circuit strategy for null value in build side (e.g. if join operator is
|
||||
// NULL_AWARE_LEFT_ANTI_JOIN), we should return empty block directly.
|
||||
*eos = true;
|
||||
return Status::OK();
|
||||
}
|
||||
size_t probe_rows = _probe_block.rows();
|
||||
if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) {
|
||||
_probe_index = 0;
|
||||
@ -1285,7 +1317,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
|
||||
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
|
||||
|
||||
Block block;
|
||||
while (!eos) {
|
||||
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
|
||||
// data from data.
|
||||
while (!eos && !_short_circuit_for_null_in_probe_side) {
|
||||
block.clear_column_data();
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
@ -1315,7 +1349,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!mutable_block.empty()) {
|
||||
if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) {
|
||||
if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
|
||||
return Status::NotSupported(
|
||||
strings::Substitute("data size of right table in hash join > $0",
|
||||
@ -1356,7 +1390,7 @@ Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr&
|
||||
DCHECK(null_map != nullptr);
|
||||
VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
|
||||
}
|
||||
if (_build_not_ignore_null[i]) {
|
||||
if (_store_null_in_hash_table[i]) {
|
||||
raw_ptrs[i] = nullable;
|
||||
} else {
|
||||
if constexpr (BuildSide) {
|
||||
@ -1400,7 +1434,7 @@ bool HashJoinNode::_need_null_map(Block& block, const std::vector<int>& res_col_
|
||||
auto column = block.get_by_position(res_col_ids[i]).column.get();
|
||||
if constexpr (BuildSide) {
|
||||
if (check_and_get_column<ColumnNullable>(*column)) {
|
||||
if (!_build_not_ignore_null[i]) {
|
||||
if (!_store_null_in_hash_table[i]) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -1434,7 +1468,8 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
|
||||
// so we have to initialize this flag by the first build block.
|
||||
if (!_has_set_need_null_map_for_build) {
|
||||
_has_set_need_null_map_for_build = true;
|
||||
_need_null_map_for_build = _need_null_map<true>(block, res_col_ids);
|
||||
_need_null_map_for_build =
|
||||
_short_circuit_for_null_in_build_side || _need_null_map<true>(block, res_col_ids);
|
||||
}
|
||||
if (_need_null_map_for_build) {
|
||||
null_map_val = ColumnUInt8::create();
|
||||
@ -1458,21 +1493,24 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
|
||||
|
||||
std::visit(
|
||||
[&](auto&& arg, auto has_null_value, auto build_unique, auto has_runtime_filter_value,
|
||||
auto need_null_map_for_build) {
|
||||
auto need_null_map_for_build, auto short_circuit_for_null_in_build_side) {
|
||||
using HashTableCtxType = std::decay_t<decltype(arg)>;
|
||||
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
|
||||
ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
|
||||
rows, block, raw_ptrs, this, state->batch_size(), offset);
|
||||
hash_table_build_process.template run<need_null_map_for_build, has_null_value,
|
||||
build_unique, has_runtime_filter_value>(
|
||||
arg, need_null_map_for_build ? &null_map_val->get_data() : nullptr);
|
||||
build_unique, has_runtime_filter_value,
|
||||
short_circuit_for_null_in_build_side>(
|
||||
arg, need_null_map_for_build ? &null_map_val->get_data() : nullptr,
|
||||
&_short_circuit_for_null_in_probe_side);
|
||||
} else {
|
||||
LOG(FATAL) << "FATAL: uninited hash table";
|
||||
}
|
||||
},
|
||||
_hash_table_variants, make_bool_variant(_build_side_ignore_null),
|
||||
make_bool_variant(_build_unique), make_bool_variant(has_runtime_filter),
|
||||
make_bool_variant(_need_null_map_for_build));
|
||||
make_bool_variant(_need_null_map_for_build),
|
||||
make_bool_variant(_short_circuit_for_null_in_build_side));
|
||||
|
||||
return st;
|
||||
}
|
||||
@ -1488,7 +1526,7 @@ void HashJoinNode::_hash_table_init() {
|
||||
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN,
|
||||
RowRefListWithFlag, RowRefList>>;
|
||||
if (_build_expr_ctxs.size() == 1 && !_build_not_ignore_null[0]) {
|
||||
if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) {
|
||||
// Single column optimization
|
||||
switch (_build_expr_ctxs[0]->root()->result_type()) {
|
||||
case TYPE_BOOLEAN:
|
||||
|
||||
@ -288,7 +288,6 @@ public:
|
||||
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
|
||||
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
||||
Status close(RuntimeState* state) override;
|
||||
HashTableVariants& get_hash_table_variants() { return _hash_table_variants; }
|
||||
void init_join_op();
|
||||
|
||||
const RowDescriptor& row_desc() const override { return _output_row_desc; }
|
||||
@ -311,10 +310,8 @@ private:
|
||||
// mark the join column whether support null eq
|
||||
std::vector<bool> _is_null_safe_eq_join;
|
||||
|
||||
// mark the build hash table whether contain null column
|
||||
std::vector<bool> _build_not_ignore_null;
|
||||
// mark the probe table should dispose null column
|
||||
std::vector<bool> _probe_not_ignore_null;
|
||||
// mark the build hash table whether it needs to store null value
|
||||
std::vector<bool> _store_null_in_hash_table;
|
||||
|
||||
std::vector<uint16_t> _probe_column_disguise_null;
|
||||
std::vector<uint16_t> _probe_column_convert_to_null;
|
||||
@ -343,7 +340,6 @@ private:
|
||||
|
||||
RuntimeProfile::Counter* _join_filter_timer;
|
||||
|
||||
int64_t _hash_table_rows;
|
||||
int64_t _mem_used;
|
||||
|
||||
Arena _arena;
|
||||
@ -368,14 +364,20 @@ private:
|
||||
Sizes _probe_key_sz;
|
||||
Sizes _build_key_sz;
|
||||
|
||||
bool _have_other_join_conjunct;
|
||||
const bool _match_all_probe; // output all rows coming from the probe input. Full/Left Join
|
||||
const bool _match_one_build; // match at most one build row to each probe row. Left semi Join
|
||||
const bool _match_all_build; // output all rows coming from the build input. Full/Right Join
|
||||
bool _build_unique; // build a hash table without duplicated rows. Left semi/anti Join
|
||||
|
||||
const bool _is_right_semi_anti;
|
||||
const bool _is_outer_join;
|
||||
bool _have_other_join_conjunct = false;
|
||||
|
||||
// For null aware left anti join, we apply a short circuit strategy.
|
||||
// 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join.
|
||||
// 2. In build phase, we stop building hash table when we meet the first null value and set _short_circuit_for_null_in_probe_side to true.
|
||||
// 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join.
|
||||
bool _short_circuit_for_null_in_build_side = false;
|
||||
bool _short_circuit_for_null_in_probe_side = false;
|
||||
|
||||
Block _join_block;
|
||||
|
||||
|
||||
@ -61,7 +61,8 @@ public enum JoinOperator {
|
||||
}
|
||||
|
||||
public boolean isSemiAntiJoin() {
|
||||
return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
|
||||
return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN
|
||||
|| this == NULL_AWARE_LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
|
||||
}
|
||||
|
||||
public boolean isSemiJoin() {
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.TableAliasGenerator;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.policy.RowPolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
@ -755,8 +756,8 @@ public class StmtRewriter {
|
||||
// For the case of a NOT IN with an eq join conjunct, replace the join
|
||||
// conjunct with a conjunct that uses the null-matching eq operator.
|
||||
if (expr instanceof InPredicate) {
|
||||
// joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
joinOp = JoinOperator.LEFT_ANTI_JOIN;
|
||||
joinOp = VectorizedUtil.isVectorized()
|
||||
? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN : JoinOperator.LEFT_ANTI_JOIN;
|
||||
List<TupleId> tIds = Lists.newArrayList();
|
||||
joinConjunct.getIds(tIds, null);
|
||||
if (tIds.size() <= 1 || !tIds.contains(inlineView.getDesc().getId())) {
|
||||
@ -804,7 +805,8 @@ public class StmtRewriter {
|
||||
for (int j = 0; j < tableIdx; ++j) {
|
||||
TableRef tableRef = stmt.fromClause.get(j);
|
||||
if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN
|
||||
|| tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN) {
|
||||
|| tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN
|
||||
|| tableRef.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
continue;
|
||||
}
|
||||
newItems.add(SelectListItem.createStarItem(tableRef.getAliasAsName()));
|
||||
|
||||
@ -640,6 +640,8 @@ public class TableRef implements ParseNode, Writable {
|
||||
return "FULL OUTER JOIN";
|
||||
case CROSS_JOIN:
|
||||
return "CROSS JOIN";
|
||||
case NULL_AWARE_LEFT_ANTI_JOIN:
|
||||
return "NULL AWARE LEFT ANTI JOIN";
|
||||
default:
|
||||
return "bad join op: " + joinOp.toString();
|
||||
}
|
||||
|
||||
@ -37,6 +37,7 @@ public enum JoinType {
|
||||
LEFT_ANTI_JOIN,
|
||||
RIGHT_ANTI_JOIN,
|
||||
CROSS_JOIN,
|
||||
NULL_AWARE_LEFT_ANTI_JOIN,
|
||||
;
|
||||
|
||||
private static final Map<JoinType, JoinType> joinSwapMap = ImmutableMap
|
||||
@ -71,6 +72,8 @@ public enum JoinType {
|
||||
return JoinOperator.FULL_OUTER_JOIN;
|
||||
case LEFT_ANTI_JOIN:
|
||||
return JoinOperator.LEFT_ANTI_JOIN;
|
||||
case NULL_AWARE_LEFT_ANTI_JOIN:
|
||||
return JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
case RIGHT_ANTI_JOIN:
|
||||
return JoinOperator.RIGHT_ANTI_JOIN;
|
||||
case LEFT_SEMI_JOIN:
|
||||
@ -97,7 +100,8 @@ public enum JoinType {
|
||||
}
|
||||
|
||||
public final boolean isLeftJoin() {
|
||||
return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this == LEFT_SEMI_JOIN;
|
||||
return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this == NULL_AWARE_LEFT_ANTI_JOIN
|
||||
|| this == LEFT_SEMI_JOIN;
|
||||
}
|
||||
|
||||
public final boolean isRightJoin() {
|
||||
@ -117,7 +121,7 @@ public enum JoinType {
|
||||
}
|
||||
|
||||
public final boolean isLeftSemiOrAntiJoin() {
|
||||
return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN;
|
||||
return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
}
|
||||
|
||||
public final boolean isRightSemiOrAntiJoin() {
|
||||
@ -125,7 +129,8 @@ public enum JoinType {
|
||||
}
|
||||
|
||||
public final boolean isSemiOrAntiJoin() {
|
||||
return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
|
||||
return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN
|
||||
|| this == NULL_AWARE_LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
|
||||
}
|
||||
|
||||
public final boolean isOuterJoin() {
|
||||
@ -137,7 +142,7 @@ public enum JoinType {
|
||||
}
|
||||
|
||||
public final boolean isRemainRightJoin() {
|
||||
return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN;
|
||||
return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN && this != NULL_AWARE_LEFT_ANTI_JOIN;
|
||||
}
|
||||
|
||||
public final boolean isSwapJoinType() {
|
||||
|
||||
@ -27,4 +27,6 @@ public class DistributedPlanColocateRule {
|
||||
public static final String COLOCATE_GROUP_IS_NOT_STABLE = "Colocate group is not stable";
|
||||
public static final String INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY
|
||||
= "Inconsistent distribution of table and queries";
|
||||
public static final String NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST
|
||||
= "Build side of null aware left anti join must be broadcast";
|
||||
}
|
||||
|
||||
@ -353,7 +353,10 @@ public class DistributedPlanner {
|
||||
// - and the expected size of the hash tbl doesn't exceed autoBroadcastThreshold
|
||||
// we set partition join as default when broadcast join cost equals partition join cost
|
||||
|
||||
if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
|
||||
if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
doBroadcast = true;
|
||||
} else if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
|
||||
&& node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
|
||||
if (node.getInnerRef().isBroadcastJoin()) {
|
||||
// respect user join hint
|
||||
doBroadcast = true;
|
||||
@ -425,26 +428,33 @@ public class DistributedPlanner {
|
||||
|
||||
/**
|
||||
* Colocate Join can be performed when the following 4 conditions are met at the same time.
|
||||
* 1. Session variables disable_colocate_plan = false
|
||||
* 2. There is no join hints in HashJoinNode
|
||||
* 3. There are no exchange node between source scan node and HashJoinNode.
|
||||
* 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched.
|
||||
* 1. Join operator is not NULL_AWARE_LEFT_ANTI_JOIN
|
||||
* 2. Session variables disable_colocate_plan = false
|
||||
* 3. There is no join hints in HashJoinNode
|
||||
* 4. There are no exchange node between source scan node and HashJoinNode.
|
||||
* 5. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched.
|
||||
*/
|
||||
private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment,
|
||||
List<String> cannotReason) {
|
||||
// Condition1
|
||||
if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
cannotReason.add(DistributedPlanColocateRule.NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Condition2
|
||||
if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
|
||||
cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Condition2: If user have a join hint to use proper way of join, can not be colocate join
|
||||
// Condition3: If user have a join hint to use proper way of join, can not be colocate join
|
||||
if (node.getInnerRef().hasJoinHints()) {
|
||||
cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Condition3:
|
||||
// Condition4:
|
||||
// If there is an exchange node between the HashJoinNode and their real associated ScanNode,
|
||||
// it means that the data has been rehashed.
|
||||
// The rehashed data can no longer be guaranteed to correspond to the left and right buckets,
|
||||
@ -468,7 +478,7 @@ public class DistributedPlanner {
|
||||
predicateList.add(eqJoinPredicate);
|
||||
}
|
||||
|
||||
// Condition4
|
||||
// Condition5
|
||||
return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts, cannotReason);
|
||||
}
|
||||
|
||||
@ -581,6 +591,10 @@ public class DistributedPlanner {
|
||||
|
||||
private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
|
||||
List<Expr> rhsHashExprs) {
|
||||
if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -222,7 +222,8 @@ public final class RuntimeFilterGenerator {
|
||||
// from the ON clause.
|
||||
if (!joinNode.getJoinOp().isLeftOuterJoin()
|
||||
&& !joinNode.getJoinOp().isFullOuterJoin()
|
||||
&& !joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)) {
|
||||
&& !joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)
|
||||
&& !joinNode.getJoinOp().equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
|
||||
joinConjuncts.addAll(joinNode.getEqJoinConjuncts());
|
||||
}
|
||||
|
||||
|
||||
@ -74,7 +74,9 @@ public class ExprRewriter {
|
||||
case FULL_OUTER_JOIN: return FULL_OUTER_JOIN_CLAUSE;
|
||||
case LEFT_SEMI_JOIN: return LEFT_SEMI_JOIN_CLAUSE;
|
||||
case RIGHT_SEMI_JOIN: return RIGHT_SEMI_JOIN_CLAUSE;
|
||||
case LEFT_ANTI_JOIN: return LEFT_ANTI_JOIN_CLAUSE;
|
||||
case NULL_AWARE_LEFT_ANTI_JOIN:
|
||||
case LEFT_ANTI_JOIN:
|
||||
return LEFT_ANTI_JOIN_CLAUSE;
|
||||
case RIGHT_ANTI_JOIN: return RIGHT_ANTI_JOIN_CLAUSE;
|
||||
case CROSS_JOIN: return CROSS_JOIN_CLAUSE;
|
||||
default: return OTHER_CLAUSE;
|
||||
|
||||
@ -457,7 +457,8 @@ public class InferFiltersRule implements ExprRewriteRule {
|
||||
|| (joinOperator == JoinOperator.LEFT_SEMI_JOIN)
|
||||
|| (!needChange && joinOperator == JoinOperator.RIGHT_OUTER_JOIN)
|
||||
|| (needChange && (joinOperator == JoinOperator.LEFT_OUTER_JOIN
|
||||
|| joinOperator == JoinOperator.LEFT_ANTI_JOIN))) {
|
||||
|| joinOperator == JoinOperator.LEFT_ANTI_JOIN
|
||||
|| joinOperator == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) {
|
||||
ret = true;
|
||||
}
|
||||
} else if (clauseType == ExprRewriter.ClauseType.WHERE_CLAUSE) {
|
||||
@ -465,7 +466,8 @@ public class InferFiltersRule implements ExprRewriteRule {
|
||||
|| (joinOperator == JoinOperator.LEFT_SEMI_JOIN
|
||||
|| (needChange && joinOperator == JoinOperator.RIGHT_OUTER_JOIN))
|
||||
|| (!needChange && (joinOperator == JoinOperator.LEFT_OUTER_JOIN
|
||||
|| joinOperator == JoinOperator.LEFT_ANTI_JOIN))) {
|
||||
|| joinOperator == JoinOperator.LEFT_ANTI_JOIN
|
||||
|| joinOperator == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) {
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,10 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !select --
|
||||
2
|
||||
|
||||
-- !select --
|
||||
\N
|
||||
2
|
||||
|
||||
-- !select --
|
||||
|
||||
@ -0,0 +1,66 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_null_aware_left_anti_join") {
|
||||
def tableName1 = "test_null_aware_left_anti_join1"
|
||||
def tableName2 = "test_null_aware_left_anti_join2"
|
||||
sql """
|
||||
drop table if exists ${tableName1};
|
||||
"""
|
||||
|
||||
sql """
|
||||
drop table if exists ${tableName2};
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table if not exists ${tableName1} ( `k1` int(11) NULL ) DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table if not exists ${tableName2} ( `k1` int(11) NULL ) DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into ${tableName1} values (1), (3);
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into ${tableName2} values (1), (2);
|
||||
"""
|
||||
|
||||
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
|
||||
|
||||
sql """
|
||||
insert into ${tableName2} values(null);
|
||||
"""
|
||||
|
||||
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
|
||||
|
||||
sql """
|
||||
insert into ${tableName1} values(null);
|
||||
"""
|
||||
|
||||
qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
|
||||
|
||||
sql """
|
||||
drop table if exists ${tableName2};
|
||||
"""
|
||||
|
||||
sql """
|
||||
drop table if exists ${tableName1};
|
||||
"""
|
||||
}
|
||||
@ -67,9 +67,9 @@ suite("test_explain_tpch_sf_1_q16") {
|
||||
explainStr.contains("VAGGREGATE (update serialize)\n" +
|
||||
" | STREAMING\n" +
|
||||
" | group by: <slot 29>, <slot 30>, <slot 31>, <slot 27>") &&
|
||||
explainStr.contains("join op: LEFT ANTI JOIN(BROADCAST)[The src data has been redistributed]\n" +
|
||||
explainStr.contains("join op: NULL AWARE LEFT ANTI JOIN(BROADCAST)[Build side of null aware left anti join must be broadcast]\n" +
|
||||
" | equal join conjunct: <slot 21> = `s_suppkey`") &&
|
||||
explainStr.contains("vec output tuple id: 8") &&
|
||||
explainStr.contains("vec output tuple id: 8") &&
|
||||
explainStr.contains("output slot ids: 27 29 30 31 \n" +
|
||||
" | hash output slot ids: 21 23 24 25 ") &&
|
||||
explainStr.contains("join op: INNER JOIN(BROADCAST)[Tables are not in the same group]\n" +
|
||||
|
||||
Reference in New Issue
Block a user