From fe94e4d34113e0db64522485771226b89db867d6 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 2 Mar 2023 15:39:03 +0000 Subject: [PATCH] Fix table load resource leak when tenant are evicted --- .../table_load/ob_table_load_coordinator.cpp | 27 ++- .../ob_table_load_coordinator_ctx.cpp | 92 +++----- .../ob_table_load_coordinator_ctx.h | 43 ++-- .../table_load/ob_table_load_instance.cpp | 6 +- .../table_load/ob_table_load_manager.cpp | 42 ++++ .../table_load/ob_table_load_manager.h | 5 +- .../table_load/ob_table_load_service.cpp | 64 ++++- .../table_load/ob_table_load_service.h | 4 + .../table_load/ob_table_load_store.cpp | 17 +- .../table_load/ob_table_load_store_ctx.cpp | 218 ++++++++---------- .../table_load/ob_table_load_store_ctx.h | 42 ++-- .../table_load/ob_table_load_table_ctx.cpp | 32 ++- 12 files changed, 355 insertions(+), 237 deletions(-) diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index a2057e6b66..f5fcf64f31 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -298,11 +298,14 @@ int ObTableLoadCoordinator::begin() LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this)); } else { LOG_INFO("coordinator begin"); - if (OB_FAIL(pre_begin_peers())) { + obsys::ObWLockGuard guard(coordinator_ctx_->get_status_lock()); + if (OB_FAIL(coordinator_ctx_->check_status_unlock(ObTableLoadStatusType::INITED))) { + LOG_WARN("fail to check status", KR(ret)); + } else if (OB_FAIL(pre_begin_peers())) { LOG_WARN("fail to pre begin peers", KR(ret)); } else if (OB_FAIL(confirm_begin_peers())) { LOG_WARN("fail to confirm begin peers", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->set_status_loading())) { + } else if (OB_FAIL(coordinator_ctx_->set_status_loading_unlock())) { LOG_WARN("fail to set coordinator status loading", KR(ret)); } } @@ -393,8 +396,9 @@ int ObTableLoadCoordinator::finish() LOG_INFO("coordinator finish"); bool active_trans_exist = false; bool committed_trans_eixst = false; + obsys::ObWLockGuard guard(coordinator_ctx_->get_status_lock()); // 1. 冻结状态, 防止后续继续创建trans - if (OB_FAIL(coordinator_ctx_->set_status_frozen())) { + if (OB_FAIL(coordinator_ctx_->set_status_frozen_unlock())) { LOG_WARN("fail to set coordinator status frozen", KR(ret)); } // 2. 检查当前是否还有trans没有结束 @@ -420,7 +424,7 @@ int ObTableLoadCoordinator::finish() LOG_WARN("fail to start merge peers", KR(ret)); } // 5. 设置当前状态为合并中 - else if (OB_FAIL(coordinator_ctx_->set_status_merging())) { + else if (OB_FAIL(coordinator_ctx_->set_status_merging_unlock())) { LOG_WARN("fail to set coordinator status merging", KR(ret)); } // 6. 添加定时任务检查合并结果 @@ -650,8 +654,9 @@ int ObTableLoadCoordinator::commit(ObExecContext *exec_ctx, ObSQLSessionInfo &se LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this)); } else { LOG_INFO("coordinator commit"); + obsys::ObWLockGuard guard(coordinator_ctx_->get_status_lock()); ObTableLoadSqlStatistics sql_statistics; - if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { + if (OB_FAIL(coordinator_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); } else if (OB_FAIL(commit_peers(sql_statistics))) { LOG_WARN("fail to commit peers", KR(ret)); @@ -660,7 +665,7 @@ int ObTableLoadCoordinator::commit(ObExecContext *exec_ctx, ObSQLSessionInfo &se LOG_WARN("fail to drive sql stat", KR(ret)); } else if (OB_FAIL(commit_redef_table(session_info))) { LOG_WARN("fail to commit redef table", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->set_status_commit())) { + } else if (OB_FAIL(coordinator_ctx_->set_status_commit_unlock())) { LOG_WARN("fail to set coordinator status commit", KR(ret)); } else { result_info = coordinator_ctx_->result_info_; @@ -679,8 +684,9 @@ int ObTableLoadCoordinator::px_commit_data(ObExecContext *exec_ctx) LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this)); } else { LOG_INFO("coordinator px_commit_data"); + obsys::ObRLockGuard guard(coordinator_ctx_->get_status_lock()); ObTableLoadSqlStatistics sql_statistics; - if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { + if (OB_FAIL(coordinator_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); } else if (OB_FAIL(commit_peers(sql_statistics))) { LOG_WARN("fail to commit peers", KR(ret)); @@ -701,9 +707,12 @@ int ObTableLoadCoordinator::px_commit_ddl(ObSQLSessionInfo &session_info) LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this)); } else { LOG_INFO("coordinator px_commit_ddl"); - if (OB_FAIL(commit_redef_table(session_info))) { + obsys::ObWLockGuard guard(coordinator_ctx_->get_status_lock()); + if (OB_FAIL(coordinator_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) { + LOG_WARN("fail to check coordinator status", KR(ret)); + } else if (OB_FAIL(commit_redef_table(session_info))) { LOG_WARN("fail to commit redef table", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->set_status_commit())) { + } else if (OB_FAIL(coordinator_ctx_->set_status_commit_unlock())) { LOG_WARN("fail to set coordinator status commit", KR(ret)); } } diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index 23ed6f736d..c90ee0b812 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -172,34 +172,24 @@ int ObTableLoadCoordinatorCtx::generate_credential(uint64_t user_id) return ret; } -int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status) +int ObTableLoadCoordinatorCtx::advance_status_unlock(ObTableLoadStatusType status) { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); - } else if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status || - ObTableLoadStatusType::ABORT == status)) { + if (OB_UNLIKELY(ObTableLoadStatusType::NONE == status || ObTableLoadStatusType::ERROR == status || + ObTableLoadStatusType::ABORT == status)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(status)); - } else { - obsys::ObWLockGuard guard(rwlock_); - if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status_)) { - ret = error_code_; - LOG_WARN("coordinator has error", KR(ret)); - } else if (OB_UNLIKELY(ObTableLoadStatusType::ABORT == status_)) { - ret = OB_TRANS_KILLED; - LOG_WARN("coordinator is abort", KR(ret)); - } - // 正常运行阶段, 状态是一步步推进的 - else if (OB_UNLIKELY(static_cast(status) != static_cast(status_) + 1)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(status), K(status_)); - } else { - status_ = status; - table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); - LOG_INFO("LOAD DATA COORDINATOR advance status", K(status)); - } + } + // normally, the state is advanced step by step + else if (OB_UNLIKELY(static_cast(status) != static_cast(status_) + 1)) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("unexpected status", KR(ret), K(status), K(status_)); + } + // advance status + else { + status_ = status; + table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); + LOG_INFO("LOAD DATA COORDINATOR advance status", K(status)); } return ret; } @@ -207,17 +197,16 @@ int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status) int ObTableLoadCoordinatorCtx::set_status_error(int error_code) { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); - } else if (OB_UNLIKELY(OB_SUCCESS == error_code)) { + if (OB_UNLIKELY(OB_SUCCESS == error_code)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(error_code)); } else { - obsys::ObWLockGuard guard(rwlock_); - if (OB_UNLIKELY(status_ == ObTableLoadStatusType::ABORT)) { - ret = OB_TRANS_KILLED; - } else if (status_ != ObTableLoadStatusType::ERROR) { + obsys::ObWLockGuard guard(status_lock_); + if (status_ == ObTableLoadStatusType::ERROR) { + // ignore + } else if (static_cast(status_) > static_cast(ObTableLoadStatusType::ERROR)) { + ret = OB_STATE_NOT_MATCH; + } else { status_ = ObTableLoadStatusType::ERROR; error_code_ = error_code; table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); @@ -230,16 +219,13 @@ int ObTableLoadCoordinatorCtx::set_status_error(int error_code) int ObTableLoadCoordinatorCtx::set_status_abort() { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); + obsys::ObWLockGuard guard(status_lock_); + if (ObTableLoadStatusType::ABORT == status_) { + LOG_INFO("LOAD DATA COORDINATOR already abort"); } else { - obsys::ObWLockGuard guard(rwlock_); - if (OB_UNLIKELY(status_ != ObTableLoadStatusType::ABORT)) { - status_ = ObTableLoadStatusType::ABORT; - table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); - LOG_INFO("LOAD DATA COORDINATOR status abort"); - } + status_ = ObTableLoadStatusType::ABORT; + table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); + LOG_INFO("LOAD DATA COORDINATOR status abort"); } return ret; } @@ -259,19 +245,6 @@ int ObTableLoadCoordinatorCtx::check_status_unlock(ObTableLoadStatusType status) return ret; } -int ObTableLoadCoordinatorCtx::check_status(ObTableLoadStatusType status) const -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); - } else { - obsys::ObRLockGuard guard(rwlock_); - ret = check_status_unlock(status); - } - return ret; -} - int ObTableLoadCoordinatorCtx::alloc_trans_ctx(const ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx) { @@ -336,10 +309,11 @@ int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_i ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - obsys::ObWLockGuard guard(rwlock_); + obsys::ObRLockGuard status_guard(status_lock_); if (OB_FAIL(check_status_unlock(ObTableLoadStatusType::LOADING))) { LOG_WARN("fail to check status", KR(ret), K_(status)); } else { + obsys::ObWLockGuard guard(rwlock_); SegmentCtx *segment_ctx = nullptr; if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { @@ -352,7 +326,7 @@ int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_i } if (OB_SUCC(ret)) { if (OB_UNLIKELY(nullptr != segment_ctx->current_trans_ || - nullptr != segment_ctx->committed_trans_ctx_)) { + nullptr != segment_ctx->committed_trans_ctx_)) { ret = OB_ENTRY_EXIST; LOG_WARN("trans already exist", KR(ret)); } else { @@ -404,6 +378,9 @@ int ObTableLoadCoordinatorCtx::commit_trans(ObTableLoadCoordinatorTrans *trans) segment_ctx->committed_trans_ctx_ = trans->get_trans_ctx(); trans->set_dirty(); } + if (OB_NOT_NULL(segment_ctx)) { + segment_ctx_map_.revert(segment_ctx); + } } return ret; } @@ -437,6 +414,9 @@ int ObTableLoadCoordinatorCtx::abort_trans(ObTableLoadCoordinatorTrans *trans) segment_ctx->current_trans_ = nullptr; trans->set_dirty(); } + if (OB_NOT_NULL(segment_ctx)) { + segment_ctx_map_.revert(segment_ctx); + } } return ret; } diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index a8f9807df1..32da8e8d6e 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -33,46 +33,56 @@ public: void destroy(); bool is_valid() const { return is_inited_; } public: + OB_INLINE obsys::ObRWLock &get_status_lock() + { + return status_lock_; + } OB_INLINE table::ObTableLoadStatusType get_status() const { - obsys::ObRLockGuard guard(rwlock_); + obsys::ObRLockGuard guard(status_lock_); return status_; } OB_INLINE int get_error_code() const { - obsys::ObRLockGuard guard(rwlock_); + obsys::ObRLockGuard guard(status_lock_); return error_code_; } OB_INLINE int set_status_inited() { - return advance_status(table::ObTableLoadStatusType::INITED); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::INITED); } - OB_INLINE int set_status_loading() + OB_INLINE int set_status_loading_unlock() { - return advance_status(table::ObTableLoadStatusType::LOADING); + return advance_status_unlock(table::ObTableLoadStatusType::LOADING); } - OB_INLINE int set_status_frozen() + OB_INLINE int set_status_frozen_unlock() { - return advance_status(table::ObTableLoadStatusType::FROZEN); + return advance_status_unlock(table::ObTableLoadStatusType::FROZEN); } - OB_INLINE int set_status_merging() + OB_INLINE int set_status_merging_unlock() { - return advance_status(table::ObTableLoadStatusType::MERGING); + return advance_status_unlock(table::ObTableLoadStatusType::MERGING); } OB_INLINE int set_status_merged() { - return advance_status(table::ObTableLoadStatusType::MERGED); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::MERGED); } - OB_INLINE int set_status_commit() + OB_INLINE int set_status_commit_unlock() { - return advance_status(table::ObTableLoadStatusType::COMMIT); + return advance_status_unlock(table::ObTableLoadStatusType::COMMIT); } int set_status_error(int error_code); int set_status_abort(); - int check_status(table::ObTableLoadStatusType status) const; -private: - int advance_status(table::ObTableLoadStatusType status); int check_status_unlock(table::ObTableLoadStatusType status) const; + OB_INLINE int check_status(table::ObTableLoadStatusType status) const + { + obsys::ObRLockGuard guard(status_lock_); + return check_status_unlock(status); + } +private: + int advance_status_unlock(table::ObTableLoadStatusType status); public: int start_trans(const table::ObTableLoadSegmentID &segment_id, ObTableLoadCoordinatorTrans *&trans); @@ -129,9 +139,10 @@ private: ObTableLoadObjectAllocator trans_allocator_; // 多线程安全 uint64_t last_trans_gid_ CACHE_ALIGNED; uint64_t next_session_id_ CACHE_ALIGNED; - mutable obsys::ObRWLock rwlock_; + obsys::ObRWLock status_lock_; table::ObTableLoadStatusType status_; int error_code_; + mutable obsys::ObRWLock rwlock_; TransMap trans_map_; TransCtxMap trans_ctx_map_; SegmentCtxMap segment_ctx_map_; diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 0dcc5faa76..566f135bb2 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -36,10 +36,12 @@ ObTableLoadInstance::~ObTableLoadInstance() { destroy(); } void ObTableLoadInstance::destroy() { + int ret = OB_SUCCESS; trans_ctx_.reset(); if (nullptr != table_ctx_) { - ObTableLoadService::remove_ctx(table_ctx_); - if (!is_committed_) { + if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) { + LOG_WARN("table ctx may remove by service", KR(ret), KP(table_ctx_)); + } else if (!is_committed_) { ObTableLoadCoordinator::abort_ctx(table_ctx_, *session_info_); } ObTableLoadService::put_ctx(table_ctx_); diff --git a/src/observer/table_load/ob_table_load_manager.cpp b/src/observer/table_load/ob_table_load_manager.cpp index ec0b2ec285..294b2162e7 100644 --- a/src/observer/table_load/ob_table_load_manager.cpp +++ b/src/observer/table_load/ob_table_load_manager.cpp @@ -130,6 +130,42 @@ int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key) return ret; } +int ObTableLoadManager::remove_all_table_ctx(ObIArray &table_ctx_array) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this)); + } else { + table_ctx_array.reset(); + obsys::ObWLockGuard guard(rwlock_); + for (TableCtxMap::const_iterator iter = table_ctx_map_.begin(); + OB_SUCC(ret) && iter != table_ctx_map_.end(); ++iter) { + const ObTableLoadUniqueKey &key = iter->first; + ObTableLoadTableCtx *table_ctx = iter->second; + if (OB_FAIL(add_dirty_list(table_ctx))) { + LOG_WARN("fail to add dirty list", KR(ret), K(key), KP(table_ctx)); + } else if (OB_FAIL(table_ctx_array.push_back(table_ctx))) { + LOG_WARN("fail to push back", KR(ret), K(key)); + } else { + table_ctx->inc_ref_count(); + } + } + if (OB_SUCC(ret)) { + table_ctx_map_.destroy(); + table_handle_map_.destroy(); + } + if (OB_FAIL(ret)) { + for (int64_t i = 0; i < table_ctx_array.count(); ++i) { + ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i); + put_table_ctx(table_ctx); + } + table_ctx_array.reset(); + } + } + return ret; +} + int ObTableLoadManager::get_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *&table_ctx) { @@ -228,6 +264,12 @@ void ObTableLoadManager::put_table_ctx(ObTableLoadTableCtx *table_ctx) } } +bool ObTableLoadManager::is_dirty_list_empty() const +{ + ObMutexGuard guard(mutex_); + return dirty_list_.is_empty(); +} + int ObTableLoadManager::add_dirty_list(ObTableLoadTableCtx *table_ctx) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_manager.h b/src/observer/table_load/ob_table_load_manager.h index c75cfa9b8b..7a10ed3c98 100644 --- a/src/observer/table_load/ob_table_load_manager.h +++ b/src/observer/table_load/ob_table_load_manager.h @@ -25,12 +25,15 @@ public: int add_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *table_ctx); int remove_table_ctx(const ObTableLoadUniqueKey &key); // table ctx holds a reference count + int remove_all_table_ctx(common::ObIArray &table_ctx_array); + // table ctx holds a reference count int get_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *&table_ctx); // table ctx holds a reference count int get_table_ctx_by_table_id(uint64_t table_id, ObTableLoadTableCtx *&table_ctx); // all table ctx hold a reference count int get_inactive_table_ctx_list(common::ObIArray &table_ctx_array); void put_table_ctx(ObTableLoadTableCtx *table_ctx); + bool is_dirty_list_empty() const; // table ctx no reference counting int get_releasable_table_ctx_list(common::ObIArray &table_ctx_array); public: @@ -55,7 +58,7 @@ private: mutable obsys::ObRWLock rwlock_; TableCtxMap table_ctx_map_; TableHandleMap table_handle_map_; // index of the latest task - lib::ObMutex mutex_; + mutable lib::ObMutex mutex_; common::ObDList dirty_list_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index f94113ddae..83866c37ec 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -5,8 +5,11 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_service.h" +#include "observer/table_load/ob_table_load_coordinator.h" #include "observer/table_load/ob_table_load_schema.h" +#include "observer/table_load/ob_table_load_store.h" #include "observer/table_load/ob_table_load_table_ctx.h" +#include "observer/table_load/ob_table_load_utils.h" #include "share/rc/ob_tenant_base.h" #include "share/schema/ob_table_schema.h" @@ -189,6 +192,9 @@ int ObTableLoadService::add_ctx(ObTableLoadTableCtx *table_ctx) if (OB_ISNULL(service = MTL(ObTableLoadService *))) { ret = OB_ERR_SYS; LOG_WARN("null table load service", KR(ret)); + } else if (service->is_stop_) { + ret = OB_IN_STOP_STATE; + LOG_WARN("service is stop", KR(ret)); } else { ObTableLoadUniqueKey key(table_ctx->param_.table_id_, table_ctx->ddl_param_.task_id_); ret = service->get_manager().add_table_ctx(key, table_ctx); @@ -249,7 +255,7 @@ void ObTableLoadService::put_ctx(ObTableLoadTableCtx *table_ctx) } ObTableLoadService::ObTableLoadService() - : gc_task_(*this), release_task_(*this), is_inited_(false) + : gc_task_(*this), release_task_(*this), is_stop_(false), is_inited_(false) { } @@ -293,6 +299,7 @@ int ObTableLoadService::start() int ObTableLoadService::stop() { int ret = OB_SUCCESS; + is_stop_ = true; gc_timer_.stop(); return ret; } @@ -300,6 +307,8 @@ int ObTableLoadService::stop() void ObTableLoadService::wait() { gc_timer_.wait(); + abort_all_ctx(); + release_all_ctx(); } void ObTableLoadService::destroy() @@ -308,5 +317,58 @@ void ObTableLoadService::destroy() gc_timer_.destroy(); } +void ObTableLoadService::abort_all_ctx() +{ + int ret = OB_SUCCESS; + ObArray table_ctx_array; + if (OB_FAIL(manager_.remove_all_table_ctx(table_ctx_array))) { + LOG_WARN("fail to remove all table ctx list", KR(ret)); + } else { + SMART_VAR(sql::ObSQLSessionInfo, session_info) + { + if (OB_FAIL(ObTableLoadUtils::init_session_info(OB_SERVER_USER_ID, session_info))) { + LOG_WARN("fail to init session info", KR(ret)); + } else { + for (int i = 0; i < table_ctx_array.count(); ++i) { + ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i); + // abort coordinator + if (nullptr != table_ctx->coordinator_ctx_) { + ObTableLoadCoordinator::abort_ctx(table_ctx, session_info); + } + // abort store + else if (nullptr != table_ctx->store_ctx_) { + ObTableLoadStore::abort_ctx(table_ctx); + } + manager_.put_table_ctx(table_ctx); + } + } + } + } +} + +void ObTableLoadService::release_all_ctx() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + while (OB_SUCC(ret)) { + ObArray table_ctx_array; + if (OB_FAIL(manager_.get_releasable_table_ctx_list(table_ctx_array))) { + LOG_WARN("fail to get releasable table ctx list", KR(ret)); + } + for (int64_t i = 0; i < table_ctx_array.count(); ++i) { + ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i); + const uint64_t table_id = table_ctx->param_.table_id_; + const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_; + LOG_INFO("free table ctx", K(tenant_id), K(table_id), K(hidden_table_id), KP(table_ctx)); + ObTableLoadService::free_ctx(table_ctx); + } + if (manager_.is_dirty_list_empty()) { + break; + } else { + ob_usleep(10 * 1000 * 1000); + } + } +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index 3f15bc9857..b2af97dde0 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -36,6 +36,9 @@ public: void wait(); void destroy(); ObTableLoadManager &get_manager() { return manager_; } +private: + void abort_all_ctx(); + void release_all_ctx(); private: static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s @@ -70,6 +73,7 @@ private: common::ObTimer gc_timer_; ObGCTask gc_task_; ObReleaseTask release_task_; + volatile bool is_stop_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index bd2646513a..eea1b0bf7e 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -15,6 +15,7 @@ #include "observer/table_load/ob_table_load_task_scheduler.h" #include "observer/table_load/ob_table_load_trans_store.h" #include "observer/table_load/ob_table_load_utils.h" +#include "storage/direct_load/ob_direct_load_insert_table_ctx.h" namespace oceanbase { @@ -282,14 +283,18 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { LOG_INFO("store commit"); - if (OB_FAIL(store_ctx_->check_status(ObTableLoadStatusType::MERGED))) { + obsys::ObWLockGuard guard(store_ctx_->get_status_lock()); + if (OB_FAIL(store_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check store status", KR(ret)); - } else if (OB_FAIL(store_ctx_->commit())) { - LOG_WARN("fail to commit store", KR(ret)); - } else if (OB_FAIL(store_ctx_->set_status_commit())) { - LOG_WARN("fail to set store status commit", KR(ret)); - } else if (param_.online_opt_stat_gather_ && OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))){ + } else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit())) { + LOG_WARN("fail to commit insert table", KR(ret)); + } else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(store_ctx_->commit_autoinc_value())) { + LOG_WARN("fail to commit sync auto increment value", KR(ret)); + } else if (param_.online_opt_stat_gather_ && + OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))) { LOG_WARN("fail to collect sql stats", KR(ret)); + } else if (OB_FAIL(store_ctx_->set_status_commit_unlock())) { + LOG_WARN("fail to set store status commit", KR(ret)); } else { result_info = store_ctx_->result_info_; } diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 7081864eec..0a8be47bc9 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -12,6 +12,8 @@ #include "observer/table_load/ob_table_load_task_scheduler.h" #include "observer/table_load/ob_table_load_trans_store.h" #include "observer/table_load/ob_table_load_utils.h" +#include "share/ob_autoincrement_service.h" +#include "share/sequence/ob_sequence_cache.h" #include "sql/engine/cmd/ob_load_data_utils.h" #include "storage/direct_load/ob_direct_load_data_block.h" #include "storage/direct_load/ob_direct_load_fast_heap_table_ctx.h" @@ -21,8 +23,6 @@ #include "storage/direct_load/ob_direct_load_sstable_index_block.h" #include "storage/direct_load/ob_direct_load_sstable_scan_merge.h" #include "storage/direct_load/ob_direct_load_tmp_file.h" -#include "share/ob_autoincrement_service.h" -#include "share/sequence/ob_sequence_cache.h" namespace oceanbase { @@ -299,34 +299,24 @@ void ObTableLoadStoreCtx::destroy() } } -int ObTableLoadStoreCtx::advance_status(ObTableLoadStatusType status) +int ObTableLoadStoreCtx::advance_status_unlock(ObTableLoadStatusType status) { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); - } else if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status || - ObTableLoadStatusType::ABORT == status)) { + if (OB_UNLIKELY(ObTableLoadStatusType::NONE == status || ObTableLoadStatusType::ERROR == status || + ObTableLoadStatusType::ABORT == status)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(status)); - } else { - obsys::ObWLockGuard guard(rwlock_); - if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status_)) { - ret = error_code_; - LOG_WARN("store has error", KR(ret)); - } else if (OB_UNLIKELY(ObTableLoadStatusType::ABORT == status_)) { - ret = OB_TRANS_KILLED; - LOG_WARN("store is abort", KR(ret)); - } - // 正常运行阶段, 状态是一步步推进的 - else if (OB_UNLIKELY(static_cast(status) != static_cast(status_) + 1)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(status), K(status_)); - } else { - status_ = status; - table_load_status_to_string(status_, ctx_->job_stat_->store.status_); - LOG_INFO("LOAD DATA STORE advance status", K(status)); - } + } + // normally, the state is advanced step by step + else if (OB_UNLIKELY(static_cast(status) != static_cast(status_) + 1)) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("unexpected status", KR(ret), K(status), K(status_)); + } + // advance status + else { + status_ = status; + table_load_status_to_string(status_, ctx_->job_stat_->store.status_); + LOG_INFO("LOAD DATA STORE advance status", K(status)); } return ret; } @@ -334,17 +324,16 @@ int ObTableLoadStoreCtx::advance_status(ObTableLoadStatusType status) int ObTableLoadStoreCtx::set_status_error(int error_code) { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); - } else if (OB_UNLIKELY(OB_SUCCESS == error_code)) { + if (OB_UNLIKELY(OB_SUCCESS == error_code)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(error_code)); } else { - obsys::ObWLockGuard guard(rwlock_); - if (OB_UNLIKELY(status_ == ObTableLoadStatusType::ABORT)) { - ret = OB_TRANS_KILLED; - } else if (status_ != ObTableLoadStatusType::ERROR) { + obsys::ObWLockGuard guard(status_lock_); + if (status_ == ObTableLoadStatusType::ERROR) { + // ignore + } else if (static_cast(status_) > static_cast(ObTableLoadStatusType::ERROR)) { + ret = OB_STATE_NOT_MATCH; + } else { status_ = ObTableLoadStatusType::ERROR; error_code_ = error_code; table_load_status_to_string(status_, ctx_->job_stat_->store.status_); @@ -357,16 +346,13 @@ int ObTableLoadStoreCtx::set_status_error(int error_code) int ObTableLoadStoreCtx::set_status_abort() { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); + obsys::ObWLockGuard guard(status_lock_); + if (ObTableLoadStatusType::ABORT == status_) { + LOG_INFO("LOAD DATA STORE already abort"); } else { - obsys::ObWLockGuard guard(rwlock_); - if (OB_UNLIKELY(status_ != ObTableLoadStatusType::ABORT)) { - status_ = ObTableLoadStatusType::ABORT; - table_load_status_to_string(status_, ctx_->job_stat_->store.status_); - LOG_INFO("LOAD DATA STORE status abort"); - } + status_ = ObTableLoadStatusType::ABORT; + table_load_status_to_string(status_, ctx_->job_stat_->store.status_); + LOG_INFO("LOAD DATA STORE status abort"); } return ret; } @@ -386,42 +372,6 @@ int ObTableLoadStoreCtx::check_status_unlock(ObTableLoadStatusType status) const return ret; } -int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); - } else { - obsys::ObRLockGuard guard(rwlock_); - ret = check_status_unlock(status); - } - return ret; -} - -int ObTableLoadStoreCtx::alloc_trans_ctx(const ObTableLoadTransId &trans_id, - ObTableLoadTransCtx *&trans_ctx) -{ - int ret = OB_SUCCESS; - trans_ctx = nullptr; - // 分配trans_ctx - if (OB_ISNULL(trans_ctx = ctx_->alloc_trans_ctx(trans_id))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc trans ctx", KR(ret), K(trans_id)); - } - // 把trans_ctx插入map - else if (OB_FAIL(trans_ctx_map_.set_refactored(trans_ctx->trans_id_, trans_ctx))) { - LOG_WARN("fail to set trans ctx", KR(ret), K(trans_ctx->trans_id_)); - } - if (OB_FAIL(ret)) { - if (nullptr != trans_ctx) { - ctx_->free_trans_ctx(trans_ctx); - trans_ctx = nullptr; - } - } - return ret; -} - int ObTableLoadStoreCtx::get_wa_memory_limit(int64_t &wa_mem_limit) { int ret = OB_SUCCESS; @@ -452,34 +402,6 @@ int ObTableLoadStoreCtx::get_wa_memory_limit(int64_t &wa_mem_limit) return ret; } -int ObTableLoadStoreCtx::alloc_trans(const ObTableLoadTransId &trans_id, - ObTableLoadStoreTrans *&trans) -{ - int ret = OB_SUCCESS; - trans = nullptr; - ObTableLoadTransCtx *trans_ctx = nullptr; - // 分配trans_ctx - if (OB_FAIL(alloc_trans_ctx(trans_id, trans_ctx))) { - LOG_WARN("fail to alloc trans ctx", KR(ret), K(trans_id)); - } - // 构造trans - else if (OB_ISNULL(trans = trans_allocator_.alloc(trans_ctx))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc ObTableLoadStoreTrans", KR(ret)); - } else if (OB_FAIL(trans->init())) { - LOG_WARN("fail to init trans", KR(ret), K(trans_id)); - } else if (OB_FAIL(trans_map_.set_refactored(trans_id, trans))) { - LOG_WARN("fail to set_refactored", KR(ret), K(trans_id)); - } - if (OB_FAIL(ret)) { - if (nullptr != trans) { - trans_allocator_.free(trans); - trans = nullptr; - } - } - return ret; -} - int ObTableLoadStoreCtx::generate_autoinc_params(AutoincParam &autoinc_param) { int ret = OB_SUCCESS; @@ -635,6 +557,57 @@ int ObTableLoadStoreCtx::init_session_ctx_array() return ret; } +int ObTableLoadStoreCtx::alloc_trans_ctx(const ObTableLoadTransId &trans_id, + ObTableLoadTransCtx *&trans_ctx) +{ + int ret = OB_SUCCESS; + trans_ctx = nullptr; + // 分配trans_ctx + if (OB_ISNULL(trans_ctx = ctx_->alloc_trans_ctx(trans_id))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc trans ctx", KR(ret), K(trans_id)); + } + // 把trans_ctx插入map + else if (OB_FAIL(trans_ctx_map_.set_refactored(trans_ctx->trans_id_, trans_ctx))) { + LOG_WARN("fail to set trans ctx", KR(ret), K(trans_ctx->trans_id_)); + } + if (OB_FAIL(ret)) { + if (nullptr != trans_ctx) { + ctx_->free_trans_ctx(trans_ctx); + trans_ctx = nullptr; + } + } + return ret; +} + +int ObTableLoadStoreCtx::alloc_trans(const ObTableLoadTransId &trans_id, + ObTableLoadStoreTrans *&trans) +{ + int ret = OB_SUCCESS; + trans = nullptr; + ObTableLoadTransCtx *trans_ctx = nullptr; + // 分配trans_ctx + if (OB_FAIL(alloc_trans_ctx(trans_id, trans_ctx))) { + LOG_WARN("fail to alloc trans ctx", KR(ret), K(trans_id)); + } + // 构造trans + else if (OB_ISNULL(trans = trans_allocator_.alloc(trans_ctx))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc ObTableLoadStoreTrans", KR(ret)); + } else if (OB_FAIL(trans->init())) { + LOG_WARN("fail to init trans", KR(ret), K(trans_id)); + } else if (OB_FAIL(trans_map_.set_refactored(trans_id, trans))) { + LOG_WARN("fail to set_refactored", KR(ret), K(trans_id)); + } + if (OB_FAIL(ret)) { + if (nullptr != trans) { + trans_allocator_.free(trans); + trans = nullptr; + } + } + return ret; +} + int ObTableLoadStoreCtx::start_trans(const ObTableLoadTransId &trans_id, ObTableLoadStoreTrans *&trans) { @@ -643,10 +616,11 @@ int ObTableLoadStoreCtx::start_trans(const ObTableLoadTransId &trans_id, ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - obsys::ObWLockGuard guard(rwlock_); + obsys::ObRLockGuard status_guard(status_lock_); if (OB_FAIL(check_status_unlock(ObTableLoadStatusType::LOADING))) { LOG_WARN("fail to check status", KR(ret), K_(status)); } else { + obsys::ObWLockGuard guard(rwlock_); const ObTableLoadSegmentID &segment_id = trans_id.segment_id_; SegmentCtx *segment_ctx = nullptr; if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { @@ -674,6 +648,9 @@ int ObTableLoadStoreCtx::start_trans(const ObTableLoadTransId &trans_id, } } } + if (OB_NOT_NULL(segment_ctx)) { + segment_ctx_map_.revert(segment_ctx); + } } } return ret; @@ -714,6 +691,9 @@ int ObTableLoadStoreCtx::commit_trans(ObTableLoadStoreTrans *trans) segment_ctx->committed_trans_store_ = trans_store; trans->set_dirty(); } + if (OB_NOT_NULL(segment_ctx)) { + segment_ctx_map_.revert(segment_ctx); + } if (OB_FAIL(ret)) { if (nullptr != trans_store) { trans_store->~ObTableLoadTransStore(); @@ -753,6 +733,9 @@ int ObTableLoadStoreCtx::abort_trans(ObTableLoadStoreTrans *trans) segment_ctx->current_trans_ = nullptr; trans->set_dirty(); } + if (OB_NOT_NULL(segment_ctx)) { + segment_ctx_map_.revert(segment_ctx); + } } return ret; } @@ -829,8 +812,7 @@ int ObTableLoadStoreCtx::get_trans_ctx(const ObTableLoadTransId &trans_id, return ret; } -int ObTableLoadStoreCtx::get_active_trans_ids( - ObIArray &trans_id_array) const +int ObTableLoadStoreCtx::get_active_trans_ids(ObIArray &trans_id_array) const { int ret = OB_SUCCESS; trans_id_array.reset(); @@ -911,21 +893,5 @@ void ObTableLoadStoreCtx::clear_committed_trans_stores() committed_trans_store_array_.reset(); } -int ObTableLoadStoreCtx::commit() -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); - } else if (OB_FAIL(check_status(ObTableLoadStatusType::MERGED))) { - LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(insert_table_ctx_->commit())) { - LOG_WARN("fail to commit insert table", KR(ret)); - } else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(commit_autoinc_value())) { - LOG_WARN("fail to commit sync auto increment value", KR(ret)); - } - return ret; -} - } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_store_ctx.h b/src/observer/table_load/ob_table_load_store_ctx.h index 13fb90e9bd..e492ed4971 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.h +++ b/src/observer/table_load/ob_table_load_store_ctx.h @@ -48,46 +48,59 @@ public: bool is_valid() const { return is_inited_; } TO_STRING_KV(K_(is_inited)); public: + OB_INLINE obsys::ObRWLock &get_status_lock() + { + return status_lock_; + } OB_INLINE table::ObTableLoadStatusType get_status() const { - obsys::ObRLockGuard guard(rwlock_); + obsys::ObRLockGuard guard(status_lock_); return status_; } OB_INLINE int get_error_code() const { - obsys::ObRLockGuard guard(rwlock_); + obsys::ObRLockGuard guard(status_lock_); return error_code_; } OB_INLINE int set_status_inited() { - return advance_status(table::ObTableLoadStatusType::INITED); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::INITED); } OB_INLINE int set_status_loading() { - return advance_status(table::ObTableLoadStatusType::LOADING); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::LOADING); } OB_INLINE int set_status_frozen() { - return advance_status(table::ObTableLoadStatusType::FROZEN); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::FROZEN); } OB_INLINE int set_status_merging() { - return advance_status(table::ObTableLoadStatusType::MERGING); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::MERGING); } OB_INLINE int set_status_merged() { - return advance_status(table::ObTableLoadStatusType::MERGED); + obsys::ObWLockGuard guard(status_lock_); + return advance_status_unlock(table::ObTableLoadStatusType::MERGED); } - OB_INLINE int set_status_commit() + OB_INLINE int set_status_commit_unlock() { - return advance_status(table::ObTableLoadStatusType::COMMIT); + return advance_status_unlock(table::ObTableLoadStatusType::COMMIT); } int set_status_error(int error_code); int set_status_abort(); - int check_status(table::ObTableLoadStatusType status) const; -private: - int advance_status(table::ObTableLoadStatusType status); int check_status_unlock(table::ObTableLoadStatusType status) const; + OB_INLINE int check_status(table::ObTableLoadStatusType status) const + { + obsys::ObRLockGuard guard(status_lock_); + return check_status_unlock(status); + } +private: + int advance_status_unlock(table::ObTableLoadStatusType status); public: int start_trans(const table::ObTableLoadTransId &trans_id, ObTableLoadStoreTrans *&trans); int commit_trans(ObTableLoadStoreTrans *trans); @@ -104,7 +117,6 @@ public: int check_exist_trans(bool &exist) const; // release disk space void clear_committed_trans_stores(); - int commit(); private: int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx); int alloc_trans(const table::ObTableLoadTransId &trans_id, ObTableLoadStoreTrans *&trans); @@ -112,6 +124,7 @@ private: int init_session_ctx_array(); int generate_autoinc_params(share::AutoincParam &autoinc_param); int init_sequence(); +public: int commit_autoinc_value(); public: ObTableLoadTableCtx * const ctx_; @@ -158,10 +171,11 @@ private: typedef common::ObLinkHashMap SegmentCtxMap; private: ObTableLoadObjectAllocator trans_allocator_; // 多线程安全 - mutable obsys::ObRWLock rwlock_; common::ObArenaAllocator allocator_; + obsys::ObRWLock status_lock_; table::ObTableLoadStatusType status_; int error_code_; + mutable obsys::ObRWLock rwlock_; TransMap trans_map_; TransCtxMap trans_ctx_map_; SegmentCtxMap segment_ctx_map_; diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index a17eaf8b79..c44dfb0246 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -157,13 +157,23 @@ int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray &idx_array ret = OB_ERR_UNEXPECTED; LOG_WARN("coordinator ctx already exist", KR(ret)); } else { - if (OB_ISNULL(coordinator_ctx_ = OB_NEWx(ObTableLoadCoordinatorCtx, (&allocator_), this))) { + ObTableLoadCoordinatorCtx *coordinator_ctx = nullptr; + if (OB_ISNULL(coordinator_ctx = OB_NEWx(ObTableLoadCoordinatorCtx, (&allocator_), this))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadCoordinatorCtx", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->init(idx_array, user_id))) { + } else if (OB_FAIL(coordinator_ctx->init(idx_array, user_id))) { LOG_WARN("fail to init coordinator ctx", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->set_status_inited())) { + } else if (OB_FAIL(coordinator_ctx->set_status_inited())) { LOG_WARN("fail to set coordinator status inited", KR(ret)); + } else { + coordinator_ctx_ = coordinator_ctx; + } + if (OB_FAIL(ret)) { + if (nullptr != coordinator_ctx) { + coordinator_ctx->~ObTableLoadCoordinatorCtx(); + allocator_.free(coordinator_ctx); + coordinator_ctx = nullptr; + } } } return ret; @@ -181,13 +191,23 @@ int ObTableLoadTableCtx::init_store_ctx( ret = OB_ENTRY_EXIST; LOG_WARN("store ctx already exist", KR(ret)); } else { - if (OB_ISNULL(store_ctx_ = OB_NEWx(ObTableLoadStoreCtx, (&allocator_), this))) { + ObTableLoadStoreCtx *store_ctx = nullptr; + if (OB_ISNULL(store_ctx = OB_NEWx(ObTableLoadStoreCtx, (&allocator_), this))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadStoreCtx", KR(ret)); - } else if (OB_FAIL(store_ctx_->init(partition_id_array, target_partition_id_array))) { + } else if (OB_FAIL(store_ctx->init(partition_id_array, target_partition_id_array))) { LOG_WARN("fail to init store ctx", KR(ret)); - } else if (OB_FAIL(store_ctx_->set_status_inited())) { + } else if (OB_FAIL(store_ctx->set_status_inited())) { LOG_WARN("fail to set store status inited", KR(ret)); + } else { + store_ctx_ = store_ctx; + } + if (OB_FAIL(ret)) { + if (nullptr != store_ctx) { + store_ctx->~ObTableLoadStoreCtx(); + allocator_.free(store_ctx); + store_ctx = nullptr; + } } } return ret;