[CP] Control thread lease time for submit log thread.

This commit is contained in:
obdev
2024-02-08 07:19:15 +00:00
committed by ob-robot
parent 77513bde39
commit 97d79c5a48
18 changed files with 587 additions and 14 deletions

View File

@ -16,6 +16,7 @@
#include "observer/omt/ob_multi_tenant.h"
#include "logservice/palf/log_io_task.h"
#include "logservice/palf/fetch_log_engine.h"
#include "logservice/palf/log_shared_task.h"
#include "logservice/replayservice/ob_replay_status.h"
namespace oceanbase
@ -28,6 +29,7 @@ namespace common
ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
: tenant_id_(tenant_id), total_limit_(INT64_MAX), pending_replay_mutator_size_(0),
LOG_HANDLE_SUBMIT_TASK_SIZE(sizeof(palf::LogHandleSubmitTask)),
LOG_IO_FLUSH_LOG_TASK_SIZE(sizeof(palf::LogIOFlushLogTask)),
LOG_IO_TRUNCATE_LOG_TASK_SIZE(sizeof(palf::LogIOTruncateLogTask)),
LOG_IO_FLUSH_META_TASK_SIZE(sizeof(palf::LogIOFlushMetaTask)),
@ -40,6 +42,7 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
unlimited_blk_alloc_(),
replay_log_task_blk_alloc_(REPLAY_MEM_LIMIT_THRESHOLD),
clog_ge_alloc_(ObMemAttr(tenant_id, ObModIds::OB_CLOG_GE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, clog_blk_alloc_),
log_handle_submit_task_alloc_(LOG_HANDLE_SUBMIT_TASK_SIZE, ObMemAttr(tenant_id, "HandleSubmit"), choose_blk_size(LOG_HANDLE_SUBMIT_TASK_SIZE), clog_blk_alloc_, this),
log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this),
@ -71,6 +74,7 @@ void ObTenantMutilAllocator::destroy()
{
OB_LOG(INFO, "ObTenantMutilAllocator destroy", K(tenant_id_));
clog_ge_alloc_.destroy();
log_handle_submit_task_alloc_.destroy();
log_io_flush_log_task_alloc_.destroy();
log_io_truncate_log_task_alloc_.destroy();
log_io_flush_meta_task_alloc_.destroy();
@ -98,6 +102,7 @@ int ObTenantMutilAllocator::choose_blk_size(int obj_size)
void ObTenantMutilAllocator::try_purge()
{
clog_ge_alloc_.purge_extra_cached_block(0);
log_handle_submit_task_alloc_.purge_extra_cached_block(0);
log_io_flush_log_task_alloc_.purge_extra_cached_block(0);
log_io_truncate_log_task_alloc_.purge_extra_cached_block(0);
log_io_flush_meta_task_alloc_.purge_extra_cached_block(0);
@ -161,6 +166,27 @@ void ObTenantMutilAllocator::free_log_io_flush_log_task(LogIOFlushLogTask *ptr)
}
}
LogHandleSubmitTask *ObTenantMutilAllocator::alloc_log_handle_submit_task(
const int64_t palf_id, const int64_t palf_epoch)
{
LogHandleSubmitTask *ret_ptr = NULL;
void *ptr = log_handle_submit_task_alloc_.alloc();
if (NULL != ptr) {
ret_ptr = new(ptr)LogHandleSubmitTask(palf_id, palf_epoch);
ATOMIC_INC(&flying_log_handle_submit_task_);
}
return ret_ptr;
}
void ObTenantMutilAllocator::free_log_handle_submit_task(LogHandleSubmitTask *ptr)
{
if (OB_LIKELY(NULL != ptr)) {
ptr->~LogHandleSubmitTask();
log_handle_submit_task_alloc_.free(ptr);
ATOMIC_DEC(&flying_log_handle_submit_task_);
}
}
LogIOTruncateLogTask *ObTenantMutilAllocator::alloc_log_io_truncate_log_task(
const int64_t palf_id, const int64_t palf_epoch)
{

View File

@ -25,6 +25,7 @@ namespace oceanbase
namespace palf
{
class LogIOFlushLogTask;
class LogHandleSubmitTask;
class LogIOTruncateLogTask;
class LogIOFlushMetaTask;
class LogIOTruncatePrefixBlocksTask;
@ -44,7 +45,7 @@ class ObTraceProfile;
class ObILogAllocator : public ObIAllocator
{
public:
ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0) {}
ObILogAllocator() : flying_log_task_(0), flying_log_handle_submit_task_(0), flying_meta_task_(0) {}
virtual ~ObILogAllocator() {}
public:
@ -54,6 +55,8 @@ public:
virtual void *ge_alloc(const int64_t size) = 0;
virtual void ge_free(void *ptr) = 0;
virtual const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const = 0;
virtual palf::LogHandleSubmitTask *alloc_log_handle_submit_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_handle_submit_task(palf::LogHandleSubmitTask *ptr) = 0;
virtual palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr) = 0;
virtual palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
@ -76,6 +79,7 @@ public:
protected:
int64_t flying_log_task_;
int64_t flying_log_handle_submit_task_;
int64_t flying_meta_task_;
};
@ -118,6 +122,8 @@ public:
// V4.0
palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr);
palf::LogHandleSubmitTask *alloc_log_handle_submit_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_handle_submit_task(palf::LogHandleSubmitTask *ptr);
palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr);
palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch);
@ -139,6 +145,7 @@ private:
uint64_t tenant_id_ CACHE_ALIGNED;
int64_t total_limit_;
int64_t pending_replay_mutator_size_;
const int LOG_HANDLE_SUBMIT_TASK_SIZE;
const int LOG_IO_FLUSH_LOG_TASK_SIZE;
const int LOG_IO_TRUNCATE_LOG_TASK_SIZE;
const int LOG_IO_FLUSH_META_TASK_SIZE;
@ -151,6 +158,7 @@ private:
ObBlockAllocMgr unlimited_blk_alloc_;
ObBlockAllocMgr replay_log_task_blk_alloc_;
ObVSliceAlloc clog_ge_alloc_;
ObSliceAlloc log_handle_submit_task_alloc_;
ObSliceAlloc log_io_flush_log_task_alloc_;
ObSliceAlloc log_io_truncate_log_task_alloc_;
ObSliceAlloc log_io_flush_meta_task_alloc_;