fix wrong output bug for group join buffer

This commit is contained in:
shibin-xu 2023-05-10 05:51:43 +00:00 committed by ob-robot
parent 7deb484afd
commit 6a42a36864
2 changed files with 103 additions and 31 deletions

View File

@ -333,43 +333,114 @@ int ObGroupJoinBufffer::get_next_left_iter()
return ret;
}
int ObGroupJoinBufffer::drain_left()
{
int ret = OB_SUCCESS;
bool need_drain = !left_store_group_idx_.empty();
const ObChunkDatumStore::StoredRow *row = NULL;
// drain old rows from left store
for (int64_t i = left_store_read_; OB_SUCC(ret) && need_drain && i < left_store_group_idx_.count(); ++i) {
if (above_group_idx_for_read_ != left_store_group_idx_.at(i)) {
need_drain = false;
} else if (OB_FAIL(left_store_iter_.get_next_row(row))) {
ret = (OB_ITER_END == ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("get next row failed", KR(ret));
} else {
++left_store_read_;
}
}
// discard unread rows from left op
if (OB_SUCC(ret) && need_drain && !is_left_end_) {
if (!spec_->is_vectorized()) {
while (OB_SUCC(ret)) {
op_->clear_evaluated_flag();
if (!rescan_params_->empty()) {
op_->set_pushdown_param_null(*rescan_params_);
}
ret = left_->get_next_row();
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
is_left_end_ = true;
} else {
LOG_WARN("get next left row failed", KR(ret));
}
} else {
const ObBatchRows *batch_rows = NULL;
int64_t max_row_cnt = min(max_group_size_, spec_->max_batch_size_);
if (save_last_batch_) {
last_batch_.to_exprs(*eval_ctx_);
save_last_batch_ = false;
}
batch_rows = &left_->get_brs();
while (OB_SUCC(ret) && !batch_rows->end_) {
if (!rescan_params_->empty()) {
op_->set_pushdown_param_null(*rescan_params_);
}
if (OB_FAIL(left_->get_next_batch(max_row_cnt, batch_rows))) {
LOG_WARN("get next batch from left failed", KR(ret));
}
}
if (OB_SUCC(ret) && batch_rows->end_) {
is_left_end_ = true;
}
if (OB_SUCC(ret)) {
if (!(batch_rows->size_ == 0 && batch_rows->end_)) {
last_batch_.from_exprs(*eval_ctx_, batch_rows->skip_, spec_->max_batch_size_);
save_last_batch_ = true;
}
const_cast<ObBatchRows *&>(batch_rows)->end_ = false;
}
}
}
return ret;
}
int ObGroupJoinBufffer::rescan_left()
{
int ret = OB_SUCCESS;
bool need_rescan = false;
if (is_multi_level_) {
// for multi level group rescan, we only rescan left if
// there is a new group of params for left child.
// note that if op_ is a child of SPF, param store can be
// filled with a new group before we finish processing the
// current group. in that case, we need to discard the old
// group and switch to the new batch.
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(*ctx_);
for (int64_t i = 0; !need_rescan && i < left_rescan_params_->count(); i++) {
int64_t param_idx = left_rescan_params_->at(i).param_idx_;
if (plan_ctx->get_param_store().at(param_idx).is_ext_sql_array()) {
need_rescan = true;
const ObChunkDatumStore::StoredRow *row = NULL;
if (OB_SUCC(ret)) {
if (is_multi_level_) {
// for multi level group rescan, we only rescan left if
// there is a new group of params for left child.
// note that if op_ is a child of SPF, param store can be
// filled with a new group before we finish processing the
// current group. in that case, we need to discard the old
// group and switch to the new batch.
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(*ctx_);
for (int64_t i = 0; !need_rescan && i < left_rescan_params_->count(); i++) {
int64_t param_idx = left_rescan_params_->at(i).param_idx_;
if (plan_ctx->get_param_store().at(param_idx).is_ext_sql_array()) {
need_rescan = true;
}
}
} else {
need_rescan = true;
}
}
if (OB_SUCC(ret)) {
if (need_rescan) {
if (OB_FAIL(set_above_group_size())) {
LOG_WARN("set above group size failed", KR(ret));
} else if (OB_FAIL(left_->rescan())) {
LOG_WARN("rescan left failed", KR(ret),
K(left_->get_spec().get_id()), K(left_->op_name()));
} else {
is_left_end_ = false;
above_group_idx_for_expand_ = 0;
above_group_idx_for_read_ = 0;
reset_buffer_state();
}
} else {
if (OB_FAIL(drain_left())) {
LOG_WARN("drain left failed", KR(ret));
} else {
is_left_end_ = false;
above_group_idx_for_read_++;
}
}
} else {
need_rescan = true;
}
if (OB_FAIL(ret)) {
// do nothing
} else if (need_rescan) {
if (OB_FAIL(set_above_group_size())) {
LOG_WARN("set above group size failed", KR(ret));
} else if (OB_FAIL(left_->rescan())) {
LOG_WARN("rescan left failed", KR(ret),
K(left_->get_spec().get_id()), K(left_->op_name()));
} else {
is_left_end_ = false;
above_group_idx_for_expand_ = 0;
above_group_idx_for_read_ = 0;
}
} else {
is_left_end_ = false;
above_group_idx_for_read_++;
}
return ret;
}

View File

@ -77,6 +77,7 @@ public:
int has_next_left_row(bool &has_next);
int init_above_group_params();
int fill_cur_row_group_param();
int drain_left();
int rescan_left();
int rescan_right();
int fill_group_buffer();