From 0683181fefdaf68e48e080c2cf0ae17623be729b Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Mon, 30 May 2022 09:04:21 +0800 Subject: [PATCH] [API changed](parser) Remove merge join syntax (#9795) Remove merge join sql and merge join node --- be/src/exec/CMakeLists.txt | 1 - be/src/exec/exec_node.cpp | 5 - be/src/exec/merge_join_node.cpp | 334 ------------------ be/src/exec/merge_join_node.h | 95 ----- be/src/vec/exec/join/vhash_join_node.h | 2 - fe/fe-core/src/main/cup/sql_parser.cup | 2 - .../apache/doris/analysis/JoinOperator.java | 1 - .../org/apache/doris/analysis/TableRef.java | 2 - gensrc/thrift/PlanNodes.thrift | 4 +- 9 files changed, 2 insertions(+), 444 deletions(-) delete mode 100644 be/src/exec/merge_join_node.cpp delete mode 100644 be/src/exec/merge_join_node.h diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index d93c394047..7ce4c4caf3 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -44,7 +44,6 @@ set(EXEC_FILES hash_table.cpp local_file_reader.cpp merge_node.cpp - merge_join_node.cpp scan_node.cpp select_node.cpp text_converter.cpp diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index ccd8e3c854..dd5bfda921 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -37,7 +37,6 @@ #include "exec/exchange_node.h" #include "exec/hash_join_node.h" #include "exec/intersect_node.h" -#include "exec/merge_join_node.h" #include "exec/merge_node.h" #include "exec/mysql_scan_node.h" #include "exec/odbc_scan_node.h" @@ -475,10 +474,6 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } return Status::OK(); - case TPlanNodeType::MERGE_JOIN_NODE: - *node = pool->add(new MergeJoinNode(pool, tnode, descs)); - return Status::OK(); - case TPlanNodeType::EMPTY_SET_NODE: if (state->enable_vectorized_exec()) { *node = pool->add(new vectorized::VEmptySetNode(pool, tnode, descs)); diff --git a/be/src/exec/merge_join_node.cpp b/be/src/exec/merge_join_node.cpp deleted file mode 100644 index 453963f601..0000000000 --- a/be/src/exec/merge_join_node.cpp +++ /dev/null @@ -1,334 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/merge_join_node.h" - -#include - -#include "exprs/expr.h" -#include "exprs/expr_context.h" -#include "exprs/in_predicate.h" -#include "gen_cpp/PlanNodes_types.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "util/debug_util.h" -#include "util/runtime_profile.h" - -namespace doris { - -template -int compare_value(const void* left_value, const void* right_value) { - if (*(T*)left_value < *(T*)right_value) { - return -1; - } else if (*(T*)left_value == *(T*)right_value) { - return 0; - } else { - return 1; - } -} - -template -int compare_value(const StringValue* left_value, const StringValue* right_value) { - return left_value->compare(*right_value); -} - -MergeJoinNode::MergeJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), _out_batch(nullptr) {} - -MergeJoinNode::~MergeJoinNode() {} - -Status MergeJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { - DCHECK(tnode.__isset.merge_join_node); - RETURN_IF_ERROR(ExecNode::init(tnode, state)); - const std::vector& cmp_conjuncts = tnode.merge_join_node.cmp_conjuncts; - - for (int i = 0; i < cmp_conjuncts.size(); ++i) { - ExprContext* ctx = nullptr; - RETURN_IF_ERROR(Expr::create_expr_tree(_pool, cmp_conjuncts[i].left, &ctx)); - _left_expr_ctxs.push_back(ctx); - RETURN_IF_ERROR(Expr::create_expr_tree(_pool, cmp_conjuncts[i].right, &ctx)); - _right_expr_ctxs.push_back(ctx); - } - - RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.merge_join_node.other_join_conjuncts, - &_other_join_conjunct_ctxs)); - return Status::OK(); -} - -Status MergeJoinNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - - // build and probe exprs are evaluated in the context of the rows produced by our - // right and left children, respectively - RETURN_IF_ERROR( - Expr::prepare(_left_expr_ctxs, state, child(0)->row_desc(), expr_mem_tracker())); - RETURN_IF_ERROR( - Expr::prepare(_right_expr_ctxs, state, child(1)->row_desc(), expr_mem_tracker())); - - for (int i = 0; i < _left_expr_ctxs.size(); ++i) { - switch (_left_expr_ctxs[i]->root()->type().type) { - case TYPE_TINYINT: - _cmp_func.push_back(compare_value); - break; - - case TYPE_SMALLINT: - _cmp_func.push_back(compare_value); - break; - - case TYPE_INT: - _cmp_func.push_back(compare_value); - break; - - case TYPE_BIGINT: - _cmp_func.push_back(compare_value); - break; - - case TYPE_LARGEINT: - _cmp_func.push_back(compare_value<__int128>); - break; - - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_STRING: - _cmp_func.push_back(compare_value); - break; - - default: - return Status::InternalError("unsupported compare type."); - break; - } - } - - // _other_join_conjuncts are evaluated in the context of the rows produced by this node - RETURN_IF_ERROR( - Expr::prepare(_other_join_conjunct_ctxs, state, _row_descriptor, expr_mem_tracker())); - - _result_tuple_row_size = _row_descriptor.tuple_descriptors().size() * sizeof(Tuple*); - // pre-compute the tuple index of build tuples in the output row - - _left_tuple_size = child(0)->row_desc().tuple_descriptors().size(); - _right_tuple_size = child(1)->row_desc().tuple_descriptors().size(); - _right_tuple_idx.reserve(_right_tuple_size); - - for (int i = 0; i < _right_tuple_size; ++i) { - TupleDescriptor* right_tuple_desc = child(1)->row_desc().tuple_descriptors()[i]; - _right_tuple_idx.push_back(_row_descriptor.get_tuple_idx(right_tuple_desc->id())); - } - - _left_child_ctx.reset(new ChildReaderContext(row_desc(), state->batch_size())); - _right_child_ctx.reset(new ChildReaderContext(row_desc(), state->batch_size())); - - return Status::OK(); -} - -Status MergeJoinNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); - Expr::close(_left_expr_ctxs, state); - Expr::close(_right_expr_ctxs, state); - Expr::close(_other_join_conjunct_ctxs, state); - return ExecNode::close(state); -} - -Status MergeJoinNode::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(ExecNode::open(state)); - RETURN_IF_ERROR(Expr::open(_left_expr_ctxs, state)); - RETURN_IF_ERROR(Expr::open(_right_expr_ctxs, state)); - RETURN_IF_ERROR(Expr::open(_other_join_conjunct_ctxs, state)); - - _eos = false; - // Open the probe-side child so that it may perform any initialisation in parallel. - // Don't exit even if we see an error, we still need to wait for the build thread - // to finish. - RETURN_IF_ERROR(child(0)->open(state)); - RETURN_IF_ERROR(child(1)->open(state)); - - RETURN_IF_ERROR(get_input_row(state, 0)); - RETURN_IF_ERROR(get_input_row(state, 1)); - - return Status::OK(); -} - -Status MergeJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) { - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); - RETURN_IF_CANCELLED(state); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker()); - - if (reached_limit() || _eos) { - *eos = true; - return Status::OK(); - } - - while (true) { - int row_idx = out_batch->add_row(); - DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX); - TupleRow* row = out_batch->get_row(row_idx); - - _out_batch = out_batch; - RETURN_IF_ERROR(get_next_row(state, row, eos)); - - if (*eos) { - _eos = true; - return Status::OK(); - } - - if (eval_conjuncts(&_other_join_conjunct_ctxs[0], _other_join_conjunct_ctxs.size(), row)) { - out_batch->commit_last_row(); - ++_num_rows_returned; - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - } - - if (out_batch->is_full() || out_batch->at_resource_limit() || reached_limit()) { - break; - } - } - - return Status::OK(); -} - -void MergeJoinNode::create_output_row(TupleRow* out, TupleRow* left, TupleRow* right) { - if (left == nullptr) { - memset(out, 0, _left_tuple_size); - } else { - memcpy(out, left, _left_tuple_size); - } - - if (right != nullptr) { - for (int i = 0; i < _right_tuple_size; ++i) { - out->set_tuple(_right_tuple_idx[i], right->get_tuple(i)); - } - } else { - for (int i = 0; i < _right_tuple_size; ++i) { - out->set_tuple(_right_tuple_idx[i], nullptr); - } - } -} - -Status MergeJoinNode::compare_row(TupleRow* left_row, TupleRow* right_row, bool* is_lt) { - if (left_row == nullptr) { - *is_lt = false; - return Status::OK(); - } else if (right_row == nullptr) { - *is_lt = true; - return Status::OK(); - } - - for (int i = 0; i < _left_expr_ctxs.size(); ++i) { - void* left_value = _left_expr_ctxs[i]->get_value(left_row); - void* right_value = _right_expr_ctxs[i]->get_value(right_row); - int cmp_val = _cmp_func[i](left_value, right_value); - - if (cmp_val < 0) { - *is_lt = true; - return Status::OK(); - } else if (cmp_val == 0) { - // do nothing - } else { - *is_lt = false; - return Status::OK(); - } - } - - // equal - *is_lt = false; - - return Status::OK(); -} - -Status MergeJoinNode::get_next_row(RuntimeState* state, TupleRow* out_row, bool* eos) { - TupleRow* left_row = _left_child_ctx->current_row; - TupleRow* right_row = _right_child_ctx->current_row; - - if (left_row == nullptr && right_row == nullptr) { - *eos = true; - return Status::OK(); - } - - bool is_lt = true; - RETURN_IF_ERROR(compare_row(left_row, right_row, &is_lt)); - - if (is_lt) { - create_output_row(out_row, left_row, nullptr); - RETURN_IF_ERROR(get_input_row(state, 0)); - } else { - create_output_row(out_row, nullptr, right_row); - RETURN_IF_ERROR(get_input_row(state, 1)); - } - - return Status::OK(); -} - -Status MergeJoinNode::get_input_row(RuntimeState* state, int child_idx) { - ChildReaderContext* ctx = nullptr; - - if (child_idx == 0) { - ctx = _left_child_ctx.get(); - } else { - ctx = _right_child_ctx.get(); - } - - // loop util read a valid data - while (!ctx->is_eos && ctx->row_idx >= ctx->batch.num_rows()) { - // transfer ownership before get new batch - if (nullptr != _out_batch) { - ctx->batch.transfer_resource_ownership(_out_batch); - } - - if (child_idx == 0) { - _left_child_ctx.reset( - new ChildReaderContext(child(child_idx)->row_desc(), state->batch_size())); - ctx = _left_child_ctx.get(); - } else { - _right_child_ctx.reset( - new ChildReaderContext(child(child_idx)->row_desc(), state->batch_size())); - ctx = _right_child_ctx.get(); - } - - RETURN_IF_ERROR(child(child_idx)->get_next(state, &ctx->batch, &ctx->is_eos)); - } - - if (ctx->row_idx >= ctx->batch.num_rows()) { - ctx->current_row = nullptr; - return Status::OK(); - } - - ctx->current_row = ctx->batch.get_row(ctx->row_idx++); - return Status::OK(); -} - -void MergeJoinNode::debug_string(int indentation_level, std::stringstream* out) const { - *out << string(indentation_level * 2, ' '); - *out << "MergeJoin(eos=" << (_eos ? "true" : "false") - << " _left_child_pos=" << (_left_child_ctx.get() ? _left_child_ctx->row_idx : -1) - << " _right_child_pos=" << (_right_child_ctx.get() ? _right_child_ctx->row_idx : -1) - << " join_conjuncts="; - *out << "Conjunct("; - // << " left_exprs=" << Expr::debug_string(_left_exprs) - // << " right_exprs=" << Expr::debug_string(_right_exprs); - *out << ")"; - ExecNode::debug_string(indentation_level, out); - *out << ")"; -} - -} // namespace doris diff --git a/be/src/exec/merge_join_node.h b/be/src/exec/merge_join_node.h deleted file mode 100644 index 50c949f339..0000000000 --- a/be/src/exec/merge_join_node.h +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "exec/exec_node.h" -#include "gen_cpp/PlanNodes_types.h" // for TJoinOp -#include "runtime/row_batch.h" - -namespace doris { - -class MemPool; -class TupleRow; - -// Node for in-memory merge joins: -// find the minimal tuple and output -class MergeJoinNode : public ExecNode { -public: - MergeJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - - ~MergeJoinNode(); - - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status close(RuntimeState* state); - -protected: - void debug_string(int indentation_level, std::stringstream* out) const; - -private: - // our equi-join predicates " = " are separated into - // _left_exprs (over child(0)) and _right_exprs (over child(1)) - // check which expr is min - std::vector _left_expr_ctxs; - std::vector _right_expr_ctxs; - - // non-equi-join conjuncts from the JOIN clause - std::vector _other_join_conjunct_ctxs; - - bool _eos; // if true, nothing left to return in get_next() - - struct ChildReaderContext { - RowBatch batch; - int row_idx; - bool is_eos; - TupleRow* current_row; - ChildReaderContext(const RowDescriptor& desc, int batch_size) - : batch(desc, batch_size), row_idx(0), is_eos(false), current_row(nullptr) {} - }; - // _left_batch must be cleared before calling get_next(). used cache child(0)'s data - // _right_batch must be cleared before calling get_next(). used cache child(1)'s data - // does not initialize all tuple ptrs in the row, only the ones that it - // is responsible for. - std::unique_ptr _left_child_ctx; - std::unique_ptr _right_child_ctx; - // _build_tuple_idx[i] is the tuple index of child(1)'s tuple[i] in the output row - std::vector _right_tuple_idx; - int _right_tuple_size; - int _left_tuple_size; - RowBatch* _out_batch; - - typedef int (*CompareFn)(const void*, const void*); - std::vector _cmp_func; - - // byte size of result tuple row (sum of the tuple ptrs, not the tuple data). - // This should be the same size as the probe tuple row. - int _result_tuple_row_size; - - void create_output_row(TupleRow* out, TupleRow* left, TupleRow* right); - Status compare_row(TupleRow* left_row, TupleRow* right_row, bool* is_lt); - Status get_next_row(RuntimeState* state, TupleRow* out_row, bool* eos); - Status get_input_row(RuntimeState* state, int child_idx); -}; - -} // namespace doris diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 13bbb365b4..7db2db48d3 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -124,7 +124,6 @@ using JoinOpVariants = std::integral_constant, std::integral_constant, std::integral_constant, - std::integral_constant, std::integral_constant, std::integral_constant, std::integral_constant>; @@ -137,7 +136,6 @@ using JoinOpVariants = M(FULL_OUTER_JOIN) \ M(RIGHT_OUTER_JOIN) \ M(CROSS_JOIN) \ - M(MERGE_JOIN) \ M(RIGHT_SEMI_JOIN) \ M(RIGHT_ANTI_JOIN) \ M(NULL_AWARE_LEFT_ANTI_JOIN) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 53ccf909be..bdb5730e37 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4345,8 +4345,6 @@ join_operator ::= {: RESULT = JoinOperator.INNER_JOIN; :} | KW_LEFT opt_outer KW_JOIN {: RESULT = JoinOperator.LEFT_OUTER_JOIN; :} - | KW_MERGE KW_JOIN - {: RESULT = JoinOperator.MERGE_JOIN; :} | KW_RIGHT opt_outer KW_JOIN {: RESULT = JoinOperator.RIGHT_OUTER_JOIN; :} | KW_FULL opt_outer KW_JOIN diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java index f0f6356ff2..6db551e76d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java @@ -32,7 +32,6 @@ public enum JoinOperator { RIGHT_ANTI_JOIN("RIGHT ANTI JOIN", TJoinOp.RIGHT_ANTI_JOIN), RIGHT_OUTER_JOIN("RIGHT OUTER JOIN", TJoinOp.RIGHT_OUTER_JOIN), FULL_OUTER_JOIN("FULL OUTER JOIN", TJoinOp.FULL_OUTER_JOIN), - MERGE_JOIN("MERGE JOIN", TJoinOp.MERGE_JOIN), CROSS_JOIN("CROSS JOIN", TJoinOp.CROSS_JOIN), // Variant of the LEFT ANTI JOIN that is used for the equal of // NOT IN subqueries. It can have a single equality join conjunct diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 429fc808c0..1d15ea6fe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -602,8 +602,6 @@ public class TableRef implements ParseNode, Writable { return "FULL OUTER JOIN"; case CROSS_JOIN: return "CROSS JOIN"; - case MERGE_JOIN: - return "MERGE JOIN"; default: return "bad join op: " + joinOp.toString(); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index d4d37e11ac..f94532bcb6 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -29,7 +29,7 @@ enum TPlanNodeType { CSV_SCAN_NODE, // deprecated SCHEMA_SCAN_NODE, HASH_JOIN_NODE, - MERGE_JOIN_NODE, + MERGE_JOIN_NODE, // deprecated AGGREGATION_NODE, PRE_AGGREGATION_NODE, SORT_NODE, @@ -366,7 +366,7 @@ enum TJoinOp { RIGHT_OUTER_JOIN, FULL_OUTER_JOIN, CROSS_JOIN, - MERGE_JOIN, + MERGE_JOIN, // deprecated RIGHT_SEMI_JOIN, LEFT_ANTI_JOIN,