diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index 9e9c859c10..e595e15748 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -1825,7 +1825,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak) // case2: palf epoch has been changed during after_consume IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_); EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond)); - sleep(1); + sleep(3); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); @@ -1853,6 +1853,43 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak) } } +TEST_F(TestObSimpleLogClusterSingleReplica, test_log_service_interface) +{ + SET_CASE_LOG_FILE(TEST_NAME, "test_log_service_interface"); + int64_t id = ATOMIC_AAF(&palf_id_, 1); + ObSimpleLogServer *log_server = dynamic_cast(get_cluster()[0]); + ASSERT_NE(nullptr, log_server); + ObLogService *log_service = &log_server->log_service_; + ObTenantRole tenant_role; tenant_role.value_ = ObTenantRole::Role::PRIMARY_TENANT; + PalfBaseInfo palf_base_info; palf_base_info.generate_by_default(); + ObLogHandler log_handler; ObLogRestoreHandler restore_handler; + ObLogApplyService *apply_service = &log_service->apply_service_; + ObReplicaType replica_type; + ObLSID ls_id(id); + ObApplyStatus *apply_status = nullptr; + ASSERT_NE(nullptr, apply_status = static_cast(mtl_malloc(sizeof(ObApplyStatus), "mittest"))); + new (apply_status) ObApplyStatus(); + apply_status->inc_ref(); + EXPECT_EQ(OB_SUCCESS, log_service->start()); + EXPECT_EQ(OB_SUCCESS, apply_service->apply_status_map_.insert(ls_id, apply_status)); + apply_service->is_running_ = true; + EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler)); + bool is_exist = false; + EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist)); + EXPECT_EQ(is_exist, false); + EXPECT_EQ(OB_ENTRY_NOT_EXIST, apply_service->apply_status_map_.erase(ls_id)); + EXPECT_EQ(OB_SUCCESS, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler)); + EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler)); + EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist)); + EXPECT_EQ(is_exist, true); + const char *log_dir = log_service->palf_env_->palf_env_impl_.log_dir_; + bool result = false; + EXPECT_EQ(OB_SUCCESS, FileDirectoryUtils::is_empty_directory(log_dir, result)); + EXPECT_EQ(false, result); + EXPECT_EQ(OB_SUCCESS, log_service->remove_ls(ls_id, log_handler, restore_handler)); + EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist)); +} + } // namespace unittest } // namespace oceanbase diff --git a/src/logservice/ob_log_handler.cpp b/src/logservice/ob_log_handler.cpp index 88186d98b0..49e5ff6d8c 100755 --- a/src/logservice/ob_log_handler.cpp +++ b/src/logservice/ob_log_handler.cpp @@ -62,7 +62,6 @@ int ObLogHandler::init(const int64_t id, ObLogApplyService *apply_service, ObLogReplayService *replay_service, ObRoleChangeService *rc_service, - PalfHandle &palf_handle, PalfEnv *palf_env, PalfLocationCacheCb *lc_cb, obrpc::ObLogServiceRpcProxy *rpc_proxy) @@ -73,18 +72,19 @@ int ObLogHandler::init(const int64_t id, share::ObLSID ls_id(id); if (IS_INIT) { ret = OB_INIT_TWICE; - } else if (false == palf_handle.is_valid() || - OB_ISNULL(palf_env) || + } else if (OB_ISNULL(palf_env) || OB_ISNULL(apply_service) || OB_ISNULL(lc_cb) || OB_ISNULL(rpc_proxy)) { ret = OB_INVALID_ARGUMENT; - CLOG_LOG(WARN, "invalid arguments", K(palf_handle), KP(palf_env), KP(lc_cb), KP(rpc_proxy)); + CLOG_LOG(WARN, "invalid arguments", K(id), KP(palf_env), KP(lc_cb), KP(rpc_proxy)); } else if (OB_FAIL(apply_service->get_apply_status(ls_id, guard))) { CLOG_LOG(WARN, "guard get apply status failed", K(ret), K(id)); } else if (NULL == (apply_status_ = guard.get_apply_status())) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "apply status is not exist", K(ret), K(id)); + } else if (OB_FAIL(palf_env->open(id, palf_handle_))) { + CLOG_LOG(WARN, "open palf failed", K(ret), K(id)); } else { get_max_decided_scn_debug_time_ = OB_INVALID_TIMESTAMP; apply_service_ = apply_service; @@ -95,7 +95,6 @@ int ObLogHandler::init(const int64_t id, append_cost_stat_.set_extra_info(EXTRA_INFOS); id_ = id; self_ = self; - palf_handle_ = palf_handle; palf_env_ = palf_env; role_ = FOLLOWER; lc_cb_ = lc_cb; @@ -103,7 +102,10 @@ int ObLogHandler::init(const int64_t id, is_in_stop_state_ = false; is_offline_ = true; // offline at default. is_inited_ = true; - FLOG_INFO("ObLogHandler init success", K(id), K(palf_handle)); + FLOG_INFO("ObLogHandler init success", K(id)); + } + if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { + destroy(); } return ret; } @@ -180,26 +182,25 @@ int ObLogHandler::safe_to_destroy(bool &is_safe_destroy) void ObLogHandler::destroy() { WLockGuard guard(lock_); - int ret = OB_SUCCESS; - if (IS_INIT) { - is_inited_ = false; - is_offline_ = false; - is_in_stop_state_ = true; - common::TCWLockGuard deps_guard(deps_lock_); + is_inited_ = false; + is_offline_ = false; + is_in_stop_state_ = true; + common::TCWLockGuard deps_guard(deps_lock_); + if (NULL != apply_service_ && NULL != apply_status_) { apply_service_->revert_apply_status(apply_status_); - apply_status_ = NULL; - apply_service_ = NULL; - replay_service_ = NULL; - if (true == palf_handle_.is_valid()) { - palf_env_->close(palf_handle_); - } - rc_service_ = NULL; - lc_cb_ = NULL; - rpc_proxy_ = NULL; - palf_env_ = NULL; - id_ = -1; - get_max_decided_scn_debug_time_ = OB_INVALID_TIMESTAMP; } + apply_status_ = NULL; + apply_service_ = NULL; + replay_service_ = NULL; + if (NULL != palf_env_ && true == palf_handle_.is_valid()) { + palf_env_->close(palf_handle_); + } + rc_service_ = NULL; + lc_cb_ = NULL; + rpc_proxy_ = NULL; + palf_env_ = NULL; + id_ = -1; + get_max_decided_scn_debug_time_ = OB_INVALID_TIMESTAMP; } int ObLogHandler::append(const void *buffer, diff --git a/src/logservice/ob_log_handler.h b/src/logservice/ob_log_handler.h index 1945598135..663fd2bf79 100755 --- a/src/logservice/ob_log_handler.h +++ b/src/logservice/ob_log_handler.h @@ -196,7 +196,6 @@ public: ObLogApplyService *apply_service, ObLogReplayService *replay_service, ObRoleChangeService *rc_service, - palf::PalfHandle &palf_handle, palf::PalfEnv *palf_env, palf::PalfLocationCacheCb *lc_cb, obrpc::ObLogServiceRpcProxy *rpc_proxy); diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp index 0d060f73a7..2520373edd 100644 --- a/src/logservice/ob_log_service.cpp +++ b/src/logservice/ob_log_service.cpp @@ -409,22 +409,19 @@ int ObLogService::add_ls(const ObLSID &id, ObLogRestoreHandler &restore_handler) { int ret = OB_SUCCESS; - PalfHandle palf_handle; PalfHandle &log_handler_palf_handle = log_handler.palf_handle_; PalfRoleChangeCb *rc_cb = &role_change_service_; PalfLocationCacheCb *loc_cache_cb = &location_adapter_; if (IS_NOT_INIT) { ret = OB_NOT_INIT; CLOG_LOG(WARN, "log_service is not inited", K(ret), K(id)); - } else if (OB_FAIL(palf_env_->open(id.id(), palf_handle))) { - CLOG_LOG(WARN, "failed to get palf_handle", K(ret), K(id)); } else if (OB_FAIL(apply_service_.add_ls(id))) { CLOG_LOG(WARN, "failed to add_ls for apply_service", K(ret), K(id)); } else if (OB_FAIL(replay_service_.add_ls(id))) { CLOG_LOG(WARN, "failed to add_ls for replay_service", K(ret), K(id)); } else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_, - &role_change_service_, palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) { - CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), K(id), KP(palf_env_), K(palf_handle)); + &role_change_service_, palf_env_, loc_cache_cb, &rpc_proxy_))) { + CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), K(id), KP(palf_env_)); } else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) { CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_)); } else if (OB_FAIL(log_handler_palf_handle.register_role_change_cb(rc_cb))) { @@ -435,12 +432,6 @@ int ObLogService::add_ls(const ObLSID &id, FLOG_INFO("add_ls success", K(ret), K(id), KP(this)); } - if (OB_FAIL(ret)) { - if (true == palf_handle.is_valid() && false == log_handler.is_valid()) { - palf_env_->close(palf_handle); - } - } - if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_LS_NOT_EXIST; } @@ -656,36 +647,51 @@ int ObLogService::create_ls_(const share::ObLSID &id, PalfLocationCacheCb *loc_cache_cb = &location_adapter_; const bool is_arb_replica = (replica_type == REPLICA_TYPE_ARBITRATION); PalfHandle &log_handler_palf_handle = log_handler.palf_handle_; + bool palf_exist = true; if (false == id.is_valid() || INVALID_TENANT_ROLE == tenant_role || false == palf_base_info.is_valid()) { ret = OB_INVALID_ARGUMENT; - CLOG_LOG(WARN, "invalid argument", K(ret), K(id), K(id), K(tenant_role), K(palf_base_info)); - } else if (!is_arb_replica && - OB_FAIL(palf_env_->create(id.id(), get_palf_access_mode(tenant_role), palf_base_info, palf_handle))) { - CLOG_LOG(WARN, "failed to get palf_handle", K(ret), K(id), K(replica_type)); - } else if (false == allow_log_sync && OB_FAIL(palf_handle.disable_sync())) { - CLOG_LOG(WARN, "failed to disable_sync", K(ret), K(id)); - } else if (OB_FAIL(apply_service_.add_ls(id))) { - CLOG_LOG(WARN, "failed to add_ls for apply engine", K(ret), K(id)); - } else if (OB_FAIL(replay_service_.add_ls(id))) { - CLOG_LOG(WARN, "failed to add_ls", K(ret), K(id)); - } else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_, - &role_change_service_, palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) { - CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), KP(palf_env_), K(palf_handle)); - } else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) { - CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_)); - } else if (OB_FAIL(log_handler_palf_handle.register_role_change_cb(rc_cb))) { - CLOG_LOG(WARN, "register_role_change_cb failed", K(ret), K(id)); - } else if (OB_FAIL(log_handler_palf_handle.set_location_cache_cb(loc_cache_cb))) { - CLOG_LOG(WARN, "set_location_cache_cb failed", K(ret), K(id)); + CLOG_LOG(WARN, "invalid argument", K(ret), K(id), K(tenant_role), K(palf_base_info)); + } else if (OB_FAIL(check_palf_exist(id, palf_exist))) { + CLOG_LOG(WARN, "check_palf_exist failed", K(ret), K(id), K(tenant_role), K(palf_base_info)); + } else if (palf_exist) { + ret = OB_ENTRY_EXIST; + CLOG_LOG(WARN, "palf has eixst", K(ret), K(id), K(tenant_role), K(palf_base_info)); } else { - CLOG_LOG(INFO, "ObLogService create_ls success", K(ret), K(id), K(log_handler)); - } - if (OB_FAIL(ret)) { - if (true == palf_handle.is_valid() && false == log_handler.is_valid()) { + if (!is_arb_replica && + OB_FAIL(palf_env_->create(id.id(), get_palf_access_mode(tenant_role), palf_base_info, palf_handle))) { + CLOG_LOG(WARN, "failed to get palf_handle", K(ret), K(id), K(replica_type)); + } else if (false == allow_log_sync && OB_FAIL(palf_handle.disable_sync())) { + CLOG_LOG(WARN, "failed to disable_sync", K(ret), K(id)); + } else if (OB_FAIL(apply_service_.add_ls(id))) { + CLOG_LOG(WARN, "failed to add_ls for apply engine", K(ret), K(id)); + } else if (OB_FAIL(replay_service_.add_ls(id))) { + CLOG_LOG(WARN, "failed to add_ls", K(ret), K(id)); + } else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_, + &role_change_service_, palf_env_, loc_cache_cb, &rpc_proxy_))) { + CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), KP(palf_env_), K(palf_handle)); + } else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) { + CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_)); + } else if (OB_FAIL(log_handler_palf_handle.register_role_change_cb(rc_cb))) { + CLOG_LOG(WARN, "register_role_change_cb failed", K(ret), K(id)); + } else if (OB_FAIL(log_handler_palf_handle.set_location_cache_cb(loc_cache_cb))) { + CLOG_LOG(WARN, "set_location_cache_cb failed", K(ret), K(id)); + } else { + CLOG_LOG(INFO, "ObLogService create_ls success", K(ret), K(id), K(log_handler)); + } + if (palf_handle.is_valid() && nullptr != palf_env_) { palf_env_->close(palf_handle); } + if (OB_FAIL(ret)) { + CLOG_LOG(ERROR, "create_ls failed!!!", KR(ret), K(id)); + restore_handler.destroy(); + replay_service_.remove_ls(id); + apply_service_.remove_ls(id); + log_handler.destroy(); + palf_env_->close(palf_handle); + palf_env_->remove(id.id()); + } } return ret; } diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index be6ba1894a..d1088d7095 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -1801,6 +1801,16 @@ int ObMultiTenant::clear_persistent_data(const uint64_t tenant_id) } else if (OB_FAIL(FileDirectoryUtils::is_exists(tenant_clog_dir, exist))) { LOG_WARN("fail to check exist", K(ret)); } else if (exist) { + // defense code begin + int tmp_ret = OB_SUCCESS; + bool directory_empty = true; + if (OB_TMP_FAIL(FileDirectoryUtils::is_empty_directory(tenant_clog_dir, directory_empty))) { + LOG_WARN("fail to check directory whether is empty", KR(tmp_ret), K(tenant_clog_dir)); + } + if (!directory_empty) { + LOG_DBA_ERROR(OB_ERR_UNEXPECTED, "msg", "clog directory must be empty when delete tenant", K(tenant_clog_dir)); + } + // defense code end if (OB_FAIL(FileDirectoryUtils::delete_directory_rec(tenant_clog_dir))) { LOG_WARN("fail to delete clog dir", K(ret), K(tenant_clog_dir)); }