Transactional recovery to fix issues in standby and update from core

This commit is contained in:
Vinoth
2020-08-19 10:19:36 +08:00
parent 767f9efcdf
commit 21fee69cd8
20 changed files with 534 additions and 199 deletions

View File

@ -24,6 +24,7 @@
#include <malloc.h>
#include <string.h>
#include <algorithm>
#include "table.h"
#include "mot_engine.h"
#include "utilities.h"
@ -127,7 +128,9 @@ bool Table::InitRowPool(bool local)
void Table::ClearThreadMemoryCache()
{
for (int i = 0; i < m_numIndexes; i++) {
m_indexes[i]->ClearThreadMemoryCache();
if (m_indexes[i] != nullptr) {
m_indexes[i]->ClearThreadMemoryCache();
}
}
if (m_rowPool != nullptr) {
@ -189,9 +192,13 @@ bool Table::UpdatePrimaryIndex(Index* index, TxnManager* txn, uint32_t tid)
{
if (this->m_primaryIndex) {
if (txn == nullptr) {
DeletePrimaryIndex(this->m_primaryIndex);
if (DeletePrimaryIndex(this->m_primaryIndex) != RC_OK) {
return false;
}
} else {
txn->DropIndex(this->m_primaryIndex);
if (txn->DropIndex(this->m_primaryIndex) != RC_OK) {
return false;
}
}
} else {
if (m_numIndexes == 0) {
@ -449,7 +456,13 @@ RC Table::InsertRow(Row* row, TxnManager* txn)
// set primary key
row->SetRowId(txn->GetSurrogateKey());
mot_vector<Key*> cleanupKeys;
key = txn->GetTxnKey(ix);
if (key == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Insert Row", "Failed to create primary key");
return RC_MEMORY_ALLOCATION_ERROR;
}
cleanupKeys.push_back(key);
if (ix->IsFakePrimary()) {
surrogateprimaryKey = htobe64(row->GetRowId());
row->SetSurrogateKey(surrogateprimaryKey);
@ -464,10 +477,19 @@ RC Table::InsertRow(Row* row, TxnManager* txn)
for (uint16_t i = 1; i < numIndexes; i++) {
ix = GetSecondaryIndex(i);
key = txn->GetTxnKey(ix);
if (key == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_OOM, "Insert Row", "Failed to create key for secondary index %s", ix->GetName().c_str());
std::for_each(cleanupKeys.begin(), cleanupKeys.end(), [](Key*& key) { MOTCurrTxn->DestroyTxnKey(key); });
MOTCurrTxn->Rollback();
return RC_MEMORY_ALLOCATION_ERROR;
}
cleanupKeys.push_back(key);
ix->BuildKey(this, row, key);
txn->GetNextInsertItem()->SetItem(row, ix, key);
}
cleanupKeys.clear(); // subsequent call to insert row takes care of key cleanup
return txn->InsertRow(row);
}
@ -675,21 +697,23 @@ bool Table::ModifyColumnSize(const uint32_t& id, const uint64_t& size)
}
// column size is uint64_t but tuple size is uint32_t, so we must check for overflow
if (size >= (uint64_t)std::numeric_limits<decltype(this->m_tupleSize)>::max())
if (size >= (uint64_t)std::numeric_limits<decltype(this->m_tupleSize)>::max()) {
return false;
}
uint64_t oldColSize = m_columns[id]->m_size;
uint64_t newTupleSize = ((uint64_t)m_tupleSize) - oldColSize + size;
if (newTupleSize >= (uint64_t)std::numeric_limits<decltype(this->m_tupleSize)>::max())
if (newTupleSize >= (uint64_t)std::numeric_limits<decltype(this->m_tupleSize)>::max()) {
return false;
}
m_tupleSize = newTupleSize;
m_columns[id]->m_size = size;
// now we need to fix the offset of all subsequent fields
for (uint32_t i = id + 1; i < m_fieldCnt; ++i)
for (uint32_t i = id + 1; i < m_fieldCnt; ++i) {
m_columns[id]->m_offset = m_columns[id]->m_offset - oldColSize + size;
}
return true;
}
@ -909,13 +933,17 @@ char* Table::DesrializeMeta(char* dataIn, CommonIndexMeta& meta)
return dataIn;
}
RC Table::CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid)
RC Table::CreateIndexFromMeta(
CommonIndexMeta& meta, bool primary, uint32_t tid, bool addToTable /* = true */, Index** outIndex /* = nullptr */)
{
IndexTreeFlavor flavor = DEFAULT_TREE_FLAVOR;
Index* ix = nullptr;
MOT_LOG_DEBUG("%s: %s (%s)", __func__, meta.m_name.c_str(), primary ? "primary" : "secondary");
if (meta.m_indexingMethod == IndexingMethod::INDEXING_METHOD_TREE)
if (meta.m_indexingMethod == IndexingMethod::INDEXING_METHOD_TREE) {
flavor = GetGlobalConfiguration().m_indexTreeFlavor;
}
ix = IndexFactory::CreateIndex(meta.m_indexOrder, meta.m_indexingMethod, flavor);
if (ix == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
@ -924,6 +952,7 @@ RC Table::CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid)
m_longTableName.c_str());
return RC_ERROR;
}
ix->SetUnique(meta.m_unique);
if (!ix->SetNumTableFields(meta.m_numTableFields)) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
@ -934,30 +963,40 @@ RC Table::CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid)
delete ix;
return RC_ERROR;
}
for (int i = 0; i < meta.m_numKeyFields; i++)
for (int i = 0; i < meta.m_numKeyFields; i++) {
ix->SetLenghtKeyFields(i, meta.m_columnKeyFields[i], meta.m_lengthKeyFields[i]);
}
ix->SetFakePrimary(meta.m_fake);
ix->SetNumIndexFields(meta.m_numKeyFields);
ix->SetTable(this);
ix->SetIsCommited(true);
if (ix->IndexInit(meta.m_keyLength, meta.m_unique, meta.m_name, nullptr) != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to initialize index");
delete ix;
return RC_ERROR;
}
if (primary) {
if (UpdatePrimaryIndex(ix, nullptr, tid) != true) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add primary index");
delete ix;
return RC_ERROR;
}
} else {
if (AddSecondaryIndex(ix->GetName(), (Index*)ix, nullptr, tid) != true) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add secondary index");
delete ix;
return RC_ERROR;
if (addToTable) {
// In transactional recovery we set index as committed only during commit.
ix->SetIsCommited(true);
if (primary) {
if (UpdatePrimaryIndex(ix, nullptr, tid) != true) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add primary index");
delete ix;
return RC_ERROR;
}
} else {
if (AddSecondaryIndex(ix->GetName(), (Index*)ix, nullptr, tid) != true) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Create Index from meta-data", "Failed to add secondary index");
delete ix;
return RC_ERROR;
}
}
}
if (outIndex != nullptr) {
*outIndex = ix;
}
return RC_OK;
}

View File

@ -944,7 +944,8 @@ public:
* @param tid the thread identifier
* @return RC error code.
*/
RC CreateIndexFromMeta(CommonIndexMeta& meta, bool primary, uint32_t tid);
RC CreateIndexFromMeta(
CommonIndexMeta& meta, bool primary, uint32_t tid, bool addToTable = true, Index** outIndex = nullptr);
/**
* @brief returns the serialized size of a table

View File

@ -281,6 +281,11 @@ void CheckpointManager::TransactionCompleted(TxnManager* txn)
} else { // previous phase
m_counters[!m_cntBit].fetch_sub(1);
}
if (txn->m_replayLsn != 0 && MOTEngine::GetInstance()->IsRecovering()) {
// Update the last replay LSN in recovery manager in case of redo replay.
// This is needed for getting the last replay LSN during checkpoint in standby.
GetRecoveryManager()->SetLastReplayLsn(txn->GetReplayLsn());
}
m_lock.RdUnlock();
if (m_counters[!m_cntBit] == 0 && IsAutoCompletePhase()) {
@ -339,6 +344,15 @@ void CheckpointManager::MoveToNextPhase()
// relevant for asynchronous logging or group commit
m_redoLogHandler->Flush();
}
if (MOTEngine::GetInstance()->IsRecovering()) {
// We are moving from RESOLVE to CAPTURE phase. No transaction is allowed to commit
// as this point. This is the point where we take snapshot and any rows committed
// after this point will not be included in this checkpoint.
// Get the current last replay LSN from recovery manager and use it as m_lastReplayLsn
// for the current checkpoint. If the system recovers from disk after this checkpoint,
// it is safe to ignore any redo replay before this LSN.
SetLastReplayLsn(GetRecoveryManager()->GetLastReplayLsn());
}
}
// there are no open transactions from previous phase, we can move forward to next phase
@ -760,7 +774,8 @@ bool CheckpointManager::CreateCheckpointId(uint64_t& checkpointId)
bool CheckpointManager::GetCheckpointDirName(std::string& dirName)
{
if (!CheckpointUtils::SetDirName(dirName, m_id)) {
uint64_t checkpointId = GetRecoveryManager()->GetCheckpointId();
if (!CheckpointUtils::SetDirName(dirName, checkpointId)) {
MOT_LOG_ERROR("SetDirName failed");
return false;
}

View File

@ -164,11 +164,6 @@ public:
return m_id;
}
void SetLastReplayLsn(uint64_t lsn)
{
m_lastReplayLsn = lsn;
}
uint64_t GetLastReplayLsn()
{
return m_lastReplayLsn;
@ -296,6 +291,11 @@ private:
return m_lsn;
}
void SetLastReplayLsn(uint64_t lsn)
{
m_lastReplayLsn = lsn;
}
static const char* PhaseToString(CheckpointPhase phase);
inline void SwapAvailableAndNotAvailable()

View File

@ -31,7 +31,7 @@
namespace MOT {
DECLARE_LOGGER(CSNManager, System);
CSNManager::CSNManager() : m_csn(0)
CSNManager::CSNManager() : m_csn(MOT_INITIAL_CSN)
{}
CSNManager::~CSNManager()

View File

@ -28,6 +28,10 @@
#include <atomic>
#include "global.h"
#define MOT_INVALID_CSN ((uint64_t)-1)
#define MOT_RECOVERED_TABLE_CSN ((uint64_t)0)
#define MOT_INITIAL_CSN ((uint64_t)1)
namespace MOT {
/**
* @class CSNManager

View File

@ -327,6 +327,7 @@ bool RecoveryManager::RecoverTableRows(
break;
}
BeginTransaction();
InsertRow(tableId,
fileHeader.m_exId,
keyData,
@ -338,8 +339,13 @@ bool RecoveryManager::RecoverTableRows(
m_sState,
status,
entry.m_rowId);
if (status != RC_OK)
status = CommitTransaction(entry.m_csn);
if (status != RC_OK) {
MOT_LOG_ERROR(
"Failed to commit row recovery from checkpoint: %s (error code: %d)", RcToString(status), (int)status);
break;
}
MOT_LOG_DEBUG("Inserted into table %u row with CSN %" PRIu64, tableId, entry.m_csn);
if (entry.m_csn > maxCsn)
maxCsn = entry.m_csn;
}
@ -412,7 +418,6 @@ void RecoveryManager::CpWorkerFunc()
bool RecoveryManager::RecoverFromCheckpoint()
{
uint64_t lastReplayLsn = 0;
m_checkpointWorkerStop = false;
if (!m_tasksList.empty()) {
MOT_LOG_ERROR("RecoveryManager:: tasksQueue is not empty!");
@ -426,7 +431,7 @@ bool RecoveryManager::RecoverFromCheckpoint()
if (IsCheckpointValid(CheckpointControlFile::GetCtrlFile()->GetId())) {
m_checkpointId = CheckpointControlFile::GetCtrlFile()->GetId();
m_lsn = CheckpointControlFile::GetCtrlFile()->GetLsn();
lastReplayLsn = CheckpointControlFile::GetCtrlFile()->GetLastReplayLsn();
m_lastReplayLsn = CheckpointControlFile::GetCtrlFile()->GetLastReplayLsn();
} else {
MOT_LOG_ERROR("RecoveryManager:: no valid checkpoint exist");
OnError(RecoveryManager::ErrCodes::CP_SETUP, "RecoveryManager:: no valid checkpoint exist");
@ -440,11 +445,15 @@ bool RecoveryManager::RecoverFromCheckpoint()
return false;
}
if (m_lsn >= lastReplayLsn) {
MOT_LOG_DEBUG("Recovery LSN Check: will use lsn: %lu (last replay lsn: %lu)", m_lsn, lastReplayLsn);
} else {
m_lsn = lastReplayLsn;
MOT_LOG_WARN("Recovery LSN Check: modifying lsn to %lu (last replay lsn)", m_lsn);
if (m_checkpointId != CheckpointControlFile::invalidId) {
if (m_lsn >= m_lastReplayLsn) {
MOT_LOG_INFO(
"Recovery LSN Check will use the LSN (%lu), ignoring the lastReplayLSN (%lu)", m_lsn, m_lastReplayLsn);
} else {
MOT_LOG_WARN(
"Recovery LSN Check will use the lastReplayLSN (%lu), ignoring the LSN (%lu)", m_lastReplayLsn, m_lsn);
m_lsn = m_lastReplayLsn;
}
}
int taskFillStat = FillTasksFromMapFile();
@ -467,6 +476,7 @@ bool RecoveryManager::RecoverFromCheckpoint()
m_tableIds.size(),
m_checkpointId);
BeginTransaction();
for (auto it = m_tableIds.begin(); it != m_tableIds.end(); ++it) {
if (IsRecoveryMemoryLimitReached(NUM_REDO_RECOVERY_THREADS)) {
MOT_LOG_ERROR("Memory hard limit reached. Cannot recover datanode");
@ -481,6 +491,12 @@ bool RecoveryManager::RecoverFromCheckpoint()
return false;
}
}
RC status = CommitTransaction(MOT_RECOVERED_TABLE_CSN);
if (status != RC_OK) {
MOT_LOG_ERROR("Failed to commit table recovery: %s (error code: %d)", RcToString(status), (int)status);
OnError(RecoveryManager::ErrCodes::CP_TABLE_COMMIT, "Failed to commit table recovery from checkpoint");
return false;
}
std::vector<std::thread> recoveryThreadPool;
for (uint32_t i = 0; i < m_numWorkers; ++i) {
@ -593,20 +609,18 @@ void RecoveryManager::FreeRedoSegment(LogSegment* segment)
delete segment;
}
bool RecoveryManager::ApplyLogSegmentFromData(uint64_t redoLsn, char* data, size_t len)
bool RecoveryManager::ApplyRedoLog(uint64_t redoLsn, char* data, size_t len)
{
if (redoLsn < m_lsn) {
// ignore old redo records which are prior to our checkpoint LSN
MOT_LOG_DEBUG(
"ApplyLogSegmentFromData - ignoring old redo record. Checkpoint LSN: %lu, redo LSN: %lu", m_lsn, redoLsn);
MOT_LOG_DEBUG("ApplyRedoLog - ignoring old redo record. Checkpoint LSN: %lu, redo LSN: %lu", m_lsn, redoLsn);
return true;
}
MOTEngine::GetInstance()->GetCheckpointManager()->SetLastReplayLsn(redoLsn);
return ApplyLogSegmentFromData(data, len);
return ApplyLogSegmentFromData(data, len, redoLsn);
}
bool RecoveryManager::ApplyLogSegmentFromData(char* data, size_t len)
bool RecoveryManager::ApplyLogSegmentFromData(char* data, size_t len, uint64_t replayLsn /* = 0 */)
{
bool result = false;
char* curData = data;
@ -614,7 +628,7 @@ bool RecoveryManager::ApplyLogSegmentFromData(char* data, size_t len)
while (data + len > curData) {
// obtain LogSegment from buffer
RedoLogTransactionIterator iterator(curData, len);
LogSegment* segment = iterator.AllocRedoSegment();
LogSegment* segment = iterator.AllocRedoSegment(replayLsn);
if (segment == nullptr) {
MOT_LOG_ERROR("ApplyLogSegmentFromData - failed to allocate segment");
return false;
@ -741,18 +755,41 @@ RC RecoveryManager::RedoSegment(LogSegment* segment, uint64_t csn, uint64_t tran
bool is2pcRecovery = !MOTEngine::GetInstance()->IsRecovering();
uint8_t* endPosition = (uint8_t*)(segment->m_data + segment->m_len);
uint8_t* operationData = (uint8_t*)(segment->m_data);
bool txnStarted = false;
bool wasCommit = false;
while (operationData < endPosition) {
// redolog recovery - single threaded
// redo log recovery - single threaded
if (IsRecoveryMemoryLimitReached(NUM_REDO_RECOVERY_THREADS)) {
status = RC_ERROR;
MOT_LOG_ERROR("Memory hard limit reached. Cannot recover datanode");
break;
}
// begin transaction on-demand
if (!txnStarted) {
if (!BeginTransaction(segment->m_replayLsn)) {
MOT_REPORT_ERROR(MOT_ERROR_RESOURCE_LIMIT, "Recover Redo Segment", "Cannot start a new transaction");
return RC_ERROR;
} else {
txnStarted = true;
}
}
if (!is2pcRecovery) {
operationData += RecoverLogOperation(operationData, csn, transactionId, MOTCurrThreadId, m_sState, status);
GcManager* gc = MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager()->GetGcSession();
operationData +=
RecoverLogOperation(operationData, csn, transactionId, MOTCurrThreadId, m_sState, status, wasCommit);
// check operation result status
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_RESOURCE_LIMIT, "Recover Redo Segment", "Failed to recover redo segment");
break;
}
// update transactional state
if (wasCommit) {
txnStarted = false;
}
GcManager* gc = MOTCurrTxn->GetGcSession();
if (m_numRedoOps == 0 && gc != nullptr) {
gc->GcStartTxn();
}
@ -1221,6 +1258,7 @@ bool RecoveryManager::DeserializeInProcessTxns(int fd, uint64_t numEntries)
break;
}
segment->m_replayLsn = 0;
erc = memcpy_s(&segment->m_len, sizeof(size_t), buf, sizeof(size_t));
securec_check(erc, "\0", "\0");
segment->m_data = new (std::nothrow) char[segment->m_len];

View File

@ -53,7 +53,10 @@ public:
CP_RECOVERY = 3,
XLOG_SETUP = 4,
XLOG_RECOVERY = 5,
SURROGATE = 6
SURROGATE = 6,
CP_TABLE_COMMIT = 7,
CP_ROW_COMMIT = 8,
XLOG_TXN_COMMIT = 9
};
enum RecoveryOpState { COMMIT = 1, ABORT = 2, TPC_APPLY = 3, TPC_COMMIT = 4, TPC_ABORT = 5 };
@ -174,6 +177,7 @@ public:
m_recoverFromCkptDone(false),
m_checkpointId(0),
m_lsn(0),
m_lastReplayLsn(0),
m_numWorkers(GetGlobalConfiguration().m_checkpointRecoveryWorkers),
m_tid(0),
m_maxRecoveredCsn(0),
@ -488,7 +492,7 @@ public:
* transactions map and operate on it
* @return Boolean value denoting success or failure.
*/
bool ApplyLogSegmentFromData(char* data, size_t len);
bool ApplyLogSegmentFromData(char* data, size_t len, uint64_t replayLsn = 0);
/**
* @brief attempts to insert a data chunk into the in-process
@ -497,7 +501,7 @@ public:
* ignored.
* @return Boolean value denoting success or failure.
*/
bool ApplyLogSegmentFromData(uint64_t redoLsn, char* data, size_t len);
bool ApplyRedoLog(uint64_t redoLsn, char* data, size_t len);
/**
* @brief performs a commit on an in-process transaction,
* @return Boolean value denoting success or failure to commit.
@ -575,6 +579,11 @@ public:
m_checkpointId = id;
}
inline uint64_t GetCheckpointId() const
{
return m_checkpointId;
}
/**
* @brief applies a failed 2pc transaction according to its type.
* a detailed info is described in the function's implementation.
@ -608,6 +617,18 @@ public:
m_tableDeletesStat[t]++;
}
inline void SetLastReplayLsn(uint64_t lastReplayLsn)
{
if (m_lastReplayLsn < lastReplayLsn) {
m_lastReplayLsn = lastReplayLsn;
}
}
inline uint64_t GetLastReplayLsn() const
{
return m_lastReplayLsn;
}
void ClearTableCache();
LogStats* m_logStats;
@ -670,11 +691,12 @@ private:
* @param transactionId the transaction id
* @param tid the thread id of the recovering thread
* @param sState the returned surrogate state of this operation
* @param status the returned status of the operation
* @param[out] status the returned status of the operation
* @param[out] Was this a commit operation (required for managing transactional state).
* @return Int value denoting the number of bytes recovered
*/
static uint32_t RecoverLogOperation(
uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid, SurrogateState& sState, RC& status);
static uint32_t RecoverLogOperation(uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid,
SurrogateState& sState, RC& status, bool& wasCommit);
/**
* @brief performs an insert operation of a data buffer
@ -727,7 +749,16 @@ private:
* @param tid the thread id of the recovering thread
* @return Int value denoting the number of bytes recovered
*/
static uint32_t RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid);
static uint32_t RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs a rollback operation of a data buffer
* @param data the buffer to recover.
* @param csn The CSN of the operation.
* @param tid the thread id of the recovering thread
* @return Int value denoting the number of bytes recovered
*/
static uint32_t RecoverLogOperationRollback(uint8_t* data, uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs a create table operation from a data buffer
@ -983,6 +1014,18 @@ private:
*/
bool IsRecoveryMemoryLimitReached(uint32_t numThreads);
/**
* @brief Starts a new transaction for recovery operations.
* @param replayLsn the redo LSN for this transaction during replay.
*/
bool BeginTransaction(uint64_t replayLsn = 0);
/** @brief Commits the current recovery transaction. */
RC CommitTransaction(uint64_t csn);
/** @brief Rolls back the current recovery transaction. */
RC RollbackTransaction();
/**
* @brief a helper to extract a type from a buffer
* @param data the data buffer to extract from.
@ -1017,6 +1060,8 @@ private:
uint64_t m_lsn;
uint64_t m_lastReplayLsn;
std::string m_workingDir;
std::set<uint32_t> m_tableIds;

View File

@ -28,14 +28,16 @@
#include "checkpoint_utils.h"
#include "bitmapset.h"
#include "column.h"
#include "txn_insert_action.h"
#include <string>
#include <algorithm>
namespace MOT {
DECLARE_LOGGER(RecoveryOps, Recovery);
uint32_t RecoveryManager::RecoverLogOperation(
uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid, SurrogateState& sState, RC& status)
uint32_t RecoveryManager::RecoverLogOperation(uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid,
SurrogateState& sState, RC& status, bool& wasCommit)
{
OperationCode opCode = *static_cast<OperationCode*>((void*)data);
switch (opCode) {
@ -59,11 +61,17 @@ uint32_t RecoveryManager::RecoverLogOperation(
return RecoverLogOperationTruncateTable(data, status, COMMIT);
case COMMIT_TX:
case COMMIT_PREPARED_TX:
return RecoverLogOperationCommit(data, csn, tid);
wasCommit = true;
return RecoverLogOperationCommit(data, csn, tid, status);
case PARTIAL_REDO_TX:
case PREPARE_TX:
return sizeof(EndSegmentBlock);
case ROLLBACK_TX:
case ROLLBACK_PREPARED_TX:
return RecoverLogOperationRollback(data, csn, tid, status);
default:
MOT_LOG_ERROR("Unknown recovery redo record op-code: %u", (unsigned)opCode);
status = RC_ERROR;
return 0;
}
}
@ -173,7 +181,7 @@ uint32_t RecoveryManager::RecoverLogOperationDropTable(uint8_t* data, RC& status
status = RC_ERROR;
break;
}
return sizeof(OperationCode) + sizeof(uint32_t);
return sizeof(OperationCode) + sizeof(uint64_t);
}
uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t tid, RC& status, RecoveryOpState state)
@ -197,7 +205,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t
status = RC_ERROR;
break;
}
return sizeof(OperationCode) + sizeof(bufSize) + sizeof(uint32_t) + bufSize; // sizeof(uint32_t) is for tableId
return sizeof(OperationCode) + sizeof(bufSize) + sizeof(uint64_t) + bufSize; // sizeof(uint64_t) is for tableId
}
uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status, RecoveryOpState state)
@ -220,11 +228,11 @@ uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status
status = RC_ERROR;
break;
}
uint32_t tableId = 0;
uint64_t tableId = 0;
size_t nameLen = 0;
Extract(data, tableId);
Extract(data, nameLen);
return sizeof(OperationCode) + sizeof(uint32_t) + sizeof(size_t) + nameLen;
return sizeof(OperationCode) + sizeof(uint64_t) + sizeof(size_t) + nameLen;
}
uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& status, RecoveryOpState state)
@ -246,7 +254,7 @@ uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& st
status = RC_ERROR;
break;
}
return sizeof(OperationCode) + sizeof(uint32_t);
return sizeof(OperationCode) + sizeof(uint64_t);
}
uint32_t RecoveryManager::RecoverLogOperationInsert(
@ -319,7 +327,7 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
return 0;
}
key->CpKey((const uint8_t*)keyData, keyLength);
Row* row = index->IndexRead(key, tid);
Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key);
if (row == nullptr) {
/// row not found...
@ -433,140 +441,144 @@ uint32_t RecoveryManager::RecoverLogOperationDelete(uint8_t* data, uint64_t csn,
return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(keyLength) + keyLength;
}
uint32_t RecoveryManager::RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid)
uint32_t RecoveryManager::RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
{
// OperationCode + CSN + transaction_type + commit_counter + transaction_id
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
MOT::GetRecoveryManager()->m_logStats->m_tcls++;
status = MOT::GetRecoveryManager()->CommitTransaction(csn);
if (status != RC_OK) {
MOT_LOG_ERROR("Failed to commit row recovery from log: %s (error code: %d)", RcToString(status), (int)status);
}
return sizeof(EndSegmentBlock);
}
uint32_t RecoveryManager::RecoverLogOperationRollback(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
{
// OperationCode + CSN + transaction_type + commit_counter + transaction_id
status = MOT::GetRecoveryManager()->RollbackTransaction();
if (status != RC_OK) {
MOT_LOG_ERROR("Failed to rollback row recovery from log: %s (error code: %d)", RcToString(status), (int)status);
}
return sizeof(EndSegmentBlock);
}
void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData,
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId, bool insertLocked)
{
Table* table = nullptr;
if (!GetRecoveryManager()->FetchTable(tableId, table)) {
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "RecoveryManager::insertRow", "Table %llu does not exist", tableId);
return;
}
uint64_t tableExId = table->GetTableExId();
if (tableExId != exId) {
status = RC_ERROR;
MOT_REPORT_ERROR(
MOT_ERROR_INTERNAL, "Recovery Manager Insert Row", "exId mismatch: my %llu - pkt %llu", tableExId, exId);
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recover Insert Row", "Table %" PRIu64 " does not exist", exId);
return;
}
Row* row = table->CreateNewRow();
if (row == nullptr) { // OA: check result before using pointer
if (row == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create row");
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recover Insert Row", "Failed to create row");
return;
}
row->CopyData((const uint8_t*)rowData, rowLen);
row->SetCommitSequenceNumber(csn);
row->SetRowId(rowId);
if (insertLocked == true) {
row->SetTwoPhaseMode(true);
row->m_rowHeader.Lock();
}
uint64_t surrogateprimaryKey = 0;
uint8_t* keyBytes = nullptr;
Key* key = nullptr;
MOT::Index* ix = nullptr;
uint32_t numIndexes = table->GetNumIndexes();
mot_vector<Key*> cleanupKeys;
ix = table->GetPrimaryIndex();
key = ix->CreateNewKey();
key = MOTCurrTxn->GetTxnKey(ix);
if (key == nullptr) {
table->DestroyRow(row);
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create key");
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recover Insert Row", "Failed to create primary key");
return;
}
cleanupKeys.push_back(key);
if (ix->IsFakePrimary()) {
row->SetSurrogateKey(*(uint64_t*)keyData);
sState.UpdateMaxKey(rowId);
}
key->CpKey((const uint8_t*)keyData, keyLen);
status = table->InsertRowNonTransactional(row, tid, key);
MOTCurrTxn->GetNextInsertItem()->SetItem(row, ix, key);
for (uint16_t i = 1; i < table->GetNumIndexes(); i++) {
ix = table->GetSecondaryIndex(i);
key = MOTCurrTxn->GetTxnKey(ix);
if (key == nullptr) {
status = RC_MEMORY_ALLOCATION_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM,
"Recover Insert Row",
"Failed to create key for secondary index %s",
ix->GetName().c_str());
std::for_each(cleanupKeys.begin(), cleanupKeys.end(), [](Key*& key) { MOTCurrTxn->DestroyTxnKey(key); });
MOTCurrTxn->Rollback();
return;
}
cleanupKeys.push_back(key);
ix->BuildKey(table, row, key);
MOTCurrTxn->GetNextInsertItem()->SetItem(row, ix, key);
}
cleanupKeys.clear(); // subsequent call to insert row takes care of key cleanup
status = MOTCurrTxn->InsertRow(row);
if (insertLocked == true) {
row->GetPrimarySentinel()->Lock(0);
}
if (status == RC_UNIQUE_VIOLATION && DuplicateRow(table, keyData, keyLen, rowData, rowLen, tid)) {
/* Same row already exists. ok. */
table->DestroyRow(row);
// Same row already exists. ok.
// no need to destroy row (already destroyed by TxnManager::InsertRow() in case of unique violation)
status = RC_OK;
} else if (status == RC_MEMORY_ALLOCATION_ERROR) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row");
table->DestroyRow(row);
} else if (status != RC_OK) {
table->DestroyRow(row);
}
ix->DestroyKey(key);
}
void RecoveryManager::DeleteRow(
uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, uint64_t csn, uint32_t tid, RC& status)
{
Sentinel* pSentinel = nullptr;
RC rc;
Row* row = nullptr;
Key* key = nullptr;
Index* index = nullptr;
uint64_t tableExId;
Table* table = nullptr;
if (!GetRecoveryManager()->FetchTable(tableId, table)) {
MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG, "Recovery Manager Delete Row", "table %u does not exist", tableId);
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
if (table == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_ARG, "Recovery Manager Delete Row", "table %" PRIu64 " does not exist", exId);
status = RC_ERROR;
MOT_LOG_ERROR("RecoveryManager::deleteRow - table %u does not exist", tableId);
return;
}
index = table->GetPrimaryIndex();
key = index->CreateNewKey();
if (key == nullptr) { // OA: check result before using pointer
key = MOTCurrTxn->GetTxnKey(index);
if (key == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Delete Row", "failed to create key");
status = RC_ERROR;
return;
}
key->CpKey((const uint8_t*)keyData, keyLen);
tableExId = table->GetTableExId();
if (tableExId != exId) {
MOT_REPORT_ERROR(
MOT_ERROR_INTERNAL, "Recovery Manager Delete Row", "exId mismatch: my %lu - pkt %lu", tableExId, exId);
status = RC_ERROR;
index->DestroyKey(key);
return;
}
rc = table->FindRow(key, pSentinel, tid);
if ((rc != RC_OK) || (pSentinel == 0)) {
MOT_LOG_ERROR("RecoveryManager::deleteRow - findRow rc %u, sentinel %p", rc, pSentinel);
status = RC_OK;
index->DestroyKey(key);
return;
}
row = pSentinel->GetData();
if (row != 0) {
if (!table->RemoveRow(row, tid)) {
row = MOTCurrTxn->RowLookupByKey(table, WR, key);
if (row != nullptr) {
status = MOTCurrTxn->DeleteLastRow();
if (status != RC_OK) {
if (MOT_IS_OOM()) {
// OA: report error if remove row failed due to OOM (what about other errors?)
MOT_REPORT_ERROR(
MOT_ERROR_OOM, "Recovery Manager Delete Row", "failed to remove row due to lack of memory");
status = RC_MEMORY_ALLOCATION_ERROR;
} else {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recovery Manager Delete Row",
"failed to remove row: %s (error code: %d)",
RcToString(status),
status);
}
MOT_LOG_ERROR("RecoveryManager::deleteRow2 - findRow rc %u, sentinel %p", rc, pSentinel);
} else {
GetRecoveryManager()->IncreaseTableDeletesStat(table);
}
@ -574,17 +586,18 @@ void RecoveryManager::DeleteRow(
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Delete Row", "getData failed");
status = RC_ERROR;
}
index->DestroyKey(key);
MOTCurrTxn->DestroyTxnKey(key);
status = RC_OK;
}
void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData,
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
{
Table* table = nullptr;
if (!GetRecoveryManager()->FetchTable(tableId, table)) {
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG, "Recovery Manager Update Row", "table %u does not exist", tableId);
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_ARG, "Recovery Manager Update Row", "table %" PRIu64 " does not exist", exId);
return;
}
@ -597,14 +610,14 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
}
Index* index = table->GetPrimaryIndex();
Key* key = index->CreateNewKey();
if (key == nullptr) { // OA: check result before using pointer
Key* key = MOTCurrTxn->GetTxnKey(index);
if (key == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Update Row", "failed to create key");
return;
}
key->CpKey((const uint8_t*)keyData, keyLen);
Row* row = index->IndexRead(key, tid);
Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key);
if (row == nullptr) {
/// row not found... need to check row version
// if row version is less than the updated row version it means that we
@ -628,7 +641,7 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
csn);
}
}
index->DestroyKey(key);
MOTCurrTxn->DestroyTxnKey(key);
}
void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool addToEngine)
@ -647,7 +660,7 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool ad
table = new (std::nothrow) Table();
if (table == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Create Table", "failed to allocate table's memory");
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Create Table", "failed to allocate table object");
status = RC_ERROR;
return;
}
@ -655,16 +668,16 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool ad
table->Deserialize((const char*)data);
do {
if (!table->IsDeserialized()) {
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to deserialize table");
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to de-serialize table");
break;
}
if (addToEngine && !GetTableManager()->AddTable(table)) {
if (addToEngine && ((status = MOTCurrTxn->CreateTable(table)) != RC_OK)) {
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to add table to engine");
break;
}
MOT_LOG_DEBUG("RecoveryManager::CreateTable: table %s [%lu] created (%s to engine)",
MOT_LOG_DEBUG("RecoveryManager::CreateTable: table %s [internal id %u] created (%s to engine)",
table->GetLongTableName().c_str(),
table->GetTableId(),
addToEngine ? "added" : "not added");
@ -675,60 +688,79 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, bool ad
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to recover table");
delete table;
status = RC_ERROR;
if (status == RC_OK) {
status = RC_ERROR;
}
return;
}
void RecoveryManager::DropTable(char* data, RC& status)
{
char* in = (char*)data;
uint32_t tableId;
uint64_t externalTableId;
Table* table;
string tableName;
in = SerializablePOD<uint32_t>::Deserialize(in, tableId);
table = GetTableManager()->GetTable(tableId);
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
MOT_LOG_DEBUG("DropTable: could not find table %u", tableId);
MOT_LOG_DEBUG("DropTable: could not find table %" PRIu64, externalTableId);
/* this might happen if we try to replay an outdated xlog entry - currently we do not error out */
return;
}
tableName.assign(table->GetLongTableName());
if (GetTableManager()->DropTable(table, MOT_GET_CURRENT_SESSION_CONTEXT()) != RC_OK) {
status = MOTCurrTxn->DropTable(table);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recovery Manager Drop Table",
"Failed to drop table %s [%lu])",
"Failed to drop table %s [%" PRIu64 "])",
tableName.c_str(),
tableId);
status = RC_ERROR;
externalTableId);
} else {
MOT::GetRecoveryManager()->m_tableDeletesStat.erase(table);
}
MOT_LOG_DEBUG("RecoveryManager::DropTable: table %s [%lu] dropped", tableName.c_str(), tableId);
MOT_LOG_DEBUG("RecoveryManager::DropTable: table %s [%" PRIu64 "] dropped", tableName.c_str(), externalTableId);
}
void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status)
{
char* in = (char*)data;
uint32_t tableId;
uint64_t externalTableId;
Table* table;
Table::CommonIndexMeta idx;
in = SerializablePOD<uint32_t>::Deserialize(in, tableId);
table = GetTableManager()->GetTable(tableId);
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG, "Recovery Manager Create Index", "Could not find table %u", tableId);
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_ARG, "Recover Create Index", "Could not find table %" PRIu64, externalTableId);
status = RC_ERROR;
return;
}
in = table->DesrializeMeta(in, idx);
if (idx.m_indexOrder == IndexOrder::INDEX_ORDER_PRIMARY) {
MOT_LOG_DEBUG("createIndex: creating Primary Index");
table->CreateIndexFromMeta(idx, true, tid);
} else {
MOT_LOG_DEBUG("createIndex: creating Secondary Index");
table->CreateIndexFromMeta(idx, false, tid);
bool primary = idx.m_indexOrder == IndexOrder::INDEX_ORDER_PRIMARY;
MOT_LOG_DEBUG("createIndex: creating %s Index", primary ? "Primary" : "Secondary");
Index* index = nullptr;
status = table->CreateIndexFromMeta(idx, primary, tid, false, &index);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recover Create Index",
"Failed to create index for table %" PRIu64 " from meta-data: %s (error code: %d)",
externalTableId,
RcToString(status),
status);
}
if (status == RC_OK) {
status = MOTCurrTxn->CreateIndex(table, index, primary);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recover Create Index",
"Failed to create index for table %" PRIu64 ": %s (error code: %d)",
externalTableId,
RcToString(status),
status);
}
}
}
@ -736,21 +768,26 @@ void RecoveryManager::DropIndex(char* data, RC& status)
{
RC res;
char* in = (char*)data;
uint32_t tableId;
uint64_t externalTableId;
Table* table;
uint32_t indexNameLength;
string indexName;
in = SerializablePOD<uint32_t>::Deserialize(in, tableId);
table = GetTableManager()->GetTable(tableId);
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
/* this might happen if we try to replay an outdated xlog entry - currently we do not error out */
MOT_LOG_DEBUG("dropIndex: could not find table %u", tableId);
MOT_LOG_DEBUG("dropIndex: could not find table %" PRIu64, externalTableId);
return;
}
in = SerializableSTR::Deserialize(in, indexName);
res = table->RemoveSecondaryIndex((char*)(indexName.c_str()), MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager());
Index* index = table->GetSecondaryIndex(indexName);
if (index == nullptr) {
res = RC_INDEX_NOT_FOUND;
} else {
res = MOTCurrTxn->DropIndex(index);
}
if (res != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recovery Manager Drop Index",
@ -764,18 +801,20 @@ void RecoveryManager::TruncateTable(char* data, RC& status)
{
RC res = RC_OK;
char* in = (char*)data;
uint32_t tableId;
uint64_t externalTableId;
Table* table;
in = SerializablePOD<uint32_t>::Deserialize(in, tableId);
table = GetTableManager()->GetTable(tableId);
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
/* this might happen if we try to replay an outdated xlog entry - currently we do not error out */
MOT_LOG_DEBUG("truncateTable: could not find table %u", tableId);
MOT_LOG_DEBUG("truncateTable: could not find table %" PRIu64, externalTableId);
return;
}
table->Truncate(MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager());
table->WrLock();
status = MOTCurrTxn->TruncateTable(table);
table->Unlock();
}
// in-process (2pc) transactions recovery
@ -839,8 +878,8 @@ uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data
if (opCode == CREATE_ROW)
ret += sizeof(uint64_t); // rowId
Table* table = nullptr;
if (!GetRecoveryManager()->FetchTable(tableId, table)) {
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_LOG_ERROR("RecoveryManager::applyInProcessInsert: fetch table failed (id %lu)", tableId);
return ret;
@ -1080,4 +1119,65 @@ bool RecoveryManager::DuplicateRow(
index->DestroyKey(key);
return res;
}
bool RecoveryManager::BeginTransaction(uint64_t replayLsn /* = 0 */)
{
bool result = false;
SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT();
if (sessionContext == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot start recovery transaction: no session context");
} else {
TxnManager* txn = sessionContext->GetTxnManager();
txn->StartTransaction(INVALID_TRANSACTIOIN_ID, READ_COMMITED);
txn->SetReplayLsn(replayLsn);
result = true;
}
return result;
}
RC RecoveryManager::CommitTransaction(uint64_t csn)
{
RC result = RC_ERROR;
SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT();
if (sessionContext == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot commit recovery transaction: no session context");
} else {
TxnManager* txn = sessionContext->GetTxnManager();
result = txn->Commit(INVALID_TRANSACTIOIN_ID, csn);
if (result != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recover DB",
"Failed to commit recovery transaction: %s (error code: %d)",
RcToString(result),
(int)result);
txn->Rollback();
} else {
txn->EndTransaction();
}
}
return result;
}
RC RecoveryManager::RollbackTransaction()
{
RC result = RC_ERROR;
SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT();
if (sessionContext == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot rollback recovery transaction: no session context");
} else {
TxnManager* txn = sessionContext->GetTxnManager();
result = txn->Rollback();
if (result != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recover DB",
"Failed to rollback recovery transaction: %s (error code: %d)",
RcToString(result),
(int)result);
}
}
return result;
}
} // namespace MOT

View File

@ -51,7 +51,7 @@ void* RedoLogTransactionIterator::GetTransactionEntry()
return reinterpret_cast<void*>(m_buffer + m_position);
}
LogSegment* RedoLogTransactionIterator::AllocRedoSegment()
LogSegment* RedoLogTransactionIterator::AllocRedoSegment(uint64_t replayLsn)
{
LogSegment* segment = new (std::nothrow) LogSegment();
if (segment == nullptr) {
@ -69,6 +69,7 @@ LogSegment* RedoLogTransactionIterator::AllocRedoSegment()
reinterpret_cast<void*>(m_buffer + m_position + sizeof(uint32_t)),
segment->m_len);
securec_check(erc, "\0", "\0");
segment->m_replayLsn = replayLsn;
return segment;
}

View File

@ -41,6 +41,8 @@ struct LogSegment : public Serializable {
EndSegmentBlock m_controlBlock;
uint64_t m_replayLsn;
/**
* @brief fetches the size of the log segment
* @return Size_t value denoting the size of the segment.
@ -106,7 +108,7 @@ public:
void* GetTransactionEntry();
LogSegment* AllocRedoSegment();
LogSegment* AllocRedoSegment(uint64_t replayLsn);
private:
char* m_buffer;

View File

@ -227,9 +227,13 @@ RC TxnManager::LitePrepare(TransactionId transactionId)
return RC_OK;
}
RC TxnManager::CommitInternal()
RC TxnManager::CommitInternal(uint64_t csn)
{
SetCommitSequenceNumber(GetCSNManager().GetNextCSN());
if (csn == MOT_INVALID_CSN) {
SetCommitSequenceNumber(GetCSNManager().GetNextCSN());
} else {
SetCommitSequenceNumber(csn); // for recovery
}
// Record the start write phase for this transaction
if (GetGlobalConfiguration().m_enableCheckpoint) {
GetCheckpointManager()->BeginTransaction(this);
@ -255,14 +259,14 @@ RC TxnManager::Commit()
return Commit(INVALID_TRANSACTIOIN_ID);
}
RC TxnManager::Commit(uint64_t transcationId)
RC TxnManager::Commit(uint64_t transcationId, uint64_t csn /* = MOT_INVALID_CSN */)
{
// Validate concurrency control
if (transcationId != INVALID_TRANSACTIOIN_ID)
m_transactionId = transcationId;
RC rc = m_occManager.ValidateOcc(this);
if (rc == RC_OK) {
rc = CommitInternal();
rc = CommitInternal(csn);
MOT::DbSessionStatisticsProvider::GetInstance().AddCommitTxn();
}
return rc;
@ -515,6 +519,7 @@ void TxnManager::RollbackDDLs()
indexes = (Index**)ddl_access->GetEntry();
table = indexes[0]->GetTable();
MOT_LOG_INFO("Rollback of truncate table %s", table->GetLongTableName().c_str());
table->WrLock();
for (int idx = 0; idx < table->GetNumIndexes(); idx++) {
index = table->m_indexes[idx];
table->m_indexes[idx] = indexes[idx];
@ -526,6 +531,7 @@ void TxnManager::RollbackDDLs()
index->Truncate(true);
delete index;
}
table->Unlock();
delete[] indexes;
break;
case DDL_ACCESS_CREATE_INDEX:
@ -534,6 +540,7 @@ void TxnManager::RollbackDDLs()
MOT_LOG_INFO("Rollback of create index %s for table %s",
index->GetName().c_str(),
table->GetLongTableName().c_str());
table->WrLock();
if (index->IsPrimaryKey()) {
table->SetPrimaryIndex(nullptr);
GcManager::ClearIndexElements(index->GetIndexId());
@ -541,6 +548,7 @@ void TxnManager::RollbackDDLs()
} else {
table->RemoveSecondaryIndex((char*)index->GetName().c_str(), this);
}
table->Unlock();
break;
case DDL_ACCESS_DROP_INDEX:
index = (Index*)ddl_access->GetEntry();
@ -549,7 +557,9 @@ void TxnManager::RollbackDDLs()
index->GetName().c_str(),
table->GetLongTableName().c_str());
if (index->IsPrimaryKey()) {
table->WrLock();
table->SetPrimaryIndex(index);
table->Unlock();
}
break;
default:
@ -595,7 +605,9 @@ void TxnManager::WriteDDLChanges()
table = index->GetTable();
index->SetIsCommited(true);
if (index->IsPrimaryKey()) {
table->WrLock();
table->SetPrimaryIndex(index);
table->Unlock();
}
break;
case DDL_ACCESS_DROP_INDEX:
@ -679,6 +691,7 @@ TxnManager::TxnManager(SessionContext* session_context)
m_checkpointNABit(false),
m_csn(0),
m_transactionId(INVALID_TRANSACTIOIN_ID),
m_replayLsn(0),
m_surrogateGen(0),
m_flushDone(false),
m_internalTransactionId(((uint64_t)m_sessionContext->GetSessionId()) << SESSION_ID_BITS),

View File

@ -41,6 +41,7 @@
#include "bitmapset.h"
#include "txn_ddl_access.h"
#include "mm_session_api.h"
#include "commit_sequence_number.h"
namespace MOT {
class MOTContext;
@ -56,6 +57,8 @@ class LoggerTask;
class Key;
class Index;
#define MOTCurrTxn MOT_GET_CURRENT_SESSION_CONTEXT()->GetTxnManager()
/**
* @class TxnManager
* @brief Transaction manager is used to manage the life cycle of a single
@ -138,6 +141,16 @@ public:
m_internalTransactionId = transactionId;
}
inline void SetReplayLsn(uint64_t replayLsn)
{
m_replayLsn = replayLsn;
}
inline uint64_t GetReplayLsn() const
{
return m_replayLsn;
}
void RemoveTableFromStat(Table* t);
/**
@ -151,7 +164,7 @@ public:
* @return Result code denoting success or failure.
*/
RC Commit();
RC Commit(uint64_t transcationId);
RC Commit(uint64_t transcationId, uint64_t csn = MOT_INVALID_CSN);
RC LiteCommit(uint64_t transcationId);
/**
@ -472,7 +485,7 @@ private:
/**
* @brief Internal commit (used by commit and commitPrepared)
*/
RC CommitInternal();
RC CommitInternal(uint64_t csn);
RC RollbackInternal(bool isPrepared);
// Disable class level new operator
@ -543,8 +556,10 @@ private:
/** @var transaction_id Provided by envelop on start transaction. */
uint64_t m_transactionId;
/** @var serogate_counter Promotes every insert
transaction. */
/** @var Replay LSN for this transaction, used only during replay in standby. */
uint64_t m_replayLsn;
/** @var surrogate_counter Promotes every insert transaction. */
SurrogateKeyGenerator m_surrogateGen;
bool m_flushDone;

View File

@ -238,7 +238,7 @@ bool RedoLogWriter::AppendPartial(RedoLogBuffer& redoLogBuffer, TxnManager* txn)
bool RedoLogWriter::AppendIndex(RedoLogBuffer& buffer, Table* table, Index* index)
{
size_t bufSize = table->SerializeItemSize(index);
uint16_t entrySize = sizeof(OperationCode) + sizeof(size_t) + sizeof(uint32_t) + bufSize + sizeof(EndSegmentBlock);
uint16_t entrySize = sizeof(OperationCode) + sizeof(size_t) + sizeof(uint64_t) + bufSize + sizeof(EndSegmentBlock);
if (buffer.FreeSize() < entrySize)
return false;
@ -248,7 +248,7 @@ bool RedoLogWriter::AppendIndex(RedoLogBuffer& buffer, Table* table, Index* inde
return false;
}
uint32_t tableId = table->GetTableId();
uint64_t tableId = table->GetTableExId();
table->SerializeItem(buf, index);
buffer.Append(OperationCode::CREATE_INDEX);
buffer.Append(bufSize);
@ -262,10 +262,10 @@ bool RedoLogWriter::AppendIndex(RedoLogBuffer& buffer, Table* table, Index* inde
bool RedoLogWriter::AppendDropIndex(RedoLogBuffer& buffer, Table* table, Index* index)
{
uint16_t entrySize =
sizeof(OperationCode) + sizeof(uint32_t) + sizeof(size_t) + index->GetName().length() + sizeof(EndSegmentBlock);
sizeof(OperationCode) + sizeof(uint64_t) + sizeof(size_t) + index->GetName().length() + sizeof(EndSegmentBlock);
if (buffer.FreeSize() < entrySize)
return false;
uint32_t tableId = table->GetTableId();
uint64_t tableId = table->GetTableExId();
buffer.Append(OperationCode::DROP_INDEX);
buffer.Append(tableId);
buffer.Append(index->GetName().length());
@ -297,11 +297,11 @@ bool RedoLogWriter::AppendTable(RedoLogBuffer& buffer, Table* table)
bool RedoLogWriter::AppendDropTable(RedoLogBuffer& buffer, Table* table)
{
uint16_t entrySize = sizeof(OperationCode) + sizeof(uint32_t) + sizeof(EndSegmentBlock);
uint16_t entrySize = sizeof(OperationCode) + sizeof(uint64_t) + sizeof(EndSegmentBlock);
if (buffer.FreeSize() < entrySize)
return false;
uint32_t tableId = table->GetTableId();
uint64_t tableId = table->GetTableExId();
buffer.Append(OperationCode::DROP_TABLE);
buffer.Append(tableId);
return true;
@ -309,11 +309,11 @@ bool RedoLogWriter::AppendDropTable(RedoLogBuffer& buffer, Table* table)
bool RedoLogWriter::AppendTruncateTable(RedoLogBuffer& buffer, Table* table)
{
uint16_t entrySize = sizeof(OperationCode) + sizeof(uint32_t) + sizeof(EndSegmentBlock);
uint16_t entrySize = sizeof(OperationCode) + sizeof(uint64_t) + sizeof(EndSegmentBlock);
if (buffer.FreeSize() < entrySize)
return false;
uint32_t tableId = table->GetTableId();
uint64_t tableId = table->GetTableExId();
buffer.Append(OperationCode::TRUNCATE_TABLE);
buffer.Append(tableId);
return true;

View File

@ -1329,13 +1329,16 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR
RangeTblEntry* rte = planner_rt_fetch(resultRelation, root);
Relation rel = heap_open(rte->relid, NoLock);
TupleDesc desc = RelationGetDescr(rel);
bool isFromScan = false;
uint8_t attrsModify[BITMAP_GETLEN(desc->natts)];
uint8_t* ptrAttrsModify = attrsModify;
MOT::TxnManager* currTxn = GetSafeTxn(/*GetCurrentTransactionId()*/);
MOT::Table* table = currTxn->GetTableByExternalId(RelationGetRelid(rel));
if ((int)resultRelation < root->simple_rel_array_size && root->simple_rel_array[resultRelation] != nullptr) {
isFromScan = true;
if (root->simple_rel_array[resultRelation]->fdw_private != nullptr) {
fdwState = (MOTFdwStateSt*)root->simple_rel_array[resultRelation]->fdw_private;
ptrAttrsModify = fdwState->m_attrsUsed;
}
} else {
fdwState = (MOTFdwStateSt*)palloc0(sizeof(MOTFdwStateSt));
fdwState->m_cmdOper = plan->operation;
@ -1367,7 +1370,7 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR
securec_check(erc, "\0", "\0");
for (int i = 0; i < desc->natts; i++) {
if (bms_is_member(desc->attrs[i]->attnum - FirstLowInvalidHeapAttributeNumber, rte->updatedCols)) {
BITMAP_SET(attrsModify, (desc->attrs[i]->attnum - 1));
BITMAP_SET(ptrAttrsModify, (desc->attrs[i]->attnum - 1));
}
}
break;
@ -1378,7 +1381,7 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR
securec_check(erc, "\0", "\0");
for (int i = 0; i < desc->natts; i++) {
if (!desc->attrs[i]->attisdropped) {
BITMAP_SET(attrsModify, (desc->attrs[i]->attnum - 1));
BITMAP_SET(ptrAttrsModify, (desc->attrs[i]->attnum - 1));
}
}
}
@ -1390,8 +1393,8 @@ List* MOTPlanForeignModify(PlannerInfo* root, ModifyTable* plan, ::Index resultR
heap_close(rel, NoLock);
return (isFromScan ? (List*)BitmapSerialize(nullptr, attrsModify, BITMAP_GETLEN(desc->natts))
: (List*)SerializeFdwState(fdwState));
return ((fdwState == nullptr) ? (List*)BitmapSerialize(nullptr, attrsModify, BITMAP_GETLEN(desc->natts))
: (List*)SerializeFdwState(fdwState));
}
static TupleTableSlot* MOTExecForeignInsert(
@ -2121,7 +2124,7 @@ uint64_t MOTCheckpointGetId()
{
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
if (engine != nullptr) {
return engine->GetCheckpointManager()->GetId();
return engine->GetRecoveryManager()->GetCheckpointId();
}
return 0;
}

View File

@ -71,8 +71,7 @@ void MOTRedo(XLogReaderState* record)
if (!IsValidEntry(recordType)) {
elog(ERROR, "MOTRedo: invalid op code %u", recordType);
}
if (MOT::GetRecoveryManager()->IsErrorSet() ||
!MOT::GetRecoveryManager()->ApplyLogSegmentFromData(lsn, data, len)) {
if (MOT::GetRecoveryManager()->IsErrorSet() || !MOT::GetRecoveryManager()->ApplyRedoLog(lsn, data, len)) {
// we treat errors fatally.
ereport(FATAL,
(MOTXlateRecoveryErr(MOT::GetRecoveryManager()->GetErrorCode()),

View File

@ -38,8 +38,9 @@ function test_1()
fi
start_primary_as_standby
sleep 5
sleep 30
switchover_to_primary
sleep 30
#test the copy results on dn1_primary
if [ $(gsql -d $db -p $dn1_primary_port -c "select pgxc_pool_reload();select count(1) from cstore_copy_t1;" | grep `expr 1 \* $cstore_rawdata_lines` |wc -l) -eq 1 ]; then

View File

@ -13,18 +13,32 @@ function test_1()
#create mot data
gsql -d $db -p $dn1_primary_port -c "DROP FOREIGN TABLE if exists mot_switch1; CREATE FOREIGN TABLE mot_switch1(id INT,name VARCHAR(15) NOT NULL) SERVER mot_server;"
gsql -d $db -p $dn1_primary_port -c "copy mot_switch1 from '$g_data_path/datanode1/pg_copydir/data5';"
print_time
echo "start cluter success!"
inc_build_pattern="dn incremental build completed"
print_time
echo "killing primary"
kill_primary
print_time
echo "primary killed"
print_time
echo "failing over to standby"
failover_to_standby
echo "failover_to_standby"
print_time
echo "failover_to_standby DONE"
print_time
echo build
#sleep 5
build_result=`gs_ctl build -D ${primary_data_dir}`
#build_result=`gs_ctl build -D ${primary_data_dir} -b full`
echo $build_result
print_time
if [[ $build_result =~ $inc_build_pattern ]]
then
echo "inc build success"
@ -32,9 +46,11 @@ function test_1()
echo "inc build $failed_keyword"
fi
sleep 5
# wait for recovery to complete on primary node
sleep 30
gsql -d $db -p $dn1_primary_port -m -c "select count(1) from mot_switch1;"
print_time
if [ $(gsql -d $db -p $dn1_primary_port -m -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then
echo "copy success on dn1_primary"
else
@ -42,6 +58,7 @@ function test_1()
fi
gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;"
print_time
if [ $(gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then
echo "copy success on dn1_standby"
else
@ -51,6 +68,8 @@ function test_1()
function tear_down() {
sleep 1
print_time
echo "Test done. Tearing down cluter. Failing over to primary after test"
failover_to_primary
gsql -d $db -p $dn1_standby_port -m -c " select * from pg_drop_replication_slot('dn_s2');"
gsql -d $db -p $dn1_standby_port -m -c " select * from pg_drop_replication_slot('dn_s3');"

View File

@ -14,9 +14,13 @@ function test_1()
gsql -d $db -p $dn1_primary_port -c "copy mot_switch1 from '$g_data_path/datanode1/pg_copydir/data5';"
inc_build_pattern="dn incremental build completed"
print_time
echo "Killing stadnby..."
kill_standby
echo "standy killed"
print_time
echo "standy killed, building standby..."
build_result=`gs_ctl build -D ${standby_data_dir}`
print_time
if [[ $build_result =~ $inc_build_pattern ]]
then
echo "inc build success"
@ -24,16 +28,23 @@ function test_1()
echo "inc build $failed_keyword"
fi
sleep 5
# wait for recovery on standby to complete
sleep 30
print_time
echo "Querying primary..."
gsql -d $db -p $dn1_primary_port -c "select count(1) from mot_switch1;"
print_time
if [ $(gsql -d $db -p $dn1_primary_port -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then
echo "copy success on dn1_primary"
else
echo "copy $failed_keyword on dn1_primary"
fi
print_time
echo "Attempting access to standby"
gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;"
print_time
if [ $(gsql -d $db -p $dn1_standby_port -m -c "select count(1) from mot_switch1;" | grep `expr 1 \* $rawdata_lines` |wc -l) -eq 1 ]; then
echo "copy success on dn1_standby"
else

View File

@ -315,6 +315,35 @@ function failover_to_standby4() {
fi
}
function print_time() {
cur_time=`date +"%F %T.%N" | cut -c1-23`
echo -n "[${cur_time}] "
}
function wait_recovery_done() {
# wait for recovery to complete on primary node
# this function is incomplete as it does not find the correct log line (still need to support minimum timstamp for search)
recovery_done=0
node_data_dir=$1
wait_time_seconds=$2
last_log=`ls -ltr ${node_data_dir}/pg_log/postgresql-* | tail -1 | awk '{print $9}'`
for i in `seq 1 $wait_time_seconds`;
do
recovery_done=`grep "database system is ready to accept read only connections" $last_log | wc -l`
if [ $recovery_done -eq 1 ]; then
print_time
echo "Recovery done on node detected after $i seconds at: $node_data_dir"
break
fi
sleep 1
done
if [ $recovery_done -eq 0 ]; then
print_time
echo "Failed to find recovery done message after $wait_time_seconds seconds in node log at: $node_data_dir"
fi
}
#check_synchronous_commit "datanode1" 1
#check_detailed_instance
#check_primary "datanode1" 2