From a52b801e5f35a823c8844e046dc4ca72dce82513 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 27 Dec 2023 06:13:31 +0000 Subject: [PATCH] fix: fix bug of multi level batch nlj lost data bug --- src/sql/engine/basic/ob_group_join_buffer.cpp | 12 +++++ .../join/hash_join/ob_hash_join_vec_op.cpp | 2 +- .../join/hash_join/ob_hash_join_vec_op.h | 2 +- .../engine/join/ob_nested_loop_join_op.cpp | 48 +++++++++++++++++++ src/sql/engine/join/ob_nested_loop_join_op.h | 6 ++- src/sql/engine/ob_operator.h | 6 ++- 6 files changed, 71 insertions(+), 5 deletions(-) diff --git a/src/sql/engine/basic/ob_group_join_buffer.cpp b/src/sql/engine/basic/ob_group_join_buffer.cpp index 11424c53b8..91f2d17149 100644 --- a/src/sql/engine/basic/ob_group_join_buffer.cpp +++ b/src/sql/engine/basic/ob_group_join_buffer.cpp @@ -197,9 +197,13 @@ int ObGroupJoinBufffer::has_next_left_row(bool &has_next) LOG_WARN("left row read and group idx do not match", KR(ret), K(left_store_read_), K(left_store_group_idx_.count())); } else if (above_group_idx_for_read_ == left_store_group_idx_.at(left_store_read_)) { + // above_group_idx_for_read_ < left_store_group_idx_.at(left_store_read_) means the row of above_group_idx_ is end, + // need to return iter_end, and rescan current NLJ operator // we are still reading results for the current rescan param, need to rescan right child has_next = true; } + } else { + LOG_TRACE("Left child operator has no left rows for read,and the join buffer has no left row, needs to return iter_end_"); } return ret; } @@ -345,6 +349,7 @@ int ObGroupJoinBufffer::drain_left() const ObChunkDatumStore::StoredRow *row = NULL; // drain old rows from left store for (int64_t i = left_store_read_; OB_SUCC(ret) && need_drain && i < left_store_group_idx_.count(); ++i) { + // In addition to the lines of the current group (above_group_idx_for_read_), the buffer also caches rows of subsequent groups if (above_group_idx_for_read_ != left_store_group_idx_.at(i)) { need_drain = false; } else if (OB_FAIL(left_store_iter_.get_next_row(row))) { @@ -354,6 +359,7 @@ int ObGroupJoinBufffer::drain_left() ++left_store_read_; } } + // Only rows of the current group are left in the buffer, and there are rows of the current group that have not been added to the buffer // discard unread rows from left op if (OB_SUCC(ret) && need_drain && !is_left_end_) { if (!spec_->is_vectorized()) { @@ -584,6 +590,8 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt, } else if (OB_FAIL(backup_above_params(left_params_backup, right_params_backup))) { LOG_WARN("backup above params failed", KR(ret)); } + + // fill group join buffer of current op untill join buffer is full if (OB_SUCC(ret)) { ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_); if (save_last_batch_) { @@ -615,6 +623,7 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt, } } } + // rescan left op, switch to next iter of left child op if (OB_SUCC(ret) && batch_rows->end_) { is_left_end_ = true; if (is_multi_level_) { @@ -628,6 +637,7 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt, } } } + if (OB_SUCC(ret)) { if (!rescan_params_->empty()) { op_->set_pushdown_param_null(*rescan_params_); @@ -635,9 +645,11 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt, if (batch_rows->size_ == 0 && batch_rows->end_) { // do nothing } else { + // if buffer is full ,but the last batch rows of left op is not added, save them to last batch last_batch_.from_exprs(*eval_ctx_, batch_rows->skip_, spec_->max_batch_size_); save_last_batch_ = true; } + op_->clear_evaluated_flag(); if (left_store_.get_row_cnt() <= 0) { // this could happen if we have skipped all rows diff --git a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp index a14232f9a6..cf92f685f5 100644 --- a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp +++ b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp @@ -647,7 +647,7 @@ int ObHashJoinVecOp::process_left(bool &need_not_read_right) // copy ObOperator::drain_exch // It's same as the base operator, but only need add sync to wait exit for shared hash join -int ObHashJoinVecOp::drain_exch() +int ObHashJoinVecOp::do_drain_exch() { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; diff --git a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h index c68dcd1526..cc936289be 100644 --- a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h +++ b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h @@ -315,7 +315,7 @@ private: public: virtual int inner_open() override; virtual int inner_rescan() override; - virtual int drain_exch() override; + virtual int do_drain_exch() override; virtual int inner_get_next_batch(const int64_t max_row_cnt) override; virtual int inner_get_next_row() { return common::OB_NOT_IMPLEMENT; }; virtual void destroy() override; diff --git a/src/sql/engine/join/ob_nested_loop_join_op.cpp b/src/sql/engine/join/ob_nested_loop_join_op.cpp index d33c996fe6..f35d8e7cf6 100644 --- a/src/sql/engine/join/ob_nested_loop_join_op.cpp +++ b/src/sql/engine/join/ob_nested_loop_join_op.cpp @@ -187,6 +187,54 @@ int ObNestedLoopJoinOp::rescan() return ret; } +int ObNestedLoopJoinOp::do_drain_exch_multi_lvel_bnlj() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(try_open())) { + LOG_WARN("fail to open operator", K(ret)); + } else if (!exch_drained_) { + // the drain request is triggered by current NLJ operator, and current NLJ is a multi level Batch NLJ + // It will block rescan request for it's child operator, if the drain request is passed to it's child operator + // The child operators will be marked as iter-end_, and will not get any row if rescan is blocked + // So we block the drain request here; Only set current operator to end; + int tmp_ret = inner_drain_exch(); + exch_drained_ = true; + brs_.end_ = true; + batch_reach_end_ = true; + row_reach_end_ = true; + if (OB_SUCC(ret)) { + ret = tmp_ret; + } + } + return ret; +} + +int ObNestedLoopJoinOp::do_drain_exch() +{ + int ret = OB_SUCCESS; + if (!MY_SPEC.group_rescan_) { + if (OB_FAIL( ObOperator::do_drain_exch())) { + LOG_WARN("failed to drain NLJ operator", K(ret)); + } + } else if (!group_join_buffer_.is_multi_level()) { + if (OB_FAIL( ObOperator::do_drain_exch())) { + LOG_WARN("failed to drain NLJ operator", K(ret)); + } + } else { + if (!is_operator_end()) { + // the drain request is triggered by parent operator + // NLJ needs to pass the drain request to it's child operator + LOG_TRACE("The drain request is passed by parent operator"); + if (OB_FAIL( ObOperator::do_drain_exch())) { + LOG_WARN("failed to drain normal NLJ operator", K(ret)); + } + } else if (OB_FAIL(do_drain_exch_multi_lvel_bnlj())) { + LOG_WARN("failed to drain multi level NLJ operator", K(ret)); + } + } + return ret; +} + int ObNestedLoopJoinOp::inner_rescan() { int ret = OB_SUCCESS; diff --git a/src/sql/engine/join/ob_nested_loop_join_op.h b/src/sql/engine/join/ob_nested_loop_join_op.h index 21480bce28..6710d17075 100644 --- a/src/sql/engine/join/ob_nested_loop_join_op.h +++ b/src/sql/engine/join/ob_nested_loop_join_op.h @@ -94,6 +94,8 @@ public: virtual int switch_iterator() override; virtual int rescan() override; virtual int inner_rescan() override; + + virtual int inner_get_next_row() override; virtual void destroy() override { @@ -121,7 +123,7 @@ public: ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; } int fill_cur_row_rescan_param(); int calc_other_conds(bool &is_match); - + int do_drain_exch_multi_lvel_bnlj(); private: // state operation and transfer function type. typedef int (ObNestedLoopJoinOp::*state_operation_func_type)(); @@ -167,6 +169,8 @@ private: // for refactor vectorized end bool continue_fetching() { return !(left_brs_->end_ || is_full());} + virtual int do_drain_exch() override; + virtual int inner_drain_exch() { return OB_SUCCESS; } public: ObJoinState state_; // for bnl join diff --git a/src/sql/engine/ob_operator.h b/src/sql/engine/ob_operator.h index f6cdd96bad..aa127c9ec4 100644 --- a/src/sql/engine/ob_operator.h +++ b/src/sql/engine/ob_operator.h @@ -478,11 +478,14 @@ public: bool &filtered); ObBatchRows &get_brs() { return brs_; } // Drain exchange in data for PX, or producer DFO will be blocked. - virtual int drain_exch(); + int drain_exch(); void set_pushdown_param_null(const common::ObIArray &rescan_params); void set_feedback_node_idx(int64_t idx) { fb_node_idx_ = idx; } + + bool is_operator_end() { return batch_reach_end_ || row_reach_end_ ; } protected: + virtual int do_drain_exch(); int init_skip_vector(); // Execute filter // Calc buffer does not reset internally, you need to reset it appropriately. @@ -565,7 +568,6 @@ private: int output_expr_decint_datum_len_check(); int output_expr_decint_datum_len_check_batch(); int setup_op_feedback_info(); - int do_drain_exch(); // child can implement this interface, but can't call this directly virtual int inner_drain_exch() { return common::OB_SUCCESS; }; protected: