replace timer with TG for ObSharedMacroBlockManager and T3M
This commit is contained in:
@ -129,4 +129,6 @@ TG_DEF(LogUpdater, LogUpdater, "", TG_STATIC, TIMER)
|
||||
TG_DEF(HeartBeatCheckTask, HeartBeatCheckTask, "", TG_STATIC, TIMER)
|
||||
TG_DEF(RedefHeartBeatTask, RedefHeartBeatTask, "", TG_STATIC, TIMER)
|
||||
TG_DEF(MemDumpTimer, MemDumpTimer, "", TG_STATIC, TIMER)
|
||||
TG_DEF(SSTableDefragment, SSTableDefragment, "", TG_STATIC, TIMER)
|
||||
TG_DEF(TenantMetaMemMgr, TenantMetaMemMgr, "", TG_STATIC, TIMER)
|
||||
#endif
|
||||
|
||||
@ -77,7 +77,7 @@ ObSharedMacroBlockMgr::ObSharedMacroBlockMgr()
|
||||
blocks_mutex_(),
|
||||
block_used_size_(),
|
||||
defragmentation_task_(*this),
|
||||
timer_(),
|
||||
tg_id_(-1),
|
||||
is_inited_(false)
|
||||
{
|
||||
}
|
||||
@ -89,7 +89,8 @@ ObSharedMacroBlockMgr::~ObSharedMacroBlockMgr()
|
||||
|
||||
void ObSharedMacroBlockMgr::destroy()
|
||||
{
|
||||
timer_.destroy();
|
||||
TG_DESTROY(tg_id_);
|
||||
tg_id_ = -1;
|
||||
macro_handle_.reset();
|
||||
offset_ = OB_DEFAULT_MACRO_BLOCK_SIZE; // so we can init block automatically for first write
|
||||
header_size_ = 0;
|
||||
@ -131,9 +132,8 @@ int ObSharedMacroBlockMgr::init()
|
||||
LOG_WARN("fail to serialize common header", K(ret), K(common_header));
|
||||
} else if (OB_FAIL(block_used_size_.init("ShareBlksMap", MTL_ID()))) {
|
||||
LOG_WARN("fail to init block used size array", K(ret));
|
||||
} else if (FALSE_IT(timer_.set_run_wrapper(MTL_CTX()))) {
|
||||
} else if (OB_FAIL(timer_.init("SharedBlk"))) {
|
||||
LOG_WARN("fail to init timer", K(ret));
|
||||
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::SSTableDefragment, tg_id_))) {
|
||||
LOG_WARN("fail to create thread for sstable defragmentation", K(ret));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
@ -147,20 +147,29 @@ int ObSharedMacroBlockMgr::init()
|
||||
int ObSharedMacroBlockMgr::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!timer_.task_exist(defragmentation_task_) && OB_FAIL(timer_.schedule(defragmentation_task_, DEFRAGMENT_DELAY_US, true))) {
|
||||
LOG_WARN("fail to schedule fragmentation task", K(ret));
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObSharedMacroBlockMgr hasn't been inited", K(ret));
|
||||
} else if (OB_FAIL(TG_START(tg_id_))) {
|
||||
LOG_WARN("fail to start sstable defragmentation thread", K(ret), K(tg_id_));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(tg_id_, defragmentation_task_, DEFRAGMENT_DELAY_US, true/*repeat*/))) {
|
||||
LOG_WARN("fail to schedule defragmentation task", K(ret), K(tg_id_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObSharedMacroBlockMgr::stop()
|
||||
{
|
||||
timer_.stop();
|
||||
if (OB_LIKELY(is_inited_)) {
|
||||
TG_STOP(tg_id_);
|
||||
}
|
||||
}
|
||||
|
||||
void ObSharedMacroBlockMgr::wait()
|
||||
{
|
||||
timer_.wait();
|
||||
if (OB_LIKELY(is_inited_)) {
|
||||
TG_WAIT(tg_id_);
|
||||
}
|
||||
}
|
||||
|
||||
int ObSharedMacroBlockMgr::write_block(
|
||||
|
||||
@ -182,7 +182,7 @@ private:
|
||||
lib::ObMutex blocks_mutex_; // protect block_used_size_
|
||||
ObLinearHashMap<MacroBlockId, int32_t> block_used_size_;
|
||||
ObBlockDefragmentationTask defragmentation_task_;
|
||||
common::ObTimer timer_;
|
||||
int tg_id_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
|
||||
@ -29,6 +29,7 @@
|
||||
#include "storage/tx_storage/ob_ls_map.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "storage/ddl/ob_tablet_ddl_kv.h"
|
||||
#include "share/ob_thread_define.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -86,7 +87,7 @@ ObTenantMetaMemMgr::ObTenantMetaMemMgr(const uint64_t tenant_id)
|
||||
bucket_lock_(),
|
||||
allocator_(tenant_id, wash_func_),
|
||||
tablet_map_(),
|
||||
timer_(),
|
||||
tg_id_(-1),
|
||||
table_gc_task_(this),
|
||||
min_minor_sstable_gc_task_(this),
|
||||
refresh_config_task_(),
|
||||
@ -150,15 +151,12 @@ int ObTenantMetaMemMgr::init()
|
||||
LOG_WARN("fail to create last min minor sstable set", K(ret));
|
||||
} else if (pinned_tablet_set_.create(DEFAULT_BUCKET_NUM)) {
|
||||
LOG_WARN("fail to create pinned tablet set", K(ret));
|
||||
} else {
|
||||
timer_.set_run_wrapper(MTL_CTX());
|
||||
if (OB_FAIL(timer_.init("T3mGC"))) {
|
||||
LOG_WARN("fail to init itable gc timer", K(ret));
|
||||
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TenantMetaMemMgr, tg_id_))) {
|
||||
LOG_WARN("fail to create thread for t3m", K(ret));
|
||||
} else {
|
||||
init_pool_arr();
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
destroy();
|
||||
@ -184,31 +182,46 @@ void ObTenantMetaMemMgr::init_pool_arr()
|
||||
int ObTenantMetaMemMgr::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(timer_.schedule(table_gc_task_, TABLE_GC_INTERVAL_US, true/*repeat*/))) {
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTenantMetaMemMgr hasn't been inited", K(ret));
|
||||
} else if (OB_FAIL(TG_START(tg_id_))) {
|
||||
LOG_WARN("fail to start thread for t3m", K(ret), K(tg_id_));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(tg_id_, table_gc_task_, TABLE_GC_INTERVAL_US, true/*repeat*/))) {
|
||||
LOG_WARN("fail to schedule itables gc task", K(ret));
|
||||
} else if (OB_FAIL(timer_.schedule(min_minor_sstable_gc_task_, MIN_MINOR_SSTABLE_GC_INTERVAL_US,
|
||||
true/*repeat*/))) {
|
||||
} else if (OB_FAIL(TG_SCHEDULE(
|
||||
tg_id_, min_minor_sstable_gc_task_, MIN_MINOR_SSTABLE_GC_INTERVAL_US, true/*repeat*/))) {
|
||||
LOG_WARN("fail to schedule min minor sstable gc task", K(ret));
|
||||
} else if (OB_FAIL(timer_.schedule(refresh_config_task_, REFRESH_CONFIG_INTERVAL_US,
|
||||
true/*repeat*/))) {
|
||||
} else if (OB_FAIL(TG_SCHEDULE(
|
||||
tg_id_, refresh_config_task_, REFRESH_CONFIG_INTERVAL_US, true/*repeat*/))) {
|
||||
LOG_WARN("fail to schedule refresh config task", K(ret));
|
||||
} else {
|
||||
LOG_INFO("successfully to start t3m's three tasks", K(ret), K(tg_id_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTenantMetaMemMgr::stop()
|
||||
{
|
||||
timer_.stop();
|
||||
if (OB_LIKELY(is_inited_)) {
|
||||
TG_STOP(tg_id_);
|
||||
LOG_INFO("t3m's three tasks have been stopped", K(tg_id_));
|
||||
}
|
||||
}
|
||||
|
||||
void ObTenantMetaMemMgr::wait()
|
||||
{
|
||||
timer_.wait();
|
||||
if (OB_LIKELY(is_inited_)) {
|
||||
TG_WAIT(tg_id_);
|
||||
LOG_INFO("t3m's three tasks have finished wait", K(tg_id_));
|
||||
}
|
||||
}
|
||||
|
||||
void ObTenantMetaMemMgr::destroy()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
TG_DESTROY(tg_id_);
|
||||
tg_id_ = -1;
|
||||
bool is_all_clean = false;
|
||||
tablet_map_.destroy();
|
||||
last_min_minor_sstable_set_.destroy();
|
||||
@ -216,7 +229,6 @@ void ObTenantMetaMemMgr::destroy()
|
||||
while (!is_all_clean && OB_SUCC(gc_tables_in_queue(is_all_clean)));
|
||||
bucket_lock_.destroy();
|
||||
allocator_.reset();
|
||||
timer_.destroy();
|
||||
for (int64_t i = 0; i <= ObITable::TableType::REMOTE_LOGICAL_MINOR_SSTABLE; i++) {
|
||||
pool_arr_[i] = nullptr;
|
||||
}
|
||||
|
||||
@ -436,7 +436,7 @@ private:
|
||||
ObBucketLock bucket_lock_;
|
||||
TenantMetaAllocator allocator_;
|
||||
ObMetaPointerMap<ObTabletMapKey, ObTablet> tablet_map_;
|
||||
common::ObTimer timer_;
|
||||
int tg_id_;
|
||||
TableGCTask table_gc_task_;
|
||||
MinMinorSSTableGCTask min_minor_sstable_gc_task_;
|
||||
RefreshConfigTask refresh_config_task_;
|
||||
|
||||
@ -30,7 +30,6 @@
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "share/ob_thread_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -663,7 +662,7 @@ int ObStorageLogWriter::ObSLogWriteRunner::start()
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_REDO_LOG(WARN, "ObSLogWriteRunner hasn't been inited.", K(ret), K(is_inited_));
|
||||
} else if (OB_FAIL(TG_SET_RUNNABLE_AND_START(tg_id_, *this))) {
|
||||
STORAGE_REDO_LOG(WARN, "Fail to start log writer thread.", K(tg_id_));
|
||||
STORAGE_REDO_LOG(WARN, "Fail to start log writer thread.", K(ret), K(tg_id_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user