opt whole scnner io buf
This commit is contained in:
		@ -42,17 +42,16 @@ ObSSTableRowWholeScanner::~ObSSTableRowWholeScanner()
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObSSTableRowWholeScanner::alloc_io_buf()
 | 
					int ObSSTableRowWholeScanner::alloc_io_buf(compaction::ObCompactionBuffer &io_buf, int64_t buf_size)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  int64_t size = common::OB_DEFAULT_MACRO_BLOCK_SIZE * PREFETCH_DEPTH;
 | 
					  int64_t size = common::OB_DEFAULT_MACRO_BLOCK_SIZE * PREFETCH_DEPTH;
 | 
				
			||||||
  if (OB_ISNULL(buf_ = reinterpret_cast<char*>(io_allocator_.alloc(size)))) { //continuous memory
 | 
					  if (OB_LIKELY(io_buf.is_inited())) {
 | 
				
			||||||
    ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
					    if (OB_FAIL(io_buf.reserve(buf_size))) {
 | 
				
			||||||
    STORAGE_LOG(WARN, "failed to alloc ObSSTableRowWholeScanner read info buffer", K(ret), K(size));
 | 
					      LOG_WARN("fail to reserve io buf", K(ret), K(io_buf), K(buf_size));
 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    for (int64_t i = 0; OB_SUCC(ret) && i < PREFETCH_DEPTH; ++i) {
 | 
					 | 
				
			||||||
      io_buf_[i] = buf_ + common::OB_DEFAULT_MACRO_BLOCK_SIZE * i;
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					  } else if (OB_FAIL(io_buf.init(common::OB_DEFAULT_MACRO_BLOCK_SIZE, buf_size))) {
 | 
				
			||||||
 | 
					    LOG_WARN("fail to init io buf", K(ret), K(io_buf), K(buf_size));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -77,8 +76,6 @@ void ObSSTableRowWholeScanner::reset()
 | 
				
			|||||||
    micro_scanner_ = nullptr;
 | 
					    micro_scanner_ = nullptr;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  allocator_.reset();
 | 
					  allocator_.reset();
 | 
				
			||||||
  io_allocator_.reset();
 | 
					 | 
				
			||||||
  buf_ = nullptr;
 | 
					 | 
				
			||||||
  is_inited_ = false;
 | 
					  is_inited_ = false;
 | 
				
			||||||
  last_micro_block_recycled_ = false;
 | 
					  last_micro_block_recycled_ = false;
 | 
				
			||||||
  last_mvcc_row_already_output_ = false;
 | 
					  last_mvcc_row_already_output_ = false;
 | 
				
			||||||
@ -86,7 +83,7 @@ void ObSSTableRowWholeScanner::reset()
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
void ObSSTableRowWholeScanner::reuse()
 | 
					void ObSSTableRowWholeScanner::reuse()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  ObStoreRowIterator::reuse();
 | 
					  ObStoreRowIterator::reset();
 | 
				
			||||||
  iter_param_ = nullptr;
 | 
					  iter_param_ = nullptr;
 | 
				
			||||||
  access_ctx_ = nullptr;
 | 
					  access_ctx_ = nullptr;
 | 
				
			||||||
  sstable_ = nullptr;
 | 
					  sstable_ = nullptr;
 | 
				
			||||||
@ -104,8 +101,6 @@ void ObSSTableRowWholeScanner::reuse()
 | 
				
			|||||||
    micro_scanner_ = nullptr;
 | 
					    micro_scanner_ = nullptr;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  allocator_.reuse();
 | 
					  allocator_.reuse();
 | 
				
			||||||
  io_allocator_.reuse();
 | 
					 | 
				
			||||||
  buf_ = nullptr;
 | 
					 | 
				
			||||||
  is_inited_ = false;
 | 
					  is_inited_ = false;
 | 
				
			||||||
  last_micro_block_recycled_ = false;
 | 
					  last_micro_block_recycled_ = false;
 | 
				
			||||||
  last_mvcc_row_already_output_ = false;
 | 
					  last_mvcc_row_already_output_ = false;
 | 
				
			||||||
@ -208,8 +203,6 @@ int ObSSTableRowWholeScanner::inner_open(
 | 
				
			|||||||
      rowkey_read_info = iter_param.read_info_;
 | 
					      rowkey_read_info = iter_param.read_info_;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    if (OB_FAIL(ret)) {
 | 
					    if (OB_FAIL(ret)) {
 | 
				
			||||||
    } else if (OB_FAIL(alloc_io_buf())) {
 | 
					 | 
				
			||||||
      LOG_WARN("alloc io buffers failed", K(ret));
 | 
					 | 
				
			||||||
    } else if (OB_FAIL(init_micro_scanner(range))) {
 | 
					    } else if (OB_FAIL(init_micro_scanner(range))) {
 | 
				
			||||||
      LOG_WARN("Failed to init micro scanner", K(ret));
 | 
					      LOG_WARN("Failed to init micro scanner", K(ret));
 | 
				
			||||||
    } else if (OB_FAIL(macro_block_iter_.open(
 | 
					    } else if (OB_FAIL(macro_block_iter_.open(
 | 
				
			||||||
@ -272,8 +265,8 @@ int ObSSTableRowWholeScanner::open(
 | 
				
			|||||||
    MacroScanHandle &scan_handle = scan_handles_[0];
 | 
					    MacroScanHandle &scan_handle = scan_handles_[0];
 | 
				
			||||||
    scan_handle.reset();
 | 
					    scan_handle.reset();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (OB_FAIL(alloc_io_buf())) {
 | 
					    if (OB_FAIL(alloc_io_buf(io_buf_[0], sstable_->get_macro_read_size()))) {
 | 
				
			||||||
      LOG_WARN("alloc io buffers failed", K(ret));
 | 
					      LOG_WARN("alloc io buffers failed", K(ret), K(sstable_->get_macro_read_size()));
 | 
				
			||||||
    } else if (OB_FAIL(init_micro_scanner(&query_range))) {
 | 
					    } else if (OB_FAIL(init_micro_scanner(&query_range))) {
 | 
				
			||||||
      LOG_WARN("Fail to init micro scanner", K(ret));
 | 
					      LOG_WARN("Fail to init micro scanner", K(ret));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
@ -287,7 +280,7 @@ int ObSSTableRowWholeScanner::open(
 | 
				
			|||||||
      read_info.size_ = sstable_->get_macro_read_size();
 | 
					      read_info.size_ = sstable_->get_macro_read_size();
 | 
				
			||||||
      read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
 | 
					      read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
 | 
				
			||||||
      read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO);
 | 
					      read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO);
 | 
				
			||||||
      read_info.buf_ = io_buf_[0];
 | 
					      read_info.buf_ = io_buf_[0].data();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      if (OB_FAIL(ret)) {
 | 
					      if (OB_FAIL(ret)) {
 | 
				
			||||||
      } else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) {
 | 
					      } else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) {
 | 
				
			||||||
@ -424,6 +417,8 @@ int ObSSTableRowWholeScanner::prefetch()
 | 
				
			|||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        LOG_WARN("Fail to get_next_macro_block ", K(ret), K(macro_block_iter_));
 | 
					        LOG_WARN("Fail to get_next_macro_block ", K(ret), K(macro_block_iter_));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(alloc_io_buf(io_buf_[io_index], sstable_->get_macro_read_size()))) {
 | 
				
			||||||
 | 
					      LOG_WARN("alloc io buffers failed", K(ret), K(sstable_->get_macro_read_size()));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      scan_handle.is_left_border_ = (0 == prefetch_macro_cursor_);
 | 
					      scan_handle.is_left_border_ = (0 == prefetch_macro_cursor_);
 | 
				
			||||||
      scan_handle.is_right_border_ = false; // set right border correctly when open macro block
 | 
					      scan_handle.is_right_border_ = false; // set right border correctly when open macro block
 | 
				
			||||||
@ -433,7 +428,7 @@ int ObSSTableRowWholeScanner::prefetch()
 | 
				
			|||||||
      read_info.size_ = sstable_->get_macro_read_size();
 | 
					      read_info.size_ = sstable_->get_macro_read_size();
 | 
				
			||||||
      read_info.io_desc_.set_wait_event(common::ObWaitEventIds::DB_FILE_COMPACT_READ);
 | 
					      read_info.io_desc_.set_wait_event(common::ObWaitEventIds::DB_FILE_COMPACT_READ);
 | 
				
			||||||
      read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO);
 | 
					      read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO);
 | 
				
			||||||
      read_info.buf_ = io_buf_[io_index];
 | 
					      read_info.buf_ = io_buf_[io_index].data();
 | 
				
			||||||
      if (OB_FAIL(ret)) {
 | 
					      if (OB_FAIL(ret)) {
 | 
				
			||||||
      } else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) {
 | 
					      } else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) {
 | 
				
			||||||
        LOG_WARN("Fail to read macro block, ", K(ret), K(read_info));
 | 
					        LOG_WARN("Fail to read macro block, ", K(ret), K(read_info));
 | 
				
			||||||
 | 
				
			|||||||
@ -56,7 +56,6 @@ public:
 | 
				
			|||||||
      access_ctx_(nullptr),
 | 
					      access_ctx_(nullptr),
 | 
				
			||||||
      sstable_(nullptr),
 | 
					      sstable_(nullptr),
 | 
				
			||||||
      allocator_(common::ObModIds::OB_SSTABLE_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
 | 
					      allocator_(common::ObModIds::OB_SSTABLE_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
 | 
				
			||||||
      io_allocator_("SSTRWS_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
 | 
					 | 
				
			||||||
      prefetch_macro_cursor_(0),
 | 
					      prefetch_macro_cursor_(0),
 | 
				
			||||||
      cur_macro_cursor_(0),
 | 
					      cur_macro_cursor_(0),
 | 
				
			||||||
      is_macro_prefetch_end_(false),
 | 
					      is_macro_prefetch_end_(false),
 | 
				
			||||||
@ -68,12 +67,13 @@ public:
 | 
				
			|||||||
      last_micro_block_recycled_(false),
 | 
					      last_micro_block_recycled_(false),
 | 
				
			||||||
      last_mvcc_row_already_output_(false),
 | 
					      last_mvcc_row_already_output_(false),
 | 
				
			||||||
      iter_macro_cnt_(0),
 | 
					      iter_macro_cnt_(0),
 | 
				
			||||||
      buf_(nullptr)
 | 
					      io_buf_()
 | 
				
			||||||
  {}
 | 
					  {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  virtual ~ObSSTableRowWholeScanner();
 | 
					  virtual ~ObSSTableRowWholeScanner();
 | 
				
			||||||
  int alloc_io_buf();
 | 
					  int alloc_io_buf(compaction::ObCompactionBuffer &io_buf, int64_t buf_size);
 | 
				
			||||||
  virtual void reset() override;
 | 
					  virtual void reset() override;
 | 
				
			||||||
 | 
					  virtual void reuse() override;
 | 
				
			||||||
  int open(
 | 
					  int open(
 | 
				
			||||||
      const ObTableIterParam &iter_param,
 | 
					      const ObTableIterParam &iter_param,
 | 
				
			||||||
      ObTableAccessContext &access_ctx,
 | 
					      ObTableAccessContext &access_ctx,
 | 
				
			||||||
@ -97,7 +97,6 @@ protected:
 | 
				
			|||||||
      ObITable *table,
 | 
					      ObITable *table,
 | 
				
			||||||
      const void *query_range) override;
 | 
					      const void *query_range) override;
 | 
				
			||||||
  virtual int inner_get_next_row(const blocksstable::ObDatumRow *&row) override;
 | 
					  virtual int inner_get_next_row(const blocksstable::ObDatumRow *&row) override;
 | 
				
			||||||
  virtual void reuse() override;
 | 
					 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
  int init_micro_scanner(const blocksstable::ObDatumRange *range);
 | 
					  int init_micro_scanner(const blocksstable::ObDatumRange *range);
 | 
				
			||||||
  int open_macro_block();
 | 
					  int open_macro_block();
 | 
				
			||||||
@ -126,7 +125,6 @@ private:
 | 
				
			|||||||
  blocksstable::ObSSTable *sstable_;
 | 
					  blocksstable::ObSSTable *sstable_;
 | 
				
			||||||
  blocksstable::ObDatumRange query_range_;
 | 
					  blocksstable::ObDatumRange query_range_;
 | 
				
			||||||
  common::ObArenaAllocator allocator_;
 | 
					  common::ObArenaAllocator allocator_;
 | 
				
			||||||
  common::ObArenaAllocator io_allocator_;
 | 
					 | 
				
			||||||
  int64_t prefetch_macro_cursor_;
 | 
					  int64_t prefetch_macro_cursor_;
 | 
				
			||||||
  int64_t cur_macro_cursor_;
 | 
					  int64_t cur_macro_cursor_;
 | 
				
			||||||
  bool is_macro_prefetch_end_;
 | 
					  bool is_macro_prefetch_end_;
 | 
				
			||||||
@ -140,8 +138,7 @@ private:
 | 
				
			|||||||
  bool last_micro_block_recycled_;
 | 
					  bool last_micro_block_recycled_;
 | 
				
			||||||
  bool last_mvcc_row_already_output_;
 | 
					  bool last_mvcc_row_already_output_;
 | 
				
			||||||
  int64_t iter_macro_cnt_;
 | 
					  int64_t iter_macro_cnt_;
 | 
				
			||||||
  char *buf_;
 | 
					  compaction::ObCompactionBuffer io_buf_[PREFETCH_DEPTH];
 | 
				
			||||||
  char *io_buf_[PREFETCH_DEPTH];
 | 
					 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -56,237 +56,34 @@ void ObMicroBlockDesc::reset()
 | 
				
			|||||||
 /**
 | 
					 /**
 | 
				
			||||||
 * -------------------------------------------------------------------ObMicroBufferWriter-------------------------------------------------------------------
 | 
					 * -------------------------------------------------------------------ObMicroBufferWriter-------------------------------------------------------------------
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size)
 | 
					int ObMicroBufferWriter::write_row(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &size)
 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(is_inited_)) {
 | 
					 | 
				
			||||||
    ret = OB_INIT_TWICE;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "micro buffer writer is inited", K(ret), K(capacity_));
 | 
					 | 
				
			||||||
  } else if (OB_UNLIKELY(reserve_size < 0 || capacity > MAX_DATA_BUFFER_SIZE
 | 
					 | 
				
			||||||
      || capacity < reserve_size)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(capacity), K(reserve_size));
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    capacity_ = capacity;
 | 
					 | 
				
			||||||
    len_ = 0;
 | 
					 | 
				
			||||||
    data_= nullptr;
 | 
					 | 
				
			||||||
    buffer_size_ = 0;
 | 
					 | 
				
			||||||
    reset_memory_threshold_ = DEFAULT_RESET_MEMORY_THRESHOLD;
 | 
					 | 
				
			||||||
    memory_reclaim_cnt_ = 0;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_SUCC(ret)) {
 | 
					 | 
				
			||||||
    if(OB_FAIL(reserve(reserve_size))) {
 | 
					 | 
				
			||||||
      STORAGE_LOG(WARN, "failed to reserve", K(ret), K(reserve_size));
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      default_reserve_ = reserve_size;
 | 
					 | 
				
			||||||
      is_inited_ = true;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
void ObMicroBufferWriter::reset()
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  if (data_ != nullptr) {
 | 
					 | 
				
			||||||
    allocator_.free(data_);
 | 
					 | 
				
			||||||
    data_ = nullptr;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  has_expand_ = false;
 | 
					 | 
				
			||||||
  memory_reclaim_cnt_ = 0;
 | 
					 | 
				
			||||||
  reset_memory_threshold_ = 0;
 | 
					 | 
				
			||||||
  default_reserve_ = 0;
 | 
					 | 
				
			||||||
  len_ = 0;
 | 
					 | 
				
			||||||
  buffer_size_ = 0;
 | 
					 | 
				
			||||||
  capacity_ = 0;
 | 
					 | 
				
			||||||
  is_inited_ = false;
 | 
					 | 
				
			||||||
  allocator_.reset();
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
void ObMicroBufferWriter::reuse()
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) {
 | 
					 | 
				
			||||||
    memory_reclaim_cnt_++;
 | 
					 | 
				
			||||||
    if (memory_reclaim_cnt_ >= reset_memory_threshold_) {
 | 
					 | 
				
			||||||
      reset_memory_threshold_ <<= 1;
 | 
					 | 
				
			||||||
      memory_reclaim_cnt_ = 0;
 | 
					 | 
				
			||||||
      void *buf = nullptr;
 | 
					 | 
				
			||||||
      if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) {
 | 
					 | 
				
			||||||
        int ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
					 | 
				
			||||||
        STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_));
 | 
					 | 
				
			||||||
      } else {
 | 
					 | 
				
			||||||
        allocator_.free(data_);
 | 
					 | 
				
			||||||
        buffer_size_ = default_reserve_;
 | 
					 | 
				
			||||||
        data_ = reinterpret_cast<char *>(buf);
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    memory_reclaim_cnt_ = 0;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  has_expand_ = false;
 | 
					 | 
				
			||||||
  len_ = 0;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::expand(const int64_t size)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(capacity_ <= buffer_size_ || size > capacity_)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(buffer_size_), K(capacity_));
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    int64_t expand_size = buffer_size_ * 2;
 | 
					 | 
				
			||||||
    while (expand_size < size) {
 | 
					 | 
				
			||||||
      expand_size <<= 1;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    expand_size = MIN(expand_size, capacity_);
 | 
					 | 
				
			||||||
    if (OB_FAIL(reserve(expand_size))) {
 | 
					 | 
				
			||||||
      STORAGE_LOG(WARN, "fail to reserve", K(ret), K(expand_size));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::reserve(const int64_t size)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(size < 0 || size > capacity_)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(capacity_));
 | 
					 | 
				
			||||||
  } else if (size <= buffer_size_) {//do nothing
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    void* buf = nullptr;
 | 
					 | 
				
			||||||
    const int64_t alloc_size = MAX(size, MIN_BUFFER_SIZE);
 | 
					 | 
				
			||||||
    if (OB_ISNULL(buf = allocator_.alloc(alloc_size))) {
 | 
					 | 
				
			||||||
      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
					 | 
				
			||||||
      STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size));
 | 
					 | 
				
			||||||
    } else if (data_ != nullptr) {
 | 
					 | 
				
			||||||
      has_expand_ = true;
 | 
					 | 
				
			||||||
      MEMCPY(buf, data_, len_);
 | 
					 | 
				
			||||||
      allocator_.free(data_);
 | 
					 | 
				
			||||||
      data_ = nullptr;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    if (OB_SUCC(ret)) {
 | 
					 | 
				
			||||||
      data_ = reinterpret_cast<char *>(buf);
 | 
					 | 
				
			||||||
      buffer_size_ = alloc_size;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::ensure_space(const int64_t append_size)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (len_ + append_size > capacity_) {
 | 
					 | 
				
			||||||
    ret = OB_BUF_NOT_ENOUGH;
 | 
					 | 
				
			||||||
  } else if (len_ + append_size > buffer_size_) {
 | 
					 | 
				
			||||||
    if (OB_FAIL(expand(len_ + append_size))) {
 | 
					 | 
				
			||||||
      STORAGE_LOG(WARN, "failed to expand size", K(ret), K(len_), K(append_size));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::write_nop(const int64_t size, bool is_zero)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(size < 0)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(capacity_));
 | 
					 | 
				
			||||||
  } else if (OB_FAIL(ensure_space(size))) {
 | 
					 | 
				
			||||||
    if (ret != OB_BUF_NOT_ENOUGH) {
 | 
					 | 
				
			||||||
      STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    if (is_zero) {
 | 
					 | 
				
			||||||
      MEMSET(data_ + len_, 0, size);
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    len_ += size;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &size)
 | 
					 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  ObRowWriter row_writer;
 | 
					  ObRowWriter row_writer;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if ((buffer_size_ == len_) && OB_FAIL(expand(buffer_size_))) {
 | 
					  if (remain_buffer_size() <= 0 && OB_FAIL(expand(ObCompactionBuffer::size()))) {
 | 
				
			||||||
    STORAGE_LOG(WARN, "failed to reserve", K(ret), K(buffer_size_));
 | 
					    STORAGE_LOG(WARN, "failed to reserve", K(ret));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  while (OB_SUCC(ret)) {
 | 
					  while (OB_SUCC(ret)) {
 | 
				
			||||||
    if (OB_SUCC(row_writer.write(rowkey_cnt, row, data_ + len_, buffer_size_ - len_, size))) {
 | 
					    if (OB_SUCC(row_writer.write(rowkey_cnt, row, current(), remain_buffer_size(), size))) {
 | 
				
			||||||
      break;
 | 
					      break;
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      if (OB_UNLIKELY(ret != OB_BUF_NOT_ENOUGH)) {
 | 
					      if (OB_UNLIKELY(ret != OB_BUF_NOT_ENOUGH)) {
 | 
				
			||||||
        STORAGE_LOG(WARN, "failed to write row", K(ret), K(buffer_size_), K(capacity_));
 | 
					        STORAGE_LOG(WARN, "failed to write row", K(ret), KPC(this));
 | 
				
			||||||
      } else if (buffer_size_ >= capacity_) { //break
 | 
					      } else if (!check_could_expand()) { //break
 | 
				
			||||||
      } else if (OB_FAIL(expand(buffer_size_))) {
 | 
					      } else if (OB_FAIL(expand(ObCompactionBuffer::size()))) {
 | 
				
			||||||
        STORAGE_LOG(WARN, "failed to reserve", K(ret), K(buffer_size_));
 | 
					        STORAGE_LOG(WARN, "failed to reserve", K(ret));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (OB_SUCC(ret)) {
 | 
					  if (OB_SUCC(ret)) {
 | 
				
			||||||
    len_ += size;
 | 
					    write_nop(size);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObMicroBufferWriter::write(const void *buf, int64_t size)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(buf == nullptr || size < 0)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(buf), K(size), K(len_), K(capacity_));
 | 
					 | 
				
			||||||
  } else if (OB_FAIL(ensure_space(size))) {
 | 
					 | 
				
			||||||
    if (ret != OB_BUF_NOT_ENOUGH) {
 | 
					 | 
				
			||||||
      STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    MEMCPY(data_ + len_, buf, size);
 | 
					 | 
				
			||||||
    len_ += size;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::advance(const int64_t size)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(size < 0 || len_ + size > buffer_size_)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(buffer_size_));
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    len_ += size;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObMicroBufferWriter::set_length(const int64_t len)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(len > buffer_size_)) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    STORAGE_LOG(WARN, "invalid argument", K(ret), K(len), K(len_), K(buffer_size_));
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    len_ = len;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 /**
 | 
					 /**
 | 
				
			||||||
 * -------------------------------------------------------------------ObIMicroBlockWriter-------------------------------------------------------------------
 | 
					 * -------------------------------------------------------------------ObIMicroBlockWriter-------------------------------------------------------------------
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
 | 
				
			|||||||
@ -87,79 +87,14 @@ enum MICRO_BLOCK_MERGE_VERIFY_LEVEL
 | 
				
			|||||||
  ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE = 3,
 | 
					  ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE = 3,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ObMicroBufferWriter final
 | 
					class ObMicroBufferWriter : public compaction::ObCompactionBuffer
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  ObMicroBufferWriter(const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE)
 | 
					  ObMicroBufferWriter(const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE)
 | 
				
			||||||
    : allocator_(MTL_ID(), "MicroBuffer"),
 | 
					    : ObCompactionBuffer("MicroBuffer", page_size)
 | 
				
			||||||
      is_inited_(false),
 | 
					 | 
				
			||||||
      capacity_(0),
 | 
					 | 
				
			||||||
      buffer_size_(0),
 | 
					 | 
				
			||||||
      len_(0),
 | 
					 | 
				
			||||||
      data_(nullptr),
 | 
					 | 
				
			||||||
      reset_memory_threshold_(0),
 | 
					 | 
				
			||||||
      memory_reclaim_cnt_(0),
 | 
					 | 
				
			||||||
      has_expand_(false)
 | 
					 | 
				
			||||||
  {}
 | 
					  {}
 | 
				
			||||||
  ~ObMicroBufferWriter() { reset(); };
 | 
					  virtual ~ObMicroBufferWriter() { reset(); };
 | 
				
			||||||
  int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE);
 | 
					  int write_row(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len);
 | 
				
			||||||
  inline bool is_inited() const { return is_inited_; }
 | 
					 | 
				
			||||||
  inline int64_t remain() const { return capacity_ - len_; }
 | 
					 | 
				
			||||||
  inline int64_t remain_buffer_size() const { return buffer_size_ - len_; }
 | 
					 | 
				
			||||||
  inline int64_t size() const { return buffer_size_; } //curr buffer size
 | 
					 | 
				
			||||||
  inline bool has_expand() const { return has_expand_; }
 | 
					 | 
				
			||||||
  inline char *data() { return data_; }
 | 
					 | 
				
			||||||
  inline char *current() { return data_ + len_; }
 | 
					 | 
				
			||||||
  int reserve(const int64_t size);
 | 
					 | 
				
			||||||
  int ensure_space(const int64_t append_size);
 | 
					 | 
				
			||||||
  inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); }
 | 
					 | 
				
			||||||
  int write_nop(const int64_t size, bool is_zero = false);
 | 
					 | 
				
			||||||
  int write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len);
 | 
					 | 
				
			||||||
  int write(const void *buf, int64_t size);
 | 
					 | 
				
			||||||
  template<typename T>
 | 
					 | 
				
			||||||
  int write(const T &value)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
    static_assert(std::is_trivially_copyable<T>::value, "invalid type");
 | 
					 | 
				
			||||||
    if (OB_FAIL(ensure_space(sizeof(T)))) {
 | 
					 | 
				
			||||||
      if (ret != OB_BUF_NOT_ENOUGH) {
 | 
					 | 
				
			||||||
        STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(sizeof(T)));
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      *((T *)(data_ + len_)) = value;
 | 
					 | 
				
			||||||
      len_ += sizeof(T);
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    return ret;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  int advance(const int64_t size);
 | 
					 | 
				
			||||||
  int set_length(const int64_t len);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  void reuse();
 | 
					 | 
				
			||||||
  void reset();
 | 
					 | 
				
			||||||
  inline int64_t length() const { return len_; }
 | 
					 | 
				
			||||||
  TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold),
 | 
					 | 
				
			||||||
      K_(memory_reclaim_cnt), K_(has_expand));
 | 
					 | 
				
			||||||
private:
 | 
					 | 
				
			||||||
  int expand(const int64_t size);
 | 
					 | 
				
			||||||
private:
 | 
					 | 
				
			||||||
  compaction::ObLocalAllocator<common::DefaultPageAllocator> allocator_;
 | 
					 | 
				
			||||||
  bool is_inited_;
 | 
					 | 
				
			||||||
  int64_t capacity_;
 | 
					 | 
				
			||||||
  int64_t buffer_size_; //curr buffer size
 | 
					 | 
				
			||||||
  int64_t len_; //curr pos
 | 
					 | 
				
			||||||
  char *data_;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  // for reclaim memory
 | 
					 | 
				
			||||||
  int64_t default_reserve_;
 | 
					 | 
				
			||||||
  int64_t reset_memory_threshold_;
 | 
					 | 
				
			||||||
  int64_t memory_reclaim_cnt_;
 | 
					 | 
				
			||||||
  bool has_expand_;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
private:
 | 
					 | 
				
			||||||
  static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb
 | 
					 | 
				
			||||||
  static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m
 | 
					 | 
				
			||||||
  static const int64_t DEFAULT_MIDDLE_BLOCK_SIZE = 1 << 16; //64K
 | 
					 | 
				
			||||||
  static const int64_t DEFAULT_RESET_MEMORY_THRESHOLD = 5;
 | 
					 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Some common interface of ObMicroBlockWriter and ObMicroBlockEncoder, not all features.
 | 
					// Some common interface of ObMicroBlockWriter and ObMicroBlockEncoder, not all features.
 | 
				
			||||||
 | 
				
			|||||||
@ -173,7 +173,7 @@ int ObMicroBlockWriter::append_row(const ObDatumRow &row)
 | 
				
			|||||||
      ret = OB_INVALID_ARGUMENT;
 | 
					      ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
      STORAGE_LOG(WARN, "append row column count is not consistent with init column count",
 | 
					      STORAGE_LOG(WARN, "append row column count is not consistent with init column count",
 | 
				
			||||||
          K(get_header(data_buffer_)->column_count_), K(row.get_column_count()), K(ret));
 | 
					          K(get_header(data_buffer_)->column_count_), K(row.get_column_count()), K(ret));
 | 
				
			||||||
    } else if (OB_FAIL(data_buffer_.write(row, rowkey_column_count_, pos))) {
 | 
					    } else if (OB_FAIL(data_buffer_.write_row(row, rowkey_column_count_, pos))) {
 | 
				
			||||||
      if (OB_BUF_NOT_ENOUGH != ret) {
 | 
					      if (OB_BUF_NOT_ENOUGH != ret) {
 | 
				
			||||||
        STORAGE_LOG(WARN, "row writer fail to write row.", K(ret), K(rowkey_column_count_),
 | 
					        STORAGE_LOG(WARN, "row writer fail to write row.", K(ret), K(rowkey_column_count_),
 | 
				
			||||||
            K(row), K(OB_P(data_buffer_.remain())), K(pos));
 | 
					            K(row), K(OB_P(data_buffer_.remain())), K(pos));
 | 
				
			||||||
 | 
				
			|||||||
@ -162,6 +162,213 @@ void ObCompactionMemoryContext::mem_click()
 | 
				
			|||||||
  mem_peak_total_ = MAX(mem_peak_total_, mem_total);
 | 
					  mem_peak_total_ = MAX(mem_peak_total_, mem_total);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 /**
 | 
				
			||||||
 | 
					 * -------------------------------------------------------------------ObCompactionBuffer-------------------------------------------------------------------
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					int ObCompactionBuffer::init(const int64_t capacity, const int64_t reserve_size)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(is_inited_)) {
 | 
				
			||||||
 | 
					    ret = OB_INIT_TWICE;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "micro buffer writer is inited", K(ret), K(capacity_));
 | 
				
			||||||
 | 
					  } else if (OB_UNLIKELY(reserve_size < 0 || capacity > MAX_DATA_BUFFER_SIZE
 | 
				
			||||||
 | 
					      || capacity < reserve_size)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(capacity), K(reserve_size));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    capacity_ = capacity;
 | 
				
			||||||
 | 
					    len_ = 0;
 | 
				
			||||||
 | 
					    data_= nullptr;
 | 
				
			||||||
 | 
					    buffer_size_ = 0;
 | 
				
			||||||
 | 
					    reset_memory_threshold_ = DEFAULT_RESET_MEMORY_THRESHOLD;
 | 
				
			||||||
 | 
					    memory_reclaim_cnt_ = 0;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					    if(OB_FAIL(reserve(reserve_size))) {
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "failed to reserve", K(ret), K(reserve_size));
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      default_reserve_ = reserve_size;
 | 
				
			||||||
 | 
					      is_inited_ = true;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void ObCompactionBuffer::reset()
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  if (data_ != nullptr) {
 | 
				
			||||||
 | 
					    allocator_.free(data_);
 | 
				
			||||||
 | 
					    data_ = nullptr;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  has_expand_ = false;
 | 
				
			||||||
 | 
					  memory_reclaim_cnt_ = 0;
 | 
				
			||||||
 | 
					  reset_memory_threshold_ = 0;
 | 
				
			||||||
 | 
					  default_reserve_ = 0;
 | 
				
			||||||
 | 
					  len_ = 0;
 | 
				
			||||||
 | 
					  buffer_size_ = 0;
 | 
				
			||||||
 | 
					  capacity_ = 0;
 | 
				
			||||||
 | 
					  is_inited_ = false;
 | 
				
			||||||
 | 
					  allocator_.reset();
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void ObCompactionBuffer::reuse()
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) {
 | 
				
			||||||
 | 
					    memory_reclaim_cnt_++;
 | 
				
			||||||
 | 
					    if (memory_reclaim_cnt_ >= reset_memory_threshold_) {
 | 
				
			||||||
 | 
					      reset_memory_threshold_ <<= 1;
 | 
				
			||||||
 | 
					      memory_reclaim_cnt_ = 0;
 | 
				
			||||||
 | 
					      void *buf = nullptr;
 | 
				
			||||||
 | 
					      if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) {
 | 
				
			||||||
 | 
					        int ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
				
			||||||
 | 
					        STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_));
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        allocator_.free(data_);
 | 
				
			||||||
 | 
					        buffer_size_ = default_reserve_;
 | 
				
			||||||
 | 
					        data_ = reinterpret_cast<char *>(buf);
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    memory_reclaim_cnt_ = 0;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  has_expand_ = false;
 | 
				
			||||||
 | 
					  len_ = 0;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::expand(const int64_t size)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(capacity_ <= buffer_size_ || size > capacity_)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(buffer_size_), K(capacity_));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    int64_t expand_size = buffer_size_ * 2;
 | 
				
			||||||
 | 
					    while (expand_size < size) {
 | 
				
			||||||
 | 
					      expand_size <<= 1;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    expand_size = MIN(expand_size, capacity_);
 | 
				
			||||||
 | 
					    if (OB_FAIL(reserve(expand_size))) {
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "fail to reserve", K(ret), K(expand_size));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::reserve(const int64_t size)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(size < 0 || size > capacity_)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(capacity_));
 | 
				
			||||||
 | 
					  } else if (size <= buffer_size_) {//do nothing
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    void* buf = nullptr;
 | 
				
			||||||
 | 
					    const int64_t alloc_size = MAX(size, MIN_BUFFER_SIZE);
 | 
				
			||||||
 | 
					    if (OB_ISNULL(buf = allocator_.alloc(alloc_size))) {
 | 
				
			||||||
 | 
					      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size));
 | 
				
			||||||
 | 
					    } else if (data_ != nullptr) {
 | 
				
			||||||
 | 
					      has_expand_ = true;
 | 
				
			||||||
 | 
					      MEMCPY(buf, data_, len_);
 | 
				
			||||||
 | 
					      allocator_.free(data_);
 | 
				
			||||||
 | 
					      data_ = nullptr;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					      data_ = reinterpret_cast<char *>(buf);
 | 
				
			||||||
 | 
					      buffer_size_ = alloc_size;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::ensure_space(const int64_t append_size)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (len_ + append_size > capacity_) {
 | 
				
			||||||
 | 
					    ret = OB_BUF_NOT_ENOUGH;
 | 
				
			||||||
 | 
					  } else if (len_ + append_size > buffer_size_) {
 | 
				
			||||||
 | 
					    if (OB_FAIL(expand(len_ + append_size))) {
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "failed to expand size", K(ret), K(len_), K(append_size));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::write_nop(const int64_t size, bool is_zero)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(size < 0)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(capacity_));
 | 
				
			||||||
 | 
					  } else if (OB_FAIL(ensure_space(size))) {
 | 
				
			||||||
 | 
					    if (ret != OB_BUF_NOT_ENOUGH) {
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    if (is_zero) {
 | 
				
			||||||
 | 
					      MEMSET(data_ + len_, 0, size);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    len_ += size;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::write(const void *buf, int64_t size)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(buf == nullptr || size < 0)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(buf), K(size), K(len_), K(capacity_));
 | 
				
			||||||
 | 
					  } else if (OB_FAIL(ensure_space(size))) {
 | 
				
			||||||
 | 
					    if (ret != OB_BUF_NOT_ENOUGH) {
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    MEMCPY(data_ + len_, buf, size);
 | 
				
			||||||
 | 
					    len_ += size;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::advance(const int64_t size)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(size < 0 || len_ + size > buffer_size_)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(buffer_size_));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    len_ += size;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCompactionBuffer::set_length(const int64_t len)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(len > buffer_size_)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "invalid argument", K(ret), K(len), K(len_), K(buffer_size_));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    len_ = len;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
} //compaction
 | 
					} //compaction
 | 
				
			||||||
} //oceanbase
 | 
					} //oceanbase
 | 
				
			||||||
 | 
				
			|||||||
@ -264,6 +264,96 @@ private:
 | 
				
			|||||||
  common::ObSpinLock lock_;
 | 
					  common::ObSpinLock lock_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class ObCompactionBuffer
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  ObCompactionBuffer(const lib::ObLabel &label = "compaction_buf", const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE)
 | 
				
			||||||
 | 
					    : allocator_(MTL_ID(), label),
 | 
				
			||||||
 | 
					      is_inited_(false),
 | 
				
			||||||
 | 
					      capacity_(0),
 | 
				
			||||||
 | 
					      buffer_size_(0),
 | 
				
			||||||
 | 
					      len_(0),
 | 
				
			||||||
 | 
					      data_(nullptr),
 | 
				
			||||||
 | 
					      reset_memory_threshold_(0),
 | 
				
			||||||
 | 
					      memory_reclaim_cnt_(0),
 | 
				
			||||||
 | 
					      has_expand_(false)
 | 
				
			||||||
 | 
					  {}
 | 
				
			||||||
 | 
					  virtual ~ObCompactionBuffer() { reset(); };
 | 
				
			||||||
 | 
					  int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE);
 | 
				
			||||||
 | 
					  inline bool is_inited() const { return is_inited_; }
 | 
				
			||||||
 | 
					  inline int64_t remain() const { return capacity_ - len_; }
 | 
				
			||||||
 | 
					  inline int64_t remain_buffer_size() const { return buffer_size_ - len_; }
 | 
				
			||||||
 | 
					  inline int64_t size() const { return buffer_size_; } //curr buffer size
 | 
				
			||||||
 | 
					  inline bool has_expand() const { return has_expand_; }
 | 
				
			||||||
 | 
					  inline char *data() { return data_; }
 | 
				
			||||||
 | 
					  inline char *current() { return data_ + len_; }
 | 
				
			||||||
 | 
					  int reserve(const int64_t size);
 | 
				
			||||||
 | 
					  int ensure_space(const int64_t append_size);
 | 
				
			||||||
 | 
					  inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); }
 | 
				
			||||||
 | 
					  int write_nop(const int64_t size, bool is_zero = false);
 | 
				
			||||||
 | 
					  int write(const void *buf, int64_t size);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  template<typename T>
 | 
				
			||||||
 | 
					  typename std::enable_if<!HAS_MEMBER(T, serialize), int>::type write(const T &value)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					    static_assert(std::is_pod<T>::value, "invalid type");
 | 
				
			||||||
 | 
					    if (OB_FAIL(ensure_space(sizeof(T)))) {
 | 
				
			||||||
 | 
					      if (ret != OB_BUF_NOT_ENOUGH) {
 | 
				
			||||||
 | 
					        STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(sizeof(T)));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      *((T *)(data_ + len_)) = value;
 | 
				
			||||||
 | 
					      len_ += sizeof(T);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return ret;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  template<typename T>
 | 
				
			||||||
 | 
					  typename std::enable_if<HAS_MEMBER(T, serialize), int>::type write(const T &value)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					    if (OB_FAIL(ensure_space(value.get_serialize_size()))) {
 | 
				
			||||||
 | 
					      if (ret != OB_BUF_NOT_ENOUGH) {
 | 
				
			||||||
 | 
					        STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(value.get_serialize_size()));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(value.serialize(data_, buffer_size_, len_))) {
 | 
				
			||||||
 | 
					      STORAGE_LOG(WARN, "fail to serialize", K(ret), K(buffer_size_), K(len_));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return ret;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  int advance(const int64_t size);
 | 
				
			||||||
 | 
					  int set_length(const int64_t len);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  void reuse();
 | 
				
			||||||
 | 
					  void reset();
 | 
				
			||||||
 | 
					  inline int64_t length() const { return len_; }
 | 
				
			||||||
 | 
					  TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold),
 | 
				
			||||||
 | 
					      K_(memory_reclaim_cnt), K_(has_expand));
 | 
				
			||||||
 | 
					protected:
 | 
				
			||||||
 | 
					  bool check_could_expand() { return capacity_ > buffer_size_; }
 | 
				
			||||||
 | 
					  int expand(const int64_t size);
 | 
				
			||||||
 | 
					private:
 | 
				
			||||||
 | 
					  compaction::ObLocalAllocator<common::DefaultPageAllocator> allocator_;
 | 
				
			||||||
 | 
					  bool is_inited_;
 | 
				
			||||||
 | 
					  int64_t capacity_;
 | 
				
			||||||
 | 
					  int64_t buffer_size_; //curr buffer size
 | 
				
			||||||
 | 
					  int64_t len_; //curr pos
 | 
				
			||||||
 | 
					  char *data_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // for reclaim memory
 | 
				
			||||||
 | 
					  int64_t default_reserve_;
 | 
				
			||||||
 | 
					  int64_t reset_memory_threshold_;
 | 
				
			||||||
 | 
					  int64_t memory_reclaim_cnt_;
 | 
				
			||||||
 | 
					  bool has_expand_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					protected:
 | 
				
			||||||
 | 
					  static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb
 | 
				
			||||||
 | 
					  static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m
 | 
				
			||||||
 | 
					  static const int64_t DEFAULT_MIDDLE_BLOCK_SIZE = 1 << 16; //64K
 | 
				
			||||||
 | 
					  static const int64_t DEFAULT_RESET_MEMORY_THRESHOLD = 5;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
} // compaction
 | 
					} // compaction
 | 
				
			||||||
} // oceanbase
 | 
					} // oceanbase
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -559,7 +559,7 @@ int ObPartitionMacroMergeIter::open_curr_range(const bool for_rewrite, const boo
 | 
				
			|||||||
        LOG_WARN("fail to switch_query_range", K(ret));
 | 
					        LOG_WARN("fail to switch_query_range", K(ret));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      iter->reset();
 | 
					      iter->reuse();
 | 
				
			||||||
      if (OB_FAIL(iter->open(
 | 
					      if (OB_FAIL(iter->open(
 | 
				
			||||||
          access_param_.iter_param_,
 | 
					          access_param_.iter_param_,
 | 
				
			||||||
          access_context_,
 | 
					          access_context_,
 | 
				
			||||||
@ -712,7 +712,7 @@ int ObPartitionMacroMergeIter::exist(const ObDatumRow &row, bool &is_exist)
 | 
				
			|||||||
      LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_));
 | 
					      LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    iter->reset();
 | 
					    iter->reuse();
 | 
				
			||||||
    if (OB_FAIL(iter->open(
 | 
					    if (OB_FAIL(iter->open(
 | 
				
			||||||
              access_param_.iter_param_,
 | 
					              access_param_.iter_param_,
 | 
				
			||||||
              access_context_,
 | 
					              access_context_,
 | 
				
			||||||
@ -773,7 +773,7 @@ int ObPartitionMacroMergeIter::check_row_changed(const blocksstable::ObDatumRow
 | 
				
			|||||||
        LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_));
 | 
					        LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      iter->reset();
 | 
					      iter->reuse();
 | 
				
			||||||
      if (OB_FAIL(iter->open(
 | 
					      if (OB_FAIL(iter->open(
 | 
				
			||||||
          access_param_.iter_param_,
 | 
					          access_param_.iter_param_,
 | 
				
			||||||
          access_context_,
 | 
					          access_context_,
 | 
				
			||||||
@ -1811,7 +1811,7 @@ int ObPartitionMinorMacroMergeIter::open_curr_macro_block()
 | 
				
			|||||||
    LOG_WARN("Unepxcted opened macro block to open", K(ret));
 | 
					    LOG_WARN("Unepxcted opened macro block to open", K(ret));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    ObSSTableRowWholeScanner *iter = reinterpret_cast<ObSSTableRowWholeScanner *>(row_iter_);
 | 
					    ObSSTableRowWholeScanner *iter = reinterpret_cast<ObSSTableRowWholeScanner *>(row_iter_);
 | 
				
			||||||
    iter->reset();
 | 
					    iter->reuse();
 | 
				
			||||||
    if (OB_FAIL(iter->open(
 | 
					    if (OB_FAIL(iter->open(
 | 
				
			||||||
                access_param_.iter_param_,
 | 
					                access_param_.iter_param_,
 | 
				
			||||||
                access_context_,
 | 
					                access_context_,
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user