add limit for replay allocator

This commit is contained in:
obdev
2023-03-02 17:59:51 +00:00
committed by ob-robot
parent f4d50a1367
commit 6a4317b34e
5 changed files with 14 additions and 104 deletions

View File

@ -775,6 +775,9 @@ void ObLogReplayService::process_replay_ret_code_(const int ret_code,
replay_task.replay_hint_, false, cur_ts, ret_code); replay_task.replay_hint_, false, cur_ts, ret_code);
LOG_DBA_ERROR(OB_LOG_REPLAY_ERROR, "msg", "replay task encountered fatal error", "ret", ret_code, LOG_DBA_ERROR(OB_LOG_REPLAY_ERROR, "msg", "replay task encountered fatal error", "ret", ret_code,
K(replay_status), K(replay_task)); K(replay_status), K(replay_task));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else {/*do nothing*/} } else {/*do nothing*/}
if (OB_SUCCESS == task_queue.get_err_info_ret_code()) { if (OB_SUCCESS == task_queue.get_err_info_ret_code()) {
@ -895,18 +898,12 @@ int ObLogReplayService::check_can_submit_log_replay_task_(ObLogReplayTask *repla
ObReplayStatus *replay_status) ObReplayStatus *replay_status)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
SCN current_replayable_point;
current_replayable_point.atomic_set(replayable_point_);
bool is_wait_replayable_point = false;
bool is_wait_barrier = false; bool is_wait_barrier = false;
bool is_tenant_out_of_mem = false; bool is_tenant_out_of_mem = false;
if (NULL == replay_task || NULL == replay_status) { if (NULL == replay_task || NULL == replay_status) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
CLOG_LOG(ERROR, "check_can_submit_log_replay_task_ invalid argument", KPC(replay_status), KPC(replay_task)); CLOG_LOG(ERROR, "check_can_submit_log_replay_task_ invalid argument", KPC(replay_status), KPC(replay_task));
} else if (replay_task->is_raw_write_ && replay_task->scn_ > current_replayable_point) {
ret = OB_EAGAIN;
is_wait_replayable_point = true;
} else if (OB_FAIL(replay_status->check_submit_barrier())) { } else if (OB_FAIL(replay_status->check_submit_barrier())) {
if (OB_EAGAIN != ret) { if (OB_EAGAIN != ret) {
CLOG_LOG(ERROR, "failed to check_submit_barrier", K(ret), KPC(replay_status)); CLOG_LOG(ERROR, "failed to check_submit_barrier", K(ret), KPC(replay_status));
@ -918,8 +915,8 @@ int ObLogReplayService::check_can_submit_log_replay_task_(ObLogReplayTask *repla
is_tenant_out_of_mem = true; is_tenant_out_of_mem = true;
} }
if (OB_EAGAIN == ret && REACH_TIME_INTERVAL(5 * 1000 * 1000)) { if (OB_EAGAIN == ret && REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
CLOG_LOG(INFO, "submit replay task need retry", K(ret), KPC(replay_status), KPC(replay_task), K(current_replayable_point), CLOG_LOG(INFO, "submit replay task need retry", K(ret), KPC(replay_status), KPC(replay_task),
K(is_wait_replayable_point), K(is_wait_barrier), K(is_tenant_out_of_mem)); K(is_wait_barrier), K(is_tenant_out_of_mem));
} }
return ret; return ret;
} }

View File

@ -35,19 +35,16 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
PALF_FETCH_LOG_TASK_SIZE(sizeof(palf::FetchLogTask)), PALF_FETCH_LOG_TASK_SIZE(sizeof(palf::FetchLogTask)),
LOG_IO_FLASHBACK_TASK_SIZE(sizeof(palf::LogIOFlashbackTask)), LOG_IO_FLASHBACK_TASK_SIZE(sizeof(palf::LogIOFlashbackTask)),
clog_blk_alloc_(), clog_blk_alloc_(),
inner_table_replay_blk_alloc_(REPLAY_MEM_LIMIT_THRESHOLD * INNER_TABLE_REPLAY_MEM_PERCENT / 100),
user_table_replay_blk_alloc_(REPLAY_MEM_LIMIT_THRESHOLD * (100 - INNER_TABLE_REPLAY_MEM_PERCENT) / 100),
common_blk_alloc_(), common_blk_alloc_(),
unlimited_blk_alloc_(), unlimited_blk_alloc_(),
replay_log_task_blk_alloc_(REPLAY_MEM_LIMIT_THRESHOLD),
clog_ge_alloc_(ObMemAttr(tenant_id, ObModIds::OB_CLOG_GE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, clog_blk_alloc_), clog_ge_alloc_(ObMemAttr(tenant_id, ObModIds::OB_CLOG_GE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, clog_blk_alloc_),
inner_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, inner_table_replay_blk_alloc_),
user_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, user_table_replay_blk_alloc_),
log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this), log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this), log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this), log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this),
log_io_truncate_prefix_blocks_task_alloc_(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE), clog_blk_alloc_, this), log_io_truncate_prefix_blocks_task_alloc_(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE), clog_blk_alloc_, this),
palf_fetch_log_task_alloc_(PALF_FETCH_LOG_TASK_SIZE, ObMemAttr(tenant_id, ObModIds::OB_FETCH_LOG_TASK), choose_blk_size(PALF_FETCH_LOG_TASK_SIZE), clog_blk_alloc_, this), palf_fetch_log_task_alloc_(PALF_FETCH_LOG_TASK_SIZE, ObMemAttr(tenant_id, ObModIds::OB_FETCH_LOG_TASK), choose_blk_size(PALF_FETCH_LOG_TASK_SIZE), clog_blk_alloc_, this),
replay_log_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_TASK), common::OB_MALLOC_BIG_BLOCK_SIZE), replay_log_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_TASK), common::OB_MALLOC_BIG_BLOCK_SIZE, replay_log_task_blk_alloc_),
log_io_flashback_task_alloc_(LOG_IO_FLASHBACK_TASK_SIZE, ObMemAttr(tenant_id, "Flashback"), choose_blk_size(LOG_IO_FLASHBACK_TASK_SIZE), clog_blk_alloc_, this) log_io_flashback_task_alloc_(LOG_IO_FLASHBACK_TASK_SIZE, ObMemAttr(tenant_id, "Flashback"), choose_blk_size(LOG_IO_FLASHBACK_TASK_SIZE), clog_blk_alloc_, this)
{ {
// set_nway according to tenant's max_cpu // set_nway according to tenant's max_cpu
@ -79,8 +76,6 @@ int ObTenantMutilAllocator::choose_blk_size(int obj_size)
void ObTenantMutilAllocator::try_purge() void ObTenantMutilAllocator::try_purge()
{ {
clog_ge_alloc_.purge_extra_cached_block(0); clog_ge_alloc_.purge_extra_cached_block(0);
inner_table_replay_task_alloc_.purge_extra_cached_block(0);
user_table_replay_task_alloc_.purge_extra_cached_block(0);
log_io_flush_log_task_alloc_.purge_extra_cached_block(0); log_io_flush_log_task_alloc_.purge_extra_cached_block(0);
log_io_truncate_log_task_alloc_.purge_extra_cached_block(0); log_io_truncate_log_task_alloc_.purge_extra_cached_block(0);
log_io_flush_meta_task_alloc_.purge_extra_cached_block(0); log_io_flush_meta_task_alloc_.purge_extra_cached_block(0);
@ -122,43 +117,6 @@ const ObBlockAllocMgr &ObTenantMutilAllocator::get_clog_blk_alloc_mgr() const
return clog_blk_alloc_; return clog_blk_alloc_;
} }
void *ObTenantMutilAllocator::alloc_replay_task_buf(const bool is_inner_table, const int64_t size)
{
void *ptr = NULL;
ObVSliceAlloc &allocator = is_inner_table ? inner_table_replay_task_alloc_ : user_table_replay_task_alloc_;
ptr = allocator.alloc(size);
return ptr;
}
void ObTenantMutilAllocator::free_replay_task(const bool is_inner_table, void *ptr)
{
if (OB_LIKELY(NULL != ptr)) {
ObVSliceAlloc &allocator = is_inner_table ? inner_table_replay_task_alloc_ : user_table_replay_task_alloc_;
allocator.free(ptr);
}
}
bool ObTenantMutilAllocator::can_alloc_replay_task(const bool is_inner_table, int64_t size) const
{
const ObVSliceAlloc &allocator = is_inner_table ? inner_table_replay_task_alloc_ : user_table_replay_task_alloc_;
return allocator.can_alloc_block(size);
}
void ObTenantMutilAllocator::inc_pending_replay_mutator_size(int64_t size)
{
ATOMIC_AAF(&pending_replay_mutator_size_, size);
}
void ObTenantMutilAllocator::dec_pending_replay_mutator_size(int64_t size)
{
ATOMIC_SAF(&pending_replay_mutator_size_, size);
}
int64_t ObTenantMutilAllocator::get_pending_replay_mutator_size() const
{
return ATOMIC_LOAD(&pending_replay_mutator_size_);
}
LogIOFlushLogTask *ObTenantMutilAllocator::alloc_log_io_flush_log_task( LogIOFlushLogTask *ObTenantMutilAllocator::alloc_log_io_flush_log_task(
const int64_t palf_id, const int64_t palf_epoch) const int64_t palf_id, const int64_t palf_epoch)
{ {
@ -304,8 +262,6 @@ void ObTenantMutilAllocator::set_nway(const int32_t nway)
{ {
if (nway > 0) { if (nway > 0) {
clog_ge_alloc_.set_nway(nway); clog_ge_alloc_.set_nway(nway);
inner_table_replay_task_alloc_.set_nway(nway);
user_table_replay_task_alloc_.set_nway(nway);
OB_LOG(INFO, "finish set nway", K(tenant_id_), K(nway)); OB_LOG(INFO, "finish set nway", K(tenant_id_), K(nway));
} }
} }
@ -316,15 +272,12 @@ void ObTenantMutilAllocator::set_limit(const int64_t total_limit)
ATOMIC_STORE(&total_limit_, total_limit); ATOMIC_STORE(&total_limit_, total_limit);
const int64_t clog_limit = total_limit / 100 * CLOG_MEM_LIMIT_PERCENT; const int64_t clog_limit = total_limit / 100 * CLOG_MEM_LIMIT_PERCENT;
const int64_t replay_limit = std::min(total_limit / 100 * REPLAY_MEM_LIMIT_PERCENT, REPLAY_MEM_LIMIT_THRESHOLD); const int64_t replay_limit = std::min(total_limit / 100 * REPLAY_MEM_LIMIT_PERCENT, REPLAY_MEM_LIMIT_THRESHOLD);
const int64_t inner_table_replay_limit = replay_limit * INNER_TABLE_REPLAY_MEM_PERCENT / 100;
const int64_t user_table_replay_limit = replay_limit * (100 - INNER_TABLE_REPLAY_MEM_PERCENT) / 100;
const int64_t common_limit = total_limit - (clog_limit + replay_limit); const int64_t common_limit = total_limit - (clog_limit + replay_limit);
clog_blk_alloc_.set_limit(clog_limit); clog_blk_alloc_.set_limit(clog_limit);
inner_table_replay_blk_alloc_.set_limit(inner_table_replay_limit);
user_table_replay_blk_alloc_.set_limit(user_table_replay_limit);
common_blk_alloc_.set_limit(common_limit); common_blk_alloc_.set_limit(common_limit);
replay_log_task_alloc_.set_limit(replay_limit);
OB_LOG(INFO, "ObTenantMutilAllocator set tenant mem limit finished", K(tenant_id_), K(total_limit), K(clog_limit), OB_LOG(INFO, "ObTenantMutilAllocator set tenant mem limit finished", K(tenant_id_), K(total_limit), K(clog_limit),
K(replay_limit), K(common_limit), K(inner_table_replay_limit), K(user_table_replay_limit)); K(replay_limit), K(common_limit));
} }
} }
@ -335,8 +288,8 @@ int64_t ObTenantMutilAllocator::get_limit() const
int64_t ObTenantMutilAllocator::get_hold() const int64_t ObTenantMutilAllocator::get_hold() const
{ {
return clog_blk_alloc_.hold() + inner_table_replay_blk_alloc_.hold() return clog_blk_alloc_.hold() + common_blk_alloc_.hold()
+ user_table_replay_blk_alloc_.hold() + common_blk_alloc_.hold(); + replay_log_task_blk_alloc_.hold();
} }
#define SLICE_FREE_OBJ(name, cls) \ #define SLICE_FREE_OBJ(name, cls) \

View File

@ -76,24 +76,8 @@ protected:
int64_t flying_meta_task_; int64_t flying_meta_task_;
}; };
// Interface for ReplayEngine module
class ObIReplayTaskAllocator
{
public:
ObIReplayTaskAllocator() {}
virtual ~ObIReplayTaskAllocator() {}
public:
virtual void *alloc_replay_task_buf(const bool is_inner_table, const int64_t size) = 0;
virtual void free_replay_task(const bool is_inner_table, void *ptr) = 0;
virtual bool can_alloc_replay_task(const bool is_inner_table, int64_t size) const = 0;
virtual void inc_pending_replay_mutator_size(int64_t size) = 0;
virtual void dec_pending_replay_mutator_size(int64_t size) = 0;
virtual int64_t get_pending_replay_mutator_size() const = 0;
};
class ObTenantMutilAllocator class ObTenantMutilAllocator
: public ObILogAllocator, public ObIReplayTaskAllocator, public common::ObLink : public ObILogAllocator, public common::ObLink
{ {
public: public:
// The memory percent of clog // The memory percent of clog
@ -128,12 +112,6 @@ public:
void *ge_alloc(const int64_t size); void *ge_alloc(const int64_t size);
void ge_free(void *ptr); void ge_free(void *ptr);
const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const; const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const;
void *alloc_replay_task_buf(const bool is_inner_table, const int64_t size);
void free_replay_task(const bool is_inner_table, void *ptr);
bool can_alloc_replay_task(const bool is_inner_table, int64_t size) const;
void inc_pending_replay_mutator_size(int64_t size);
void dec_pending_replay_mutator_size(int64_t size);
int64_t get_pending_replay_mutator_size() const;
// V4.0 // V4.0
palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch); palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr); void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr);
@ -163,13 +141,10 @@ private:
const int PALF_FETCH_LOG_TASK_SIZE; const int PALF_FETCH_LOG_TASK_SIZE;
const int LOG_IO_FLASHBACK_TASK_SIZE; const int LOG_IO_FLASHBACK_TASK_SIZE;
ObBlockAllocMgr clog_blk_alloc_; ObBlockAllocMgr clog_blk_alloc_;
ObBlockAllocMgr inner_table_replay_blk_alloc_;
ObBlockAllocMgr user_table_replay_blk_alloc_;
ObBlockAllocMgr common_blk_alloc_; ObBlockAllocMgr common_blk_alloc_;
ObBlockAllocMgr unlimited_blk_alloc_; ObBlockAllocMgr unlimited_blk_alloc_;
ObBlockAllocMgr replay_log_task_blk_alloc_;
ObVSliceAlloc clog_ge_alloc_; ObVSliceAlloc clog_ge_alloc_;
ObVSliceAlloc inner_table_replay_task_alloc_;
ObVSliceAlloc user_table_replay_task_alloc_;
ObSliceAlloc log_io_flush_log_task_alloc_; ObSliceAlloc log_io_flush_log_task_alloc_;
ObSliceAlloc log_io_truncate_log_task_alloc_; ObSliceAlloc log_io_truncate_log_task_alloc_;
ObSliceAlloc log_io_flush_meta_task_alloc_; ObSliceAlloc log_io_flush_meta_task_alloc_;

View File

@ -49,19 +49,6 @@ int ObTenantMutilAllocatorMgr::get_tenant_log_allocator(const uint64_t tenant_id
return ret; return ret;
} }
// Get the replay allocator for specified tenant, create it when tenant not exist
int ObTenantMutilAllocatorMgr::get_tenant_replay_allocator(const uint64_t tenant_id,
ObIReplayTaskAllocator *&out_allocator)
{
int ret = OB_SUCCESS;
ObTenantMutilAllocator *allocator = NULL;
if (OB_FAIL(get_tenant_mutil_allocator(tenant_id, allocator))) {
} else {
out_allocator = allocator;
}
return ret;
}
int ObTenantMutilAllocatorMgr::get_tenant_mutil_allocator(const uint64_t tenant_id, int ObTenantMutilAllocatorMgr::get_tenant_mutil_allocator(const uint64_t tenant_id,
ObTenantMutilAllocator *&out_allocator) ObTenantMutilAllocator *&out_allocator)
{ {

View File

@ -41,8 +41,6 @@ public:
ObTenantMutilAllocator *&out_allocator); ObTenantMutilAllocator *&out_allocator);
int get_tenant_log_allocator(const uint64_t tenant_id, int get_tenant_log_allocator(const uint64_t tenant_id,
ObILogAllocator *&out_allocator); ObILogAllocator *&out_allocator);
int get_tenant_replay_allocator(const uint64_t tenant_id,
ObIReplayTaskAllocator *&out_allocator);
int get_tenant_limit(const uint64_t tenant_id, int64_t &limit); int get_tenant_limit(const uint64_t tenant_id, int64_t &limit);
int set_tenant_limit(const uint64_t tenant_id, const int64_t new_limit); int set_tenant_limit(const uint64_t tenant_id, const int64_t new_limit);
void *alloc_log_entry_buf(const int64_t size) void *alloc_log_entry_buf(const int64_t size)
@ -71,7 +69,7 @@ private:
obsys::ObRWLock locks_[PRESERVED_TENANT_COUNT]; obsys::ObRWLock locks_[PRESERVED_TENANT_COUNT];
ObTenantMutilAllocator *tma_array_[PRESERVED_TENANT_COUNT]; ObTenantMutilAllocator *tma_array_[PRESERVED_TENANT_COUNT];
ObBlockAllocMgr clog_body_blk_alloc_; ObBlockAllocMgr clog_body_blk_alloc_;
ObVSliceAlloc clog_entry_alloc_; ObVSliceAlloc clog_entry_alloc_;
private: private:
DISALLOW_COPY_AND_ASSIGN(ObTenantMutilAllocatorMgr); DISALLOW_COPY_AND_ASSIGN(ObTenantMutilAllocatorMgr);
}; // end of class ObTenantMutilAllocatorMgr }; // end of class ObTenantMutilAllocatorMgr