Fixed the issue where memory could not be reused when reading interme results
This commit is contained in:
		@ -1672,7 +1672,6 @@ int ObChunkDatumStore::add_block(Block* block, bool need_swizzling, bool *added)
 | 
			
		||||
int ObChunkDatumStore::append_block(char *buf, int size,  bool need_swizzling)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObMemAttr attr(tenant_id_, label_, ctx_id_);
 | 
			
		||||
  Block *src_block = reinterpret_cast<Block*>(buf);
 | 
			
		||||
  int64_t block_data_size = size;
 | 
			
		||||
  Block *new_block = nullptr;
 | 
			
		||||
@ -1697,12 +1696,19 @@ int ObChunkDatumStore::append_block(char *buf, int size,  bool need_swizzling)
 | 
			
		||||
    } else if (OB_FAIL(add_block(new_block, need_swizzling, &added))) {
 | 
			
		||||
      LOG_WARN("fail to add block", K(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_TRACE("trace append block", K(src_block->rows_), K(size));
 | 
			
		||||
      LOG_TRACE("trace append block", K(src_block->rows_), K(size), K(mem_used_), K(mem_hold_));
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_FAIL(ret) && !added) {
 | 
			
		||||
      free_blk_mem(new_block, block_buffer->mem_size());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  // dump data if mem used > 16MB
 | 
			
		||||
  const int64_t dump_threshold = 1 << 24;
 | 
			
		||||
  if (OB_SUCC(ret) && mem_used_ > dump_threshold) {
 | 
			
		||||
    if (OB_FAIL(dump(false /* reuse */, true /* all_dump */))) {
 | 
			
		||||
      LOG_WARN("dump failed", K(ret));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1710,7 +1716,6 @@ int ObChunkDatumStore::append_block(char *buf, int size,  bool need_swizzling)
 | 
			
		||||
int ObChunkDatumStore::append_block_payload(char *payload, int size, int rows, bool need_swizzling)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObMemAttr attr(tenant_id_, label_, ctx_id_);
 | 
			
		||||
  int64_t block_data_size = size + sizeof(Block);
 | 
			
		||||
  Block *new_block = nullptr;
 | 
			
		||||
  bool added = false;
 | 
			
		||||
@ -1734,12 +1739,21 @@ int ObChunkDatumStore::append_block_payload(char *payload, int size, int rows, b
 | 
			
		||||
    } else if (OB_FAIL(add_block(new_block, need_swizzling, &added))) {
 | 
			
		||||
      LOG_WARN("fail to add block", K(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_TRACE("trace append block", K(rows), K(size));
 | 
			
		||||
      LOG_TRACE("trace append block", K(rows), K(size), K(mem_used_), K(mem_hold_));
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_FAIL(ret) && !added) {
 | 
			
		||||
      free_blk_mem(new_block, block_buffer->mem_size());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  // dump data if mem used > 1MB
 | 
			
		||||
  // This function is called when use px_batch_rescan, which may write a lot of dtl interm results,
 | 
			
		||||
  // so the threshold is smaller.
 | 
			
		||||
  const int64_t dump_threshold = 1 << 20;
 | 
			
		||||
  if (OB_SUCC(ret) && mem_used_ > dump_threshold) {
 | 
			
		||||
    if (OB_FAIL(dump(false /* reuse */, true /* all_dump */))) {
 | 
			
		||||
      LOG_WARN("dump failed", K(ret));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -2646,11 +2660,12 @@ int ObChunkDatumStore::Iterator::alloc_block(Block *&blk, const int64_t size)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  try_free_cached_blocks();
 | 
			
		||||
  if (size == default_block_size_ && NULL != ifree_list_.get_first()) {
 | 
			
		||||
  int64_t actual_size = store_->min_blk_size(size);
 | 
			
		||||
  if (actual_size == default_block_size_ && NULL != ifree_list_.get_first()) {
 | 
			
		||||
    blk = ifree_list_.remove_first();
 | 
			
		||||
    ObChunkDatumStore::init_block_buffer(blk, size, blk);
 | 
			
		||||
  } else if (OB_FAIL(store_->alloc_block_buffer(blk, size, true))) {
 | 
			
		||||
    LOG_WARN("alloc block buffer failed", K(ret), K(size));
 | 
			
		||||
    ObChunkDatumStore::init_block_buffer(blk, actual_size, blk);
 | 
			
		||||
  } else if (OB_FAIL(store_->alloc_block_buffer(blk, actual_size, true))) {
 | 
			
		||||
    LOG_WARN("alloc block buffer failed", K(ret), K(actual_size));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -928,12 +928,19 @@ int ObChunkRowStore::append_block(char *buf, int size,  bool need_swizzling)
 | 
			
		||||
    } else if (OB_FAIL(add_block(new_block, need_swizzling, &added))) {
 | 
			
		||||
      LOG_WARN("fail to add block", K(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_TRACE("trace append block", K(src_block->rows_), K(size));
 | 
			
		||||
      LOG_TRACE("trace append block", K(src_block->rows_), K(size), K(mem_used_), K(mem_hold_));
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_FAIL(ret) && !added) {
 | 
			
		||||
      free_blk_mem(new_block, block_buffer->mem_size());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  // dump data if mem used > 16MB
 | 
			
		||||
  const int64_t dump_threshold = 1 << 24;
 | 
			
		||||
  if (OB_SUCC(ret) && mem_used_ > dump_threshold) {
 | 
			
		||||
    if (OB_FAIL(dump(false /* reuse */, true /* all_dump */))) {
 | 
			
		||||
      LOG_WARN("dump failed", K(ret));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -288,7 +288,14 @@ int ObTempBlockStore::append_block_payload(const char *buf, const int64_t size,
 | 
			
		||||
    MEMCPY(blk_->payload_, buf, size);
 | 
			
		||||
    block_id_cnt_ = blk_->end();
 | 
			
		||||
    blk_->get_buffer()->fast_advance(size);
 | 
			
		||||
    LOG_DEBUG("append block payload", K(*this), K(*blk_));
 | 
			
		||||
    LOG_DEBUG("append block payload", K(*this), K(*blk_), K(mem_used_), K(mem_hold_));
 | 
			
		||||
  }
 | 
			
		||||
  // dump data if mem used > 16MB
 | 
			
		||||
  const int64_t dump_threshold = 1 << 24;
 | 
			
		||||
  if (OB_SUCC(ret) && mem_used_ > dump_threshold) {
 | 
			
		||||
    if (OB_FAIL(dump(false /* reuse */, true /* all_dump */))) {
 | 
			
		||||
      LOG_WARN("dump failed", K(ret));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user