fixed restart failed because of create ls failed in ObLogService

This commit is contained in:
HaHaJeff
2024-02-10 08:55:39 +00:00
committed by ob-robot
parent d6af81e763
commit a41bedc587
5 changed files with 113 additions and 60 deletions

View File

@ -1825,7 +1825,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak)
// case2: palf epoch has been changed during after_consume
IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
sleep(1);
sleep(3);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
@ -1853,6 +1853,43 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak)
}
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_log_service_interface)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_log_service_interface");
int64_t id = ATOMIC_AAF(&palf_id_, 1);
ObSimpleLogServer *log_server = dynamic_cast<ObSimpleLogServer*>(get_cluster()[0]);
ASSERT_NE(nullptr, log_server);
ObLogService *log_service = &log_server->log_service_;
ObTenantRole tenant_role; tenant_role.value_ = ObTenantRole::Role::PRIMARY_TENANT;
PalfBaseInfo palf_base_info; palf_base_info.generate_by_default();
ObLogHandler log_handler; ObLogRestoreHandler restore_handler;
ObLogApplyService *apply_service = &log_service->apply_service_;
ObReplicaType replica_type;
ObLSID ls_id(id);
ObApplyStatus *apply_status = nullptr;
ASSERT_NE(nullptr, apply_status = static_cast<ObApplyStatus*>(mtl_malloc(sizeof(ObApplyStatus), "mittest")));
new (apply_status) ObApplyStatus();
apply_status->inc_ref();
EXPECT_EQ(OB_SUCCESS, log_service->start());
EXPECT_EQ(OB_SUCCESS, apply_service->apply_status_map_.insert(ls_id, apply_status));
apply_service->is_running_ = true;
EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler));
bool is_exist = false;
EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist));
EXPECT_EQ(is_exist, false);
EXPECT_EQ(OB_ENTRY_NOT_EXIST, apply_service->apply_status_map_.erase(ls_id));
EXPECT_EQ(OB_SUCCESS, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler));
EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler));
EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist));
EXPECT_EQ(is_exist, true);
const char *log_dir = log_service->palf_env_->palf_env_impl_.log_dir_;
bool result = false;
EXPECT_EQ(OB_SUCCESS, FileDirectoryUtils::is_empty_directory(log_dir, result));
EXPECT_EQ(false, result);
EXPECT_EQ(OB_SUCCESS, log_service->remove_ls(ls_id, log_handler, restore_handler));
EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist));
}
} // namespace unittest
} // namespace oceanbase

View File

@ -62,7 +62,6 @@ int ObLogHandler::init(const int64_t id,
ObLogApplyService *apply_service,
ObLogReplayService *replay_service,
ObRoleChangeService *rc_service,
PalfHandle &palf_handle,
PalfEnv *palf_env,
PalfLocationCacheCb *lc_cb,
obrpc::ObLogServiceRpcProxy *rpc_proxy)
@ -73,18 +72,19 @@ int ObLogHandler::init(const int64_t id,
share::ObLSID ls_id(id);
if (IS_INIT) {
ret = OB_INIT_TWICE;
} else if (false == palf_handle.is_valid() ||
OB_ISNULL(palf_env) ||
} else if (OB_ISNULL(palf_env) ||
OB_ISNULL(apply_service) ||
OB_ISNULL(lc_cb) ||
OB_ISNULL(rpc_proxy)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", K(palf_handle), KP(palf_env), KP(lc_cb), KP(rpc_proxy));
CLOG_LOG(WARN, "invalid arguments", K(id), KP(palf_env), KP(lc_cb), KP(rpc_proxy));
} else if (OB_FAIL(apply_service->get_apply_status(ls_id, guard))) {
CLOG_LOG(WARN, "guard get apply status failed", K(ret), K(id));
} else if (NULL == (apply_status_ = guard.get_apply_status())) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "apply status is not exist", K(ret), K(id));
} else if (OB_FAIL(palf_env->open(id, palf_handle_))) {
CLOG_LOG(WARN, "open palf failed", K(ret), K(id));
} else {
get_max_decided_scn_debug_time_ = OB_INVALID_TIMESTAMP;
apply_service_ = apply_service;
@ -95,7 +95,6 @@ int ObLogHandler::init(const int64_t id,
append_cost_stat_.set_extra_info(EXTRA_INFOS);
id_ = id;
self_ = self;
palf_handle_ = palf_handle;
palf_env_ = palf_env;
role_ = FOLLOWER;
lc_cb_ = lc_cb;
@ -103,7 +102,10 @@ int ObLogHandler::init(const int64_t id,
is_in_stop_state_ = false;
is_offline_ = true; // offline at default.
is_inited_ = true;
FLOG_INFO("ObLogHandler init success", K(id), K(palf_handle));
FLOG_INFO("ObLogHandler init success", K(id));
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
destroy();
}
return ret;
}
@ -180,26 +182,25 @@ int ObLogHandler::safe_to_destroy(bool &is_safe_destroy)
void ObLogHandler::destroy()
{
WLockGuard guard(lock_);
int ret = OB_SUCCESS;
if (IS_INIT) {
is_inited_ = false;
is_offline_ = false;
is_in_stop_state_ = true;
common::TCWLockGuard deps_guard(deps_lock_);
is_inited_ = false;
is_offline_ = false;
is_in_stop_state_ = true;
common::TCWLockGuard deps_guard(deps_lock_);
if (NULL != apply_service_ && NULL != apply_status_) {
apply_service_->revert_apply_status(apply_status_);
apply_status_ = NULL;
apply_service_ = NULL;
replay_service_ = NULL;
if (true == palf_handle_.is_valid()) {
palf_env_->close(palf_handle_);
}
rc_service_ = NULL;
lc_cb_ = NULL;
rpc_proxy_ = NULL;
palf_env_ = NULL;
id_ = -1;
get_max_decided_scn_debug_time_ = OB_INVALID_TIMESTAMP;
}
apply_status_ = NULL;
apply_service_ = NULL;
replay_service_ = NULL;
if (NULL != palf_env_ && true == palf_handle_.is_valid()) {
palf_env_->close(palf_handle_);
}
rc_service_ = NULL;
lc_cb_ = NULL;
rpc_proxy_ = NULL;
palf_env_ = NULL;
id_ = -1;
get_max_decided_scn_debug_time_ = OB_INVALID_TIMESTAMP;
}
int ObLogHandler::append(const void *buffer,

View File

@ -196,7 +196,6 @@ public:
ObLogApplyService *apply_service,
ObLogReplayService *replay_service,
ObRoleChangeService *rc_service,
palf::PalfHandle &palf_handle,
palf::PalfEnv *palf_env,
palf::PalfLocationCacheCb *lc_cb,
obrpc::ObLogServiceRpcProxy *rpc_proxy);

View File

@ -409,22 +409,19 @@ int ObLogService::add_ls(const ObLSID &id,
ObLogRestoreHandler &restore_handler)
{
int ret = OB_SUCCESS;
PalfHandle palf_handle;
PalfHandle &log_handler_palf_handle = log_handler.palf_handle_;
PalfRoleChangeCb *rc_cb = &role_change_service_;
PalfLocationCacheCb *loc_cache_cb = &location_adapter_;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "log_service is not inited", K(ret), K(id));
} else if (OB_FAIL(palf_env_->open(id.id(), palf_handle))) {
CLOG_LOG(WARN, "failed to get palf_handle", K(ret), K(id));
} else if (OB_FAIL(apply_service_.add_ls(id))) {
CLOG_LOG(WARN, "failed to add_ls for apply_service", K(ret), K(id));
} else if (OB_FAIL(replay_service_.add_ls(id))) {
CLOG_LOG(WARN, "failed to add_ls for replay_service", K(ret), K(id));
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
&role_change_service_, palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) {
CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), K(id), KP(palf_env_), K(palf_handle));
&role_change_service_, palf_env_, loc_cache_cb, &rpc_proxy_))) {
CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), K(id), KP(palf_env_));
} else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) {
CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_));
} else if (OB_FAIL(log_handler_palf_handle.register_role_change_cb(rc_cb))) {
@ -435,12 +432,6 @@ int ObLogService::add_ls(const ObLSID &id,
FLOG_INFO("add_ls success", K(ret), K(id), KP(this));
}
if (OB_FAIL(ret)) {
if (true == palf_handle.is_valid() && false == log_handler.is_valid()) {
palf_env_->close(palf_handle);
}
}
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_LS_NOT_EXIST;
}
@ -656,36 +647,51 @@ int ObLogService::create_ls_(const share::ObLSID &id,
PalfLocationCacheCb *loc_cache_cb = &location_adapter_;
const bool is_arb_replica = (replica_type == REPLICA_TYPE_ARBITRATION);
PalfHandle &log_handler_palf_handle = log_handler.palf_handle_;
bool palf_exist = true;
if (false == id.is_valid() ||
INVALID_TENANT_ROLE == tenant_role ||
false == palf_base_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(ret), K(id), K(id), K(tenant_role), K(palf_base_info));
} else if (!is_arb_replica &&
OB_FAIL(palf_env_->create(id.id(), get_palf_access_mode(tenant_role), palf_base_info, palf_handle))) {
CLOG_LOG(WARN, "failed to get palf_handle", K(ret), K(id), K(replica_type));
} else if (false == allow_log_sync && OB_FAIL(palf_handle.disable_sync())) {
CLOG_LOG(WARN, "failed to disable_sync", K(ret), K(id));
} else if (OB_FAIL(apply_service_.add_ls(id))) {
CLOG_LOG(WARN, "failed to add_ls for apply engine", K(ret), K(id));
} else if (OB_FAIL(replay_service_.add_ls(id))) {
CLOG_LOG(WARN, "failed to add_ls", K(ret), K(id));
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
&role_change_service_, palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) {
CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), KP(palf_env_), K(palf_handle));
} else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) {
CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_));
} else if (OB_FAIL(log_handler_palf_handle.register_role_change_cb(rc_cb))) {
CLOG_LOG(WARN, "register_role_change_cb failed", K(ret), K(id));
} else if (OB_FAIL(log_handler_palf_handle.set_location_cache_cb(loc_cache_cb))) {
CLOG_LOG(WARN, "set_location_cache_cb failed", K(ret), K(id));
CLOG_LOG(WARN, "invalid argument", K(ret), K(id), K(tenant_role), K(palf_base_info));
} else if (OB_FAIL(check_palf_exist(id, palf_exist))) {
CLOG_LOG(WARN, "check_palf_exist failed", K(ret), K(id), K(tenant_role), K(palf_base_info));
} else if (palf_exist) {
ret = OB_ENTRY_EXIST;
CLOG_LOG(WARN, "palf has eixst", K(ret), K(id), K(tenant_role), K(palf_base_info));
} else {
CLOG_LOG(INFO, "ObLogService create_ls success", K(ret), K(id), K(log_handler));
}
if (OB_FAIL(ret)) {
if (true == palf_handle.is_valid() && false == log_handler.is_valid()) {
if (!is_arb_replica &&
OB_FAIL(palf_env_->create(id.id(), get_palf_access_mode(tenant_role), palf_base_info, palf_handle))) {
CLOG_LOG(WARN, "failed to get palf_handle", K(ret), K(id), K(replica_type));
} else if (false == allow_log_sync && OB_FAIL(palf_handle.disable_sync())) {
CLOG_LOG(WARN, "failed to disable_sync", K(ret), K(id));
} else if (OB_FAIL(apply_service_.add_ls(id))) {
CLOG_LOG(WARN, "failed to add_ls for apply engine", K(ret), K(id));
} else if (OB_FAIL(replay_service_.add_ls(id))) {
CLOG_LOG(WARN, "failed to add_ls", K(ret), K(id));
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
&role_change_service_, palf_env_, loc_cache_cb, &rpc_proxy_))) {
CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), KP(palf_env_), K(palf_handle));
} else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) {
CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_));
} else if (OB_FAIL(log_handler_palf_handle.register_role_change_cb(rc_cb))) {
CLOG_LOG(WARN, "register_role_change_cb failed", K(ret), K(id));
} else if (OB_FAIL(log_handler_palf_handle.set_location_cache_cb(loc_cache_cb))) {
CLOG_LOG(WARN, "set_location_cache_cb failed", K(ret), K(id));
} else {
CLOG_LOG(INFO, "ObLogService create_ls success", K(ret), K(id), K(log_handler));
}
if (palf_handle.is_valid() && nullptr != palf_env_) {
palf_env_->close(palf_handle);
}
if (OB_FAIL(ret)) {
CLOG_LOG(ERROR, "create_ls failed!!!", KR(ret), K(id));
restore_handler.destroy();
replay_service_.remove_ls(id);
apply_service_.remove_ls(id);
log_handler.destroy();
palf_env_->close(palf_handle);
palf_env_->remove(id.id());
}
}
return ret;
}

View File

@ -1801,6 +1801,16 @@ int ObMultiTenant::clear_persistent_data(const uint64_t tenant_id)
} else if (OB_FAIL(FileDirectoryUtils::is_exists(tenant_clog_dir, exist))) {
LOG_WARN("fail to check exist", K(ret));
} else if (exist) {
// defense code begin
int tmp_ret = OB_SUCCESS;
bool directory_empty = true;
if (OB_TMP_FAIL(FileDirectoryUtils::is_empty_directory(tenant_clog_dir, directory_empty))) {
LOG_WARN("fail to check directory whether is empty", KR(tmp_ret), K(tenant_clog_dir));
}
if (!directory_empty) {
LOG_DBA_ERROR(OB_ERR_UNEXPECTED, "msg", "clog directory must be empty when delete tenant", K(tenant_clog_dir));
}
// defense code end
if (OB_FAIL(FileDirectoryUtils::delete_directory_rec(tenant_clog_dir))) {
LOG_WARN("fail to delete clog dir", K(ret), K(tenant_clog_dir));
}