fix for implicit aggr: if a 3-stage plan has an implicit aggr, the first stage needs to generate last group data
parent 8c1a4fbb58
commit be0196308e
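
The core of the change: the first-stage group-by used to skip the final duplicate-data group whenever it had no aggregate functions of its own (no_non_distinct_aggr_); when the third stage carries an implicit aggregate, that last group is still needed, so the flag is generalized to can_skip_last_group_ and gated by the new need_last_group_in_3stage_ spec field. Below is a minimal sketch of that gating using simplified stand-in types; GroupBySpecSketch, aggr_cnt_, and can_skip_last_group are hypothetical names for illustration, not OceanBase APIs.

// Minimal sketch (not OceanBase code): how the new flag is expected to gate
// the last duplicate-data group in the first stage.
#include <cstdint>
#include <cstdio>

enum class ThreeStageAggrStage { NONE, FIRST_STAGE, SECOND_STAGE, THIRD_STAGE };

struct GroupBySpecSketch {                   // stand-in for ObGroupBySpec
  ThreeStageAggrStage aggr_stage_ = ThreeStageAggrStage::NONE;
  int64_t aggr_cnt_ = 0;                     // stand-in for aggr_infos_.count()
  bool need_last_group_in_3stage_ = false;   // new member added by this commit
};

// Mirrors the new inner_open() condition: the first stage may skip the last
// group only when it has no aggregates AND the third stage does not rely on it.
bool can_skip_last_group(const GroupBySpecSketch &spec)
{
  return ThreeStageAggrStage::FIRST_STAGE == spec.aggr_stage_
         && 0 == spec.aggr_cnt_
         && !spec.need_last_group_in_3stage_;
}

int main()
{
  GroupBySpecSketch first_stage;
  first_stage.aggr_stage_ = ThreeStageAggrStage::FIRST_STAGE;
  first_stage.need_last_group_in_3stage_ = true;  // set by the code-gen fix below
  std::printf("skip last group: %d\n", can_skip_last_group(first_stage));  // prints 0: keep it
  return 0;
}
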
@@ -56,8 +56,7 @@ public:
   int finish_adding_one_row();
 
   inline int add_one_row(const int32_t start_agg_id, const int32_t end_agg_id, AggrRowPtr row,
-                         const int64_t batch_idx, const int64_t batch_size, ObIVector **aggr_vectors,
-                         ObFixedArray<int64_t, common::ObIAllocator> implicit_aggr_in_3stage_indexes)
+                         const int64_t batch_idx, const int64_t batch_size, ObIVector **aggr_vectors)
   {
     int ret = OB_SUCCESS;
     ObIVector *data_vec = nullptr;
@@ -69,15 +68,6 @@ public:
         SQL_LOG(WARN, "add one row failed", K(ret));
       }
     }
-
-    for (int i = 0; OB_SUCC(ret) && i < implicit_aggr_in_3stage_indexes.count(); i++) {
-      int col_id = implicit_aggr_in_3stage_indexes.at(i);
-      add_one_row_fn fn = add_one_row_fns_.at(col_id);
-      if (OB_FAIL(
-          fn(aggregates_.at(col_id), agg_ctx_, col_id, row, aggr_vectors[col_id], batch_idx, batch_size))) {
-        SQL_LOG(WARN, "add one row failed", K(ret));
-      }
-    }
     return ret;
   }
 
@@ -7652,45 +7652,26 @@ int ObStaticEngineCG::fill_aggr_infos(ObLogGroupBy &op,
   }
   //6.calc for implicit_aggr, if aggr_info.expr_ only in third stage, not in second stage,
   // must be calc in third stage.Normally caused by implicit aggr in filter.
-  if (spec.aggr_stage_ == ObThreeStageAggrStage::THIRD_STAGE) {
+  if (all_non_aggr_exprs.count() > 0 && spec.aggr_stage_ == ObThreeStageAggrStage::THIRD_STAGE) {
     ObOpSpec *child_spec = &spec;
-    bool find_second_spec = false;
-    while (!find_second_spec && child_spec->get_children() != NULL
+    bool find_first_spec = false;
+    while (!find_first_spec && child_spec->get_children() != NULL
            && child_spec->get_child_cnt() > 0) {
       if ((child_spec->type_ == PHY_VEC_HASH_GROUP_BY ||
            child_spec->type_ == PHY_HASH_GROUP_BY ||
           child_spec->type_ == PHY_VEC_MERGE_GROUP_BY ||
           child_spec->type_ == PHY_MERGE_GROUP_BY) &&
-          ((ObGroupBySpec*)child_spec)->aggr_stage_ == ObThreeStageAggrStage::SECOND_STAGE) {
-        find_second_spec = true;
+          ((ObGroupBySpec*)child_spec)->aggr_stage_ == ObThreeStageAggrStage::FIRST_STAGE) {
+        find_first_spec = true;
       } else {
         child_spec = child_spec->get_children()[0];
       }
     }
-    if (find_second_spec) {
-      if (OB_FAIL(spec.implicit_aggr_in_3stage_indexes_.prepare_allocate_and_keep_count(
-          spec.aggr_infos_.count()))) {
-        OB_LOG(WARN, "fail to prepare_allocate implicit_aggr_in_3stage_indexes_", K(ret));
-      } else {
-        ObGroupBySpec *second_stage_spec = (ObGroupBySpec*)child_spec;
-        for (int i = 0; i < spec.aggr_infos_.count(); i++) {
-          if (spec.aggr_infos_.at(i).is_implicit_first_aggr()) {
-            bool exist_in_second_stage = false;
-            for (int j = 0; !exist_in_second_stage && j < second_stage_spec->aggr_infos_.count(); j++) {
-              if (spec.aggr_infos_.at(i).expr_ == second_stage_spec->aggr_infos_.at(j).expr_) {
-                exist_in_second_stage = true;
-              }
-            }
-            if (!exist_in_second_stage) {
-              spec.implicit_aggr_in_3stage_indexes_.push_back(i);
-              LOG_TRACE("find implicit aggr need calc in 3stage", K(i));
-            }
-          }
-        }
-      }
+    if (find_first_spec) {
+      ((ObGroupBySpec*)child_spec)->need_last_group_in_3stage_ = true;
     } else {
       ret = OB_ERR_UNEXPECTED;
-      LOG_WARN("cannot find second stage hashgroupby op", K(ret), K(find_second_spec));
+      LOG_WARN("cannot find first stage hashgroupby op", K(ret), K(find_first_spec));
     }
   }
   return ret;
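
The hunk above replaces the old per-aggregate index bookkeeping with a single flag set at code-generation time: walk down from the third-stage group-by spec along the first child until the first-stage group-by is found, then mark it with need_last_group_in_3stage_. Below is a minimal sketch of that descent, assuming a simplified spec-tree node; SpecNode and its fields are hypothetical stand-ins, not the ObOpSpec API.

// Minimal sketch (not OceanBase code) of the descent shown above.
#include <vector>

enum class ThreeStageAggrStage { NONE, FIRST_STAGE, SECOND_STAGE, THIRD_STAGE };

struct SpecNode {
  bool is_group_by = false;                  // stands in for the PHY_*_GROUP_BY type checks
  ThreeStageAggrStage aggr_stage_ = ThreeStageAggrStage::NONE;
  bool need_last_group_in_3stage_ = false;
  std::vector<SpecNode*> children;
};

// Returns true when a first-stage group-by was found and flagged.
bool mark_first_stage_for_last_group(SpecNode *third_stage_spec)
{
  SpecNode *cur = third_stage_spec;
  while (cur != nullptr && !cur->children.empty()) {
    if (cur->is_group_by && cur->aggr_stage_ == ThreeStageAggrStage::FIRST_STAGE) {
      cur->need_last_group_in_3stage_ = true;  // first stage must emit last-group data
      return true;
    }
    cur = cur->children[0];                    // descend along the leftmost child
  }
  return false;  // mirrors the error path: caller reports the unexpected-plan case
}

Setting the flag during code generation lets the runtime operator decide in inner_open() whether the last duplicate group may be skipped, without re-examining the plan tree.
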
@@ -36,7 +36,8 @@ OB_SERIALIZE_MEMBER((ObGroupBySpec, ObOpSpec),
                     support_fast_single_row_agg_,
                     skew_detection_enabled_, // FARM COMPAT WHITELIST
                     llc_ndv_est_enabled_,
-                    implicit_aggr_in_3stage_indexes_);
+                    implicit_aggr_in_3stage_indexes_,
+                    need_last_group_in_3stage_);
 
 DEF_TO_STRING(ObGroupBySpec)
 {
@@ -38,7 +38,8 @@ public:
       support_fast_single_row_agg_(false),
       skew_detection_enabled_(false),
       llc_ndv_est_enabled_(false),
-      implicit_aggr_in_3stage_indexes_(alloc)
+      implicit_aggr_in_3stage_indexes_(alloc),
+      need_last_group_in_3stage_(false)
   {
   }
   DECLARE_VIRTUAL_TO_STRING;
@@ -64,6 +65,7 @@ public:
   bool skew_detection_enabled_;
   bool llc_ndv_est_enabled_;
   ObFixedArray<int64_t, common::ObIAllocator> implicit_aggr_in_3stage_indexes_;
+  bool need_last_group_in_3stage_;
 };
 
 //modifiable
@@ -327,7 +327,7 @@ int ObHashGroupByOp::inner_open()
   }
   if (OB_SUCC(ret)) {
     if (ObThreeStageAggrStage::FIRST_STAGE == MY_SPEC.aggr_stage_) {
-      no_non_distinct_aggr_ = (0 == MY_SPEC.aggr_infos_.count());
+      can_skip_last_group_ = (0 == MY_SPEC.aggr_infos_.count() && !MY_SPEC.need_last_group_in_3stage_);
     } else if (ObThreeStageAggrStage::SECOND_STAGE == MY_SPEC.aggr_stage_) {
       if (OB_FAIL(append(distinct_origin_exprs_, MY_SPEC.group_exprs_))) {
         LOG_WARN("failed to append distinct_origin_exprs", K(ret));
@@ -659,7 +659,7 @@ int ObHashGroupByOp::next_duplicate_data_permutation(
       LOG_DEBUG("debug write aggr code", K(ret), K(aggr_code), K(first_idx));
     }
     LOG_DEBUG("debug write aggr code", K(ret), K(last_group), K(nth_group), K(first_idx),
-      K(no_non_distinct_aggr_), K(start_idx), K(end_idx));
+      K(can_skip_last_group_), K(start_idx), K(end_idx));
   } else if (ObThreeStageAggrStage::SECOND_STAGE == MY_SPEC.aggr_stage_) {
     if (!use_distinct_data_) {
       //get the aggr_code and then insert group hash-table or insert duplicate hash-table
@@ -1031,7 +1031,7 @@ int ObHashGroupByOp::load_data()
       if (OB_FAIL(ret)) {
       } else if (OB_FAIL(next_duplicate_data_permutation(nth_dup_data, last_group, nullptr, insert_group_ht))) {
         LOG_WARN("failed to get next duplicate data purmutation", K(ret));
-      } else if (last_group && no_non_distinct_aggr_) {
+      } else if (last_group && can_skip_last_group_) {
         // no groupby exprs, don't calculate the last duplicate data for non-distinct aggregate
         break;
       } else if (!insert_group_ht) {
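
For context on how the renamed flag is consumed at runtime: as the surrounding code suggests, the first stage expands each input row once per duplicate group and appends one final "last group" for the non-distinct (and now implicit) aggregates; the break above skips that last permutation only when can_skip_last_group_ allows it. A minimal standalone sketch of that loop shape follows; the counts and names are illustrative assumptions, not the operator's real API.

// Minimal sketch (not OceanBase code) of gating the last duplicate-data group.
#include <cstdint>
#include <cstdio>

int main()
{
  const int64_t distinct_group_cnt = 2;     // e.g. two DISTINCT aggregates in the query
  const bool can_skip_last_group = false;   // third stage has an implicit aggr -> keep it

  // nth_group runs over the distinct groups; the extra iteration is the last group.
  for (int64_t nth_group = 0; nth_group <= distinct_group_cnt; ++nth_group) {
    const bool last_group = (nth_group == distinct_group_cnt);
    if (last_group && can_skip_last_group) {
      break;  // mirrors: don't materialize the last duplicate data
    }
    std::printf("emit duplicate rows for group %ld%s\n",
                static_cast<long>(nth_group), last_group ? " (last group)" : "");
  }
  return 0;
}
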
@@ -1755,7 +1755,7 @@ int ObHashGroupByOp::load_data_batch(int64_t max_row_cnt)
                                               loop_cnt, *child_brs, part_cnt, parts, est_part_cnt,
                                               bloom_filter))) {
       LOG_WARN("fail to group child batch rows", K(ret));
-    } else if (no_non_distinct_aggr_) {
+    } else if (can_skip_last_group_) {
     } else if (OB_FAIL(aggr_processor_.eval_aggr_param_batch(*child_brs))) {
       LOG_WARN("fail to eval aggr param batch", K(ret), K(*child_brs));
     }
@@ -2333,7 +2333,7 @@ int ObHashGroupByOp::group_child_batch_rows(const ObChunkDatumStore::StoredRow *
                 part_shift, loop_cnt, child_brs, part_cnt, parts, est_part_cnt, bloom_filter,
                 process_check_dump))) {
       LOG_WARN("failed to batch process duplicate data", K(ret));
-    } else if (no_non_distinct_aggr_) {
+    } else if (can_skip_last_group_) {
       // no groupby exprs, don't calculate the last duplicate data for non-distinct aggregate
     } else {
       if (!group_rows_arr_.is_valid_ && nullptr == store_rows) {
@@ -2604,7 +2604,7 @@ int ObHashGroupByOp::load_one_row()
     } else if (OB_FAIL(by_pass_get_next_permutation(by_pass_nth_group_, last_group, insert_group_ht))) {
       LOG_WARN("failed to get next permutation row", K(ret));
     } else {
-      if (no_non_distinct_aggr_ && last_group) {
+      if (can_skip_last_group_ && last_group) {
        got_row = false;
       } else {
         got_row = true;
@@ -2625,7 +2625,7 @@ int ObHashGroupByOp::load_one_row()
     } else if (OB_FAIL(by_pass_get_next_permutation(by_pass_nth_group_, last_group, insert_group_ht))) {
       LOG_WARN("failed to get next permutation row", K(ret));
     } else {
-      if (no_non_distinct_aggr_ && last_group) {
+      if (can_skip_last_group_ && last_group) {
        got_row = false;
       } else {
         got_row = true;
@@ -2652,7 +2652,7 @@ int ObHashGroupByOp::load_one_row()
       }
     }
 
-    if (OB_FAIL(ret) || no_non_distinct_aggr_
+    if (OB_FAIL(ret) || can_skip_last_group_
         || (MY_SPEC.skew_detection_enabled_ && is_popular_value)) {
     } else if (OB_ISNULL(by_pass_group_row_)
                || OB_ISNULL(by_pass_group_row_->group_row_)) {
@@ -2726,7 +2726,7 @@ int ObHashGroupByOp::by_pass_prepare_one_batch(const int64_t batch_size)
     }
   }
 
-  if (OB_FAIL(ret) || no_non_distinct_aggr_) {
+  if (OB_FAIL(ret) || can_skip_last_group_) {
   } else if (OB_ISNULL(by_pass_group_batch_)
              || by_pass_batch_size_ <= 0) {
     ret = OB_ERR_UNEXPECTED;
@@ -2786,7 +2786,7 @@ int ObHashGroupByOp::by_pass_get_next_permutation_batch(int64_t &nth_group, bool
     LOG_WARN("failed to get next permutation", K(ret));
   } else {
     CK (dup_groupby_exprs_.count() == all_groupby_exprs_.count());
-    if (no_non_distinct_aggr_ && last_group) {
+    if (can_skip_last_group_ && last_group) {
       my_brs.skip_->set_all(child_brs->size_);
     } else {
       for (int64_t i = 0; OB_SUCC(ret) && i < dup_groupby_exprs_.count(); ++i) {
@@ -254,7 +254,7 @@ public:
       selector_array_(NULL),
       dup_groupby_exprs_(),
       is_dumped_(nullptr),
-      no_non_distinct_aggr_(false),
+      can_skip_last_group_(false),
       start_calc_hash_idx_(0),
       base_hash_vals_(nullptr),
       has_calc_base_hash_(false),
@@ -602,7 +602,7 @@ private:
   ObSEArray<ObExpr*, 4> dup_groupby_exprs_;
   ObSEArray<ObExpr*, 4> all_groupby_exprs_;
   bool *is_dumped_;
-  bool no_non_distinct_aggr_;
+  bool can_skip_last_group_;
   int64_t start_calc_hash_idx_;
   uint64_t *base_hash_vals_;
   bool has_calc_base_hash_;
@@ -309,7 +309,7 @@ int ObHashGroupByVecOp::inner_open()
 
   if (OB_SUCC(ret)) {
     if (ObThreeStageAggrStage::FIRST_STAGE == MY_SPEC.aggr_stage_) {
-      no_non_distinct_aggr_ = (0 == MY_SPEC.aggr_infos_.count());
+      can_skip_last_group_ = (0 == MY_SPEC.aggr_infos_.count() && !MY_SPEC.need_last_group_in_3stage_);
     } else if (ObThreeStageAggrStage::SECOND_STAGE == MY_SPEC.aggr_stage_) {
       if (OB_FAIL(append(distinct_origin_exprs_, MY_SPEC.group_exprs_))) {
         LOG_WARN("failed to append distinct_origin_exprs", K(ret));
@@ -515,7 +515,7 @@ int ObHashGroupByVecOp::next_duplicate_data_permutation(
       }
     }
     LOG_DEBUG("debug write aggr code", K(ret), K(last_group), K(nth_group), K(first_idx),
-      K(no_non_distinct_aggr_), K(start_idx), K(end_idx), K(dup_groupby_exprs_));
+      K(can_skip_last_group_), K(start_idx), K(end_idx), K(dup_groupby_exprs_));
   } else if (ObThreeStageAggrStage::SECOND_STAGE == MY_SPEC.aggr_stage_) {
     if (!use_distinct_data_) {
       //get the aggr_code and then insert group hash-table or insert duplicate hash-table
@@ -1347,7 +1347,7 @@ int ObHashGroupByVecOp::load_data_batch(int64_t max_row_cnt)
                                               loop_cnt, *child_brs, part_cnt, parts, est_part_cnt,
                                               bloom_filter))) {
       LOG_WARN("fail to group child batch rows", K(ret), K(start_dump));
-    } else if (no_non_distinct_aggr_) {
+    } else if (can_skip_last_group_) {
     } else if (OB_FAIL(aggr_processor_.eval_aggr_param_batch(*child_brs))) {
       LOG_WARN("fail to eval aggr param batch", K(ret), K(*child_brs));
     }
@@ -1398,8 +1398,7 @@ int ObHashGroupByVecOp::load_data_batch(int64_t max_row_cnt)
         if (OB_FAIL(ret)) {
         } else if (OB_FAIL(aggr_processor_.add_one_row(start_agg_id, end_agg_id,
                                                        batch_new_rows_[i], i,
-                                                       child_brs->size_, aggr_vectors_,
-                                                       MY_SPEC.implicit_aggr_in_3stage_indexes_))) {
+                                                       child_brs->size_, aggr_vectors_))) {
 
           LOG_WARN("fail to process row", K(ret));
         }
@@ -1435,8 +1434,7 @@ int ObHashGroupByVecOp::load_data_batch(int64_t max_row_cnt)
       if (OB_SUCC(ret)) {
         if (OB_FAIL(aggr_processor_.add_one_row(start_agg_id, end_agg_id,
                                                 batch_old_rows_[i], i,
-                                                child_brs->size_, aggr_vectors_,
-                                                MY_SPEC.implicit_aggr_in_3stage_indexes_))) {
+                                                child_brs->size_, aggr_vectors_))) {
           LOG_WARN("fail to process row", K(ret));
         }
       }
@@ -2064,7 +2062,7 @@ int ObHashGroupByVecOp::group_child_batch_rows(const ObCompactRow **store_rows,
                 part_shift, loop_cnt, child_brs, part_cnt, parts, est_part_cnt, bloom_filter,
                 process_check_dump))) {
       LOG_WARN("failed to batch process duplicate data", K(ret));
-    } else if (no_non_distinct_aggr_) {
+    } else if (can_skip_last_group_) {
       LOG_TRACE("no distinct aggr");
       // no groupby exprs, don't calculate the last duplicate data for non-distinct aggregate
     } else if ((ObThreeStageAggrStage::SECOND_STAGE == MY_SPEC.aggr_stage_ && !use_distinct_data_)
@@ -2305,8 +2303,8 @@ int ObHashGroupByVecOp::by_pass_prepare_one_batch(const int64_t batch_size)
           LOG_WARN("failed to add llc map batch", K(ret));
         }
       }
-      if (last_group && no_non_distinct_aggr_) {
-        // in last group and no_non_distinct_aggr_, the hash val will not be calc because skip are all true,
+      if (last_group && can_skip_last_group_) {
+        // in last group and can_skip_last_group_, the hash val will not be calc because skip are all true,
         // hash_val is wrong, dont do popular_process
       } else if (OB_SUCC(ret) && skew_detection_enabled_) {
         memset(static_cast<void *>(batch_old_rows_), 0,
@@ -2359,8 +2357,7 @@ int ObHashGroupByVecOp::by_pass_prepare_one_batch(const int64_t batch_size)
       if (OB_FAIL(ret)) {
       } else if (OB_FAIL(aggr_processor_.add_one_row(start_agg_id, end_agg_id,
                                                      batch_old_rows_[i], i,
-                                                     brs_.size_, aggr_vectors_,
-                                                     MY_SPEC.implicit_aggr_in_3stage_indexes_))) {
+                                                     brs_.size_, aggr_vectors_))) {
         LOG_WARN("fail to process row", K(ret));
       } else {
         brs_.set_skip(i);
@@ -2368,9 +2365,9 @@ int ObHashGroupByVecOp::by_pass_prepare_one_batch(const int64_t batch_size)
       }
     }
   }
-  if (OB_FAIL(ret) || no_non_distinct_aggr_) {
-    if ((last_group && no_non_distinct_aggr_) || has_by_pass_agg_row) {
-      // in bypass && last_group && no_non_distinct_aggr_, brs_.skip_ will be set_all (all skip)
+  if (OB_FAIL(ret) || can_skip_last_group_) {
+    if ((last_group && can_skip_last_group_) || has_by_pass_agg_row) {
+      // in bypass && last_group && can_skip_last_group_, brs_.skip_ will be set_all (all skip)
       brs_.all_rows_active_ = false;
     } else {
       brs_.all_rows_active_ = by_pass_child_brs_->all_rows_active_;
@@ -2414,7 +2411,7 @@ int ObHashGroupByVecOp::by_pass_get_next_permutation_batch(int64_t &nth_group, b
   } else if (OB_FAIL(next_duplicate_data_permutation(nth_group, last_group, child_brs,
                                                      insert_group_ht))) {
     LOG_WARN("failed to get next permutation", K(ret));
-  } else if (no_non_distinct_aggr_ && last_group) {
+  } else if (can_skip_last_group_ && last_group) {
     my_brs.skip_->set_all(child_brs->size_);
   } else {
     CK (dup_groupby_exprs_.count() == all_groupby_exprs_.count());
@@ -149,7 +149,7 @@ public:
       first_batch_from_store_(true),
       dup_groupby_exprs_(),
       is_dumped_(nullptr),
-      no_non_distinct_aggr_(false),
+      can_skip_last_group_(false),
       start_calc_hash_idx_(0),
       base_hash_vals_(nullptr),
       has_calc_base_hash_(false),
@@ -398,7 +398,7 @@ private:
   ObSEArray<ObExpr*, 4> dup_groupby_exprs_;
   ObSEArray<ObExpr*, 4> all_groupby_exprs_;
   bool *is_dumped_;
-  bool no_non_distinct_aggr_;
+  bool can_skip_last_group_;
   int64_t start_calc_hash_idx_;
   uint64_t *base_hash_vals_;
   bool has_calc_base_hash_;