[FEAT MERGE] dynamic_server_spec phase1

This commit is contained in:
obdev 2023-12-10 12:12:31 +00:00 committed by ob-robot
parent ff0ec7e185
commit 75f0ce04d8
29 changed files with 3756 additions and 3053 deletions

View File

@ -24,8 +24,15 @@ namespace oceanbase
namespace lib
{
class ObTenantCtxAllocator;
struct AChunk;
struct ABlock;
struct ObMemAttr;
class IChunkMgr
{
public:
virtual AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr) = 0;
virtual void free_chunk(AChunk *chunk, const ObMemAttr &attr) = 0;
}; // end of class IChunkMgr
class IBlockMgr
{

View File

@ -21,21 +21,14 @@
using namespace oceanbase;
using namespace oceanbase::lib;
void BlockSet::Lock::lock()
{
int64_t tid = common::get_itid() + 1;
while (!ATOMIC_BCAS(&tid_, 0, tid)) {
sched_yield();
}
}
BlockSet::BlockSet()
: mutex_(common::ObLatchIds::ALLOC_BLOCK_LOCK), clist_(NULL),
: tallocator_(NULL),
locker_(NULL),
chunk_mgr_(NULL),
clist_(NULL),
avail_bm_(BLOCKS_PER_CHUNK+1, avail_bm_buf_),
total_hold_(0), total_payload_(0), total_used_(0), tallocator_(NULL),
chunk_free_list_(false/*with_mutex*/), locker_(nullptr)
total_hold_(0), total_payload_(0), total_used_(0)
{
chunk_free_list_.set_max_chunk_cache_size(0);
}
BlockSet::~BlockSet()
@ -56,17 +49,6 @@ void BlockSet::reset()
//MEMSET(block_list_, 0, sizeof(block_list_));
clist_ = nullptr;
avail_bm_.clear();
LockGuard lock(cache_shared_lock_);
for (AChunk *chunk = nullptr; (chunk = chunk_free_list_.pop()) != nullptr;) {
uint64_t payload = 0;
UNUSED(ATOMIC_FAA(&total_hold_, -chunk->hold(&payload)));
UNUSED(ATOMIC_FAA(&total_payload_, -payload));
if (chunk->washed_size_ != 0) {
tallocator_->update_wash_stat(-1, -chunk->washed_blks_, -chunk->washed_size_);
}
tallocator_->free_chunk(chunk, attr_);
}
cache_shared_lock_.reset();
}
void BlockSet::set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator)
@ -293,17 +275,11 @@ AChunk *BlockSet::alloc_chunk(const uint64_t size, const ObMemAttr &attr)
AChunk *chunk = NULL;
if (OB_NOT_NULL(tallocator_)) {
const uint64_t all_size = AChunkMgr::aligned(size);
if (INTACT_ACHUNK_SIZE == all_size && chunk_free_list_.count() > 0) {
LockGuard lock(cache_shared_lock_);
chunk = chunk_free_list_.pop();
}
if (nullptr == chunk) {
chunk = tallocator_->alloc_chunk(static_cast<int64_t>(size), attr);
if (chunk != nullptr) {
uint64_t payload = 0;
UNUSED(ATOMIC_FAA(&total_hold_, chunk->hold(&payload)));
UNUSED(ATOMIC_FAA(&total_payload_, payload));
}
chunk = chunk_mgr_->alloc_chunk(static_cast<int64_t>(size), attr);
if (chunk != nullptr) {
uint64_t payload = 0;
UNUSED(ATOMIC_FAA(&total_hold_, chunk->hold(&payload)));
UNUSED(ATOMIC_FAA(&total_payload_, payload));
}
if (NULL != chunk) {
if (NULL != clist_) {
@ -351,20 +327,13 @@ void BlockSet::free_chunk(AChunk *const chunk)
}
uint64_t payload = 0;
const uint64_t hold = chunk->hold(&payload);
bool freed = false;
if (INTACT_ACHUNK_SIZE == hold) {
LockGuard lock(cache_shared_lock_);
freed = chunk_free_list_.push(chunk);
}
if (!freed) {
if (OB_NOT_NULL(tallocator_)) {
UNUSED(ATOMIC_FAA(&total_hold_, -hold));
UNUSED(ATOMIC_FAA(&total_payload_, -payload));
if (chunk->washed_size_ != 0) {
tallocator_->update_wash_stat(-1, -chunk->washed_blks_, -chunk->washed_size_);
}
tallocator_->free_chunk(chunk, attr_);
if (OB_NOT_NULL(tallocator_)) {
UNUSED(ATOMIC_FAA(&total_hold_, -hold));
UNUSED(ATOMIC_FAA(&total_payload_, -payload));
if (chunk->washed_size_ != 0) {
tallocator_->update_wash_stat(-1, -chunk->washed_blks_, -chunk->washed_size_);
}
chunk_mgr_->free_chunk(chunk, attr_);
}
}

View File

@ -20,10 +20,6 @@
namespace oceanbase
{
namespace common
{
class ObPageManagerCenter;
}
namespace lib
{
@ -31,27 +27,7 @@ class ObTenantCtxAllocator;
class ISetLocker;
class BlockSet
{
friend class common::ObPageManagerCenter;
friend class ObTenantCtxAllocator;
class Lock
{
public:
Lock() : tid_(0) {}
~Lock() { reset(); }
void reset() { ATOMIC_STORE(&tid_, 0); }
void lock();
void unlock() { ATOMIC_STORE(&tid_, 0); }
private:
int64_t tid_;
};
class LockGuard
{
public:
LockGuard(Lock &lock) : lock_(lock) { lock_.lock(); }
~LockGuard() { lock_.unlock(); }
private:
Lock &lock_;
};
public:
BlockSet();
~BlockSet();
@ -68,13 +44,12 @@ public:
inline uint64_t get_total_used() const;
void set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator);
void set_max_chunk_cache_size(const int64_t max_cache_size)
{ chunk_free_list_.set_max_chunk_cache_size(max_cache_size); }
void reset();
void set_locker(ISetLocker *locker) { locker_ = locker; }
int64_t sync_wash(int64_t wash_size=INT64_MAX);
bool check_has_unfree();
ObTenantCtxAllocator *get_tenant_ctx_allocator() const { return tallocator_; }
void set_chunk_mgr(IChunkMgr *chunk_mgr) { chunk_mgr_ = chunk_mgr; }
private:
DISALLOW_COPY_AND_ASSIGN(BlockSet);
@ -87,7 +62,10 @@ private:
void free_chunk(AChunk *const chunk);
private:
lib::ObMutex mutex_;
ObTenantCtxAllocator *tallocator_;
ObMemAttr attr_;
ISetLocker *locker_;
IChunkMgr *chunk_mgr_;
// block_list_ can not be initialized, the state is maintained by avail_bm_
union {
ABlock *block_list_[BLOCKS_PER_CHUNK+1];
@ -98,11 +76,6 @@ private:
uint64_t total_hold_;
uint64_t total_payload_;
uint64_t total_used_;
ObTenantCtxAllocator *tallocator_;
ObMemAttr attr_;
lib::AChunkList chunk_free_list_;
ISetLocker *locker_;
Lock cache_shared_lock_;
}; // end of class BlockSet
void BlockSet::lock()

View File

@ -523,9 +523,6 @@ void ObMallocAllocator::print_tenant_memory_usage(uint64_t tenant_id) const
get_global_ctx_info().get_ctx_name(i), ctx_hold_bytes[i], limit);
}
}
if (OB_SUCC(ret)) {
ObPageManagerCenter::get_instance().print_tenant_stat(tenant_id, buf, BUFLEN, ctx_pos);
}
buf[std::min(ctx_pos, BUFLEN - 1)] = '\0';
allow_next_syslog();
_LOG_INFO("[MEMORY] tenant: %lu, limit: %'lu hold: %'lu rpc_hold: %'lu cache_hold: %'lu "
@ -679,6 +676,7 @@ int ObMallocAllocator::recycle_tenant_allocator(uint64_t tenant_id)
// wash idle chunks
for (int64_t ctx_id = 0; ctx_id < ObCtxIds::MAX_CTX_ID; ctx_id++) {
ta[ctx_id].set_idle(0);
ta[ctx_id].reset_req_chunk_mgr();
}
ObTenantCtxAllocator *tas[ObCtxIds::MAX_CTX_ID] = {NULL};

View File

@ -162,7 +162,8 @@ void ObTenantCtxAllocator::print_usage() const
allow_next_syslog();
_LOG_INFO("\n[MEMORY] tenant_id=%5ld ctx_id=%25s hold=% '15ld used=% '15ld limit=% '15ld"
"\n[MEMORY] idle_size=% '10ld free_size=% '10ld"
"\n[MEMORY] wash_related_chunks=% '10ld washed_blocks=% '10ld washed_size=% '10ld\n%s",
"\n[MEMORY] wash_related_chunks=% '10ld washed_blocks=% '10ld washed_size=% '10ld"
"\n[MEMORY] request_cached_chunk_cnt=% '5ld\n%s",
tenant_id_,
get_global_ctx_info().get_ctx_name(ctx_id_),
ctx_hold_bytes,
@ -173,6 +174,7 @@ void ObTenantCtxAllocator::print_usage() const
ATOMIC_LOAD(&wash_related_chunks_),
ATOMIC_LOAD(&washed_blocks_),
ATOMIC_LOAD(&washed_size_),
req_chunk_mgr_.n_chunks(),
buf);
}
}
@ -220,11 +222,7 @@ AChunk *ObTenantCtxAllocator::alloc_chunk(const int64_t size, const ObMemAttr &a
}
}
if (OB_ISNULL(chunk)) {
if (INTACT_ACHUNK_SIZE == AChunkMgr::hold(size) && get_ctx_id() != ObCtxIds::CO_STACK) {
chunk = ObPageManagerCenter::get_instance().alloc_from_thread_local_cache(tenant_id_, ctx_id_);
}
} else {
if (OB_NOT_NULL(chunk)) {
ObDisableDiagnoseGuard disable_diagnose_guard;
lib::ObMutexGuard guard(using_list_mutex_);
chunk->prev2_ = &using_list_head_;

View File

@ -38,18 +38,102 @@ class ObTenantCtxAllocator
friend class ObTenantCtxAllocatorGuard;
friend class ObMallocAllocator;
using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
class ChunkMgr : public IChunkMgr
{
public:
ChunkMgr(ObTenantCtxAllocator &ta) : ta_(ta) {}
AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr) override
{
AChunk *chunk = ta_.alloc_chunk(size, attr);
if (OB_ISNULL(chunk)) {
ta_.req_chunk_mgr_.reclaim_chunks();
chunk = ta_.alloc_chunk(size, attr);
}
return chunk;
}
void free_chunk(AChunk *chunk, const ObMemAttr &attr) override
{
ta_.free_chunk(chunk, attr);
}
private:
ObTenantCtxAllocator &ta_;
};
class ReqChunkMgr : public IChunkMgr
{
public:
ReqChunkMgr(ObTenantCtxAllocator &ta)
: ta_(ta), parallel_(CTX_ATTR(ta_.get_ctx_id()).parallel_)
{
abort_unless(parallel_ <= ARRAYSIZEOF(chunks_));
MEMSET(chunks_, 0, sizeof(chunks_));
}
AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr) override
{
AChunk *chunk = NULL;
if (INTACT_ACHUNK_SIZE == AChunk::calc_hold(size)) {
const uint64_t idx = common::get_itid() % parallel_;
chunk = ATOMIC_TAS(&chunks_[idx], NULL);
}
if (OB_ISNULL(chunk)) {
chunk = ta_.alloc_chunk(size, attr);
}
return chunk;
}
void free_chunk(AChunk *chunk, const ObMemAttr &attr) override
{
bool freed = false;
if (INTACT_ACHUNK_SIZE == chunk->hold()) {
const uint64_t idx = common::get_itid() % parallel_;
freed = ATOMIC_BCAS(&chunks_[idx], NULL, chunk);
}
if (!freed) {
ta_.free_chunk(chunk, attr);
}
}
void reclaim_chunks()
{
for (int i = 0; i < parallel_; i++) {
AChunk *chunk = ATOMIC_TAS(&chunks_[i], NULL);
if (chunk != NULL) {
ta_.free_chunk(chunk,
ObMemAttr(ta_.get_tenant_id(), "unused", ta_.get_ctx_id()));
}
}
}
int64_t n_chunks() const
{
int64_t n = 0;
for (int i = 0; i < parallel_; i++) {
AChunk *chunk = ATOMIC_LOAD(&chunks_[i]);
if (chunk != NULL) {
n++;
}
}
return n;
}
private:
ObTenantCtxAllocator &ta_;
const int parallel_;
AChunk *chunks_[32];
};
public:
explicit ObTenantCtxAllocator(uint64_t tenant_id, uint64_t ctx_id = 0)
: resource_handle_(), ref_cnt_(0), tenant_id_(tenant_id),
ctx_id_(ctx_id), deleted_(false),
obj_mgr_(*this, tenant_id_, ctx_id_, INTACT_NORMAL_AOBJECT_SIZE,
obj_mgr_(*this,
CTX_ATTR(ctx_id).enable_no_log_,
INTACT_NORMAL_AOBJECT_SIZE,
CTX_ATTR(ctx_id).parallel_,
CTX_ATTR(ctx_id).enable_dirty_list_,
NULL),
idle_size_(0), head_chunk_(), chunk_cnt_(0),
chunk_freelist_mutex_(common::ObLatchIds::CHUNK_FREE_LIST_LOCK),
using_list_mutex_(common::ObLatchIds::CHUNK_USING_LIST_LOCK),
using_list_head_(), wash_related_chunks_(0), washed_blocks_(0), washed_size_(0)
using_list_head_(), wash_related_chunks_(0), washed_blocks_(0), washed_size_(0),
chunk_mgr_(*this), req_chunk_mgr_(*this)
{
MEMSET(&head_chunk_, 0, sizeof(AChunk));
using_list_head_.prev2_ = &using_list_head_;
@ -60,8 +144,12 @@ public:
chunk_freelist_mutex_.enable_record_stat(false);
using_list_mutex_.enable_record_stat(false);
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID; ++i) {
new (obj_mgrs_ + i) ObjectMgr(*this, tenant_id_, ctx_id_, INTACT_MIDDLE_AOBJECT_SIZE,
4/*parallel*/, false/*enable_dirty_list*/, &obj_mgr_);
new (obj_mgrs_ + i) ObjectMgr(*this,
CTX_ATTR(ctx_id).enable_no_log_,
INTACT_MIDDLE_AOBJECT_SIZE,
4/*parallel*/,
false/*enable_dirty_list*/,
&obj_mgr_);
}
}
virtual ~ObTenantCtxAllocator()
@ -172,6 +260,8 @@ public:
bool update_hold(const int64_t size);
int set_idle(const int64_t size, const bool reserve = false);
IBlockMgr &get_block_mgr() { return obj_mgr_; }
IChunkMgr &get_chunk_mgr() { return chunk_mgr_; }
IChunkMgr &get_req_chunk_mgr() { return req_chunk_mgr_; }
void get_chunks(AChunk **chunks, int cap, int &cnt);
using VisitFunc = std::function<int(ObLabel &label,
common::LabelItem *l_item)>;
@ -190,6 +280,7 @@ public:
return has_unfree;
}
void update_wash_stat(int64_t related_chunks, int64_t blocks, int64_t size);
void reset_req_chunk_mgr() { req_chunk_mgr_.reclaim_chunks(); }
private:
int64_t inc_ref_cnt(int64_t cnt) { return ATOMIC_FAA(&ref_cnt_, cnt); }
int64_t get_ref_cnt() const { return ATOMIC_LOAD(&ref_cnt_); }
@ -207,7 +298,6 @@ private:
}
return ret;
}
public:
template <typename T>
static void* common_alloc(const int64_t size, const ObMemAttr &attr,
@ -240,6 +330,8 @@ private:
union {
ObjectMgr obj_mgrs_[ObSubCtxIds::MAX_SUB_CTX_ID];
};
ChunkMgr chunk_mgr_;
ReqChunkMgr req_chunk_mgr_;
}; // end of class ObTenantCtxAllocator
} // end of namespace lib

View File

@ -18,17 +18,22 @@
using namespace oceanbase;
using namespace lib;
SubObjectMgr::SubObjectMgr(const bool for_logger, const int64_t tenant_id, const int64_t ctx_id,
SubObjectMgr::SubObjectMgr(ObTenantCtxAllocator &ta,
const bool enable_no_log,
const uint32_t ablock_size,
const bool enable_dirty_list,
IBlockMgr *blk_mgr)
: IBlockMgr(tenant_id, ctx_id), mutex_(common::ObLatchIds::ALLOC_OBJECT_LOCK),
: IBlockMgr(ta.get_tenant_id(), ta.get_ctx_id()),
ta_(ta),
mutex_(common::ObLatchIds::ALLOC_OBJECT_LOCK),
normal_locker_(mutex_), no_log_locker_(mutex_),
locker_(!for_logger ? static_cast<ISetLocker&>(normal_locker_) :
locker_(!enable_no_log ? static_cast<ISetLocker&>(normal_locker_) :
static_cast<ISetLocker&>(no_log_locker_)),
bs_(), os_(NULL, ablock_size, enable_dirty_list)
{
bs_.set_tenant_ctx_allocator(ta);
bs_.set_locker(&locker_);
bs_.set_chunk_mgr(&ta.get_chunk_mgr());
os_.set_locker(&locker_);
NULL == blk_mgr ? os_.set_block_mgr(this) : os_.set_block_mgr(blk_mgr);
#ifndef ENABLE_SANITY
@ -63,16 +68,25 @@ void SubObjectMgr::free_block(ABlock *block)
bs_.free_block(block);
}
ObjectMgr::ObjectMgr(ObTenantCtxAllocator &allocator, uint64_t tenant_id, uint64_t ctx_id,
uint32_t ablock_size, int parallel, bool enable_dirty_list, IBlockMgr *blk_mgr)
: IBlockMgr(tenant_id, ctx_id), ta_(allocator),
ablock_size_(ablock_size), parallel_(parallel), enable_dirty_list_(enable_dirty_list),
blk_mgr_(blk_mgr), sub_cnt_(1),
root_mgr_(CTX_ATTR(ctx_id).enable_no_log_, tenant_id, ctx_id, ablock_size_,
ObjectMgr::ObjectMgr(ObTenantCtxAllocator &ta,
bool enable_no_log,
uint32_t ablock_size,
int parallel,
bool enable_dirty_list,
IBlockMgr *blk_mgr)
: IBlockMgr(ta.get_tenant_id(), ta.get_ctx_id()),
ta_(ta),
enable_no_log_(enable_no_log),
ablock_size_(ablock_size),
parallel_(parallel),
enable_dirty_list_(enable_dirty_list),
blk_mgr_(blk_mgr),
sub_cnt_(1),
root_mgr_(ta, enable_no_log, ablock_size_,
enable_dirty_list, blk_mgr_),
last_wash_ts_(0), last_washed_size_(0)
last_wash_ts_(0),
last_washed_size_(0)
{
root_mgr_.set_tenant_ctx_allocator(allocator);
MEMSET(sub_mgrs_, 0, sizeof(sub_mgrs_));
sub_mgrs_[0] = &root_mgr_;
}
@ -234,9 +248,9 @@ SubObjectMgr *ObjectMgr::create_sub_mgr()
root_mgr.unlock();
if (OB_NOT_NULL(obj)) {
SANITY_UNPOISON(obj->data_, obj->alloc_bytes_);
sub_mgr = new (obj->data_) SubObjectMgr(CTX_ATTR(ctx_id_).enable_no_log_, tenant_id_, ctx_id_,
sub_mgr = new (obj->data_) SubObjectMgr(ta_,
enable_no_log_,
ablock_size_, enable_dirty_list_, blk_mgr_);
sub_mgr->set_tenant_ctx_allocator(ta_);
}
return sub_mgr;
}

View File

@ -36,14 +36,12 @@ class SubObjectMgr : public IBlockMgr
{
friend class ObTenantCtxAllocator;
public:
SubObjectMgr(const bool for_logger, const int64_t tenant_id, const int64_t ctx_id,
const uint32_t ablock_size, const bool enable_dirty_list,
SubObjectMgr(ObTenantCtxAllocator &ta,
const bool enable_no_log,
const uint32_t ablock_size,
const bool enable_dirty_list,
IBlockMgr *blk_mgr);
virtual ~SubObjectMgr() {}
OB_INLINE void set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator)
{
bs_.set_tenant_ctx_allocator(allocator);
}
OB_INLINE void lock() { locker_.lock(); }
OB_INLINE void unlock() { locker_.unlock(); }
OB_INLINE bool trylock() { return locker_.trylock(); }
@ -70,6 +68,7 @@ public:
return os_.check_has_unfree(first_label);
}
private:
ObTenantCtxAllocator &ta_;
#ifndef ENABLE_SANITY
lib::ObMutex mutex_;
#else
@ -85,6 +84,7 @@ private:
class ObjectMgr final : public IBlockMgr
{
static const int N = 32;
friend class SubObjectMgr;
public:
struct Stat
{
@ -95,8 +95,11 @@ public:
int64_t last_wash_ts_;
};
public:
ObjectMgr(ObTenantCtxAllocator &allocator, uint64_t tenant_id, uint64_t ctx_id,
uint32_t ablock_size, int parallel, bool enable_dirty_list,
ObjectMgr(ObTenantCtxAllocator &ta,
bool enable_no_log,
uint32_t ablock_size,
int parallel,
bool enable_dirty_list,
IBlockMgr *blk_mgr);
~ObjectMgr();
void reset();
@ -120,6 +123,7 @@ private:
public:
ObTenantCtxAllocator &ta_;
bool enable_no_log_;
uint32_t ablock_size_;
int parallel_;
bool enable_dirty_list_;

View File

@ -48,15 +48,13 @@ int ObConcurrentFIFOAllocator::init(const int64_t total_limit,
}
int ObConcurrentFIFOAllocator::init(const int64_t page_size,
const lib::ObLabel &label,
const uint64_t tenant_id,
const lib::ObMemAttr &attr,
const int64_t total_limit)
{
int ret = OB_SUCCESS;
const int64_t cache_page_count = lib::is_mini_mode() ? 0 : get_cpu_count() * STORAGE_SIZE_TIMES;
if (OB_FAIL(inner_allocator_.init(page_size,
label,
tenant_id,
attr,
cache_page_count,
total_limit))) {
LIB_LOG(WARN, "failed to init inner allocator", K(ret));
@ -64,6 +62,14 @@ int ObConcurrentFIFOAllocator::init(const int64_t page_size,
return ret;
}
int ObConcurrentFIFOAllocator::init(const int64_t page_size,
const lib::ObLabel &label,
const uint64_t tenant_id,
const int64_t total_limit)
{
return init(page_size, ObMemAttr(tenant_id, label), total_limit);
}
int ObConcurrentFIFOAllocator::set_hold_limit(int64_t hold_limit)
{
UNUSED(hold_limit);

View File

@ -27,16 +27,19 @@ public:
int init(const int64_t total_limit,
const int64_t hold_limit,
const int64_t page_size);
int init(const int64_t page_size,
const lib::ObLabel &label,
const uint64_t tenant_id,
const int64_t total_limit);
int init(const int64_t page_size,
const lib::ObMemAttr &attr,
const int64_t total_limit);
void destroy();
public:
void set_label(const lib::ObLabel &label);
void set_attr(const lib::ObMemAttr &attr);
void set_tenant_id(const uint64_t tenant_id);
void set_nway(int nway) { inner_allocator_.set_nway(nway); }
void *alloc(const int64_t size);
void *alloc(const int64_t size, const ObMemAttr &attr);
void free(void *ptr);

View File

@ -18,157 +18,6 @@ namespace oceanbase
{
namespace common
{
ObPageManagerCenter::ObPageManagerCenter()
{
mutex_.enable_record_stat(false);
}
ObPageManagerCenter &ObPageManagerCenter::get_instance()
{
static ObPageManagerCenter THE_ONE;
return THE_ONE;
}
int ObPageManagerCenter::register_pm(ObPageManager &pm)
{
int ret = OB_SUCCESS;
ObDisableDiagnoseGuard disable_diagnose_guard;
lib::ObMutexGuard guard(mutex_);
rb_tree_.insert(&pm);
pm.has_register_ = true;
OB_LOG(INFO, "register pm finish", K(ret), KP(&pm), K(pm.get_tid()),
"tenant_id", pm.get_tenant_id());
return ret;
}
void ObPageManagerCenter::unregister_pm(ObPageManager &pm)
{
ObDisableDiagnoseGuard disable_diagnose_guard;
lib::ObMutexGuard guard(mutex_);
pm.has_register_ = false;
rb_tree_.remove(&pm);
OB_LOG(INFO, "unregister pm finish", KP(&pm), K(pm.get_tid()));
}
bool ObPageManagerCenter::has_register(ObPageManager &pm) const
{
return pm.has_register_;
}
int ObPageManagerCenter::print_tenant_stat(int64_t tenant_id, char *buf,
int64_t len, int64_t &pos)
{
int ret = OB_SUCCESS;
ObDisableDiagnoseGuard disable_diagnose_guard;
lib::ObMutexGuard guard(mutex_);
int64_t sum_used = 0;
int64_t sum_hold = 0;
if (OB_SUCC(print_tenant_stat(tenant_id, sum_used, sum_hold, buf, len, pos)) &&
sum_hold > 0) {
ret = databuff_printf(buf, len, pos,
"[MEMORY][PM] tid=%10s used=%'15ld hold=%'15ld\n", "summary", sum_used, sum_hold);
}
return ret;
}
AChunk *ObPageManagerCenter::alloc_from_thread_local_cache(int64_t tenant_id, int64_t ctx_id)
{
int tmpret = OB_SUCCESS;
AChunk *ret = nullptr;
const int RETRY_LIMIT = 10;
ObDisableDiagnoseGuard disable_diagnose_guard;
for (int retry = 0; retry < RETRY_LIMIT && OB_EAGAIN == (tmpret = mutex_.trylock()); ++retry) {
sched_yield();
}
if (OB_SUCCESS == tmpret) {
ret = alloc_from_thread_local_cache_(tenant_id, ctx_id);
if (OB_SUCCESS != (tmpret = mutex_.unlock())) {
OB_LOG_RET(ERROR, tmpret, "unlock failed", K(tmpret));
}
}
return ret;
}
int ObPageManagerCenter::print_tenant_stat(int64_t tenant_id,
int64_t &sum_used, int64_t &sum_hold, char *buf, int64_t len, int64_t &pos)
{
int ret = OB_SUCCESS;
char cmp_buf[sizeof(ObPageManager)];
ObPageManager *cmp_node = (ObPageManager*)cmp_buf;
cmp_node->tenant_id_ = tenant_id - 1;
cmp_node->id_ = INT64_MAX;
ObPageManager *start = nullptr;
rb_tree_.nsearch(cmp_node, start);
struct Arg
{
int *ret_;
char *buf_;
int64_t len_;
int64_t *pos_;
int64_t *sum_used_;
int64_t *sum_hold_;
int64_t tenant_id_;
} arg{&ret, buf, len, &pos, &sum_used, &sum_hold, tenant_id};
auto &&cb = [] (decltype(rb_tree_) *, ObPageManager *pm, void *p) {
Arg *arg = (Arg*)p;
ObPageManager *return_ret = nullptr;
if (!pm->less_than(arg->tenant_id_, INT64_MAX)) {
// iter over
return_ret = pm;
} else if (pm->get_hold() > 0) {
*arg->ret_ = databuff_printf(arg->buf_, arg->len_, *arg->pos_,
"[MEMORY][PM] tid=%10ld used=%'15ld hold=%'15ld pm=%14p ctx_name=%s\n", pm->get_tid(),
pm->get_used(), pm->get_hold(), pm,
get_global_ctx_info().get_ctx_name(pm->get_ctx_id()));
*arg->sum_used_ += pm->get_used();
*arg->sum_hold_ += pm->get_hold();
}
return return_ret;
};
if (NULL != start) {
rb_tree_.iter_rbtree(&rb_tree_, start, cb, &arg);
}
return ret;
}
AChunk *ObPageManagerCenter::alloc_from_thread_local_cache_(int64_t tenant_id, int64_t ctx_id)
{
AChunk * ret = nullptr;
char cmp_buf[sizeof(ObPageManager)];
ObPageManager *cmp_node = (ObPageManager*)cmp_buf;
cmp_node->tenant_id_ = tenant_id - 1;
cmp_node->id_ = INT64_MAX;
ObPageManager *start = nullptr;
rb_tree_.nsearch(cmp_node, start);
struct Arg
{
AChunk *&ret_;
int64_t tenant_id_;
int64_t ctx_id_;
} arg{ret, tenant_id, ctx_id};
auto &&cb = [] (decltype(rb_tree_) *, ObPageManager *pm, void *p) {
Arg *arg = (Arg*)p;
ObPageManager *return_ret = nullptr;
if (!pm->less_than(arg->tenant_id_, INT64_MAX)) {
// iter over
return_ret = pm;
} else if (pm->get_ctx_id() == arg->ctx_id_) {
BlockSet::LockGuard lock(pm->bs_.cache_shared_lock_);
arg->ret_ = pm->bs_.chunk_free_list_.pop();
if (OB_NOT_NULL(arg->ret_)) {
UNUSED(ATOMIC_FAA(&(pm->bs_.total_hold_), -arg->ret_->hold()));
// iter over
return_ret = pm;
}
}
return return_ret;
};
rb_tree_.iter_rbtree(&rb_tree_, start, cb, &arg);
return ret;
}
void ObPageManager::reset()
{
ctx_id_ = ObCtxIds::GLIBC;
@ -179,7 +28,5 @@ void ObPageManager::reset()
_RLOCAL(ObPageManager *, ObPageManager::tl_instance_);
int64_t ObPageManager::global_id_ = 0;
} // end of namespace common
} // end of namespace oceanbase

View File

@ -34,38 +34,12 @@ using lib::ObTenantCtxAllocator;
class ObPageManager : public lib::IBlockMgr
{
public:
constexpr static int DEFAULT_CHUNK_CACHE_SIZE = lib::INTACT_ACHUNK_SIZE * 2;
constexpr static int MINI_MODE_CHUNK_CACHE_SIZE = 0;
RBNODE(ObPageManager, rblink);
int compare(const ObPageManager *node) const
{
int ret = 0;
ret = (tenant_id_ > node->tenant_id_) - (tenant_id_ < node->tenant_id_);
if (ret == 0) {
ret = (id_ > node->id_) - (id_ < node->id_);
}
return ret;
}
private:
friend class ObPageManagerCenter;
friend class Thread;
public:
ObPageManager();
~ObPageManager();
~ObPageManager() {}
static ObPageManager *thread_local_instance() { return tl_instance_; }
bool less_than(const ObPageManager &other) const
{
return less_than(other.tenant_id_, other.id_);
}
bool less_than(int64_t tenant_id, int64_t id) const
{
return tenant_id_ < tenant_id ||
(tenant_id_ == tenant_id && id_ < id);
}
int set_tenant_ctx(const int64_t tenant_id, const int64_t ctx_id);
void set_max_chunk_cache_size(const int64_t max_cache_size)
{ bs_.set_max_chunk_cache_size(max_cache_size); }
void reset();
int64_t get_hold() const;
int64_t get_tid() const { return tid_; }
@ -82,9 +56,7 @@ public:
private:
int init();
RLOCAL_STATIC(ObPageManager *,tl_instance_);
static int64_t global_id_;
private:
int64_t id_;
lib::ObTenantCtxAllocatorGuard ta_;
lib::BlockSet bs_;
int64_t used_;
@ -94,28 +66,8 @@ private:
bool is_inited_;
};
class ObPageManagerCenter
{
public:
static ObPageManagerCenter &get_instance();
int register_pm(ObPageManager &pm);
void unregister_pm(ObPageManager &pm);
bool has_register(ObPageManager &pm) const;
int print_tenant_stat(int64_t tenant_id, char *buf, int64_t len, int64_t &pos);
AChunk *alloc_from_thread_local_cache(int64_t tenant_id, int64_t ctx_id);
private:
ObPageManagerCenter();
int print_tenant_stat(int64_t tenant_id, int64_t &sum_used, int64_t &sum_hold,
char *buf, int64_t len, int64_t &pos);
AChunk *alloc_from_thread_local_cache_(int64_t tenant_id, int64_t ctx_id);
private:
lib::ObMutex mutex_;
container::ObRbTree<ObPageManager, container::ObDummyCompHelper<ObPageManager>> rb_tree_;
};
inline ObPageManager::ObPageManager()
: id_(ATOMIC_FAA(&global_id_, 1)),
bs_(),
: bs_(),
used_(0),
tid_(GETTID()),
itid_(get_itid()),
@ -124,28 +76,14 @@ inline ObPageManager::ObPageManager()
{
}
inline ObPageManager::~ObPageManager()
{
auto &pmc = ObPageManagerCenter::get_instance();
if (pmc.has_register(*this)) {
pmc.unregister_pm(*this);
}
}
inline int ObPageManager::set_tenant_ctx(const int64_t tenant_id, const int64_t ctx_id)
{
int ret = OB_SUCCESS;
auto &pmc = ObPageManagerCenter::get_instance();
if (tenant_id != tenant_id_ || ctx_id != ctx_id_) {
if (pmc.has_register(*this)) {
pmc.unregister_pm(*this);
}
tenant_id_ = tenant_id;
ctx_id_ = ctx_id;
is_inited_ = false;
if (OB_FAIL(init())) {
} else {
ret = pmc.register_pm(*this);
}
}
return ret;
@ -166,6 +104,7 @@ inline int ObPageManager::init()
OB_LOG(ERROR, "null ptr", K(ret));
} else {
bs_.set_tenant_ctx_allocator(*ta_.ref_allocator());
bs_.set_chunk_mgr(&ta_->get_req_chunk_mgr());
is_inited_ = true;
}
return ret;

View File

@ -221,17 +221,18 @@ private:
HashMapMemMgrCore();
~HashMapMemMgrCore();
static HashMapMemMgrCore& get_instance();
int init(const int64_t tenant_id);
ObExternalRef* get_external_ref();
ObSmallAllocator& get_node_alloc();
ObConcurrentFIFOAllocator& get_dir_alloc();
ObConcurrentFIFOAllocator& get_cnter_alloc();
ObIAllocator& get_dir_alloc();
ObIAllocator& get_cnter_alloc();
void add_map(void *ptr);
void rm_map(void *ptr);
private:
ObExternalRef hash_ref_;
ObSmallAllocator node_alloc_;
ObConcurrentFIFOAllocator dir_alloc_;
ObConcurrentFIFOAllocator cnter_alloc_;
ObMalloc dir_alloc_;
ObMalloc cnter_alloc_;
typedef common::ObArray<void*> MapArray;
MapArray map_array_;
ObSpinLock map_array_lock_;
@ -245,10 +246,11 @@ private:
typedef HashMapMemMgrCore Core;
HashMapMemMgr() {}
virtual ~HashMapMemMgr() {}
int init(const int64_t tenant_id) { UNUSED(tenant_id); return OB_SUCCESS; }
ObExternalRef* get_external_ref();
ObSmallAllocator& get_node_alloc();
ObConcurrentFIFOAllocator& get_dir_alloc();
ObConcurrentFIFOAllocator& get_cnter_alloc();
ObIAllocator& get_dir_alloc();
ObIAllocator& get_cnter_alloc();
void add_map(void *ptr) { Core::get_instance().add_map(ptr); }
void rm_map(void *ptr) { Core::get_instance().rm_map(ptr); }
private:
@ -262,11 +264,12 @@ private:
public:
HashMapMemMgr() : core_() {}
virtual ~HashMapMemMgr() {}
int init(const int64_t tenant_id);
typedef HashMapMemMgrCore Core;
ObExternalRef* get_external_ref();
ObSmallAllocator& get_node_alloc();
ObConcurrentFIFOAllocator& get_dir_alloc();
ObConcurrentFIFOAllocator& get_cnter_alloc();
ObIAllocator& get_dir_alloc();
ObIAllocator& get_cnter_alloc();
void add_map(void *ptr) { Core::get_instance().add_map(ptr); }
void rm_map(void *ptr) { Core::get_instance().rm_map(ptr); }
private:
@ -563,6 +566,13 @@ private:
template <typename Key, typename Value, typename MemMgrTag>
constexpr const char *ObLinearHashMap<Key, Value, MemMgrTag>::LABEL;
template <typename Key, typename Value, typename MemMgrTag>
template <typename Dummy>
int ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMemMgrTag, Dummy>::init(const int64_t tenant_id)
{
return core_.init(tenant_id);
}
template <typename Key, typename Value, typename MemMgrTag>
template <typename Dummy>
ObExternalRef* ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMemMgrTag, Dummy>::get_external_ref()
@ -579,14 +589,14 @@ ObSmallAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMe
template <typename Key, typename Value, typename MemMgrTag>
template <typename Dummy>
ObConcurrentFIFOAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMemMgrTag, Dummy>::get_dir_alloc()
ObIAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMemMgrTag, Dummy>::get_dir_alloc()
{
return core_.get_dir_alloc();
}
template <typename Key, typename Value, typename MemMgrTag>
template <typename Dummy>
ObConcurrentFIFOAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMemMgrTag, Dummy>::get_cnter_alloc()
ObIAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<UniqueMemMgrTag, Dummy>::get_cnter_alloc()
{
return core_.get_cnter_alloc();
}
@ -607,46 +617,38 @@ ObSmallAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<Tag, Dum
template <typename Key, typename Value, typename MemMgrTag>
template <typename Tag, typename Dummy>
ObConcurrentFIFOAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<Tag, Dummy>::get_dir_alloc()
ObIAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<Tag, Dummy>::get_dir_alloc()
{
return Core::get_instance().get_dir_alloc();
}
template <typename Key, typename Value, typename MemMgrTag>
template <typename Tag, typename Dummy>
ObConcurrentFIFOAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<Tag, Dummy>::get_cnter_alloc()
ObIAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgr<Tag, Dummy>::get_cnter_alloc()
{
return Core::get_instance().get_cnter_alloc();
}
/* Hash Map memory manager. */
template <typename Key, typename Value, typename MemMgrTag>
ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::HashMapMemMgrCore()
: map_array_lock_(common::ObLatchIds::HASH_MAP_LOCK)
{
}
/* Hash Map memory manager. */
template <typename Key, typename Value, typename MemMgrTag>
int ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::init(const int64_t tenant_id)
{
int ret = OB_SUCCESS;
dir_alloc_.set_attr(ObMemAttr(tenant_id, "LinearHashMapDi"));
cnter_alloc_.set_attr(ObMemAttr(tenant_id, "LinearHashMapCn"));
// Init node alloc.
int ret = node_alloc_.init(static_cast<int64_t>(sizeof(Node)), SET_USE_500("LinearHashMapNo"));
ret = node_alloc_.init(static_cast<int64_t>(sizeof(Node)),
SET_USE_500(ObMemAttr(tenant_id, "LinearHashMapNo")));
if (OB_FAIL(ret)) {
LIB_LOG(WARN, "failed to init node alloc", K(ret));
}
int64_t total_limit = 128 * (1L << 30); // 128GB
int64_t page_size = 0;
if (lib::is_mini_mode()) {
total_limit *= lib::mini_mode_resource_ratio();
}
page_size = OB_MALLOC_MIDDLE_BLOCK_SIZE;
// Init dir alloc.
ret = dir_alloc_.init(total_limit, 2 * page_size, page_size);
dir_alloc_.set_attr(SET_USE_500("LinearHashMapDi"));
if (OB_FAIL(ret)) {
LIB_LOG(WARN, "failed to init dir alloc", K(ret));
}
// Init counter alloc.
ret = cnter_alloc_.init(total_limit, 2 * page_size, page_size);
cnter_alloc_.set_attr(SET_USE_500("LinearHashMapCn"));
if (OB_FAIL(ret)) {
LIB_LOG(WARN, "failed to init cnter alloc", K(ret));
}
return ret;
}
template <typename Key, typename Value, typename MemMgrTag>
@ -658,8 +660,6 @@ ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::~HashMapMemMgrCore()
LIB_LOG(WARN, "hash map not destroy", "map_ptr", map_array_.at(i));
}
}
cnter_alloc_.destroy();
dir_alloc_.destroy();
if (OB_SUCCESS != (ret = node_alloc_.destroy())) {
LIB_LOG(ERROR, "failed to destroy node alloc", K(ret));
}
@ -669,7 +669,18 @@ template <typename Key, typename Value, typename MemMgrTag>
typename ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore&
ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::get_instance()
{
class InitCore {
public:
InitCore(HashMapMemMgrCore &core)
{
int ret = OB_SUCCESS;
if (OB_FAIL(core.init(OB_SERVER_TENANT_ID))) {
LIB_LOG(ERROR, "failed to init MemMgrCore", K(ret));
}
}
};
static HashMapMemMgrCore core;
static InitCore init(core);
return core;
}
@ -687,13 +698,13 @@ ObSmallAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::get
}
template <typename Key, typename Value, typename MemMgrTag>
ObConcurrentFIFOAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::get_dir_alloc()
ObIAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::get_dir_alloc()
{
return dir_alloc_;
}
template <typename Key, typename Value, typename MemMgrTag>
ObConcurrentFIFOAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::get_cnter_alloc()
ObIAllocator& ObLinearHashMap<Key, Value, MemMgrTag>::HashMapMemMgrCore::get_cnter_alloc()
{
return cnter_alloc_;
}
@ -883,7 +894,9 @@ int ObLinearHashMap<Key, Value, MemMgrTag>::init(uint64_t m_seg_sz, uint64_t s_s
set_Lp_(0, 0);
init_haz_();
init_foreach_();
if (OB_SUCCESS != (ret = init_d_arr_(m_seg_sz, s_seg_sz, dir_init_sz)))
if (OB_SUCCESS != (ret = mem_mgr_.init(tenant_id)))
{ }
else if (OB_SUCCESS != (ret = init_d_arr_(m_seg_sz, s_seg_sz, dir_init_sz)))
{ }
else if (OB_SUCCESS != (ret = cnter_.init(mem_mgr_)))
{ }

View File

@ -1633,6 +1633,7 @@ OB_INLINE bool is_bootstrap_resource_pool(const uint64_t resource_pool_id)
const int64_t OB_MALLOC_NORMAL_BLOCK_SIZE = (1LL << 13) - 256; // 8KB
const int64_t OB_MALLOC_MIDDLE_BLOCK_SIZE = (1LL << 16) - 128; // 64KB
const int64_t OB_MALLOC_BIG_BLOCK_SIZE = (1LL << 21) - ACHUNK_PRESERVE_SIZE;// 2MB (-17KB)
const int64_t OB_MALLOC_REQ_NORMAL_BLOCK_SIZE = (256LL << 10); // 256KB
const int64_t OB_MAX_MYSQL_RESPONSE_PACKET_SIZE = OB_MALLOC_BIG_BLOCK_SIZE;

View File

@ -307,9 +307,6 @@ void* Thread::__th_start(void *arg)
if (OB_FAIL(ret)) {
LOG_ERROR("set tenant ctx failed", K(ret));
} else {
const int cache_size = !lib::is_mini_mode() ? ObPageManager::DEFAULT_CHUNK_CACHE_SIZE :
ObPageManager::MINI_MODE_CHUNK_CACHE_SIZE;
pm.set_max_chunk_cache_size(cache_size);
ObPageManager::set_thread_local_instance(pm);
MemoryContext *mem_context = GET_TSI0(MemoryContext);
if (OB_ISNULL(mem_context)) {

View File

@ -103,7 +103,7 @@ int ObReqQueue::process_task(ObLink *task)
LOG_ERROR("queue pop NULL task", K(task), K(ret), K(qhandler_));
} else {
lib::ContextParam param;
param.set_mem_attr(common::OB_SERVER_TENANT_ID, ObModIds::OB_ROOT_CONTEXT, ObCtxIds::WORK_AREA)
param.set_mem_attr(common::OB_SERVER_TENANT_ID, ObModIds::OB_ROOT_CONTEXT)
.set_properties(USE_TL_PAGE_OPTIONAL);
CREATE_WITH_TEMP_CONTEXT(param) {
ObRequest *req = static_cast<ObRequest *>(task);

View File

@ -366,6 +366,8 @@ TEST(utility, used_size)
int main(int argc, char **argv)
{
// This test has an unknown exit crash problem, which requires the existence of such a line of code
oceanbase::common::get_itid();
::testing::InitGoogleTest(&argc,argv);
return RUN_ALL_TESTS();
}

View File

@ -37,6 +37,7 @@ public:
{
tallocator_.set_tenant_memory_mgr();
tallocator_.set_limit(1000L << 20);
cs_.set_chunk_mgr(&tallocator_.get_chunk_mgr());
cs_.set_tenant_ctx_allocator(tallocator_);
}

View File

@ -238,10 +238,10 @@ TEST_F(TestObjectMgr, TestSubObjectMgr)
abort_unless(ptr != MAP_FAILED);
int64_t tenant_id = OB_SERVER_TENANT_ID;
int64_t ctx_id = ObCtxIds::DEFAULT_CTX_ID;
SubObjectMgr som(false, tenant_id, ctx_id, INTACT_NORMAL_AOBJECT_SIZE, false, NULL);
auto ta = ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(
tenant_id, ctx_id);
ObjectMgr som(*ta.ref_allocator(), false, INTACT_NORMAL_AOBJECT_SIZE, 1, false, NULL);
ObMemAttr attr;
som.set_tenant_ctx_allocator(*ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(
tenant_id, ctx_id).ref_allocator());
ObTenantResourceMgrHandle resource_handle;
ObResourceMgr::get_instance().get_tenant_resource_mgr(
tenant_id, resource_handle);

View File

@ -180,13 +180,11 @@ TEST_F(TestAllocator, pm_basic)
// freelist
int large_size = INTACT_ACHUNK_SIZE - 200;
pm.set_max_chunk_cache_size(INTACT_ACHUNK_SIZE);
ptr = pm.alloc_page(large_size);
hold = pm.get_hold();
ASSERT_GT(hold, 0);
pm.free_page(ptr);
ASSERT_EQ(pm.get_hold(), hold);
pm.set_max_chunk_cache_size(0);
ptr = pm.alloc_page(large_size);
ASSERT_EQ(pm.get_hold(), hold);
pm.free_page(ptr);
@ -197,7 +195,6 @@ TEST_F(TestAllocator, pm_basic)
pm.free_page(ptr);
ASSERT_EQ(pm.get_hold(), hold);
pm.set_max_chunk_cache_size(INTACT_ACHUNK_SIZE * 2);
pm.alloc_page(large_size);
pm.alloc_page(large_size);
pm.alloc_page(large_size);

View File

@ -22,6 +22,7 @@
#include "lib/alloc/memory_dump.h"
#include "lib/thread/thread_mgr.h"
#include "lib/allocator/ob_mem_leak_checker.h"
#include <csignal>
using namespace oceanbase;
using namespace oceanbase::common;
@ -58,7 +59,6 @@ TEST_F(TestContext, Basic)
ObPageManager g_pm;
ObPageManager::set_thread_local_instance(g_pm);
g_pm.set_tenant_ctx(tenant_id, ctx_id);
g_pm.set_max_chunk_cache_size(0);
MemoryContext &root = MemoryContext::root();
ContextParam param;
param.set_mem_attr(tenant_id, "Context", ctx_id);
@ -231,8 +231,48 @@ TEST_F(TestContext, Basic)
ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(500, ObCtxIds::DEFAULT_CTX_ID)->print_memory_usage();
}
bool req_cache_empty(ObTenantCtxAllocator *ta)
{
for (int i = 0; i < ta->req_chunk_mgr_.parallel_; i++) {
if (ta->req_chunk_mgr_.chunks_[i]) {
return false;
}
}
return true;
}
TEST_F(TestContext, PM_Wash)
{
uint64_t tenant_id = 1002;
uint64_t ctx_id = ObCtxIds::DEFAULT_CTX_ID;
ObMallocAllocator *ma = ObMallocAllocator::get_instance();
ASSERT_EQ(OB_SUCCESS, ma->create_and_add_tenant_allocator(tenant_id));
auto ta = ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(tenant_id, ctx_id);
ObMemAttr attr(tenant_id, "test", ctx_id);
ObPageManager g_pm;
ObPageManager::set_thread_local_instance(g_pm);
g_pm.set_tenant_ctx(tenant_id, ctx_id);
ContextTLOptGuard guard(true);
ContextParam param;
param.set_mem_attr(attr);
param.properties_ = USE_TL_PAGE_OPTIONAL;
ASSERT_TRUE(req_cache_empty(ta.ref_allocator()));
int ret = OB_SUCCESS;
CREATE_WITH_TEMP_CONTEXT(param) {
void *ptr = ctxalf(100, attr);
ASSERT_NE(nullptr, ptr);
ctxfree(ptr);
ASSERT_FALSE(req_cache_empty(ta.ref_allocator()));
ta->set_limit(ta->get_hold());
ASSERT_NE(ob_malloc(OB_MALLOC_BIG_BLOCK_SIZE, attr), nullptr);
ASSERT_TRUE(req_cache_empty(ta.ref_allocator()));
}
}
void emptySignalHandler(int) {}
int main(int argc, char **argv)
{
std::signal(49, emptySignalHandler);
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
OB_LOGGER.set_log_level("INFO");
testing::InitGoogleTest(&argc, argv);

File diff suppressed because it is too large Load Diff

View File

@ -587,8 +587,7 @@ OB_NOINLINE int ObMPQuery::process_with_tmp_context(ObSQLSessionInfo &session,
param.set_mem_attr(MTL_ID(),
ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
.set_properties(lib::USE_TL_PAGE_OPTIONAL)
.set_page_size(!lib::is_mini_mode() ? OB_MALLOC_BIG_BLOCK_SIZE
: OB_MALLOC_MIDDLE_BLOCK_SIZE)
.set_page_size(OB_MALLOC_REQ_NORMAL_BLOCK_SIZE)
.set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE);
CREATE_WITH_TEMP_CONTEXT(param) {
ret = do_process(session,

View File

@ -257,7 +257,7 @@ void ObPxPool::run1()
ObTLTaGuard ta_guard(tenant_id_);
auto *pm = common::ObPageManager::thread_local_instance();
if (OB_LIKELY(nullptr != pm)) {
pm->set_tenant_ctx(tenant_id_, common::ObCtxIds::WORK_AREA);
pm->set_tenant_ctx(tenant_id_, common::ObCtxIds::DEFAULT_CTX_ID);
}
//ObTaTLCacheGuard ta_guard(tenant_id_);
CLEAR_INTERRUPTABLE();

View File

@ -339,8 +339,7 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t
lib::ContextTLOptGuard guard(true);
lib::ContextParam param;
param.set_mem_attr(tenant_->id(), ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
.set_page_size(!lib::is_mini_mode() ?
OB_MALLOC_BIG_BLOCK_SIZE : OB_MALLOC_MIDDLE_BLOCK_SIZE)
.set_page_size(OB_MALLOC_REQ_NORMAL_BLOCK_SIZE)
.set_properties(lib::USE_TL_PAGE_OPTIONAL)
.set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE);
CREATE_WITH_TEMP_CONTEXT(param) {

View File

@ -309,10 +309,12 @@ int64_t ObServerMemoryConfig::get_adaptive_memory_config(const int64_t memory_si
}
return adap_memory_size;
}
int64_t ObServerMemoryConfig::get_extra_memory()
{
return memory_limit_ < lib::ObRunningModeConfig::MINI_MEM_UPPER ? 0 : hidden_sys_memory_;
}
int ObServerMemoryConfig::reload_config(const ObServerConfig& server_config)
{
int ret = OB_SUCCESS;
@ -354,11 +356,51 @@ int ObServerMemoryConfig::reload_config(const ObServerConfig& server_config)
}
if (memory_limit - system_memory >= min_server_avail_memory &&
system_memory >= hidden_sys_memory) {
memory_limit_ = memory_limit;
system_memory_ = system_memory;
hidden_sys_memory_ = hidden_sys_memory;
LOG_INFO("update observer memory config success",
K_(memory_limit), K_(system_memory), K_(hidden_sys_memory));
bool setted = false;
if (!is_mini_mode()) {
int64_t unit_assigned = 0;
if (OB_NOT_NULL(GCTX.omt_)) {
const int64_t system_memory_hold = lib::get_tenant_memory_hold(OB_SERVER_TENANT_ID);
common::ObArray<omt::ObTenantMeta> tenant_metas;
if (OB_FAIL(GCTX.omt_->get_tenant_metas(tenant_metas))) {
LOG_WARN("fail to get tenant metas", K(ret));
// ignore ret
ret = OB_SUCCESS;
} else {
for (int64_t i = 0; i < tenant_metas.count(); i++) {
unit_assigned += tenant_metas.at(i).unit_.config_.memory_size();
}
LOG_INFO("update observer memory config",
K(memory_limit_), K(system_memory_), K(hidden_sys_memory_),
K(memory_limit), K(system_memory), K(hidden_sys_memory),
K(system_memory_hold), K(unit_assigned));
if ((system_memory - hidden_sys_memory) >= system_memory_hold
&& memory_limit >= (unit_assigned + system_memory) ) {
system_memory_ = system_memory;
hidden_sys_memory_ = hidden_sys_memory;
memory_limit_ = memory_limit;
} else {
LOG_ERROR("Unreasonable memory parameters",
"[config]memory_limit", server_config.memory_limit.get_value(),
"[config]system_memory", server_config.system_memory.get_value(),
"[config]hidden_sys", server_config._hidden_sys_tenant_memory.get_value(),
"[expect]memory_limit", memory_limit,
"[expect]system_memory", system_memory,
"[expect]hidden_sys", hidden_sys_memory,
K(system_memory_hold),
K(unit_assigned));
}
setted = true;
}
}
}
if (!setted) {
memory_limit_ = memory_limit;
system_memory_ = system_memory;
hidden_sys_memory_ = hidden_sys_memory;
}
LOG_INFO("update observer memory config",
K_(memory_limit), K_(system_memory), K_(hidden_sys_memory));
} else {
ret = OB_INVALID_CONFIG;
LOG_ERROR("update observer memory config failed",

View File

@ -195,7 +195,7 @@ public:
public:
typedef hash::ObHashMap<ObMonitorNodeKey, ObMonitorNode *,
hash::SpinReadWriteDefendMode> MonitorNodeMap;
static const int64_t MONITOR_NODE_PAGE_SIZE = (1LL << 21) - (1LL << 13); // 2M - 8k
static const int64_t MONITOR_NODE_PAGE_SIZE = (128LL << 10); // 128K
static const int64_t EVICT_INTERVAL = 1000000; //1s
static const char *MOD_LABEL;
typedef common::ObRaQueue::Ref Ref;

View File

@ -2976,8 +2976,7 @@ inline int ObSQLSessionInfo::init_mem_context(uint64_t tenant_id)
if (OB_LIKELY(NULL == mem_context_)) {
lib::ContextParam param;
param.set_properties(lib::USE_TL_PAGE_OPTIONAL)
.set_mem_attr(tenant_id, ObModIds::OB_SQL_SESSION,
common::ObCtxIds::WORK_AREA);
.set_mem_attr(tenant_id, ObModIds::OB_SQL_SESSION);
if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
SQL_ENG_LOG(WARN, "create entity failed", K(ret));
} else if (OB_ISNULL(mem_context_)) {

View File

@ -67,7 +67,7 @@ public:
}
void clear() { map_.clear(); }
private:
ObLinearHashMap<ObIntWarp, ObTransID> map_;
ObLinearHashMap<ObIntWarp, ObTransID, UniqueMemMgrTag> map_;
};
class DeadLockBlockCallBack {