Fix the memory inflation issue in the rescan scenario.

This commit is contained in:
XIAO-HOU 2023-11-27 08:37:49 +00:00 committed by ob-robot
parent b443d65ac2
commit 2636ee3b6d
4 changed files with 54 additions and 25 deletions

View File

@ -96,11 +96,18 @@ public:
{
return (read_info_ != nullptr && read_info_ != rowkey_read_info_) ? read_info_->get_group_idx_col_index() : common::OB_INVALID_INDEX;
}
bool can_be_reused(const uint32_t cg_idx, const sql::ObExpr *expr) const
bool can_be_reused(const uint32_t cg_idx, const common::ObIArray<sql::ObExpr*> &exprs, const bool is_aggregate)
{
// there is only one column in cg now
bool can_reuse = cg_idx == cg_idx_ && nullptr != output_exprs_
&& 1 == output_exprs_->count() && output_exprs_->at(0) == expr;
bool can_reuse = cg_idx == cg_idx && enable_pd_aggregate() == is_aggregate
&& nullptr != output_exprs_ && output_exprs_->count() == exprs.count() ;
if (can_reuse) {
for (int64_t i = 0; i < exprs.count(); ++i) {
if (output_exprs_->at(i) != exprs.at(i)) {
can_reuse = false;
break;
}
}
}
return can_reuse;
}
OB_INLINE bool need_fill_group_idx() const

View File

@ -37,6 +37,28 @@ int ObCGIterParamPool::get_iter_param(
const int32_t cg_idx,
const ObTableIterParam &row_param,
sql::ObExpr *expr,
ObTableIterParam *&iter_param)
{
int ret = OB_SUCCESS;
iter_param = nullptr;
if (OB_UNLIKELY(0 > cg_idx || !row_param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(cg_idx), K(row_param));
} else {
common::ObSEArray<sql::ObExpr*, 1> exprs;
if (nullptr != expr && OB_FAIL(exprs.push_back(expr))) {
LOG_WARN("Fail to push back", K(ret));
} else if (OB_FAIL(get_iter_param(cg_idx, row_param, exprs, iter_param))) {
LOG_WARN("Fail to get cg iter param", K(ret));
}
}
return ret;
}
int ObCGIterParamPool::get_iter_param(
const int32_t cg_idx,
const ObTableIterParam &row_param,
const common::ObIArray<sql::ObExpr*> &exprs,
ObTableIterParam *&iter_param,
const bool is_aggregate)
{
@ -45,26 +67,21 @@ int ObCGIterParamPool::get_iter_param(
if (OB_UNLIKELY(0 > cg_idx || !row_param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(cg_idx), K(row_param));
} else if (!is_aggregate) {
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < iter_params_.count(); ++i) {
ObTableIterParam* tmp_param = iter_params_.at(i);
if (OB_ISNULL(tmp_param)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null iter param", K(ret));
} else if (tmp_param->can_be_reused(cg_idx, expr)) {
} else if (tmp_param->can_be_reused(cg_idx, exprs, is_aggregate)) {
iter_param = tmp_param;
break;
}
}
}
if (OB_FAIL(ret) || OB_NOT_NULL(iter_param)) {
} else {
common::ObSEArray<sql::ObExpr*, 1> exprs;
if (OB_FAIL(exprs.push_back(expr))) {
LOG_WARN("Fail to push back", K(ret));
} else if (OB_FAIL(new_iter_param(cg_idx, row_param, exprs, iter_param, is_aggregate))) {
LOG_WARN("Fail to new cg iter param", K(ret));
}
} else if (OB_FAIL(new_iter_param(cg_idx, row_param, exprs, iter_param, is_aggregate))) {
LOG_WARN("Fail to new cg iter param", K(ret));
}
return ret;
}
@ -102,11 +119,11 @@ int ObCGIterParamPool::new_iter_param(
}
} else if (OB_FAIL(fill_cg_iter_param(row_param, cg_idx, exprs, *iter_param))) {
STORAGE_LOG(WARN, "Failed to mock cg iter param", K(ret), K(row_param), K(cg_idx), K(exprs));
} else if (OB_FAIL(put_iter_param(iter_param))) {
LOG_WARN("Fail to put iter param", K(ret));
}
if (OB_SUCC(ret)) {
if (!is_aggregate) {
if (OB_FAIL(put_iter_param(iter_param))) {
LOG_WARN("Fail to put iter param", K(ret));
} else if (!is_aggregate) {
iter_param->disable_pd_aggregate();
}
} else if (nullptr != iter_param) {

View File

@ -28,18 +28,23 @@ public:
~ObCGIterParamPool() { reset(); }
void reset();
int get_iter_param(
const int32_t cg_idx,
const ObTableIterParam &row_param,
sql::ObExpr *expr,
ObTableIterParam *&iter_param,
const bool is_aggregate = false);
const int32_t cg_idx,
const ObTableIterParam &row_param,
sql::ObExpr *expr,
ObTableIterParam *&iter_param);
int get_iter_param(
const int32_t cg_idx,
const ObTableIterParam &row_param,
const common::ObIArray<sql::ObExpr*> &exprs,
ObTableIterParam *&iter_param,
const bool is_aggregate = false);
private:
int new_iter_param(
const int32_t cg_idx,
const ObTableIterParam &row_param,
const common::ObIArray<sql::ObExpr*> &exprs,
ObTableIterParam *&iter_param,
const bool is_aggregate = false);
private:
int fill_cg_iter_param(
const ObTableIterParam &row_param,
const int32_t cg_idx,

View File

@ -423,7 +423,7 @@ int ObCOSSTableRowScanner::construct_cg_iter_params(
} else if (0 == row_param.output_exprs_->count()) {
const uint32_t cg_idx = OB_CS_VIRTUAL_CG_IDX;
if (project_single_row) {
} else if (OB_FAIL(cg_param_pool_->new_iter_param(cg_idx, row_param, *row_param.output_exprs_,
} else if (OB_FAIL(cg_param_pool_->get_iter_param(cg_idx, row_param, *row_param.output_exprs_,
cg_param, row_param.enable_pd_aggregate()))) {
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
@ -475,7 +475,7 @@ int ObCOSSTableRowScanner::construct_cg_agg_iter_params(
} else if (0 == row_param.output_exprs_->count()) {
// only COUNT(*) and without filter
const uint32_t cg_idx = OB_CS_VIRTUAL_CG_IDX;
if (OB_FAIL(cg_param_pool_->new_iter_param(cg_idx, row_param, *row_param.aggregate_exprs_,
if (OB_FAIL(cg_param_pool_->get_iter_param(cg_idx, row_param, *row_param.aggregate_exprs_,
cg_param, row_param.enable_pd_aggregate()))) {
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
@ -510,7 +510,7 @@ int ObCOSSTableRowScanner::construct_cg_agg_iter_params(
}
}
if (OB_FAIL(ret) || 0 == exprs.count()) {
} else if (OB_FAIL(cg_param_pool_->new_iter_param(cg_idx, row_param, exprs, cg_param, row_param.enable_pd_aggregate()))) {
} else if (OB_FAIL(cg_param_pool_->get_iter_param(cg_idx, row_param, exprs, cg_param, row_param.enable_pd_aggregate()))) {
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
LOG_WARN("Fail to push back cg iter param", K(ret), K(cg_param));