diff --git a/src/storage/ob_pg_sstable_mgr.cpp b/src/storage/ob_pg_sstable_mgr.cpp index 7971e807aa..e6ff6ef227 100644 --- a/src/storage/ob_pg_sstable_mgr.cpp +++ b/src/storage/ob_pg_sstable_mgr.cpp @@ -965,13 +965,17 @@ int ObPGSSTableMgr::GetCleanOutLogIdFunctor::operator()(ObITable& table, bool& i { int ret = OB_SUCCESS; is_full = false; - if (table.get_ref() > 0 && table.is_multi_version_minor_sstable() && table.get_end_log_ts() < clean_out_log_ts_) { - clean_out_log_ts_ = table.get_end_log_ts(); + if (table.get_ref() > 0 && table.is_multi_version_minor_sstable()) { + if (table.is_complement_minor_sstable()) { + min_complement_log_ts_ = std::min(min_complement_log_ts_, table.get_end_log_ts()); + } else { + clean_out_log_ts_.push_back(table.get_end_log_ts()); + } } return ret; } -int ObPGSSTableMgr::get_clean_out_log_ts(int64_t& clean_out_log_ts) +int ObPGSSTableMgr::get_clean_out_log_ts(ObIArray &clean_out_log_ts) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -982,7 +986,21 @@ int ObPGSSTableMgr::get_clean_out_log_ts(int64_t& clean_out_log_ts) if (OB_FAIL(table_map_.foreach (functor))) { LOG_WARN("fail to foreach table map", K(ret)); } else { - clean_out_log_ts = functor.get_clean_out_log_ts(); + functor.sort(); + const ObIArray &log_ts_array = functor.get_clean_out_log_ts(); + int64_t last_log_ts = -1; + // duplicate log ts may exist, need to remove duplicate + for (int64_t i = 0; OB_SUCC(ret) && i < log_ts_array.count(); i++) { + if (log_ts_array.at(i) < functor.get_min_complement_log_ts()) { + if (last_log_ts != log_ts_array.at(i)) { + if (OB_FAIL(clean_out_log_ts.push_back(log_ts_array.at(i)))) { + LOG_WARN("failed to push back log ts", K(ret)); + } else { + last_log_ts = log_ts_array.at(i); + } + } + } + } } } return ret; diff --git a/src/storage/ob_pg_sstable_mgr.h b/src/storage/ob_pg_sstable_mgr.h index 22b8e57b5f..b0286717a9 100644 --- a/src/storage/ob_pg_sstable_mgr.h +++ b/src/storage/ob_pg_sstable_mgr.h @@ -19,6 +19,7 @@ #include "blocksstable/ob_block_sstable_struct.h" #include "storage/ob_i_table.h" #include "storage/ob_sstable.h" +#include "lib/container/ob_se_array.h" namespace oceanbase { namespace storage { @@ -56,7 +57,7 @@ public: { return file_handle_.assign(file_handle); } - int get_clean_out_log_ts(int64_t& clean_out_log_ts); + int get_clean_out_log_ts(ObIArray &clean_out_log_ts); private: static const int64_t DEFAULT_HASH_BUCKET_COUNT = 100; @@ -130,17 +131,29 @@ private: }; class GetCleanOutLogIdFunctor : public TableMap::ForeachFunctor { public: - GetCleanOutLogIdFunctor() : clean_out_log_ts_(INT64_MAX) - {} + GetCleanOutLogIdFunctor() : clean_out_log_ts_() + { + min_complement_log_ts_ = INT64_MAX; + clean_out_log_ts_.push_back(0); + } virtual ~GetCleanOutLogIdFunctor() = default; - int64_t get_clean_out_log_ts() const + const common::ObIArray &get_clean_out_log_ts() const { return clean_out_log_ts_; } - virtual int operator()(ObITable& table, bool& is_full) override; + void sort() + { + std::sort(clean_out_log_ts_.begin(), clean_out_log_ts_.end()); + } + int64_t get_min_complement_log_ts() + { + return min_complement_log_ts_; + } + virtual int operator()(ObITable &table, bool &is_full) override; private: - int64_t clean_out_log_ts_; + int64_t min_complement_log_ts_; + common::ObSEArray clean_out_log_ts_; }; void destroy(); int write_add_sstable_log(ObSSTable& sstable); diff --git a/src/storage/ob_pg_storage.cpp b/src/storage/ob_pg_storage.cpp index 30b188f890..58e784b210 100644 --- a/src/storage/ob_pg_storage.cpp +++ b/src/storage/ob_pg_storage.cpp @@ -6917,22 +6917,28 @@ int ObPGStorage::remove_mem_ctx_for_trans_ctx_(memtable::ObMemtable* mt) return ret; } -int ObPGStorage::get_max_cleanout_log_ts(int64_t& max_cleanout_log_ts) +int ObPGStorage::get_max_cleanout_log_ts(ObIArray &sstable_cleanout_log_ts) { int ret = OB_SUCCESS; - int64_t sstable_clean_out_log_ts = 0; - max_cleanout_log_ts = 0; - ObTablesHandle handle; + sstable_cleanout_log_ts.reset(); + ObSEArray log_ts_array; + int64_t last_replay_log_ts = 0; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("pg storage is not inited", K(ret)); - } else if (OB_FAIL(sstable_mgr_.get_clean_out_log_ts(sstable_clean_out_log_ts))) { + } else if (OB_FAIL(sstable_mgr_.get_clean_out_log_ts(log_ts_array))) { LOG_WARN("failed to get clean out log id", K(ret), K(pkey_)); } else { { TCRLockGuard lock_guard(lock_); - max_cleanout_log_ts = - std::min(sstable_clean_out_log_ts, meta_->storage_info_.get_data_info().get_last_replay_log_ts()); + last_replay_log_ts = meta_->storage_info_.get_data_info().get_last_replay_log_ts(); + } + for (int64_t i = 0; OB_SUCC(ret) && i < log_ts_array.count(); i++) { + if (log_ts_array.at(i) < last_replay_log_ts) { + if (OB_FAIL(sstable_cleanout_log_ts.push_back(log_ts_array.at(i)))) { + LOG_WARN("failed to push back log ts", K(ret), K(pkey_)); + } + } } } return ret; @@ -6981,7 +6987,7 @@ int ObPGStorage::get_min_schema_version(int64_t& min_schema_version) int ObPGStorage::clear_unused_trans_status() { int ret = OB_SUCCESS; - int64_t max_cleanout_log_ts = 0; + ObSEArray max_cleanout_log_ts; if (OB_FAIL(get_max_cleanout_log_ts(max_cleanout_log_ts))) { LOG_WARN("failed to get max cleanout log id", K(ret), K_(pkey)); } else if (OB_FAIL(txs_->clear_unused_trans_status(pkey_, max_cleanout_log_ts))) { diff --git a/src/storage/ob_pg_storage.h b/src/storage/ob_pg_storage.h index 99a457c026..4d9841ab97 100644 --- a/src/storage/ob_pg_storage.h +++ b/src/storage/ob_pg_storage.h @@ -351,7 +351,7 @@ public: transaction::ObTransService& trans_service, const int64_t frozen_version); int restore_mem_trans_table(ObSSTable& trans_sstable); int restore_mem_trans_table(); - int get_max_cleanout_log_ts(int64_t& max_cleanout_log_ts); + int get_max_cleanout_log_ts(ObIArray &max_cleanout_log_ts); int clear_unused_trans_status(); int physical_flashback(const int64_t flashback_scn); int set_meta_block_list(const common::ObIArray& meta_block_list); diff --git a/src/storage/transaction/ob_trans_ctx_mgr.cpp b/src/storage/transaction/ob_trans_ctx_mgr.cpp index d802dc62bf..7924172646 100644 --- a/src/storage/transaction/ob_trans_ctx_mgr.cpp +++ b/src/storage/transaction/ob_trans_ctx_mgr.cpp @@ -1724,7 +1724,7 @@ int ObPartitionTransCtxMgr::get_max_trans_version_before_given_log_ts( return ret; } -int ObPartitionTransCtxMgr::clear_unused_trans_status(const int64_t max_cleanout_log_ts) +int ObPartitionTransCtxMgr::clear_unused_trans_status(const ObIArray &max_cleanout_log_ts) { int ret = OB_SUCCESS; @@ -5099,7 +5099,8 @@ int ObPartTransCtxMgr::get_max_trans_version_before_given_log_ts( return ret; } -int ObPartTransCtxMgr::clear_unused_trans_status(const ObPartitionKey& pkey, const int64_t max_cleanout_log_ts) +int ObPartTransCtxMgr::clear_unused_trans_status( + const ObPartitionKey &pkey, const ObIArray &max_cleanout_log_ts) { int ret = OB_SUCCESS; ObPartitionTransCtxMgr* ctx_mgr = nullptr; diff --git a/src/storage/transaction/ob_trans_ctx_mgr.h b/src/storage/transaction/ob_trans_ctx_mgr.h index d0268e073d..052ba1ab94 100644 --- a/src/storage/transaction/ob_trans_ctx_mgr.h +++ b/src/storage/transaction/ob_trans_ctx_mgr.h @@ -323,8 +323,8 @@ public: const transaction::ObTransID& data_trans_id, const int32_t sql_sequence, storage::ObStoreRowLockState& lock_state); int get_max_trans_version_before_given_log_ts( - const int64_t log_ts, int64_t& max_trans_version, bool& is_all_rollback_trans); - int clear_unused_trans_status(const int64_t max_cleanout_log_ts); + const int64_t log_ts, int64_t &max_trans_version, bool &is_all_rollback_trans); + int clear_unused_trans_status(const ObIArray &max_cleanout_log_ts); int has_terminated_trx_in_given_log_ts_range( const int64_t start_log_ts, const int64_t end_log_ts, bool& has_terminated_trx); bool is_clog_aggregation_enabled(); @@ -975,8 +975,8 @@ public: int get_applied_log_ts(const common::ObPartitionKey& pkey, int64_t& applied_log_ts); int get_max_trans_version_before_given_log_ts( - const ObPartitionKey& pkey, const int64_t log_ts, int64_t& max_trans_version, bool& is_all_rollback_trans); - int clear_unused_trans_status(const ObPartitionKey& pkey, const int64_t max_cleanout_log_id); + const ObPartitionKey &pkey, const int64_t log_ts, int64_t &max_trans_version, bool &is_all_rollback_trans); + int clear_unused_trans_status(const ObPartitionKey &pkey, const ObIArray &max_cleanout_log_id); int has_terminated_trx_in_given_log_ts_range( const ObPartitionKey& pkey, const int64_t start_log_ts, const int64_t end_log_ts, bool& has_terminated_trx); int set_last_restore_log_info( diff --git a/src/storage/transaction/ob_trans_define.h b/src/storage/transaction/ob_trans_define.h index dedb8de59a..6cd417b791 100644 --- a/src/storage/transaction/ob_trans_define.h +++ b/src/storage/transaction/ob_trans_define.h @@ -3123,13 +3123,15 @@ public: : status_(ObTransTableStatusType::RUNNING), trans_version_(common::OB_INVALID_VERSION), undo_status_(), + start_log_ts_(0), end_log_ts_(INT64_MAX) {} ObTransTableStatusType status_; int64_t trans_version_; ObTransUndoStatus undo_status_; + int64_t start_log_ts_; int64_t end_log_ts_; - TO_STRING_KV(K_(status), K_(trans_version), K_(undo_status), K_(end_log_ts)); + TO_STRING_KV(K_(status), K_(trans_version), K_(undo_status), K_(start_log_ts), K_(end_log_ts)); }; class ObTransTableStatusInfo { diff --git a/src/storage/transaction/ob_trans_functor.h b/src/storage/transaction/ob_trans_functor.h index f05bddb5de..6ee85877d4 100644 --- a/src/storage/transaction/ob_trans_functor.h +++ b/src/storage/transaction/ob_trans_functor.h @@ -1752,7 +1752,8 @@ private: class ObCleanTransTableFunctor { public: - explicit ObCleanTransTableFunctor(const int64_t max_cleanout_log_ts) : max_cleanout_log_ts_(max_cleanout_log_ts) + explicit ObCleanTransTableFunctor(const ObIArray &max_cleanout_log_ts) + : max_cleanout_log_ts_(max_cleanout_log_ts) {} bool operator()(const ObTransID& trans_id, ObTransCtx* ctx_base) { @@ -1767,9 +1768,10 @@ public: TRANS_LOG(WARN, "failed to get trans table status info", K(ret)); } else if ((ObTransTableStatusType::COMMIT == trans_info.status_ || ObTransTableStatusType::ABORT == trans_info.status_) && - ctx->is_exiting()) { - if (trans_info.end_log_ts_ <= max_cleanout_log_ts_) { - if (ctx->is_dirty_trans()) { + ctx->is_exiting() && ctx->is_dirty_trans()) { + for (int64_t i = 0; i < max_cleanout_log_ts_.count() - 1; i++) { + if (trans_info.start_log_ts_ > max_cleanout_log_ts_.at(i) && + trans_info.end_log_ts_ <= max_cleanout_log_ts_.at(i + 1)) { ctx->remove_trans_table(); } } @@ -1779,7 +1781,7 @@ public: } private: - int64_t max_cleanout_log_ts_; + const ObIArray &max_cleanout_log_ts_; }; class ObCheckHasTerminatedTrxBeforeLogFunction { diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index d0550bb7b2..39ad877794 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -10690,6 +10690,8 @@ int ObPartTransCtx::get_trans_state_and_version_without_lock(ObTransStatusInfo& if (trans_info.status_ == ObTransTableStatusType::COMMIT || trans_info.status_ == ObTransTableStatusType::ABORT) { trans_info.end_log_ts_ = ATOMIC_LOAD(&end_log_ts_); } + + trans_info.start_log_ts_ = ATOMIC_LOAD(&min_log_ts_); return ret; } diff --git a/src/storage/transaction/ob_trans_service.cpp b/src/storage/transaction/ob_trans_service.cpp index 660f520edc..392fe4e5ee 100644 --- a/src/storage/transaction/ob_trans_service.cpp +++ b/src/storage/transaction/ob_trans_service.cpp @@ -9042,7 +9042,8 @@ int ObTransService::get_max_trans_version_before_given_log_ts( return ret; } -int ObTransService::clear_unused_trans_status(const ObPartitionKey& pg_key, const int64_t max_cleanout_log_ts) +int ObTransService::clear_unused_trans_status( + const ObPartitionKey &pg_key, const ObIArray &max_cleanout_log_ts) { int ret = OB_SUCCESS; diff --git a/src/storage/transaction/ob_trans_service.h b/src/storage/transaction/ob_trans_service.h index 3361a0d21b..8f4f87ffcb 100644 --- a/src/storage/transaction/ob_trans_service.h +++ b/src/storage/transaction/ob_trans_service.h @@ -554,8 +554,8 @@ public: return &trans_status_mgr_; } int get_max_trans_version_before_given_log_ts( - const ObPartitionKey& pkey, const int64_t log_ts, int64_t& max_trans_version, bool& is_all_rollback_trans); - int clear_unused_trans_status(const ObPartitionKey& pg_key, const int64_t max_cleanout_log_ts); + const ObPartitionKey &pkey, const int64_t log_ts, int64_t &max_trans_version, bool &is_all_rollback_trans); + int clear_unused_trans_status(const ObPartitionKey &pg_key, const ObIArray &max_cleanout_log_ts); virtual int has_terminated_trx_in_given_log_ts_range( const ObPartitionKey& pkey, const int64_t start_log_ts, const int64_t end_log_ts, bool& has_terminated_trx); ObPartTransSameLeaderBatchRpcMgr* get_part_trans_same_leader_batch_rpc_mgr()