[CP] do not generate origin row for no_non_distinct_aggr_ in 3-aggr

This commit is contained in:
18523270951@163.com
2023-07-19 03:42:12 +00:00
committed by ob-robot
parent 32636e05c4
commit 08373e0dd0
2 changed files with 92 additions and 65 deletions

View File

@ -2463,46 +2463,61 @@ int ObHashGroupByOp::load_one_row()
bool last_group = false;
bool insert_group_ht = false;
clear_evaluated_flag();
if (OB_ISNULL(last_child_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("last child row not init", K(ret));
} else if (ObThreeStageAggrStage::FIRST_STAGE == MY_SPEC.aggr_stage_
&& by_pass_nth_group_ <= MY_SPEC.dist_col_group_idxs_.count()
&& by_pass_nth_group_ > 0) {
// next permutation
if (OB_ISNULL(last_child_row_->store_row_)) {
bool got_row = false;
while (OB_SUCC(ret) && !got_row) {
if (OB_ISNULL(last_child_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get last store row", K(ret));
} else if (OB_FAIL(last_child_row_->store_row_->to_expr(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("last child row not init", K(ret));
} else if (ObThreeStageAggrStage::FIRST_STAGE == MY_SPEC.aggr_stage_
&& by_pass_nth_group_ <= MY_SPEC.dist_col_group_idxs_.count()
&& by_pass_nth_group_ > 0) {
// next permutation
if (OB_ISNULL(last_child_row_->store_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get last store row", K(ret));
} else if (OB_FAIL(last_child_row_->store_row_->to_expr(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("failed to restore last row", K(ret));
} else if (OB_FAIL(by_pass_get_next_permutation(by_pass_nth_group_, last_group, insert_group_ht))) {
LOG_WARN("failed to get next permutation row", K(ret));
} else {
if (no_non_distinct_aggr_ && last_group) {
got_row = false;
} else {
got_row = true;
}
}
} else if (FALSE_IT(by_pass_nth_group_ = 0)) {
} else if (nullptr != last_child_row_->store_row_
&& OB_FAIL(last_child_row_->store_row_->to_expr(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("failed to restore last row", K(ret));
} else if (OB_FAIL(child_->get_next_row())) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("failed to get next row", K(ret));
}
} else if (OB_FAIL(last_child_row_->save_store_row(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("failed to save last row", K(ret));
} else if (OB_FAIL(try_check_status())) {
LOG_WARN("check status failed", K(ret));
} else if (OB_FAIL(by_pass_get_next_permutation(by_pass_nth_group_, last_group, insert_group_ht))) {
LOG_WARN("failed to get next permutation row", K(ret));
} else {
if (no_non_distinct_aggr_ && last_group) {
got_row = false;
} else {
got_row = true;
}
}
} else if (FALSE_IT(by_pass_nth_group_ = 0)) {
} else if (nullptr != last_child_row_->store_row_
&& OB_FAIL(last_child_row_->store_row_->to_expr(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("failed to restore last row", K(ret));
} else if (OB_FAIL(child_->get_next_row())) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("failed to get next row", K(ret));
}
} else if (OB_FAIL(last_child_row_->save_store_row(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("failed to save last row", K(ret));
} else if (OB_FAIL(try_check_status())) {
LOG_WARN("check status failed", K(ret));
} else if (OB_FAIL(by_pass_get_next_permutation(by_pass_nth_group_, last_group, insert_group_ht))) {
LOG_WARN("failed to get next permutation row", K(ret));
}
if (OB_FAIL(ret) || no_non_distinct_aggr_) {
} else if (OB_ISNULL(by_pass_group_row_)
|| OB_ISNULL(by_pass_group_row_->group_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("by pass group row is not init", K(ret));
} else {
++agged_row_cnt_;
++agged_group_cnt_;
if (OB_FAIL(aggr_processor_.single_row_agg(*by_pass_group_row_->group_row_, eval_ctx_))) {
LOG_WARN("failed to do single row agg", K(ret));
if (OB_FAIL(ret) || no_non_distinct_aggr_) {
} else if (OB_ISNULL(by_pass_group_row_)
|| OB_ISNULL(by_pass_group_row_->group_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("by pass group row is not init", K(ret));
} else {
++agged_row_cnt_;
++agged_group_cnt_;
if (OB_FAIL(aggr_processor_.single_row_agg(*by_pass_group_row_->group_row_, eval_ctx_))) {
LOG_WARN("failed to do single row agg", K(ret));
}
}
}
return ret;
@ -2520,7 +2535,9 @@ int ObHashGroupByOp::by_pass_prepare_one_batch(const int64_t batch_size)
//consume curr batch
if (OB_FAIL(by_pass_brs_holder_.restore())) {
LOG_WARN("failed to restore child batch", K(ret));
} else if (OB_FAIL(by_pass_get_next_permutation_batch(by_pass_nth_group_, last_group, by_pass_child_brs_, insert_group_ht))) {
} else if (OB_FAIL(by_pass_get_next_permutation_batch(by_pass_nth_group_, last_group,
by_pass_child_brs_, brs_,
insert_group_ht))) {
LOG_WARN("failed to get next permutation row", K(ret));
}
} else if (FALSE_IT(by_pass_nth_group_ = 0)) {
@ -2536,7 +2553,9 @@ int ObHashGroupByOp::by_pass_prepare_one_batch(const int64_t batch_size)
LOG_WARN("failed to save child batch", K(ret));
} else if (OB_FAIL(try_check_status())) {
LOG_WARN("check status failed", K(ret));
} else if (OB_FAIL(by_pass_get_next_permutation_batch(by_pass_nth_group_, last_group, by_pass_child_brs_, insert_group_ht))) {
} else if (OB_FAIL(by_pass_get_next_permutation_batch(by_pass_nth_group_, last_group,
by_pass_child_brs_, brs_,
insert_group_ht))) {
LOG_WARN("failed to get next permutation row", K(ret));
}
if (OB_FAIL(ret) || no_non_distinct_aggr_) {
@ -2544,22 +2563,18 @@ int ObHashGroupByOp::by_pass_prepare_one_batch(const int64_t batch_size)
|| by_pass_batch_size_ <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("by pass group row is not init", K(ret), K(by_pass_batch_size_));
} else if (OB_FAIL(aggr_processor_.eval_aggr_param_batch(*by_pass_child_brs_))) {
LOG_WARN("fail to eval aggr param batch", K(ret), K(*by_pass_child_brs_));
} else if (OB_FAIL(aggr_processor_.eval_aggr_param_batch(brs_))) {
LOG_WARN("fail to eval aggr param batch", K(ret), K(brs_));
} else {
if (OB_FAIL(aggr_processor_.single_row_agg_batch(by_pass_group_batch_, eval_ctx_,
by_pass_child_brs_->size_, by_pass_child_brs_->skip_))) {
brs_.size_, brs_.skip_))) {
LOG_WARN("failed to single row agg", K(ret));
} else {
int64_t aggr_cnt = by_pass_child_brs_->size_ - by_pass_child_brs_->skip_->accumulate_bit_cnt(by_pass_child_brs_->size_);
int64_t aggr_cnt = brs_.size_ - brs_.skip_->accumulate_bit_cnt(brs_.size_);
agged_row_cnt_ += aggr_cnt;
agged_group_cnt_ += aggr_cnt;
}
}
if (OB_SUCC(ret)) {
brs_.size_ = by_pass_child_brs_->size_;
brs_.skip_->deep_copy(*by_pass_child_brs_->skip_, by_pass_child_brs_->size_);
}
return ret;
}
@ -2588,35 +2603,44 @@ int ObHashGroupByOp::by_pass_get_next_permutation(int64_t &nth_group, bool &last
}
int ObHashGroupByOp::by_pass_get_next_permutation_batch(int64_t &nth_group, bool &last_group,
const ObBatchRows *child_brs, bool &insert_group_ht)
const ObBatchRows *child_brs,
ObBatchRows &my_brs,
bool &insert_group_ht)
{
int ret = OB_SUCCESS;
if (ObThreeStageAggrStage::NONE_STAGE == MY_SPEC.aggr_stage_) {
CK (OB_NOT_NULL(child_brs));
OX (my_brs.size_ = child_brs->size_);
OX (my_brs.skip_->deep_copy(*child_brs->skip_, child_brs->size_));
if (OB_FAIL(ret)) {
} else if (ObThreeStageAggrStage::NONE_STAGE == MY_SPEC.aggr_stage_) {
last_group = true;
} else if (OB_FAIL(next_duplicate_data_permutation(nth_group, last_group, child_brs, insert_group_ht))) {
LOG_WARN("failed to get next permutation", K(ret));
} else {
CK (dup_groupby_exprs_.count() == all_groupby_exprs_.count());
CK (OB_NOT_NULL(child_brs));
for (int64_t i = 0; OB_SUCC(ret) && i < dup_groupby_exprs_.count(); ++i) {
ObDatum *datum = nullptr;
if (nullptr == dup_groupby_exprs_.at(i)) {
for (int64_t j = 0; j < child_brs->size_; ++j) {
if (child_brs->skip_->at(j)) {
continue;
if (no_non_distinct_aggr_ && last_group) {
my_brs.skip_->set_all(child_brs->size_);
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < dup_groupby_exprs_.count(); ++i) {
ObDatum *datum = nullptr;
if (nullptr == dup_groupby_exprs_.at(i)) {
for (int64_t j = 0; j < child_brs->size_; ++j) {
if (child_brs->skip_->at(j)) {
continue;
}
all_groupby_exprs_.at(i)->locate_expr_datum(eval_ctx_, j).set_null();
}
all_groupby_exprs_.at(i)->locate_expr_datum(eval_ctx_, j).set_null();
}
} else if (OB_FAIL(dup_groupby_exprs_.at(i)->eval_batch(eval_ctx_, *child_brs->skip_, child_brs->size_))) {
LOG_WARN("failed to eval dup exprs", K(ret), K(i));
} else {
for (int64_t j = 0; j < child_brs->size_; ++j) {
if (child_brs->skip_->at(j)) {
continue;
} else if (OB_FAIL(dup_groupby_exprs_.at(i)->eval_batch(eval_ctx_, *child_brs->skip_, child_brs->size_))) {
LOG_WARN("failed to eval dup exprs", K(ret), K(i));
} else {
for (int64_t j = 0; j < child_brs->size_; ++j) {
if (child_brs->skip_->at(j)) {
continue;
}
all_groupby_exprs_.at(i)->locate_expr_datum(eval_ctx_, j).set_datum(dup_groupby_exprs_.at(i)->locate_expr_datum(eval_ctx_, j));
}
all_groupby_exprs_.at(i)->locate_expr_datum(eval_ctx_, j).set_datum(dup_groupby_exprs_.at(i)->locate_expr_datum(eval_ctx_, j));
all_groupby_exprs_.at(i)->set_evaluated_projected(eval_ctx_);
}
all_groupby_exprs_.at(i)->set_evaluated_projected(eval_ctx_);
}
}
}

View File

@ -518,7 +518,10 @@ private:
private:
int by_pass_prepare_one_batch(const int64_t batch_size);
int by_pass_get_next_permutation(int64_t &nth_group, bool &last_group, bool &insert_group_ht);
int by_pass_get_next_permutation_batch(int64_t &nth_group, bool &last_group, const ObBatchRows *child_brs, bool &insert_group_ht);
int by_pass_get_next_permutation_batch(int64_t &nth_group, bool &last_group,
const ObBatchRows *child_brs,
ObBatchRows &my_brs,
bool &insert_group_ht);
int init_by_pass_op();
// Alloc one batch group_row_item at a time
static const int64_t BATCH_GROUP_ITEM_SIZE = 16;