From 23c166cea8efba2bbfa6269a432a422cd53e0c60 Mon Sep 17 00:00:00 2001 From: Vinoth Veeraraghavan Date: Thu, 3 Sep 2020 16:56:42 +0800 Subject: [PATCH] MOT Coldstart optimization --- .../storage/mot/core/src/storage/table.cpp | 20 +++--- .../storage/mot/core/src/storage/table.h | 4 +- .../src/system/recovery/recovery_manager.cpp | 71 +++++++++++-------- .../src/system/recovery/recovery_manager.h | 23 +++++- .../core/src/system/recovery/recovery_ops.cpp | 27 +++++++ 5 files changed, 101 insertions(+), 44 deletions(-) diff --git a/src/gausskernel/storage/mot/core/src/storage/table.cpp b/src/gausskernel/storage/mot/core/src/storage/table.cpp index 475f8c459..9da68eb77 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.cpp +++ b/src/gausskernel/storage/mot/core/src/storage/table.cpp @@ -388,22 +388,24 @@ bool Table::CreateSecondaryIndexData(MOT::Index* index, TxnManager* txn) return ret; } -RC Table::InsertRowNonTransactional(Row* row, uint64_t tid, Key* k, bool isInterTest) +RC Table::InsertRowNonTransactional(Row* row, uint64_t tid, Key* k, bool skipSecIndex) { RC rc = RC_OK; - MaxKey m_key; + MaxKey key; Key* pk = nullptr; uint64_t surrogateprimaryKey = 0; MOT::Index* ix = GetPrimaryIndex(); uint32_t numIndexes = GetNumIndexes(); SurrogateKeyGenerator& _surr_gen = GetSurrogateKeyManager()->GetSurrogateSlot(MOT_GET_CURRENT_CONNECTION_ID()); - row->SetRowId(_surr_gen.GetSurrogateKey(MOT_GET_CURRENT_CONNECTION_ID())); + if (row->GetRowId() == 0) { + row->SetRowId(_surr_gen.GetSurrogateKey(MOT_GET_CURRENT_CONNECTION_ID())); + } // add row if (k != nullptr) { pk = k; } else { - pk = &m_key; + pk = &key; pk->InitKey(ix->GetKeyLength()); // set primary key if (ix->IsFakePrimary()) { @@ -427,18 +429,18 @@ RC Table::InsertRowNonTransactional(Row* row, uint64_t tid, Key* k, bool isInter } // add secondary indexes - if (!isInterTest) { + if (!skipSecIndex) { for (uint16_t i = 1; i < numIndexes; i++) { ix = GetSecondaryIndex(i); - m_key.InitKey(ix->GetKeyLength()); - ix->BuildKey(this, row, &m_key); - if (ix->IndexInsert(&m_key, row, tid) == nullptr) { + key.InitKey(ix->GetKeyLength()); + ix->BuildKey(this, row, &key); + if (ix->IndexInsert(&key, row, tid) == nullptr) { if (MOT_IS_SEVERE()) { MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Insert row", "Failed to insert row to secondary index %u, key: %s", i, - m_key.GetKeyStr().c_str()); + key.GetKeyStr().c_str()); } return MOT_GET_LAST_ERROR_RC(); } diff --git a/src/gausskernel/storage/mot/core/src/storage/table.h b/src/gausskernel/storage/mot/core/src/storage/table.h index 54b33b73d..18f736b01 100644 --- a/src/gausskernel/storage/mot/core/src/storage/table.h +++ b/src/gausskernel/storage/mot/core/src/storage/table.h @@ -556,10 +556,10 @@ public: * @param row. New row to be inserted * @param tid The logical identifier of the requesting thread. * @param k row's primary ket - * @param isInterTest determines if secondaries should be added as well + * @param skipSecIndex determines if secondaries should be added as well * @return Status of the operation. */ - RC InsertRowNonTransactional(Row* row, uint64_t tid, Key* k = NULL, bool isInterTest = false); + RC InsertRowNonTransactional(Row* row, uint64_t tid, Key* k = NULL, bool skipSecIndex = false); /** * @brief Inserts a row into a newly created secondary index storage without validation. diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp index a457c972c..5b94904fd 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.cpp @@ -235,11 +235,18 @@ bool RecoveryManager::RecoverTableMetadata(uint32_t tableId) return (status == RC_OK); } -bool RecoveryManager::RecoverTableRows( - uint32_t tableId, uint32_t seg, uint32_t tid, uint64_t& maxCsn, SurrogateState& sState) +bool RecoveryManager::RecoverTableRows(uint32_t tableId, uint32_t seg, uint32_t tid, char* keyData, char* entryData, + uint64_t& maxCsn, SurrogateState& sState) { RC status = RC_OK; int fd = -1; + Table* table = nullptr; + + if (!GetRecoveryManager()->FetchTable(tableId, table)) { + MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "RecoveryManager::recoverTableRows", "Table %llu does not exist", tableId); + return false; + } + std::string fileName; CheckpointUtils::MakeCpFilename(tableId, fileName, m_workingDir, seg); if (!CheckpointUtils::OpenFileRead(fileName, fd)) { @@ -261,22 +268,14 @@ bool RecoveryManager::RecoverTableRows( return false; } + uint64_t tableExId = table->GetTableExId(); + if (tableExId != fileHeader.m_exId) { + MOT_LOG_ERROR( + "RecoveryManager::recoverTableRows: exId mismatch: my %lu - pkt %lu", tableExId, fileHeader.m_exId); + return false; + } + CheckpointUtils::EntryHeader entry; - char* keyData = (char*)malloc(MAX_KEY_SIZE); - if (keyData == nullptr) { - MOT_LOG_ERROR("RecoveryManager::recoverTableRows: failed to allocate key buffer"); - CheckpointUtils::CloseFile(fd); - return false; - } - - char* entryData = (char*)malloc(MAX_TUPLE_SIZE); - if (entryData == nullptr) { - MOT_LOG_ERROR("RecoveryManager::recoverTableRows: failed to allocate row buffer"); - CheckpointUtils::CloseFile(fd); - free(keyData); - return false; - } - for (uint64_t i = 0; i < fileHeader.m_numOps; i++) { if (IsRecoveryMemoryLimitReached(m_numWorkers)) { MOT_LOG_ERROR("Memory hard limit reached. Cannot recover datanode"); @@ -324,19 +323,16 @@ bool RecoveryManager::RecoverTableRows( break; } - BeginTransaction(); - InsertRow(tableId, - fileHeader.m_exId, + InsertRowFromCheckpoint(table, keyData, entry.m_keyLen, entryData, entry.m_dataLen, entry.m_csn, tid, - m_sState, + sState, status, entry.m_rowId); - status = CommitTransaction(entry.m_csn); if (status != RC_OK) { MOT_LOG_ERROR( "Failed to commit row recovery from checkpoint: %s (error code: %d)", RcToString(status), (int)status); @@ -354,12 +350,7 @@ bool RecoveryManager::RecoverTableRows( seg, fileHeader.m_numOps, status == RC_OK ? "OK" : "Error"); - if (keyData != nullptr) { - free(keyData); - } - if (entryData != nullptr) { - free(entryData); - } + return (status == RC_OK); } @@ -384,6 +375,21 @@ void RecoveryManager::CpWorkerFunc() "RecoveryManager::workerFunc failed to allocate surrogate state"); return; } + + char* keyData = (char*)malloc(MAX_KEY_SIZE); + if (keyData == nullptr) { + GetRecoveryManager()->OnError( + MOT::RecoveryManager::ErrCodes::CP_RECOVERY, "RecoveryManager::workerFunc: failed to allocate key buffer"); + return; + } + + char* entryData = (char*)malloc(MAX_TUPLE_SIZE); + if (entryData == nullptr) { + GetRecoveryManager()->OnError( + MOT::RecoveryManager::ErrCodes::CP_RECOVERY, "RecoveryManager::workerFunc: failed to allocate row buffer"); + free(keyData); + return; + } MOT_LOG_DEBUG("RecoveryManager::workerFunc start [%u] on cpu %lu", (unsigned)MOTCurrThreadId, sched_getcpu()); uint64_t maxCsn = 0; @@ -391,7 +397,7 @@ void RecoveryManager::CpWorkerFunc() uint32_t tableId = 0; uint32_t seg = 0; if (GetTask(tableId, seg)) { - if (!RecoverTableRows(tableId, seg, MOTCurrThreadId, maxCsn, sState)) { + if (!RecoverTableRows(tableId, seg, MOTCurrThreadId, keyData, entryData, maxCsn, sState)) { MOT_LOG_ERROR("RecoveryManager::workerFunc recovery of table %lu's data failed", tableId); GetRecoveryManager()->OnError(MOT::RecoveryManager::ErrCodes::CP_RECOVERY, "RecoveryManager::workerFunc failed to recover table: ", @@ -403,9 +409,12 @@ void RecoveryManager::CpWorkerFunc() } } + free(keyData); + free(entryData); + GetRecoveryManager()->SetCsnIfGreater(maxCsn); - if (sState.IsEmpty() == false) { - GetRecoveryManager()->AddSurrogateArrayToList(m_sState); + if (!sState.IsEmpty()) { + GetRecoveryManager()->AddSurrogateArrayToList(sState); } GetSessionManager()->DestroySessionContext(sessionContext); diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h index 7dfa1a5bc..9db0d8968 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager.h @@ -211,12 +211,15 @@ private: * @param tableId The table id to recover. * @param seg Segment file number to recover from. * @param tid The current thread id + * @param keyData The key buffer to use. + * @param entryData The row buffer to use. * @param maxCsn The returned maxCsn encountered during the recovery. * @param sState Surrogate key state structure that will be filled * during the recovery * @return Boolean value denoting success or failure. */ - bool RecoverTableRows(uint32_t tableId, uint32_t seg, uint32_t tid, uint64_t& maxCsn, SurrogateState& sState); + bool RecoverTableRows(uint32_t tableId, uint32_t seg, uint32_t tid, char* keyData, char* entryData, + uint64_t& maxCsn, SurrogateState& sState); /** * @brief Reads and creates a table's defenition from a checkpoint @@ -814,7 +817,7 @@ private: /** * @brief performs the actual row insertion to the storage. * @param tableId the table's id. - * @param exId the the table's external id. + * @param exId the table's external id. * @param keyData key's data buffer. * @param keyLen key's data buffer len. * @param rowData row's data buffer. @@ -830,6 +833,22 @@ private: uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId, bool insertLocked = false); + /** + * @brief performs non transactional row insertion (for checkpoint recovery). + * @param table the table's pointer. + * @param keyData key's data buffer. + * @param keyLen key's data buffer len. + * @param rowData row's data buffer. + * @param rowLen row's data buffer len. + * @param csn the operations's csn. + * @param tid the thread id of the recovering thread. + * @param sState the returned surrugate state. + * @param status the returned status of the operation + * @param rowId the row's internal id + */ + static void InsertRowFromCheckpoint(Table* table, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen, + uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId); + /** * @brief performs the actual row update in the storage. * @param tableId the table's id. diff --git a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp index 3fb814270..60ccc783b 100644 --- a/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp +++ b/src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.cpp @@ -535,6 +535,33 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData, } } +void RecoveryManager::InsertRowFromCheckpoint(Table* table, char* keyData, uint16_t keyLen, char* rowData, + uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId) +{ + MaxKey key; + Row* row = table->CreateNewRow(); + if (row == nullptr) { + status = RC_ERROR; + MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create row"); + return; + } + row->CopyData((const uint8_t*)rowData, rowLen); + row->SetCommitSequenceNumber(csn); + row->SetRowId(rowId); + + MOT::Index* ix = table->GetPrimaryIndex(); + if (ix->IsFakePrimary()) { + row->SetSurrogateKey(*(uint64_t*)keyData); + sState.UpdateMaxKey(rowId); + } + key.CpKey((const uint8_t*)keyData, keyLen); + status = table->InsertRowNonTransactional(row, tid, &key); + if (status != RC_OK) { + MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row"); + table->DestroyRow(row); + } +} + void RecoveryManager::DeleteRow( uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, uint64_t csn, uint32_t tid, RC& status) {