MOT Coldstart optimization

This commit is contained in:
Vinoth Veeraraghavan
2020-09-03 16:56:42 +08:00
parent c8fa438bb2
commit 23c166cea8
5 changed files with 101 additions and 44 deletions

View File

@ -388,22 +388,24 @@ bool Table::CreateSecondaryIndexData(MOT::Index* index, TxnManager* txn)
return ret;
}
RC Table::InsertRowNonTransactional(Row* row, uint64_t tid, Key* k, bool isInterTest)
RC Table::InsertRowNonTransactional(Row* row, uint64_t tid, Key* k, bool skipSecIndex)
{
RC rc = RC_OK;
MaxKey m_key;
MaxKey key;
Key* pk = nullptr;
uint64_t surrogateprimaryKey = 0;
MOT::Index* ix = GetPrimaryIndex();
uint32_t numIndexes = GetNumIndexes();
SurrogateKeyGenerator& _surr_gen = GetSurrogateKeyManager()->GetSurrogateSlot(MOT_GET_CURRENT_CONNECTION_ID());
row->SetRowId(_surr_gen.GetSurrogateKey(MOT_GET_CURRENT_CONNECTION_ID()));
if (row->GetRowId() == 0) {
row->SetRowId(_surr_gen.GetSurrogateKey(MOT_GET_CURRENT_CONNECTION_ID()));
}
// add row
if (k != nullptr) {
pk = k;
} else {
pk = &m_key;
pk = &key;
pk->InitKey(ix->GetKeyLength());
// set primary key
if (ix->IsFakePrimary()) {
@ -427,18 +429,18 @@ RC Table::InsertRowNonTransactional(Row* row, uint64_t tid, Key* k, bool isInter
}
// add secondary indexes
if (!isInterTest) {
if (!skipSecIndex) {
for (uint16_t i = 1; i < numIndexes; i++) {
ix = GetSecondaryIndex(i);
m_key.InitKey(ix->GetKeyLength());
ix->BuildKey(this, row, &m_key);
if (ix->IndexInsert(&m_key, row, tid) == nullptr) {
key.InitKey(ix->GetKeyLength());
ix->BuildKey(this, row, &key);
if (ix->IndexInsert(&key, row, tid) == nullptr) {
if (MOT_IS_SEVERE()) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Insert row",
"Failed to insert row to secondary index %u, key: %s",
i,
m_key.GetKeyStr().c_str());
key.GetKeyStr().c_str());
}
return MOT_GET_LAST_ERROR_RC();
}

View File

@ -556,10 +556,10 @@ public:
* @param row. New row to be inserted
* @param tid The logical identifier of the requesting thread.
* @param k row's primary ket
* @param isInterTest determines if secondaries should be added as well
* @param skipSecIndex determines if secondaries should be added as well
* @return Status of the operation.
*/
RC InsertRowNonTransactional(Row* row, uint64_t tid, Key* k = NULL, bool isInterTest = false);
RC InsertRowNonTransactional(Row* row, uint64_t tid, Key* k = NULL, bool skipSecIndex = false);
/**
* @brief Inserts a row into a newly created secondary index storage without validation.

View File

@ -235,11 +235,18 @@ bool RecoveryManager::RecoverTableMetadata(uint32_t tableId)
return (status == RC_OK);
}
bool RecoveryManager::RecoverTableRows(
uint32_t tableId, uint32_t seg, uint32_t tid, uint64_t& maxCsn, SurrogateState& sState)
bool RecoveryManager::RecoverTableRows(uint32_t tableId, uint32_t seg, uint32_t tid, char* keyData, char* entryData,
uint64_t& maxCsn, SurrogateState& sState)
{
RC status = RC_OK;
int fd = -1;
Table* table = nullptr;
if (!GetRecoveryManager()->FetchTable(tableId, table)) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "RecoveryManager::recoverTableRows", "Table %llu does not exist", tableId);
return false;
}
std::string fileName;
CheckpointUtils::MakeCpFilename(tableId, fileName, m_workingDir, seg);
if (!CheckpointUtils::OpenFileRead(fileName, fd)) {
@ -261,22 +268,14 @@ bool RecoveryManager::RecoverTableRows(
return false;
}
uint64_t tableExId = table->GetTableExId();
if (tableExId != fileHeader.m_exId) {
MOT_LOG_ERROR(
"RecoveryManager::recoverTableRows: exId mismatch: my %lu - pkt %lu", tableExId, fileHeader.m_exId);
return false;
}
CheckpointUtils::EntryHeader entry;
char* keyData = (char*)malloc(MAX_KEY_SIZE);
if (keyData == nullptr) {
MOT_LOG_ERROR("RecoveryManager::recoverTableRows: failed to allocate key buffer");
CheckpointUtils::CloseFile(fd);
return false;
}
char* entryData = (char*)malloc(MAX_TUPLE_SIZE);
if (entryData == nullptr) {
MOT_LOG_ERROR("RecoveryManager::recoverTableRows: failed to allocate row buffer");
CheckpointUtils::CloseFile(fd);
free(keyData);
return false;
}
for (uint64_t i = 0; i < fileHeader.m_numOps; i++) {
if (IsRecoveryMemoryLimitReached(m_numWorkers)) {
MOT_LOG_ERROR("Memory hard limit reached. Cannot recover datanode");
@ -324,19 +323,16 @@ bool RecoveryManager::RecoverTableRows(
break;
}
BeginTransaction();
InsertRow(tableId,
fileHeader.m_exId,
InsertRowFromCheckpoint(table,
keyData,
entry.m_keyLen,
entryData,
entry.m_dataLen,
entry.m_csn,
tid,
m_sState,
sState,
status,
entry.m_rowId);
status = CommitTransaction(entry.m_csn);
if (status != RC_OK) {
MOT_LOG_ERROR(
"Failed to commit row recovery from checkpoint: %s (error code: %d)", RcToString(status), (int)status);
@ -354,12 +350,7 @@ bool RecoveryManager::RecoverTableRows(
seg,
fileHeader.m_numOps,
status == RC_OK ? "OK" : "Error");
if (keyData != nullptr) {
free(keyData);
}
if (entryData != nullptr) {
free(entryData);
}
return (status == RC_OK);
}
@ -384,6 +375,21 @@ void RecoveryManager::CpWorkerFunc()
"RecoveryManager::workerFunc failed to allocate surrogate state");
return;
}
char* keyData = (char*)malloc(MAX_KEY_SIZE);
if (keyData == nullptr) {
GetRecoveryManager()->OnError(
MOT::RecoveryManager::ErrCodes::CP_RECOVERY, "RecoveryManager::workerFunc: failed to allocate key buffer");
return;
}
char* entryData = (char*)malloc(MAX_TUPLE_SIZE);
if (entryData == nullptr) {
GetRecoveryManager()->OnError(
MOT::RecoveryManager::ErrCodes::CP_RECOVERY, "RecoveryManager::workerFunc: failed to allocate row buffer");
free(keyData);
return;
}
MOT_LOG_DEBUG("RecoveryManager::workerFunc start [%u] on cpu %lu", (unsigned)MOTCurrThreadId, sched_getcpu());
uint64_t maxCsn = 0;
@ -391,7 +397,7 @@ void RecoveryManager::CpWorkerFunc()
uint32_t tableId = 0;
uint32_t seg = 0;
if (GetTask(tableId, seg)) {
if (!RecoverTableRows(tableId, seg, MOTCurrThreadId, maxCsn, sState)) {
if (!RecoverTableRows(tableId, seg, MOTCurrThreadId, keyData, entryData, maxCsn, sState)) {
MOT_LOG_ERROR("RecoveryManager::workerFunc recovery of table %lu's data failed", tableId);
GetRecoveryManager()->OnError(MOT::RecoveryManager::ErrCodes::CP_RECOVERY,
"RecoveryManager::workerFunc failed to recover table: ",
@ -403,9 +409,12 @@ void RecoveryManager::CpWorkerFunc()
}
}
free(keyData);
free(entryData);
GetRecoveryManager()->SetCsnIfGreater(maxCsn);
if (sState.IsEmpty() == false) {
GetRecoveryManager()->AddSurrogateArrayToList(m_sState);
if (!sState.IsEmpty()) {
GetRecoveryManager()->AddSurrogateArrayToList(sState);
}
GetSessionManager()->DestroySessionContext(sessionContext);

View File

@ -211,12 +211,15 @@ private:
* @param tableId The table id to recover.
* @param seg Segment file number to recover from.
* @param tid The current thread id
* @param keyData The key buffer to use.
* @param entryData The row buffer to use.
* @param maxCsn The returned maxCsn encountered during the recovery.
* @param sState Surrogate key state structure that will be filled
* during the recovery
* @return Boolean value denoting success or failure.
*/
bool RecoverTableRows(uint32_t tableId, uint32_t seg, uint32_t tid, uint64_t& maxCsn, SurrogateState& sState);
bool RecoverTableRows(uint32_t tableId, uint32_t seg, uint32_t tid, char* keyData, char* entryData,
uint64_t& maxCsn, SurrogateState& sState);
/**
* @brief Reads and creates a table's defenition from a checkpoint
@ -814,7 +817,7 @@ private:
/**
* @brief performs the actual row insertion to the storage.
* @param tableId the table's id.
* @param exId the the table's external id.
* @param exId the table's external id.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param rowData row's data buffer.
@ -830,6 +833,22 @@ private:
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId,
bool insertLocked = false);
/**
* @brief performs non transactional row insertion (for checkpoint recovery).
* @param table the table's pointer.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param rowData row's data buffer.
* @param rowLen row's data buffer len.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrugate state.
* @param status the returned status of the operation
* @param rowId the row's internal id
*/
static void InsertRowFromCheckpoint(Table* table, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen,
uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId);
/**
* @brief performs the actual row update in the storage.
* @param tableId the table's id.

View File

@ -535,6 +535,33 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData,
}
}
void RecoveryManager::InsertRowFromCheckpoint(Table* table, char* keyData, uint16_t keyLen, char* rowData,
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId)
{
MaxKey key;
Row* row = table->CreateNewRow();
if (row == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create row");
return;
}
row->CopyData((const uint8_t*)rowData, rowLen);
row->SetCommitSequenceNumber(csn);
row->SetRowId(rowId);
MOT::Index* ix = table->GetPrimaryIndex();
if (ix->IsFakePrimary()) {
row->SetSurrogateKey(*(uint64_t*)keyData);
sState.UpdateMaxKey(rowId);
}
key.CpKey((const uint8_t*)keyData, keyLen);
status = table->InsertRowNonTransactional(row, tid, &key);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row");
table->DestroyRow(row);
}
}
void RecoveryManager::DeleteRow(
uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, uint64_t csn, uint32_t tid, RC& status)
{