use shared block cache to manage a batch of chunk store in em sort
This commit is contained in:
committed by
ob-robot
parent
65ebe32909
commit
d5487f24ae
@ -2393,6 +2393,13 @@ void ObChunkDatumStore::Iterator::reset_cursor(const int64_t file_size)
|
||||
free_block(free_list_.remove_first(), default_block_size_, force_free);
|
||||
}
|
||||
|
||||
if (nullptr != blk_holder_ptr_) {
|
||||
if (blk_holder_ptr_->block_list_.get_size() > 0) {
|
||||
blk_holder_ptr_->release();
|
||||
blk_holder_ptr_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
cur_iter_blk_ = nullptr;
|
||||
cur_nth_blk_ = -1;
|
||||
cur_iter_pos_ = 0;
|
||||
@ -2567,7 +2574,12 @@ void ObChunkDatumStore::Iterator::free_block(Block *blk, const int64_t size,
|
||||
bool do_phy_free = force_free;
|
||||
if (!force_free) {
|
||||
try_free_cached_blocks();
|
||||
if (NULL != age_) {
|
||||
if (NULL != blk_holder_ptr_) {
|
||||
// fill iter ptr at pos of age, since we do not use age in shared cache
|
||||
*((int64_t *)((char *)blk + size - sizeof(int64_t))) = reinterpret_cast<int64_t> (this);
|
||||
blk_holder_ptr_->block_list_.add_last(blk);
|
||||
blk->blk_size_ = size;
|
||||
} else if (NULL != age_) {
|
||||
STATIC_ASSERT(sizeof(BlockBuffer) >= sizeof(int64_t), "unexpected block buffer size");
|
||||
// Save age to the tail of the block, we always allocate one BlockBuffer in tail of block,
|
||||
// it's safe to write it here.
|
||||
@ -2660,5 +2672,18 @@ int ObChunkDatumStore::Iterator::aio_wait()
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObChunkDatumStore::IteratedBlockHolder::release()
|
||||
{
|
||||
while (block_list_.get_size() > 0) {
|
||||
Block *blk = block_list_.remove_first();
|
||||
Iterator *iter = reinterpret_cast<Iterator *> (*((int64_t *)((char *)blk + blk->blk_size_ - sizeof(int64_t))));
|
||||
if (OB_NOT_NULL(blk) && OB_NOT_NULL(iter)) {
|
||||
iter->free_block(blk, blk->blk_size_, true);
|
||||
} else {
|
||||
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "get unexpected block pair", KP(iter), KP(blk));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // end namespace sql
|
||||
} // end namespace oceanbase
|
||||
|
||||
@ -678,7 +678,17 @@ public:
|
||||
private:
|
||||
int64_t age_;
|
||||
};
|
||||
struct IteratedBlockHolder
|
||||
{
|
||||
IteratedBlockHolder() : block_list_() {}
|
||||
~IteratedBlockHolder()
|
||||
{
|
||||
release();
|
||||
}
|
||||
void release();
|
||||
|
||||
BlockList block_list_;
|
||||
};
|
||||
class Iterator
|
||||
{
|
||||
public:
|
||||
@ -689,6 +699,7 @@ public:
|
||||
DISK_ITER_END = 0x02
|
||||
};
|
||||
friend class ObChunkDatumStore;
|
||||
friend class IteratedBlockHolder;
|
||||
Iterator() : start_iter_(false),
|
||||
store_(NULL),
|
||||
cur_iter_blk_(NULL),
|
||||
@ -704,7 +715,8 @@ public:
|
||||
read_blk_buf_(NULL),
|
||||
aio_blk_(NULL),
|
||||
aio_blk_buf_(NULL),
|
||||
age_(NULL) {}
|
||||
age_(NULL),
|
||||
blk_holder_ptr_(NULL) {}
|
||||
virtual ~Iterator() { reset_cursor(0); }
|
||||
int init(ObChunkDatumStore *row_store, const IterationAge *age = NULL);
|
||||
void set_iteration_age(const IterationAge *age) { age_ = age; }
|
||||
@ -763,10 +775,11 @@ public:
|
||||
void free_block(Block *blk, const int64_t size, bool force_free = false);
|
||||
void try_free_cached_blocks();
|
||||
int64_t get_cur_chunk_row_cnt() const { return chunk_n_rows_;}
|
||||
void set_blk_holder_ptr(IteratedBlockHolder *ptr) { blk_holder_ptr_ = ptr;}
|
||||
TO_STRING_KV(KP_(store), KP_(cur_iter_blk),
|
||||
K_(cur_chunk_n_blocks), K_(cur_iter_pos), K_(file_size),
|
||||
KP_(chunk_mem), KP_(read_blk), KP_(read_blk_buf), KP_(aio_blk),
|
||||
KP_(aio_blk_buf), K_(default_block_size));
|
||||
KP_(aio_blk_buf), K_(default_block_size), KP_(blk_holder_ptr));
|
||||
private:
|
||||
explicit Iterator(ObChunkDatumStore *row_store);
|
||||
protected:
|
||||
@ -799,6 +812,7 @@ public:
|
||||
IterationAge inner_age_;
|
||||
const IterationAge *age_;
|
||||
int64_t default_block_size_;
|
||||
IteratedBlockHolder *blk_holder_ptr_;
|
||||
};
|
||||
|
||||
struct BatchCtx
|
||||
|
||||
@ -1766,9 +1766,8 @@ int ObSortOpImpl::sort()
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else if (sort_chunks_.get_size() >= 2) {
|
||||
// clear iteration age, make sure no iteration block cached in inner round.
|
||||
set_iteration_age(NULL);
|
||||
|
||||
blk_holder_.release();
|
||||
set_blk_holder(nullptr);
|
||||
// do merge sort
|
||||
int64_t ways = 0;
|
||||
while (OB_SUCC(ret)) {
|
||||
@ -1812,8 +1811,7 @@ int ObSortOpImpl::sort()
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// set iteration age for batch iteration.
|
||||
set_iteration_age(&iter_age_);
|
||||
set_blk_holder(&blk_holder_);
|
||||
next_stored_row_func_ = &ObSortOpImpl::ems_heap_next_stored_row;
|
||||
}
|
||||
}
|
||||
@ -1882,13 +1880,6 @@ int ObSortOpImpl::rewind()
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObSortOpImpl::set_iteration_age(ObChunkDatumStore::IterationAge *iter_age)
|
||||
{
|
||||
DLIST_FOREACH_NORET(chunk, sort_chunks_) {
|
||||
chunk->iter_.set_iteration_age(iter_age);
|
||||
}
|
||||
}
|
||||
|
||||
int ObSortOpImpl::get_next_batch_stored_rows(int64_t max_cnt, int64_t &read_rows)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1896,8 +1887,8 @@ int ObSortOpImpl::get_next_batch_stored_rows(int64_t max_cnt, int64_t &read_rows
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("get next batch failed", K(ret));
|
||||
} else {
|
||||
iter_age_.inc();
|
||||
read_rows = 0;
|
||||
blk_holder_.release();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < max_cnt; i++) {
|
||||
const ObChunkDatumStore::StoredRow *sr = NULL;
|
||||
if (OB_FAIL((this->*next_stored_row_func_)(sr))) {
|
||||
@ -2322,6 +2313,13 @@ int ObSortOpImpl::adjust_topn_read_rows(ObChunkDatumStore::StoredRow **stored_ro
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObSortOpImpl::set_blk_holder(ObChunkDatumStore::IteratedBlockHolder *blk_holder)
|
||||
{
|
||||
DLIST_FOREACH_NORET(chunk, sort_chunks_) {
|
||||
chunk->iter_.set_blk_holder_ptr(blk_holder);
|
||||
}
|
||||
}
|
||||
|
||||
/************************************* end ObSortOpImpl ********************************/
|
||||
|
||||
/*********************************** start ObPrefixSortImpl *****************************/
|
||||
|
||||
@ -182,7 +182,7 @@ public:
|
||||
} else if (outputted_rows_cnt_ >= topn_cnt_ && !is_fetch_with_ties_) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
iter_age_.inc();
|
||||
blk_holder_.release();
|
||||
ret = (this->*next_stored_row_func_)(sr);
|
||||
if (OB_UNLIKELY(common::OB_ITER_END == ret) && !need_rewind_) {
|
||||
reuse();
|
||||
@ -510,7 +510,7 @@ protected:
|
||||
int is_equal_part(const ObChunkDatumStore::StoredRow *l, const ObChunkDatumStore::StoredRow *r, bool &is_equal);
|
||||
int do_partition_sort(common::ObIArray<ObChunkDatumStore::StoredRow *> &rows,
|
||||
const int64_t rows_begin, const int64_t rows_end);
|
||||
void set_iteration_age(ObChunkDatumStore::IterationAge *iter_age);
|
||||
void set_blk_holder(ObChunkDatumStore::IteratedBlockHolder *blk_holder);
|
||||
// for topn sort
|
||||
int add_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
|
||||
const ObChunkDatumStore::StoredRow *&store_row);
|
||||
@ -588,7 +588,6 @@ protected:
|
||||
uint64_t op_id_;
|
||||
ObExecContext *exec_ctx_;
|
||||
ObChunkDatumStore::StoredRow **stored_rows_;
|
||||
ObChunkDatumStore::IterationAge iter_age_;
|
||||
ObIOEventObserver *io_event_observer_;
|
||||
// for window function partition sort
|
||||
PartHashNode **buckets_;
|
||||
@ -607,6 +606,7 @@ protected:
|
||||
common::ObArray<SortStoredRow *> ties_array_;
|
||||
ObChunkDatumStore::StoredRow *last_ties_row_;
|
||||
common::ObIArray<ObChunkDatumStore::StoredRow *> *rows_;
|
||||
ObChunkDatumStore::IteratedBlockHolder blk_holder_;
|
||||
};
|
||||
|
||||
class ObInMemoryTopnSortImpl;
|
||||
|
||||
Reference in New Issue
Block a user