Fix the memory inflation issue in the rescan scenario.
This commit is contained in:
@ -96,11 +96,18 @@ public:
|
|||||||
{
|
{
|
||||||
return (read_info_ != nullptr && read_info_ != rowkey_read_info_) ? read_info_->get_group_idx_col_index() : common::OB_INVALID_INDEX;
|
return (read_info_ != nullptr && read_info_ != rowkey_read_info_) ? read_info_->get_group_idx_col_index() : common::OB_INVALID_INDEX;
|
||||||
}
|
}
|
||||||
bool can_be_reused(const uint32_t cg_idx, const sql::ObExpr *expr) const
|
bool can_be_reused(const uint32_t cg_idx, const common::ObIArray<sql::ObExpr*> &exprs, const bool is_aggregate)
|
||||||
{
|
{
|
||||||
// there is only one column in cg now
|
bool can_reuse = cg_idx == cg_idx && enable_pd_aggregate() == is_aggregate
|
||||||
bool can_reuse = cg_idx == cg_idx_ && nullptr != output_exprs_
|
&& nullptr != output_exprs_ && output_exprs_->count() == exprs.count() ;
|
||||||
&& 1 == output_exprs_->count() && output_exprs_->at(0) == expr;
|
if (can_reuse) {
|
||||||
|
for (int64_t i = 0; i < exprs.count(); ++i) {
|
||||||
|
if (output_exprs_->at(i) != exprs.at(i)) {
|
||||||
|
can_reuse = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return can_reuse;
|
return can_reuse;
|
||||||
}
|
}
|
||||||
OB_INLINE bool need_fill_group_idx() const
|
OB_INLINE bool need_fill_group_idx() const
|
||||||
|
|||||||
@ -37,6 +37,28 @@ int ObCGIterParamPool::get_iter_param(
|
|||||||
const int32_t cg_idx,
|
const int32_t cg_idx,
|
||||||
const ObTableIterParam &row_param,
|
const ObTableIterParam &row_param,
|
||||||
sql::ObExpr *expr,
|
sql::ObExpr *expr,
|
||||||
|
ObTableIterParam *&iter_param)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
iter_param = nullptr;
|
||||||
|
if (OB_UNLIKELY(0 > cg_idx || !row_param.is_valid())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("Invalid argument", K(ret), K(cg_idx), K(row_param));
|
||||||
|
} else {
|
||||||
|
common::ObSEArray<sql::ObExpr*, 1> exprs;
|
||||||
|
if (nullptr != expr && OB_FAIL(exprs.push_back(expr))) {
|
||||||
|
LOG_WARN("Fail to push back", K(ret));
|
||||||
|
} else if (OB_FAIL(get_iter_param(cg_idx, row_param, exprs, iter_param))) {
|
||||||
|
LOG_WARN("Fail to get cg iter param", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObCGIterParamPool::get_iter_param(
|
||||||
|
const int32_t cg_idx,
|
||||||
|
const ObTableIterParam &row_param,
|
||||||
|
const common::ObIArray<sql::ObExpr*> &exprs,
|
||||||
ObTableIterParam *&iter_param,
|
ObTableIterParam *&iter_param,
|
||||||
const bool is_aggregate)
|
const bool is_aggregate)
|
||||||
{
|
{
|
||||||
@ -45,26 +67,21 @@ int ObCGIterParamPool::get_iter_param(
|
|||||||
if (OB_UNLIKELY(0 > cg_idx || !row_param.is_valid())) {
|
if (OB_UNLIKELY(0 > cg_idx || !row_param.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("Invalid argument", K(ret), K(cg_idx), K(row_param));
|
LOG_WARN("Invalid argument", K(ret), K(cg_idx), K(row_param));
|
||||||
} else if (!is_aggregate) {
|
} else {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < iter_params_.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < iter_params_.count(); ++i) {
|
||||||
ObTableIterParam* tmp_param = iter_params_.at(i);
|
ObTableIterParam* tmp_param = iter_params_.at(i);
|
||||||
if (OB_ISNULL(tmp_param)) {
|
if (OB_ISNULL(tmp_param)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("Unexpected null iter param", K(ret));
|
LOG_WARN("Unexpected null iter param", K(ret));
|
||||||
} else if (tmp_param->can_be_reused(cg_idx, expr)) {
|
} else if (tmp_param->can_be_reused(cg_idx, exprs, is_aggregate)) {
|
||||||
iter_param = tmp_param;
|
iter_param = tmp_param;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret) || OB_NOT_NULL(iter_param)) {
|
if (OB_FAIL(ret) || OB_NOT_NULL(iter_param)) {
|
||||||
} else {
|
} else if (OB_FAIL(new_iter_param(cg_idx, row_param, exprs, iter_param, is_aggregate))) {
|
||||||
common::ObSEArray<sql::ObExpr*, 1> exprs;
|
LOG_WARN("Fail to new cg iter param", K(ret));
|
||||||
if (OB_FAIL(exprs.push_back(expr))) {
|
|
||||||
LOG_WARN("Fail to push back", K(ret));
|
|
||||||
} else if (OB_FAIL(new_iter_param(cg_idx, row_param, exprs, iter_param, is_aggregate))) {
|
|
||||||
LOG_WARN("Fail to new cg iter param", K(ret));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -102,11 +119,11 @@ int ObCGIterParamPool::new_iter_param(
|
|||||||
}
|
}
|
||||||
} else if (OB_FAIL(fill_cg_iter_param(row_param, cg_idx, exprs, *iter_param))) {
|
} else if (OB_FAIL(fill_cg_iter_param(row_param, cg_idx, exprs, *iter_param))) {
|
||||||
STORAGE_LOG(WARN, "Failed to mock cg iter param", K(ret), K(row_param), K(cg_idx), K(exprs));
|
STORAGE_LOG(WARN, "Failed to mock cg iter param", K(ret), K(row_param), K(cg_idx), K(exprs));
|
||||||
} else if (OB_FAIL(put_iter_param(iter_param))) {
|
|
||||||
LOG_WARN("Fail to put iter param", K(ret));
|
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (!is_aggregate) {
|
if (OB_FAIL(put_iter_param(iter_param))) {
|
||||||
|
LOG_WARN("Fail to put iter param", K(ret));
|
||||||
|
} else if (!is_aggregate) {
|
||||||
iter_param->disable_pd_aggregate();
|
iter_param->disable_pd_aggregate();
|
||||||
}
|
}
|
||||||
} else if (nullptr != iter_param) {
|
} else if (nullptr != iter_param) {
|
||||||
|
|||||||
@ -28,18 +28,23 @@ public:
|
|||||||
~ObCGIterParamPool() { reset(); }
|
~ObCGIterParamPool() { reset(); }
|
||||||
void reset();
|
void reset();
|
||||||
int get_iter_param(
|
int get_iter_param(
|
||||||
const int32_t cg_idx,
|
const int32_t cg_idx,
|
||||||
const ObTableIterParam &row_param,
|
const ObTableIterParam &row_param,
|
||||||
sql::ObExpr *expr,
|
sql::ObExpr *expr,
|
||||||
ObTableIterParam *&iter_param,
|
ObTableIterParam *&iter_param);
|
||||||
const bool is_aggregate = false);
|
int get_iter_param(
|
||||||
|
const int32_t cg_idx,
|
||||||
|
const ObTableIterParam &row_param,
|
||||||
|
const common::ObIArray<sql::ObExpr*> &exprs,
|
||||||
|
ObTableIterParam *&iter_param,
|
||||||
|
const bool is_aggregate = false);
|
||||||
|
private:
|
||||||
int new_iter_param(
|
int new_iter_param(
|
||||||
const int32_t cg_idx,
|
const int32_t cg_idx,
|
||||||
const ObTableIterParam &row_param,
|
const ObTableIterParam &row_param,
|
||||||
const common::ObIArray<sql::ObExpr*> &exprs,
|
const common::ObIArray<sql::ObExpr*> &exprs,
|
||||||
ObTableIterParam *&iter_param,
|
ObTableIterParam *&iter_param,
|
||||||
const bool is_aggregate = false);
|
const bool is_aggregate = false);
|
||||||
private:
|
|
||||||
int fill_cg_iter_param(
|
int fill_cg_iter_param(
|
||||||
const ObTableIterParam &row_param,
|
const ObTableIterParam &row_param,
|
||||||
const int32_t cg_idx,
|
const int32_t cg_idx,
|
||||||
|
|||||||
@ -423,7 +423,7 @@ int ObCOSSTableRowScanner::construct_cg_iter_params(
|
|||||||
} else if (0 == row_param.output_exprs_->count()) {
|
} else if (0 == row_param.output_exprs_->count()) {
|
||||||
const uint32_t cg_idx = OB_CS_VIRTUAL_CG_IDX;
|
const uint32_t cg_idx = OB_CS_VIRTUAL_CG_IDX;
|
||||||
if (project_single_row) {
|
if (project_single_row) {
|
||||||
} else if (OB_FAIL(cg_param_pool_->new_iter_param(cg_idx, row_param, *row_param.output_exprs_,
|
} else if (OB_FAIL(cg_param_pool_->get_iter_param(cg_idx, row_param, *row_param.output_exprs_,
|
||||||
cg_param, row_param.enable_pd_aggregate()))) {
|
cg_param, row_param.enable_pd_aggregate()))) {
|
||||||
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
|
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
|
||||||
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
|
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
|
||||||
@ -475,7 +475,7 @@ int ObCOSSTableRowScanner::construct_cg_agg_iter_params(
|
|||||||
} else if (0 == row_param.output_exprs_->count()) {
|
} else if (0 == row_param.output_exprs_->count()) {
|
||||||
// only COUNT(*) and without filter
|
// only COUNT(*) and without filter
|
||||||
const uint32_t cg_idx = OB_CS_VIRTUAL_CG_IDX;
|
const uint32_t cg_idx = OB_CS_VIRTUAL_CG_IDX;
|
||||||
if (OB_FAIL(cg_param_pool_->new_iter_param(cg_idx, row_param, *row_param.aggregate_exprs_,
|
if (OB_FAIL(cg_param_pool_->get_iter_param(cg_idx, row_param, *row_param.aggregate_exprs_,
|
||||||
cg_param, row_param.enable_pd_aggregate()))) {
|
cg_param, row_param.enable_pd_aggregate()))) {
|
||||||
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
|
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
|
||||||
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
|
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
|
||||||
@ -510,7 +510,7 @@ int ObCOSSTableRowScanner::construct_cg_agg_iter_params(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret) || 0 == exprs.count()) {
|
if (OB_FAIL(ret) || 0 == exprs.count()) {
|
||||||
} else if (OB_FAIL(cg_param_pool_->new_iter_param(cg_idx, row_param, exprs, cg_param, row_param.enable_pd_aggregate()))) {
|
} else if (OB_FAIL(cg_param_pool_->get_iter_param(cg_idx, row_param, exprs, cg_param, row_param.enable_pd_aggregate()))) {
|
||||||
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
|
LOG_WARN("Fail to get cg iter param", K(ret), K(cg_idx), K(row_param));
|
||||||
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
|
} else if (OB_FAIL(iter_params.push_back(cg_param))) {
|
||||||
LOG_WARN("Fail to push back cg iter param", K(ret), K(cg_param));
|
LOG_WARN("Fail to push back cg iter param", K(ret), K(cg_param));
|
||||||
|
|||||||
Reference in New Issue
Block a user