fix unexpected split filter params when batch rescan

This commit is contained in:
Hongqin-Li 2024-11-19 12:17:33 +00:00 committed by ob-robot
parent 1f5f29ca37
commit e0d05358e5
8 changed files with 69 additions and 14 deletions

View File

@ -145,6 +145,7 @@ int ObDASMergeIter::set_merge_status(MergeType merge_type)
{
int ret = OB_SUCCESS;
merge_type_ = used_for_keep_order_ ? MergeType::SORT_MERGE : merge_type;
first_get_row_ = true;
if (merge_type == MergeType::SEQUENTIAL_MERGE) {
get_next_row_ = &ObDASMergeIter::get_next_seq_row;
get_next_rows_ = &ObDASMergeIter::get_next_seq_rows;
@ -370,6 +371,9 @@ int ObDASMergeIter::inner_get_next_row()
LOG_WARN("das iter failed to get next row", K(ret));
}
}
if (OB_UNLIKELY(first_get_row_)) {
first_get_row_ = false;
}
return ret;
}
@ -382,6 +386,9 @@ int ObDASMergeIter::inner_get_next_rows(int64_t &count, int64_t capacity)
LOG_WARN("das merge iter failed to get next rows", K(ret));
}
}
if (OB_UNLIKELY(first_get_row_)) {
first_get_row_ = false;
}
LOG_DEBUG("[DAS ITER] merge iter get next rows end", K(count), K(merge_type_), K(merge_state_arr_), K(ret));
const ObBitVector *skip = nullptr;
PRINT_VECTORIZED_ROWS(SQL, DEBUG, *eval_ctx_, *output_, count, skip);
@ -503,7 +510,12 @@ int ObDASMergeIter::get_next_seq_row()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected das task op type", K(ret));
} else {
if (OB_SUCC(scan_op->get_output_result_iter()->get_next_row())) {
if (first_get_row_) {
scan_op->get_scan_param().need_update_tablet_param_ = true;
}
ret = scan_op->get_output_result_iter()->get_next_row();
scan_op->get_scan_param().need_update_tablet_param_ = false;
if (OB_SUCC(ret)) {
got_row = true;
} else if (OB_ITER_END == ret) {
++seq_task_idx_;
@ -512,6 +524,7 @@ int ObDASMergeIter::get_next_seq_row()
} else {
ret = OB_SUCCESS;
scan_op = DAS_SCAN_OP(das_tasks_arr_.at(seq_task_idx_));
scan_op->get_scan_param().need_update_tablet_param_ = true;
if (need_update_partition_id_) {
if (OB_FAIL(update_output_tablet_id(scan_op))) {
LOG_WARN("failed to update output tablet id", K(ret), K(scan_op->get_tablet_loc()->tablet_id_));
@ -548,7 +561,11 @@ int ObDASMergeIter::get_next_seq_rows(int64_t &count, int64_t capacity)
reset_datum_ptr(scan_op, capacity);
}
count = 0;
if (first_get_row_) {
scan_op->get_scan_param().need_update_tablet_param_ = true;
}
ret = scan_op->get_output_result_iter()->get_next_rows(count, capacity);
scan_op->get_scan_param().need_update_tablet_param_ = false;
if (OB_ITER_END == ret && count > 0) {
ret = OB_SUCCESS;
}
@ -564,6 +581,7 @@ int ObDASMergeIter::get_next_seq_rows(int64_t &count, int64_t capacity)
} else {
ret = OB_SUCCESS;
scan_op = DAS_SCAN_OP(das_tasks_arr_.at(seq_task_idx_));
scan_op->get_scan_param().need_update_tablet_param_ = true;
if (need_update_partition_id_) {
if (OB_FAIL(update_output_tablet_id(scan_op))) {
LOG_WARN("update output tablet id failed", K(ret), K(scan_op->get_tablet_loc()->tablet_id_));
@ -594,6 +612,7 @@ int ObDASMergeIter::get_next_sorted_row()
if (OB_ISNULL(scan_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected das task op type", K(ret), KPC(das_tasks_arr_[i]));
} else if (FALSE_IT(scan_op->get_scan_param().need_update_tablet_param_ = true)) {
} else if (OB_SUCC(scan_op->get_output_result_iter()->get_next_row())) {
if (OB_FAIL(merge_store_rows_arr_[i].save(false, 1))) {
LOG_WARN("failed to save store row", K(ret));
@ -679,6 +698,7 @@ int ObDASMergeIter::get_next_sorted_rows(int64_t &count, int64_t capacity)
reset_datum_ptr(scan_op, capacity);
}
count = 0;
scan_op->get_scan_param().need_update_tablet_param_ = true;
ret = scan_op->get_output_result_iter()->get_next_rows(count, capacity);
if (OB_ITER_END == ret && count > 0) {
ret = OB_SUCCESS;

View File

@ -127,6 +127,7 @@ public:
das_tasks_arr_(),
get_next_row_(nullptr),
get_next_rows_(nullptr),
first_get_row_(true),
seq_task_idx_(OB_INVALID_INDEX),
group_id_idx_(OB_INVALID_INDEX),
need_prepare_sort_merge_info_(false),
@ -209,6 +210,7 @@ private:
DasTaskArray das_tasks_arr_;
int (ObDASMergeIter::*get_next_row_)();
int (ObDASMergeIter::*get_next_rows_)(int64_t&, int64_t);
bool first_get_row_;
/********* SEQUENTIAL MERGE BEGIN *********/
int64_t seq_task_idx_;

View File

@ -302,7 +302,8 @@ DEF_TO_STRING(ObTableScanParam)
K_(fb_read_tx_uncommitted),
K_(external_file_format),
K_(external_file_location),
K_(tx_seq_base));
K_(tx_seq_base),
K_(need_update_tablet_param));
J_OBJ_END();
return pos;
}

View File

@ -138,7 +138,8 @@ public:
need_switch_param_(false),
is_mds_query_(false),
is_thread_scope_(true),
tx_seq_base_(-1)
tx_seq_base_(-1),
need_update_tablet_param_(false)
{}
virtual ~ObTableScanParam() {}
public:
@ -172,6 +173,7 @@ public:
bool is_thread_scope_;
ObRangeArray ss_key_ranges_; // used for index skip scan, use as postfix range for ObVTableScanParam::key_ranges_
int64_t tx_seq_base_; // used by lob when main table is read_latest
bool need_update_tablet_param_; // whether need to update tablet-level param, such as split filter param
DECLARE_VIRTUAL_TO_STRING;
private:

View File

@ -411,6 +411,8 @@ int ObMultipleMerge::get_next_row(ObDatumRow *&row)
} else if (FALSE_IT(not_using_static_engine = (nullptr == access_param_->output_exprs_))) {
} else if (access_param_->iter_param_.enable_pd_aggregate()) {
ret = get_next_aggregate_row(row);
} else if (OB_FAIL(refresh_filter_params_on_demand(false/*is_open*/))) {
LOG_WARN("failed to refresh split params on demand", K(ret));
} else {
row = nullptr;
if (need_padding_) {
@ -544,6 +546,8 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity)
if (OB_FAIL(ret)) {
} else if (OB_FAIL(refresh_table_on_demand())) {
LOG_WARN("fail to refresh table on demand", K(ret));
} else if (OB_FAIL(refresh_filter_params_on_demand(false/*is_open*/))) {
LOG_WARN("failed to refresh split params on demand", K(ret));
} else {
ObVectorStore *vector_store = reinterpret_cast<ObVectorStore *>(block_row_store_);
int64_t batch_size = min(capacity, access_param_->get_op()->get_batch_size());
@ -649,6 +653,8 @@ int ObMultipleMerge::get_next_aggregate_row(ObDatumRow *&row)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect aggregate pushdown status", K(ret),
K(access_ctx_->range_array_pos_->count()));
} else if (OB_FAIL(refresh_filter_params_on_demand(false/*is_open*/))) {
LOG_WARN("failed to refersh split params on demand", K(ret));
} else {
ObBlockBatchedRowStore *batch_row_store = static_cast<ObBlockBatchedRowStore *>(block_row_store_);
if (OB_NOT_NULL(access_param_->get_op())) {
@ -1032,15 +1038,8 @@ int ObMultipleMerge::open()
}
}
if (OB_SUCC(ret)) {
// fill auto split params if need
const ObTableIterParam &iter_param = access_param_->iter_param_;
const int64_t table_id = access_param_->iter_param_.table_id_;
if (OB_NOT_NULL(iter_param.auto_split_filter_) && iter_param.auto_split_filter_type_ < static_cast<uint64_t>(ObTabletSplitType::MAX_TYPE)) {
ObPartitionSplitQuery split_query;
if (OB_FAIL(split_query.fill_auto_split_params(iter_param.tablet_id_, iter_param.ls_id_,
iter_param.op_, iter_param.auto_split_filter_type_, iter_param.auto_split_params_, *access_ctx_->stmt_allocator_))) {
LOG_WARN("fail to fill split params.", K(ret));
}
if (OB_FAIL(refresh_filter_params_on_demand(true/*is_open*/))) {
LOG_WARN("fail to fill split params", K(ret));
}
}
if (OB_SUCC(ret)) {
@ -1289,6 +1288,28 @@ int ObMultipleMerge::check_filtered(const ObDatumRow &row, bool &filtered)
return ret;
}
int ObMultipleMerge::refresh_filter_params_on_demand(const bool is_open)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObMultipleMerge has not been inited", K(ret));
} else {
const ObTableIterParam &iter_param = access_param_->iter_param_;
const int64_t table_id = access_param_->iter_param_.table_id_;
const bool has_split_filter = OB_NOT_NULL(iter_param.auto_split_filter_)
&& iter_param.auto_split_filter_type_ < static_cast<uint64_t>(ObTabletSplitType::MAX_TYPE);
if (has_split_filter && (is_open || (OB_NOT_NULL(iter_param.need_update_tablet_param_) && *iter_param.need_update_tablet_param_))) {
ObPartitionSplitQuery split_query;
if (OB_FAIL(split_query.fill_auto_split_params(iter_param.tablet_id_, iter_param.ls_id_,
iter_param.op_, iter_param.auto_split_filter_type_, iter_param.auto_split_params_, *access_ctx_->stmt_allocator_))) {
LOG_WARN("fail to fill split params.", K(ret));
}
}
}
return ret;
}
int ObMultipleMerge::add_iterator(ObStoreRowIterator &iter)
{
int ret = OB_SUCCESS;

View File

@ -131,6 +131,7 @@ private:
void report_tablet_stat();
int update_and_report_tablet_stat();
void inner_reset();
int refresh_filter_params_on_demand(const bool is_open);
OB_INLINE bool can_use_vec2(); // need to remove after statistical info pushdown support vec 2.0
protected:

View File

@ -61,7 +61,8 @@ ObTableIterParam::ObTableIterParam()
auto_split_filter_(nullptr),
auto_split_params_(nullptr),
is_tablet_spliting_(false),
is_column_replica_table_(false)
is_column_replica_table_(false),
need_update_tablet_param_(nullptr)
{}
ObTableIterParam::~ObTableIterParam()
@ -115,6 +116,7 @@ void ObTableIterParam::reset()
is_tablet_spliting_ = false;
is_column_replica_table_ = false;
ObSSTableIndexFilterFactory::destroy_sstable_index_filter(sstable_index_filter_);
need_update_tablet_param_ = nullptr;
}
bool ObTableIterParam::is_valid() const
@ -220,7 +222,11 @@ DEF_TO_STRING(ObTableIterParam)
K_(is_non_unique_local_index),
K_(ss_rowkey_prefix_cnt),
K_(table_scan_opt),
K_(is_tablet_spliting));
K_(auto_split_filter_type),
KP_(auto_split_filter),
KPC_(auto_split_params),
K_(is_tablet_spliting),
KP_(need_update_tablet_param));
J_OBJ_END();
return pos;
}
@ -361,6 +367,7 @@ int ObTableAccessParam::init(
OB_FAIL(get_prefix_cnt_for_skip_scan(scan_param, iter_param_))) {
STORAGE_LOG(WARN, "Failed to get prefix for skip scan", K(ret));
} else {
iter_param_.need_update_tablet_param_ = &scan_param.need_update_tablet_param_;
is_inited_ = true;
}
}

View File

@ -228,6 +228,7 @@ public:
sql::ExprFixedArray *auto_split_params_;
bool is_tablet_spliting_;
bool is_column_replica_table_;
const bool *need_update_tablet_param_;
};
struct ObTableAccessParam