Fixes MOT concurrent updates and core dump

This commit is contained in:
Vinoth Veeraraghavan
2020-10-13 19:03:48 +08:00
parent 81ceea7497
commit 93425f5791
7 changed files with 73 additions and 29 deletions

View File

@ -70,14 +70,14 @@ bool OccTransactionManager::QuickVersionCheck(const Access* access)
bool OccTransactionManager::QuickHeaderValidation(const Access* access)
{
if (access->m_type != INS) {
// For WR/DEL lets verify CSN
// For WR/DEL/RD_FOR_UPDATE lets verify CSN
return QuickVersionCheck(access);
} else {
// Lets verify the inserts
// For upgrade we verify the row
// csn has not changed!
Sentinel* sent = access->m_origSentinel;
if (access->m_params.IsUpgradeInsert()) {
if (access->m_params.IsUpgradeInsert() and access->m_params.IsDummyDeletedRow() == false) {
if (sent->GetData()->GetCommitSequenceNumber() != access->m_tid) {
return false;
}
@ -227,6 +227,7 @@ RC OccTransactionManager::ValidateOcc(TxnManager* txMan)
{
uint32_t numSentinelLock = 0;
m_rowsLocked = false;
int isolationLevel = txMan->GetTxnIsoLevel();
TxnAccess* tx = txMan->m_accessMgr.Get();
RC rc = RC_OK;
const uint32_t rowCount = tx->m_rowCnt;
@ -252,6 +253,7 @@ RC OccTransactionManager::ValidateOcc(TxnManager* txMan)
m_rowsSetSize++;
}
switch (ac->m_type) {
case RD_FOR_UPDATE:
case WR:
m_writeSetSize++;
break;
@ -264,7 +266,11 @@ RC OccTransactionManager::ValidateOcc(TxnManager* txMan)
m_writeSetSize++;
break;
case RD:
readSetSize++;
if (isolationLevel > READ_COMMITED) {
readSetSize++;
} else {
continue;
}
break;
default:
break;

View File

@ -145,7 +145,7 @@ public:
// We test to make sure pointer was not return into the pull;
// If so we copy garbage and we fall in validation.
if (data != nullptr) {
errno_t erc = memcpy_s(this->m_data, this->GetTupleSize(), data, size);
errno_t erc = memcpy_s(m_data, GetTupleSize(), data, size);
securec_check(erc, "\0", "\0");
}
}
@ -190,7 +190,7 @@ public:
*/
inline void SetAbsentRow()
{
this->m_rowHeader.SetAbsentBit();
m_rowHeader.SetAbsentBit();
}
/**
@ -198,7 +198,7 @@ public:
*/
inline void SetAbsentLockedRow()
{
this->m_rowHeader.SetAbsentLockedBit();
m_rowHeader.SetAbsentLockedBit();
}
/**
@ -206,7 +206,7 @@ public:
*/
inline void UnsetAbsentRow()
{
this->m_rowHeader.UnsetAbsentBit();
m_rowHeader.UnsetAbsentBit();
}
/**
@ -215,7 +215,7 @@ public:
*/
inline bool IsAbsentRow() const
{
return this->m_rowHeader.IsAbsent();
return m_rowHeader.IsAbsent();
}
/**
@ -233,7 +233,7 @@ public:
*/
inline uint64_t GetCommitSequenceNumber() const
{
return this->m_rowHeader.GetCSN();
return m_rowHeader.GetCSN();
}
/**
@ -242,7 +242,7 @@ public:
*/
inline void SetCommitSequenceNumber(uint64_t csn)
{
this->m_rowHeader.SetCSN(csn);
m_rowHeader.SetCSN(csn);
}
/**
@ -540,7 +540,7 @@ protected:
*/
inline void SetValue(int colId, char* value)
{
this->SetValue(colId, const_cast<const char*>(value));
SetValue(colId, const_cast<const char*>(value));
}
// disable override

View File

@ -246,6 +246,13 @@ void CheckpointWorkerPool::WorkerFunc()
return;
}
SessionContext* sessionContext = GetSessionManager()->CreateSessionContext();
if (sessionContext == nullptr) {
MOT_LOG_ERROR("CheckpointWorkerPool::workerFunc: Failed to initialize Session Context");
m_cpManager.OnError(ErrCodes::MEMORY, "Memory allocation failure");
MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding();
MOT_LOG_DEBUG("thread exiting");
return;
}
GcManager* gcSession = sessionContext->GetTxnManager()->GetGcSession();
gcSession->SetGcType(GcManager::GC_CHECKPOINT);
int threadId = MOTCurrThreadId;
@ -258,6 +265,7 @@ void CheckpointWorkerPool::WorkerFunc()
if (!deletedList) {
MOT_LOG_ERROR("CheckpointWorkerPool::workerFunc: Failed to initialize buffer");
m_cpManager.OnError(ErrCodes::MEMORY, "Memory allocation failure");
GetSessionManager()->DestroySessionContext(sessionContext);
MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding();
MOT_LOG_DEBUG("thread exiting");
return;

View File

@ -211,6 +211,9 @@ enum AccessType : uint8_t {
/** @var Denotes read row access code. */
RD,
/** @var Denotes read for update row access code. */
RD_FOR_UPDATE,
/** @var Denotes write row access code. */
WR,
@ -220,9 +223,6 @@ enum AccessType : uint8_t {
/** @var Denotes insert row access code. */
INS,
/** @var Denotes read for update row access code. */
RD_FOR_UPDATE,
/** @var Internal code for testing. */
SCAN,

View File

@ -109,7 +109,8 @@ Row* TxnManager::RowLookup(const AccessType type, Sentinel* const& originalSenti
return m_accessMgr->GetReadCommitedRow(originalSentinel);
} else {
// Row is not in the cache,map it and return the local row
return m_accessMgr->MapRowtoLocalTable(AccessType::RD, originalSentinel, rc);
AccessType rd_type = (type != RD_FOR_UPDATE) ? RD : RD_FOR_UPDATE;
return m_accessMgr->MapRowtoLocalTable(rd_type, originalSentinel, rc);
}
} else
return nullptr;
@ -922,7 +923,7 @@ RC TxnInsertAction::ExecuteOptimisticInsert(Row* row)
MOT_ASSERT(pIndexInsertResult->GetCounter() != 0);
// Reuse the row connected to header
if (unlikely(pIndexInsertResult->GetData() != nullptr)) {
if (pIndexInsertResult->GetData()->IsAbsentRow()) {
if (pIndexInsertResult->IsCommited() == false) {
accessRow = m_manager->m_accessMgr->AddInsertToLocalAccess(pIndexInsertResult, row, rc, true);
}
} else {

View File

@ -417,6 +417,9 @@ Access* TxnAccess::RowLookup(void* const currentKey)
RC TxnAccess::AccessLookup(const AccessType type, Sentinel* const originalSentinel, Row*& r_local_Row)
{
if (m_rowCnt == 0) {
return RC::RC_LOCAL_ROW_NOT_FOUND;
}
// type is the external operation.. for access the operation is always RD
void* key = nullptr;
Access* curr_acc = nullptr;
@ -522,7 +525,11 @@ Row* TxnAccess::AddInsertToLocalAccess(Sentinel* org_sentinel, Row* org_row, RC&
auto search = m_rowsSet->find(org_sentinel);
if (likely(search == m_rowsSet->end())) {
if (isUpgrade == true) {
curr_access = GetNewRowAccess(org_sentinel->GetData(), INS, rc);
if (org_sentinel->IsPrimaryIndex()) {
curr_access = GetNewRowAccess(org_sentinel->GetData(), INS, rc);
} else {
curr_access = GetNewRowAccess(org_row, INS, rc);
}
// Check if draft is valid
if (curr_access == nullptr) {
return nullptr;
@ -578,8 +585,14 @@ Row* TxnAccess::AddInsertToLocalAccess(Sentinel* org_sentinel, Row* org_row, RC&
return nullptr;
}
static const char* const enTxnStates[] = {
stringify(INV), stringify(RD), stringify(WR), stringify(DEL), stringify(INS), stringify(SCAN), stringify(TEST)};
static const char* const enTxnStates[] = {stringify(INV),
stringify(RD),
stringify(RD_FOR_UPDATE),
stringify(WR),
stringify(DEL),
stringify(INS),
stringify(SCAN),
stringify(TEST)};
enum NS_ACTIONS : uint32_t { NOCHANGE, INC_WRITES, FILTER_DELETES, GENERATE_ACCESS, NS_ERROR };
@ -594,28 +607,44 @@ typedef union {
} Table_Entry;
static const Table_Entry txnStateMachine[TSM_SIZE][TSM_SIZE] = {
{{INV, NS_ACTIONS::NS_ERROR},
{RD, NS_ACTIONS::NOCHANGE},
{WR, NS_ACTIONS::INC_WRITES},
{DEL, NS_ACTIONS::GENERATE_ACCESS},
{INS, NS_ACTIONS::INC_WRITES}}, // INV states
/* INVALID STATE */
{{INV, NS_ACTIONS::NS_ERROR}, // INV
{RD, NS_ACTIONS::NOCHANGE}, // RD
{RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE}, // RD_FOT_UPDATE
{WR, NS_ACTIONS::INC_WRITES}, // WR
{DEL, NS_ACTIONS::GENERATE_ACCESS}, // DEL
{INS, NS_ACTIONS::INC_WRITES}}, // INS
/* READ STATE */
{{RD, NS_ACTIONS::NS_ERROR},
{RD, NS_ACTIONS::NOCHANGE},
{RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE},
{WR, NS_ACTIONS::INC_WRITES},
{DEL, NS_ACTIONS::GENERATE_ACCESS},
{RD, NS_ACTIONS::NS_ERROR}}, // RD states
{RD, NS_ACTIONS::NS_ERROR}},
/* READ_FOR_UPDATE STATE */
{{RD_FOR_UPDATE, NS_ACTIONS::NS_ERROR},
{RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE},
{RD_FOR_UPDATE, NS_ACTIONS::NOCHANGE},
{WR, NS_ACTIONS::INC_WRITES},
{DEL, NS_ACTIONS::GENERATE_ACCESS},
{RD_FOR_UPDATE, NS_ACTIONS::NS_ERROR}},
/* WRITE STATE */
{{WR, NS_ACTIONS::NS_ERROR},
{WR, NS_ACTIONS::NOCHANGE},
{WR, NS_ACTIONS::NOCHANGE},
{WR, NS_ACTIONS::NOCHANGE},
{DEL, NS_ACTIONS::GENERATE_ACCESS},
{WR, NS_ACTIONS::NS_ERROR}}, // WR states
{WR, NS_ACTIONS::NS_ERROR}},
/* DELETE STATE */
{{DEL, NS_ACTIONS::NS_ERROR},
{DEL, NS_ACTIONS::NOCHANGE},
{DEL, NS_ACTIONS::NOCHANGE},
{DEL, NS_ACTIONS::NS_ERROR},
{DEL, NS_ACTIONS::NS_ERROR},
{INS, NS_ACTIONS::NOCHANGE}}, // DEL states
/* INSERT STATE */
{{INS, NS_ACTIONS::NS_ERROR},
{INS, NS_ACTIONS::NOCHANGE},
{INS, NS_ACTIONS::NOCHANGE},
{INS, NS_ACTIONS::NOCHANGE},
{DEL, NS_ACTIONS::FILTER_DELETES},
@ -743,7 +772,7 @@ bool TxnAccess::FilterOrderedSet(Access* element)
Access* ac = (*it).second;
res = true;
rc = m_txnManager->RollbackInsert(ac);
if (ac->m_params.IsUpgradeInsert() == false) {
if (ac->m_params.IsUpgradeInsert() == false or ac->m_params.IsDummyDeletedRow() == true) {
m_rowsSet->erase(it);
// need to perform index clean-up!
ReleaseAccess(ac);

View File

@ -48,7 +48,7 @@ class Row;
class TxnManager;
/** @var Transaction State Machine size. */
constexpr uint8_t TSM_SIZE = 5;
constexpr uint8_t TSM_SIZE = 6;
/** @var default access size */
constexpr uint32_t DEFAULT_ACCESS_SIZE = 500;