micro writer buffer uses the default allocator
This commit is contained in:
parent
d0a66f4900
commit
41296128f1
@ -90,14 +90,20 @@ int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size
|
||||
|
||||
void ObMicroBufferWriter::reset()
|
||||
{
|
||||
old_buf_ = nullptr;
|
||||
if (old_buf_ != nullptr) {
|
||||
allocator_.free(old_buf_);
|
||||
old_buf_ = nullptr;
|
||||
}
|
||||
if (data_ != nullptr) {
|
||||
allocator_.free(data_);
|
||||
data_ = nullptr;
|
||||
}
|
||||
old_size_ = 0;
|
||||
lazy_move_ = false;
|
||||
has_expand_ = false;
|
||||
memory_reclaim_cnt_ = 0;
|
||||
reset_memory_threshold_ = 0;
|
||||
default_reserve_ = 0;
|
||||
data_ = nullptr;
|
||||
len_ = 0;
|
||||
buffer_size_ = 0;
|
||||
capacity_ = 0;
|
||||
@ -120,7 +126,7 @@ void ObMicroBufferWriter::reuse()
|
||||
void *buf = nullptr;
|
||||
if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) {
|
||||
int ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_), K(allocator_));
|
||||
STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_));
|
||||
} else {
|
||||
allocator_.free(data_);
|
||||
buffer_size_ = default_reserve_;
|
||||
|
@ -91,7 +91,7 @@ class ObMicroBufferWriter final
|
||||
{
|
||||
public:
|
||||
ObMicroBufferWriter(const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE)
|
||||
: allocator_("MicroBuffer"),
|
||||
: allocator_(MTL_ID(), "MicroBuffer"),
|
||||
is_inited_(false),
|
||||
capacity_(0),
|
||||
buffer_size_(0),
|
||||
@ -167,7 +167,7 @@ public:
|
||||
private:
|
||||
int expand(const int64_t size);
|
||||
private:
|
||||
compaction::ObLocalArena allocator_;
|
||||
compaction::ObLocalAllocator<common::DefaultPageAllocator> allocator_;
|
||||
bool is_inited_;
|
||||
int64_t capacity_;
|
||||
int64_t buffer_size_; //curr buffer size
|
||||
@ -219,7 +219,7 @@ public:
|
||||
virtual int64_t get_column_count() const = 0;
|
||||
virtual void reset()
|
||||
{
|
||||
reuse();
|
||||
ObIMicroBlockWriter::reuse();
|
||||
checksum_helper_.reset();
|
||||
}
|
||||
virtual void dump_diagnose_info() const { STORAGE_LOG(INFO, "IMicroBlockWriter", K(checksum_helper_)); }
|
||||
|
@ -32,95 +32,6 @@ namespace compaction
|
||||
{
|
||||
|
||||
|
||||
/*
|
||||
* ================================================= ObLocalArena =================================================
|
||||
*/
|
||||
ObLocalArena::ObLocalArena(
|
||||
const lib::ObLabel &label,
|
||||
const int64_t page_size)
|
||||
: ref_mem_ctx_(nullptr),
|
||||
arena_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID),
|
||||
hist_mem_hold_(0)
|
||||
{
|
||||
ObCompactionMemoryContext *mem_ctx = CURRENT_MEM_CTX();
|
||||
if (nullptr != mem_ctx) {
|
||||
bind_mem_ctx(*mem_ctx);
|
||||
}
|
||||
}
|
||||
|
||||
ObLocalArena::ObLocalArena(
|
||||
ObCompactionMemoryContext &mem_ctx,
|
||||
const lib::ObLabel &label,
|
||||
const int64_t page_size)
|
||||
: ref_mem_ctx_(nullptr),
|
||||
arena_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID),
|
||||
hist_mem_hold_(0)
|
||||
{
|
||||
bind_mem_ctx(mem_ctx);
|
||||
}
|
||||
|
||||
void ObLocalArena::bind_mem_ctx(ObCompactionMemoryContext &mem_ctx)
|
||||
{
|
||||
if (NULL == ref_mem_ctx_) {
|
||||
ref_mem_ctx_ = &mem_ctx;
|
||||
}
|
||||
arena_.set_ctx_id(ref_mem_ctx_->get_ctx_id());
|
||||
}
|
||||
|
||||
ObLocalArena::~ObLocalArena()
|
||||
{
|
||||
reset();
|
||||
ref_mem_ctx_ = nullptr;
|
||||
}
|
||||
|
||||
void* ObLocalArena::alloc(const int64_t size)
|
||||
{
|
||||
void *buf = arena_.alloc(size);
|
||||
if (OB_NOT_NULL(buf)) {
|
||||
update_mem_monitor();
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
void* ObLocalArena::alloc(const int64_t size, const ObMemAttr &attr)
|
||||
{
|
||||
void *buf = arena_.alloc(size, attr);
|
||||
if (OB_NOT_NULL(buf)) {
|
||||
update_mem_monitor();
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
void ObLocalArena::reset()
|
||||
{
|
||||
arena_.reset();
|
||||
update_mem_monitor();
|
||||
}
|
||||
|
||||
void ObLocalArena::clear()
|
||||
{
|
||||
arena_.clear();
|
||||
update_mem_monitor();
|
||||
}
|
||||
|
||||
void ObLocalArena::update_mem_monitor()
|
||||
{
|
||||
ref_mem_ctx_ = nullptr == ref_mem_ctx_
|
||||
? CURRENT_MEM_CTX()
|
||||
: ref_mem_ctx_;
|
||||
|
||||
int64_t cur_mem_hold = arena_.total();
|
||||
if (nullptr != ref_mem_ctx_ && hist_mem_hold_ != cur_mem_hold) {
|
||||
if (cur_mem_hold > hist_mem_hold_) {
|
||||
ref_mem_ctx_->inc_local_hold_mem(cur_mem_hold - hist_mem_hold_);
|
||||
} else {
|
||||
ref_mem_ctx_->inc_local_free_mem(hist_mem_hold_ - cur_mem_hold);
|
||||
}
|
||||
hist_mem_hold_ = cur_mem_hold;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ================================================= Mem Monitor Part =================================================
|
||||
*/
|
||||
|
@ -18,6 +18,9 @@
|
||||
#include "lib/allocator/ob_allocator.h"
|
||||
#include "lib/allocator/page_arena.h"
|
||||
#include "lib/allocator/ob_fifo_allocator.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "share/scheduler/ob_tenant_dag_scheduler.h"
|
||||
#include "lib/utility/ob_template_utils.h"
|
||||
#include "lib/lock/ob_spin_lock.h"
|
||||
#include "lib/list/ob_dlist.h"
|
||||
#include "share/ob_delegate.h"
|
||||
@ -51,76 +54,6 @@ struct ObTabletMergeDagParam;
|
||||
class ObCompactionMemoryContext;
|
||||
|
||||
|
||||
class ObLocalArena : public common::ObIAllocator
|
||||
{
|
||||
public:
|
||||
ObLocalArena(
|
||||
const lib::ObLabel &label,
|
||||
const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE);
|
||||
ObLocalArena(
|
||||
ObCompactionMemoryContext &mem_ctx,
|
||||
const lib::ObLabel &label,
|
||||
const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE);
|
||||
virtual ~ObLocalArena();
|
||||
void bind_mem_ctx(ObCompactionMemoryContext &mem_ctx);
|
||||
void unbind_mem_ctx() { ref_mem_ctx_ = nullptr; }
|
||||
common::ObArenaAllocator &get_arena_allocator() { return arena_; }
|
||||
|
||||
virtual void* alloc(const int64_t size) override;
|
||||
virtual void* alloc(const int64_t size, const ObMemAttr &attr) override;
|
||||
virtual void free(void *ptr) override { arena_.free(ptr); }
|
||||
virtual void reset() override;
|
||||
virtual void clear();
|
||||
|
||||
DELEGATE_WITH_RET(arena_, alloc_aligned, void*);
|
||||
DELEGATE_WITH_RET(arena_, realloc, void*);
|
||||
DELEGATE_WITHOUT_RET(arena_, reset_remain_one_page);
|
||||
DELEGATE_WITHOUT_RET(arena_, reuse);
|
||||
DELEGATE_WITHOUT_RET(arena_, set_label);
|
||||
DELEGATE_WITHOUT_RET(arena_, set_tenant_id);
|
||||
DELEGATE_WITH_RET(arena_, set_tracer, bool);
|
||||
DELEGATE_WITH_RET(arena_, revert_tracer, bool);
|
||||
DELEGATE_WITHOUT_RET(arena_, set_ctx_id);
|
||||
DELEGATE_WITHOUT_RET(arena_, set_attr);
|
||||
DELEGATE_WITH_RET(arena_, get_arena, ModuleArena&);
|
||||
DELEGATE_WITH_RET(arena_, mprotect_arena_allocator, int);
|
||||
CONST_DELEGATE_WITH_RET(arena_, used, int64_t);
|
||||
CONST_DELEGATE_WITH_RET(arena_, total, int64_t);
|
||||
CONST_DELEGATE_WITH_RET(arena_, to_string, int64_t);
|
||||
|
||||
private:
|
||||
void update_mem_monitor();
|
||||
protected:
|
||||
ObCompactionMemoryContext *ref_mem_ctx_;
|
||||
common::ObArenaAllocator arena_;
|
||||
int64_t hist_mem_hold_;
|
||||
};
|
||||
|
||||
|
||||
class ObLocalSafeArena final : public ObLocalArena
|
||||
{
|
||||
public:
|
||||
ObLocalSafeArena(const lib::ObLabel &label, const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE)
|
||||
: ObLocalArena(label, page_size), lock_() {}
|
||||
virtual ~ObLocalSafeArena() {}
|
||||
virtual void *alloc(const int64_t sz) override
|
||||
{
|
||||
ObSpinLockGuard guard(lock_);
|
||||
return arena_.alloc(sz);
|
||||
}
|
||||
virtual void* alloc(const int64_t size, const ObMemAttr &attr) override
|
||||
{
|
||||
ObSpinLockGuard guard(lock_);
|
||||
return arena_.alloc(size, attr);
|
||||
}
|
||||
DELEGATE_WITH_SPIN_LOCK(arena_, lock_, clear, void);
|
||||
DELEGATE_WITH_SPIN_LOCK(arena_, lock_, reuse, void);
|
||||
DELEGATE_WITH_SPIN_LOCK(arena_, lock_, reset, void);
|
||||
private:
|
||||
common::ObSpinLock lock_;
|
||||
};
|
||||
|
||||
|
||||
struct ObCompactionMemMonitor
|
||||
{
|
||||
public:
|
||||
@ -180,6 +113,156 @@ private:
|
||||
DISABLE_COPY_ASSIGN(ObCompactionMemoryContext);
|
||||
};
|
||||
|
||||
template<class T>
|
||||
class ObLocalAllocator : public common::ObIAllocator
|
||||
{
|
||||
public:
|
||||
ObLocalAllocator(
|
||||
const lib::ObLabel &label,
|
||||
const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE)
|
||||
: ref_mem_ctx_(nullptr),
|
||||
allocator_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID),
|
||||
hist_mem_hold_(0)
|
||||
{
|
||||
ObCompactionMemoryContext *mem_ctx = CURRENT_MEM_CTX();
|
||||
if (nullptr != mem_ctx) {
|
||||
bind_mem_ctx(*mem_ctx);
|
||||
}
|
||||
}
|
||||
|
||||
ObLocalAllocator(
|
||||
const uint64_t tenant_id,
|
||||
const lib::ObLabel &label)
|
||||
: ref_mem_ctx_(nullptr),
|
||||
allocator_(label, tenant_id),
|
||||
hist_mem_hold_(0)
|
||||
{
|
||||
static_assert(std::is_same<T, common::DefaultPageAllocator>::value, "error allocator type");
|
||||
ObCompactionMemoryContext *mem_ctx = CURRENT_MEM_CTX();
|
||||
if (nullptr != mem_ctx) {
|
||||
bind_mem_ctx(*mem_ctx);
|
||||
}
|
||||
}
|
||||
ObLocalAllocator(
|
||||
ObCompactionMemoryContext &mem_ctx,
|
||||
const lib::ObLabel &label,
|
||||
const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE)
|
||||
: ref_mem_ctx_(nullptr),
|
||||
allocator_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID),
|
||||
hist_mem_hold_(0)
|
||||
{
|
||||
bind_mem_ctx(mem_ctx);
|
||||
}
|
||||
|
||||
~ObLocalAllocator()
|
||||
{
|
||||
reset();
|
||||
ref_mem_ctx_ = nullptr;
|
||||
}
|
||||
|
||||
void bind_mem_ctx(ObCompactionMemoryContext &mem_ctx)
|
||||
{
|
||||
if (NULL == ref_mem_ctx_) {
|
||||
ref_mem_ctx_ = &mem_ctx;
|
||||
}
|
||||
allocator_.set_ctx_id(ref_mem_ctx_->get_ctx_id());
|
||||
}
|
||||
void unbind_mem_ctx() { ref_mem_ctx_ = nullptr; }
|
||||
common::ObArenaAllocator &get_arena_allocator() { return allocator_; }
|
||||
|
||||
virtual void* alloc(const int64_t size) override
|
||||
{
|
||||
void *buf = allocator_.alloc(size);
|
||||
if (OB_NOT_NULL(buf)) {
|
||||
update_mem_monitor();
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
virtual void* alloc(const int64_t size, const ObMemAttr &attr) override
|
||||
{
|
||||
void *buf = allocator_.alloc(size, attr);
|
||||
if (OB_NOT_NULL(buf)) {
|
||||
update_mem_monitor();
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
virtual void free(void *ptr) override { allocator_.free(ptr); }
|
||||
virtual void reset() override
|
||||
{
|
||||
allocator_.reset();
|
||||
update_mem_monitor();
|
||||
}
|
||||
|
||||
template <typename = T>
|
||||
void clear()
|
||||
{
|
||||
allocator_.clear();
|
||||
update_mem_monitor();
|
||||
}
|
||||
|
||||
DELEGATE_WITH_RET(allocator_, alloc_aligned, void*);
|
||||
DELEGATE_WITH_RET(allocator_, realloc, void*);
|
||||
DELEGATE_WITHOUT_RET(allocator_, reset_remain_one_page);
|
||||
DELEGATE_WITHOUT_RET(allocator_, reuse);
|
||||
DELEGATE_WITHOUT_RET(allocator_, set_label);
|
||||
DELEGATE_WITHOUT_RET(allocator_, set_tenant_id);
|
||||
DELEGATE_WITH_RET(allocator_, set_tracer, bool);
|
||||
DELEGATE_WITH_RET(allocator_, revert_tracer, bool);
|
||||
DELEGATE_WITHOUT_RET(allocator_, set_ctx_id);
|
||||
DELEGATE_WITHOUT_RET(allocator_, set_attr);
|
||||
DELEGATE_WITH_RET(allocator_, get_arena, ModuleArena&);
|
||||
DELEGATE_WITH_RET(allocator_, mprotect_arena_allocator, int);
|
||||
CONST_DELEGATE_WITH_RET(allocator_, used, int64_t);
|
||||
CONST_DELEGATE_WITH_RET(allocator_, total, int64_t);
|
||||
CONST_DELEGATE_WITH_RET(allocator_, to_string, int64_t);
|
||||
|
||||
private:
|
||||
void update_mem_monitor()
|
||||
{
|
||||
ref_mem_ctx_ = nullptr == ref_mem_ctx_
|
||||
? CURRENT_MEM_CTX()
|
||||
: ref_mem_ctx_;
|
||||
|
||||
int64_t cur_mem_hold = allocator_.total();
|
||||
if (nullptr != ref_mem_ctx_ && hist_mem_hold_ != cur_mem_hold) {
|
||||
if (cur_mem_hold > hist_mem_hold_) {
|
||||
ref_mem_ctx_->inc_local_hold_mem(cur_mem_hold - hist_mem_hold_);
|
||||
} else {
|
||||
ref_mem_ctx_->inc_local_free_mem(hist_mem_hold_ - cur_mem_hold);
|
||||
}
|
||||
hist_mem_hold_ = cur_mem_hold;
|
||||
}
|
||||
}
|
||||
protected:
|
||||
ObCompactionMemoryContext *ref_mem_ctx_;
|
||||
T allocator_;
|
||||
int64_t hist_mem_hold_;
|
||||
};
|
||||
|
||||
using ObLocalArena=ObLocalAllocator<common::ObArenaAllocator>;
|
||||
|
||||
class ObLocalSafeArena final : public ObLocalArena
|
||||
{
|
||||
public:
|
||||
ObLocalSafeArena(const lib::ObLabel &label, const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE)
|
||||
: ObLocalArena(label, page_size), lock_() {}
|
||||
virtual ~ObLocalSafeArena() {}
|
||||
virtual void *alloc(const int64_t sz) override
|
||||
{
|
||||
ObSpinLockGuard guard(lock_);
|
||||
return allocator_.alloc(sz);
|
||||
}
|
||||
virtual void* alloc(const int64_t size, const ObMemAttr &attr) override
|
||||
{
|
||||
ObSpinLockGuard guard(lock_);
|
||||
return allocator_.alloc(size, attr);
|
||||
}
|
||||
DELEGATE_WITH_SPIN_LOCK(allocator_, lock_, clear, void);
|
||||
DELEGATE_WITH_SPIN_LOCK(allocator_, lock_, reuse, void);
|
||||
DELEGATE_WITH_SPIN_LOCK(allocator_, lock_, reset, void);
|
||||
private:
|
||||
common::ObSpinLock lock_;
|
||||
};
|
||||
|
||||
} // compaction
|
||||
} // oceanbase
|
||||
|
@ -312,6 +312,8 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
} else if (datum_rows_.empty()) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
LOG_WARN("empty micro block", K(ret));
|
||||
} else if (OB_FAIL(set_datum_rows_ptr())) {
|
||||
STORAGE_LOG(WARN, "fail to set datum rows ptr", K(ret));
|
||||
} else if (OB_FAIL(pivot())) {
|
||||
LOG_WARN("pivot rows to columns failed", K(ret));
|
||||
} else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) {
|
||||
|
@ -60,6 +60,8 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
} else if (datum_rows_.empty()) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
LOG_WARN("empty micro block", K(ret));
|
||||
} else if (OB_FAIL(set_datum_rows_ptr())) {
|
||||
STORAGE_LOG(WARN, "fail to set datum rows ptr", K(ret));
|
||||
} else if (OB_FAIL(pivot())) {
|
||||
LOG_WARN("pivot rows to columns failed", K(ret));
|
||||
} else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user