diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.cpp b/src/sql/engine/aggregate/ob_aggregate_processor.cpp index a67557d41b..f7d57a4faa 100644 --- a/src/sql/engine/aggregate/ob_aggregate_processor.cpp +++ b/src/sql/engine/aggregate/ob_aggregate_processor.cpp @@ -206,6 +206,29 @@ void ObAggregateProcessor::AggrCell::destroy() } } +int ObAggregateProcessor::AggrCell::deep_copy_advance_collect_result(const ObDatum &datum, ObIAllocator &alloc) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + uint32_t len = datum.len_ > 0 ? datum.len_: 1; + if (datum.is_null()) { + advance_collect_result_ = datum; + } else { + if (NULL == collect_buf_ || collect_buf_len_ < len) { + collect_buf_len_ = next_pow2(len); + if (OB_ISNULL(collect_buf_ = static_cast(alloc.alloc(collect_buf_len_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate memory failed", K(collect_buf_len_), K(datum), K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(advance_collect_result_.deep_copy(datum, collect_buf_, collect_buf_len_, pos))) { + LOG_WARN("failed to deep copy datum", K(ret)); + } + } + return ret; +} + int ObAggregateProcessor::AggrCell::collect_result( const ObObjTypeClass tc, ObEvalCtx &eval_ctx, const ObAggrInfo &aggr_info) { @@ -771,7 +794,8 @@ ObAggregateProcessor::ObAggregateProcessor(ObEvalCtx &eval_ctx, support_fast_single_row_agg_(false), op_eval_infos_(nullptr), profile_(ObSqlWorkAreaType::HASH_WORK_AREA), - op_monitor_info_(op_monitor_info) + op_monitor_info_(op_monitor_info), + need_advance_collect_(false) { } @@ -933,6 +957,7 @@ void ObAggregateProcessor::reuse() group_rows_.reuse(); cur_batch_group_idx_ = 0; cur_batch_group_buf_ = nullptr; + need_advance_collect_ = false; aggr_alloc_.reset_remain_one_page(); removal_info_.reset(); } @@ -1376,6 +1401,41 @@ int ObAggregateProcessor::process_batch( return ret; } +int ObAggregateProcessor::advance_collect_result(int64_t group_id) +{ + int ret = OB_SUCCESS; + int64_t aggr_cnt = aggr_infos_.count(); + GroupRow *group_row = NULL; + ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_); + guard.set_batch_size(1); + for (int64_t aggr_idx = 0; OB_SUCC(ret) && aggr_idx < aggr_cnt; ++aggr_idx) { + const ObAggrInfo &aggr_info = aggr_infos_.at(aggr_idx); + group_row = group_rows_.at(group_id); + AggrCell &aggr_cell = group_row->aggr_cells_[aggr_idx]; + if (aggr_cell.get_need_advance_collect()) { + if (aggr_info.has_distinct_) { + if (OB_FAIL(process_distinct_batch(0, aggr_cell, aggr_info, eval_ctx_.max_batch_size_))) { + LOG_WARN("aggregate distinct cell failed", K(ret)); + } + } + if (OB_SUCC(ret)) { + guard.set_batch_idx(0); + if (OB_FAIL(collect_aggr_result(aggr_cell, NULL, aggr_info))) { + LOG_WARN("collect_aggr_result failed", K(ret), K(group_id), K(aggr_idx)); + } else if (OB_FAIL(aggr_cell.deep_copy_advance_collect_result( + aggr_info.expr_->locate_expr_datum(eval_ctx_), aggr_alloc_))) { + LOG_WARN("failed to deep copy datum", K(ret)); + } else { + LOG_TRACE("finish collect", K(group_id), K(aggr_cell), K(aggr_cell.get_advance_collect_result())); + } + } + aggr_cell.set_is_advance_evaluated(); + aggr_cell.reuse_extra(); + } + } // end for + return ret; +} + int ObAggregateProcessor::collect_result_batch(const ObIArray &group_exprs, const int64_t output_batch_size, ObBatchRows &output_brs, @@ -1397,27 +1457,33 @@ int ObAggregateProcessor::collect_result_batch(const ObIArray &group_e int64_t output_batch_idx = output_brs.size_ + loop_idx; group_row = group_rows_.at(group_cur_idx); AggrCell &aggr_cell = group_row->aggr_cells_[aggr_idx]; - if (aggr_info.has_distinct_) { - if (OB_FAIL(process_distinct_batch(0, aggr_cell, aggr_info, loop_cnt))) { - LOG_WARN("aggregate distinct cell failed", K(ret)); - } - } - if (OB_FAIL(ret)) { - } else if (aggr_info.is_implicit_first_aggr()) { - // aggr_info.expr_ skip check null, check in cg - // judge aggr_cell.get_is_evaluated() whether implicit expr is evaluated - aggr_info.expr_->locate_expr_datumvector(eval_ctx_) - .at(output_batch_idx) - ->set_datum(aggr_cell.get_iter_result()); - LOG_DEBUG("first aggr result ", K(aggr_cell.get_iter_result()), + guard.set_batch_idx(output_batch_idx); + if (aggr_cell.get_need_advance_collect() && aggr_cell.get_is_advance_evaluated()) { + aggr_info.expr_->locate_datum_for_write(eval_ctx_).set_datum(aggr_cell.get_advance_collect_result()); + LOG_TRACE("fill aggr result ", K(aggr_cell.get_advance_collect_result()), K(aggr_cell.get_is_evaluated())); } else { - guard.set_batch_idx(output_batch_idx); - if (OB_FAIL(collect_aggr_result(aggr_cell, NULL, aggr_info))) { - LOG_WARN("collect_aggr_result failed", K(ret)); + if (aggr_info.has_distinct_) { + if (OB_FAIL(process_distinct_batch(0, aggr_cell, aggr_info, loop_cnt))) { + LOG_WARN("aggregate distinct cell failed", K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (aggr_info.is_implicit_first_aggr()) { + // aggr_info.expr_ skip check null, check in cg + // judge aggr_cell.get_is_evaluated() whether implicit expr is evaluated + aggr_info.expr_->locate_expr_datumvector(eval_ctx_) + .at(output_batch_idx) + ->set_datum(aggr_cell.get_iter_result()); + LOG_DEBUG("first aggr result ", K(aggr_cell.get_iter_result()), + K(aggr_cell.get_is_evaluated())); + } else { + if (OB_FAIL(collect_aggr_result(aggr_cell, NULL, aggr_info))) { + LOG_WARN("collect_aggr_result failed", K(ret)); + } + LOG_DEBUG("finish collect", K(cur_group_id), K(aggr_cell), K(group_cur_idx), + K(output_batch_idx)); } - LOG_DEBUG("finish collect", K(cur_group_id), K(aggr_cell), K(group_cur_idx), - K(output_batch_idx)); } } // end for aggr_info.expr_->get_eval_info(eval_ctx_).projected_ = true; @@ -1787,6 +1853,8 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row, case T_FUN_ORA_XMLAGG: { void *tmp_buf = NULL; + set_need_advance_collect(); + aggr_cell.set_need_advance_collect(); if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(GroupConcatExtraResult)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret)); @@ -1851,6 +1919,8 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row, } case T_FUN_TOP_FRE_HIST: { void *tmp_buf = NULL; + set_need_advance_collect(); + aggr_cell.set_need_advance_collect(); if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(TopKFreHistExtraResult)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret)); @@ -1891,6 +1961,8 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row, } if (OB_SUCC(ret) && aggr_info.has_distinct_) { + set_need_advance_collect(); + aggr_cell.set_need_advance_collect(); if (NULL == aggr_cell.get_extra()) { void *tmp_buf = NULL; if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(ExtraResult)))) { diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.h b/src/sql/engine/aggregate/ob_aggregate_processor.h index 1e120e75cb..13d48f93e3 100644 --- a/src/sql/engine/aggregate/ob_aggregate_processor.h +++ b/src/sql/engine/aggregate/ob_aggregate_processor.h @@ -443,9 +443,13 @@ public: : tiny_num_int_(0), extra_(NULL), iter_result_(), - flags_(0) + flags_(0), + collect_buf_(NULL), + collect_buf_len_(0), + advance_collect_result_() { iter_result_.set_null(); + advance_collect_result_.set_null(); } ~AggrCell(); @@ -472,6 +476,12 @@ public: ? (iter_result_.ptr_ - 2 * sizeof(int64_t)) : nullptr; } inline void set_buf(char *buf) { iter_result_.ptr_ = buf + 2 * sizeof(int64_t); } int64_t to_string(char *buf, const int64_t buf_len) const; + int deep_copy_advance_collect_result(const ObDatum &datum, ObIAllocator &alloc); + ObDatum &get_advance_collect_result() { return advance_collect_result_; } + void set_need_advance_collect() { need_advance_collect_ = true; } + bool get_need_advance_collect() const { return need_advance_collect_; } + void set_is_advance_evaluated() { is_advance_evaluated_ = true; } + bool get_is_advance_evaluated() const { return is_advance_evaluated_; } inline void reuse(const bool release_mem = true) { UNUSED(release_mem); @@ -481,6 +491,13 @@ public: if (NULL != extra_) { extra_->reuse(); } + advance_collect_result_.set_null(); + } + inline void reuse_extra() + { + if (NULL != extra_) { + extra_->reuse(); + } } private: @@ -498,9 +515,13 @@ public: struct { int32_t is_tiny_num_used_ : 1; int32_t is_evaluated_ : 1; + int32_t need_advance_collect_ : 1; + int32_t is_advance_evaluated_ : 1; }; }; - + char *collect_buf_; + int64_t collect_buf_len_; + ObDatum advance_collect_result_; }; @@ -632,6 +653,7 @@ public: const int64_t output_batch_size, ObBatchRows &output_brs, int64_t &cur_group_id); + int advance_collect_result(int64_t cur_group_id); int process_distinct_batch(const int64_t group_id, AggrCell &aggr_cell, const ObAggrInfo &aggr_info, @@ -742,6 +764,8 @@ public: int fast_single_row_agg(ObEvalCtx &eval_ctx, ObIArray &aggr_infos); int fast_single_row_agg_batch(ObEvalCtx &eval_ctx, const int64_t batch_size, const ObBitVector *skip); inline void set_support_fast_single_row_agg(const bool flag) { support_fast_single_row_agg_ = flag; } + void set_need_advance_collect() { need_advance_collect_ = true; } + bool get_need_advance_collect() const { return need_advance_collect_; } static int llc_add_value(const uint64_t value, char *llc_bitmap_buf, int64_t size); private: template @@ -1028,6 +1052,7 @@ private: ObIArray *op_eval_infos_; ObSqlWorkAreaProfile profile_; ObMonitorNode &op_monitor_info_; + bool need_advance_collect_; }; struct ObAggregateCalcFunc diff --git a/src/sql/engine/aggregate/ob_merge_groupby_op.cpp b/src/sql/engine/aggregate/ob_merge_groupby_op.cpp index 94515405fc..cf11c78736 100644 --- a/src/sql/engine/aggregate/ob_merge_groupby_op.cpp +++ b/src/sql/engine/aggregate/ob_merge_groupby_op.cpp @@ -782,8 +782,14 @@ int ObMergeGroupByOp::get_child_next_batch_row( K(const_cast(batch_rows)->size_)); } } else { - if (OB_FAIL(child_->get_next_batch(max_row_cnt, batch_rows))) { + if (aggr_processor_.get_need_advance_collect() && + brs_holder_.is_saved() && OB_FAIL(brs_holder_.restore())) { + LOG_WARN("failed to restore child exprs", K(ret)); + } else if (OB_FAIL(child_->get_next_batch(max_row_cnt, batch_rows))) { LOG_WARN("failed to get child row", K(ret)); + } else if (aggr_processor_.get_need_advance_collect() && + OB_FAIL(brs_holder_.save(MY_SPEC.max_batch_size_))) { + LOG_WARN("failed to backup child exprs", K(ret)); } } return ret; @@ -884,6 +890,16 @@ int ObMergeGroupByOp::batch_process_rollup_distributor(const int64_t max_row_cnt return ret; } +int ObMergeGroupByOp::advance_collect_result(int64_t group_id) +{ + int ret = OB_SUCCESS; + clear_evaluated_flag(); + if (OB_FAIL(aggr_processor_.advance_collect_result(group_id))) { + LOG_WARN("failed to calc and material distinct result", K(ret), K(group_id)); + } + return ret; +} + int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt) { // TODO qubin.qb: support rollup in next release @@ -914,6 +930,7 @@ int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt) } if (OB_SUCC(ret)) { + brs_holder_.reset(); while (OB_SUCC(ret) && OB_SUCC(get_child_next_batch_row(child_batch_cnt, child_brs))) { if (child_brs->end_ && child_brs->size_ == 0) { @@ -941,6 +958,7 @@ int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt) if (OB_SUCC(ret) && child_brs->end_ && !OB_ISNULL(cur_group_row_)) { // add last unfinised grouprow into output group inc_output_queue_cnt(); + const int64_t advance_collect_group_id = curr_group_rowid_; if (MY_SPEC.has_rollup_) { int64_t start_rollup_id = MY_SPEC.group_exprs_.count() - 1; int64_t end_rollup_id = all_groupby_exprs_.count() - 1; @@ -972,6 +990,11 @@ int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt) LOG_WARN("failed to genereate rollup group row", K(ret)); } } + if (OB_FAIL(ret)) { + } else if (aggr_processor_.get_need_advance_collect() + && OB_FAIL(advance_collect_result(advance_collect_group_id))) { + LOG_WARN("failed to collect distinct result", K(ret), K(advance_collect_group_id)); + } } if (OB_SUCC(ret) && OB_FAIL(calc_batch_results(child_brs->end_, output_batch_cnt))) { @@ -1247,7 +1270,10 @@ int ObMergeGroupByOp::gen_rollup_group_rows( ++curr_group_rowid_; curr_group_row = nullptr; inc_output_queue_cnt(); - if (OB_FAIL(get_empty_rollup_row(curr_group_rowid_, curr_group_row))) { + if (aggr_processor_.get_need_advance_collect() + && OB_FAIL(advance_collect_result(prev_group_row_id))) { + LOG_WARN("failed to calc and material distinct result", K(ret), K(prev_group_row_id)); + } else if (OB_FAIL(get_empty_rollup_row(curr_group_rowid_, curr_group_row))) { LOG_WARN("failed to get one new group row", K(ret)); } else if (OB_FAIL(aggr_processor_.swap_group_row(prev_group_row_id, curr_group_rowid_))) { LOG_WARN("failed to swap group row", K(ret)); @@ -1353,6 +1379,7 @@ int ObMergeGroupByOp::process_batch(const ObBatchRows &brs) LOG_WARN("failed to aggregate_group_rows", K(curr_group_rowid_), K(ret), K(group_start_idx), K(group_end_idx)); } else { + const int64_t advance_collect_group_id = curr_group_rowid_; if (MY_SPEC.has_rollup_) { int64_t start_rollup_id = diff_group_idx; int64_t end_rollup_id = all_groupby_exprs_.count() - 1; @@ -1385,14 +1412,19 @@ int ObMergeGroupByOp::process_batch(const ObBatchRows &brs) LOG_WARN("failed to genereate rollup group row", K(ret)); } } - ++curr_group_rowid_; - // create new group if (OB_FAIL(ret)) { - } else if (OB_FAIL(get_cur_group_row(curr_group_rowid_, cur_group_row_, - all_groupby_exprs_, all_groupby_exprs_.count()))) { - LOG_WARN("failed to get one new group row", K(ret)); + } else if (aggr_processor_.get_need_advance_collect() + && OB_FAIL(advance_collect_result(advance_collect_group_id))) { + LOG_WARN("failed to collect distinct result", K(ret), K(advance_collect_group_id)); } else { - group_start_idx = idx; // record new start idx in next round + ++curr_group_rowid_; + // create new group + if (OB_FAIL(get_cur_group_row(curr_group_rowid_, cur_group_row_, + all_groupby_exprs_, all_groupby_exprs_.count()))) { + LOG_WARN("failed to get one new group row", K(ret)); + } else { + group_start_idx = idx; // record new start idx in next round + } } } } diff --git a/src/sql/engine/aggregate/ob_merge_groupby_op.h b/src/sql/engine/aggregate/ob_merge_groupby_op.h index 84489ee968..95876046ea 100644 --- a/src/sql/engine/aggregate/ob_merge_groupby_op.h +++ b/src/sql/engine/aggregate/ob_merge_groupby_op.h @@ -206,6 +206,7 @@ private: uint64_t *hash_vals, ObBitVector *skip, int64_t count); + int advance_collect_result(int64_t group_id); private: bool is_end_; // added to support groupby with rollup