Remove default implementation of IAggergate::add_one_row

This commit is contained in:
Zach41
2024-01-04 15:42:34 +00:00
committed by ob-robot
parent cdb7b711a6
commit ac69a200d2
6 changed files with 83 additions and 16 deletions

View File

@ -108,6 +108,41 @@ public:
return ret;
}
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
const bool is_null, const char *data, const int32_t data_len,
int32_t agg_col_idx, char *agg_cell) override
{
int ret = OB_SUCCESS;
NotNullBitVector &not_nulls = agg_ctx.locate_notnulls_bitmap(agg_col_idx, agg_cell);
if (OB_LIKELY(not_nulls.at(agg_col_idx))) {
// already copied
} else if (!is_null) {
if (data_len > 0) {
if (OB_ISNULL(data)) {
ret = OB_INVALID_ARGUMENT;
SQL_LOG(WARN, "invalid null payload", K(ret));
} else {
void *tmp_buf = agg_ctx.allocator_.alloc(data_len);
if (OB_ISNULL(tmp_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
SQL_LOG(WARN, "allocate memory failed", K(ret));
} else {
MEMCPY(tmp_buf, data, data_len);
// store data ptr and len
agg_ctx.set_agg_cell((char *)tmp_buf, data_len, agg_col_idx, agg_cell);
}
}
} else {
agg_ctx.set_agg_cell(nullptr, data_len, agg_col_idx, agg_cell);
}
not_nulls.set(agg_col_idx);
} else {
agg_ctx.set_agg_cell(nullptr, INT32_MAX, agg_col_idx, agg_cell);
not_nulls.set(agg_col_idx);
}
return ret;
}
template <typename ColumnFmt>
inline int collect_group_result(RuntimeContext &agg_ctx, const sql::ObExpr &agg_expr,
const int32_t agg_col_id, const char *agg_cell,

View File

@ -89,18 +89,6 @@ public:
return 0;
}
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
const bool is_null, const char *data, const int32_t data_len,
int32_t agg_col_idx, char *agg_cell) override
{
UNUSEDx(is_null, data, data_len);
int ret = OB_SUCCESS;
sql::EvalBound bound(batch_size, batch_idx, batch_idx + 1, true);
char mock_skip_data[1] = {0};
ObBitVector &mock_skip = *to_bit_vector(mock_skip_data);
return static_cast<Derived *>(this)->add_batch_rows(agg_ctx, agg_col_idx, mock_skip, bound,
agg_cell);
}
int add_batch_rows(RuntimeContext &agg_ctx, const int32_t agg_col_id,
const sql::ObBitVector &skip, const sql::EvalBound &bound, char *agg_cell,
const RowSelector row_sel = RowSelector{}) override
@ -709,6 +697,17 @@ public:
return OB_SUCCESS;
}
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
const bool is_null, const char *data, const int32_t data_len,
int32_t agg_col_idx, char *agg_cell) override
{
// FIXME: opt performance
sql::EvalBound bound(batch_size, batch_idx, batch_idx + 1, true);
char mock_skip_data[1] = {0};
ObBitVector &mock_skip = *to_bit_vector(mock_skip_data);
return static_cast<Aggregate *>(agg_)->add_batch_rows(agg_ctx, agg_col_idx, mock_skip, bound,
agg_cell);
}
void reuse() override
{
if (agg_ != NULL) {

View File

@ -654,11 +654,19 @@ int Processor::prepare_adding_one_row()
for (int i = 0; OB_SUCC(ret) && i < agg_ctx_.aggr_infos_.count(); i++) {
ObAggrInfo &info = agg_ctx_.aggr_infos_.at(i);
add_one_row_fn fn_ptr = nullptr;
if (info.param_exprs_.count() <= 0 || info.param_exprs_.count() > 1) {
if ((info.param_exprs_.count() <= 0 && !info.is_implicit_first_aggr())
|| info.param_exprs_.count() > 1) {
fn_ptr = aggregate::add_one_row<ObVectorBase>;
} else {
ObDatumMeta meta = info.param_exprs_.at(0)->datum_meta_;
VectorFormat fmt = info.param_exprs_.at(0)->get_format(agg_ctx_.eval_ctx_);
ObDatumMeta meta;
VectorFormat fmt;
if (info.is_implicit_first_aggr()) {
meta = info.expr_->datum_meta_;
fmt = info.expr_->get_format(agg_ctx_.eval_ctx_);
} else {
meta = info.param_exprs_.at(0)->datum_meta_;
fmt = info.param_exprs_.at(0)->get_format(agg_ctx_.eval_ctx_);
}
VecValueTypeClass vec_tc = get_vec_value_tc(meta.type_, meta.scale_, meta.precision_);
switch(fmt) {
case common::VEC_UNIFORM: {

View File

@ -1,4 +1,4 @@
/**get
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.

View File

@ -94,6 +94,29 @@ public:
}
return ret;
}
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
const bool is_null, const char *data, const int32_t data_len,
int32_t agg_col_idx, char *agg_cell) override
{
int ret = OB_SUCCESS;
AggrRowPtr agg_row = agg_ctx.agg_rows_.at(batch_idx);
NotNullBitVector &notnulls = agg_ctx.row_meta().locate_notnulls_bitmap(agg_row);
if (agg_func != T_FUN_COUNT) {
if (OB_LIKELY(!is_null)) {
char *cell = agg_ctx.row_meta().locate_cell_payload(agg_col_idx, agg_row);
if (helper::is_var_len_agg_cell(in_tc)) {
*reinterpret_cast<int64_t *>(cell) = reinterpret_cast<int64_t>(data);
*reinterpret_cast<int32_t *>(cell + sizeof(char *)) = data_len;
} else {
MEMCPY(cell, data, data_len);
}
notnulls.set(agg_col_idx);
}
} else if (!is_null) { // COUNT function, only need to set not null
notnulls.set(agg_col_idx);
}
return ret;
}
template <typename ColumnFmt>
inline int add_row(RuntimeContext &agg_ctx, ColumnFmt &columns, const int32_t row_num,
const int32_t agg_col_id, char *agg_cell, void *tmp_res, int64_t &calc_info)

View File

@ -1080,6 +1080,8 @@ int ObHashGroupByVecOp::load_data_batch(int64_t max_row_cnt)
for (int64_t i = 0; i < MY_SPEC.aggr_infos_.count(); ++i) {
if (MY_SPEC.aggr_infos_.at(i).param_exprs_.count() == 1) {
aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).param_exprs_.at(0)->get_vector(eval_ctx_);
} else if (MY_SPEC.aggr_infos_.at(i).is_implicit_first_aggr()) {
aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).expr_->get_vector(eval_ctx_);
} else {
aggr_vectors_[i] = nullptr;
}