diff --git a/src/sql/engine/join/ob_merge_join_op.cpp b/src/sql/engine/join/ob_merge_join_op.cpp index aaf05a20a6..a59ff0ec28 100644 --- a/src/sql/engine/join/ob_merge_join_op.cpp +++ b/src/sql/engine/join/ob_merge_join_op.cpp @@ -963,28 +963,22 @@ int ObMergeJoinOp::ChildBatchFetcher::init( int ObMergeJoinOp::ChildBatchFetcher::get_next_batch(const int64_t max_row_cnt) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(0 != backup_rows_cnt_)) { + const int64_t remain_backup_rows = backup_rows_cnt_ - backup_rows_used_; + if (OB_UNLIKELY(0 != remain_backup_rows)) { if (OB_UNLIKELY(backup_datums_.count() != all_exprs_->count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("store datums cnt and child output cnt not equal", K(ret), K(backup_datums_.count()), K(all_exprs_->count())); } else { - const int64_t restore_cnt = MIN(max_row_cnt, backup_rows_cnt_); + const int64_t restore_cnt = MIN(max_row_cnt, remain_backup_rows); for (int64_t i = 0; i < backup_datums_.count(); i++) { ObDatum *datum = all_exprs_->at(i)->locate_batch_datums(merge_join_op_.eval_ctx_); - MEMCPY(datum, backup_datums_.at(i), sizeof(ObDatum) * restore_cnt); + MEMCPY(datum, backup_datums_.at(i) + backup_rows_used_, sizeof(ObDatum) * restore_cnt); } brs_.size_ = restore_cnt; brs_.end_ = false; brs_.skip_ = NULL; - backup_rows_cnt_ -= restore_cnt; - if (OB_LIKELY(0 == backup_rows_cnt_)) { - backup_datums_.reset(); - } else { - for (int64_t i = 0; i < backup_datums_.count(); i++) { - backup_datums_.at(i) = backup_datums_.at(i) + restore_cnt; - } - } + backup_rows_used_ += restore_cnt; } } else { if (brs_.end_) { @@ -1021,21 +1015,36 @@ int ObMergeJoinOp::ChildBatchFetcher::backup_remain_rows() if (OB_UNLIKELY(cur_idx_ >= brs_.size_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("no remain rows", K(ret), K(cur_idx_), K(brs_.size_)); - } else { - int64_t alloc_size = sizeof(ObDatum) * (brs_.size_ - cur_idx_); + } else if (backup_datums_.empty()) { + int64_t alloc_size = sizeof(ObDatum) * merge_join_op_.spec_.max_batch_size_; for (int64_t i = 0; i < all_exprs_->count() && OB_SUCC(ret); i++) { - backup_rows_cnt_ = 0; ObDatum *datum = NULL; - ObDatumVector src_datum = all_exprs_->at(i)->locate_expr_datumvector(merge_join_op_.eval_ctx_); if (OB_ISNULL(datum = static_cast(allocator.alloc(alloc_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret)); } else if (OB_FAIL(backup_datums_.push_back(datum))) { LOG_WARN("push back failed", K(ret)); - } else { - for (int64_t j = cur_idx_; j < brs_.size_ && OB_SUCC(ret); j++) { - if (!brs_.skip_->contain(j)) { - datum[backup_rows_cnt_++] = *src_datum.at(j); + } + } + } + if (OB_SUCC(ret)) { + if (OB_UNLIKELY(all_exprs_->count() != backup_datums_.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("count mismatch", K(ret), K(all_exprs_->count()), K(backup_datums_.count())); + } else { + for (int64_t i = 0; i < all_exprs_->count() && OB_SUCC(ret); i++) { + backup_rows_cnt_ = 0; + backup_rows_used_ = 0; + ObDatumVector src_datum = all_exprs_->at(i)->locate_expr_datumvector(merge_join_op_.eval_ctx_); + ObDatum *datum = backup_datums_.at(i); + if (OB_ISNULL(datum)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("backup datums memory is null", K(ret), K(i), K(all_exprs_->count())); + } else { + for (int64_t j = cur_idx_; j < brs_.size_ && OB_SUCC(ret); j++) { + if (!brs_.skip_->contain(j)) { + datum[backup_rows_cnt_++] = *src_datum.at(j); + } } } } diff --git a/src/sql/engine/join/ob_merge_join_op.h b/src/sql/engine/join/ob_merge_join_op.h index e8fa59216f..6c83399dd4 100644 --- a/src/sql/engine/join/ob_merge_join_op.h +++ b/src/sql/engine/join/ob_merge_join_op.h @@ -214,7 +214,7 @@ private: cur_idx_(0), brs_(), batch_size_(0), child_(NULL), match_groups_(match_groups), merge_join_op_(merge_join_op), all_exprs_(NULL), datum_store_(), backup_datums_(), - backup_rows_cnt_(0), brs_holder_(), + backup_rows_cnt_(0), backup_rows_used_(0), brs_holder_(), equal_param_idx_(allocator) {} int init(const uint64_t tenant_id, bool is_left, ObOperator *child, @@ -242,6 +242,7 @@ private: datum_store_.reuse(); backup_datums_.reuse(); backup_rows_cnt_ = 0; + backup_rows_used_ = 0; brs_holder_.reset(); } // for destroy @@ -254,6 +255,7 @@ private: datum_store_.reset(); backup_datums_.reset(); backup_rows_cnt_ = 0; + backup_rows_used_ = 0; brs_holder_.reset(); } int64_t cur_idx_; @@ -269,6 +271,7 @@ private: // We need store these rows and output first, then get batch from child and output directly. ObSEArray backup_datums_; int64_t backup_rows_cnt_; + int64_t backup_rows_used_; ObBatchResultHolder brs_holder_; common::ObFixedArray equal_param_idx_;