diff --git a/mittest/mtlenv/storage/test_write_tablet_slog.cpp b/mittest/mtlenv/storage/test_write_tablet_slog.cpp index 4a4db7cca..b06d914e1 100644 --- a/mittest/mtlenv/storage/test_write_tablet_slog.cpp +++ b/mittest/mtlenv/storage/test_write_tablet_slog.cpp @@ -28,6 +28,7 @@ #include "storage/test_tablet_helper.h" #include "storage/test_dml_common.h" #include "observer/ob_safe_destroy_thread.h" +#include "observer/ob_server_startup_task_handler.h" #include "lib/oblog/ob_log.h" #include "share/ob_force_print_log.h" @@ -68,6 +69,8 @@ void TestWriteTabletSlog::SetUpTestCase() SAFE_DESTROY_INSTANCE.init(); SAFE_DESTROY_INSTANCE.start(); ObServerCheckpointSlogHandler::get_instance().is_started_ = true; + ASSERT_EQ(OB_SUCCESS, SERVER_STARTUP_TASK_HANDLER.init()); + ASSERT_EQ(OB_SUCCESS, SERVER_STARTUP_TASK_HANDLER.start()); // create ls ObLSHandle ls_handle; @@ -84,6 +87,7 @@ void TestWriteTabletSlog::TearDownTestCase() SAFE_DESTROY_INSTANCE.stop(); SAFE_DESTROY_INSTANCE.wait(); SAFE_DESTROY_INSTANCE.destroy(); + SERVER_STARTUP_TASK_HANDLER.destroy(); MockTenantModuleEnv::get_instance().destroy(); } @@ -159,7 +163,7 @@ TEST_F(TestWriteTabletSlog, basic) ASSERT_EQ(OB_SUCCESS, log_replayer.register_redo_module( ObRedoLogMainType::OB_REDO_LOG_TENANT_STORAGE, slog_handler)); ASSERT_EQ(OB_SUCCESS, log_replayer.replay(replay_start_cursor, replay_finish_cursor, OB_SERVER_TENANT_ID)); - ASSERT_EQ(OB_SUCCESS, slog_handler->replay_load_tablets()); + ASSERT_EQ(OB_SUCCESS, slog_handler->concurrent_replay_load_tablets()); // check the result of replay ObTabletHandle replay_tablet_handle; diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index cc4e6d423..fa47b78ec 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -37,6 +37,7 @@ ob_set_subtarget(ob_server common ob_srv_xlator_storage.cpp ob_tenant_duty_task.cpp ob_uniq_task_queue.cpp + ob_server_startup_task_handler.cpp ) ob_set_subtarget(ob_server common_mixed diff --git 
a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index e0b7b1590..7860c21fb 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -105,6 +105,7 @@ #include "observer/ob_server_utils.h" #include "observer/table_load/ob_table_load_partition_calc.h" #include "observer/virtual_table/ob_mds_event_buffer.h" +#include "observer/ob_server_startup_task_handler.h" #include "share/detect/ob_detect_manager.h" using namespace oceanbase::lib; @@ -369,6 +370,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg) LOG_ERROR("init ObStorageLoggerManager failed", KR(ret)); } else if (OB_FAIL(ObVirtualTenantManager::get_instance().init())) { LOG_ERROR("init tenant manager failed", KR(ret)); + } else if (OB_FAIL(SERVER_STARTUP_TASK_HANDLER.init())) { + LOG_ERROR("init server startup task handler failed", KR(ret)); } else if (OB_FAIL(ObServerCheckpointSlogHandler::get_instance().init())) { LOG_ERROR("init server checkpoint slog handler failed", KR(ret)); } else if (FALSE_IT(common::occam::ObThreadHungDetector::get_instance())) { @@ -681,6 +684,10 @@ void ObServer::destroy() ObServerCheckpointSlogHandler::get_instance().destroy(); FLOG_INFO("server checkpoint slog handler destroyed"); + FLOG_INFO("begin to destroy server startup task handler"); + SERVER_STARTUP_TASK_HANDLER.destroy(); + FLOG_INFO("server startup task handler destroyed"); + FLOG_INFO("begin to destroy backup index cache"); OB_BACKUP_INDEX_CACHE.destroy(); FLOG_INFO("backup index cache destroyed"); @@ -750,6 +757,12 @@ int ObServer::start() LOG_ERROR("fail to start signal worker", KR(ret)); } + if (FAILEDx(SERVER_STARTUP_TASK_HANDLER.start())) { + LOG_ERROR("fail to start server startup task handler", KR(ret)); + } else { + FLOG_INFO("success to start server startup task handler"); + } + if (FAILEDx(OB_TS_MGR.start())) { LOG_ERROR("fail to start ts mgr", KR(ret)); } else { @@ -907,6 +920,8 @@ int ObServer::start() stop_ = false; has_stopped_ = false; } + // this 
handler is only used to process tasks during startup, so it can be destroyed here. + SERVER_STARTUP_TASK_HANDLER.destroy(); // refresh server configure // @@ -1263,6 +1278,10 @@ int ObServer::stop() ObServerCheckpointSlogHandler::get_instance().stop(); FLOG_INFO("server checkpoint slog handler stopped"); + FLOG_INFO("begin to stop server startup task handler"); + SERVER_STARTUP_TASK_HANDLER.stop(); + FLOG_INFO("server startup task handler stopped"); + // It will wait for all requests done. FLOG_INFO("begin to stop multi tenant"); multi_tenant_.stop(); @@ -1585,6 +1604,10 @@ int ObServer::wait() ObServerCheckpointSlogHandler::get_instance().wait(); FLOG_INFO("wait server checkpoint slog handler success"); + FLOG_INFO("begin to wait server startup task handler"); + SERVER_STARTUP_TASK_HANDLER.wait(); + FLOG_INFO("wait server startup task handler success"); + FLOG_INFO("begin to wait global election report timer"); palf::election::GLOBAL_REPORT_TIMER.wait(); FLOG_INFO("wait global election report timer success"); diff --git a/src/observer/ob_server_startup_task_handler.cpp b/src/observer/ob_server_startup_task_handler.cpp new file mode 100644 index 000000000..a7188c9aa --- /dev/null +++ b/src/observer/ob_server_startup_task_handler.cpp @@ -0,0 +1,119 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#define USING_LOG_PREFIX SERVER +#include "observer/ob_server_startup_task_handler.h" +#include "share/ob_thread_mgr.h" + +namespace oceanbase +{ +namespace observer +{ +const int64_t ObServerStartupTaskHandler::MAX_QUEUED_TASK_NUM = 128; +const int64_t ObServerStartupTaskHandler::MAX_THREAD_NUM = 64; + +ObServerStartupTaskHandler::ObServerStartupTaskHandler() + : is_inited_(false), + tg_id_(-1), + task_allocator_() +{} + +ObServerStartupTaskHandler::~ObServerStartupTaskHandler() +{ + destroy(); +} + +// Initializes the task allocator and creates the thread group. +// Returns OB_INIT_TWICE if the handler is already inited; on any failure the handler stays un-inited. +int ObServerStartupTaskHandler::init() +{ + int ret = OB_SUCCESS; + if (is_inited_) { + ret = OB_INIT_TWICE; + LOG_WARN("ObServerStartupTaskHandler has already been inited", K(ret)); + } else if (OB_FAIL(task_allocator_.init(lib::ObMallocAllocator::get_instance(), + OB_MALLOC_NORMAL_BLOCK_SIZE, ObMemAttr(OB_SERVER_TENANT_ID, "StartupTask", ObCtxIds::DEFAULT_CTX_ID)))) { + LOG_WARN("fail to init tenant tiny allocator", K(ret)); + } else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::SvrStartupHandler, tg_id_))) { + LOG_WARN("lib::TGDefIDs::SvrStartupHandler tg create", K(ret)); + // destroy() is a no-op while is_inited_ is false, so roll the allocator back here + task_allocator_.reset(); + } else { + is_inited_ = true; + } + return ret; +} + +int ObServerStartupTaskHandler::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObServerStartupTaskHandler not inited", K(ret)); + } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { + LOG_WARN("fail to start ObServerStartupTaskHandler", K(ret)); + } + return ret; +} + +void ObServerStartupTaskHandler::stop() +{ + if (IS_INIT) { + TG_STOP(tg_id_); + } +} + +void ObServerStartupTaskHandler::wait() +{ + if (IS_INIT) { + TG_WAIT(tg_id_); + } +} + +void ObServerStartupTaskHandler::destroy() +{ + if (IS_INIT) { + TG_STOP(tg_id_); + TG_WAIT(tg_id_); + TG_DESTROY(tg_id_); + tg_id_ = -1; + task_allocator_.reset(); + is_inited_ = false; + } +} + +int ObServerStartupTaskHandler::push_task(ObServerStartupTask *task) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + 
LOG_WARN("ObServerStartupTaskHandler not inited", K(ret)); + } else if (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) { + LOG_WARN("fail to push server startup task", K(ret), KPC(task)); + } + return ret; +} + +// Thread-group callback: executes one queued startup task, then destroys it and returns its memory to task_allocator_. +void ObServerStartupTaskHandler::handle(void *task) +{ + int ret = OB_SUCCESS; + if (NULL == task) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("task is null", K(ret)); + } else { + ObServerStartupTask *startup_task = static_cast<ObServerStartupTask *>(task); + if (OB_FAIL(startup_task->execute())) { + LOG_WARN("fail to execute startup task", K(ret), KPC(startup_task)); + } + startup_task->~ObServerStartupTask(); // explicit destructor call: the memory is released separately through task_allocator_ + task_allocator_.free(startup_task); + } +} + +} // observer +} // oceanbase diff --git a/src/observer/ob_server_startup_task_handler.h b/src/observer/ob_server_startup_task_handler.h new file mode 100644 index 000000000..e24910479 --- /dev/null +++ b/src/observer/ob_server_startup_task_handler.h @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#ifndef OCEABASE_SERVER_OB_SERVER_STARTUP_TASK_HANDLER_H_ +#define OCEABASE_SERVER_OB_SERVER_STARTUP_TASK_HANDLER_H_ + +#include "lib/ob_define.h" +#include "lib/thread/thread_mgr.h" +#include "lib/thread/thread_mgr_interface.h" +#include "lib/allocator/ob_fifo_allocator.h" + +namespace oceanbase +{ +namespace observer +{ +// Base class of every task that can be pushed to ObServerStartupTaskHandler. +class ObServerStartupTask +{ +public: + ObServerStartupTask() {} + virtual ~ObServerStartupTask() {} + virtual int execute() = 0; // called by ObServerStartupTaskHandler::handle() + DECLARE_PURE_VIRTUAL_TO_STRING; +}; + +// This handler is only used to process tasks during startup. It can speed up the startup process. +// If you have tasks that need to be processed in parallel, you can use this handler, +// but please note that this handler will be destroyed after observer startup. +class ObServerStartupTaskHandler : public lib::TGTaskHandler +{ +public: + static const int64_t MAX_QUEUED_TASK_NUM; + static const int64_t MAX_THREAD_NUM; + ObServerStartupTaskHandler(); + ~ObServerStartupTaskHandler(); + int init(); + int start(); + void stop(); + void wait(); + void destroy(); + void handle(void *task) override; // executes the task, then destroys it and frees it with task_allocator_ + ObIAllocator &get_task_allocator() { return task_allocator_; } // tasks must be allocated from here, because handle() frees them with this allocator + int push_task(ObServerStartupTask *task); + static int64_t get_thread_num() { return std::min(MAX_THREAD_NUM, common::get_cpu_count()); } + static OB_INLINE ObServerStartupTaskHandler &get_instance(); + +private: + bool is_inited_; + int tg_id_; + common::ObFIFOAllocator task_allocator_; +}; + +OB_INLINE ObServerStartupTaskHandler &ObServerStartupTaskHandler::get_instance() +{ + static ObServerStartupTaskHandler instance; + return instance; +} + +#define SERVER_STARTUP_TASK_HANDLER (::oceanbase::observer::ObServerStartupTaskHandler::get_instance()) + +} // observer +} // oceanbase + +#endif diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index 14718bf9e..0eeace0ee 100755 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -152,4 +152,7 @@ TG_DEF(InfoPoolResize, 
InfoPoolResize, TIMER) TG_DEF(MinorScan, MinorScan, TIMER) TG_DEF(MajorScan, MajorScan, TIMER) TG_DEF(TenantTransferService, TransferSrv, REENTRANT_THREAD_POOL, ThreadCountPair(4 ,1)) +TG_DEF(SvrStartupHandler, SvrStartupHandler, QUEUE_THREAD, + ThreadCountPair(observer::ObServerStartupTaskHandler::get_thread_num(), observer::ObServerStartupTaskHandler::get_thread_num()), + observer::ObServerStartupTaskHandler::MAX_QUEUED_TASK_NUM) #endif diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index 2fb045849..24d1f487b 100755 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -1869,7 +1869,7 @@ int ObLSTabletService::replay_create_tablet( if (OB_SUCC(ret)) { tablet_transfer_info = tablet->get_tablet_meta().transfer_info_; - FLOG_INFO("succeeded to create tablet for replay slog", K(ret), K(ls_id), K(tablet_id), KPC(tablet)); + FLOG_INFO("succeeded to replay create one tablet", K(ret), K(ls_id), K(tablet_id), KPC(tablet)); } else { int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(tablet_id_set_.erase(tablet_id))) { diff --git a/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.cpp b/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.cpp index 558a5c014..5d3a99250 100755 --- a/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.cpp +++ b/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.cpp @@ -155,11 +155,83 @@ void ObTenantCheckpointSlogHandler::ObWriteCheckpointTask::runTimerTask() } } +int ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::init( + const int64_t task_idx, ObTenantBase *tenant_base, ObTenantCheckpointSlogHandler *handler) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("task has been inited", K(ret), KPC(this)); + } else { + idx_ = task_idx; + tenant_base_ = tenant_base; + tablet_addr_arr_.reset(); + tnt_ckpt_slog_handler_ = handler; + handler->inc_inflight_replay_tablet_task_cnt(); + is_inited_ = 
true; + } + return ret; +} + +void ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::destroy() +{ + if (IS_INIT) { + tnt_ckpt_slog_handler_->dec_inflight_replay_tablet_task_cnt(); + idx_ = -1; + tenant_base_ = nullptr; + tnt_ckpt_slog_handler_ = nullptr; + tablet_addr_arr_.reset(); + is_inited_ = false; + } +} +// Replays every tablet collected in tablet_addr_arr_ under the owning tenant's context. +// Any failure is also published to the handler through set_replay_create_tablet_errcode(). +int ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::execute() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("task not init", K(ret), KPC(this)); + } else { + ObTenantSwitchGuard guard(tenant_base_); + if (OB_UNLIKELY(MTL(ObTenantCheckpointSlogHandler*) != tnt_ckpt_slog_handler_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ObTenantCheckpointSlogHandler", K(ret), KPC(this)); + } else if (OB_FAIL(tnt_ckpt_slog_handler_->replay_create_tablets_per_task(tablet_addr_arr_))) { + LOG_WARN("fail to execute replay_create_tablets_per_task", K(ret), KPC(this)); + } else { + FLOG_INFO("successfully execute replay create tablet task", KPC(this)); + } + } + if (OB_FAIL(ret) && OB_NOT_NULL(tnt_ckpt_slog_handler_)) { // the handler pointer is still null on the OB_NOT_INIT path + tnt_ckpt_slog_handler_->set_replay_create_tablet_errcode(ret); + } + return ret; +} + +int ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::add_tablet_addr( + const ObTabletMapKey &tablet_key, const ObMetaDiskAddr &tablet_addr, bool &is_enough) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("task not init", K(ret)); + } else if (OB_FAIL(tablet_addr_arr_.push_back(std::make_pair(tablet_key, tablet_addr)))) { + LOG_WARN("fail to push_back", K(ret), K(*this)); + } else if (tablet_addr_arr_.count() >= TABLET_NUM_PER_TASK) { + is_enough = true; + } else { + is_enough = false; + } + return ret; +} + ObTenantCheckpointSlogHandler::ObTenantCheckpointSlogHandler() : is_inited_(false), is_writing_checkpoint_(false), last_ckpt_time_(0), last_frozen_version_(0), + inflight_replay_tablet_task_cnt_(0), + finished_replay_tablet_cnt_(0), + replay_create_tablet_errcode_(OB_SUCCESS), 
lock_(common::ObLatchIds::SLOG_CKPT_LOCK), mutex_(), tablet_key_set_(), @@ -492,8 +564,8 @@ int ObTenantCheckpointSlogHandler::replay_tenant_slog(const common::ObLogCursor LOG_WARN("fail to register redo module", K(ret)); } else if (OB_FAIL(replayer.replay(start_point, replay_finish_point, MTL_ID()))) { LOG_WARN("fail to replay tenant slog", K(ret)); - } else if (OB_FAIL(replay_load_tablets())) { - LOG_WARN("fail to replay load tablets", K(ret)); + } else if (OB_FAIL(concurrent_replay_load_tablets())) { + LOG_WARN("fail to concurrent replay load tablets", K(ret)); } else if (OB_FAIL(replayer.replay_over())) { LOG_WARN("fail to replay over", K(ret)); } else if (OB_FAIL(MTL(ObStorageLogger *)->start_log(replay_finish_point))) { @@ -631,33 +703,130 @@ int ObTenantCheckpointSlogHandler::record_ls_transfer_info( } return ret; } -int ObTenantCheckpointSlogHandler::replay_load_tablets() +// Splits replay_tablet_disk_addr_map_ into ObReplayCreateTabletTask batches, pushes them to +// SERVER_STARTUP_TASK_HANDLER and waits until no task is in flight before reporting the result. +int ObTenantCheckpointSlogHandler::concurrent_replay_load_tablets() +{ + int ret = OB_SUCCESS; + const int64_t start_time = ObTimeUtility::current_time(); + const int64_t total_tablet_cnt = replay_tablet_disk_addr_map_.size(); + int64_t cost_time_us = 0; + ReplayTabletDiskAddrMap::iterator iter = replay_tablet_disk_addr_map_.begin(); + ObReplayCreateTabletTask *task = nullptr; + int64_t task_idx = 0; + while (OB_SUCC(ret) && iter != replay_tablet_disk_addr_map_.end()) { + if (nullptr == task) { + if (OB_ISNULL(task = reinterpret_cast<ObReplayCreateTabletTask *>( + SERVER_STARTUP_TASK_HANDLER.get_task_allocator().alloc(sizeof(ObReplayCreateTabletTask))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc task buf", K(ret)); + } else if (FALSE_IT(task = new(task) ObReplayCreateTabletTask())) { + } else if (OB_FAIL(task->init(task_idx++, share::ObTenantEnv::get_tenant(), this))) { + LOG_WARN("fail to init ObReplayCreateTabletTask", K(ret), KPC(task)); + } + } + if (OB_SUCC(ret)) { + bool is_enough = false; + if (OB_FAIL(task->add_tablet_addr(iter->first, iter->second, is_enough))) { + LOG_WARN("fail to add 
tablet", K(ret), K(iter->first), K(iter->second), KPC(task)); + } else if (is_enough) { // tablet count of this task is enough and will create a new task at next round + if (OB_FAIL(add_replay_create_tablet_task(task))) { + LOG_WARN("fail to add replay tablet task", K(ret), KPC(task), K(inflight_replay_tablet_task_cnt_)); + } else { + task = nullptr; + ++iter; + } + } else { + ++iter; + } + } + + if (OB_FAIL(ret) && OB_NOT_NULL(task)) { + task->~ObReplayCreateTabletTask(); + SERVER_STARTUP_TASK_HANDLER.get_task_allocator().free(task); + task = nullptr; + } + } + + if (OB_SUCC(ret)) { // handle the last task + if (OB_NOT_NULL(task) && OB_FAIL(add_replay_create_tablet_task(task))) { + LOG_WARN("fail to add last replay tablet task", K(ret), KPC(task), K(inflight_replay_tablet_task_cnt_)); + task->~ObReplayCreateTabletTask(); + SERVER_STARTUP_TASK_HANDLER.get_task_allocator().free(task); + task = nullptr; + } + } + // waiting all task finish even if failure has occurred + while (ATOMIC_LOAD(&inflight_replay_tablet_task_cnt_) != 0) { + LOG_INFO("waiting replay create tablet task finish", K(inflight_replay_tablet_task_cnt_)); + ob_usleep(10 * 1000); // 10ms + } + if (OB_SUCC(ret)) { + if (OB_FAIL(ATOMIC_LOAD(&replay_create_tablet_errcode_))) { + LOG_WARN("ObReplayCreateTabletTask has failed", K(ret)); + } else if (ATOMIC_LOAD(&finished_replay_tablet_cnt_) != total_tablet_cnt) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("finished replay tablet cnt mismatch", K(ret), K_(finished_replay_tablet_cnt), K(total_tablet_cnt)); + } + } + + cost_time_us = ObTimeUtility::current_time() - start_time; + FLOG_INFO("finish concurrently repaly load tablets", K(ret), K(total_tablet_cnt), K(cost_time_us)); + + return ret; +} + +// Pushes one task to the startup task handler, retrying every 10ms while the queue is full (OB_EAGAIN). +// Gives up as soon as another task has reported a failure or the push fails with any other error. +int ObTenantCheckpointSlogHandler::add_replay_create_tablet_task(ObReplayCreateTabletTask *task) +{ + int ret = OB_SUCCESS; + bool need_retry = false; + do { + need_retry = false; // reset every round: a stale 'true' from an earlier OB_EAGAIN would turn any later failure into an endless loop + if (OB_FAIL(ATOMIC_LOAD(&replay_create_tablet_errcode_))) { + LOG_WARN("ObReplayCreateTabletTask has failed", 
K(ret), K(inflight_replay_tablet_task_cnt_)); + } else if (OB_FAIL(SERVER_STARTUP_TASK_HANDLER.push_task(task))) { + if (OB_EAGAIN == ret) { + LOG_INFO("task queue is full, wait and retry", KPC(task), K(inflight_replay_tablet_task_cnt_)); + need_retry = true; + ob_usleep(10 * 1000); // 10ms + } else { + LOG_WARN("fail to push task", K(ret), KPC(task), K(inflight_replay_tablet_task_cnt_)); + } + } else { + FLOG_INFO("add task success", KP(task), K(inflight_replay_tablet_task_cnt_)); // once pushed, the handler may already have executed and freed the task, so log only its address + } + } while(OB_FAIL(ret) && need_retry); + + return ret; +} + +// Replays the tablets of one task batch; stops early once any task has published an error code. +int ObTenantCheckpointSlogHandler::replay_create_tablets_per_task( + const ObIArray<std::pair<ObTabletMapKey, ObMetaDiskAddr>> &tablet_addr_arr) { int ret = OB_SUCCESS; char *buf = nullptr; int64_t buf_len = 0; - ReplayTabletDiskAddrMap::iterator iter = replay_tablet_disk_addr_map_.begin(); ObTabletTransferInfo tablet_transfer_info; - while (OB_SUCC(ret) && iter != replay_tablet_disk_addr_map_.end()) { - ObArenaAllocator allocator("ReplayLoad"); - const ObTabletMapKey &key = iter->first; - ObMetaDiskAddr addr = iter->second; + + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_addr_arr.count(); i++) { + ObArenaAllocator allocator("ReplayTablet"); + const ObTabletMapKey &key = tablet_addr_arr.at(i).first; + const ObMetaDiskAddr &addr = tablet_addr_arr.at(i).second; ObLSTabletService *ls_tablet_svr = nullptr; ObLSHandle ls_handle; tablet_transfer_info.reset(); - if (OB_FAIL(read_from_disk(addr, allocator, buf, buf_len))) { + if (OB_FAIL(ATOMIC_LOAD(&replay_create_tablet_errcode_))) { + LOG_WARN("replay create has already failed", K(ret)); + } else if (OB_FAIL(read_from_disk(addr, allocator, buf, buf_len))) { LOG_WARN("fail to read from disk", K(ret), K(addr), KP(buf), K(buf_len)); } else if (OB_FAIL(get_tablet_svr(key.ls_id_, ls_tablet_svr, ls_handle))) { LOG_WARN("fail to get ls tablet service", K(ret)); } else if (OB_FAIL(ls_tablet_svr->replay_create_tablet(addr, buf, buf_len, key.tablet_id_, tablet_transfer_info))) { LOG_WARN("fail to create tablet for replay", 
K(ret), K(key), K(addr)); } else if (tablet_transfer_info.has_transfer_table() && OB_FAIL(record_ls_transfer_info(ls_handle, key.tablet_id_, tablet_transfer_info))) { - LOG_WARN("fail to create tablet for replay", K(ret), K(key), K(addr)); - } else { - LOG_INFO("Successfully load tablet", K(key), K(addr)); - ++iter; + LOG_WARN("fail to record_ls_transfer_info", K(ret), K(key), K(tablet_transfer_info)); } } + if (OB_SUCC(ret)) { + inc_finished_replay_tablet_cnt(tablet_addr_arr.count()); + } return ret; } diff --git a/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h b/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h index de4dc24d7..d1990f57d 100644 --- a/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h +++ b/src/storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h @@ -23,6 +23,7 @@ #include "storage/ls/ob_ls_meta.h" #include "storage/tx/ob_dup_table_base.h" #include "storage/high_availability/ob_tablet_transfer_info.h" +#include "observer/ob_server_startup_task_handler.h" namespace oceanbase { @@ -88,6 +89,37 @@ public: ObTenantCheckpointSlogHandler *handler_; }; + // Startup task that replays the creation of a batch of up to TABLET_NUM_PER_TASK tablets. + class ObReplayCreateTabletTask : public observer::ObServerStartupTask + { + public: + ObReplayCreateTabletTask() + : is_inited_(false), + idx_(-1), + tenant_base_(nullptr), + tnt_ckpt_slog_handler_(nullptr) {} + + virtual ~ObReplayCreateTabletTask() + { + destroy(); + } + int init(const int64_t task_idx, ObTenantBase *tenant_base, ObTenantCheckpointSlogHandler *handler); + int execute() override; + int add_tablet_addr(const ObTabletMapKey &tablet_key, const ObMetaDiskAddr &tablet_addr, bool &is_enough); + + VIRTUAL_TO_STRING_KV(K_(idx), KP(this), KP_(tenant_base), "tablet_count", tablet_addr_arr_.count()); + + private: + static const int64_t TABLET_NUM_PER_TASK = 200; + void destroy(); + + private: + bool is_inited_; + int64_t idx_; + ObTenantBase *tenant_base_; + ObTenantCheckpointSlogHandler *tnt_ckpt_slog_handler_; + common::ObSEArray<std::pair<ObTabletMapKey, ObMetaDiskAddr>, TABLET_NUM_PER_TASK> tablet_addr_arr_; // (tablet key, disk address) pairs replayed by execute() 
+ }; + + ObTenantCheckpointSlogHandler(); ~ObTenantCheckpointSlogHandler() = default; ObTenantCheckpointSlogHandler(const ObTenantCheckpointSlogHandler &) = delete; @@ -117,6 +149,15 @@ public: char *&buf, int64_t &buf_len); + // Counters and error code shared between concurrent_replay_load_tablets() and the replay tasks; accessed atomically. + void inc_inflight_replay_tablet_task_cnt() { ATOMIC_INC(&inflight_replay_tablet_task_cnt_); } + void dec_inflight_replay_tablet_task_cnt() { ATOMIC_DEC(&inflight_replay_tablet_task_cnt_); } + void inc_finished_replay_tablet_cnt(const int64_t cnt) { (void)ATOMIC_FAA(&finished_replay_tablet_cnt_, cnt); } + void set_replay_create_tablet_errcode(const int errcode) + { + ATOMIC_STORE(&replay_create_tablet_errcode_, errcode); + }; + int replay_create_tablets_per_task(const common::ObIArray<std::pair<ObTabletMapKey, ObMetaDiskAddr>> &tablet_addr_arr); + private: int get_cur_cursor(); void clean_copy_status(); @@ -134,7 +175,7 @@ private: const bool is_replay_old, ObTenantStorageCheckpointWriter &ckpt_writer); int replay_dup_table_ls_meta(const transaction::ObDupTableLSCheckpoint::ObLSDupTableMeta &dup_ls_meta); int replay_tenant_slog(const common::ObLogCursor &start_point); - int replay_load_tablets(); + int concurrent_replay_load_tablets(); int inner_replay_update_ls_slog(const ObRedoModuleReplayParam &param); int inner_replay_create_ls_slog(const ObRedoModuleReplayParam &param); int inner_replay_create_ls_commit_slog(const ObRedoModuleReplayParam &param); @@ -172,6 +213,7 @@ private: const share::ObLSID &src_ls_id, const share::SCN &transfer_start_scn, bool &is_need); + int add_replay_create_tablet_task(ObReplayCreateTabletTask *task); private: const static int64_t BUCKET_NUM = 109; @@ -181,6 +223,9 @@ private: bool is_writing_checkpoint_; int64_t last_ckpt_time_; int64_t last_frozen_version_; + int64_t inflight_replay_tablet_task_cnt_; + int64_t finished_replay_tablet_cnt_; + int replay_create_tablet_errcode_; common::TCRWLock lock_; // protect block_handle lib::ObMutex mutex_; common::hash::ObHashSet tablet_key_set_;