opt whole scnner io buf

This commit is contained in:
z404289981
2023-11-23 02:41:03 +00:00
committed by ob-robot
parent c419b32061
commit f6083f5351
8 changed files with 332 additions and 311 deletions

View File

@ -42,17 +42,16 @@ ObSSTableRowWholeScanner::~ObSSTableRowWholeScanner()
}
}
int ObSSTableRowWholeScanner::alloc_io_buf()
int ObSSTableRowWholeScanner::alloc_io_buf(compaction::ObCompactionBuffer &io_buf, int64_t buf_size)
{
int ret = OB_SUCCESS;
int64_t size = common::OB_DEFAULT_MACRO_BLOCK_SIZE * PREFETCH_DEPTH;
if (OB_ISNULL(buf_ = reinterpret_cast<char*>(io_allocator_.alloc(size)))) { //continuous memory
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc ObSSTableRowWholeScanner read info buffer", K(ret), K(size));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < PREFETCH_DEPTH; ++i) {
io_buf_[i] = buf_ + common::OB_DEFAULT_MACRO_BLOCK_SIZE * i;
if (OB_LIKELY(io_buf.is_inited())) {
if (OB_FAIL(io_buf.reserve(buf_size))) {
LOG_WARN("fail to reserve io buf", K(ret), K(io_buf), K(buf_size));
}
} else if (OB_FAIL(io_buf.init(common::OB_DEFAULT_MACRO_BLOCK_SIZE, buf_size))) {
LOG_WARN("fail to init io buf", K(ret), K(io_buf), K(buf_size));
}
return ret;
}
@ -77,8 +76,6 @@ void ObSSTableRowWholeScanner::reset()
micro_scanner_ = nullptr;
}
allocator_.reset();
io_allocator_.reset();
buf_ = nullptr;
is_inited_ = false;
last_micro_block_recycled_ = false;
last_mvcc_row_already_output_ = false;
@ -86,7 +83,7 @@ void ObSSTableRowWholeScanner::reset()
void ObSSTableRowWholeScanner::reuse()
{
ObStoreRowIterator::reuse();
ObStoreRowIterator::reset();
iter_param_ = nullptr;
access_ctx_ = nullptr;
sstable_ = nullptr;
@ -104,8 +101,6 @@ void ObSSTableRowWholeScanner::reuse()
micro_scanner_ = nullptr;
}
allocator_.reuse();
io_allocator_.reuse();
buf_ = nullptr;
is_inited_ = false;
last_micro_block_recycled_ = false;
last_mvcc_row_already_output_ = false;
@ -208,8 +203,6 @@ int ObSSTableRowWholeScanner::inner_open(
rowkey_read_info = iter_param.read_info_;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(alloc_io_buf())) {
LOG_WARN("alloc io buffers failed", K(ret));
} else if (OB_FAIL(init_micro_scanner(range))) {
LOG_WARN("Failed to init micro scanner", K(ret));
} else if (OB_FAIL(macro_block_iter_.open(
@ -272,8 +265,8 @@ int ObSSTableRowWholeScanner::open(
MacroScanHandle &scan_handle = scan_handles_[0];
scan_handle.reset();
if (OB_FAIL(alloc_io_buf())) {
LOG_WARN("alloc io buffers failed", K(ret));
if (OB_FAIL(alloc_io_buf(io_buf_[0], sstable_->get_macro_read_size()))) {
LOG_WARN("alloc io buffers failed", K(ret), K(sstable_->get_macro_read_size()));
} else if (OB_FAIL(init_micro_scanner(&query_range))) {
LOG_WARN("Fail to init micro scanner", K(ret));
} else {
@ -287,7 +280,7 @@ int ObSSTableRowWholeScanner::open(
read_info.size_ = sstable_->get_macro_read_size();
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO);
read_info.buf_ = io_buf_[0];
read_info.buf_ = io_buf_[0].data();
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) {
@ -424,6 +417,8 @@ int ObSSTableRowWholeScanner::prefetch()
} else {
LOG_WARN("Fail to get_next_macro_block ", K(ret), K(macro_block_iter_));
}
} else if (OB_FAIL(alloc_io_buf(io_buf_[io_index], sstable_->get_macro_read_size()))) {
LOG_WARN("alloc io buffers failed", K(ret), K(sstable_->get_macro_read_size()));
} else {
scan_handle.is_left_border_ = (0 == prefetch_macro_cursor_);
scan_handle.is_right_border_ = false; // set right border correctly when open macro block
@ -433,7 +428,7 @@ int ObSSTableRowWholeScanner::prefetch()
read_info.size_ = sstable_->get_macro_read_size();
read_info.io_desc_.set_wait_event(common::ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO);
read_info.buf_ = io_buf_[io_index];
read_info.buf_ = io_buf_[io_index].data();
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) {
LOG_WARN("Fail to read macro block, ", K(ret), K(read_info));

View File

@ -56,7 +56,6 @@ public:
access_ctx_(nullptr),
sstable_(nullptr),
allocator_(common::ObModIds::OB_SSTABLE_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
io_allocator_("SSTRWS_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
prefetch_macro_cursor_(0),
cur_macro_cursor_(0),
is_macro_prefetch_end_(false),
@ -68,12 +67,13 @@ public:
last_micro_block_recycled_(false),
last_mvcc_row_already_output_(false),
iter_macro_cnt_(0),
buf_(nullptr)
io_buf_()
{}
virtual ~ObSSTableRowWholeScanner();
int alloc_io_buf();
int alloc_io_buf(compaction::ObCompactionBuffer &io_buf, int64_t buf_size);
virtual void reset() override;
virtual void reuse() override;
int open(
const ObTableIterParam &iter_param,
ObTableAccessContext &access_ctx,
@ -97,7 +97,6 @@ protected:
ObITable *table,
const void *query_range) override;
virtual int inner_get_next_row(const blocksstable::ObDatumRow *&row) override;
virtual void reuse() override;
private:
int init_micro_scanner(const blocksstable::ObDatumRange *range);
int open_macro_block();
@ -126,7 +125,6 @@ private:
blocksstable::ObSSTable *sstable_;
blocksstable::ObDatumRange query_range_;
common::ObArenaAllocator allocator_;
common::ObArenaAllocator io_allocator_;
int64_t prefetch_macro_cursor_;
int64_t cur_macro_cursor_;
bool is_macro_prefetch_end_;
@ -140,8 +138,7 @@ private:
bool last_micro_block_recycled_;
bool last_mvcc_row_already_output_;
int64_t iter_macro_cnt_;
char *buf_;
char *io_buf_[PREFETCH_DEPTH];
compaction::ObCompactionBuffer io_buf_[PREFETCH_DEPTH];
};
}

View File

@ -56,237 +56,34 @@ void ObMicroBlockDesc::reset()
/**
* -------------------------------------------------------------------ObMicroBufferWriter-------------------------------------------------------------------
*/
int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
STORAGE_LOG(WARN, "micro buffer writer is inited", K(ret), K(capacity_));
} else if (OB_UNLIKELY(reserve_size < 0 || capacity > MAX_DATA_BUFFER_SIZE
|| capacity < reserve_size)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(capacity), K(reserve_size));
} else {
capacity_ = capacity;
len_ = 0;
data_= nullptr;
buffer_size_ = 0;
reset_memory_threshold_ = DEFAULT_RESET_MEMORY_THRESHOLD;
memory_reclaim_cnt_ = 0;
}
if (OB_SUCC(ret)) {
if(OB_FAIL(reserve(reserve_size))) {
STORAGE_LOG(WARN, "failed to reserve", K(ret), K(reserve_size));
} else {
default_reserve_ = reserve_size;
is_inited_ = true;
}
}
return ret;
}
void ObMicroBufferWriter::reset()
{
if (data_ != nullptr) {
allocator_.free(data_);
data_ = nullptr;
}
has_expand_ = false;
memory_reclaim_cnt_ = 0;
reset_memory_threshold_ = 0;
default_reserve_ = 0;
len_ = 0;
buffer_size_ = 0;
capacity_ = 0;
is_inited_ = false;
allocator_.reset();
}
void ObMicroBufferWriter::reuse()
{
if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) {
memory_reclaim_cnt_++;
if (memory_reclaim_cnt_ >= reset_memory_threshold_) {
reset_memory_threshold_ <<= 1;
memory_reclaim_cnt_ = 0;
void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) {
int ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_));
} else {
allocator_.free(data_);
buffer_size_ = default_reserve_;
data_ = reinterpret_cast<char *>(buf);
}
}
} else {
memory_reclaim_cnt_ = 0;
}
has_expand_ = false;
len_ = 0;
}
int ObMicroBufferWriter::expand(const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(capacity_ <= buffer_size_ || size > capacity_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(buffer_size_), K(capacity_));
} else {
int64_t expand_size = buffer_size_ * 2;
while (expand_size < size) {
expand_size <<= 1;
}
expand_size = MIN(expand_size, capacity_);
if (OB_FAIL(reserve(expand_size))) {
STORAGE_LOG(WARN, "fail to reserve", K(ret), K(expand_size));
}
}
return ret;
}
int ObMicroBufferWriter::reserve(const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(size < 0 || size > capacity_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(capacity_));
} else if (size <= buffer_size_) {//do nothing
} else {
void* buf = nullptr;
const int64_t alloc_size = MAX(size, MIN_BUFFER_SIZE);
if (OB_ISNULL(buf = allocator_.alloc(alloc_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size));
} else if (data_ != nullptr) {
has_expand_ = true;
MEMCPY(buf, data_, len_);
allocator_.free(data_);
data_ = nullptr;
}
if (OB_SUCC(ret)) {
data_ = reinterpret_cast<char *>(buf);
buffer_size_ = alloc_size;
}
}
return ret;
}
int ObMicroBufferWriter::ensure_space(const int64_t append_size)
{
int ret = OB_SUCCESS;
if (len_ + append_size > capacity_) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len_ + append_size > buffer_size_) {
if (OB_FAIL(expand(len_ + append_size))) {
STORAGE_LOG(WARN, "failed to expand size", K(ret), K(len_), K(append_size));
}
}
return ret;
}
int ObMicroBufferWriter::write_nop(const int64_t size, bool is_zero)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(size < 0)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(capacity_));
} else if (OB_FAIL(ensure_space(size))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
}
} else {
if (is_zero) {
MEMSET(data_ + len_, 0, size);
}
len_ += size;
}
return ret;
}
int ObMicroBufferWriter::write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &size)
int ObMicroBufferWriter::write_row(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &size)
{
int ret = OB_SUCCESS;
ObRowWriter row_writer;
if ((buffer_size_ == len_) && OB_FAIL(expand(buffer_size_))) {
STORAGE_LOG(WARN, "failed to reserve", K(ret), K(buffer_size_));
if (remain_buffer_size() <= 0 && OB_FAIL(expand(ObCompactionBuffer::size()))) {
STORAGE_LOG(WARN, "failed to reserve", K(ret));
}
while (OB_SUCC(ret)) {
if (OB_SUCC(row_writer.write(rowkey_cnt, row, data_ + len_, buffer_size_ - len_, size))) {
if (OB_SUCC(row_writer.write(rowkey_cnt, row, current(), remain_buffer_size(), size))) {
break;
} else {
if (OB_UNLIKELY(ret != OB_BUF_NOT_ENOUGH)) {
STORAGE_LOG(WARN, "failed to write row", K(ret), K(buffer_size_), K(capacity_));
} else if (buffer_size_ >= capacity_) { //break
} else if (OB_FAIL(expand(buffer_size_))) {
STORAGE_LOG(WARN, "failed to reserve", K(ret), K(buffer_size_));
STORAGE_LOG(WARN, "failed to write row", K(ret), KPC(this));
} else if (!check_could_expand()) { //break
} else if (OB_FAIL(expand(ObCompactionBuffer::size()))) {
STORAGE_LOG(WARN, "failed to reserve", K(ret));
}
}
}
if (OB_SUCC(ret)) {
len_ += size;
write_nop(size);
}
return ret;
}
int ObMicroBufferWriter::write(const void *buf, int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(buf == nullptr || size < 0)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(buf), K(size), K(len_), K(capacity_));
} else if (OB_FAIL(ensure_space(size))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
}
} else {
MEMCPY(data_ + len_, buf, size);
len_ += size;
}
return ret;
}
int ObMicroBufferWriter::advance(const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(size < 0 || len_ + size > buffer_size_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(buffer_size_));
} else {
len_ += size;
}
return ret;
}
int ObMicroBufferWriter::set_length(const int64_t len)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(len > buffer_size_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(len), K(len_), K(buffer_size_));
} else {
len_ = len;
}
return ret;
}
/**
* -------------------------------------------------------------------ObIMicroBlockWriter-------------------------------------------------------------------
*/

View File

@ -87,79 +87,14 @@ enum MICRO_BLOCK_MERGE_VERIFY_LEVEL
ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE = 3,
};
class ObMicroBufferWriter final
class ObMicroBufferWriter : public compaction::ObCompactionBuffer
{
public:
ObMicroBufferWriter(const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE)
: allocator_(MTL_ID(), "MicroBuffer"),
is_inited_(false),
capacity_(0),
buffer_size_(0),
len_(0),
data_(nullptr),
reset_memory_threshold_(0),
memory_reclaim_cnt_(0),
has_expand_(false)
: ObCompactionBuffer("MicroBuffer", page_size)
{}
~ObMicroBufferWriter() { reset(); };
int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE);
inline bool is_inited() const { return is_inited_; }
inline int64_t remain() const { return capacity_ - len_; }
inline int64_t remain_buffer_size() const { return buffer_size_ - len_; }
inline int64_t size() const { return buffer_size_; } //curr buffer size
inline bool has_expand() const { return has_expand_; }
inline char *data() { return data_; }
inline char *current() { return data_ + len_; }
int reserve(const int64_t size);
int ensure_space(const int64_t append_size);
inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); }
int write_nop(const int64_t size, bool is_zero = false);
int write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len);
int write(const void *buf, int64_t size);
template<typename T>
int write(const T &value)
{
int ret = OB_SUCCESS;
static_assert(std::is_trivially_copyable<T>::value, "invalid type");
if (OB_FAIL(ensure_space(sizeof(T)))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(sizeof(T)));
}
} else {
*((T *)(data_ + len_)) = value;
len_ += sizeof(T);
}
return ret;
}
int advance(const int64_t size);
int set_length(const int64_t len);
void reuse();
void reset();
inline int64_t length() const { return len_; }
TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold),
K_(memory_reclaim_cnt), K_(has_expand));
private:
int expand(const int64_t size);
private:
compaction::ObLocalAllocator<common::DefaultPageAllocator> allocator_;
bool is_inited_;
int64_t capacity_;
int64_t buffer_size_; //curr buffer size
int64_t len_; //curr pos
char *data_;
// for reclaim memory
int64_t default_reserve_;
int64_t reset_memory_threshold_;
int64_t memory_reclaim_cnt_;
bool has_expand_;
private:
static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb
static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m
static const int64_t DEFAULT_MIDDLE_BLOCK_SIZE = 1 << 16; //64K
static const int64_t DEFAULT_RESET_MEMORY_THRESHOLD = 5;
virtual ~ObMicroBufferWriter() { reset(); };
int write_row(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len);
};
// Some common interface of ObMicroBlockWriter and ObMicroBlockEncoder, not all features.

View File

@ -173,7 +173,7 @@ int ObMicroBlockWriter::append_row(const ObDatumRow &row)
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "append row column count is not consistent with init column count",
K(get_header(data_buffer_)->column_count_), K(row.get_column_count()), K(ret));
} else if (OB_FAIL(data_buffer_.write(row, rowkey_column_count_, pos))) {
} else if (OB_FAIL(data_buffer_.write_row(row, rowkey_column_count_, pos))) {
if (OB_BUF_NOT_ENOUGH != ret) {
STORAGE_LOG(WARN, "row writer fail to write row.", K(ret), K(rowkey_column_count_),
K(row), K(OB_P(data_buffer_.remain())), K(pos));

View File

@ -162,6 +162,213 @@ void ObCompactionMemoryContext::mem_click()
mem_peak_total_ = MAX(mem_peak_total_, mem_total);
}
/**
* -------------------------------------------------------------------ObCompactionBuffer-------------------------------------------------------------------
*/
int ObCompactionBuffer::init(const int64_t capacity, const int64_t reserve_size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
STORAGE_LOG(WARN, "micro buffer writer is inited", K(ret), K(capacity_));
} else if (OB_UNLIKELY(reserve_size < 0 || capacity > MAX_DATA_BUFFER_SIZE
|| capacity < reserve_size)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(capacity), K(reserve_size));
} else {
capacity_ = capacity;
len_ = 0;
data_= nullptr;
buffer_size_ = 0;
reset_memory_threshold_ = DEFAULT_RESET_MEMORY_THRESHOLD;
memory_reclaim_cnt_ = 0;
}
if (OB_SUCC(ret)) {
if(OB_FAIL(reserve(reserve_size))) {
STORAGE_LOG(WARN, "failed to reserve", K(ret), K(reserve_size));
} else {
default_reserve_ = reserve_size;
is_inited_ = true;
}
}
return ret;
}
void ObCompactionBuffer::reset()
{
if (data_ != nullptr) {
allocator_.free(data_);
data_ = nullptr;
}
has_expand_ = false;
memory_reclaim_cnt_ = 0;
reset_memory_threshold_ = 0;
default_reserve_ = 0;
len_ = 0;
buffer_size_ = 0;
capacity_ = 0;
is_inited_ = false;
allocator_.reset();
}
void ObCompactionBuffer::reuse()
{
if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) {
memory_reclaim_cnt_++;
if (memory_reclaim_cnt_ >= reset_memory_threshold_) {
reset_memory_threshold_ <<= 1;
memory_reclaim_cnt_ = 0;
void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) {
int ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_));
} else {
allocator_.free(data_);
buffer_size_ = default_reserve_;
data_ = reinterpret_cast<char *>(buf);
}
}
} else {
memory_reclaim_cnt_ = 0;
}
has_expand_ = false;
len_ = 0;
}
int ObCompactionBuffer::expand(const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(capacity_ <= buffer_size_ || size > capacity_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(buffer_size_), K(capacity_));
} else {
int64_t expand_size = buffer_size_ * 2;
while (expand_size < size) {
expand_size <<= 1;
}
expand_size = MIN(expand_size, capacity_);
if (OB_FAIL(reserve(expand_size))) {
STORAGE_LOG(WARN, "fail to reserve", K(ret), K(expand_size));
}
}
return ret;
}
int ObCompactionBuffer::reserve(const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(size < 0 || size > capacity_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(capacity_));
} else if (size <= buffer_size_) {//do nothing
} else {
void* buf = nullptr;
const int64_t alloc_size = MAX(size, MIN_BUFFER_SIZE);
if (OB_ISNULL(buf = allocator_.alloc(alloc_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size));
} else if (data_ != nullptr) {
has_expand_ = true;
MEMCPY(buf, data_, len_);
allocator_.free(data_);
data_ = nullptr;
}
if (OB_SUCC(ret)) {
data_ = reinterpret_cast<char *>(buf);
buffer_size_ = alloc_size;
}
}
return ret;
}
int ObCompactionBuffer::ensure_space(const int64_t append_size)
{
int ret = OB_SUCCESS;
if (len_ + append_size > capacity_) {
ret = OB_BUF_NOT_ENOUGH;
} else if (len_ + append_size > buffer_size_) {
if (OB_FAIL(expand(len_ + append_size))) {
STORAGE_LOG(WARN, "failed to expand size", K(ret), K(len_), K(append_size));
}
}
return ret;
}
int ObCompactionBuffer::write_nop(const int64_t size, bool is_zero)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(size < 0)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(capacity_));
} else if (OB_FAIL(ensure_space(size))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
}
} else {
if (is_zero) {
MEMSET(data_ + len_, 0, size);
}
len_ += size;
}
return ret;
}
int ObCompactionBuffer::write(const void *buf, int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(buf == nullptr || size < 0)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(buf), K(size), K(len_), K(capacity_));
} else if (OB_FAIL(ensure_space(size))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size));
}
} else {
MEMCPY(data_ + len_, buf, size);
len_ += size;
}
return ret;
}
int ObCompactionBuffer::advance(const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(size < 0 || len_ + size > buffer_size_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(buffer_size_));
} else {
len_ += size;
}
return ret;
}
int ObCompactionBuffer::set_length(const int64_t len)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(len > buffer_size_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(len), K(len_), K(buffer_size_));
} else {
len_ = len;
}
return ret;
}
} //compaction
} //oceanbase

View File

@ -264,6 +264,96 @@ private:
common::ObSpinLock lock_;
};
class ObCompactionBuffer
{
public:
ObCompactionBuffer(const lib::ObLabel &label = "compaction_buf", const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE)
: allocator_(MTL_ID(), label),
is_inited_(false),
capacity_(0),
buffer_size_(0),
len_(0),
data_(nullptr),
reset_memory_threshold_(0),
memory_reclaim_cnt_(0),
has_expand_(false)
{}
virtual ~ObCompactionBuffer() { reset(); };
int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE);
inline bool is_inited() const { return is_inited_; }
inline int64_t remain() const { return capacity_ - len_; }
inline int64_t remain_buffer_size() const { return buffer_size_ - len_; }
inline int64_t size() const { return buffer_size_; } //curr buffer size
inline bool has_expand() const { return has_expand_; }
inline char *data() { return data_; }
inline char *current() { return data_ + len_; }
int reserve(const int64_t size);
int ensure_space(const int64_t append_size);
inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); }
int write_nop(const int64_t size, bool is_zero = false);
int write(const void *buf, int64_t size);
template<typename T>
typename std::enable_if<!HAS_MEMBER(T, serialize), int>::type write(const T &value)
{
int ret = OB_SUCCESS;
static_assert(std::is_pod<T>::value, "invalid type");
if (OB_FAIL(ensure_space(sizeof(T)))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(sizeof(T)));
}
} else {
*((T *)(data_ + len_)) = value;
len_ += sizeof(T);
}
return ret;
}
template<typename T>
typename std::enable_if<HAS_MEMBER(T, serialize), int>::type write(const T &value)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ensure_space(value.get_serialize_size()))) {
if (ret != OB_BUF_NOT_ENOUGH) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(value.get_serialize_size()));
}
} else if (OB_FAIL(value.serialize(data_, buffer_size_, len_))) {
STORAGE_LOG(WARN, "fail to serialize", K(ret), K(buffer_size_), K(len_));
}
return ret;
}
int advance(const int64_t size);
int set_length(const int64_t len);
void reuse();
void reset();
inline int64_t length() const { return len_; }
TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold),
K_(memory_reclaim_cnt), K_(has_expand));
protected:
bool check_could_expand() { return capacity_ > buffer_size_; }
int expand(const int64_t size);
private:
compaction::ObLocalAllocator<common::DefaultPageAllocator> allocator_;
bool is_inited_;
int64_t capacity_;
int64_t buffer_size_; //curr buffer size
int64_t len_; //curr pos
char *data_;
// for reclaim memory
int64_t default_reserve_;
int64_t reset_memory_threshold_;
int64_t memory_reclaim_cnt_;
bool has_expand_;
protected:
static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb
static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m
static const int64_t DEFAULT_MIDDLE_BLOCK_SIZE = 1 << 16; //64K
static const int64_t DEFAULT_RESET_MEMORY_THRESHOLD = 5;
};
} // compaction
} // oceanbase

View File

@ -559,7 +559,7 @@ int ObPartitionMacroMergeIter::open_curr_range(const bool for_rewrite, const boo
LOG_WARN("fail to switch_query_range", K(ret));
}
} else {
iter->reset();
iter->reuse();
if (OB_FAIL(iter->open(
access_param_.iter_param_,
access_context_,
@ -712,7 +712,7 @@ int ObPartitionMacroMergeIter::exist(const ObDatumRow &row, bool &is_exist)
LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_));
}
} else {
iter->reset();
iter->reuse();
if (OB_FAIL(iter->open(
access_param_.iter_param_,
access_context_,
@ -773,7 +773,7 @@ int ObPartitionMacroMergeIter::check_row_changed(const blocksstable::ObDatumRow
LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_));
}
} else {
iter->reset();
iter->reuse();
if (OB_FAIL(iter->open(
access_param_.iter_param_,
access_context_,
@ -1811,7 +1811,7 @@ int ObPartitionMinorMacroMergeIter::open_curr_macro_block()
LOG_WARN("Unepxcted opened macro block to open", K(ret));
} else {
ObSSTableRowWholeScanner *iter = reinterpret_cast<ObSSTableRowWholeScanner *>(row_iter_);
iter->reset();
iter->reuse();
if (OB_FAIL(iter->open(
access_param_.iter_param_,
access_context_,