Fix unexpected error of bypass aggregation

This commit is contained in:
obdev
2024-09-20 09:10:35 +00:00
committed by ob-robot
parent 839e3466c3
commit 91b606dfdb
3 changed files with 27 additions and 11 deletions

View File

@ -464,7 +464,8 @@ public:
const int32_t output_start_idx,
const int32_t expect_batch_size,
int32_t &output_size,
const ObBitVector *skip = nullptr) = 0;
const ObBitVector *skip = nullptr,
const bool init_vector = true) = 0;
inline virtual int collect_batch_group_results(RuntimeContext &agg_ctx,
const int32_t agg_col_id,

View File

@ -319,7 +319,8 @@ public:
int collect_batch_group_results(RuntimeContext &agg_ctx, const int32_t agg_col_id,
const int32_t cur_group_id, const int32_t output_start_idx,
const int32_t expect_batch_size, int32_t &output_size,
const ObBitVector *skip = nullptr) override
const ObBitVector *skip = nullptr,
const bool init_vector = true) override
{
int ret = OB_SUCCESS;
OB_ASSERT(agg_col_id < agg_ctx.aggr_infos_.count());
@ -332,7 +333,7 @@ public:
SQL_LOG(DEBUG, "no need to collect", K(ret), K(agg_ctx.agg_rows_.count()), K(cur_group_id));
} else {
output_size = 0;
if (OB_FAIL(agg_expr.init_vector_for_write(
if (init_vector && OB_FAIL(agg_expr.init_vector_for_write(
agg_ctx.eval_ctx_, agg_expr.get_default_res_format(), expect_batch_size))) {
SQL_LOG(WARN, "init vector for write failed", K(ret));
} else {

View File

@ -602,12 +602,12 @@ int Processor::single_row_agg_batch(AggrRowPtr *agg_rows, const int64_t batch_si
SQL_LOG(WARN, "unexpected null aggregate rows", K(ret));
} else if (FALSE_IT(MEMSET(agg_rows[0], 0, get_aggregate_row_size() * batch_size))) {
} else if (!support_fast_single_row_agg_) {
sql::EvalBound bound(1, true);
for (int i = 0; OB_SUCC(ret) && i < batch_size; i++) {
if (skip.at(i)) {
} else if (OB_FAIL(setup_rt_info(agg_rows[i], agg_ctx_))) {
SQL_LOG(WARN, "setup runtime info failed", K(ret));
} else {
sql::EvalBound bound(batch_size, i, i + 1, true);
AggrRowPtr row = agg_rows[i];
int32_t aggr_cell_len = 0;
int32_t output_size = 0;
@ -620,17 +620,31 @@ int Processor::single_row_agg_batch(AggrRowPtr *agg_rows, const int64_t batch_si
bound,
aggr_cell))) {
SQL_LOG(WARN, "add batch rows failed", K(ret));
} else if (OB_FAIL(aggregates_.at(agg_col_id)->collect_batch_group_results(
agg_ctx_, agg_col_id, i, i, 1,
output_size))) {
SQL_LOG(WARN, "collect result batch faile", K(ret));
} else if (OB_UNLIKELY(output_size != 1)) {
ret = OB_ERR_UNEXPECTED;
SQL_LOG(WARN, "invalid output size", K(output_size));
}
} // end for
}
} // end for
// must do init vector here, otherwise value stored in agg_expr is reset unexpectedly.
for (int col_id = 0; OB_SUCC(ret) && col_id < aggregates_.count(); col_id++) {
ObExpr *agg_expr = agg_ctx_.aggr_infos_.at(col_id).expr_;
if (OB_FAIL(agg_expr->init_vector_for_write(
agg_ctx_.eval_ctx_, agg_expr->get_default_res_format(), batch_size))) {
LOG_WARN("init vector for write failed", K(ret));
}
} // end for
for (int i = 0; OB_SUCC(ret) && i < batch_size; i++) {
int32_t output_size = 0;
for (int agg_col_id = 0; OB_SUCC(ret) && agg_col_id < aggregates_.count(); agg_col_id++) {
if (OB_FAIL(aggregates_.at(agg_col_id)->collect_batch_group_results(
agg_ctx_, agg_col_id, i, i, 1, output_size, nullptr, false))) {
SQL_LOG(WARN, "collect result batch faile", K(ret));
} else if (OB_UNLIKELY(output_size != 1)) {
ret = OB_ERR_UNEXPECTED;
SQL_LOG(WARN, "invalid output size", K(output_size));
}
} // end for
}
} else {
EvalBound bound(batch_size, skip.accumulate_bit_cnt(batch_size) == 0);
int32_t output_size = 0;