Add defensive code and some test cases for group by pushdown.
This commit is contained in:
parent
eeb16f9bd7
commit
84a10a7475
@ -5957,10 +5957,17 @@ int ObLogPlan::check_table_columns_can_storage_pushdown(const uint64_t tenant_id
|
||||
OB_UNLIKELY(!pushdown_groupby_columns.at(0)->is_column_ref_expr())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null", K(ret));
|
||||
} else if (OB_FALSE_IT(column = static_cast<ObColumnRefRawExpr*>(pushdown_groupby_columns.at(0)))) {
|
||||
} else if (FALSE_IT(schema_guard = get_optimizer_context().get_schema_guard())) {
|
||||
} else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("get table schema failed", K(ret));
|
||||
} else if (OB_FAIL(table_schema->get_rowkey_info().get_column_id(0, first_column_id))) {
|
||||
LOG_WARN("failed to get first rowkey column id", K(ret));
|
||||
} else if (column->get_column_id() == first_column_id) {
|
||||
can_push = false;
|
||||
} else if (OB_UNLIKELY(EN_FORCE_GBY_PUSHDOWN_STORAGE)) {
|
||||
can_push = true;
|
||||
LOG_TRACE("force pushdown group by to storage layer", K(ret), K(can_push));
|
||||
} else if (OB_FALSE_IT(column = static_cast<ObColumnRefRawExpr*>(pushdown_groupby_columns.at(0)))) {
|
||||
} else if (!ObColumnStatParam::is_valid_opt_col_type(column->get_data_type())) {
|
||||
can_push = false;
|
||||
} else if (NULL == (table_meta =
|
||||
@ -5973,13 +5980,6 @@ int ObLogPlan::check_table_columns_can_storage_pushdown(const uint64_t tenant_id
|
||||
LOG_WARN("column meta not find", K(ret), K(*table_meta), K(column));
|
||||
} else if (table_meta->get_micro_block_count() <= 0) {
|
||||
can_push = false;
|
||||
} else if (FALSE_IT(schema_guard = get_optimizer_context().get_schema_guard())) {
|
||||
} else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("get table schema failed", K(ret));
|
||||
} else if (OB_FAIL(table_schema->get_rowkey_info().get_column_id(0, first_column_id))) {
|
||||
LOG_WARN("failed to get first rowkey column id", K(ret));
|
||||
} else if (column->get_column_id() == first_column_id) {
|
||||
can_push = false;
|
||||
} else {
|
||||
double micro_block_avg_count = table_meta->get_rows() / table_meta->get_micro_block_count();
|
||||
// TODO it's better to use stat of column group in column store
|
||||
|
@ -40,9 +40,8 @@ ObVectorStore::ObVectorStore(
|
||||
default_row_(),
|
||||
group_by_cell_(nullptr),
|
||||
iter_param_(nullptr),
|
||||
skip_bit_(skip_bit),
|
||||
need_check_group_by_(false)
|
||||
{}
|
||||
skip_bit_(skip_bit)
|
||||
{}
|
||||
|
||||
ObVectorStore::~ObVectorStore()
|
||||
{
|
||||
@ -66,7 +65,6 @@ void ObVectorStore::reset()
|
||||
context_.stmt_allocator_->free(group_by_cell_);
|
||||
group_by_cell_ = nullptr;
|
||||
}
|
||||
need_check_group_by_ = false;
|
||||
}
|
||||
|
||||
int ObVectorStore::init(const ObTableAccessParam ¶m, common::hash::ObHashSet<int32_t> *agg_col_mask)
|
||||
@ -204,7 +202,6 @@ int ObVectorStore::check_need_group_by(const ObTableAccessParam ¶m)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid argument", K(ret), K(param));
|
||||
} else {
|
||||
need_check_group_by_ = true;
|
||||
ObMemAttr attr(MTL_ID(), common::ObModIds::OB_HASH_BUCKET);
|
||||
common::hash::ObHashSet<int32_t> col_offset_set;
|
||||
const int32_t group_by_col_offset = param.iter_param_.group_by_cols_project_->at(0);
|
||||
@ -218,7 +215,7 @@ int ObVectorStore::check_need_group_by(const ObTableAccessParam ¶m)
|
||||
LOG_WARN("Failed to add column offset", K(ret), K(i), K(col_offset));
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && need_check_group_by_ && i < param.output_exprs_->count(); ++i) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < param.output_exprs_->count(); ++i) {
|
||||
if (T_PSEUDO_GROUP_ID == param.output_exprs_->at(i)->type_) {
|
||||
} else if (nullptr == param.output_sel_mask_ || param.output_sel_mask_->at(i)) {
|
||||
int32_t col_offset = param.iter_param_.out_cols_project_->at(i);
|
||||
@ -227,8 +224,8 @@ int ObVectorStore::check_need_group_by(const ObTableAccessParam ¶m)
|
||||
if (OB_HASH_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_HASH_NOT_EXIST == ret) {
|
||||
need_check_group_by_ = false;
|
||||
ret = OB_SUCCESS;
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Invalid group by pushdown status", K(ret), K(i), K(col_offset));
|
||||
} else {
|
||||
LOG_WARN("Failed to search in hashset", K(ret), K(col_offset));
|
||||
}
|
||||
@ -324,7 +321,7 @@ int ObVectorStore::fill_rows(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected vector store count", K(ret), K_(count), KP_(group_by_cell));
|
||||
} else if (FALSE_IT(reader = scanner.get_reader())) {
|
||||
} else if (need_check_group_by_ && OB_FAIL(check_can_group_by(reader, begin_index, end_index, res, can_group_by))) {
|
||||
} else if (OB_FAIL(check_can_group_by(reader, begin_index, end_index, res, can_group_by))) {
|
||||
LOG_WARN("Failed to checkout pushdown group by", K(ret));
|
||||
} else if (can_group_by) {
|
||||
if (OB_FAIL(fill_group_by_rows(group_idx, reader, begin_index, end_index, res))) {
|
||||
|
@ -108,7 +108,6 @@ protected:
|
||||
ObGroupByCellBase *group_by_cell_;
|
||||
const ObTableIterParam *iter_param_;
|
||||
sql::ObBitVector *skip_bit_;
|
||||
bool need_check_group_by_;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1099,7 +1099,7 @@ int ObCOSSTableRowScanner::fetch_group_by_rows()
|
||||
} else if (OB_FAIL(group_by_cell_->copy_output_rows(vector_store->get_row_count(), *iter_param_))) {
|
||||
LOG_WARN("Failed to copy output rows", K(ret));
|
||||
}
|
||||
LOG_DEBUG("[GROUP BY PUSHDOWN]", K(ret), KPC(group_by_cell_));
|
||||
LOG_DEBUG("[GROUP BY PUSHDOWN]", K(ret), KPC(group_by_cell_), K(can_group_by));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user