fix bug: topn sort earge filter bugs
This commit is contained in:
@ -42,6 +42,8 @@ int ObSortVecOpEagerFilter<Compare, Store_Row, has_addon>::filter(
|
|||||||
if (OB_UNLIKELY(!is_inited())) {
|
if (OB_UNLIKELY(!is_inited())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("ObSortVecOpEagerFilter is not initialized", K(ret));
|
LOG_WARN("ObSortVecOpEagerFilter is not initialized", K(ret));
|
||||||
|
} else if (is_by_pass()) {
|
||||||
|
// do nothing
|
||||||
} else {
|
} else {
|
||||||
output_brs_.copy(&input_brs);
|
output_brs_.copy(&input_brs);
|
||||||
if (bucket_heap_->count() < bucket_num_) {
|
if (bucket_heap_->count() < bucket_num_) {
|
||||||
@ -72,10 +74,10 @@ int ObSortVecOpEagerFilter<Compare, Store_Row, has_addon>::update_filter(
|
|||||||
const Store_Row *bucket_head_row, bool &updated) {
|
const Store_Row *bucket_head_row, bool &updated) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!is_inited()) {
|
if (!is_inited()) {
|
||||||
// filter is not initialized, so the filter will not be updated
|
|
||||||
// this is not an error
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("is not initialized", K(ret));
|
LOG_WARN("ObSortVecOpEagerFilter is not initialized", K(ret));
|
||||||
|
} else if (is_by_pass()) {
|
||||||
|
updated = false;
|
||||||
} else {
|
} else {
|
||||||
Store_Row *reuse_row = nullptr;
|
Store_Row *reuse_row = nullptr;
|
||||||
if (bucket_heap_->count() < bucket_num_) {
|
if (bucket_heap_->count() < bucket_num_) {
|
||||||
|
|||||||
@ -64,15 +64,15 @@ public:
|
|||||||
bucket_num_ = bucket_num_ < MIN_BUCKET_NUM ? 0 : bucket_num_;
|
bucket_num_ = bucket_num_ < MIN_BUCKET_NUM ? 0 : bucket_num_;
|
||||||
if (bucket_num_ != 0) {
|
if (bucket_num_ != 0) {
|
||||||
bucket_size_ = (topn_cnt + bucket_num_ - 1) / bucket_num_;
|
bucket_size_ = (topn_cnt + bucket_num_ - 1) / bucket_num_;
|
||||||
is_inited_ = true;
|
if (OB_FAIL(init_output_brs(max_batch_size))) {
|
||||||
|
SQL_ENG_LOG(WARN, "init ouput batch rows failed", K(ret));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
is_by_pass_ = true;
|
is_by_pass_ = true;
|
||||||
SQL_ENG_LOG(INFO, "no need to use filter ", K(dumped_rows_cnt),
|
SQL_ENG_LOG(INFO, "no need to use filter ", K(dumped_rows_cnt),
|
||||||
K(topn_cnt));
|
K(topn_cnt));
|
||||||
}
|
}
|
||||||
if (OB_FAIL(init_output_brs(max_batch_size))) {
|
is_inited_ = true;
|
||||||
SQL_ENG_LOG(WARN, "init ouput batch rows failed", K(ret));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -356,7 +356,9 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::add_batch(const ObBatchRows
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
const ObBatchRows *input_brs_ptr = nullptr;
|
const ObBatchRows *input_brs_ptr = nullptr;
|
||||||
if (is_topn_sort() && OB_NOT_NULL(topn_filter_) && OB_LIKELY(!topn_filter_->is_by_pass())) {
|
if (is_topn_sort() && OB_NOT_NULL(topn_filter_) && OB_LIKELY(!topn_filter_->is_by_pass())) {
|
||||||
if (OB_FAIL(topn_filter_->filter(all_exprs_, *eval_ctx_, start_pos,
|
if (OB_FAIL(batch_eval_vector(all_exprs_, input_brs))) {
|
||||||
|
SQL_ENG_LOG(WARN, "failed to eval vector", K(ret));
|
||||||
|
} else if (OB_FAIL(topn_filter_->filter(all_exprs_, *eval_ctx_, start_pos,
|
||||||
input_brs))) {
|
input_brs))) {
|
||||||
SQL_ENG_LOG(WARN, "failed to do topn filter", K(ret));
|
SQL_ENG_LOG(WARN, "failed to do topn filter", K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -1048,14 +1050,18 @@ template <typename Compare, typename Store_Row, bool has_addon>
|
|||||||
int ObSortVecOpImpl<Compare, Store_Row, has_addon>::eager_topn_filter_update(
|
int ObSortVecOpImpl<Compare, Store_Row, has_addon>::eager_topn_filter_update(
|
||||||
const common::ObIArray<Store_Row *> *sorted_dumped_rows) {
|
const common::ObIArray<Store_Row *> *sorted_dumped_rows) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t dumped_rows_count = sorted_dumped_rows->count();
|
if (topn_filter_->is_by_pass()) {
|
||||||
int64_t bucket_size = topn_filter_->bucket_size();
|
// do nothing
|
||||||
bool updated = true;
|
} else {
|
||||||
for (int64_t i = bucket_size - 1;
|
int64_t dumped_rows_count = sorted_dumped_rows->count();
|
||||||
OB_SUCC(ret) && updated && i < dumped_rows_count; i += bucket_size) {
|
int64_t bucket_size = topn_filter_->bucket_size();
|
||||||
if (OB_FAIL(
|
bool updated = true;
|
||||||
topn_filter_->update_filter(sorted_dumped_rows->at(i), updated))) {
|
for (int64_t i = bucket_size - 1;
|
||||||
SQL_ENG_LOG(WARN, "failed to eager topn filter update", K(ret));
|
OB_SUCC(ret) && updated && i < dumped_rows_count; i += bucket_size) {
|
||||||
|
if (OB_FAIL(
|
||||||
|
topn_filter_->update_filter(sorted_dumped_rows->at(i), updated))) {
|
||||||
|
SQL_ENG_LOG(WARN, "failed to eager topn filter update", K(ret));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -1474,8 +1474,6 @@ int ObBasicSessionInfo::get_influence_plan_sys_var(ObSysVarInPC &sys_vars) const
|
|||||||
LOG_ERROR("influence plan system var is NULL", K(i), K(ret));
|
LOG_ERROR("influence plan system var is NULL", K(i), K(ret));
|
||||||
} else if (OB_FAIL(sys_vars.push_back(sys_vars_[index]->get_value()))) {
|
} else if (OB_FAIL(sys_vars.push_back(sys_vars_[index]->get_value()))) {
|
||||||
LOG_WARN("influence plan system variables push failed", K(ret));
|
LOG_WARN("influence plan system variables push failed", K(ret));
|
||||||
} else {
|
|
||||||
LOG_WARN("luofan test get_influence_plan_sys_var", KPC(sys_vars_[index]));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user