diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index b823a5948d..b18eb1b1de 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -369,8 +369,6 @@ int ObTableLoadStore::pre_finish_trans(const ObTableLoadTransId &trans_id) ObTableLoadStoreTrans *trans = nullptr; if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { LOG_WARN("fail to get trans", KR(ret)); - } else if (OB_FAIL(trans->set_trans_status_frozen())) { - LOG_WARN("fail to freeze trans", KR(ret)); } else if (OB_FAIL(flush(trans))) { LOG_WARN("fail to flush", KR(ret)); } @@ -497,7 +495,7 @@ int ObTableLoadStore::clean_up_trans(ObTableLoadStoreTrans *trans) LOG_DEBUG("store clean up trans"); ObTableLoadTransStoreWriter *store_writer = nullptr; // 取出当前store_writer - if (OB_FAIL(trans->get_store_writer_for_clean_up(store_writer))) { + if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); } else { for (int32_t session_id = 1; OB_SUCC(ret) && session_id <= param_.session_count_; @@ -660,7 +658,7 @@ int ObTableLoadStore::write(const ObTableLoadTransId &trans_id, int32_t session_ LOG_WARN("fail to get trans", KR(ret)); } // 取出store_writer - else if (OB_FAIL(trans->get_store_writer_for_write(store_writer))) { + else if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); //} else if (OB_FAIL(store_writer->advance_sequence_no(session_id, partition_id, sequence_no, guard))) { // if (OB_UNLIKELY(OB_ENTRY_EXIST != ret)) { @@ -796,8 +794,12 @@ int ObTableLoadStore::flush(ObTableLoadStoreTrans *trans) LOG_DEBUG("store flush"); ObTableLoadTransStoreWriter *store_writer = nullptr; // 取出当前store_writer - if (OB_FAIL(trans->get_store_writer_for_flush(store_writer))) { + if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); + } + // after get store writer, avoid early commit + else if (OB_FAIL(trans->set_trans_status_frozen())) { + LOG_WARN("fail to freeze trans", KR(ret)); } else { for (int32_t session_id = 1; OB_SUCC(ret) && session_id <= param_.session_count_; ++session_id) { ObTableLoadTask *task = nullptr; @@ -866,8 +868,6 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id) ObTableLoadStoreTrans *trans = nullptr; if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { LOG_WARN("fail to get trans", KR(ret)); - } else if (OB_FAIL(trans->set_trans_status_frozen())) { - LOG_WARN("fail to freeze trans", KR(ret)); } else if (OB_FAIL(px_flush(trans))) { LOG_WARN("fail to do px flush", KR(ret)); } else if (OB_FAIL(store_ctx_->commit_trans(trans))) { @@ -928,7 +928,7 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id, ObTableLoadTransStoreWriter *store_writer = nullptr; if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { LOG_WARN("fail to get trans", KR(ret)); - } else if (OB_FAIL(trans->get_store_writer_for_write(store_writer))) { + } else if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); } else { if (OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::RUNNING)) || @@ -960,8 +960,8 @@ int ObTableLoadStore::px_clean_up_trans(ObTableLoadStoreTrans *trans) LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { ObTableLoadTransStoreWriter *store_writer = nullptr; - if (OB_FAIL(trans->get_store_writer_for_clean_up(store_writer))) { - LOG_WARN("fail to get store writer for clean up", KR(ret)); + if (OB_FAIL(trans->get_store_writer(store_writer))) { + LOG_WARN("fail to get store writer", KR(ret)); } else if (OB_FAIL(store_writer->clean_up(PX_DEFAULT_SESSION_ID))) { LOG_WARN("fail to clean up store writer", KR(ret)); } @@ -981,8 +981,12 @@ int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans) LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { ObTableLoadTransStoreWriter *store_writer = nullptr; - if (OB_FAIL(trans->get_store_writer_for_flush(store_writer))) { + if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); + } + // after get store writer, avoid early commit + else if (OB_FAIL(trans->set_trans_status_frozen())) { + LOG_WARN("fail to freeze trans", KR(ret)); } else if (OB_FAIL(store_writer->flush(PX_DEFAULT_SESSION_ID))) { LOG_WARN("fail to flush store", KR(ret)); } else { diff --git a/src/observer/table_load/ob_table_load_store_trans.cpp b/src/observer/table_load/ob_table_load_store_trans.cpp index afc5e2d5a4..9ed48b6d48 100644 --- a/src/observer/table_load/ob_table_load_store_trans.cpp +++ b/src/observer/table_load/ob_table_load_store_trans.cpp @@ -106,69 +106,13 @@ int ObTableLoadStoreTrans::set_trans_status_abort() return ret; } -int ObTableLoadStoreTrans::get_store_writer_for_write( - ObTableLoadTransStoreWriter *&store_writer) const +int ObTableLoadStoreTrans::get_store_writer(ObTableLoadTransStoreWriter *&store_writer) const { int ret = OB_SUCCESS; store_writer = nullptr; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this)); - } else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) { - LOG_WARN("fail to check trans status", KR(ret)); - } else { - obsys::ObRLockGuard guard(trans_ctx_->rwlock_); - if (OB_ISNULL(trans_store_writer_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null store writer", KR(ret)); - } else if (OB_UNLIKELY(trans_store_writer_->is_flush())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("trans store writer is flush", KR(ret)); - } else { - store_writer = trans_store_writer_; - store_writer->inc_ref_count(); - } - } - return ret; -} - -int ObTableLoadStoreTrans::get_store_writer_for_flush( - ObTableLoadTransStoreWriter *&store_writer) const -{ - int ret = OB_SUCCESS; - store_writer = nullptr; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this)); - } else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::FROZEN))) { - LOG_WARN("fail to check trans status", KR(ret)); - } else { - obsys::ObRLockGuard guard(trans_ctx_->rwlock_); - if (OB_ISNULL(trans_store_writer_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null store writer", KR(ret)); - } else if (OB_UNLIKELY(trans_store_writer_->is_flush())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("trans store writer is flush", KR(ret)); - } else { - trans_store_writer_->set_is_flush(); - store_writer = trans_store_writer_; - store_writer->inc_ref_count(); - } - } - return ret; -} - -int ObTableLoadStoreTrans::get_store_writer_for_clean_up( - ObTableLoadTransStoreWriter *&store_writer) const -{ - int ret = OB_SUCCESS; - store_writer = nullptr; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this)); - } else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::ABORT))) { - LOG_WARN("fail to check trans status", KR(ret)); } else { obsys::ObRLockGuard guard(trans_ctx_->rwlock_); if (OB_ISNULL(trans_store_writer_)) { diff --git a/src/observer/table_load/ob_table_load_store_trans.h b/src/observer/table_load/ob_table_load_store_trans.h index b6cf504207..d160935039 100644 --- a/src/observer/table_load/ob_table_load_store_trans.h +++ b/src/observer/table_load/ob_table_load_store_trans.h @@ -57,9 +57,7 @@ public: private: int advance_trans_status(table::ObTableLoadTransStatusType trans_status); public: - int get_store_writer_for_write(ObTableLoadTransStoreWriter *&store_writer) const; - int get_store_writer_for_flush(ObTableLoadTransStoreWriter *&store_writer) const; - int get_store_writer_for_clean_up(ObTableLoadTransStoreWriter *&store_writer) const; + int get_store_writer(ObTableLoadTransStoreWriter *&store_writer) const; void put_store_writer(ObTableLoadTransStoreWriter *store_writer); // 取出store int output_store(ObTableLoadTransStore *&trans_store); diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index d6a0d927e7..4f92ecdb39 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -107,7 +107,6 @@ ObTableLoadTransStoreWriter::ObTableLoadTransStoreWriter(ObTableLoadTransStore * allocator_("TLD_TSWriter", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_), table_data_desc_(nullptr), ref_count_(0), - is_flush_(false), is_inited_(false) { } diff --git a/src/observer/table_load/ob_table_load_trans_store.h b/src/observer/table_load/ob_table_load_trans_store.h index 21ea9d8cd9..5195446a78 100644 --- a/src/observer/table_load/ob_table_load_trans_store.h +++ b/src/observer/table_load/ob_table_load_trans_store.h @@ -64,8 +64,6 @@ public: int flush(int32_t session_id); int clean_up(int32_t session_id); public: - void set_is_flush() { is_flush_ = true; } - bool is_flush() const { return is_flush_; } int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); } int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); } int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); } @@ -112,7 +110,6 @@ private: }; SessionContext *session_ctx_array_; int64_t ref_count_ CACHE_ALIGNED; - bool is_flush_; bool is_inited_; ObSchemaGetterGuard schema_guard_; };