Fix aggregate pushdown to sst index when refresh table
This commit is contained in:
@ -333,6 +333,7 @@ int ObAggregatedStore::fill_index_info(const blocksstable::ObMicroIndexInfo &ind
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObAggregatedStore is not inited", K(ret), K(*this));
|
||||
} else {
|
||||
set_aggregated_in_prefetch();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < agg_row_.get_agg_count(); ++i) {
|
||||
ObAggCell *cell = agg_row_.at(i);
|
||||
if (OB_FAIL(cell->eval_index_info(index_info))) {
|
||||
|
@ -126,7 +126,7 @@ public:
|
||||
OB_INLINE void set_end() { iter_end_flag_ = IterEndState::ITER_END; }
|
||||
int check_agg_in_row_mode(const ObTableIterParam &iter_param);
|
||||
bool has_data();
|
||||
TO_STRING_KV(K_(agg_row), K_(agg_flat_row_mode));
|
||||
INHERIT_TO_STRING_KV("ObBlockBatchedRowStore", ObBlockBatchedRowStore, K_(agg_row), K_(agg_flat_row_mode));
|
||||
|
||||
private:
|
||||
ObAggRow agg_row_;
|
||||
|
@ -36,7 +36,8 @@ ObBlockRowStore::ObBlockRowStore(ObTableAccessContext &context)
|
||||
iter_param_(nullptr),
|
||||
can_blockscan_(false),
|
||||
filter_applied_(false),
|
||||
disabled_(false)
|
||||
disabled_(false),
|
||||
is_aggregated_in_prefetch_(false)
|
||||
{}
|
||||
|
||||
ObBlockRowStore::~ObBlockRowStore()
|
||||
@ -50,6 +51,7 @@ void ObBlockRowStore::reset()
|
||||
filter_applied_ = false;
|
||||
pd_filter_info_.reset();
|
||||
disabled_ = false;
|
||||
is_aggregated_in_prefetch_ = false;
|
||||
iter_param_ = nullptr;
|
||||
}
|
||||
|
||||
@ -58,6 +60,7 @@ void ObBlockRowStore::reuse()
|
||||
can_blockscan_ = false;
|
||||
filter_applied_ = false;
|
||||
disabled_ = false;
|
||||
is_aggregated_in_prefetch_ = false;
|
||||
}
|
||||
|
||||
int ObBlockRowStore::init(const ObTableAccessParam ¶m)
|
||||
|
@ -62,6 +62,8 @@ public:
|
||||
OB_INLINE bool is_valid() const { return is_inited_; }
|
||||
OB_INLINE bool is_disabled() const { return disabled_; }
|
||||
OB_INLINE void disable() { disabled_ = true; }
|
||||
OB_INLINE bool can_refresh() const { return !is_aggregated_in_prefetch_; }
|
||||
OB_INLINE void set_aggregated_in_prefetch() { is_aggregated_in_prefetch_ = true; }
|
||||
// for blockscan
|
||||
OB_INLINE void reset_blockscan() { can_blockscan_ = false; filter_applied_ = false; }
|
||||
OB_INLINE bool can_blockscan() const { return can_blockscan_; }
|
||||
@ -77,7 +79,7 @@ public:
|
||||
{ return pd_filter_info_.filter_; }
|
||||
virtual bool is_end() const { return false; }
|
||||
virtual bool is_empty() const { return true; }
|
||||
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(can_blockscan), K_(filter_applied), K_(disabled));
|
||||
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(can_blockscan), K_(filter_applied), K_(disabled), K_(is_aggregated_in_prefetch));
|
||||
protected:
|
||||
int filter_micro_block(
|
||||
const int64_t row_count,
|
||||
@ -92,6 +94,7 @@ private:
|
||||
bool can_blockscan_;
|
||||
bool filter_applied_;
|
||||
bool disabled_;
|
||||
bool is_aggregated_in_prefetch_;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1273,7 +1273,7 @@ int ObIndexTreeMultiPassPrefetcher<DATA_PREFETCH_DEPTH, INDEX_PREFETCH_DEPTH>::p
|
||||
if (OB_FAIL(agg_row_store_->fill_index_info(block_info))) {
|
||||
LOG_WARN("Fail to agg index info", K(ret), K(block_info), KPC(this));
|
||||
} else {
|
||||
LOG_DEBUG("Success to agg index info", K(ret), KPC(agg_row_store_));
|
||||
LOG_DEBUG("Success to agg index info", K(ret), K(block_info), KPC(agg_row_store_));
|
||||
continue;
|
||||
}
|
||||
} else if (OB_FAIL(check_row_lock(block_info, is_row_lock_checked_))) {
|
||||
@ -1735,7 +1735,7 @@ int ObIndexTreeMultiPassPrefetcher<DATA_PREFETCH_DEPTH, INDEX_PREFETCH_DEPTH>::O
|
||||
if (OB_FAIL(prefetcher.agg_row_store_->fill_index_info(index_info))) {
|
||||
LOG_WARN("Fail to agg index info", K(ret), KPC(this));
|
||||
} else {
|
||||
LOG_DEBUG("Success to agg index info", K(ret), K(index_info));
|
||||
LOG_DEBUG("Success to agg index info", K(ret), K(index_info), KPC(prefetcher.agg_row_store_));
|
||||
}
|
||||
} else if (OB_FAIL(prefetcher.check_row_lock(index_info, is_row_lock_checked_))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
|
@ -1373,9 +1373,11 @@ int ObMultipleMerge::refresh_table_on_demand()
|
||||
STORAGE_LOG(WARN, "ObMultipleMerge has not been inited", K(ret));
|
||||
} else if (ScanState::NONE == scan_state_) {
|
||||
STORAGE_LOG(DEBUG, "skip refresh table");
|
||||
} else if (get_table_param_->sample_info_.is_block_sample()) {
|
||||
} else if (get_table_param_->sample_info_.is_block_sample() ||
|
||||
(nullptr != block_row_store_ && !block_row_store_->can_refresh())) {
|
||||
// TODO : @yuanzhe refactor block sample for table refresh
|
||||
STORAGE_LOG(DEBUG, "skip refresh table for block sample");
|
||||
STORAGE_LOG(DEBUG, "skip refresh table for block sample or aggregated in prefetch",
|
||||
K(get_table_param_->sample_info_.is_block_sample()), KPC(block_row_store_));
|
||||
} else if (OB_FAIL(check_need_refresh_table(need_refresh))) {
|
||||
STORAGE_LOG(WARN, "fail to check need refresh table", K(ret));
|
||||
} else if (need_refresh) {
|
||||
|
@ -123,6 +123,7 @@ int ObSSTableRowScanner<PrefetchType>::inner_open(
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (iter_param_->enable_pd_aggregate() &&
|
||||
!iter_param_->is_use_column_store() &&
|
||||
nullptr != block_row_store_ &&
|
||||
iter_param_->enable_skip_index() &&
|
||||
!sstable_->is_multi_version_table()) {
|
||||
|
Reference in New Issue
Block a user