Remove default implementation of IAggergate::add_one_row
This commit is contained in:
@ -108,6 +108,41 @@ public:
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
|
||||||
|
const bool is_null, const char *data, const int32_t data_len,
|
||||||
|
int32_t agg_col_idx, char *agg_cell) override
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
NotNullBitVector ¬_nulls = agg_ctx.locate_notnulls_bitmap(agg_col_idx, agg_cell);
|
||||||
|
if (OB_LIKELY(not_nulls.at(agg_col_idx))) {
|
||||||
|
// already copied
|
||||||
|
} else if (!is_null) {
|
||||||
|
if (data_len > 0) {
|
||||||
|
if (OB_ISNULL(data)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
SQL_LOG(WARN, "invalid null payload", K(ret));
|
||||||
|
} else {
|
||||||
|
void *tmp_buf = agg_ctx.allocator_.alloc(data_len);
|
||||||
|
if (OB_ISNULL(tmp_buf)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
SQL_LOG(WARN, "allocate memory failed", K(ret));
|
||||||
|
} else {
|
||||||
|
MEMCPY(tmp_buf, data, data_len);
|
||||||
|
// store data ptr and len
|
||||||
|
agg_ctx.set_agg_cell((char *)tmp_buf, data_len, agg_col_idx, agg_cell);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
agg_ctx.set_agg_cell(nullptr, data_len, agg_col_idx, agg_cell);
|
||||||
|
}
|
||||||
|
not_nulls.set(agg_col_idx);
|
||||||
|
} else {
|
||||||
|
agg_ctx.set_agg_cell(nullptr, INT32_MAX, agg_col_idx, agg_cell);
|
||||||
|
not_nulls.set(agg_col_idx);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
template <typename ColumnFmt>
|
template <typename ColumnFmt>
|
||||||
inline int collect_group_result(RuntimeContext &agg_ctx, const sql::ObExpr &agg_expr,
|
inline int collect_group_result(RuntimeContext &agg_ctx, const sql::ObExpr &agg_expr,
|
||||||
const int32_t agg_col_id, const char *agg_cell,
|
const int32_t agg_col_id, const char *agg_cell,
|
||||||
|
|||||||
@ -89,18 +89,6 @@ public:
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
|
|
||||||
const bool is_null, const char *data, const int32_t data_len,
|
|
||||||
int32_t agg_col_idx, char *agg_cell) override
|
|
||||||
{
|
|
||||||
UNUSEDx(is_null, data, data_len);
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
sql::EvalBound bound(batch_size, batch_idx, batch_idx + 1, true);
|
|
||||||
char mock_skip_data[1] = {0};
|
|
||||||
ObBitVector &mock_skip = *to_bit_vector(mock_skip_data);
|
|
||||||
return static_cast<Derived *>(this)->add_batch_rows(agg_ctx, agg_col_idx, mock_skip, bound,
|
|
||||||
agg_cell);
|
|
||||||
}
|
|
||||||
int add_batch_rows(RuntimeContext &agg_ctx, const int32_t agg_col_id,
|
int add_batch_rows(RuntimeContext &agg_ctx, const int32_t agg_col_id,
|
||||||
const sql::ObBitVector &skip, const sql::EvalBound &bound, char *agg_cell,
|
const sql::ObBitVector &skip, const sql::EvalBound &bound, char *agg_cell,
|
||||||
const RowSelector row_sel = RowSelector{}) override
|
const RowSelector row_sel = RowSelector{}) override
|
||||||
@ -709,6 +697,17 @@ public:
|
|||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
|
||||||
|
const bool is_null, const char *data, const int32_t data_len,
|
||||||
|
int32_t agg_col_idx, char *agg_cell) override
|
||||||
|
{
|
||||||
|
// FIXME: opt performance
|
||||||
|
sql::EvalBound bound(batch_size, batch_idx, batch_idx + 1, true);
|
||||||
|
char mock_skip_data[1] = {0};
|
||||||
|
ObBitVector &mock_skip = *to_bit_vector(mock_skip_data);
|
||||||
|
return static_cast<Aggregate *>(agg_)->add_batch_rows(agg_ctx, agg_col_idx, mock_skip, bound,
|
||||||
|
agg_cell);
|
||||||
|
}
|
||||||
void reuse() override
|
void reuse() override
|
||||||
{
|
{
|
||||||
if (agg_ != NULL) {
|
if (agg_ != NULL) {
|
||||||
|
|||||||
@ -654,11 +654,19 @@ int Processor::prepare_adding_one_row()
|
|||||||
for (int i = 0; OB_SUCC(ret) && i < agg_ctx_.aggr_infos_.count(); i++) {
|
for (int i = 0; OB_SUCC(ret) && i < agg_ctx_.aggr_infos_.count(); i++) {
|
||||||
ObAggrInfo &info = agg_ctx_.aggr_infos_.at(i);
|
ObAggrInfo &info = agg_ctx_.aggr_infos_.at(i);
|
||||||
add_one_row_fn fn_ptr = nullptr;
|
add_one_row_fn fn_ptr = nullptr;
|
||||||
if (info.param_exprs_.count() <= 0 || info.param_exprs_.count() > 1) {
|
if ((info.param_exprs_.count() <= 0 && !info.is_implicit_first_aggr())
|
||||||
|
|| info.param_exprs_.count() > 1) {
|
||||||
fn_ptr = aggregate::add_one_row<ObVectorBase>;
|
fn_ptr = aggregate::add_one_row<ObVectorBase>;
|
||||||
} else {
|
} else {
|
||||||
ObDatumMeta meta = info.param_exprs_.at(0)->datum_meta_;
|
ObDatumMeta meta;
|
||||||
VectorFormat fmt = info.param_exprs_.at(0)->get_format(agg_ctx_.eval_ctx_);
|
VectorFormat fmt;
|
||||||
|
if (info.is_implicit_first_aggr()) {
|
||||||
|
meta = info.expr_->datum_meta_;
|
||||||
|
fmt = info.expr_->get_format(agg_ctx_.eval_ctx_);
|
||||||
|
} else {
|
||||||
|
meta = info.param_exprs_.at(0)->datum_meta_;
|
||||||
|
fmt = info.param_exprs_.at(0)->get_format(agg_ctx_.eval_ctx_);
|
||||||
|
}
|
||||||
VecValueTypeClass vec_tc = get_vec_value_tc(meta.type_, meta.scale_, meta.precision_);
|
VecValueTypeClass vec_tc = get_vec_value_tc(meta.type_, meta.scale_, meta.precision_);
|
||||||
switch(fmt) {
|
switch(fmt) {
|
||||||
case common::VEC_UNIFORM: {
|
case common::VEC_UNIFORM: {
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
/**get
|
/**
|
||||||
* Copyright (c) 2021 OceanBase
|
* Copyright (c) 2021 OceanBase
|
||||||
* OceanBase CE is licensed under Mulan PubL v2.
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
|||||||
@ -94,6 +94,29 @@ public:
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
inline int add_one_row(RuntimeContext &agg_ctx, int64_t batch_idx, int64_t batch_size,
|
||||||
|
const bool is_null, const char *data, const int32_t data_len,
|
||||||
|
int32_t agg_col_idx, char *agg_cell) override
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
AggrRowPtr agg_row = agg_ctx.agg_rows_.at(batch_idx);
|
||||||
|
NotNullBitVector ¬nulls = agg_ctx.row_meta().locate_notnulls_bitmap(agg_row);
|
||||||
|
if (agg_func != T_FUN_COUNT) {
|
||||||
|
if (OB_LIKELY(!is_null)) {
|
||||||
|
char *cell = agg_ctx.row_meta().locate_cell_payload(agg_col_idx, agg_row);
|
||||||
|
if (helper::is_var_len_agg_cell(in_tc)) {
|
||||||
|
*reinterpret_cast<int64_t *>(cell) = reinterpret_cast<int64_t>(data);
|
||||||
|
*reinterpret_cast<int32_t *>(cell + sizeof(char *)) = data_len;
|
||||||
|
} else {
|
||||||
|
MEMCPY(cell, data, data_len);
|
||||||
|
}
|
||||||
|
notnulls.set(agg_col_idx);
|
||||||
|
}
|
||||||
|
} else if (!is_null) { // COUNT function, only need to set not null
|
||||||
|
notnulls.set(agg_col_idx);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
template <typename ColumnFmt>
|
template <typename ColumnFmt>
|
||||||
inline int add_row(RuntimeContext &agg_ctx, ColumnFmt &columns, const int32_t row_num,
|
inline int add_row(RuntimeContext &agg_ctx, ColumnFmt &columns, const int32_t row_num,
|
||||||
const int32_t agg_col_id, char *agg_cell, void *tmp_res, int64_t &calc_info)
|
const int32_t agg_col_id, char *agg_cell, void *tmp_res, int64_t &calc_info)
|
||||||
|
|||||||
@ -1080,6 +1080,8 @@ int ObHashGroupByVecOp::load_data_batch(int64_t max_row_cnt)
|
|||||||
for (int64_t i = 0; i < MY_SPEC.aggr_infos_.count(); ++i) {
|
for (int64_t i = 0; i < MY_SPEC.aggr_infos_.count(); ++i) {
|
||||||
if (MY_SPEC.aggr_infos_.at(i).param_exprs_.count() == 1) {
|
if (MY_SPEC.aggr_infos_.at(i).param_exprs_.count() == 1) {
|
||||||
aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).param_exprs_.at(0)->get_vector(eval_ctx_);
|
aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).param_exprs_.at(0)->get_vector(eval_ctx_);
|
||||||
|
} else if (MY_SPEC.aggr_infos_.at(i).is_implicit_first_aggr()) {
|
||||||
|
aggr_vectors_[i] = MY_SPEC.aggr_infos_.at(i).expr_->get_vector(eval_ctx_);
|
||||||
} else {
|
} else {
|
||||||
aggr_vectors_[i] = nullptr;
|
aggr_vectors_[i] = nullptr;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user