replay reate tablets concurrently to speed up slog replay

This commit is contained in:
windye 2023-07-13 04:42:22 +00:00 committed by ob-robot
parent 713cfc4ea4
commit 2b3285745a
9 changed files with 452 additions and 16 deletions

View File

@ -28,6 +28,7 @@
#include "storage/test_tablet_helper.h"
#include "storage/test_dml_common.h"
#include "observer/ob_safe_destroy_thread.h"
#include "observer/ob_server_startup_task_handler.h"
#include "lib/oblog/ob_log.h"
#include "share/ob_force_print_log.h"
@ -68,6 +69,8 @@ void TestWriteTabletSlog::SetUpTestCase()
SAFE_DESTROY_INSTANCE.init();
SAFE_DESTROY_INSTANCE.start();
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
ASSERT_EQ(OB_SUCCESS, SERVER_STARTUP_TASK_HANDLER.init());
ASSERT_EQ(OB_SUCCESS, SERVER_STARTUP_TASK_HANDLER.start());
// create ls
ObLSHandle ls_handle;
@ -84,6 +87,7 @@ void TestWriteTabletSlog::TearDownTestCase()
SAFE_DESTROY_INSTANCE.stop();
SAFE_DESTROY_INSTANCE.wait();
SAFE_DESTROY_INSTANCE.destroy();
SERVER_STARTUP_TASK_HANDLER.destroy();
MockTenantModuleEnv::get_instance().destroy();
}
@ -159,7 +163,7 @@ TEST_F(TestWriteTabletSlog, basic)
ASSERT_EQ(OB_SUCCESS, log_replayer.register_redo_module(
ObRedoLogMainType::OB_REDO_LOG_TENANT_STORAGE, slog_handler));
ASSERT_EQ(OB_SUCCESS, log_replayer.replay(replay_start_cursor, replay_finish_cursor, OB_SERVER_TENANT_ID));
ASSERT_EQ(OB_SUCCESS, slog_handler->replay_load_tablets());
ASSERT_EQ(OB_SUCCESS, slog_handler->concurrent_replay_load_tablets());
// check the result of replay
ObTabletHandle replay_tablet_handle;

View File

@ -37,6 +37,7 @@ ob_set_subtarget(ob_server common
ob_srv_xlator_storage.cpp
ob_tenant_duty_task.cpp
ob_uniq_task_queue.cpp
ob_server_startup_task_handler.cpp
)
ob_set_subtarget(ob_server common_mixed

View File

@ -105,6 +105,7 @@
#include "observer/ob_server_utils.h"
#include "observer/table_load/ob_table_load_partition_calc.h"
#include "observer/virtual_table/ob_mds_event_buffer.h"
#include "observer/ob_server_startup_task_handler.h"
#include "share/detect/ob_detect_manager.h"
using namespace oceanbase::lib;
@ -369,6 +370,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
LOG_ERROR("init ObStorageLoggerManager failed", KR(ret));
} else if (OB_FAIL(ObVirtualTenantManager::get_instance().init())) {
LOG_ERROR("init tenant manager failed", KR(ret));
} else if (OB_FAIL(SERVER_STARTUP_TASK_HANDLER.init())) {
LOG_ERROR("init server startup task handler failed", KR(ret));
} else if (OB_FAIL(ObServerCheckpointSlogHandler::get_instance().init())) {
LOG_ERROR("init server checkpoint slog handler failed", KR(ret));
} else if (FALSE_IT(common::occam::ObThreadHungDetector::get_instance())) {
@ -681,6 +684,10 @@ void ObServer::destroy()
ObServerCheckpointSlogHandler::get_instance().destroy();
FLOG_INFO("server checkpoint slog handler destroyed");
FLOG_INFO("begin to destroy server startup task handler");
SERVER_STARTUP_TASK_HANDLER.destroy();
FLOG_INFO("server startup task handler destroyed");
FLOG_INFO("begin to destroy backup index cache");
OB_BACKUP_INDEX_CACHE.destroy();
FLOG_INFO("backup index cache destroyed");
@ -750,6 +757,12 @@ int ObServer::start()
LOG_ERROR("fail to start signal worker", KR(ret));
}
if (FAILEDx(SERVER_STARTUP_TASK_HANDLER.start())) {
LOG_ERROR("fail to start server startup task handler", KR(ret));
} else {
FLOG_INFO("success to start server startup task handler");
}
if (FAILEDx(OB_TS_MGR.start())) {
LOG_ERROR("fail to start ts mgr", KR(ret));
} else {
@ -907,6 +920,8 @@ int ObServer::start()
stop_ = false;
has_stopped_ = false;
}
// this handler is only used to process tasks during startup. so it can be destroied here.
SERVER_STARTUP_TASK_HANDLER.destroy();
// refresh server configure
//
@ -1263,6 +1278,10 @@ int ObServer::stop()
ObServerCheckpointSlogHandler::get_instance().stop();
FLOG_INFO("server checkpoint slog handler stopped");
FLOG_INFO("begin to stop server startup task handler");
SERVER_STARTUP_TASK_HANDLER.stop();
FLOG_INFO("server startup task handler stopped");
// It will wait for all requests done.
FLOG_INFO("begin to stop multi tenant");
multi_tenant_.stop();
@ -1585,6 +1604,10 @@ int ObServer::wait()
ObServerCheckpointSlogHandler::get_instance().wait();
FLOG_INFO("wait server checkpoint slog handler success");
FLOG_INFO("begin to wait server startup task handler");
SERVER_STARTUP_TASK_HANDLER.wait();
FLOG_INFO("wait server startup task handler success");
FLOG_INFO("begin to wait global election report timer");
palf::election::GLOBAL_REPORT_TIMER.wait();
FLOG_INFO("wait global election report timer success");

View File

@ -0,0 +1,119 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "observer/ob_server_startup_task_handler.h"
#include "share/ob_thread_mgr.h"
namespace oceanbase
{
namespace observer
{
const int64_t ObServerStartupTaskHandler::MAX_QUEUED_TASK_NUM = 128;
const int64_t ObServerStartupTaskHandler::MAX_THREAD_NUM = 64;
ObServerStartupTaskHandler::ObServerStartupTaskHandler()
: is_inited_(false),
tg_id_(-1),
task_allocator_()
{}
ObServerStartupTaskHandler::~ObServerStartupTaskHandler()
{
destroy();
}
int ObServerStartupTaskHandler::init()
{
int ret = OB_SUCCESS;
if (is_inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("ObServerStartupTaskHandler has already been inited", K(ret));
} else if (OB_FAIL(task_allocator_.init(lib::ObMallocAllocator::get_instance(),
OB_MALLOC_NORMAL_BLOCK_SIZE, ObMemAttr(OB_SERVER_TENANT_ID, "StartupTask", ObCtxIds::DEFAULT_CTX_ID)))) {
LOG_WARN("fail to init tenant tiny allocator", K(ret));
} else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::SvrStartupHandler, tg_id_))) {
LOG_WARN("lib::TGDefIDs::SvrStartupHandler tg create", K(ret));
} else {
is_inited_ = true;
}
return ret;
}
int ObServerStartupTaskHandler::start()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObServerStartupTaskHandler not inited", K(ret));
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
LOG_WARN("fail to start ObServerStartupTaskHandler", K(ret));
}
return ret;
}
void ObServerStartupTaskHandler::stop()
{
if (IS_INIT) {
TG_STOP(tg_id_);
}
}
void ObServerStartupTaskHandler::wait()
{
if (IS_INIT) {
TG_WAIT(tg_id_);
}
}
void ObServerStartupTaskHandler::destroy()
{
if (IS_INIT) {
TG_STOP(tg_id_);
TG_WAIT(tg_id_);
TG_DESTROY(tg_id_);
tg_id_ = -1;
task_allocator_.reset();
is_inited_ = false;
}
}
int ObServerStartupTaskHandler::push_task(ObServerStartupTask *task)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObServerStartupTaskHandler not inited", K(ret));
} else if (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) {
LOG_WARN("fail to push server startup task", K(ret), KPC(task));
}
return ret;
}
void ObServerStartupTaskHandler::handle(void *task)
{
int ret = OB_SUCCESS;
if (NULL == task) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task is null", K(ret));
} else {
ObServerStartupTask *startup_task = static_cast<ObServerStartupTask *>(task);
if (OB_FAIL(startup_task->execute())) {
LOG_WARN("fail to execute startup task", K(ret), KPC(startup_task));
}
startup_task->~ObServerStartupTask();
task_allocator_.free(startup_task);
}
}
} // observer
} // oceanbase

View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEABASE_SERVER_OB_SERVER_STARTUP_TASK_HANDLER_H_
#define OCEABASE_SERVER_OB_SERVER_STARTUP_TASK_HANDLER_H_
#include "lib/ob_define.h"
#include "lib/thread/thread_mgr.h"
#include "lib/thread/thread_mgr_interface.h"
#include "lib/allocator/ob_fifo_allocator.h"
namespace oceanbase
{
namespace observer
{
class ObServerStartupTask
{
public:
ObServerStartupTask() {}
virtual ~ObServerStartupTask() {}
virtual int execute() = 0;
DECLARE_PURE_VIRTUAL_TO_STRING;
};
// This handler is only used to process tasks during startup. it can speed up the startup process.
// If you have tasks that need to be processed in parallel, you can use this handler,
// but please note that this handler will be destroyed after obsever startup.
class ObServerStartupTaskHandler : public lib::TGTaskHandler
{
public:
static const int64_t MAX_QUEUED_TASK_NUM;
static const int64_t MAX_THREAD_NUM;
ObServerStartupTaskHandler();
~ObServerStartupTaskHandler();
int init();
int start();
void stop();
void wait();
void destroy();
void handle(void *task) override;
ObIAllocator &get_task_allocator() { return task_allocator_; }
int push_task(ObServerStartupTask *task);
static int64_t get_thread_num() { return std::min(MAX_THREAD_NUM, common::get_cpu_count()); }
static OB_INLINE ObServerStartupTaskHandler &get_instance();
private:
bool is_inited_;
int tg_id_;
common::ObFIFOAllocator task_allocator_;
};
OB_INLINE ObServerStartupTaskHandler &ObServerStartupTaskHandler::get_instance()
{
static ObServerStartupTaskHandler instance;
return instance;
}
#define SERVER_STARTUP_TASK_HANDLER (::oceanbase::observer::ObServerStartupTaskHandler::get_instance())
} // observer
} // oceanbase
#endif

View File

@ -152,4 +152,7 @@ TG_DEF(InfoPoolResize, InfoPoolResize, TIMER)
TG_DEF(MinorScan, MinorScan, TIMER)
TG_DEF(MajorScan, MajorScan, TIMER)
TG_DEF(TenantTransferService, TransferSrv, REENTRANT_THREAD_POOL, ThreadCountPair(4 ,1))
TG_DEF(SvrStartupHandler, SvrStartupHandler, QUEUE_THREAD,
ThreadCountPair(observer::ObServerStartupTaskHandler::get_thread_num(), observer::ObServerStartupTaskHandler::get_thread_num()),
observer::ObServerStartupTaskHandler::MAX_QUEUED_TASK_NUM)
#endif

View File

@ -1869,7 +1869,7 @@ int ObLSTabletService::replay_create_tablet(
if (OB_SUCC(ret)) {
tablet_transfer_info = tablet->get_tablet_meta().transfer_info_;
FLOG_INFO("succeeded to create tablet for replay slog", K(ret), K(ls_id), K(tablet_id), KPC(tablet));
FLOG_INFO("succeeded to replay create one tablet", K(ret), K(ls_id), K(tablet_id), KPC(tablet));
} else {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(tablet_id_set_.erase(tablet_id))) {

View File

@ -155,11 +155,83 @@ void ObTenantCheckpointSlogHandler::ObWriteCheckpointTask::runTimerTask()
}
}
int ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::init(
const int64_t task_idx, ObTenantBase *tenant_base, ObTenantCheckpointSlogHandler *handler)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("task has been inited", K(ret), KPC(this));
} else {
idx_ = task_idx;
tenant_base_ = tenant_base;
tablet_addr_arr_.reset();
tnt_ckpt_slog_handler_ = handler;
handler->inc_inflight_replay_tablet_task_cnt();
is_inited_ = true;
}
return ret;
}
void ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::destroy()
{
if (IS_INIT) {
tnt_ckpt_slog_handler_->dec_inflight_replay_tablet_task_cnt();
idx_ = -1;
tenant_base_ = nullptr;
tnt_ckpt_slog_handler_ = nullptr;
tablet_addr_arr_.reset();
is_inited_ = false;
}
}
int ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::execute()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("task not init", K(ret), KPC(this));
} else {
ObTenantSwitchGuard guard(tenant_base_);
if (OB_UNLIKELY(MTL(ObTenantCheckpointSlogHandler*) != tnt_ckpt_slog_handler_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ObTenantCheckpointSlogHandler", K(ret), KPC(this));
} else if (OB_FAIL(tnt_ckpt_slog_handler_->replay_create_tablets_per_task(tablet_addr_arr_))) {
LOG_WARN("fail to execute replay_create_tablets_per_task", K(ret), KPC(this));
} else {
FLOG_INFO("successfully execute replay create tablet task", KPC(this));
}
}
if (OB_FAIL(ret)) {
tnt_ckpt_slog_handler_->set_replay_create_tablet_errcode(ret);
}
return ret;
}
int ObTenantCheckpointSlogHandler::ObReplayCreateTabletTask::add_tablet_addr(
const ObTabletMapKey &tablet_key, const ObMetaDiskAddr &tablet_addr, bool &is_enough)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("task not init", K(ret));
} else if (OB_FAIL(tablet_addr_arr_.push_back(std::make_pair(tablet_key, tablet_addr)))) {
LOG_WARN("fail to push_back", K(ret), K(*this));
} else if (tablet_addr_arr_.count() >= TABLET_NUM_PER_TASK) {
is_enough = true;
} else {
is_enough = false;
}
return ret;
}
ObTenantCheckpointSlogHandler::ObTenantCheckpointSlogHandler()
: is_inited_(false),
is_writing_checkpoint_(false),
last_ckpt_time_(0),
last_frozen_version_(0),
inflight_replay_tablet_task_cnt_(0),
finished_replay_tablet_cnt_(0),
replay_create_tablet_errcode_(OB_SUCCESS),
lock_(common::ObLatchIds::SLOG_CKPT_LOCK),
mutex_(),
tablet_key_set_(),
@ -492,8 +564,8 @@ int ObTenantCheckpointSlogHandler::replay_tenant_slog(const common::ObLogCursor
LOG_WARN("fail to register redo module", K(ret));
} else if (OB_FAIL(replayer.replay(start_point, replay_finish_point, MTL_ID()))) {
LOG_WARN("fail to replay tenant slog", K(ret));
} else if (OB_FAIL(replay_load_tablets())) {
LOG_WARN("fail to replay load tablets", K(ret));
} else if (OB_FAIL(concurrent_replay_load_tablets())) {
LOG_WARN("fail to concurrent replay load tablets", K(ret));
} else if (OB_FAIL(replayer.replay_over())) {
LOG_WARN("fail to replay over", K(ret));
} else if (OB_FAIL(MTL(ObStorageLogger *)->start_log(replay_finish_point))) {
@ -631,33 +703,130 @@ int ObTenantCheckpointSlogHandler::record_ls_transfer_info(
}
return ret;
}
int ObTenantCheckpointSlogHandler::replay_load_tablets()
int ObTenantCheckpointSlogHandler::concurrent_replay_load_tablets()
{
int ret = OB_SUCCESS;
const int64_t start_time = ObTimeUtility::current_time();
const int64_t total_tablet_cnt = replay_tablet_disk_addr_map_.size();
int64_t cost_time_us = 0;
ReplayTabletDiskAddrMap::iterator iter = replay_tablet_disk_addr_map_.begin();
ObReplayCreateTabletTask *task = nullptr;
int64_t task_idx = 0;
while (OB_SUCC(ret) && iter != replay_tablet_disk_addr_map_.end()) {
if (nullptr == task) {
if (OB_ISNULL(task = reinterpret_cast<ObReplayCreateTabletTask*>(
SERVER_STARTUP_TASK_HANDLER.get_task_allocator().alloc(sizeof(ObReplayCreateTabletTask))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc task buf", K(ret));
} else if (FALSE_IT(task = new(task) ObReplayCreateTabletTask())) {
} else if (OB_FAIL(task->init(task_idx++, share::ObTenantEnv::get_tenant(), this))) {
LOG_WARN("fail to init ObReplayCreateTabletTask", K(ret), KPC(task));
}
}
if (OB_SUCC(ret)) {
bool is_enough = false;
if (OB_FAIL(task->add_tablet_addr(iter->first, iter->second, is_enough))) {
LOG_WARN("fail to add tablet", K(ret), K(iter->first), K(iter->second), KPC(task));
} else if (is_enough) { // tablet count of this task is enough and will create a new task at next round
if (OB_FAIL(add_replay_create_tablet_task(task))) {
LOG_WARN("fail to add replay tablet task", K(ret), KPC(task), K(inflight_replay_tablet_task_cnt_));
} else {
task = nullptr;
++iter;
}
} else {
++iter;
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(task)) {
task->~ObReplayCreateTabletTask();
SERVER_STARTUP_TASK_HANDLER.get_task_allocator().free(task);
task = nullptr;
}
}
if (OB_SUCC(ret)) { // handle the last task
if (OB_NOT_NULL(task) && OB_FAIL(add_replay_create_tablet_task(task))) {
LOG_WARN("fail to add last replay tablet task", K(ret), KPC(task), K(inflight_replay_tablet_task_cnt_));
task->~ObReplayCreateTabletTask();
SERVER_STARTUP_TASK_HANDLER.get_task_allocator().free(task);
task = nullptr;
}
}
// waiting all task finish even if failure has occurred
while (ATOMIC_LOAD(&inflight_replay_tablet_task_cnt_) != 0) {
LOG_INFO("waiting replay create tablet task finish", K(inflight_replay_tablet_task_cnt_));
ob_usleep(10 * 1000); // 10ms
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ATOMIC_LOAD(&replay_create_tablet_errcode_))) {
LOG_WARN("ObReplayCreateTabletTask has failed", K(ret));
} else if (ATOMIC_LOAD(&finished_replay_tablet_cnt_) != total_tablet_cnt) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("finished replay tablet cnt mismatch", K(ret), K_(finished_replay_tablet_cnt), K(total_tablet_cnt));
}
}
cost_time_us = ObTimeUtility::current_time() - start_time;
FLOG_INFO("finish concurrently repaly load tablets", K(ret), K(total_tablet_cnt), K(cost_time_us));
return ret;
}
int ObTenantCheckpointSlogHandler::add_replay_create_tablet_task(ObReplayCreateTabletTask *task)
{
int ret = OB_SUCCESS;
bool need_retry = false;
do {
if (OB_FAIL(ATOMIC_LOAD(&replay_create_tablet_errcode_))) {
LOG_WARN("ObReplayCreateTabletTask has failed", K(ret), K(inflight_replay_tablet_task_cnt_));
} else if (OB_FAIL(SERVER_STARTUP_TASK_HANDLER.push_task(task))) {
if (OB_EAGAIN == ret) {
LOG_INFO("task queue is full, wait and retry", KPC(task), K(inflight_replay_tablet_task_cnt_));
need_retry = true;
ob_usleep(10 * 1000); // 10ms
} else {
LOG_WARN("fail to push task", K(ret), KPC(task), K(inflight_replay_tablet_task_cnt_));
}
} else {
FLOG_INFO("add task success", KPC(task), K(inflight_replay_tablet_task_cnt_));
}
} while(OB_FAIL(ret) && need_retry);
return ret;
}
int ObTenantCheckpointSlogHandler::replay_create_tablets_per_task(
const ObIArray<std::pair<ObTabletMapKey, ObMetaDiskAddr>> &tablet_addr_arr)
{
int ret = OB_SUCCESS;
char *buf = nullptr;
int64_t buf_len = 0;
ReplayTabletDiskAddrMap::iterator iter = replay_tablet_disk_addr_map_.begin();
ObTabletTransferInfo tablet_transfer_info;
while (OB_SUCC(ret) && iter != replay_tablet_disk_addr_map_.end()) {
ObArenaAllocator allocator("ReplayLoad");
const ObTabletMapKey &key = iter->first;
ObMetaDiskAddr addr = iter->second;
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_addr_arr.count(); i++) {
ObArenaAllocator allocator("ReplayTablet");
const ObTabletMapKey &key = tablet_addr_arr.at(i).first;
const ObMetaDiskAddr &addr = tablet_addr_arr.at(i).second;
ObLSTabletService *ls_tablet_svr = nullptr;
ObLSHandle ls_handle;
tablet_transfer_info.reset();
if (OB_FAIL(read_from_disk(addr, allocator, buf, buf_len))) {
if (OB_FAIL(ATOMIC_LOAD(&replay_create_tablet_errcode_))) {
LOG_WARN("replay create has already failed", K(ret));
} else if (OB_FAIL(read_from_disk(addr, allocator, buf, buf_len))) {
LOG_WARN("fail to read from disk", K(ret), K(addr), KP(buf), K(buf_len));
} else if (OB_FAIL(get_tablet_svr(key.ls_id_, ls_tablet_svr, ls_handle))) {
LOG_WARN("fail to get ls tablet service", K(ret));
} else if (OB_FAIL(ls_tablet_svr->replay_create_tablet(addr, buf, buf_len, key.tablet_id_, tablet_transfer_info))) {
LOG_WARN("fail to create tablet for replay", K(ret), K(key), K(addr));
} else if (tablet_transfer_info.has_transfer_table() && OB_FAIL(record_ls_transfer_info(ls_handle, key.tablet_id_, tablet_transfer_info))) {
LOG_WARN("fail to create tablet for replay", K(ret), K(key), K(addr));
} else {
LOG_INFO("Successfully load tablet", K(key), K(addr));
++iter;
LOG_WARN("fail to record_ls_transfer_info", K(ret), K(key), K(tablet_transfer_info));
}
}
if (OB_SUCC(ret)) {
inc_finished_replay_tablet_cnt(tablet_addr_arr.count());
}
return ret;
}

View File

@ -23,6 +23,7 @@
#include "storage/ls/ob_ls_meta.h"
#include "storage/tx/ob_dup_table_base.h"
#include "storage/high_availability/ob_tablet_transfer_info.h"
#include "observer/ob_server_startup_task_handler.h"
namespace oceanbase
{
@ -88,6 +89,37 @@ public:
ObTenantCheckpointSlogHandler *handler_;
};
class ObReplayCreateTabletTask : public observer::ObServerStartupTask
{
public:
ObReplayCreateTabletTask()
: is_inited_(false),
idx_(-1),
tenant_base_(nullptr),
tnt_ckpt_slog_handler_(nullptr) {}
virtual ~ObReplayCreateTabletTask()
{
destroy();
}
int init(const int64_t task_idx, ObTenantBase *tenant_base, ObTenantCheckpointSlogHandler *handler);
int execute() override;
int add_tablet_addr(const ObTabletMapKey &tablet_key, const ObMetaDiskAddr &tablet_addr, bool &is_enough);
VIRTUAL_TO_STRING_KV(K_(idx), KP(this), KP_(tenant_base), "tablet_count", tablet_addr_arr_.count());
private:
static const int64_t TABLET_NUM_PER_TASK = 200;
void destroy();
private:
bool is_inited_;
int64_t idx_;
ObTenantBase *tenant_base_;
ObTenantCheckpointSlogHandler *tnt_ckpt_slog_handler_;
common::ObSEArray<std::pair<ObTabletMapKey, ObMetaDiskAddr>, TABLET_NUM_PER_TASK> tablet_addr_arr_;
};
ObTenantCheckpointSlogHandler();
~ObTenantCheckpointSlogHandler() = default;
ObTenantCheckpointSlogHandler(const ObTenantCheckpointSlogHandler &) = delete;
@ -117,6 +149,15 @@ public:
char *&buf,
int64_t &buf_len);
void inc_inflight_replay_tablet_task_cnt() { ATOMIC_INC(&inflight_replay_tablet_task_cnt_); }
void dec_inflight_replay_tablet_task_cnt() { ATOMIC_DEC(&inflight_replay_tablet_task_cnt_); }
void inc_finished_replay_tablet_cnt(const int64_t cnt) { (void)ATOMIC_FAA(&finished_replay_tablet_cnt_, cnt); }
void set_replay_create_tablet_errcode(const int errcode)
{
ATOMIC_STORE(&replay_create_tablet_errcode_, errcode);
};
int replay_create_tablets_per_task(const common::ObIArray<std::pair<ObTabletMapKey, ObMetaDiskAddr>> &tablet_addr_arr);
private:
int get_cur_cursor();
void clean_copy_status();
@ -134,7 +175,7 @@ private:
const bool is_replay_old, ObTenantStorageCheckpointWriter &ckpt_writer);
int replay_dup_table_ls_meta(const transaction::ObDupTableLSCheckpoint::ObLSDupTableMeta &dup_ls_meta);
int replay_tenant_slog(const common::ObLogCursor &start_point);
int replay_load_tablets();
int concurrent_replay_load_tablets();
int inner_replay_update_ls_slog(const ObRedoModuleReplayParam &param);
int inner_replay_create_ls_slog(const ObRedoModuleReplayParam &param);
int inner_replay_create_ls_commit_slog(const ObRedoModuleReplayParam &param);
@ -172,6 +213,7 @@ private:
const share::ObLSID &src_ls_id,
const share::SCN &transfer_start_scn,
bool &is_need);
int add_replay_create_tablet_task(ObReplayCreateTabletTask *task);
private:
const static int64_t BUCKET_NUM = 109;
@ -181,6 +223,9 @@ private:
bool is_writing_checkpoint_;
int64_t last_ckpt_time_;
int64_t last_frozen_version_;
int64_t inflight_replay_tablet_task_cnt_;
int64_t finished_replay_tablet_cnt_;
int replay_create_tablet_errcode_;
common::TCRWLock lock_; // protect block_handle
lib::ObMutex mutex_;
common::hash::ObHashSet<ObTabletMapKey> tablet_key_set_;