[ARRAY] fix array with hash group by aggr

This commit is contained in:
helloamateur 2024-12-13 06:14:39 +00:00 committed by ob-robot
parent d2b62f476b
commit 1e95aafac0
3 changed files with 29 additions and 21 deletions

View File

@ -14,7 +14,6 @@
#define OCEANBASE_SHARE_AGGREGATE_FIRST_ROW_H_
#include "iaggregate.h"
#include "sql/engine/expr/ob_array_expr_utils.h"
namespace oceanbase
{

View File

@ -26,6 +26,7 @@
#include "share/aggregate/agg_ctx.h"
#include "share/aggregate/aggr_extra.h"
#include "sql/engine/basic/ob_compact_row.h"
#include "sql/engine/expr/ob_array_expr_utils.h"
namespace oceanbase
{
@ -240,7 +241,7 @@ public:
#define INNER_ADD(vec_tc) \
case (vec_tc): { \
ret = inner_add_for_multi_groups<ObFixedLengthFormat<RTCType<vec_tc>>>( \
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_expr->get_vector(eval_ctx)); \
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_expr); \
} break
int ret = OB_SUCCESS;
@ -248,7 +249,6 @@ public:
ObEvalCtx &eval_ctx = agg_ctx.eval_ctx_;
VectorFormat fmt = VEC_INVALID;
ObExpr *param_expr = nullptr;
ObIVector *param_vec = nullptr;
Derived *derived_this = static_cast<Derived *>(this);
#ifndef NDEBUG
int64_t mock_skip_data = 0;
@ -272,29 +272,27 @@ public:
SQL_LOG(WARN, "inner add one row failed", K(ret));
}
}
} else if (OB_FAIL(get_nested_expr_vec(agg_ctx, param_expr, param_vec))) {
SQL_LOG(WARN, "get nested expr vec failed", K(ret));
} else {
VecValueTypeClass vec_tc = param_expr->get_vec_value_tc();
switch(fmt) {
case common::VEC_UNIFORM: {
ret = inner_add_for_multi_groups<uniform_fmt<Derived::IN_TC, false>>(
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_vec);
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_expr);
break;
}
case common::VEC_UNIFORM_CONST: {
ret = inner_add_for_multi_groups<uniform_fmt<Derived::IN_TC, true>>(
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_vec);
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_expr);
break;
}
case common::VEC_DISCRETE: {
ret = inner_add_for_multi_groups<discrete_fmt<Derived::IN_TC>>(
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_vec);
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_expr);
break;
}
case common::VEC_CONTINUOUS: {
ret = inner_add_for_multi_groups<continuous_fmt<Derived::IN_TC>>(
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_vec);
agg_ctx, agg_rows, row_sel, batch_size, agg_col_id, param_expr);
break;
}
case common::VEC_FIXED: {
@ -443,19 +441,32 @@ protected:
template <typename ColumnFmt>
int inner_add_for_multi_groups(RuntimeContext &agg_ctx, AggrRowPtr *agg_rows, RowSelector &row_sel,
const int64_t batch_size, const int32_t agg_col_id,
ObIVector *ivec)
ObExpr *param_expr)
{
int ret = OB_SUCCESS;
ObIVector *ivec = param_expr->get_vector(agg_ctx.eval_ctx_);
ColumnFmt *param_vec = static_cast<ColumnFmt *>(ivec);
VectorFormat fmt = ivec->get_format();
ObArenaAllocator tmp_allocator;
bool is_null = false;
const char *payload = nullptr;
int32_t len = 0;
Derived *derived_this = static_cast<Derived *>(this);
for (int i = 0; OB_SUCC(ret) && i < row_sel.size(); i++) {
int64_t batch_idx = row_sel.index(i);
param_vec->get_payload(batch_idx, is_null, payload, len);
if (param_expr->is_nested_expr() && !is_uniform_format(ivec->get_format())) {
payload = nullptr;
if (OB_FAIL(ObArrayExprUtils::get_collection_payload(tmp_allocator, agg_ctx.eval_ctx_, *param_expr, batch_idx, payload, len))) {
SQL_LOG(WARN, "get nested collection payload failed", K(ret));
} else {
is_null = (payload == nullptr);
}
} else {
param_vec->get_payload(batch_idx, is_null, payload, len);
}
char *agg_cell = agg_ctx.row_meta().locate_cell_payload(agg_col_id, agg_rows[batch_idx]);
if (OB_FAIL(derived_this->add_one_row(agg_ctx, batch_idx, batch_size, is_null, payload, len,
if (OB_FAIL(ret)) {
} else if (OB_FAIL(derived_this->add_one_row(agg_ctx, batch_idx, batch_size, is_null, payload, len,
agg_col_id, agg_cell))) {
SQL_LOG(WARN, "inner add one row failed", K(ret));
}
@ -465,7 +476,7 @@ protected:
template <>
int inner_add_for_multi_groups<ObVectorBase>(RuntimeContext &agg_ctx, AggrRowPtr *agg_rows,
RowSelector &row_sel, const int64_t batch_size,
const int32_t agg_col_id, ObIVector *ivec)
const int32_t agg_col_id, ObExpr *param_expr)
{
return OB_NOT_IMPLEMENT;
}

View File

@ -725,10 +725,9 @@ int ObVectorsResultHolder::drive_row_extended(int64_t from_idx, int64_t start_ds
VectorFormat extend_format = get_single_row_restore_format(backup_cols_[i].header_.get_format(), exprs_->at(i));
if (OB_FAIL(exprs_->at(i)->init_vector(*eval_ctx_, extend_format, eval_ctx_->max_batch_size_))) {
LOG_WARN("failed to init vector for backup expr", K(i), K(backup_cols_[i].header_.get_format()), K(ret));
} else if (exprs_->at(i)->is_nested_expr() && !is_uniform_format(backup_format)) {
if (OB_FAIL(backup_cols_[i].extend_nested_rows(*exprs_->at(i), *eval_ctx_, extend_format, from_idx, start_dst_idx, size))) {
LOG_WARN("failed to restore nested single row", K(ret), K(i), K(backup_cols_[i].header_.get_format()));
}
} else if (exprs_->at(i)->is_nested_expr() && !is_uniform_format(backup_format)
&& OB_FAIL(backup_cols_[i].extend_nested_rows(*exprs_->at(i), *eval_ctx_, extend_format, from_idx, start_dst_idx, size))) {
LOG_WARN("failed to restore nested single row", K(ret), K(i), K(backup_cols_[i].header_.get_format()));
} else {
switch(backup_format) {
case VEC_FIXED:
@ -775,10 +774,9 @@ int ObVectorsResultHolder::restore_single_row(int64_t from_idx, int64_t to_idx)
VectorFormat extend_format = get_single_row_restore_format(backup_cols_[i].header_.get_format(), exprs_->at(i));
if (OB_FAIL(exprs_->at(i)->init_vector(*eval_ctx_, extend_format, eval_ctx_->max_batch_size_))) {
LOG_WARN("failed to init vector for backup expr", K(i), K(backup_cols_[i].header_.get_format()), K(ret));
} else if (exprs_->at(i)->is_nested_expr() && !is_uniform_format(backup_cols_[i].header_.format_)) {
if (OB_FAIL(backup_cols_[i].restore_nested_single_row(*exprs_->at(i), *eval_ctx_, extend_format, from_idx, to_idx))) {
LOG_WARN("failed to restore nested single row", K(ret), K(i), K(backup_cols_[i].header_.get_format()));
}
} else if (exprs_->at(i)->is_nested_expr() && !is_uniform_format(backup_cols_[i].header_.format_) &&
OB_FAIL(backup_cols_[i].restore_nested_single_row(*exprs_->at(i), *eval_ctx_, extend_format, from_idx, to_idx))) {
LOG_WARN("failed to restore nested single row", K(ret), K(i), K(backup_cols_[i].header_.get_format()));
} else {
VectorFormat format = backup_cols_[i].header_.format_;
LOG_TRACE("vector format is", K(format));