diff --git a/src/share/aggregate/first_row.h b/src/share/aggregate/first_row.h index 0c960bfd7e..fa6b6cf2d1 100644 --- a/src/share/aggregate/first_row.h +++ b/src/share/aggregate/first_row.h @@ -108,6 +108,41 @@ public: return ret; } + inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size, + const bool is_null, const char *data, const int32_t data_len, + int32_t agg_col_idx, char *agg_cell) override + { + int ret = OB_SUCCESS; + NotNullBitVector ¬_nulls = agg_ctx.locate_notnulls_bitmap(agg_col_idx, agg_cell); + if (OB_LIKELY(not_nulls.at(agg_col_idx))) { + // already copied + } else if (!is_null) { + if (data_len > 0) { + if (OB_ISNULL(data)) { + ret = OB_INVALID_ARGUMENT; + SQL_LOG(WARN, "invalid null payload", K(ret)); + } else { + void *tmp_buf = agg_ctx.allocator_.alloc(data_len); + if (OB_ISNULL(tmp_buf)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + SQL_LOG(WARN, "allocate memory failed", K(ret)); + } else { + MEMCPY(tmp_buf, data, data_len); + // store data ptr and len + agg_ctx.set_agg_cell((char *)tmp_buf, data_len, agg_col_idx, agg_cell); + } + } + } else { + agg_ctx.set_agg_cell(nullptr, data_len, agg_col_idx, agg_cell); + } + not_nulls.set(agg_col_idx); + } else { + agg_ctx.set_agg_cell(nullptr, INT32_MAX, agg_col_idx, agg_cell); + not_nulls.set(agg_col_idx); + } + return ret; + } + template inline int collect_group_result(RuntimeContext &agg_ctx, const sql::ObExpr &agg_expr, const int32_t agg_col_id, const char *agg_cell, diff --git a/src/share/aggregate/iaggregate.h b/src/share/aggregate/iaggregate.h index a14ccb5f8b..7fe2ae5534 100644 --- a/src/share/aggregate/iaggregate.h +++ b/src/share/aggregate/iaggregate.h @@ -89,18 +89,6 @@ public: return 0; } - inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size, - const bool is_null, const char *data, const int32_t data_len, - int32_t agg_col_idx, char *agg_cell) override - { - UNUSEDx(is_null, data, data_len); - int ret = OB_SUCCESS; - sql::EvalBound bound(batch_size, batch_idx, batch_idx + 1, true); - char mock_skip_data[1] = {0}; - ObBitVector &mock_skip = *to_bit_vector(mock_skip_data); - return static_cast(this)->add_batch_rows(agg_ctx, agg_col_idx, mock_skip, bound, - agg_cell); - } int add_batch_rows(RuntimeContext &agg_ctx, const int32_t agg_col_id, const sql::ObBitVector &skip, const sql::EvalBound &bound, char *agg_cell, const RowSelector row_sel = RowSelector{}) override @@ -709,6 +697,17 @@ public: return OB_SUCCESS; } + inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size, + const bool is_null, const char *data, const int32_t data_len, + int32_t agg_col_idx, char *agg_cell) override + { + // FIXME: opt performance + sql::EvalBound bound(batch_size, batch_idx, batch_idx + 1, true); + char mock_skip_data[1] = {0}; + ObBitVector &mock_skip = *to_bit_vector(mock_skip_data); + return static_cast(agg_)->add_batch_rows(agg_ctx, agg_col_idx, mock_skip, bound, + agg_cell); + } void reuse() override { if (agg_ != NULL) { diff --git a/src/share/aggregate/processor.cpp b/src/share/aggregate/processor.cpp index 8a26e885ca..5a29c41820 100644 --- a/src/share/aggregate/processor.cpp +++ b/src/share/aggregate/processor.cpp @@ -654,11 +654,19 @@ int Processor::prepare_adding_one_row() for (int i = 0; OB_SUCC(ret) && i < agg_ctx_.aggr_infos_.count(); i++) { ObAggrInfo &info = agg_ctx_.aggr_infos_.at(i); add_one_row_fn fn_ptr = nullptr; - if (info.param_exprs_.count() <= 0 || info.param_exprs_.count() > 1) { + if ((info.param_exprs_.count() <= 0 && !info.is_implicit_first_aggr()) + || info.param_exprs_.count() > 1) { fn_ptr = aggregate::add_one_row; } else { - ObDatumMeta meta = info.param_exprs_.at(0)->datum_meta_; - VectorFormat fmt = info.param_exprs_.at(0)->get_format(agg_ctx_.eval_ctx_); + ObDatumMeta meta; + VectorFormat fmt; + if (info.is_implicit_first_aggr()) { + meta = info.expr_->datum_meta_; + fmt = info.expr_->get_format(agg_ctx_.eval_ctx_); + } else { + meta = info.param_exprs_.at(0)->datum_meta_; + fmt = info.param_exprs_.at(0)->get_format(agg_ctx_.eval_ctx_); + } VecValueTypeClass vec_tc = get_vec_value_tc(meta.type_, meta.scale_, meta.precision_); switch(fmt) { case common::VEC_UNIFORM: { diff --git a/src/share/aggregate/processor.h b/src/share/aggregate/processor.h index 88b586d2ec..6a33a441cb 100644 --- a/src/share/aggregate/processor.h +++ b/src/share/aggregate/processor.h @@ -1,4 +1,4 @@ -/**get +/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. diff --git a/src/share/aggregate/single_row.h b/src/share/aggregate/single_row.h index a601bc645c..8e566f37fb 100644 --- a/src/share/aggregate/single_row.h +++ b/src/share/aggregate/single_row.h @@ -94,6 +94,29 @@ public: } return ret; } + inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size, + const bool is_null, const char *data, const int32_t data_len, + int32_t agg_col_idx, char *agg_cell) override + { + int ret = OB_SUCCESS; + AggrRowPtr agg_row = agg_ctx.agg_rows_.at(batch_idx); + NotNullBitVector ¬nulls = agg_ctx.row_meta().locate_notnulls_bitmap(agg_row); + if (agg_func != T_FUN_COUNT) { + if (OB_LIKELY(!is_null)) { + char *cell = agg_ctx.row_meta().locate_cell_payload(agg_col_idx, agg_row); + if (helper::is_var_len_agg_cell(in_tc)) { + *reinterpret_cast(cell) = reinterpret_cast(data); + *reinterpret_cast(cell + sizeof(char *)) = data_len; + } else { + MEMCPY(cell, data, data_len); + } + notnulls.set(agg_col_idx); + } + } else if (!is_null) { // COUNT function, only need to set not null + notnulls.set(agg_col_idx); + } + return ret; + } template inline int add_row(RuntimeContext &agg_ctx, ColumnFmt &columns, const int32_t row_num, const int32_t agg_col_id, char *agg_cell, void *tmp_res, int64_t &calc_info) diff --git a/src/sql/engine/aggregate/ob_hash_groupby_vec_op.cpp b/src/sql/engine/aggregate/ob_hash_groupby_vec_op.cpp index 1593c563f4..301d7d8df6 100644 --- a/src/sql/engine/aggregate/ob_hash_groupby_vec_op.cpp +++ b/src/sql/engine/aggregate/ob_hash_groupby_vec_op.cpp @@ -1080,6 +1080,8 @@ int ObHashGroupByVecOp::load_data_batch(int64_t max_row_cnt) for (int64_t i = 0; i < MY_SPEC.aggr_infos_.count(); ++i) { if (MY_SPEC.aggr_infos_.at(i).param_exprs_.count() == 1) { aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).param_exprs_.at(0)->get_vector(eval_ctx_); + } else if (MY_SPEC.aggr_infos_.at(i).is_implicit_first_aggr()) { + aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).expr_->get_vector(eval_ctx_); } else { aggr_vectors_[i] = nullptr; }