From 343a6dc29de2566d86e2caccc015df683cc61d75 Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Thu, 17 Aug 2023 09:17:09 +0800 Subject: [PATCH] [improvement](hash join) Return result early if probe side has no data (#23044) --- be/src/runtime/runtime_state.h | 5 + be/src/vec/exec/join/vhash_join_node.cpp | 52 ++- be/src/vec/exec/join/vhash_join_node.h | 8 + be/src/vec/exec/join/vjoin_node_base.h | 2 +- .../org/apache/doris/qe/SessionVariable.java | 8 + gensrc/thrift/PaloInternalService.thrift | 2 + .../join/test_hash_join_probe_early_eos.out | 93 ++++ .../test_hash_join_probe_early_eos.groovy | 397 ++++++++++++++++++ 8 files changed, 565 insertions(+), 2 deletions(-) create mode 100644 regression-test/data/query_p0/join/test_hash_join_probe_early_eos.out create mode 100644 regression-test/suites/query_p0/join/test_hash_join_probe_early_eos.groovy diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 1f35f6c1b8..73c6f1938c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -394,6 +394,11 @@ public: _query_options.enable_share_hash_table_for_broadcast_join; } + bool enable_hash_join_early_start_probe() const { + return _query_options.__isset.enable_hash_join_early_start_probe && + _query_options.enable_hash_join_early_start_probe; + } + int repeat_max_num() const { #ifndef BE_TEST if (!_query_options.__isset.repeat_max_num) { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 807c63ef5c..4494ec80d2 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -678,6 +678,11 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_bloc Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (_is_hash_join_early_start_probe_eos(state)) { + *eos = true; + return Status::OK(); + } + if (_short_circuit_for_probe) { // If we use a short-circuit strategy, should return empty block directly. *eos = true; @@ -761,6 +766,50 @@ void HashJoinNode::_prepare_probe_block() { release_block_memory(_probe_block); } +bool HashJoinNode::_enable_hash_join_early_start_probe(RuntimeState* state) const { + return state->enable_hash_join_early_start_probe() && + (_join_op == TJoinOp::INNER_JOIN || _join_op == TJoinOp::LEFT_OUTER_JOIN || + _join_op == TJoinOp::LEFT_SEMI_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN); +} + +bool HashJoinNode::_is_hash_join_early_start_probe_eos(RuntimeState* state) const { + return _enable_hash_join_early_start_probe(state) && _probe_block.rows() == 0; +} + +void HashJoinNode::_probe_side_open_thread(RuntimeState* state, std::promise* promise) { + Defer defer {[&]() { _probe_open_finish = true; }}; + + SCOPED_ATTACH_TASK(state); + auto st = child(0)->open(state); + if (!st.ok()) { + promise->set_value(st); + return; + } + if (_enable_hash_join_early_start_probe(state)) { + while (need_more_input_data()) { + prepare_for_next(); + SCOPED_TIMER(_probe_next_timer); + st = child(0)->get_next_after_projects( + state, &_probe_block, &_probe_eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[0], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); + if (!st.ok()) { + promise->set_value(st); + return; + } + + st = push(state, &_probe_block, _probe_eos); + if (!st.ok()) { + promise->set_value(st); + return; + } + } + } + promise->set_value(Status::OK()); +} + Status HashJoinNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_open_timer); @@ -800,7 +849,8 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { Block block; // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from data. - while (!eos && !_short_circuit_for_null_in_probe_side) { + while (!eos && !_short_circuit_for_null_in_probe_side && + (!_probe_open_finish || !_is_hash_join_early_start_probe_eos(state))) { block.clear_column_data(); RETURN_IF_CANCELLED(state); { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 096a9148cc..85e53dbdab 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -261,6 +262,9 @@ public: return _runtime_filter_slots->ready_finish_publish(); } +protected: + void _probe_side_open_thread(RuntimeState* state, std::promise* status) override; + private: void _init_short_circuit_for_probe() override { _short_circuit_for_probe = @@ -273,6 +277,9 @@ private: (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_ANTI_JOIN); } + bool _enable_hash_join_early_start_probe(RuntimeState* state) const; + bool _is_hash_join_early_start_probe_eos(RuntimeState* state) const; + // probe expr VExprContextSPtrs _probe_expr_ctxs; // build expr @@ -409,6 +416,7 @@ private: std::vector _runtime_filters; size_t _build_bf_cardinality = 0; + std::atomic_bool _probe_open_finish = false; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 120e77785e..7ebbd692e4 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -82,7 +82,7 @@ protected: // columns from probe side to a nullable column. Status _build_output_block(Block* origin_block, Block* output_block, bool keep_origin = true); // Open probe side asynchronously. - void _probe_side_open_thread(RuntimeState* state, std::promise* status); + virtual void _probe_side_open_thread(RuntimeState* state, std::promise* status); // Initialize the join operation. void _init_join_op(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ae3f638d99..1f4885d9fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -297,6 +297,9 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN = "enable_share_hash_table_for_broadcast_join"; + // Optimize when probe side has no data for some hash join types + public static final String ENABLE_HASH_JOIN_EARLY_START_PROBE = "enable_hash_join_early_start_probe"; + // support unicode in label, table, column, common name check public static final String ENABLE_UNICODE_NAME_SUPPORT = "enable_unicode_name_support"; @@ -935,6 +938,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN, fuzzy = true) public boolean enableShareHashTableForBroadcastJoin = true; + @VariableMgr.VarAttr(name = ENABLE_HASH_JOIN_EARLY_START_PROBE, fuzzy = true) + public boolean enableHashJoinEarlyStartProbe = false; + @VariableMgr.VarAttr(name = ENABLE_UNICODE_NAME_SUPPORT) public boolean enableUnicodeNameSupport = false; @@ -1116,6 +1122,7 @@ public class SessionVariable implements Serializable, Writable { this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 1048576; this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 1048576; this.enableShareHashTableForBroadcastJoin = random.nextBoolean(); + this.enableHashJoinEarlyStartProbe = random.nextBoolean(); int randomInt = random.nextInt(4); if (randomInt % 2 == 0) { this.rewriteOrToInPredicateThreshold = 100000; @@ -2116,6 +2123,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); + tResult.setEnableHashJoinEarlyStartProbe(enableHashJoinEarlyStartProbe); tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 8928d78438..e78b64d33b 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -229,6 +229,8 @@ struct TQueryOptions { 76: optional bool enable_inverted_index_query = true; 77: optional bool truncate_char_or_varchar_columns = false + + 78: optional bool enable_hash_join_early_start_probe = false } diff --git a/regression-test/data/query_p0/join/test_hash_join_probe_early_eos.out b/regression-test/data/query_p0/join/test_hash_join_probe_early_eos.out new file mode 100644 index 0000000000..6d4312aba7 --- /dev/null +++ b/regression-test/data/query_p0/join/test_hash_join_probe_early_eos.out @@ -0,0 +1,93 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sanity_check0 -- +1 a +2 b +3 c +4 d +5 e + +-- !sanity_check1 -- +1 a +2 b +3 c + +-- !sanity_check2 -- + +-- !sanity_check3 -- +6 f +7 g + +-- !sanity_check4 -- + +-- !inner_join0 -- + +-- !inner_join1 -- + +-- !inner_join2 -- + +-- !left_join0 -- + +-- !left_join1 -- +1 a \N \N \N \N +2 b \N \N \N \N +3 c \N \N \N \N +4 d \N \N \N \N +5 e \N \N \N \N + +-- !left_join2 -- + +-- !left_join3 -- + +-- !left_semi_join0 -- + +-- !left_semi_join1 -- + +-- !left_semi_join2 -- + +-- !left_anti_join0 -- + +-- !left_anti_join1 -- +1 a +2 b +3 c +4 d +5 e + +-- !left_anti_join2 -- + +-- !inner_join_probe_eos0 -- + +-- !inner_join_join_probe_eos1 -- + +-- !inner_join_join_probe_eos2 -- + +-- !left_join_join_probe_eos0 -- + +-- !left_join_join_probe_eos1 -- +1 a \N \N \N \N +2 b \N \N \N \N +3 c \N \N \N \N +4 d \N \N \N \N +5 e \N \N \N \N + +-- !left_join_join_probe_eos2 -- + +-- !left_join_join_probe_eos3 -- + +-- !left_semi_join_join_probe_eos0 -- + +-- !left_semi_join_join_probe_eos1 -- + +-- !left_semi_join_join_probe_eos2 -- + +-- !left_anti_join_join_probe_eos0 -- + +-- !left_anti_join_join_probe_eos1 -- +1 a +2 b +3 c +4 d +5 e + +-- !left_anti_join_join_probe_eos2 -- + diff --git a/regression-test/suites/query_p0/join/test_hash_join_probe_early_eos.groovy b/regression-test/suites/query_p0/join/test_hash_join_probe_early_eos.groovy new file mode 100644 index 0000000000..963de0f949 --- /dev/null +++ b/regression-test/suites/query_p0/join/test_hash_join_probe_early_eos.groovy @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hash_join_probe_early_eos", "query") { + + sql """ DROP TABLE IF EXISTS test_hash_join_probe_early_eos_t0 """ + sql """ DROP TABLE IF EXISTS test_hash_join_probe_early_eos_t1 """ + sql """ DROP TABLE IF EXISTS test_hash_join_probe_early_eos_t2 """ + sql """ DROP TABLE IF EXISTS test_hash_join_probe_early_eos_t3 """ + + // t0 + sql """ + CREATE TABLE test_hash_join_probe_early_eos_t0 ( + `t0_k0` int, + `t0_v0` varchar(10) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`t0_k0`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """insert into test_hash_join_probe_early_eos_t0 values (1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e');""" + + // t1 + sql """ + CREATE TABLE test_hash_join_probe_early_eos_t1 ( + `t1_k0` int, + `t1_v0` varchar(10) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`t1_k0`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """insert into test_hash_join_probe_early_eos_t1 values (1,'a'),(2,'b'),(3,'c');""" + + // t2 + sql """ + CREATE TABLE test_hash_join_probe_early_eos_t2 ( + `t2_k0` int, + `t2_v0` varchar(10) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`t2_k0`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // t3 + sql """ + CREATE TABLE test_hash_join_probe_early_eos_t3 ( + `t3_k0` int, + `t3_v0` varchar(10) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`t3_k0`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """insert into test_hash_join_probe_early_eos_t3 values (6,'f'),(7,'g');""" + + sql """ set disable_join_reorder = true; """ + // test not probe early eos + sql """ set enable_hash_join_early_start_probe = false; """ + + qt_sanity_check0 """ + select * from test_hash_join_probe_early_eos_t0 order by t0_k0; + """ + qt_sanity_check1 """ + select * from test_hash_join_probe_early_eos_t1 order by t1_k0; + """ + qt_sanity_check2 """ + select * from test_hash_join_probe_early_eos_t2 order by t2_k0; + """ + qt_sanity_check3 """ + select * from test_hash_join_probe_early_eos_t3 order by t3_k0; + """ + qt_sanity_check4 """ + select + * + from + test_hash_join_probe_early_eos_t1 + inner join test_hash_join_probe_early_eos_t3 on t1_k0 = t3_k0 + """ + + qt_inner_join0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + inner join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_inner_join1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + inner join ( + select + * + from + test_hash_join_probe_early_eos_t2 + inner join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t1_k0; + """ + qt_inner_join2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t1 + inner join test_hash_join_probe_early_eos_t3 on t1_k0 = t3_k0 + ) tmp0 inner join test_hash_join_probe_early_eos_t0 on t1_k0 = t0_k0; + """ + + qt_left_join0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + left join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + order by t2_k0, t1_k0; + """ + qt_left_join1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + left join ( + select + * + from + test_hash_join_probe_early_eos_t2 + left join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t1_k0 + order by t0_k0; + """ + qt_left_join2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t2 + left join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 left join test_hash_join_probe_early_eos_t0 on t1_k0 = t0_k0; + """ + qt_left_join3 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t1 + inner join test_hash_join_probe_early_eos_t3 on t1_k0 = t3_k0 + ) tmp0 left join test_hash_join_probe_early_eos_t0 on t1_k0 = t0_k0; + + """ + + qt_left_semi_join0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + left semi join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_left_semi_join1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + left semi join ( + select + * + from + test_hash_join_probe_early_eos_t2 + left semi join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t2_k0; + """ + qt_left_semi_join2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t2 + left semi join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 left semi join test_hash_join_probe_early_eos_t0 on t2_k0 = t0_k0; + """ + + qt_left_anti_join0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + left anti join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_left_anti_join1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + left anti join ( + select + * + from + test_hash_join_probe_early_eos_t2 + left anti join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t2_k0 + order by t0_k0; + """ + qt_left_anti_join2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t2 + left anti join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 left anti join test_hash_join_probe_early_eos_t0 on t2_k0 = t0_k0; + """ + + // test not probe early eos + sql """ set enable_hash_join_early_start_probe = true; """ + qt_inner_join_probe_eos0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + inner join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_inner_join_join_probe_eos1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + inner join ( + select + * + from + test_hash_join_probe_early_eos_t2 + inner join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t1_k0 + order by t0_k0; + """ + qt_inner_join_join_probe_eos2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t1 + inner join test_hash_join_probe_early_eos_t3 on t1_k0 = t3_k0 + ) tmp0 inner join test_hash_join_probe_early_eos_t0 on t1_k0 = t0_k0; + """ + + qt_left_join_join_probe_eos0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + left join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_left_join_join_probe_eos1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + left join ( + select + * + from + test_hash_join_probe_early_eos_t2 + left join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t1_k0 + order by t0_k0; + """ + qt_left_join_join_probe_eos2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t2 + left join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 left join test_hash_join_probe_early_eos_t0 on t1_k0 = t0_k0; + """ + qt_left_join_join_probe_eos3 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t1 + inner join test_hash_join_probe_early_eos_t3 on t1_k0 = t3_k0 + ) tmp0 left join test_hash_join_probe_early_eos_t0 on t1_k0 = t0_k0; + + """ + + qt_left_semi_join_join_probe_eos0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + left semi join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_left_semi_join_join_probe_eos1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + left semi join ( + select + * + from + test_hash_join_probe_early_eos_t2 + left semi join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t2_k0; + """ + qt_left_semi_join_join_probe_eos2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t2 + left semi join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 left semi join test_hash_join_probe_early_eos_t0 on t2_k0 = t0_k0; + """ + + qt_left_anti_join_join_probe_eos0 """ + select + * + from + test_hash_join_probe_early_eos_t2 + left anti join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + """ + qt_left_anti_join_join_probe_eos1 """ + select + * + from + test_hash_join_probe_early_eos_t0 + left anti join ( + select + * + from + test_hash_join_probe_early_eos_t2 + left anti join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 on t0_k0 = t2_k0 + order by t0_k0; + """ + qt_left_anti_join_join_probe_eos2 """ + select + * + from + ( + select + * + from + test_hash_join_probe_early_eos_t2 + left anti join test_hash_join_probe_early_eos_t1 on t2_k0 = t1_k0 + ) tmp0 left anti join test_hash_join_probe_early_eos_t0 on t2_k0 = t0_k0; + """ +}