MOT DDL fixes and a fix for the recovery deadlock issue with checkpoint (ckpt)
This commit is contained in:
@ -500,6 +500,11 @@ public:
|
||||
return ((m_indexOrder == IndexOrder::INDEX_ORDER_PRIMARY && m_fake) ? true : false);
|
||||
}
|
||||
|
||||
/**
 * @brief Retrieves the value of the m_unique member.
 * @return The current value of m_unique (presumably true when the index enforces
 * unique keys - confirm against the member's declaration).
 */
inline bool IsUnique()
|
||||
{
|
||||
return m_unique;
|
||||
}
|
||||
|
||||
inline void SetIsCommited(bool isCommited)
|
||||
{
|
||||
m_isCommited = isCommited;
|
||||
@ -765,6 +770,7 @@ public:
|
||||
{
|
||||
return m_table;
|
||||
}
|
||||
|
||||
private:
|
||||
MOT::Index* m_indexArr[MAX_NUM_INDEXES];
|
||||
MOT::Table* m_table;
|
||||
|
||||
@ -191,6 +191,7 @@ void Table::SetPrimaryIndex(MOT::Index* index)
|
||||
bool Table::UpdatePrimaryIndex(MOT::Index* index, TxnManager* txn, uint32_t tid)
|
||||
{
|
||||
if (this->m_primaryIndex) {
|
||||
DecIndexColumnUsage(this->m_primaryIndex);
|
||||
if (txn == nullptr) {
|
||||
if (DeleteIndex(this->m_primaryIndex) != RC_OK) {
|
||||
return false;
|
||||
@ -215,7 +216,6 @@ bool Table::UpdatePrimaryIndex(MOT::Index* index, TxnManager* txn, uint32_t tid)
|
||||
RC Table::DeleteIndex(MOT::Index* index)
|
||||
{
|
||||
GcManager::ClearIndexElements(index->GetIndexId());
|
||||
DecIndexColumnUsage(index);
|
||||
delete index;
|
||||
|
||||
return RC::RC_OK;
|
||||
@ -317,6 +317,8 @@ bool Table::CreateSecondaryIndexData(MOT::Index* index, TxnManager* txn)
|
||||
status = txn->AccessLookup(RD, it->GetPrimarySentinel(), tmpRow);
|
||||
switch (status) {
|
||||
case RC::RC_LOCAL_ROW_DELETED:
|
||||
row = nullptr;
|
||||
break;
|
||||
case RC::RC_LOCAL_ROW_NOT_FOUND:
|
||||
break;
|
||||
case RC::RC_LOCAL_ROW_FOUND:
|
||||
|
||||
@ -223,7 +223,8 @@ public:
|
||||
* @param index The index to use.
|
||||
* @return void.
|
||||
*/
|
||||
void RemoveSecondaryIndexFromMetaData(MOT::Index* index) {
|
||||
void RemoveSecondaryIndexFromMetaData(MOT::Index* index)
|
||||
{
|
||||
if (!index->IsPrimaryKey()) {
|
||||
uint16_t rmIx = 0;
|
||||
for (uint16_t i = 1; i < m_numIndexes; i++) {
|
||||
@ -235,6 +236,7 @@ public:
|
||||
|
||||
// prevent removing primary by mistake
|
||||
if (rmIx > 0) {
|
||||
DecIndexColumnUsage(index);
|
||||
m_numIndexes--;
|
||||
for (uint16_t i = rmIx; i < m_numIndexes; i++) {
|
||||
m_indexes[i] = m_indexes[i + 1];
|
||||
@ -251,8 +253,10 @@ public:
|
||||
* @param index The index to use.
|
||||
* @return void.
|
||||
*/
|
||||
void AddSecondaryIndexToMetaData(MOT::Index* index) {
|
||||
void AddSecondaryIndexToMetaData(MOT::Index* index)
|
||||
{
|
||||
if (!index->IsPrimaryKey()) {
|
||||
IncIndexColumnUsage(index);
|
||||
m_secondaryIndexes[index->GetName()] = index;
|
||||
m_indexes[m_numIndexes] = index;
|
||||
++m_numIndexes;
|
||||
@ -289,7 +293,9 @@ public:
|
||||
{
|
||||
m_rowPool->ClearFreeCache();
|
||||
for (int i = 0; i < m_numIndexes; i++) {
|
||||
m_indexes[i]->ClearFreeCache();
|
||||
if (m_indexes[i] != nullptr) {
|
||||
m_indexes[i]->ClearFreeCache();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -109,15 +109,28 @@ bool CheckpointManager::CreateSnapShot()
|
||||
// phase that are not yet completed
|
||||
WaitPrevPhaseCommittedTxnComplete();
|
||||
|
||||
// Move to prepare
|
||||
// Move to PREPARE phase
|
||||
m_lock.WrLock();
|
||||
MoveToNextPhase();
|
||||
m_lock.WrUnlock();
|
||||
|
||||
while (m_phase != CheckpointPhase::CAPTURE) {
|
||||
while (m_phase != CheckpointPhase::RESOLVE) {
|
||||
usleep(50000L);
|
||||
}
|
||||
|
||||
// Ensure that all the transactions that started commit in PREPARE phase are completed.
|
||||
WaitPrevPhaseCommittedTxnComplete();
|
||||
|
||||
// Now in RESOLVE phase, no transaction is allowed to start the commit.
|
||||
// It is now safe to obtain a list of all tables to be included in this checkpoint.
|
||||
// The tables are read locked in order to avoid drop/truncate during checkpoint.
|
||||
FillTasksQueue();
|
||||
|
||||
// Move to CAPTURE phase
|
||||
m_lock.WrLock();
|
||||
MoveToNextPhase();
|
||||
m_lock.WrUnlock();
|
||||
|
||||
return !m_errorSet;
|
||||
}
|
||||
|
||||
@ -329,12 +342,6 @@ void CheckpointManager::MoveToNextPhase()
|
||||
m_phase = (CheckpointPhase)nextPhase;
|
||||
m_cntBit = !m_cntBit;
|
||||
|
||||
if (m_phase == CheckpointPhase::PREPARE) {
|
||||
// Obtain a list of all tables. The tables are read locked
|
||||
// in order to avoid delete/truncate during checkpoint.
|
||||
FillTasksQueue();
|
||||
}
|
||||
|
||||
if (m_phase == CheckpointPhase::CAPTURE) {
|
||||
if (m_redoLogHandler != nullptr) {
|
||||
// hold the redo log lock to avoid inserting additional entries to the
|
||||
|
||||
@ -357,7 +357,7 @@ private:
|
||||
|
||||
inline bool IsAutoCompletePhase() const
|
||||
{
|
||||
if (m_phase == PREPARE || m_phase == RESOLVE)
|
||||
if (m_phase == PREPARE)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
|
||||
@ -315,6 +315,13 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* In recovery/replay, OCC Validation should always wait, since there should not be any conflict.
|
||||
* The checkpoint locks the row before writing to the checkpoint buffer. So, if the OCC validation
|
||||
* no-wait flag is set to true, the commit of a transaction might fail with RC_ABORT during replay.
|
||||
*/
|
||||
ctx->GetTxnManager()->SetValidationNoWait(false);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -36,9 +36,6 @@
|
||||
namespace MOT {
|
||||
DECLARE_LOGGER(RecoveryManager, Recovery);
|
||||
|
||||
constexpr uint32_t NUM_DELETE_THRESHOLD = 5000;
|
||||
constexpr uint32_t NUM_DELETE_MAX_INC = 500;
|
||||
|
||||
bool RecoveryManager::Initialize()
|
||||
{
|
||||
// in a thread-pooled envelope the affinity could be disabled, so we use task affinity here
|
||||
@ -572,13 +569,6 @@ bool RecoveryManager::RecoverDbEnd()
|
||||
m_logStats->Print();
|
||||
}
|
||||
|
||||
if (m_numRedoOps != 0) {
|
||||
ClearTableCache();
|
||||
GcManager* gc = MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager()->GetGcSession();
|
||||
if (gc != nullptr) {
|
||||
gc->GcEndTxn();
|
||||
}
|
||||
}
|
||||
bool success = !m_errorSet;
|
||||
MOT_LOG_INFO("MOT recovery %s", success ? "completed" : "failed");
|
||||
return success;
|
||||
@ -788,18 +778,6 @@ RC RecoveryManager::RedoSegment(LogSegment* segment, uint64_t csn, uint64_t tran
|
||||
if (wasCommit) {
|
||||
txnStarted = false;
|
||||
}
|
||||
|
||||
GcManager* gc = MOTCurrTxn->GetGcSession();
|
||||
if (m_numRedoOps == 0 && gc != nullptr) {
|
||||
gc->GcStartTxn();
|
||||
}
|
||||
if (++m_numRedoOps > NUM_DELETE_THRESHOLD) {
|
||||
ClearTableCache();
|
||||
if (gc != nullptr) {
|
||||
gc->GcEndTxn();
|
||||
}
|
||||
m_numRedoOps = 0;
|
||||
}
|
||||
} else {
|
||||
operationData +=
|
||||
TwoPhaseRecoverOp(rState, operationData, csn, transactionId, MOTCurrThreadId, m_sState, status);
|
||||
@ -1376,21 +1354,4 @@ bool RecoveryManager::IsRecoveryMemoryLimitReached(uint32_t numThreads)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * @brief Walks m_tableDeletesStat and clears the row cache of every table whose
 * counter exceeds NUM_DELETE_MAX_INC.
 * @detail For each such entry a trace message is logged, ClearRowCache() is called
 * on the table and the entry is removed from the map. Entries at or below the
 * threshold are left untouched.
 */
void RecoveryManager::ClearTableCache()
|
||||
{
|
||||
auto it = m_tableDeletesStat.begin();
|
||||
while (it != m_tableDeletesStat.end()) {
|
||||
// copy of the current (table pointer, counter) entry
auto table = *it;
|
||||
if (table.second > NUM_DELETE_MAX_INC) {
|
||||
// NOTE(review): table.second is a uint32_t (per the m_tableDeletesStat declaration)
// while the format string uses %lu - confirm the logger tolerates this.
MOT_LOG_TRACE("RecoveryManager::ClearTableCache: Table = %s items = %lu\n",
|
||||
table.first->GetTableName().c_str(),
|
||||
table.second);
|
||||
table.first->ClearRowCache();
|
||||
// erase() yields the iterator that follows the removed entry, so the loop
// must not advance the iterator again on this path
it = m_tableDeletesStat.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace MOT
|
||||
|
||||
@ -187,8 +187,7 @@ public:
|
||||
m_errorSet(false),
|
||||
m_clogCallback(nullptr),
|
||||
m_threadId(AllocThreadId()),
|
||||
m_maxConnections(GetGlobalConfiguration().m_maxConnections),
|
||||
m_numRedoOps(0)
|
||||
m_maxConnections(GetGlobalConfiguration().m_maxConnections)
|
||||
{}
|
||||
|
||||
~RecoveryManager()
|
||||
@ -612,11 +611,6 @@ public:
|
||||
m_inProcessTxLock.unlock();
|
||||
}
|
||||
|
||||
/**
 * @brief Increments the counter kept for the given table in m_tableDeletesStat.
 * @param t The table whose counter is incremented (used as the map key).
 */
inline void IncreaseTableDeletesStat(Table* t)
|
||||
{
|
||||
// operator[] inserts a zero-initialized counter the first time a table is seen
m_tableDeletesStat[t]++;
|
||||
}
|
||||
|
||||
inline void SetLastReplayLsn(uint64_t lastReplayLsn)
|
||||
{
|
||||
if (m_lastReplayLsn < lastReplayLsn) {
|
||||
@ -629,14 +623,10 @@ public:
|
||||
return m_lastReplayLsn;
|
||||
}
|
||||
|
||||
void ClearTableCache();
|
||||
|
||||
LogStats* m_logStats;
|
||||
|
||||
std::map<uint64_t, TableInfo*> m_preCommitedTables;
|
||||
|
||||
std::unordered_map<Table*, uint32_t> m_tableDeletesStat;
|
||||
|
||||
private:
|
||||
static constexpr uint32_t NUM_REDO_RECOVERY_THREADS = 1;
|
||||
|
||||
@ -957,18 +947,6 @@ private:
|
||||
static void RecoverTwoPhaseAbort(
|
||||
Table* table, OperationCode opCode, uint64_t csn, uint64_t transactionId, uint32_t tid, Row* row, RC& status);
|
||||
|
||||
/**
|
||||
* @brief checks if a duplicate row exists in the table
|
||||
* @param table the table object pointer
|
||||
* @param keyData key's data buffer.
|
||||
* @param keyLen key's data buffer len.
|
||||
* @param rowData row's data buffer.
|
||||
* @param rowLen row's data buffer len.
|
||||
* @param tid the thread id of the recovering thread.
|
||||
*/
|
||||
static bool DuplicateRow(
|
||||
Table* table, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen, uint32_t tid);
|
||||
|
||||
/**
|
||||
* @brief checks if an operation is supported by the recovery.
|
||||
* @param op the operation code to check.
|
||||
@ -1107,8 +1085,6 @@ private:
|
||||
SurrogateState m_sState;
|
||||
|
||||
uint16_t m_maxConnections;
|
||||
|
||||
uint32_t m_numRedoOps;
|
||||
};
|
||||
} // namespace MOT
|
||||
|
||||
|
||||
@ -286,7 +286,8 @@ uint32_t RecoveryManager::RecoverLogOperationInsert(
|
||||
|
||||
uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
|
||||
{
|
||||
uint64_t tableId, rowLength, exId;
|
||||
uint64_t tableId;
|
||||
uint64_t exId;
|
||||
uint16_t keyLength;
|
||||
uint8_t *keyData, *rowData;
|
||||
status = RC_OK;
|
||||
@ -326,9 +327,9 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
|
||||
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Update Row", "failed to allocate key");
|
||||
return 0;
|
||||
}
|
||||
|
||||
key->CpKey((const uint8_t*)keyData, keyLength);
|
||||
Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key);
|
||||
|
||||
if (row == nullptr) {
|
||||
// Row not found. Error!!! Got an update for non existing row.
|
||||
MOTCurrTxn->DestroyTxnKey(key);
|
||||
@ -341,19 +342,18 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool doUpdate = true;
|
||||
|
||||
// In case row has higher CSN, don't perform the update
|
||||
// we still need to calculate the length of the operation
|
||||
// in order to skip for the next one
|
||||
// CSNs can be equal if updated during the same transaction
|
||||
if (row->GetCommitSequenceNumber() > csn) {
|
||||
MOT_LOG_WARN("Recovery Manager Update Row, tableId: %lu - row csn is newer! %lu > %lu {%s}",
|
||||
tableId,
|
||||
// Row CSN is newer. Error!!!
|
||||
MOTCurrTxn->DestroyTxnKey(key);
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
|
||||
"Recovery Manager Update Row",
|
||||
"row CSN is newer! %lu > %lu, key: %s, tableId: %lu",
|
||||
row->GetCommitSequenceNumber(),
|
||||
csn,
|
||||
key->GetKeyStr().c_str());
|
||||
doUpdate = false;
|
||||
key->GetKeyStr().c_str(),
|
||||
tableId);
|
||||
status = RC_ERROR;
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint16_t num_columns = table->GetFieldCount() - 1;
|
||||
@ -369,28 +369,22 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
|
||||
if (updated_columns_it.IsSet()) {
|
||||
if (valid_columns_it.IsSet()) {
|
||||
Column* column = table->GetField(updated_columns_it.GetPosition() + 1);
|
||||
if (doUpdate) {
|
||||
row_valid_columns.SetBit(updated_columns_it.GetPosition());
|
||||
erc = memcpy_s(rowData + column->m_offset, column->m_size, data, column->m_size);
|
||||
securec_check(erc, "\0", "\0");
|
||||
}
|
||||
row_valid_columns.SetBit(updated_columns_it.GetPosition());
|
||||
erc = memcpy_s(rowData + column->m_offset, column->m_size, data, column->m_size);
|
||||
securec_check(erc, "\0", "\0");
|
||||
size += column->m_size;
|
||||
data += column->m_size;
|
||||
} else {
|
||||
if (doUpdate) {
|
||||
row_valid_columns.UnsetBit(updated_columns_it.GetPosition());
|
||||
}
|
||||
row_valid_columns.UnsetBit(updated_columns_it.GetPosition());
|
||||
}
|
||||
}
|
||||
valid_columns_it.Next();
|
||||
updated_columns_it.Next();
|
||||
}
|
||||
|
||||
if (doUpdate) {
|
||||
MOTCurrTxn->UpdateLastRowState(MOT::AccessType::WR);
|
||||
}
|
||||
MOTCurrTxn->UpdateLastRowState(MOT::AccessType::WR);
|
||||
MOTCurrTxn->DestroyTxnKey(key);
|
||||
if (MOT::GetRecoveryManager()->m_logStats != nullptr && doUpdate)
|
||||
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
|
||||
MOT::GetRecoveryManager()->m_logStats->IncUpdate(tableId);
|
||||
return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(keyLength) + keyLength +
|
||||
updated_columns.GetLength() + valid_columns.GetLength() + size;
|
||||
@ -536,12 +530,8 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData,
|
||||
row->GetPrimarySentinel()->Lock(0);
|
||||
}
|
||||
|
||||
if (status == RC_UNIQUE_VIOLATION && DuplicateRow(table, keyData, keyLen, rowData, rowLen, tid)) {
|
||||
// Same row already exists. ok.
|
||||
// no need to destroy row (already destroyed by TxnManager::InsertRow() in case of unique violation)
|
||||
status = RC_OK;
|
||||
} else if (status == RC_MEMORY_ALLOCATION_ERROR) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row");
|
||||
if (status != RC_OK) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Insert Row", "failed to insert row (status %u)", status);
|
||||
}
|
||||
}
|
||||
|
||||
@ -583,8 +573,6 @@ void RecoveryManager::DeleteRow(
|
||||
RcToString(status),
|
||||
status);
|
||||
}
|
||||
} else {
|
||||
GetRecoveryManager()->IncreaseTableDeletesStat(table);
|
||||
}
|
||||
} else {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Delete Row", "getData failed");
|
||||
@ -620,6 +608,7 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
|
||||
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Update Row", "failed to create key");
|
||||
return;
|
||||
}
|
||||
|
||||
key->CpKey((const uint8_t*)keyData, keyLen);
|
||||
Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key);
|
||||
if (row == nullptr) {
|
||||
@ -642,10 +631,17 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
|
||||
}
|
||||
MOTCurrTxn->UpdateLastRowState(MOT::AccessType::WR);
|
||||
} else {
|
||||
MOT_LOG_WARN("RecoveryManager::updateRow, tableId: %lu - row csn is newer! %lu > %lu",
|
||||
tableId,
|
||||
// Row CSN is newer. Error!!!
|
||||
MOTCurrTxn->DestroyTxnKey(key);
|
||||
status = RC_ERROR;
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
|
||||
"RecoveryManager::updateRow",
|
||||
"row CSN is newer! %lu > %lu, key: %s, tableId: %lu",
|
||||
row->GetCommitSequenceNumber(),
|
||||
csn);
|
||||
csn,
|
||||
key->GetKeyStr().c_str(),
|
||||
tableId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
MOTCurrTxn->DestroyTxnKey(key);
|
||||
@ -723,8 +719,6 @@ void RecoveryManager::DropTable(char* data, RC& status)
|
||||
"Failed to drop table %s [%" PRIu64 "])",
|
||||
tableName.c_str(),
|
||||
externalTableId);
|
||||
} else {
|
||||
MOT::GetRecoveryManager()->m_tableDeletesStat.erase(table);
|
||||
}
|
||||
MOT_LOG_DEBUG("RecoveryManager::DropTable: table %s [%" PRIu64 "] dropped", tableName.c_str(), externalTableId);
|
||||
}
|
||||
@ -747,7 +741,10 @@ void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status)
|
||||
|
||||
in = table->DesrializeMeta(in, idx);
|
||||
bool primary = idx.m_indexOrder == IndexOrder::INDEX_ORDER_PRIMARY;
|
||||
MOT_LOG_DEBUG("createIndex: creating %s Index", primary ? "Primary" : "Secondary");
|
||||
MOT_LOG_DEBUG("createIndex: creating %s Index, %s %lu",
|
||||
primary ? "Primary" : "Secondary",
|
||||
idx.m_name.c_str(),
|
||||
idx.m_indexExtId);
|
||||
Index* index = nullptr;
|
||||
status = table->CreateIndexFromMeta(idx, primary, tid, false, &index);
|
||||
if (status != RC_OK) {
|
||||
@ -1083,52 +1080,6 @@ void RecoveryManager::RecoverTwoPhaseAbort(
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * @brief Checks whether the table already holds a row with the given primary key
 * whose data is byte-identical to the supplied row data.
 * @detail Builds a key on the table's primary index from keyData, reads the row
 * through the index and compares the first rowLen bytes of its data with rowData.
 * Every failure (no primary index, key allocation failure, row not found, data
 * mismatch) is reported through MOT_REPORT_ERROR and yields false.
 * @param table The table to look in.
 * @param keyData The key's data buffer.
 * @param keyLen The length of the key's data buffer.
 * @param rowData The row's data buffer to compare against.
 * @param rowLen The number of bytes compared.
 * @param tid The thread id passed to the index read.
 * @return True only if a row was found and its data equals rowData; false otherwise.
 */
bool RecoveryManager::DuplicateRow(
|
||||
Table* table, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen, uint32_t tid)
|
||||
{
|
||||
bool res = false;
|
||||
Key* key = nullptr;
|
||||
// NOTE(review): rc is never read in this function
RC rc = RC_ERROR;
|
||||
// NOTE(review): this outer row is never used; it is shadowed by the row declared inside the loop
Row* row = nullptr;
|
||||
Index* index = nullptr;
|
||||
// single-pass do/while so that every failure path can break out to the shared cleanup below
do {
|
||||
index = table->GetPrimaryIndex();
|
||||
if (index == nullptr) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Duplicate Row", "failed to find the primary index");
|
||||
break;
|
||||
}
|
||||
key = index->CreateNewKey();
|
||||
if (key == nullptr) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Duplicate Row", "failed to create a key");
|
||||
break;
|
||||
}
|
||||
key->CpKey((const uint8_t*)keyData, keyLen);
|
||||
|
||||
Row* row = index->IndexRead(key, tid);
|
||||
if (row == nullptr) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Duplicate Row", "failed to find row");
|
||||
break;
|
||||
}
|
||||
// a non-zero memcmp result means the stored row differs from the supplied data
if (memcmp(row->GetData(), rowData, rowLen)) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
|
||||
"Recovery Manager Duplicate Row",
|
||||
"rows differ! (Table %lu:%u:%s nidx: %u)",
|
||||
table->GetTableExId(),
|
||||
table->GetTableId(),
|
||||
table->GetTableName().c_str(),
|
||||
table->GetNumIndexes());
|
||||
// dump both buffers to help diagnose the mismatch
CheckpointUtils::Hexdump("New Row", rowData, rowLen);
|
||||
CheckpointUtils::Hexdump("Orig Row", (char*)row->GetData(), rowLen);
|
||||
break;
|
||||
}
|
||||
res = true;
|
||||
} while (0);
|
||||
|
||||
// release the key on every path on which it was created
if (key != nullptr && index != nullptr)
|
||||
index->DestroyKey(key);
|
||||
return res;
|
||||
}
|
||||
|
||||
bool RecoveryManager::BeginTransaction(uint64_t replayLsn /* = 0 */)
|
||||
{
|
||||
bool result = false;
|
||||
|
||||
@ -240,7 +240,6 @@ RC TxnManager::CommitInternal(uint64_t csn)
|
||||
}
|
||||
// first write to redo log, then write changes
|
||||
m_redoLog.Commit();
|
||||
WriteDDLChanges();
|
||||
if (!m_occManager.WriteChanges(this))
|
||||
return RC_PANIC;
|
||||
if (GetGlobalConfiguration().m_enableCheckpoint) {
|
||||
@ -282,7 +281,7 @@ RC TxnManager::LiteCommit(uint64_t transcationId)
|
||||
|
||||
// first write to redo log, then write changes
|
||||
m_redoLog.Commit();
|
||||
WriteDDLChanges();
|
||||
CleanDDLChanges();
|
||||
Cleanup();
|
||||
}
|
||||
MOT::DbSessionStatisticsProvider::GetInstance().AddCommitTxn();
|
||||
@ -308,7 +307,6 @@ RC TxnManager::CommitPrepared(uint64_t transactionId)
|
||||
m_redoLog.CommitPrepared();
|
||||
|
||||
// Run second validation phase
|
||||
WriteDDLChanges();
|
||||
if (!m_occManager.WriteChanges(this))
|
||||
return RC_PANIC;
|
||||
GetCheckpointManager()->TransactionCompleted(this);
|
||||
@ -329,7 +327,7 @@ RC TxnManager::LiteCommitPrepared(uint64_t transactionId)
|
||||
|
||||
// first write to redo log, then write changes
|
||||
m_redoLog.CommitPrepared();
|
||||
WriteDDLChanges();
|
||||
CleanDDLChanges();
|
||||
Cleanup();
|
||||
}
|
||||
MOT::DbSessionStatisticsProvider::GetInstance().AddCommitPreparedTxn();
|
||||
@ -344,6 +342,7 @@ RC TxnManager::EndTransaction()
|
||||
m_occManager.ReleaseLocks(this);
|
||||
m_occManager.CleanRowsFromIndexes(this);
|
||||
}
|
||||
CleanDDLChanges();
|
||||
Cleanup();
|
||||
return RC::RC_OK;
|
||||
}
|
||||
@ -547,6 +546,7 @@ void TxnManager::RollbackDDLs()
|
||||
table->GetLongTableName().c_str());
|
||||
table->WrLock();
|
||||
if (index->IsPrimaryKey()) {
|
||||
table->DecIndexColumnUsage(index);
|
||||
table->SetPrimaryIndex(nullptr);
|
||||
table->DeleteIndex(index);
|
||||
} else {
|
||||
@ -562,6 +562,7 @@ void TxnManager::RollbackDDLs()
|
||||
table->GetLongTableName().c_str());
|
||||
table->WrLock();
|
||||
if (index->IsPrimaryKey()) {
|
||||
table->IncIndexColumnUsage(index);
|
||||
table->SetPrimaryIndex(index);
|
||||
} else {
|
||||
table->AddSecondaryIndexToMetaData(index);
|
||||
@ -574,7 +575,7 @@ void TxnManager::RollbackDDLs()
|
||||
}
|
||||
}
|
||||
|
||||
void TxnManager::WriteDDLChanges()
|
||||
void TxnManager::CleanDDLChanges()
|
||||
{
|
||||
// early exit
|
||||
if (m_txnDdlAccess->Size() == 0)
|
||||
@ -596,15 +597,11 @@ void TxnManager::WriteDDLChanges()
|
||||
indexArr = (MOTIndexArr*)ddl_access->GetEntry();
|
||||
if (indexArr->GetNumIndexes() > 0) {
|
||||
table = indexArr->GetTable();
|
||||
table->WrLock();
|
||||
table->m_rowCount = 0;
|
||||
for (int i = 0; i < indexArr->GetNumIndexes(); i++) {
|
||||
index = indexArr->GetIndex(i);
|
||||
GcManager::ClearIndexElements(index->GetIndexId());
|
||||
index->Truncate(true);
|
||||
delete index;
|
||||
table->DeleteIndex(index);
|
||||
}
|
||||
table->Unlock();
|
||||
}
|
||||
delete indexArr;
|
||||
break;
|
||||
@ -612,21 +609,11 @@ void TxnManager::WriteDDLChanges()
|
||||
index = (Index*)ddl_access->GetEntry();
|
||||
table = index->GetTable();
|
||||
index->SetIsCommited(true);
|
||||
if (index->IsPrimaryKey()) {
|
||||
table->WrLock();
|
||||
table->SetPrimaryIndex(index);
|
||||
table->Unlock();
|
||||
}
|
||||
break;
|
||||
case DDL_ACCESS_DROP_INDEX:
|
||||
index = (Index*)ddl_access->GetEntry();
|
||||
table = index->GetTable();
|
||||
table->WrLock();
|
||||
if (index->IsPrimaryKey()) {
|
||||
table->SetPrimaryIndex(nullptr);
|
||||
}
|
||||
table->DeleteIndex(index);
|
||||
table->Unlock();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@ -1142,10 +1129,8 @@ RC TxnManager::TruncateTable(Table* table)
|
||||
MOT::Index* index_copy = index->CloneEmpty();
|
||||
if (index_copy == nullptr) {
|
||||
// print error, could not allocate memory for index
|
||||
MOT_REPORT_ERROR(MOT_ERROR_OOM,
|
||||
"Truncate Table",
|
||||
"Failed to clone empty index %s",
|
||||
index->GetName().c_str());
|
||||
MOT_REPORT_ERROR(
|
||||
MOT_ERROR_OOM, "Truncate Table", "Failed to clone empty index %s", index->GetName().c_str());
|
||||
for (uint16_t j = 0; j < indexesArr->GetNumIndexes(); j++) {
|
||||
// cleanup of previous created indexes copy
|
||||
MOT::Index* oldIndex = indexesArr->GetIndex(j);
|
||||
|
||||
@ -462,6 +462,17 @@ public:
|
||||
m_failedCommitPrepared = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Sets or clears the validate-no-wait flag in OccTransactionManager.
|
||||
* @detail Determines whether to call Access::lock() or
|
||||
* Access::try_lock() on each access item in the write set.
|
||||
* @param b The new validate-no-wait flag state.
|
||||
* NOTE(review): presumably true selects the non-blocking try_lock() path and false the
* blocking lock() path - confirm against OccTransactionManager.
*/
|
||||
void SetValidationNoWait(bool b)
|
||||
{
|
||||
// simply forwards the flag to the OCC manager member
m_occManager.SetValidationNoWait(b);
|
||||
}
|
||||
|
||||
private:
|
||||
static constexpr uint32_t SESSION_ID_BITS = 32;
|
||||
|
||||
@ -469,7 +480,7 @@ private:
|
||||
* @brief Apply all transactional DDL changes. DDL changes are not handled
|
||||
* by occ.
|
||||
*/
|
||||
void WriteDDLChanges();
|
||||
void CleanDDLChanges();
|
||||
|
||||
/**
|
||||
* @brief Reclaims all resources associated with the transaction and
|
||||
|
||||
@ -30,6 +30,8 @@
|
||||
#include "redo_log.h"
|
||||
|
||||
namespace MOT {
|
||||
DECLARE_LOGGER(RedoLog, RedoLog);
|
||||
|
||||
RedoLog::RedoLog(TxnManager* txn)
|
||||
: m_redoLogHandler(nullptr),
|
||||
m_redoBuffer(nullptr),
|
||||
@ -129,7 +131,7 @@ RC RedoLog::InsertRow(Row* row)
|
||||
if (!m_configuration.m_enableRedoLog)
|
||||
return RC_OK;
|
||||
MaxKey key;
|
||||
Index* index = row->GetTable()->GetPrimaryIndex();
|
||||
MOT::Index* index = row->GetTable()->GetPrimaryIndex();
|
||||
key.InitKey(index->GetKeyLength());
|
||||
index->BuildKey(row->GetTable(), row, &key);
|
||||
uint64_t tableId = row->GetTable()->GetTableId();
|
||||
@ -149,7 +151,7 @@ RC RedoLog::OverwriteRow(Row* row)
|
||||
if (!m_configuration.m_enableRedoLog)
|
||||
return RC_OK;
|
||||
MaxKey key;
|
||||
Index* index = row->GetTable()->GetPrimaryIndex();
|
||||
MOT::Index* index = row->GetTable()->GetPrimaryIndex();
|
||||
key.InitKey(index->GetKeyLength());
|
||||
index->BuildKey(row->GetTable(), row, &key);
|
||||
uint64_t tableId = row->GetTable()->GetTableId();
|
||||
@ -182,7 +184,7 @@ RC RedoLog::DeleteRow(Row* row)
|
||||
if (!m_configuration.m_enableRedoLog)
|
||||
return RC_OK;
|
||||
MaxKey key;
|
||||
Index* index = row->GetTable()->GetPrimaryIndex();
|
||||
MOT::Index* index = row->GetTable()->GetPrimaryIndex();
|
||||
key.InitKey(index->GetKeyLength());
|
||||
index->BuildKey(row->GetTable(), row, &key);
|
||||
uint64_t tableId = row->GetTable()->GetTableId();
|
||||
@ -219,7 +221,7 @@ RC RedoLog::DropTable(Table* table)
|
||||
return (success == true) ? RC_OK : RC_ERROR;
|
||||
}
|
||||
|
||||
RC RedoLog::CreateIndex(Index* index)
|
||||
RC RedoLog::CreateIndex(MOT::Index* index)
|
||||
{
|
||||
if (!m_configuration.m_enableRedoLog)
|
||||
return RC_OK;
|
||||
@ -231,7 +233,7 @@ RC RedoLog::CreateIndex(Index* index)
|
||||
return (success == true) ? RC_OK : RC_ERROR;
|
||||
}
|
||||
|
||||
RC RedoLog::DropIndex(Index* index)
|
||||
RC RedoLog::DropIndex(MOT::Index* index)
|
||||
{
|
||||
if (!m_configuration.m_enableRedoLog)
|
||||
return RC_OK;
|
||||
@ -280,14 +282,18 @@ void RedoLog::WriteToLog()
|
||||
RC RedoLog::SerializeTransaction()
|
||||
{
|
||||
RC status = RC_OK;
|
||||
MOT::Index* index;
|
||||
std::map<uint64_t, TxnDDLAccess::DDLAccess*> uixMap;
|
||||
bool hasDML = (m_txn->m_accessMgr->m_rowCnt > 0 && !m_txn->m_isLightSession);
|
||||
TxnDDLAccess* transactionDDLAccess = m_txn->m_txnDdlAccess;
|
||||
if (transactionDDLAccess != nullptr && transactionDDLAccess->Size() > 0) {
|
||||
RC status = RC_ERROR;
|
||||
for (uint16_t i = 0; i < transactionDDLAccess->Size(); i++) {
|
||||
Table* truncatedTable = nullptr;
|
||||
TxnDDLAccess::DDLAccess* DDLAccess = transactionDDLAccess->Get(i);
|
||||
if (DDLAccess == nullptr)
|
||||
if (DDLAccess == nullptr) {
|
||||
return RC_ERROR;
|
||||
}
|
||||
DDLAccessType accessType = DDLAccess->GetDDLAccessType();
|
||||
switch (accessType) {
|
||||
case DDL_ACCESS_CREATE_TABLE:
|
||||
@ -299,11 +305,33 @@ RC RedoLog::SerializeTransaction()
|
||||
break;
|
||||
|
||||
case DDL_ACCESS_CREATE_INDEX:
|
||||
status = CreateIndex((Index*)DDLAccess->GetEntry());
|
||||
index = (MOT::Index*)DDLAccess->GetEntry();
|
||||
if (!hasDML || !(!index->IsPrimaryKey() && index->IsUnique())) {
|
||||
status = CreateIndex(index);
|
||||
} else {
|
||||
// a unique secondary index is not logged here; its create operation is deferred and sent after the DML
|
||||
MOT_LOG_DEBUG("Defer create index: %s %lu", index->GetName().c_str(), index->GetExtId());
|
||||
uixMap[index->GetExtId()] = DDLAccess;
|
||||
status = RC_OK;
|
||||
}
|
||||
break;
|
||||
|
||||
case DDL_ACCESS_DROP_INDEX:
|
||||
status = DropIndex((Index*)DDLAccess->GetEntry());
|
||||
index = (MOT::Index*)DDLAccess->GetEntry();
|
||||
if (!hasDML || !(!index->IsPrimaryKey() && index->IsUnique())) {
|
||||
status = DropIndex(index);
|
||||
} else {
|
||||
std::map<uint64_t, TxnDDLAccess::DDLAccess*>::iterator it = uixMap.find(index->GetExtId());
|
||||
if (it != uixMap.end()) {
|
||||
// the index is both created and dropped in this transaction, so neither operation needs to be logged
|
||||
MOT_LOG_DEBUG("Erase create index: %s %lu", index->GetName().c_str(), index->GetExtId());
|
||||
uixMap.erase(it);
|
||||
status = RC_OK;
|
||||
} else {
|
||||
uixMap[index->GetExtId()] = DDLAccess;
|
||||
status = DropIndex(index);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case DDL_ACCESS_TRUNCATE_TABLE:
|
||||
// in case of truncate table the DDLAccess entry holds the
|
||||
@ -322,13 +350,17 @@ RC RedoLog::SerializeTransaction()
|
||||
return RC_ERROR;
|
||||
break;
|
||||
}
|
||||
if (status != RC_OK)
|
||||
if (status != RC_OK) {
|
||||
MOT_LOG_ERROR("Serialize DDL finished with error: %d", status);
|
||||
return RC_ERROR;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (m_txn->m_isLightSession)
|
||||
if (m_txn->m_isLightSession) {
|
||||
MOT_LOG_DEBUG("Serialize DDL light session finished");
|
||||
return RC_OK;
|
||||
}
|
||||
|
||||
for (uint32_t index = 0; index < m_txn->m_accessMgr->m_rowCnt; index++) {
|
||||
Access* access = m_txn->m_accessMgr->GetAccessPtr(index);
|
||||
@ -346,8 +378,9 @@ RC RedoLog::SerializeTransaction()
|
||||
}
|
||||
break;
|
||||
case DEL:
|
||||
if (access->m_params.IsPrimarySentinel())
|
||||
if (access->m_params.IsPrimarySentinel()) {
|
||||
status = DeleteRow(access->GetTxnRow());
|
||||
}
|
||||
break;
|
||||
case WR:
|
||||
status = UpdateRow(access->GetTxnRow(), access->m_modifiedColumns);
|
||||
@ -357,9 +390,33 @@ RC RedoLog::SerializeTransaction()
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (status != RC_OK)
|
||||
if (status != RC_OK) {
|
||||
MOT_LOG_ERROR("Serialize DML finished with error: %d", status);
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
// create operations for unique indexes
|
||||
std::map<uint64_t, TxnDDLAccess::DDLAccess*>::iterator it = uixMap.begin();
|
||||
while (it != uixMap.end()) {
|
||||
DDLAccessType accessType = it->second->GetDDLAccessType();
|
||||
switch (accessType) {
|
||||
case DDL_ACCESS_CREATE_INDEX:
|
||||
index = (MOT::Index*)it->second->GetEntry();
|
||||
MOT_LOG_DEBUG("Send create index: %s %lu", index->GetName().c_str(), index->GetExtId());
|
||||
status = CreateIndex(index);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (status != RC_OK) {
|
||||
return RC_ERROR;
|
||||
}
|
||||
|
||||
it++;
|
||||
}
|
||||
uixMap.clear();
|
||||
return RC_OK;
|
||||
}
|
||||
|
||||
|
||||
@ -1173,7 +1173,7 @@ bool MOTAdaptor::SetMatchingExpr(
|
||||
uint16_t numIx = state->m_table->GetNumIndexes();
|
||||
|
||||
for (uint16_t i = 0; i < numIx; i++) {
|
||||
MOT::Index *ix = state->m_table->GetIndex(i);
|
||||
MOT::Index* ix = state->m_table->GetIndex(i);
|
||||
if (ix != nullptr && ix->IsFieldPresent(colId)) {
|
||||
if (marr->m_idx[i] == nullptr) {
|
||||
marr->m_idx[i] = (MatchIndex*)palloc0(sizeof(MatchIndex));
|
||||
|
||||
Reference in New Issue
Block a user