Fix group by pushdown to storage when vectorize disabled
This commit is contained in:
@ -63,6 +63,7 @@ ObMultipleMerge::ObMultipleMerge()
|
||||
get_table_param_(nullptr),
|
||||
read_memtable_only_(false),
|
||||
block_row_store_(nullptr),
|
||||
group_by_cell_(nullptr),
|
||||
skip_bit_(nullptr),
|
||||
out_project_cols_(),
|
||||
lob_reader_(),
|
||||
@ -87,6 +88,13 @@ ObMultipleMerge::~ObMultipleMerge()
|
||||
}
|
||||
block_row_store_ = nullptr;
|
||||
}
|
||||
if (nullptr != group_by_cell_) {
|
||||
group_by_cell_->~ObGroupByCell();
|
||||
if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) {
|
||||
access_ctx_->stmt_allocator_->free(group_by_cell_);
|
||||
}
|
||||
group_by_cell_ = nullptr;
|
||||
}
|
||||
if (nullptr != skip_bit_) {
|
||||
if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) {
|
||||
access_ctx_->stmt_allocator_->free(skip_bit_);
|
||||
@ -370,6 +378,11 @@ int ObMultipleMerge::get_next_row(ObDatumRow *&row)
|
||||
OB_FAIL(access_param_->get_op()->write_trans_info_datum(unprojected_row_))) {
|
||||
LOG_WARN("write trans_info to expr datum failed", K(ret), K(unprojected_row_));
|
||||
} else if (nullptr != row) {
|
||||
if (OB_UNLIKELY(nullptr != group_by_cell_)) {
|
||||
if (OB_FAIL(group_by_cell_->copy_single_output_row(access_param_->get_op()->get_eval_ctx()))) {
|
||||
LOG_WARN("Failed to copy single output row", K(ret));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -784,6 +797,13 @@ void ObMultipleMerge::reset()
|
||||
}
|
||||
block_row_store_ = nullptr;
|
||||
}
|
||||
if (nullptr != group_by_cell_) {
|
||||
group_by_cell_->~ObGroupByCell();
|
||||
if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) {
|
||||
access_ctx_->stmt_allocator_->free(group_by_cell_);
|
||||
}
|
||||
group_by_cell_ = nullptr;
|
||||
}
|
||||
if (nullptr != skip_bit_) {
|
||||
if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) {
|
||||
access_ctx_->stmt_allocator_->free(skip_bit_);
|
||||
@ -937,6 +957,15 @@ int ObMultipleMerge::alloc_row_store(ObTableAccessContext &context, const ObTabl
|
||||
LOG_WARN("fail to init block row store", K(ret), K(block_row_store_));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && param.iter_param_.enable_pd_group_by() && !param.iter_param_.vectorized_enabled_) {
|
||||
if (OB_ISNULL(buf = context.stmt_allocator_->alloc(sizeof(ObGroupByCell)))) {
|
||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc aggregated store", K(ret));
|
||||
} else if (FALSE_IT(group_by_cell_ = new (buf) ObGroupByCell(0, *context.stmt_allocator_))) {
|
||||
} else if (OB_FAIL(group_by_cell_->init_for_single_row(param, param.get_op()->get_eval_ctx()))) {
|
||||
LOG_WARN("Failed to init group by cell for single row", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -142,6 +142,7 @@ protected:
|
||||
ObGetTableParam *get_table_param_;
|
||||
bool read_memtable_only_;
|
||||
ObBlockRowStore *block_row_store_;
|
||||
ObGroupByCell *group_by_cell_;
|
||||
sql::ObBitVector *skip_bit_;
|
||||
common::ObSEArray<share::schema::ObColDesc, 32> out_project_cols_;
|
||||
ObLobDataReader lob_reader_;
|
||||
|
@ -553,6 +553,22 @@ int ObAggCell::copy_output_rows(const int32_t datum_offset)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggCell::copy_single_output_row(sql::ObEvalCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(basic_info_.agg_expr_->args_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null args of agg expr", K(ret), KPC(basic_info_.agg_expr_));
|
||||
} else {
|
||||
sql::ObDatum &datum = basic_info_.agg_expr_->args_[0]->locate_expr_datum(ctx);
|
||||
common::ObDatum &result_datum = basic_info_.agg_expr_->locate_datum_for_write(ctx);
|
||||
if (OB_FAIL(result_datum.from_storage_datum(datum, basic_info_.agg_expr_->obj_datum_map_))) {
|
||||
LOG_WARN("Failed to clone datum", K(ret), K(datum), K(basic_info_.agg_expr_->obj_datum_map_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggCell::collect_result(sql::ObEvalCtx &ctx, bool need_padding)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1054,6 +1070,28 @@ int ObCountAggCell::copy_output_rows(const int32_t datum_offset)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCountAggCell::copy_single_output_row(sql::ObEvalCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObDatum &result_datum = basic_info_.agg_expr_->locate_datum_for_write(ctx);
|
||||
if (exclude_null_) {
|
||||
if (OB_ISNULL(basic_info_.agg_expr_->args_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null args of agg expr", K(ret), KPC(basic_info_.agg_expr_));
|
||||
} else {
|
||||
sql::ObDatum &datum = basic_info_.agg_expr_->args_[0]->locate_expr_datum(ctx);
|
||||
if (!datum.is_null()) {
|
||||
lib::is_oracle_mode() ? result_datum.set_number(ObDummyNumber::number_one_) : result_datum.set_int(1);
|
||||
} else {
|
||||
lib::is_oracle_mode() ? result_datum.set_number(ObDummyNumber::number_zero_) : result_datum.set_int(0);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
lib::is_oracle_mode() ? result_datum.set_number(ObDummyNumber::number_one_) : result_datum.set_int(1);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCountAggCell::collect_batch_result_in_group_by(const int64_t distinct_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1933,6 +1971,24 @@ int ObSumAggCell::copy_output_rows(const int32_t datum_offset)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSumAggCell::copy_single_output_row(sql::ObEvalCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(basic_info_.agg_expr_->args_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null args of agg expr", K(ret), KPC(basic_info_.agg_expr_));
|
||||
} else {
|
||||
sql::ObDatum &datum = basic_info_.agg_expr_->args_[0]->locate_expr_datum(ctx);
|
||||
common::ObDatum &result_datum = basic_info_.agg_expr_->locate_datum_for_write(ctx);
|
||||
if (datum.is_null()) {
|
||||
result_datum.set_null();
|
||||
} else if (OB_FAIL((this->*copy_datum_func_)(datum, result_datum))) {
|
||||
LOG_WARN("Failed to copy output datum", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSumAggCell::copy_int_to_number(const ObDatum &datum, ObDatum &result_datum)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -2937,25 +2993,9 @@ int ObGroupByCell::init(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ct
|
||||
} else if (OB_ISNULL(group_by_col_datums)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected group by col datums", K(ret), K(param));
|
||||
} else if (OB_FAIL(init_agg_cells(param, eval_ctx, false))) {
|
||||
LOG_WARN("Failed to init agg_cells", K(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < param.aggregate_exprs_->count(); ++i) {
|
||||
int32_t col_offset = param.iter_param_.agg_cols_project_->at(i);
|
||||
int32_t col_index = OB_COUNT_AGG_PD_COLUMN_ID == col_offset ? -1 : param.iter_param_.read_info_->get_columns_index().at(col_offset);
|
||||
const share::schema::ObColumnParam *col_param = OB_COUNT_AGG_PD_COLUMN_ID == col_offset ? nullptr : out_cols_param->at(col_offset);
|
||||
sql::ObExpr *agg_expr = param.aggregate_exprs_->at(i);
|
||||
bool exclude_null = false;
|
||||
if (T_FUN_COUNT == agg_expr->type_) {
|
||||
if (OB_COUNT_AGG_PD_COLUMN_ID != col_offset) {
|
||||
exclude_null = col_param->is_nullable_for_write();
|
||||
}
|
||||
}
|
||||
ObAggCellBasicInfo basic_info(col_offset, col_index, col_param, agg_expr, batch_size_);
|
||||
if (OB_FAIL(agg_cell_factory_.alloc_cell(basic_info, agg_cells_, exclude_null, true, &eval_ctx))) {
|
||||
LOG_WARN("Failed to alloc agg cell", K(ret), K(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (agg_cells_.count() > 2) {
|
||||
std::sort(agg_cells_.begin(), agg_cells_.end(),
|
||||
[](ObAggCell *a, ObAggCell *b) { return a->get_col_offset() < b->get_col_offset(); });
|
||||
@ -2973,6 +3013,20 @@ int ObGroupByCell::init(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ct
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupByCell::init_for_single_row(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == param.iter_param_.group_by_cols_project_ ||
|
||||
0 == param.iter_param_.group_by_cols_project_->count() ||
|
||||
nullptr == param.iter_param_.get_col_params())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid argument", K(ret), K(param.iter_param_));
|
||||
} else if (OB_FAIL(init_agg_cells(param, eval_ctx, true))) {
|
||||
LOG_WARN("Failed to init agg_cells", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupByCell::eval_batch(
|
||||
const common::ObDatum *datums,
|
||||
const int64_t count,
|
||||
@ -3030,6 +3084,18 @@ int ObGroupByCell::copy_output_rows(const int64_t batch_idx)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupByCell::copy_single_output_row(sql::ObEvalCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// just shallow copy output datum to agg
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < agg_cells_.count(); ++i) {
|
||||
if (OB_FAIL(agg_cells_.at(i)->copy_single_output_row(ctx))) {
|
||||
LOG_WARN("Failed to copy output row", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupByCell::collect_result()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -3228,6 +3294,29 @@ int ObGroupByCell::init_uniform_header(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupByCell::init_agg_cells(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ctx, const bool is_for_single_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const common::ObIArray<share::schema::ObColumnParam *> *out_cols_param = param.iter_param_.get_col_params();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < param.aggregate_exprs_->count(); ++i) {
|
||||
int32_t col_offset = param.iter_param_.agg_cols_project_->at(i);
|
||||
int32_t col_index = OB_COUNT_AGG_PD_COLUMN_ID == col_offset ? -1 : param.iter_param_.read_info_->get_columns_index().at(col_offset);
|
||||
const share::schema::ObColumnParam *col_param = OB_COUNT_AGG_PD_COLUMN_ID == col_offset ? nullptr : out_cols_param->at(col_offset);
|
||||
sql::ObExpr *agg_expr = param.aggregate_exprs_->at(i);
|
||||
bool exclude_null = false;
|
||||
if (T_FUN_COUNT == agg_expr->type_) {
|
||||
if (OB_COUNT_AGG_PD_COLUMN_ID != col_offset) {
|
||||
exclude_null = col_param->is_nullable_for_write();
|
||||
}
|
||||
}
|
||||
ObAggCellBasicInfo basic_info(col_offset, col_index, col_param, agg_expr, batch_size_);
|
||||
if (OB_FAIL(agg_cell_factory_.alloc_cell(basic_info, agg_cells_, exclude_null, !is_for_single_row, &eval_ctx))) {
|
||||
LOG_WARN("Failed to alloc agg cell", K(ret), K(i));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObGroupByCell::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
|
@ -218,7 +218,7 @@ struct ObAggCellBasicInfo
|
||||
}
|
||||
OB_INLINE bool is_valid() const
|
||||
{
|
||||
return col_offset_ >= 0 && nullptr != agg_expr_ && batch_size_ > 0;
|
||||
return col_offset_ >= 0 && nullptr != agg_expr_ && batch_size_ >= 0;
|
||||
}
|
||||
TO_STRING_KV(K_(col_offset), K_(col_index), KPC_(col_param), K_(agg_expr), K_(batch_size));
|
||||
int32_t col_offset_; // offset in projector
|
||||
@ -256,6 +256,7 @@ public:
|
||||
const bool is_default_datum = false) = 0;
|
||||
virtual int copy_output_row(const int32_t datum_offset);
|
||||
virtual int copy_output_rows(const int32_t datum_offset);
|
||||
virtual int copy_single_output_row(sql::ObEvalCtx &ctx);
|
||||
virtual int collect_result(sql::ObEvalCtx &ctx, bool need_padding);
|
||||
virtual int collect_batch_result_in_group_by(const int64_t distinct_cnt);
|
||||
virtual int can_use_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg, bool &can_agg);
|
||||
@ -339,6 +340,7 @@ public:
|
||||
const bool is_default_datum = false) override;
|
||||
virtual int copy_output_row(const int32_t datum_offset) override;
|
||||
virtual int copy_output_rows(const int32_t datum_offset) override;
|
||||
virtual int copy_single_output_row(sql::ObEvalCtx &ctx) override;
|
||||
virtual int collect_result(sql::ObEvalCtx &ctx, bool need_padding) override;
|
||||
virtual int collect_batch_result_in_group_by(const int64_t distinct_cnt) override;
|
||||
virtual bool need_access_data() const override { return exclude_null_; }
|
||||
@ -429,6 +431,7 @@ public:
|
||||
const bool is_default_datum = false) override;
|
||||
virtual int copy_output_row(const int32_t datum_offset) override;
|
||||
virtual int copy_output_rows(const int32_t datum_offset) override;
|
||||
virtual int copy_single_output_row(sql::ObEvalCtx &ctx) override;
|
||||
virtual int collect_result(sql::ObEvalCtx &ctx, bool need_padding) override;
|
||||
virtual int collect_batch_result_in_group_by(const int64_t distinct_cnt) override;
|
||||
virtual int reserve_group_by_buf(const int64_t size) override;
|
||||
@ -556,6 +559,11 @@ public:
|
||||
UNUSED(datum_offset);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
virtual int copy_single_output_row(sql::ObEvalCtx &ctx) override
|
||||
{
|
||||
UNUSED(ctx);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
virtual int collect_result(sql::ObEvalCtx &ctx, bool need_padding) override;
|
||||
virtual int collect_batch_result_in_group_by(const int64_t distinct_cnt) override;
|
||||
virtual bool need_access_data() const override { return !finished(); }
|
||||
@ -607,6 +615,7 @@ public:
|
||||
void reset();
|
||||
void reuse();
|
||||
int init(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ctx);
|
||||
int init_for_single_row(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ctx);
|
||||
// do group by for aggregate cell indicated by 'agg_idx'
|
||||
// datums: batch of datums of this column
|
||||
// count: batch size
|
||||
@ -625,6 +634,7 @@ public:
|
||||
// in the case where can not do batch scan or can not do group by pushdown
|
||||
int copy_output_row(const int64_t batch_idx);
|
||||
int copy_output_rows(const int64_t batch_idx);
|
||||
int copy_single_output_row(sql::ObEvalCtx &ctx);
|
||||
int collect_result();
|
||||
int add_distinct_null_value();
|
||||
// for micro with bitmap, should extract distinct values according bitmap
|
||||
@ -690,6 +700,7 @@ public:
|
||||
const bool init_output = true);
|
||||
DECLARE_TO_STRING;
|
||||
private:
|
||||
int init_agg_cells(const ObTableAccessParam ¶m, sql::ObEvalCtx &eval_ctx, const bool is_for_single_row);
|
||||
static const int64_t DEFAULT_AGG_CELL_CNT = 2;
|
||||
static const int64_t USE_GROUP_BY_READ_CNT_FACTOR = 2;
|
||||
static constexpr double USE_GROUP_BY_DISTINCT_RATIO = 0.5;
|
||||
|
@ -266,8 +266,7 @@ int ObTableAccessParam::init(
|
||||
iter_param_.set_use_iter_pool_flag();
|
||||
}
|
||||
|
||||
if (OB_UNLIKELY(iter_param_.enable_pd_group_by() &&
|
||||
(!iter_param_.vectorized_enabled_ || scan_param.use_index_skip_scan()))) {
|
||||
if (OB_UNLIKELY(iter_param_.enable_pd_group_by() && scan_param.use_index_skip_scan())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "Invalid argument for group by pushdown, vectorize must be enabled and not skip scan",
|
||||
K(ret), K(iter_param_.vectorized_enabled_), K(scan_param.use_index_skip_scan()));
|
||||
|
Reference in New Issue
Block a user