From 81ceea749788eef1a5c7c5fa484e475dbc8eee20 Mon Sep 17 00:00:00 2001 From: Vinoth Veeraraghavan Date: Tue, 13 Oct 2020 15:38:50 +0800 Subject: [PATCH] Fix reclamation in MOT ckpt --- .../occ_transaction_manager.cpp | 6 +- .../src/concurrency_control/row_header.cpp | 6 +- .../core/src/concurrency_control/row_header.h | 8 +-- .../garbage_collector/mm_gc_manager.cpp | 10 ++- .../memory/garbage_collector/mm_gc_manager.h | 56 +++++++++++++-- .../storage/mot/core/src/storage/row.h | 4 +- .../storage/mot/core/src/storage/sentinel.h | 3 +- .../storage/mot/core/src/storage/table.cpp | 10 +++ .../storage/mot/core/src/storage/table.h | 25 +++++++ .../system/checkpoint/checkpoint_worker.cpp | 71 ++++++++++++++++--- .../src/system/checkpoint/checkpoint_worker.h | 8 ++- .../mot/core/src/system/transaction/txn.cpp | 5 +- 12 files changed, 178 insertions(+), 34 deletions(-) diff --git a/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp b/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp index 6dec000c1..da5182e8e 100644 --- a/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp @@ -373,7 +373,9 @@ bool OccTransactionManager::WriteChanges(TxnManager* txMan) access->m_origSentinel->SetNextPtr(access->m_auxRow->GetPrimarySentinel()); } // upgrade should not change the reference count! 
- access->m_origSentinel->SetUpgradeCounter(); + if (access->m_origSentinel->IsCommited()) { + access->m_origSentinel->SetUpgradeCounter(); + } } } } @@ -403,6 +405,8 @@ bool OccTransactionManager::WriteChanges(TxnManager* txMan) } } + CleanRowsFromIndexes(txMan); + if (cfg.m_enableCheckpoint) { GetCheckpointManager()->CommitTransaction(txMan, m_rowsSetSize); } diff --git a/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.cpp b/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.cpp index 977ba1955..56346e3f4 100644 --- a/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.cpp +++ b/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.cpp @@ -134,11 +134,7 @@ void RowHeader::WriteChangesToRow(const Access* access, uint64_t csn) case DEL: MOT_ASSERT(access->m_origSentinel->IsCommited() == true); if (access->m_params.IsPrimarySentinel()) { - if (row->GetPrimarySentinel()->GetStable() == nullptr) { - m_csnWord = (csn | LOCK_BIT | ABSENT_BIT | LATEST_VER_BIT); - } else { - m_csnWord = (csn | LOCK_BIT | ABSENT_BIT); // if has stable, avoid destroying the row - } + m_csnWord = (csn | LOCK_BIT | ABSENT_BIT | LATEST_VER_BIT); // and allow reuse of the original row } // Invalidate sentinel - row is still locked! diff --git a/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.h b/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.h index 9634b32ed..e2455ef08 100644 --- a/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.h +++ b/src/gausskernel/storage/mot/core/src/concurrency_control/row_header.h @@ -192,12 +192,12 @@ private: } /** - * @brief Queries whether the row is valid. - * @return True if the absent bit and the latet vsersion are not set for the row. + * @brief Queries whether the row is deleted. + * @return True if both the absent bit and the latest-version bit are set for the row. 
*/ - bool IsRowValid() const + bool IsRowDeleted() const { - return (m_csnWord & (ABSENT_BIT | LATEST_VER_BIT)) == 0; + return (m_csnWord & (ABSENT_BIT | LATEST_VER_BIT)) == (ABSENT_BIT | LATEST_VER_BIT); } /** @brief Sets the absent bit in the row. */ diff --git a/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.cpp b/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.cpp index 29e4c0168..d0f0b4909 100644 --- a/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.cpp @@ -40,7 +40,7 @@ GcLock g_gcGlobalEpochLock; GcManager* GcManager::allGcManagers = nullptr; -inline GcManager::GcManager(int purpose, int getThreadId, int rcuMaxFreeCount) +inline GcManager::GcManager(GC_TYPE purpose, int getThreadId, int rcuMaxFreeCount) : m_rcuFreeCount(rcuMaxFreeCount), m_tid(getThreadId), m_purpose(purpose) {} @@ -80,7 +80,7 @@ bool GcManager::Initialize() return result; } -GcManager* GcManager::Make(int purpose, int threadId, int rcuMaxFreeCount) +GcManager* GcManager::Make(GC_TYPE purpose, int threadId, int rcuMaxFreeCount) { GcManager* gc = nullptr; @@ -247,7 +247,11 @@ void GcManager::CleanIndexItems(uint32_t indexId, bool dropIndex) } m_managerLock.unlock(); if (counter) { - MOT_LOG_INFO("threadId = %d cleaned from index id = %d items = %d\n", m_tid, indexId, counter); + MOT_LOG_INFO("Entity:%s threadId = %d cleaned from index id = %d items = %d\n", + enGcTypes[m_purpose], + m_tid, + indexId, + counter); } } diff --git a/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.h b/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.h index 350f0ca48..921b96d69 100644 --- a/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.h +++ b/src/gausskernel/storage/mot/core/src/memory/garbage_collector/mm_gc_manager.h @@ -134,6 +134,9 @@ struct LimboGroup { 
inline unsigned CleanIndexItemPerGroup(GcManager& ti, uint32_t indexId, bool dropIndex); }; +static const char* const enGcTypes[] = { + stringify(GC_MAIN), stringify(GC_INDEX), stringify(GC_LOG), stringify(GC_CHECKPOINT)}; + /** * @class GcManager * @brief Garbage-collector manager per-session @@ -180,7 +183,7 @@ public: * @param rcuMaxFreeCount How many objects to reclaim * @return Pointer to instance of a GC manager */ - static GcManager* Make(int purpose, int threadId, int rcuMaxFreeCount = MASSTREE_OBJECT_COUNT_PER_CLEANUP); + static GcManager* Make(GC_TYPE purpose, int threadId, int rcuMaxFreeCount = MASSTREE_OBJECT_COUNT_PER_CLEANUP); /** * @brief Add Current manager to the global list @@ -210,6 +213,21 @@ public: return m_tid; } + void SetGcType(GC_TYPE type) + { + m_purpose = type; + } + + const char* GetGcTypeStr() + { + return enGcTypes[m_purpose]; + } + + GC_TYPE GetGcType() + { + return m_purpose; + } + void GcStartTxnMTtests() { if (m_gcEpoch != GetGlobalEpoch()) @@ -307,7 +325,37 @@ public: m_managerLock.lock(); ShrinkMem(); m_managerLock.unlock(); - MOT_LOG_DEBUG("THD_ID:%d closed session cleaned %d elements from limbo!\n", m_tid, inuseElements); + MOT_LOG_DEBUG("Entity:%s THD_ID:%d closed session cleaned %d elements from limbo!\n", + enGcTypes[m_purpose], + m_tid, + inuseElements); + } + + /** @brief Clean all objects at the end of a checkpoint GC transaction */ + inline void GcCheckPointClean() + { + if (m_isGcEnabled == false) { + return; + } + + // End CP Txn + m_gcEpoch = 0; + m_isTxnStarted = false; + + // Increase the global epoch to ensure all elements are from a lower epoch + SetGlobalEpoch(GetGlobalEpoch() + 1); + m_managerLock.lock(); + HardQuiesce(m_totalLimboInuseElements); + m_managerLock.unlock(); + +#ifdef MOT_DEBUG + uint32_t inuseElements = m_totalLimboInuseElements; + uint32_t cleandObjects = inuseElements - m_totalLimboInuseElements; + if (cleandObjects) { + MOT_LOG_INFO( + "Entity:%s THD_ID:%d cleaned %d elements from limbo!\n", 
enGcTypes[m_purpose], m_tid, (cleandObjects)); + } +#endif } /** @@ -455,7 +503,7 @@ private: uint16_t m_tid; /** @var GC manager type */ - uint8_t m_purpose; + GC_TYPE m_purpose; /** @brief Calculate the minimum epoch among all active GC Managers. * @return The minimum epoch among all active GC Managers. @@ -468,7 +516,7 @@ private: } /** @brief Constructor */ - inline GcManager(int purpose, int index, int rcuMaxFreeCount); + inline GcManager(GC_TYPE purpose, int index, int rcuMaxFreeCount); /** @brief Initialize GC Manager's structures. * @return True for success diff --git a/src/gausskernel/storage/mot/core/src/storage/row.h b/src/gausskernel/storage/mot/core/src/storage/row.h index a2483b9d9..fe8f8e083 100644 --- a/src/gausskernel/storage/mot/core/src/storage/row.h +++ b/src/gausskernel/storage/mot/core/src/storage/row.h @@ -222,9 +222,9 @@ public: * @brief Queries the validity of the row header. * @return Boolean value denoting whether the row is valid or not. */ - inline bool IsRowValid() const + inline bool IsRowDeleted() const { - return m_rowHeader.IsRowValid(); + return m_rowHeader.IsRowDeleted(); } /** diff --git a/src/gausskernel/storage/mot/core/src/storage/sentinel.h b/src/gausskernel/storage/mot/core/src/storage/sentinel.h index bf630f9c4..4b7646f09 100644 --- a/src/gausskernel/storage/mot/core/src/storage/sentinel.h +++ b/src/gausskernel/storage/mot/core/src/storage/sentinel.h @@ -323,13 +323,14 @@ private: Index* m_index = nullptr; /** @var m_stable A Container for the row and its status bit */ - uint64_t m_stable = 0; + volatile uint64_t m_stable = 0; /** @var m_refCount A counter of concurrent inserters of the same key */ volatile uint32_t m_refCount = 0; inline void DecCounter() { + MOT_ASSERT(GetCounter() > 0); uint32_t counter = GetCounter() - 1; m_refCount = (m_refCount & ~S_COUNTER_MASK) | (counter & S_COUNTER_MASK); } diff --git a/src/gausskernel/storage/mot/core/src/storage/table.cpp b/src/gausskernel/storage/mot/core/src/storage/table.cpp 
index ae0e32cdf..9cea27151 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.cpp +++ b/src/gausskernel/storage/mot/core/src/storage/table.cpp @@ -582,6 +582,16 @@ Row* Table::RemoveKeyFromIndex(Row* row, Sentinel* sentinel, uint64_t tid, GcMan if (likely(gc != nullptr)) { if (ix->GetIndexOrder() == IndexOrder::INDEX_ORDER_PRIMARY) { OutputRow = currSentinel->GetData(); + if (currSentinel->GetStable()) { +#ifdef MOT_DEBUG + if (gc->GetGcType() != GcManager::GC_CHECKPOINT) { + MOT_LOG_ERROR("ERROR!!!! Txn Delete with a stable ROW Type = %s\n", gc->GetGcTypeStr()); + MOT_ASSERT(false); + } +#endif + gc->GcRecordObject( + ix->GetIndexId(), currSentinel->GetStable(), nullptr, Row::RowDtor, ROW_SIZE_FROM_POOL(this)); + } gc->GcRecordObject(ix->GetIndexId(), currSentinel, nullptr, Index::SentinelDtor, SENTINEL_SIZE); gc->GcRecordObject(ix->GetIndexId(), OutputRow, nullptr, Row::RowDtor, ROW_SIZE_FROM_POOL(this)); } else { diff --git a/src/gausskernel/storage/mot/core/src/storage/table.h b/src/gausskernel/storage/mot/core/src/storage/table.h index dad21800f..ca69a6edb 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.h +++ b/src/gausskernel/storage/mot/core/src/storage/table.h @@ -290,6 +290,31 @@ public: */ void Compact(TxnManager* txn); + /** + * @brief Count number of absent sentinels in a table + * @param none + */ +#ifdef MOT_DEBUG + void CountAbsents() + { + uint64_t absentCounter = 0; + MOT_LOG_INFO("Testing table %s \n", GetTableName().c_str()); + for (uint16_t i = 0; i < m_numIndexes; i++) { + Index* index = GetIndex(i); + IndexIterator* it = index->Begin(0); + while (it->IsValid()) { + Sentinel* Sentinel = it->GetPrimarySentinel(); + if (Sentinel->IsDirty()) { + absentCounter++; + } + it->Next(); + } + MOT_LOG_INFO("Found %lu Absents in index %s\n", absentCounter, index->GetName().c_str()); + absentCounter = 0; + } + } +#endif + void ClearRowCache() { m_rowPool->ClearFreeCache(); diff --git 
a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp index 83eb173df..a582a0a92 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp @@ -123,10 +123,12 @@ bool CheckpointWorkerPool::Write(Buffer* buffer, Row* row, int fd) return true; } -int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid) +int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid, bool& isDeleted) { Row* mainRow = sentinel->GetData(); + Row* stableRow = nullptr; int wrote = 0; + isDeleted = false; if (mainRow != nullptr) { bool headerLocked = sentinel->TryLock(tid); @@ -138,11 +140,25 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, } sentinel->Lock(tid); } + stableRow = sentinel->GetStable(); + if (mainRow->IsRowDeleted()) { + if (stableRow) { + // Truely deleted and was not removed by txn manager + isDeleted = true; + } else { + MOT_LOG_DEBUG("Detected Deleted row without Stable Row!"); + } + } + } else { + return 0; + } + + if (unlikely(stableRow == nullptr)) { + stableRow = sentinel->GetStable(); } - Row* stableRow = sentinel->GetStable(); - bool deleted = !sentinel->IsCommited(); /* this currently indicates if the row is deleted or not */ bool statusBit = sentinel->GetStableStatus(); + bool deleted = !sentinel->IsCommited(); /* this currently indicates if the row is deleted or not */ do { if (statusBit == !m_na) { /* has stable version */ @@ -152,8 +168,10 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, if (!Write(buffer, stableRow, fd)) { wrote = -1; } else { - CheckpointUtils::DestroyStableRow(stableRow); - sentinel->SetStable(nullptr); + if (isDeleted == false) { + CheckpointUtils::DestroyStableRow(stableRow); + 
sentinel->SetStable(nullptr); + } wrote = 1; } break; @@ -202,6 +220,19 @@ Table* CheckpointWorkerPool::GetTask() return table; } +void CheckpointWorkerPool::ExecuteMicroGcTransaction( + Sentinel** deletedList, GcManager* gcSession, Table* table, uint16_t& deletedCounter, uint16_t limit) +{ + if (deletedCounter == limit) { + gcSession->GcStartTxn(); + for (uint16_t i = 0; i < limit; i++) { + Row* out = table->RemoveKeyFromIndex(deletedList[i]->GetData(), deletedList[i], 0, gcSession); + } + gcSession->GcCheckPointClean(); + deletedCounter = 0; + } +} + void CheckpointWorkerPool::WorkerFunc() { MOT_DECLARE_NON_KERNEL_THREAD(); @@ -215,12 +246,23 @@ void CheckpointWorkerPool::WorkerFunc() return; } SessionContext* sessionContext = GetSessionManager()->CreateSessionContext(); - + GcManager* gcSession = sessionContext->GetTxnManager()->GetGcSession(); + gcSession->SetGcType(GcManager::GC_CHECKPOINT); int threadId = MOTCurrThreadId; if (GetGlobalConfiguration().m_enableNuma && !GetTaskAffinity().SetAffinity(threadId)) { MOT_LOG_WARN("Failed to set affinity for checkpoint worker, checkpoint performance may be affected"); } + Sentinel** deletedList = new (nothrow) Sentinel*[DELETE_LIST_SIZE]; + uint16_t deletedListLocation = 0; + if (!deletedList) { + MOT_LOG_ERROR("CheckpointWorkerPool::workerFunc: Failed to initialize buffer"); + m_cpManager.OnError(ErrCodes::MEMORY, "Memory allocation failure"); + MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding(); + MOT_LOG_DEBUG("thread exiting"); + return; + } + while (true) { uint32_t tableId = 0; uint64_t exId = 0; @@ -325,6 +367,7 @@ void CheckpointWorkerPool::WorkerFunc() } bool iterationSucceeded = true; + bool isDeleted = false; while (it->IsValid()) { MOT::Sentinel* Sentinel = it->GetPrimarySentinel(); MOT_ASSERT(Sentinel); @@ -333,8 +376,11 @@ void CheckpointWorkerPool::WorkerFunc() it->Next(); continue; } - - int ckptStatus = Checkpoint(&buffer, Sentinel, fd, threadId); + int ckptStatus = Checkpoint(&buffer, Sentinel, 
fd, threadId, isDeleted); + if (isDeleted) { + deletedList[deletedListLocation++] = Sentinel; + ExecuteMicroGcTransaction(deletedList, gcSession, table, deletedListLocation, DELETE_LIST_SIZE); + } if (ckptStatus == 1) { numOps++; curSegLen += table->GetTupleSize() + sizeof(CheckpointUtils::EntryHeader); @@ -429,6 +475,13 @@ void CheckpointWorkerPool::WorkerFunc() MOT_LOG_ERROR("CheckpointWorkerPool::finishFile: failed to close file (id: %u)", tableId); } } + if (deletedListLocation > 0) { + MOT_LOG_DEBUG("Checkpoint worker Clean Leftovers Table %s deletedListLocation = %u\n", + table->GetTableName().c_str(), + deletedListLocation); + ExecuteMicroGcTransaction(deletedList, gcSession, table, deletedListLocation, deletedListLocation); + } + table->ClearThreadMemoryCache(); m_cpManager.TaskDone(table, seg, taskSucceeded); @@ -439,7 +492,7 @@ void CheckpointWorkerPool::WorkerFunc() break; } } - + delete[] deletedList; GetSessionManager()->DestroySessionContext(sessionContext); MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding(); MOT_LOG_DEBUG("thread exiting"); diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h index 381a0818b..3e30dac55 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.h @@ -89,6 +89,8 @@ public: enum ErrCodes { NO_ERROR = 0, FILE_IO = 1, MEMORY = 2, TABLE = 3, INDEX = 4, CALC = 5 }; + static constexpr uint16_t DELETE_LIST_SIZE = 1000; + private: /** * @brief The main worker function @@ -112,9 +114,10 @@ private: * @param sentinel The sentinel that holds to row. * @param fd The file descriptor to write to. * @param tid The thread id. + * @param isDeleted [out] Set to true when the row is deleted but still has a stable row. * @return Int equal to -1 on error, 0 if nothing was written and 1 if the row was written. 
*/ - int Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid); + int Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid, bool& isDeleted); /** * @brief Pops a task (table pointer) from the tasks queue. @@ -148,6 +151,9 @@ private: */ bool FinishFile(int& fd, uint32_t tableId, uint64_t numOps, uint64_t exId); + void ExecuteMicroGcTransaction( + Sentinel** deletedList, GcManager* gcSession, Table* table, uint16_t& deletedCounter, uint16_t limit); + // Workers void* m_workers; diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp index 0dd79c703..747291789 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp @@ -248,7 +248,6 @@ RC TxnManager::CommitInternal(uint64_t csn) if (!GetGlobalConfiguration().m_enableRedoLog || GetGlobalConfiguration().m_redoLogHandlerType == RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) { m_occManager.ReleaseLocks(this); - m_occManager.CleanRowsFromIndexes(this); } return RC_OK; } @@ -313,7 +312,6 @@ RC TxnManager::CommitPrepared(uint64_t transactionId) if (!GetGlobalConfiguration().m_enableRedoLog || GetGlobalConfiguration().m_redoLogHandlerType == RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) { m_occManager.ReleaseLocks(this); - m_occManager.CleanRowsFromIndexes(this); } MOT::DbSessionStatisticsProvider::GetInstance().AddCommitPreparedTxn(); return RC_OK; @@ -340,7 +338,6 @@ RC TxnManager::EndTransaction() GetGlobalConfiguration().m_redoLogHandlerType != RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER && IsFailedCommitPrepared() == false) { m_occManager.ReleaseLocks(this); - m_occManager.CleanRowsFromIndexes(this); } CleanDDLChanges(); Cleanup(); @@ -638,7 +635,7 @@ Row* TxnManager::RemoveKeyFromIndex(Row* row, Sentinel* sentinel) Table* table = row->GetTable(); Row* outputRow = nullptr; - if (row->GetStable() == nullptr) { + if 
(sentinel->GetStable() == nullptr) { outputRow = table->RemoveKeyFromIndex(row, sentinel, m_threadId, GetGcSession()); } else { // Checkpoint works on primary-sentinel only!