[CP] Fix merge join memory expand during subplan filter rescan
This commit is contained in:
@ -963,28 +963,22 @@ int ObMergeJoinOp::ChildBatchFetcher::init(
|
||||
int ObMergeJoinOp::ChildBatchFetcher::get_next_batch(const int64_t max_row_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(0 != backup_rows_cnt_)) {
|
||||
const int64_t remain_backup_rows = backup_rows_cnt_ - backup_rows_used_;
|
||||
if (OB_UNLIKELY(0 != remain_backup_rows)) {
|
||||
if (OB_UNLIKELY(backup_datums_.count() != all_exprs_->count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("store datums cnt and child output cnt not equal", K(ret),
|
||||
K(backup_datums_.count()), K(all_exprs_->count()));
|
||||
} else {
|
||||
const int64_t restore_cnt = MIN(max_row_cnt, backup_rows_cnt_);
|
||||
const int64_t restore_cnt = MIN(max_row_cnt, remain_backup_rows);
|
||||
for (int64_t i = 0; i < backup_datums_.count(); i++) {
|
||||
ObDatum *datum = all_exprs_->at(i)->locate_batch_datums(merge_join_op_.eval_ctx_);
|
||||
MEMCPY(datum, backup_datums_.at(i), sizeof(ObDatum) * restore_cnt);
|
||||
MEMCPY(datum, backup_datums_.at(i) + backup_rows_used_, sizeof(ObDatum) * restore_cnt);
|
||||
}
|
||||
brs_.size_ = restore_cnt;
|
||||
brs_.end_ = false;
|
||||
brs_.skip_ = NULL;
|
||||
backup_rows_cnt_ -= restore_cnt;
|
||||
if (OB_LIKELY(0 == backup_rows_cnt_)) {
|
||||
backup_datums_.reset();
|
||||
} else {
|
||||
for (int64_t i = 0; i < backup_datums_.count(); i++) {
|
||||
backup_datums_.at(i) = backup_datums_.at(i) + restore_cnt;
|
||||
}
|
||||
}
|
||||
backup_rows_used_ += restore_cnt;
|
||||
}
|
||||
} else {
|
||||
if (brs_.end_) {
|
||||
@ -1021,21 +1015,36 @@ int ObMergeJoinOp::ChildBatchFetcher::backup_remain_rows()
|
||||
if (OB_UNLIKELY(cur_idx_ >= brs_.size_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("no remain rows", K(ret), K(cur_idx_), K(brs_.size_));
|
||||
} else {
|
||||
int64_t alloc_size = sizeof(ObDatum) * (brs_.size_ - cur_idx_);
|
||||
} else if (backup_datums_.empty()) {
|
||||
int64_t alloc_size = sizeof(ObDatum) * merge_join_op_.spec_.max_batch_size_;
|
||||
for (int64_t i = 0; i < all_exprs_->count() && OB_SUCC(ret); i++) {
|
||||
backup_rows_cnt_ = 0;
|
||||
ObDatum *datum = NULL;
|
||||
ObDatumVector src_datum = all_exprs_->at(i)->locate_expr_datumvector(merge_join_op_.eval_ctx_);
|
||||
if (OB_ISNULL(datum = static_cast<ObDatum *>(allocator.alloc(alloc_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
} else if (OB_FAIL(backup_datums_.push_back(datum))) {
|
||||
LOG_WARN("push back failed", K(ret));
|
||||
} else {
|
||||
for (int64_t j = cur_idx_; j < brs_.size_ && OB_SUCC(ret); j++) {
|
||||
if (!brs_.skip_->contain(j)) {
|
||||
datum[backup_rows_cnt_++] = *src_datum.at(j);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_UNLIKELY(all_exprs_->count() != backup_datums_.count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("count mismatch", K(ret), K(all_exprs_->count()), K(backup_datums_.count()));
|
||||
} else {
|
||||
for (int64_t i = 0; i < all_exprs_->count() && OB_SUCC(ret); i++) {
|
||||
backup_rows_cnt_ = 0;
|
||||
backup_rows_used_ = 0;
|
||||
ObDatumVector src_datum = all_exprs_->at(i)->locate_expr_datumvector(merge_join_op_.eval_ctx_);
|
||||
ObDatum *datum = backup_datums_.at(i);
|
||||
if (OB_ISNULL(datum)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("backup datums memory is null", K(ret), K(i), K(all_exprs_->count()));
|
||||
} else {
|
||||
for (int64_t j = cur_idx_; j < brs_.size_ && OB_SUCC(ret); j++) {
|
||||
if (!brs_.skip_->contain(j)) {
|
||||
datum[backup_rows_cnt_++] = *src_datum.at(j);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -214,7 +214,7 @@ private:
|
||||
cur_idx_(0), brs_(), batch_size_(0), child_(NULL),
|
||||
match_groups_(match_groups), merge_join_op_(merge_join_op),
|
||||
all_exprs_(NULL), datum_store_(), backup_datums_(),
|
||||
backup_rows_cnt_(0), brs_holder_(),
|
||||
backup_rows_cnt_(0), backup_rows_used_(0), brs_holder_(),
|
||||
equal_param_idx_(allocator)
|
||||
{}
|
||||
int init(const uint64_t tenant_id, bool is_left, ObOperator *child,
|
||||
@ -242,6 +242,7 @@ private:
|
||||
datum_store_.reuse();
|
||||
backup_datums_.reuse();
|
||||
backup_rows_cnt_ = 0;
|
||||
backup_rows_used_ = 0;
|
||||
brs_holder_.reset();
|
||||
}
|
||||
// for destroy
|
||||
@ -254,6 +255,7 @@ private:
|
||||
datum_store_.reset();
|
||||
backup_datums_.reset();
|
||||
backup_rows_cnt_ = 0;
|
||||
backup_rows_used_ = 0;
|
||||
brs_holder_.reset();
|
||||
}
|
||||
int64_t cur_idx_;
|
||||
@ -269,6 +271,7 @@ private:
|
||||
// We need store these rows and output first, then get batch from child and output directly.
|
||||
ObSEArray<ObDatum *, 256> backup_datums_;
|
||||
int64_t backup_rows_cnt_;
|
||||
int64_t backup_rows_used_;
|
||||
ObBatchResultHolder brs_holder_;
|
||||
|
||||
common::ObFixedArray<int64_t, common::ObIAllocator> equal_param_idx_;
|
||||
|
||||
Reference in New Issue
Block a user