From 93425f57915f4039cf04df0fda60a15029baade1 Mon Sep 17 00:00:00 2001 From: Vinoth Veeraraghavan Date: Tue, 13 Oct 2020 19:03:48 +0800 Subject: [PATCH] Fixes MOT concurrent updates and core dump --- .../occ_transaction_manager.cpp | 12 +++-- .../storage/mot/core/src/storage/row.h | 16 +++--- .../system/checkpoint/checkpoint_worker.cpp | 8 +++ .../storage/mot/core/src/system/global.h | 6 +-- .../mot/core/src/system/transaction/txn.cpp | 5 +- .../src/system/transaction/txn_access.cpp | 53 ++++++++++++++----- .../core/src/system/transaction/txn_access.h | 2 +- 7 files changed, 73 insertions(+), 29 deletions(-) diff --git a/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp b/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp index da5182e8e..4e83534bd 100644 --- a/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/concurrency_control/occ_transaction_manager.cpp @@ -70,14 +70,14 @@ bool OccTransactionManager::QuickVersionCheck(const Access* access) bool OccTransactionManager::QuickHeaderValidation(const Access* access) { if (access->m_type != INS) { - // For WR/DEL lets verify CSN + // For WR/DEL/RD_FOR_UPDATE lets verify CSN return QuickVersionCheck(access); } else { // Lets verify the inserts // For upgrade we verify the row // csn has not changed! Sentinel* sent = access->m_origSentinel; - if (access->m_params.IsUpgradeInsert()) { + if (access->m_params.IsUpgradeInsert() and access->m_params.IsDummyDeletedRow() == false) { if (sent->GetData()->GetCommitSequenceNumber() != access->m_tid) { return false; } @@ -227,6 +227,7 @@ RC OccTransactionManager::ValidateOcc(TxnManager* txMan) { uint32_t numSentinelLock = 0; m_rowsLocked = false; + int isolationLevel = txMan->GetTxnIsoLevel(); TxnAccess* tx = txMan->m_accessMgr.Get(); RC rc = RC_OK; const uint32_t rowCount = tx->m_rowCnt; @@ -252,6 +253,7 @@ RC OccTransactionManager::ValidateOcc(TxnManager* txMan) m_rowsSetSize++; } switch (ac->m_type) { + case RD_FOR_UPDATE: case WR: m_writeSetSize++; break; @@ -264,7 +266,11 @@ RC OccTransactionManager::ValidateOcc(TxnManager* txMan) m_writeSetSize++; break; case RD: - readSetSize++; + if (isolationLevel > READ_COMMITED) { + readSetSize++; + } else { + continue; + } break; default: break; diff --git a/src/gausskernel/storage/mot/core/src/storage/row.h b/src/gausskernel/storage/mot/core/src/storage/row.h index fe8f8e083..f4c9f16c0 100644 --- a/src/gausskernel/storage/mot/core/src/storage/row.h +++ b/src/gausskernel/storage/mot/core/src/storage/row.h @@ -145,7 +145,7 @@ public: // We test to make sure pointer was not return into the pull; // If so we copy garbage and we fall in validation. if (data != nullptr) { - errno_t erc = memcpy_s(this->m_data, this->GetTupleSize(), data, size); + errno_t erc = memcpy_s(m_data, GetTupleSize(), data, size); securec_check(erc, "\0", "\0"); } } @@ -190,7 +190,7 @@ public: */ inline void SetAbsentRow() { - this->m_rowHeader.SetAbsentBit(); + m_rowHeader.SetAbsentBit(); } /** @@ -198,7 +198,7 @@ public: */ inline void SetAbsentLockedRow() { - this->m_rowHeader.SetAbsentLockedBit(); + m_rowHeader.SetAbsentLockedBit(); } /** @@ -206,7 +206,7 @@ public: */ inline void UnsetAbsentRow() { - this->m_rowHeader.UnsetAbsentBit(); + m_rowHeader.UnsetAbsentBit(); } /** @@ -215,7 +215,7 @@ public: */ inline bool IsAbsentRow() const { - return this->m_rowHeader.IsAbsent(); + return m_rowHeader.IsAbsent(); } /** @@ -233,7 +233,7 @@ public: */ inline uint64_t GetCommitSequenceNumber() const { - return this->m_rowHeader.GetCSN(); + return m_rowHeader.GetCSN(); } /** @@ -242,7 +242,7 @@ public: */ inline void SetCommitSequenceNumber(uint64_t csn) { - this->m_rowHeader.SetCSN(csn); + m_rowHeader.SetCSN(csn); } /** @@ -540,7 +540,7 @@ protected: */ inline void SetValue(int colId, char* value) { - this->SetValue(colId, const_cast(value)); + SetValue(colId, const_cast(value)); } // disable override diff --git a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp index a582a0a92..3134a7c38 100644 --- a/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp +++ b/src/gausskernel/storage/mot/core/src/system/checkpoint/checkpoint_worker.cpp @@ -246,6 +246,13 @@ void CheckpointWorkerPool::WorkerFunc() return; } SessionContext* sessionContext = GetSessionManager()->CreateSessionContext(); + if (sessionContext == nullptr) { + MOT_LOG_ERROR("CheckpointWorkerPool::workerFunc: Failed to initialize Session Context"); + m_cpManager.OnError(ErrCodes::MEMORY, "Memory allocation failure"); + MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding(); + MOT_LOG_DEBUG("thread exiting"); + return; + } GcManager* gcSession = sessionContext->GetTxnManager()->GetGcSession(); gcSession->SetGcType(GcManager::GC_CHECKPOINT); int threadId = MOTCurrThreadId; @@ -258,6 +265,7 @@ void CheckpointWorkerPool::WorkerFunc() if (!deletedList) { MOT_LOG_ERROR("CheckpointWorkerPool::workerFunc: Failed to initialize buffer"); m_cpManager.OnError(ErrCodes::MEMORY, "Memory allocation failure"); + GetSessionManager()->DestroySessionContext(sessionContext); MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding(); MOT_LOG_DEBUG("thread exiting"); return; diff --git a/src/gausskernel/storage/mot/core/src/system/global.h b/src/gausskernel/storage/mot/core/src/system/global.h index e69400b2d..e306a1da8 100644 --- a/src/gausskernel/storage/mot/core/src/system/global.h +++ b/src/gausskernel/storage/mot/core/src/system/global.h @@ -211,6 +211,9 @@ enum AccessType : uint8_t { /** @var Denotes read row access code. */ RD, + /** @var Denotes read for update row access code. */ + RD_FOR_UPDATE, + /** @var Denotes write row access code. */ WR, @@ -220,9 +223,6 @@ enum AccessType : uint8_t { /** @var Denotes insert row access code. */ INS, - /** @var Denotes read for update row access code. */ - RD_FOR_UPDATE, - /** @var Internal code for testing. */ SCAN, diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp index 747291789..ba673822c 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn.cpp @@ -109,7 +109,8 @@ Row* TxnManager::RowLookup(const AccessType type, Sentinel* const& originalSenti return m_accessMgr->GetReadCommitedRow(originalSentinel); } else { // Row is not in the cache,map it and return the local row - return m_accessMgr->MapRowtoLocalTable(AccessType::RD, originalSentinel, rc); + AccessType rd_type = (type != RD_FOR_UPDATE) ? RD : RD_FOR_UPDATE; + return m_accessMgr->MapRowtoLocalTable(rd_type, originalSentinel, rc); } } else return nullptr; @@ -922,7 +923,7 @@ RC TxnInsertAction::ExecuteOptimisticInsert(Row* row) MOT_ASSERT(pIndexInsertResult->GetCounter() != 0); // Reuse the row connected to header if (unlikely(pIndexInsertResult->GetData() != nullptr)) { - if (pIndexInsertResult->GetData()->IsAbsentRow()) { + if (pIndexInsertResult->IsCommited() == false) { accessRow = m_manager->m_accessMgr->AddInsertToLocalAccess(pIndexInsertResult, row, rc, true); } } else { diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.cpp b/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.cpp index 52908be78..3b17edef0 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.cpp +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.cpp @@ -417,6 +417,9 @@ Access* TxnAccess::RowLookup(void* const currentKey) RC TxnAccess::AccessLookup(const AccessType type, Sentinel* const originalSentinel, Row*& r_local_Row) { + if (m_rowCnt == 0) { + return RC::RC_LOCAL_ROW_NOT_FOUND; + } // type is the external operation.. for access the operation is always RD void* key = nullptr; Access* curr_acc = nullptr; @@ -522,7 +525,11 @@ Row* TxnAccess::AddInsertToLocalAccess(Sentinel* org_sentinel, Row* org_row, RC& auto search = m_rowsSet->find(org_sentinel); if (likely(search == m_rowsSet->end())) { if (isUpgrade == true) { - curr_access = GetNewRowAccess(org_sentinel->GetData(), INS, rc); + if (org_sentinel->IsPrimaryIndex()) { + curr_access = GetNewRowAccess(org_sentinel->GetData(), INS, rc); + } else { + curr_access = GetNewRowAccess(org_row, INS, rc); + } // Check if draft is valid if (curr_access == nullptr) { return nullptr; @@ -578,8 +585,14 @@ Row* TxnAccess::AddInsertToLocalAccess(Sentinel* org_sentinel, Row* org_row, RC& return nullptr; } -static const char* const enTxnStates[] = { - stringify(INV), stringify(RD), stringify(WR), stringify(DEL), stringify(INS), stringify(SCAN), stringify(TEST)}; +static const char* const enTxnStates[] = {stringify(INV), + stringify(RD), + stringify(RD_FOR_UPDATE), + stringify(WR), + stringify(DEL), + stringify(INS), + stringify(SCAN), + stringify(TEST)}; enum NS_ACTIONS : uint32_t { NOCHANGE, INC_WRITES, FILTER_DELETES, GENERATE_ACCESS, NS_ERROR }; @@ -594,28 +607,44 @@ typedef union { } Table_Entry; static const Table_Entry txnStateMachine[TSM_SIZE][TSM_SIZE] = { - - {{INV, NS_ACTIONS::NS_ERROR}, - {RD, NS_ACTIONS::NOCHANGE}, - {WR, NS_ACTIONS::INC_WRITES}, - {DEL, NS_ACTIONS::GENERATE_ACCESS}, - {INS, NS_ACTIONS::INC_WRITES}}, // INV states + /* INVALID STATE */ + {{INV, NS_ACTIONS::NS_ERROR}, // INV + {RD, NS_ACTIONS::NOCHANGE}, // RD + {RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE}, // RD_FOT_UPDATE + {WR, NS_ACTIONS::INC_WRITES}, // WR + {DEL, NS_ACTIONS::GENERATE_ACCESS}, // DEL + {INS, NS_ACTIONS::INC_WRITES}}, // INS + /* READ STATE */ {{RD, NS_ACTIONS::NS_ERROR}, {RD, NS_ACTIONS::NOCHANGE}, + {RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE}, {WR, NS_ACTIONS::INC_WRITES}, {DEL, NS_ACTIONS::GENERATE_ACCESS}, - {RD, NS_ACTIONS::NS_ERROR}}, // RD states + {RD, NS_ACTIONS::NS_ERROR}}, + /* READ_FOR_UPDATE STATE */ + {{RD_FOR_UPDATE, NS_ACTIONS::NS_ERROR}, + {RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE}, + {RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE}, + {WR, NS_ACTIONS::INC_WRITES}, + {DEL, NS_ACTIONS::GENERATE_ACCESS}, + {RD_FOR_UPDATE, NS_ACTIONS::NS_ERROR}}, + /* WRITE STATE */ {{WR, NS_ACTIONS::NS_ERROR}, + {WR, NS_ACTIONS::NOCHANGE}, {WR, NS_ACTIONS::NOCHANGE}, {WR, NS_ACTIONS::NOCHANGE}, {DEL, NS_ACTIONS::GENERATE_ACCESS}, - {WR, NS_ACTIONS::NS_ERROR}}, // WR states + {WR, NS_ACTIONS::NS_ERROR}}, + /* DELETE STATE */ {{DEL, NS_ACTIONS::NS_ERROR}, + {DEL, NS_ACTIONS::NOCHANGE}, {DEL, NS_ACTIONS::NOCHANGE}, {DEL, NS_ACTIONS::NS_ERROR}, {DEL, NS_ACTIONS::NS_ERROR}, {INS, NS_ACTIONS::NOCHANGE}}, // DEL states + /* INSERT STATE */ {{INS, NS_ACTIONS::NS_ERROR}, + {INS, NS_ACTIONS::NOCHANGE}, {INS, NS_ACTIONS::NOCHANGE}, {INS, NS_ACTIONS::NOCHANGE}, {DEL, NS_ACTIONS::FILTER_DELETES}, @@ -743,7 +772,7 @@ bool TxnAccess::FilterOrderedSet(Access* element) Access* ac = (*it).second; res = true; rc = m_txnManager->RollbackInsert(ac); - if (ac->m_params.IsUpgradeInsert() == false) { + if (ac->m_params.IsUpgradeInsert() == false or ac->m_params.IsDummyDeletedRow() == true) { m_rowsSet->erase(it); // need to perform index clean-up! ReleaseAccess(ac); diff --git a/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.h b/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.h index e309862d4..4964e332c 100644 --- a/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.h +++ b/src/gausskernel/storage/mot/core/src/system/transaction/txn_access.h @@ -48,7 +48,7 @@ class Row; class TxnManager; /** @var Transaction State Machine size. */ -constexpr uint8_t TSM_SIZE = 5; +constexpr uint8_t TSM_SIZE = 6; /** @var default access size */ constexpr uint32_t DEFAULT_ACCESS_SIZE = 500;