Opt cold scan for column store
This commit is contained in:
@ -346,7 +346,7 @@ public:
|
||||
ObTableAccessContext &access_ctx,
|
||||
const void *query_range) override final;
|
||||
void inc_cur_micro_data_fetch_idx();
|
||||
int prefetch();
|
||||
virtual int prefetch();
|
||||
OB_INLINE ObSSTableReadHandle ¤t_read_handle()
|
||||
{ return read_handles_[cur_range_fetch_idx_ % max_range_prefetching_cnt_]; }
|
||||
OB_INLINE ObMicroBlockDataHandle ¤t_micro_handle()
|
||||
@ -424,7 +424,7 @@ protected:
|
||||
}
|
||||
void reset_tree_handles();
|
||||
virtual int init_tree_handles(const int64_t count);
|
||||
virtual int get_prefetch_depth(int64_t &depth);
|
||||
int get_prefetch_depth(int64_t &depth);
|
||||
|
||||
static const int32_t DEFAULT_SCAN_RANGE_PREFETCH_CNT = 4;
|
||||
static const int32_t DEFAULT_SCAN_MICRO_DATA_HANDLE_CNT = DATA_PREFETCH_DEPTH;
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
#include "ob_cg_prefetcher.h"
|
||||
#include "ob_i_cg_iterator.h"
|
||||
#include "storage/access/ob_aggregated_store.h"
|
||||
|
||||
namespace oceanbase {
|
||||
@ -23,7 +24,9 @@ void ObCGPrefetcher::reset()
|
||||
query_index_range_.reset();
|
||||
query_range_.reset();
|
||||
is_reverse_scan_ = false;
|
||||
cg_iter_type_ = -1;
|
||||
filter_bitmap_ = nullptr;
|
||||
micro_data_prewarm_idx_ = 0;
|
||||
cur_micro_data_read_idx_ = -1;
|
||||
cg_agg_cells_ = nullptr;
|
||||
ObIndexTreeMultiPassPrefetcher::reset();
|
||||
@ -35,6 +38,7 @@ void ObCGPrefetcher::reuse()
|
||||
query_index_range_.reset();
|
||||
query_range_.reset();
|
||||
filter_bitmap_ = nullptr;
|
||||
micro_data_prewarm_idx_ = 0;
|
||||
cur_micro_data_read_idx_ = -1;
|
||||
ObIndexTreeMultiPassPrefetcher::reuse();
|
||||
}
|
||||
@ -57,12 +61,14 @@ int ObCGPrefetcher::init_tree_handles(const int64_t count)
|
||||
}
|
||||
|
||||
int ObCGPrefetcher::init(
|
||||
const int cg_iter_type,
|
||||
ObSSTable &sstable,
|
||||
const ObTableIterParam &iter_param,
|
||||
ObTableAccessContext &access_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
query_range_.set_whole_range();
|
||||
cg_iter_type_ = cg_iter_type;
|
||||
if (OB_FAIL(ObIndexTreeMultiPassPrefetcher::init(
|
||||
ObStoreRowIterator::IteratorScan,
|
||||
sstable,
|
||||
@ -80,12 +86,14 @@ int ObCGPrefetcher::init(
|
||||
}
|
||||
|
||||
int ObCGPrefetcher::switch_context(
|
||||
const int cg_iter_type,
|
||||
ObSSTable &sstable,
|
||||
const ObTableIterParam &iter_param,
|
||||
ObTableAccessContext &access_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
query_range_.set_whole_range();
|
||||
cg_iter_type_ = cg_iter_type;
|
||||
if (OB_FAIL(ObIndexTreeMultiPassPrefetcher::switch_context(
|
||||
ObStoreRowIterator::IteratorScan,
|
||||
sstable,
|
||||
@ -179,21 +187,44 @@ int ObCGPrefetcher::locate_in_prefetched_data(bool &found)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
found = false;
|
||||
const ObCSRowId row_idx = is_reverse_scan_ ? query_index_range_.end_row_id_ : query_index_range_.start_row_id_;
|
||||
if (micro_data_prefetch_idx_ > 0 && micro_data_prefetch_idx_ > cur_micro_data_fetch_idx_) {
|
||||
ObMicroIndexInfo &last_prefetched_info = micro_data_infos_[(micro_data_prefetch_idx_ - 1) % max_micro_handle_cnt_];
|
||||
const ObCSRange &last_prefetched_range = last_prefetched_info.get_row_range();
|
||||
if (0 == last_prefetched_range.compare(row_idx)) {
|
||||
cur_micro_data_fetch_idx_ = micro_data_prefetch_idx_ - 2;
|
||||
cur_micro_data_read_idx_ = cur_micro_data_fetch_idx_;
|
||||
const ObCSRowId start_row_idx = is_reverse_scan_ ? query_index_range_.end_row_id_ : query_index_range_.start_row_id_;
|
||||
if (micro_data_prewarm_idx_ > 0 && micro_data_prewarm_idx_ > cur_micro_data_fetch_idx_) {
|
||||
int cmp_ret = -1;
|
||||
for (int64_t micro_data_idx = MAX(0, cur_micro_data_fetch_idx_); OB_SUCC(ret) && cmp_ret < 0 && micro_data_idx < micro_data_prewarm_idx_; micro_data_idx++) {
|
||||
ObMicroIndexInfo µ_info = micro_data_infos_[micro_data_idx % max_micro_handle_cnt_];
|
||||
const ObCSRange µ_range = micro_info.get_row_range();
|
||||
cmp_ret = micro_range.compare(start_row_idx);
|
||||
if (is_reverse_scan_) {
|
||||
last_prefetched_info.is_right_border_ = true;
|
||||
is_prefetch_end_ = (query_index_range_.start_row_id_ >= last_prefetched_range.start_row_id_);
|
||||
} else {
|
||||
last_prefetched_info.is_left_border_ = true;
|
||||
is_prefetch_end_ = (query_index_range_.end_row_id_ <= last_prefetched_range.end_row_id_);
|
||||
cmp_ret = -cmp_ret;
|
||||
}
|
||||
if (0 == cmp_ret) {
|
||||
cur_micro_data_fetch_idx_ = micro_data_idx - 1;
|
||||
cur_micro_data_read_idx_ = cur_micro_data_fetch_idx_;
|
||||
if (is_reverse_scan_) {
|
||||
micro_info.is_right_border_ = true;
|
||||
} else {
|
||||
micro_info.is_left_border_ = true;
|
||||
}
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && found) {
|
||||
cmp_ret = 1;
|
||||
is_prefetch_end_ = false;
|
||||
micro_data_prefetch_idx_ = micro_data_prewarm_idx_;
|
||||
const ObCSRowId end_row_id = is_reverse_scan_ ? query_index_range_.start_row_id_ : query_index_range_.end_row_id_;
|
||||
for (int64_t micro_data_idx = micro_data_prewarm_idx_ - 1; OB_SUCC(ret) && cmp_ret > 0 && micro_data_idx > cur_micro_data_fetch_idx_; micro_data_idx--) {
|
||||
const ObCSRange µ_range = micro_data_infos_[micro_data_idx % max_micro_handle_cnt_].get_row_range();
|
||||
cmp_ret = micro_range.compare(end_row_id);
|
||||
if (is_reverse_scan_) {
|
||||
cmp_ret = -cmp_ret;
|
||||
}
|
||||
if (0 == cmp_ret) {
|
||||
is_prefetch_end_ = true;
|
||||
micro_data_prefetch_idx_ = micro_data_idx + 1;
|
||||
}
|
||||
}
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -224,6 +255,7 @@ int ObCGPrefetcher::refresh_index_tree()
|
||||
cur_micro_data_fetch_idx_ = -1;
|
||||
cur_micro_data_read_idx_ = -1;
|
||||
micro_data_prefetch_idx_ = 0;
|
||||
micro_data_prewarm_idx_ = 0;
|
||||
total_micro_data_cnt_ = 0;
|
||||
for (int64_t level = 1; level < index_tree_height_; level++) {
|
||||
if (level <= cur_level_) {
|
||||
@ -265,6 +297,7 @@ int ObCGPrefetcher::locate(const ObCSRange &range, const ObCGBitmap *bitmap)
|
||||
is_prefetch_end_ = true;
|
||||
} else if (locate_back(range)) {
|
||||
ObIndexTreeMultiPassPrefetcher::reuse();
|
||||
micro_data_prewarm_idx_ = 0;
|
||||
cur_micro_data_read_idx_ = -1;
|
||||
update_query_range(range);
|
||||
read_handles_[0].row_state_ = ObSSTableRowState::IN_BLOCK;
|
||||
@ -324,7 +357,7 @@ int ObCGPrefetcher::prefetch_micro_data()
|
||||
K_(micro_data_prefetch_idx), K_(cur_micro_data_read_idx), K_(max_micro_handle_cnt));
|
||||
} else if (micro_data_prefetch_idx_ - cur_micro_data_read_idx_ == max_micro_handle_cnt_) {
|
||||
// DataBlock ring buf full
|
||||
} else if (OB_FAIL(get_prefetch_depth(prefetch_depth))) {
|
||||
} else if (OB_FAIL(get_prefetch_depth(prefetch_depth, micro_data_prefetch_idx_))) {
|
||||
LOG_WARN("Fail to get prefetch depth", K(ret));
|
||||
} else {
|
||||
while (OB_SUCC(ret) && !is_prefetch_end_ && prefetched_cnt < prefetch_depth) {
|
||||
@ -642,16 +675,88 @@ void ObCGPrefetcher::recycle_block_data()
|
||||
cur_micro_data_read_idx_ = cur_micro_data_fetch_idx_;
|
||||
}
|
||||
|
||||
int ObCGPrefetcher::get_prefetch_depth(int64_t &depth)
|
||||
int ObCGPrefetcher::get_prefetch_depth(int64_t &depth, const int64_t prefetching_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
depth = 0;
|
||||
prefetch_depth_ = MIN(2 * prefetch_depth_, DEFAULT_SCAN_MICRO_DATA_HANDLE_CNT);
|
||||
depth = min(static_cast<int64_t>(prefetch_depth_),
|
||||
max_micro_handle_cnt_ - (micro_data_prefetch_idx_ - cur_micro_data_read_idx_));
|
||||
max_micro_handle_cnt_ - (prefetching_idx - cur_micro_data_read_idx_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCGPrefetcher::prefetch()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObIndexTreeMultiPassPrefetcher::prefetch())) {
|
||||
LOG_WARN("Fail to prefetch", K(ret));
|
||||
} else if (OB_FAIL(prewarm())) {
|
||||
LOG_WARN("Fail to prewarm", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCGPrefetcher::prewarm()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int32_t prefetch_limit = MAX(2, max_micro_handle_cnt_ / 2);
|
||||
if (micro_data_prewarm_idx_ < micro_data_prefetch_idx_) {
|
||||
micro_data_prewarm_idx_ = micro_data_prefetch_idx_;
|
||||
}
|
||||
if (is_prefetch_end_ &&
|
||||
ObICGIterator::OB_CG_SCANNER == cg_iter_type_ &&
|
||||
nullptr == cg_agg_cells_ &&
|
||||
nullptr == access_ctx_->limit_param_ &&
|
||||
(index_tree_height_ - 1) == cur_level_ &&
|
||||
micro_data_prewarm_idx_ - cur_micro_data_fetch_idx_ < prefetch_limit) {
|
||||
|
||||
int64_t prefetched_cnt = 0;
|
||||
int64_t prefetch_micro_idx = 0;
|
||||
int64_t prefetch_depth = 0;
|
||||
if (OB_FAIL(get_prefetch_depth(prefetch_depth, micro_data_prewarm_idx_))) {
|
||||
LOG_WARN("Fail to get prefetch depth", K(ret));
|
||||
} else {
|
||||
while (OB_SUCC(ret) && prefetched_cnt < prefetch_depth) {
|
||||
prefetch_micro_idx = micro_data_prewarm_idx_ % max_micro_handle_cnt_;
|
||||
ObMicroIndexInfo &block_info = micro_data_infos_[prefetch_micro_idx];
|
||||
if (OB_FAIL(tree_handles_[cur_level_].get_next_data_row(false, block_info))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next", K(ret), K_(cur_level), K(tree_handles_[cur_level_]));
|
||||
} else {
|
||||
// open next leaf by get_next_index_leaf in the next loop
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else if (nullptr != sstable_index_filter_
|
||||
&& block_info.has_agg_data()
|
||||
&& block_info.is_filter_uncertain()
|
||||
&& OB_FAIL(sstable_index_filter_->check_range(
|
||||
iter_param_->read_info_, block_info, *(access_ctx_->allocator_)))) {
|
||||
LOG_WARN("Fail to check if can skip prefetch", K(ret), K(block_info));
|
||||
} else if (nullptr != sstable_index_filter_
|
||||
&& (block_info.is_filter_always_false() || block_info.is_filter_always_true())) {
|
||||
// TODO: skip data block which is always_false/always_true and record the result in filter bitmap
|
||||
prefetched_cnt++;
|
||||
micro_data_prewarm_idx_++;
|
||||
tree_handles_[cur_level_].current_block_read_handle().end_prefetched_row_idx_++;
|
||||
} else if (OB_FAIL(prefetch_block_data(block_info, micro_data_handles_[prefetch_micro_idx]))) {
|
||||
LOG_WARN("fail to prefetch_block_data", K(ret), K(block_info));
|
||||
} else {
|
||||
prefetched_cnt++;
|
||||
micro_data_prewarm_idx_++;
|
||||
tree_handles_[cur_level_].current_block_read_handle().end_prefetched_row_idx_++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && 0 < prefetched_cnt) {
|
||||
LOG_DEBUG("[INDEX BLOCK] prewarm prefetched info", K(ret), K_(cur_micro_data_fetch_idx), K_(micro_data_prefetch_idx),
|
||||
K_(micro_data_prewarm_idx), K_(total_micro_data_cnt), K(prefetched_cnt), K(prefetch_depth));
|
||||
total_micro_data_cnt_ += prefetched_cnt;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -23,10 +23,12 @@ class ObCGPrefetcher : public ObIndexTreeMultiPassPrefetcher<>
|
||||
public:
|
||||
ObCGPrefetcher() :
|
||||
is_reverse_scan_(false),
|
||||
cg_iter_type_(-1),
|
||||
query_index_range_(),
|
||||
query_range_(),
|
||||
leaf_query_range_(),
|
||||
filter_bitmap_(nullptr),
|
||||
micro_data_prewarm_idx_(0),
|
||||
cur_micro_data_read_idx_(-1),
|
||||
cg_agg_cells_(nullptr),
|
||||
sstable_index_filter_(nullptr)
|
||||
@ -35,13 +37,18 @@ public:
|
||||
{}
|
||||
virtual void reset() override final;
|
||||
virtual void reuse() override final;
|
||||
int init(ObSSTable &sstable,
|
||||
const ObTableIterParam &iter_param,
|
||||
ObTableAccessContext &access_ctx);
|
||||
int switch_context(ObSSTable &sstable,
|
||||
const ObTableIterParam &iter_param,
|
||||
ObTableAccessContext &access_ctx);
|
||||
int init(
|
||||
const int cg_iter_type,
|
||||
ObSSTable &sstable,
|
||||
const ObTableIterParam &iter_param,
|
||||
ObTableAccessContext &access_ctx);
|
||||
int switch_context(
|
||||
const int cg_iter_type,
|
||||
ObSSTable &sstable,
|
||||
const ObTableIterParam &iter_param,
|
||||
ObTableAccessContext &access_ctx);
|
||||
int locate(const ObCSRange &range, const ObCGBitmap *bitmap);
|
||||
virtual int prefetch() override final;
|
||||
OB_INLINE bool is_empty_range() const
|
||||
{ return 0 == micro_data_prefetch_idx_ && is_prefetch_end_; }
|
||||
virtual bool read_wait() override final
|
||||
@ -63,12 +70,13 @@ public:
|
||||
void recycle_block_data();
|
||||
void set_cg_agg_cells(ObCGAggCells &cg_agg_cells) { cg_agg_cells_ = &cg_agg_cells; }
|
||||
INHERIT_TO_STRING_KV("ObCGPrefetcher", ObIndexTreeMultiPassPrefetcher,
|
||||
K_(query_index_range), K_(query_range),
|
||||
K_(cur_micro_data_read_idx), KP_(filter_bitmap),
|
||||
K_(query_index_range), K_(query_range), K_(cg_iter_type),
|
||||
K_(micro_data_prewarm_idx), K_(cur_micro_data_read_idx), KP_(filter_bitmap),
|
||||
KP_(cg_agg_cells), KP_(sstable_index_filter));
|
||||
protected:
|
||||
virtual int get_prefetch_depth(int64_t &depth) override;
|
||||
int get_prefetch_depth(int64_t &depth, const int64_t prefetching_idx);
|
||||
private:
|
||||
int prewarm();
|
||||
struct ObCSIndexTreeLevelHandle : public ObIndexTreeLevelHandle {
|
||||
public:
|
||||
int prefetch(const int64_t level, ObCGPrefetcher &prefetcher);
|
||||
@ -95,11 +103,13 @@ private:
|
||||
|
||||
private:
|
||||
bool is_reverse_scan_;
|
||||
int16_t cg_iter_type_;
|
||||
ObStorageDatum datums_[2];
|
||||
ObCSRange query_index_range_;
|
||||
ObDatumRange query_range_;
|
||||
ObDatumRange leaf_query_range_;
|
||||
const ObCGBitmap *filter_bitmap_;
|
||||
int64_t micro_data_prewarm_idx_;
|
||||
public:
|
||||
int64_t cur_micro_data_read_idx_;
|
||||
ObCGAggCells *cg_agg_cells_;
|
||||
|
@ -43,7 +43,7 @@ int ObCGScanner::init(
|
||||
} else if (OB_UNLIKELY(!sstable_->is_normal_cg_sstable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected not normal cg sstable", K(ret), KPC_(sstable));
|
||||
} else if (OB_FAIL(prefetcher_.init(*sstable_, iter_param, access_ctx))) {
|
||||
} else if (OB_FAIL(prefetcher_.init(get_type(), *sstable_, iter_param, access_ctx))) {
|
||||
LOG_WARN("fail to init prefetcher, ", K(ret));
|
||||
} else {
|
||||
iter_param_ = &iter_param;
|
||||
@ -83,10 +83,10 @@ int ObCGScanner::switch_context(
|
||||
LOG_WARN("Unexpected not normal cg sstable", K(ret), KPC_(sstable));
|
||||
} else {
|
||||
if (!prefetcher_.is_valid()) {
|
||||
if (OB_FAIL(prefetcher_.init(*sstable_, iter_param, access_ctx))) {
|
||||
if (OB_FAIL(prefetcher_.init(get_type(), *sstable_, iter_param, access_ctx))) {
|
||||
LOG_WARN("fail to init prefetcher, ", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(prefetcher_.switch_context(*sstable_, iter_param, access_ctx))) {
|
||||
} else if (OB_FAIL(prefetcher_.switch_context(get_type(), *sstable_, iter_param, access_ctx))) {
|
||||
LOG_WARN("Fail to switch context for prefetcher", K(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
Reference in New Issue
Block a user