Fix out-of-memory error in distinct aggregation
This commit is contained in:
@ -206,6 +206,29 @@ void ObAggregateProcessor::AggrCell::destroy()
|
||||
}
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::AggrCell::deep_copy_advance_collect_result(const ObDatum &datum, ObIAllocator &alloc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t pos = 0;
|
||||
uint32_t len = datum.len_ > 0 ? datum.len_: 1;
|
||||
if (datum.is_null()) {
|
||||
advance_collect_result_ = datum;
|
||||
} else {
|
||||
if (NULL == collect_buf_ || collect_buf_len_ < len) {
|
||||
collect_buf_len_ = next_pow2(len);
|
||||
if (OB_ISNULL(collect_buf_ = static_cast<char *>(alloc.alloc(collect_buf_len_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(collect_buf_len_), K(datum), K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(advance_collect_result_.deep_copy(datum, collect_buf_, collect_buf_len_, pos))) {
|
||||
LOG_WARN("failed to deep copy datum", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::AggrCell::collect_result(
|
||||
const ObObjTypeClass tc, ObEvalCtx &eval_ctx, const ObAggrInfo &aggr_info)
|
||||
{
|
||||
@ -771,7 +794,8 @@ ObAggregateProcessor::ObAggregateProcessor(ObEvalCtx &eval_ctx,
|
||||
support_fast_single_row_agg_(false),
|
||||
op_eval_infos_(nullptr),
|
||||
profile_(ObSqlWorkAreaType::HASH_WORK_AREA),
|
||||
op_monitor_info_(op_monitor_info)
|
||||
op_monitor_info_(op_monitor_info),
|
||||
need_advance_collect_(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -933,6 +957,7 @@ void ObAggregateProcessor::reuse()
|
||||
group_rows_.reuse();
|
||||
cur_batch_group_idx_ = 0;
|
||||
cur_batch_group_buf_ = nullptr;
|
||||
need_advance_collect_ = false;
|
||||
aggr_alloc_.reset_remain_one_page();
|
||||
removal_info_.reset();
|
||||
}
|
||||
@ -1376,6 +1401,41 @@ int ObAggregateProcessor::process_batch(
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
 * Compute, ahead of output time, the final result of every aggregate cell of
 * group `group_id` that is flagged for advance collection, and keep a deep
 * copy of that result inside the cell.
 *
 * For each flagged cell: distinct aggregates are first run through
 * process_distinct_batch(), then collect_aggr_result() evaluates the aggregate
 * with a batch size of 1 at batch index 0, and the datum located on the
 * aggregate expression is deep-copied into the cell using aggr_alloc_.
 * The cell is then marked as advance-evaluated and its extra result is reused,
 * regardless of whether the steps above succeeded.
 *
 * @param group_id  index into group_rows_ of the group to collect
 * @return OB_SUCCESS or the first error encountered (the loop stops on error).
 */
int ObAggregateProcessor::advance_collect_result(int64_t group_id)
{
  int ret = OB_SUCCESS;
  const int64_t total_aggr_cnt = aggr_infos_.count();
  ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
  guard.set_batch_size(1);
  for (int64_t aggr_idx = 0; OB_SUCC(ret) && aggr_idx < total_aggr_cnt; ++aggr_idx) {
    const ObAggrInfo &aggr_info = aggr_infos_.at(aggr_idx);
    GroupRow *target_row = group_rows_.at(group_id);
    AggrCell &aggr_cell = target_row->aggr_cells_[aggr_idx];
    if (!aggr_cell.get_need_advance_collect()) {
      // This cell is collected at output time as usual.
      continue;
    }
    if (aggr_info.has_distinct_
        && OB_FAIL(process_distinct_batch(0, aggr_cell, aggr_info, eval_ctx_.max_batch_size_))) {
      LOG_WARN("aggregate distinct cell failed", K(ret));
    }
    if (OB_SUCC(ret)) {
      guard.set_batch_idx(0);
      if (OB_FAIL(collect_aggr_result(aggr_cell, NULL, aggr_info))) {
        LOG_WARN("collect_aggr_result failed", K(ret), K(group_id), K(aggr_idx));
      } else {
        // The aggregate's value now sits in the expression datum; copy it into
        // the cell so it no longer depends on that datum.
        const auto &collected = aggr_info.expr_->locate_expr_datum(eval_ctx_);
        if (OB_FAIL(aggr_cell.deep_copy_advance_collect_result(collected, aggr_alloc_))) {
          LOG_WARN("failed to deep copy datum", K(ret));
        } else {
          LOG_TRACE("finish collect", K(group_id), K(aggr_cell), K(aggr_cell.get_advance_collect_result()));
        }
      }
    }
    aggr_cell.set_is_advance_evaluated();
    aggr_cell.reuse_extra();
  } // end for
  return ret;
}
|
||||
|
||||
int ObAggregateProcessor::collect_result_batch(const ObIArray<ObExpr *> &group_exprs,
|
||||
const int64_t output_batch_size,
|
||||
ObBatchRows &output_brs,
|
||||
@ -1397,27 +1457,33 @@ int ObAggregateProcessor::collect_result_batch(const ObIArray<ObExpr *> &group_e
|
||||
int64_t output_batch_idx = output_brs.size_ + loop_idx;
|
||||
group_row = group_rows_.at(group_cur_idx);
|
||||
AggrCell &aggr_cell = group_row->aggr_cells_[aggr_idx];
|
||||
if (aggr_info.has_distinct_) {
|
||||
if (OB_FAIL(process_distinct_batch(0, aggr_cell, aggr_info, loop_cnt))) {
|
||||
LOG_WARN("aggregate distinct cell failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (aggr_info.is_implicit_first_aggr()) {
|
||||
// aggr_info.expr_ skip check null, check in cg
|
||||
// judge aggr_cell.get_is_evaluated() whether implicit expr is evaluated
|
||||
aggr_info.expr_->locate_expr_datumvector(eval_ctx_)
|
||||
.at(output_batch_idx)
|
||||
->set_datum(aggr_cell.get_iter_result());
|
||||
LOG_DEBUG("first aggr result ", K(aggr_cell.get_iter_result()),
|
||||
guard.set_batch_idx(output_batch_idx);
|
||||
if (aggr_cell.get_need_advance_collect() && aggr_cell.get_is_advance_evaluated()) {
|
||||
aggr_info.expr_->locate_datum_for_write(eval_ctx_).set_datum(aggr_cell.get_advance_collect_result());
|
||||
LOG_TRACE("fill aggr result ", K(aggr_cell.get_advance_collect_result()),
|
||||
K(aggr_cell.get_is_evaluated()));
|
||||
} else {
|
||||
guard.set_batch_idx(output_batch_idx);
|
||||
if (OB_FAIL(collect_aggr_result(aggr_cell, NULL, aggr_info))) {
|
||||
LOG_WARN("collect_aggr_result failed", K(ret));
|
||||
if (aggr_info.has_distinct_) {
|
||||
if (OB_FAIL(process_distinct_batch(0, aggr_cell, aggr_info, loop_cnt))) {
|
||||
LOG_WARN("aggregate distinct cell failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (aggr_info.is_implicit_first_aggr()) {
|
||||
// aggr_info.expr_ skip check null, check in cg
|
||||
// judge aggr_cell.get_is_evaluated() whether implicit expr is evaluated
|
||||
aggr_info.expr_->locate_expr_datumvector(eval_ctx_)
|
||||
.at(output_batch_idx)
|
||||
->set_datum(aggr_cell.get_iter_result());
|
||||
LOG_DEBUG("first aggr result ", K(aggr_cell.get_iter_result()),
|
||||
K(aggr_cell.get_is_evaluated()));
|
||||
} else {
|
||||
if (OB_FAIL(collect_aggr_result(aggr_cell, NULL, aggr_info))) {
|
||||
LOG_WARN("collect_aggr_result failed", K(ret));
|
||||
}
|
||||
LOG_DEBUG("finish collect", K(cur_group_id), K(aggr_cell), K(group_cur_idx),
|
||||
K(output_batch_idx));
|
||||
}
|
||||
LOG_DEBUG("finish collect", K(cur_group_id), K(aggr_cell), K(group_cur_idx),
|
||||
K(output_batch_idx));
|
||||
}
|
||||
} // end for
|
||||
aggr_info.expr_->get_eval_info(eval_ctx_).projected_ = true;
|
||||
@ -1787,6 +1853,8 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row,
|
||||
case T_FUN_ORA_XMLAGG:
|
||||
{
|
||||
void *tmp_buf = NULL;
|
||||
set_need_advance_collect();
|
||||
aggr_cell.set_need_advance_collect();
|
||||
if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(GroupConcatExtraResult)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
@ -1851,6 +1919,8 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row,
|
||||
}
|
||||
case T_FUN_TOP_FRE_HIST: {
|
||||
void *tmp_buf = NULL;
|
||||
set_need_advance_collect();
|
||||
aggr_cell.set_need_advance_collect();
|
||||
if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(TopKFreHistExtraResult)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
@ -1891,6 +1961,8 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row,
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && aggr_info.has_distinct_) {
|
||||
set_need_advance_collect();
|
||||
aggr_cell.set_need_advance_collect();
|
||||
if (NULL == aggr_cell.get_extra()) {
|
||||
void *tmp_buf = NULL;
|
||||
if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(ExtraResult)))) {
|
||||
|
||||
@ -443,9 +443,13 @@ public:
|
||||
: tiny_num_int_(0),
|
||||
extra_(NULL),
|
||||
iter_result_(),
|
||||
flags_(0)
|
||||
flags_(0),
|
||||
collect_buf_(NULL),
|
||||
collect_buf_len_(0),
|
||||
advance_collect_result_()
|
||||
{
|
||||
iter_result_.set_null();
|
||||
advance_collect_result_.set_null();
|
||||
}
|
||||
~AggrCell();
|
||||
|
||||
@ -472,6 +476,12 @@ public:
|
||||
? (iter_result_.ptr_ - 2 * sizeof(int64_t)) : nullptr; }
|
||||
inline void set_buf(char *buf) { iter_result_.ptr_ = buf + 2 * sizeof(int64_t); }
|
||||
int64_t to_string(char *buf, const int64_t buf_len) const;
|
||||
int deep_copy_advance_collect_result(const ObDatum &datum, ObIAllocator &alloc);
|
||||
// Returns a mutable reference to the datum holding the advance-collected result of this cell.
ObDatum &get_advance_collect_result() { return advance_collect_result_; }
|
||||
// Flags this cell as one whose result should be collected in advance (sets the 1-bit need_advance_collect_ flag).
void set_need_advance_collect() { need_advance_collect_ = true; }
|
||||
// Returns whether this cell has been flagged for advance collection.
bool get_need_advance_collect() const { return need_advance_collect_; }
|
||||
// Marks this cell as having gone through advance collection (sets the 1-bit is_advance_evaluated_ flag).
void set_is_advance_evaluated() { is_advance_evaluated_ = true; }
|
||||
// Returns whether this cell has been marked as advance-evaluated.
bool get_is_advance_evaluated() const { return is_advance_evaluated_; }
|
||||
inline void reuse(const bool release_mem = true)
|
||||
{
|
||||
UNUSED(release_mem);
|
||||
@ -481,6 +491,13 @@ public:
|
||||
if (NULL != extra_) {
|
||||
extra_->reuse();
|
||||
}
|
||||
advance_collect_result_.set_null();
|
||||
}
|
||||
inline void reuse_extra()
|
||||
{
|
||||
if (NULL != extra_) {
|
||||
extra_->reuse();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
@ -498,9 +515,13 @@ public:
|
||||
struct {
|
||||
int32_t is_tiny_num_used_ : 1;
|
||||
int32_t is_evaluated_ : 1;
|
||||
int32_t need_advance_collect_ : 1;
|
||||
int32_t is_advance_evaluated_ : 1;
|
||||
};
|
||||
};
|
||||
|
||||
char *collect_buf_;
|
||||
int64_t collect_buf_len_;
|
||||
ObDatum advance_collect_result_;
|
||||
};
|
||||
|
||||
|
||||
@ -632,6 +653,7 @@ public:
|
||||
const int64_t output_batch_size,
|
||||
ObBatchRows &output_brs,
|
||||
int64_t &cur_group_id);
|
||||
int advance_collect_result(int64_t cur_group_id);
|
||||
int process_distinct_batch(const int64_t group_id,
|
||||
AggrCell &aggr_cell,
|
||||
const ObAggrInfo &aggr_info,
|
||||
@ -742,6 +764,8 @@ public:
|
||||
int fast_single_row_agg(ObEvalCtx &eval_ctx, ObIArray<ObAggrInfo> &aggr_infos);
|
||||
int fast_single_row_agg_batch(ObEvalCtx &eval_ctx, const int64_t batch_size, const ObBitVector *skip);
|
||||
inline void set_support_fast_single_row_agg(const bool flag) { support_fast_single_row_agg_ = flag; }
|
||||
// Records on the processor that advance collection is needed (sets need_advance_collect_).
void set_need_advance_collect() { need_advance_collect_ = true; }
|
||||
// Returns whether advance collection has been requested on this processor.
bool get_need_advance_collect() const { return need_advance_collect_; }
|
||||
static int llc_add_value(const uint64_t value, char *llc_bitmap_buf, int64_t size);
|
||||
private:
|
||||
template <typename T>
|
||||
@ -1028,6 +1052,7 @@ private:
|
||||
ObIArray<ObEvalInfo *> *op_eval_infos_;
|
||||
ObSqlWorkAreaProfile profile_;
|
||||
ObMonitorNode &op_monitor_info_;
|
||||
bool need_advance_collect_;
|
||||
};
|
||||
|
||||
struct ObAggregateCalcFunc
|
||||
|
||||
@ -782,8 +782,14 @@ int ObMergeGroupByOp::get_child_next_batch_row(
|
||||
K(const_cast<ObBatchRows *>(batch_rows)->size_));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(child_->get_next_batch(max_row_cnt, batch_rows))) {
|
||||
if (aggr_processor_.get_need_advance_collect() &&
|
||||
brs_holder_.is_saved() && OB_FAIL(brs_holder_.restore())) {
|
||||
LOG_WARN("failed to restore child exprs", K(ret));
|
||||
} else if (OB_FAIL(child_->get_next_batch(max_row_cnt, batch_rows))) {
|
||||
LOG_WARN("failed to get child row", K(ret));
|
||||
} else if (aggr_processor_.get_need_advance_collect() &&
|
||||
OB_FAIL(brs_holder_.save(MY_SPEC.max_batch_size_))) {
|
||||
LOG_WARN("failed to backup child exprs", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -884,6 +890,16 @@ int ObMergeGroupByOp::batch_process_rollup_distributor(const int64_t max_row_cnt
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
 * Clear the operator's evaluated flags, then ask the aggregate processor to
 * collect the results of group `group_id` in advance.
 *
 * @param group_id  id of the group whose aggregate results are collected
 * @return OB_SUCCESS, or the error reported by the aggregate processor.
 */
int ObMergeGroupByOp::advance_collect_result(int64_t group_id)
{
  int ret = OB_SUCCESS;
  // Flags are cleared before the processor runs, as in the original ordering.
  clear_evaluated_flag();
  ret = aggr_processor_.advance_collect_result(group_id);
  if (OB_SUCCESS != ret) {
    LOG_WARN("failed to calc and material distinct result", K(ret), K(group_id));
  }
  return ret;
}
|
||||
|
||||
int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt)
|
||||
{
|
||||
// TODO qubin.qb: support rollup in next release
|
||||
@ -914,6 +930,7 @@ int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt)
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
brs_holder_.reset();
|
||||
while (OB_SUCC(ret) &&
|
||||
OB_SUCC(get_child_next_batch_row(child_batch_cnt, child_brs))) {
|
||||
if (child_brs->end_ && child_brs->size_ == 0) {
|
||||
@ -941,6 +958,7 @@ int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt)
|
||||
if (OB_SUCC(ret) && child_brs->end_ && !OB_ISNULL(cur_group_row_)) {
|
||||
// add last unfinised grouprow into output group
|
||||
inc_output_queue_cnt();
|
||||
const int64_t advance_collect_group_id = curr_group_rowid_;
|
||||
if (MY_SPEC.has_rollup_) {
|
||||
int64_t start_rollup_id = MY_SPEC.group_exprs_.count() - 1;
|
||||
int64_t end_rollup_id = all_groupby_exprs_.count() - 1;
|
||||
@ -972,6 +990,11 @@ int ObMergeGroupByOp::inner_get_next_batch(const int64_t max_row_cnt)
|
||||
LOG_WARN("failed to genereate rollup group row", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (aggr_processor_.get_need_advance_collect()
|
||||
&& OB_FAIL(advance_collect_result(advance_collect_group_id))) {
|
||||
LOG_WARN("failed to collect distinct result", K(ret), K(advance_collect_group_id));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) &&
|
||||
OB_FAIL(calc_batch_results(child_brs->end_, output_batch_cnt))) {
|
||||
@ -1247,7 +1270,10 @@ int ObMergeGroupByOp::gen_rollup_group_rows(
|
||||
++curr_group_rowid_;
|
||||
curr_group_row = nullptr;
|
||||
inc_output_queue_cnt();
|
||||
if (OB_FAIL(get_empty_rollup_row(curr_group_rowid_, curr_group_row))) {
|
||||
if (aggr_processor_.get_need_advance_collect()
|
||||
&& OB_FAIL(advance_collect_result(prev_group_row_id))) {
|
||||
LOG_WARN("failed to calc and material distinct result", K(ret), K(prev_group_row_id));
|
||||
} else if (OB_FAIL(get_empty_rollup_row(curr_group_rowid_, curr_group_row))) {
|
||||
LOG_WARN("failed to get one new group row", K(ret));
|
||||
} else if (OB_FAIL(aggr_processor_.swap_group_row(prev_group_row_id, curr_group_rowid_))) {
|
||||
LOG_WARN("failed to swap group row", K(ret));
|
||||
@ -1353,6 +1379,7 @@ int ObMergeGroupByOp::process_batch(const ObBatchRows &brs)
|
||||
LOG_WARN("failed to aggregate_group_rows", K(curr_group_rowid_), K(ret),
|
||||
K(group_start_idx), K(group_end_idx));
|
||||
} else {
|
||||
const int64_t advance_collect_group_id = curr_group_rowid_;
|
||||
if (MY_SPEC.has_rollup_) {
|
||||
int64_t start_rollup_id = diff_group_idx;
|
||||
int64_t end_rollup_id = all_groupby_exprs_.count() - 1;
|
||||
@ -1385,14 +1412,19 @@ int ObMergeGroupByOp::process_batch(const ObBatchRows &brs)
|
||||
LOG_WARN("failed to genereate rollup group row", K(ret));
|
||||
}
|
||||
}
|
||||
++curr_group_rowid_;
|
||||
// create new group
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(get_cur_group_row(curr_group_rowid_, cur_group_row_,
|
||||
all_groupby_exprs_, all_groupby_exprs_.count()))) {
|
||||
LOG_WARN("failed to get one new group row", K(ret));
|
||||
} else if (aggr_processor_.get_need_advance_collect()
|
||||
&& OB_FAIL(advance_collect_result(advance_collect_group_id))) {
|
||||
LOG_WARN("failed to collect distinct result", K(ret), K(advance_collect_group_id));
|
||||
} else {
|
||||
group_start_idx = idx; // record new start idx in next round
|
||||
++curr_group_rowid_;
|
||||
// create new group
|
||||
if (OB_FAIL(get_cur_group_row(curr_group_rowid_, cur_group_row_,
|
||||
all_groupby_exprs_, all_groupby_exprs_.count()))) {
|
||||
LOG_WARN("failed to get one new group row", K(ret));
|
||||
} else {
|
||||
group_start_idx = idx; // record new start idx in next round
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,6 +206,7 @@ private:
|
||||
uint64_t *hash_vals,
|
||||
ObBitVector *skip,
|
||||
int64_t count);
|
||||
int advance_collect_result(int64_t group_id);
|
||||
private:
|
||||
bool is_end_;
|
||||
// added to support groupby with rollup
|
||||
|
||||
Reference in New Issue
Block a user