fix datum store memory leak

This commit is contained in:
raywill
2023-06-21 12:12:51 +00:00
committed by ob-robot
parent 9dae112952
commit a8c269dfbb
5 changed files with 94 additions and 18 deletions

View File

@ -551,8 +551,7 @@ void ObChunkDatumStore::reset()
blocks_.reset();
cur_blk_ = NULL;
cur_blk_buffer_ = nullptr;
free_block(tmp_dump_blk_);
tmp_dump_blk_ = nullptr;
free_tmp_dump_blk(); // just in case, not necessary. tmp block always freed instantly after use
while (!free_list_.is_empty()) {
Block *item = free_list_.remove_first();
mem_hold_ -= item->get_buffer()->mem_size();
@ -2385,12 +2384,12 @@ void ObChunkDatumStore::Iterator::reset_cursor(const int64_t file_size)
read_blk_ = NULL;
read_blk_buf_ = NULL;
while (NULL != cached_.get_first()) {
free_block(cached_.remove_first(), default_block_size_, force_free);
while (NULL != icached_.get_first()) {
free_block(icached_.remove_first(), default_block_size_, force_free);
}
while (NULL != free_list_.get_first()) {
free_block(free_list_.remove_first(), default_block_size_, force_free);
while (NULL != ifree_list_.get_first()) {
free_block(ifree_list_.remove_first(), default_block_size_, force_free);
}
if (nullptr != blk_holder_ptr_) {
@ -2558,8 +2557,8 @@ int ObChunkDatumStore::Iterator::alloc_block(Block *&blk, const int64_t size)
{
int ret = OB_SUCCESS;
try_free_cached_blocks();
if (size == default_block_size_ && NULL != free_list_.get_first()) {
blk = free_list_.remove_first();
if (size == default_block_size_ && NULL != ifree_list_.get_first()) {
blk = ifree_list_.remove_first();
ObChunkDatumStore::init_block_buffer(blk, size, blk);
} else if (OB_FAIL(store_->alloc_block_buffer(blk, size, true))) {
LOG_WARN("alloc block buffer failed", K(ret), K(size));
@ -2586,13 +2585,13 @@ void ObChunkDatumStore::Iterator::free_block(Block *blk, const int64_t size,
*((int64_t *)((char *)blk + size - sizeof(int64_t))) = age_->get();
// Save memory size to %blk_size_
blk->blk_size_ = size;
cached_.add_last(blk);
icached_.add_last(blk);
} else {
if (size == default_block_size_ && !force_free) {
#ifndef NDEBUG
memset((char *)blk + sizeof(*blk), 0xAA, size - sizeof(*blk));
#endif
free_list_.add_last(blk);
ifree_list_.add_last(blk);
} else {
do_phy_free = true;
}
@ -2613,17 +2612,17 @@ void ObChunkDatumStore::Iterator::try_free_cached_blocks()
{
const int64_t read_age = NULL == age_ ? INT64_MAX : age_->get();
// try free age expired blocks
while (NULL != cached_.get_first()) {
Block *b = cached_.get_first();
while (NULL != icached_.get_first()) {
Block *b = icached_.get_first();
// age is stored in tail of block, see free_block()
const int64_t age = *((int64_t *)((char *)b + b->blk_size_ - sizeof(int64_t)));
if (age < read_age) {
b = cached_.remove_first();
b = icached_.remove_first();
if (b->blk_size_ == default_block_size_) {
#ifndef NDEBUG
memset((char *)b + sizeof(*b), 0xAA, b->blk_size_ - sizeof(*b));
#endif
free_list_.add_last(b);
ifree_list_.add_last(b);
} else {
const bool force_free = true;
free_block(b, b->blk_size_, force_free);

View File

@ -804,9 +804,80 @@ public:
Block *aio_blk_; // not null means aio is reading.
BlockBuffer *aio_blk_buf_;
BlockList free_list_;
// cached blocks for batch iterate
BlockList cached_;
/*
* Currently, the design philosophy of ObChunkDatumStore is as follows:
* - Support concurrent read. External readers have no "side effects" on
* ObChunkDatumStore and will not change any state in ObChunkDatumStore.
* - From the reader/writer's perspective, the data in ObChunkDatumStore
* is divided into two parts: blocks cached in memory and data persisted in files.
*
* ObChunkDatumStore's write strategy:
* All data is first written to the cache block.
* If the cache is full, the oldest data in the cache block is written to the disk,
* and then the data is written to the cache block. The final effect is that old data
* is on the disk and new data is in the cache, with the order of writing preserved.
* ObChunkDatumStore's read strategy:
* The reading order of data is consistent with the writing order,
* first reading from the disk file, and then reading from the cache block after
* finishing reading from the disk. An Iterator must be used to access data in ObChunkDatumStore.
* When the Iterator accesses data, it first reads from the disk and caches the disk data
* in the Iterator's private cache. When the Iterator is released, these cached data will also
* be released with the Iterator. After the disk data is read, the Iterator directly
* reads the block cache in ObChunkDatumStore based on its own record of the block offset.
*
* To wrap it up, the write & read mode is FIFO.
*/
/*
In this illustration, ObChunkDatumStore is readonly.
Iterator 1 and Iterator 2 are accessing the same ObChunkDatumStore in parallel.
The Iterator caches data independently, and have its own private cur_nth_blk_.
+------------------------------------------------------+
| Iterator 1 |
| +-----------+ +----------+ |
| cur_nth_blk_ | cached | |cached | |
| | | block | |block | |
| +------+ +-----+-----+ +----+-----+ |
| | | | |
+--------------+-------------+-------------+-----------+
| | |
| | |
| | |
+-------------------+-------------+-------------+----------------+
| | | | |
| +-------+ +---v---+ +---v-------------v----------+ |
| | | | | | | |
| |in mem | |in mem | | in disk | |
| |block | |block | | block | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| +---^---+ +-------+ +------^--------^------------+ |
| | | | |
| | ObChunkDatumStore | | |
| | | | |
+---------+--------------------------+--------+------------------+
| | |
| | |
| | |
+-----+--------------------------+--------+------------+
| | | | |
| +---------+-+ +---+------+ |
| cur_nth_blk_ | cached | |cached | |
| | block | |block | |
| +-----------+ +----------+ |
| Iterator 2 |
+------------------------------------------------------+
*/
// idle memory blocks cached by iterator,
// used to cache data read from aio file
BlockList ifree_list_;
// active blocks for batch iterate
// used to output data
BlockList icached_;
// inner iteration age is used for batch iteration with no outside age control.
IterationAge inner_age_;

View File

@ -147,7 +147,6 @@ int ObPDMLOpBatchRowCache::init_row_store(ObChunkDatumStore *&chunk_row_store)
// 非barrier情况下,不支持dump
LOG_WARN("failed to init chunk row store in batch row cache", K(ret));
} else {
chunk_row_store->set_allocator(allocator);
chunk_row_store->set_callback(&sql_mem_processor_);
chunk_row_store->set_io_event_observer(&io_event_observer_);
if (with_barrier_) {

View File

@ -49,6 +49,8 @@ public:
int get_next_row(const ObExprPtrIArray &row);
void set_uniq_row_checker(ObDMLOpUniqueRowChecker *uniq_row_checker)
{ uniq_row_checker_ = uniq_row_checker; }
void close()
{ row_store_it_.reset(); }
private:
int init_data_source(ObChunkDatumStore &row_datum_store,
ObEvalCtx *eval_ctx);

View File

@ -251,6 +251,8 @@ int ObPDMLOpDataDriver::write_partitions(ObExecContext &ctx)
// nop
} else if (OB_FAIL(writer_->write_rows(ctx, tablet_loc, *row_iter))) {
LOG_WARN("fail write rows", K(tablet_id), K(ret));
} else {
row_iter->close();
}
}
}
@ -374,6 +376,9 @@ int ObPDMLOpDataDriver::next_row_from_cache_for_returning(const ObExprPtrIArray
int ObPDMLOpDataDriver::switch_row_iter_to_next_partition()
{
int ret = OB_SUCCESS;
if (returning_ctx_.row_iter_) {
returning_ctx_.row_iter_->close();
}
// 当前仅仅cache一行数据
// next idx仅仅等于0,如果next idx等于1,表示没有数据
if (OB_SUCC(ret) && returning_ctx_.next_idx_ >= returning_ctx_.tablet_id_array_.count()) {