From 6a42a36864838e91f18661e3ea86a3df404c90e3 Mon Sep 17 00:00:00 2001
From: shibin-xu
Date: Wed, 10 May 2023 05:51:43 +0000
Subject: [PATCH] fix wrong output bug for group join buffer

---
NOTE(review): this patch was recovered from a whitespace-collapsed copy.
Line breaks were rebuilt from the +/-/context markers and agree with the
hunk headers and the diffstat below. Leading indentation is assumed to be
2-space; if context fails to match, apply with
`git apply --ignore-whitespace` -- TODO confirm against the target tree.
The template argument of the const_cast in drain_left() had been stripped;
it is restored as `ObBatchRows *`, matching the declared type of
`batch_rows` (`const ObBatchRows *`).
The author e-mail on the From: line is missing and could not be recovered.

 src/sql/engine/basic/ob_group_join_buffer.cpp | 133 ++++++++++++++----
 src/sql/engine/basic/ob_group_join_buffer.h   |   1 +
 2 files changed, 103 insertions(+), 31 deletions(-)

diff --git a/src/sql/engine/basic/ob_group_join_buffer.cpp b/src/sql/engine/basic/ob_group_join_buffer.cpp
index e62f4e2f4..ee19ad94b 100644
--- a/src/sql/engine/basic/ob_group_join_buffer.cpp
+++ b/src/sql/engine/basic/ob_group_join_buffer.cpp
@@ -333,43 +333,114 @@ int ObGroupJoinBufffer::get_next_left_iter()
   return ret;
 }
 
+int ObGroupJoinBufffer::drain_left()
+{
+  int ret = OB_SUCCESS;
+  bool need_drain = !left_store_group_idx_.empty();
+  const ObChunkDatumStore::StoredRow *row = NULL;
+  // drain old rows from left store
+  for (int64_t i = left_store_read_; OB_SUCC(ret) && need_drain && i < left_store_group_idx_.count(); ++i) {
+    if (above_group_idx_for_read_ != left_store_group_idx_.at(i)) {
+      need_drain = false;
+    } else if (OB_FAIL(left_store_iter_.get_next_row(row))) {
+      ret = (OB_ITER_END == ret) ? OB_ERR_UNEXPECTED : ret;
+      LOG_WARN("get next row failed", KR(ret));
+    } else {
+      ++left_store_read_;
+    }
+  }
+  // discard unread rows from left op
+  if (OB_SUCC(ret) && need_drain && !is_left_end_) {
+    if (!spec_->is_vectorized()) {
+      while (OB_SUCC(ret)) {
+        op_->clear_evaluated_flag();
+        if (!rescan_params_->empty()) {
+          op_->set_pushdown_param_null(*rescan_params_);
+        }
+        ret = left_->get_next_row();
+      }
+      if (OB_ITER_END == ret) {
+        ret = OB_SUCCESS;
+        is_left_end_ = true;
+      } else {
+        LOG_WARN("get next left row failed", KR(ret));
+      }
+    } else {
+      const ObBatchRows *batch_rows = NULL;
+      int64_t max_row_cnt = min(max_group_size_, spec_->max_batch_size_);
+      if (save_last_batch_) {
+        last_batch_.to_exprs(*eval_ctx_);
+        save_last_batch_ = false;
+      }
+      batch_rows = &left_->get_brs();
+      while (OB_SUCC(ret) && !batch_rows->end_) {
+        if (!rescan_params_->empty()) {
+          op_->set_pushdown_param_null(*rescan_params_);
+        }
+        if (OB_FAIL(left_->get_next_batch(max_row_cnt, batch_rows))) {
+          LOG_WARN("get next batch from left failed", KR(ret));
+        }
+      }
+      if (OB_SUCC(ret) && batch_rows->end_) {
+        is_left_end_ = true;
+      }
+      if (OB_SUCC(ret)) {
+        if (!(batch_rows->size_ == 0 && batch_rows->end_)) {
+          last_batch_.from_exprs(*eval_ctx_, batch_rows->skip_, spec_->max_batch_size_);
+          save_last_batch_ = true;
+        }
+        const_cast<ObBatchRows *>(batch_rows)->end_ = false;
+      }
+    }
+  }
+  return ret;
+}
+
 int ObGroupJoinBufffer::rescan_left()
 {
   int ret = OB_SUCCESS;
   bool need_rescan = false;
-  if (is_multi_level_) {
-    // for multi level group rescan, we only rescan left if
-    // there is a new group of params for left child.
-    // note that if op_ is a child of SPF, param store can be
-    // filled with a new group before we finish processing the
-    // current group. in that case, we need to discard the old
-    // group and switch to the new batch.
-    ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(*ctx_);
-    for (int64_t i = 0; !need_rescan && i < left_rescan_params_->count(); i++) {
-      int64_t param_idx = left_rescan_params_->at(i).param_idx_;
-      if (plan_ctx->get_param_store().at(param_idx).is_ext_sql_array()) {
-        need_rescan = true;
+  const ObChunkDatumStore::StoredRow *row = NULL;
+  if (OB_SUCC(ret)) {
+    if (is_multi_level_) {
+      // for multi level group rescan, we only rescan left if
+      // there is a new group of params for left child.
+      // note that if op_ is a child of SPF, param store can be
+      // filled with a new group before we finish processing the
+      // current group. in that case, we need to discard the old
+      // group and switch to the new batch.
+      ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(*ctx_);
+      for (int64_t i = 0; !need_rescan && i < left_rescan_params_->count(); i++) {
+        int64_t param_idx = left_rescan_params_->at(i).param_idx_;
+        if (plan_ctx->get_param_store().at(param_idx).is_ext_sql_array()) {
+          need_rescan = true;
+        }
+      }
+    } else {
+      need_rescan = true;
+    }
+  }
+  if (OB_SUCC(ret)) {
+    if (need_rescan) {
+      if (OB_FAIL(set_above_group_size())) {
+        LOG_WARN("set above group size failed", KR(ret));
+      } else if (OB_FAIL(left_->rescan())) {
+        LOG_WARN("rescan left failed", KR(ret),
+                 K(left_->get_spec().get_id()), K(left_->op_name()));
+      } else {
+        is_left_end_ = false;
+        above_group_idx_for_expand_ = 0;
+        above_group_idx_for_read_ = 0;
+        reset_buffer_state();
+      }
+    } else {
+      if (OB_FAIL(drain_left())) {
+        LOG_WARN("drain left failed", KR(ret));
+      } else {
+        is_left_end_ = false;
+        above_group_idx_for_read_++;
       }
     }
-  } else {
-    need_rescan = true;
-  }
-  if (OB_FAIL(ret)) {
-    // do nothing
-  } else if (need_rescan) {
-    if (OB_FAIL(set_above_group_size())) {
-      LOG_WARN("set above group size failed", KR(ret));
-    } else if (OB_FAIL(left_->rescan())) {
-      LOG_WARN("rescan left failed", KR(ret),
-               K(left_->get_spec().get_id()), K(left_->op_name()));
-    } else {
-      is_left_end_ = false;
-      above_group_idx_for_expand_ = 0;
-      above_group_idx_for_read_ = 0;
-    }
-  } else {
-    is_left_end_ = false;
-    above_group_idx_for_read_++;
   }
   return ret;
 }
diff --git a/src/sql/engine/basic/ob_group_join_buffer.h b/src/sql/engine/basic/ob_group_join_buffer.h
index 9c3ad6327..2fdd6d821 100644
--- a/src/sql/engine/basic/ob_group_join_buffer.h
+++ b/src/sql/engine/basic/ob_group_join_buffer.h
@@ -77,6 +77,7 @@ public:
   int has_next_left_row(bool &has_next);
   int init_above_group_params();
   int fill_cur_row_group_param();
+  int drain_left();
   int rescan_left();
   int rescan_right();
   int fill_group_buffer();