From e04e3c9410f2201ec75e3169a7a41d1655be16b7 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 15 Mar 2023 03:13:53 +0000 Subject: [PATCH] replace timer with TG for ObSharedMacroBlockManager and T3M --- src/share/ob_thread_define.h | 2 + .../ob_shared_macro_block_manager.cpp | 27 ++++++++---- .../ob_shared_macro_block_manager.h | 2 +- .../meta_mem/ob_tenant_meta_mem_mgr.cpp | 44 ++++++++++++------- src/storage/meta_mem/ob_tenant_meta_mem_mgr.h | 2 +- src/storage/slog/ob_storage_log_writer.cpp | 3 +- 6 files changed, 51 insertions(+), 29 deletions(-) diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index 52069da107..73694e52c6 100644 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -129,4 +129,6 @@ TG_DEF(LogUpdater, LogUpdater, "", TG_STATIC, TIMER) TG_DEF(HeartBeatCheckTask, HeartBeatCheckTask, "", TG_STATIC, TIMER) TG_DEF(RedefHeartBeatTask, RedefHeartBeatTask, "", TG_STATIC, TIMER) TG_DEF(MemDumpTimer, MemDumpTimer, "", TG_STATIC, TIMER) +TG_DEF(SSTableDefragment, SSTableDefragment, "", TG_STATIC, TIMER) +TG_DEF(TenantMetaMemMgr, TenantMetaMemMgr, "", TG_STATIC, TIMER) #endif diff --git a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp index 447714be2d..e44db64f47 100644 --- a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp +++ b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp @@ -77,7 +77,7 @@ ObSharedMacroBlockMgr::ObSharedMacroBlockMgr() blocks_mutex_(), block_used_size_(), defragmentation_task_(*this), - timer_(), + tg_id_(-1), is_inited_(false) { } @@ -89,7 +89,8 @@ ObSharedMacroBlockMgr::~ObSharedMacroBlockMgr() void ObSharedMacroBlockMgr::destroy() { - timer_.destroy(); + TG_DESTROY(tg_id_); + tg_id_ = -1; macro_handle_.reset(); offset_ = OB_DEFAULT_MACRO_BLOCK_SIZE; // so we can init block automatically for first write header_size_ = 0; @@ -131,9 +132,8 @@ int ObSharedMacroBlockMgr::init() LOG_WARN("fail to serialize common header", K(ret), K(common_header)); } else if (OB_FAIL(block_used_size_.init("ShareBlksMap", MTL_ID()))) { LOG_WARN("fail to init block used size array", K(ret)); - } else if (FALSE_IT(timer_.set_run_wrapper(MTL_CTX()))) { - } else if (OB_FAIL(timer_.init("SharedBlk"))) { - LOG_WARN("fail to init timer", K(ret)); + } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::SSTableDefragment, tg_id_))) { + LOG_WARN("fail to create thread for sstable defragmentation", K(ret)); } else { is_inited_ = true; } @@ -147,20 +147,29 @@ int ObSharedMacroBlockMgr::init() int ObSharedMacroBlockMgr::start() { int ret = OB_SUCCESS; - if (!timer_.task_exist(defragmentation_task_) && OB_FAIL(timer_.schedule(defragmentation_task_, DEFRAGMENT_DELAY_US, true))) { - LOG_WARN("fail to schedule fragmentation task", K(ret)); + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObSharedMacroBlockMgr hasn't been inited", K(ret)); + } else if (OB_FAIL(TG_START(tg_id_))) { + LOG_WARN("fail to start sstable defragmentation thread", K(ret), K(tg_id_)); + } else if (OB_FAIL(TG_SCHEDULE(tg_id_, defragmentation_task_, DEFRAGMENT_DELAY_US, true/*repeat*/))) { + LOG_WARN("fail to schedule defragmentation task", K(ret), K(tg_id_)); } return ret; } void ObSharedMacroBlockMgr::stop() { - timer_.stop(); + if (OB_LIKELY(is_inited_)) { + TG_STOP(tg_id_); + } } void ObSharedMacroBlockMgr::wait() { - timer_.wait(); + if (OB_LIKELY(is_inited_)) { + TG_WAIT(tg_id_); + } } int ObSharedMacroBlockMgr::write_block( diff --git a/src/storage/blocksstable/ob_shared_macro_block_manager.h b/src/storage/blocksstable/ob_shared_macro_block_manager.h index 9cfd366d85..ef82139a91 100644 --- a/src/storage/blocksstable/ob_shared_macro_block_manager.h +++ b/src/storage/blocksstable/ob_shared_macro_block_manager.h @@ -182,7 +182,7 @@ private: lib::ObMutex blocks_mutex_; // protect block_used_size_ ObLinearHashMap block_used_size_; ObBlockDefragmentationTask defragmentation_task_; - common::ObTimer timer_; + int tg_id_; bool is_inited_; }; diff --git a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp index 3cc4cdfe56..1953208cc1 100644 --- a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp +++ b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp @@ -29,6 +29,7 @@ #include "storage/tx_storage/ob_ls_map.h" #include "storage/tx_storage/ob_ls_service.h" #include "storage/ddl/ob_tablet_ddl_kv.h" +#include "share/ob_thread_define.h" namespace oceanbase { @@ -86,7 +87,7 @@ ObTenantMetaMemMgr::ObTenantMetaMemMgr(const uint64_t tenant_id) bucket_lock_(), allocator_(tenant_id, wash_func_), tablet_map_(), - timer_(), + tg_id_(-1), table_gc_task_(this), min_minor_sstable_gc_task_(this), refresh_config_task_(), @@ -150,14 +151,11 @@ int ObTenantMetaMemMgr::init() LOG_WARN("fail to create last min minor sstable set", K(ret)); } else if (pinned_tablet_set_.create(DEFAULT_BUCKET_NUM)) { LOG_WARN("fail to create pinned tablet set", K(ret)); + } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TenantMetaMemMgr, tg_id_))) { + LOG_WARN("fail to create thread for t3m", K(ret)); } else { - timer_.set_run_wrapper(MTL_CTX()); - if (OB_FAIL(timer_.init("T3mGC"))) { - LOG_WARN("fail to init itable gc timer", K(ret)); - } else { - init_pool_arr(); - is_inited_ = true; - } + init_pool_arr(); + is_inited_ = true; } if (OB_UNLIKELY(!is_inited_)) { @@ -184,31 +182,46 @@ void ObTenantMetaMemMgr::init_pool_arr() int ObTenantMetaMemMgr::start() { int ret = OB_SUCCESS; - if (OB_FAIL(timer_.schedule(table_gc_task_, TABLE_GC_INTERVAL_US, true/*repeat*/))) { + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObTenantMetaMemMgr hasn't been inited", K(ret)); + } else if (OB_FAIL(TG_START(tg_id_))) { + LOG_WARN("fail to start thread for t3m", K(ret), K(tg_id_)); + } else if (OB_FAIL(TG_SCHEDULE(tg_id_, table_gc_task_, TABLE_GC_INTERVAL_US, true/*repeat*/))) { LOG_WARN("fail to schedule itables gc task", K(ret)); - } else if (OB_FAIL(timer_.schedule(min_minor_sstable_gc_task_, MIN_MINOR_SSTABLE_GC_INTERVAL_US, - true/*repeat*/))) { + } else if (OB_FAIL(TG_SCHEDULE( + tg_id_, min_minor_sstable_gc_task_, MIN_MINOR_SSTABLE_GC_INTERVAL_US, true/*repeat*/))) { LOG_WARN("fail to schedule min minor sstable gc task", K(ret)); - } else if (OB_FAIL(timer_.schedule(refresh_config_task_, REFRESH_CONFIG_INTERVAL_US, - true/*repeat*/))) { + } else if (OB_FAIL(TG_SCHEDULE( + tg_id_, refresh_config_task_, REFRESH_CONFIG_INTERVAL_US, true/*repeat*/))) { LOG_WARN("fail to schedule refresh config task", K(ret)); + } else { + LOG_INFO("successfully to start t3m's three tasks", K(ret), K(tg_id_)); } return ret; } void ObTenantMetaMemMgr::stop() { - timer_.stop(); + if (OB_LIKELY(is_inited_)) { + TG_STOP(tg_id_); + LOG_INFO("t3m's three tasks have been stopped", K(tg_id_)); + } } void ObTenantMetaMemMgr::wait() { - timer_.wait(); + if (OB_LIKELY(is_inited_)) { + TG_WAIT(tg_id_); + LOG_INFO("t3m's three tasks have finished wait", K(tg_id_)); + } } void ObTenantMetaMemMgr::destroy() { int ret = OB_SUCCESS; + TG_DESTROY(tg_id_); + tg_id_ = -1; bool is_all_clean = false; tablet_map_.destroy(); last_min_minor_sstable_set_.destroy(); @@ -216,7 +229,6 @@ void ObTenantMetaMemMgr::destroy() while (!is_all_clean && OB_SUCC(gc_tables_in_queue(is_all_clean))); bucket_lock_.destroy(); allocator_.reset(); - timer_.destroy(); for (int64_t i = 0; i <= ObITable::TableType::REMOTE_LOGICAL_MINOR_SSTABLE; i++) { pool_arr_[i] = nullptr; } diff --git a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h index 33359cdb96..9872f5d2d1 100644 --- a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h +++ b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h @@ -436,7 +436,7 @@ private: ObBucketLock bucket_lock_; TenantMetaAllocator allocator_; ObMetaPointerMap tablet_map_; - common::ObTimer timer_; + int tg_id_; TableGCTask table_gc_task_; MinMinorSSTableGCTask min_minor_sstable_gc_task_; RefreshConfigTask refresh_config_task_; diff --git a/src/storage/slog/ob_storage_log_writer.cpp b/src/storage/slog/ob_storage_log_writer.cpp index 0f0c9f2833..b2d023296f 100644 --- a/src/storage/slog/ob_storage_log_writer.cpp +++ b/src/storage/slog/ob_storage_log_writer.cpp @@ -30,7 +30,6 @@ #include #include #include "share/rc/ob_tenant_base.h" -#include "share/ob_thread_mgr.h" namespace oceanbase { @@ -663,7 +662,7 @@ int ObStorageLogWriter::ObSLogWriteRunner::start() ret = OB_NOT_INIT; STORAGE_REDO_LOG(WARN, "ObSLogWriteRunner hasn't been inited.", K(ret), K(is_inited_)); } else if (OB_FAIL(TG_SET_RUNNABLE_AND_START(tg_id_, *this))) { - STORAGE_REDO_LOG(WARN, "Fail to start log writer thread.", K(tg_id_)); + STORAGE_REDO_LOG(WARN, "Fail to start log writer thread.", K(ret), K(tg_id_)); } return ret; }