!415 Refactor MOT recovery

Merge pull request !415 from Vinoth Veeraraghavan/master
This commit is contained in:
opengauss-bot
2020-11-17 18:57:58 +08:00
committed by Gitee
35 changed files with 2737 additions and 2145 deletions

View File

@ -38,7 +38,7 @@ class Row;
class TxnManager;
class TxnAccess;
class CheckpointWorkerPool;
class RecoveryManager;
class RecoveryOps;
// forward declarations
enum RC : uint32_t;
enum AccessType : uint8_t;
@ -238,7 +238,7 @@ private:
friend Row;
friend OccTransactionManager;
friend CheckpointWorkerPool;
friend RecoveryManager;
friend RecoveryOps;
friend Index;
};
} // namespace MOT

View File

@ -40,7 +40,7 @@
namespace MOT {
class OccTransactionManager;
class CheckpointWorkerPool;
class RecoveryManager;
class RecoveryOps;
/**
* @class Row
@ -608,7 +608,7 @@ protected:
friend CheckpointManager;
friend CheckpointWorkerPool;
friend Index;
friend RecoveryManager;
friend RecoveryOps;
friend Table;
DECLARE_CLASS_LOGGER()

View File

@ -532,7 +532,6 @@ void CheckpointManager::CompleteCheckpoint()
// Update checkpoint Id
SetId(m_inProgressId);
GetRecoveryManager()->SetCheckpointId(m_id);
finishedUpdatingFiles = true;
} while (0);
(void)pthread_rwlock_unlock(&m_fetchLock);
@ -779,8 +778,7 @@ bool CheckpointManager::CreateCheckpointId(uint64_t& checkpointId)
bool CheckpointManager::GetCheckpointDirName(std::string& dirName)
{
uint64_t checkpointId = GetRecoveryManager()->GetCheckpointId();
if (!CheckpointUtils::SetDirName(dirName, checkpointId)) {
if (!CheckpointUtils::SetDirName(dirName, GetId())) {
MOT_LOG_ERROR("SetDirName failed");
return false;
}
@ -829,10 +827,11 @@ bool CheckpointManager::CreateTpcRecoveryFile()
// this lock is held while serializing the in process transactions call by
// checkpoint. It prevents gs_clean removing entries from the in-process map
// while they are serialized
GetRecoveryManager()->LockInProcessTxns();
MOTEngine::GetInstance()->GetInProcessTransactions().Lock();
CheckpointUtils::TpcFileHeader tpcFileHeader;
tpcFileHeader.m_magic = CP_MGR_MAGIC;
tpcFileHeader.m_numEntries = GetRecoveryManager()->GetInProcessTxnsSize();
tpcFileHeader.m_numEntries = MOTEngine::GetInstance()->GetInProcessTransactions().GetNumTxns();
size_t wrStat = CheckpointUtils::WriteFile(fd, (char*)&tpcFileHeader, sizeof(CheckpointUtils::TpcFileHeader));
if (wrStat != sizeof(CheckpointUtils::TpcFileHeader)) {
MOT_LOG_ERROR("create2PCRecoveryFile: failed to write 2pc file's header (%d) [%d %s]",
@ -842,7 +841,7 @@ bool CheckpointManager::CreateTpcRecoveryFile()
break;
}
if (tpcFileHeader.m_numEntries > 0 && GetRecoveryManager()->SerializeInProcessTxns(fd) == false) {
if (tpcFileHeader.m_numEntries > 0 && SerializeInProcessTxns(fd) == false) {
MOT_LOG_ERROR("create2PCRecoveryFile: failed to serialize transactions [%d %s]", errno, gs_strerror(errno));
break;
}
@ -858,10 +857,86 @@ bool CheckpointManager::CreateTpcRecoveryFile()
}
ret = true;
} while (0);
GetRecoveryManager()->UnlockInProcessTxns();
MOTEngine::GetInstance()->GetInProcessTransactions().Unlock();
return ret;
}
bool CheckpointManager::SerializeInProcessTxns(int fd)
{
if (fd == -1) {
MOT_LOG_ERROR("SerializeInProcessTxns: bad fd");
return false;
}
auto serializeLambda = [this, fd](RedoLogTransactionSegments* segments, uint64_t) -> RC {
errno_t erc;
LogSegment* segment = segments->GetSegment(segments->GetCount() - 1);
size_t bufSize = 0;
char* buf = nullptr;
CheckpointUtils::TpcEntryHeader header;
uint64_t csn = segment->m_controlBlock.m_csn;
for (uint32_t i = 0; i < segments->GetCount(); i++) {
segment = segments->GetSegment(i);
size_t sz = segment->SerializeSize();
if (buf == nullptr) {
buf = (char*)malloc(sz);
MOT_LOG_DEBUG("SerializeInProcessTxns: alloc %lu - %p", sz, buf);
bufSize = sz;
} else if (sz > bufSize) {
char* bufTmp = (char*)malloc(sz);
if (bufTmp == nullptr) {
free(buf);
buf = nullptr;
} else {
erc = memcpy_s(bufTmp, sz, buf, bufSize);
securec_check(erc, "\0", "\0");
free(buf);
buf = bufTmp;
}
MOT_LOG_DEBUG("SerializeInProcessTxns: realloc %lu - %p", sz, buf);
bufSize = sz;
}
if (buf == nullptr) {
MOT_LOG_ERROR("SerializeInProcessTxns: failed to allocate buffer (%lu bytes)", sz);
return RC_ERROR;
}
header.m_magic = CP_MGR_MAGIC;
header.m_len = bufSize;
segment->Serialize(buf);
size_t wrStat = CheckpointUtils::WriteFile(fd, (char*)&header, sizeof(CheckpointUtils::TpcEntryHeader));
if (wrStat != sizeof(CheckpointUtils::TpcEntryHeader)) {
MOT_LOG_ERROR("SerializeInProcessTxns: failed to write header (wrote %lu) [%d:%s]",
wrStat,
errno,
gs_strerror(errno));
free(buf);
return RC_ERROR;
}
wrStat = CheckpointUtils::WriteFile(fd, buf, bufSize);
if (wrStat != bufSize) {
MOT_LOG_ERROR("SerializeInProcessTxns: failed to write %lu bytes to file (wrote %lu) [%d:%s]",
bufSize,
wrStat,
errno,
gs_strerror(errno));
free(buf);
return RC_ERROR;
}
MOT_LOG_DEBUG("SerializeInProcessTxns: wrote seg %p %lu bytes", segment, bufSize);
if (buf != nullptr) {
free(buf);
}
}
return RC_OK;
};
return MOTEngine::GetInstance()->GetInProcessTransactions().ForEachTransaction(serializeLambda, false);
}
bool CheckpointManager::CreateEndFile()
{
int fd = -1;

View File

@ -166,6 +166,11 @@ public:
return m_id;
}
void SetId(uint64_t id)
{
m_id = id;
}
uint64_t GetLastReplayLsn()
{
return m_lastReplayLsn;
@ -268,11 +273,6 @@ private:
// this lock guards gs_ctl checkpoint fetching
pthread_rwlock_t m_fetchLock;
void SetId(uint64_t id)
{
m_id = id;
}
CheckpointPhase GetPhase() const
{
return m_phase;
@ -385,6 +385,12 @@ private:
*/
bool CreateEndFile();
/**
* @brief Serializes inProcess transactions to disk
* @return Boolean value denoting success or failure.
*/
bool SerializeInProcessTxns(int fd);
void ResetFlags();
/**

View File

@ -82,12 +82,14 @@ public:
inline Table* GetTable(InternalTableId tableId)
{
Table* table = nullptr;
m_rwLock.RdLock();
InternalTableMap::iterator it = m_tablesById.find(tableId);
if (it != m_tablesById.end())
if (it != m_tablesById.end()) {
table = it->second;
}
m_rwLock.RdUnlock();
return table;
}

View File

@ -44,6 +44,7 @@
#include "connection_id.h"
#include "cycles.h"
#include "debug_utils.h"
#include "recovery_manager_factory.h"
// For mtSessionThreadInfo thread local
#include "kvthread.hh"
@ -741,7 +742,7 @@ bool MOTEngine::InitializeRecoveryManager()
MOT_ERROR_INVALID_STATE, "MOT Engine Startup", "Double attempt to initialize recovery manager");
}
m_recoveryManager = new (std::nothrow) RecoveryManager();
m_recoveryManager = RecoveryManagerFactory::CreateRecoveryManager();
if (m_recoveryManager == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "MOT Engine Startup", "Failed to allocate memory for recovery manager object");
return false;

View File

@ -37,12 +37,13 @@
#include "checkpoint_manager.h"
#include "utilities.h"
#include "asynchronous_redo_log_handler.h"
#include "recovery_manager.h"
#include "irecovery_manager.h"
#include "table_manager.h"
#include "session_manager.h"
#include "surrogate_key_manager.h"
#include "gc_context.h"
#include "mot_atomic_ops.h"
#include "inprocess_transactions.h"
namespace MOT {
class ConfigLoader;
@ -301,11 +302,16 @@ public:
/**********************************************************************/
// Recovery API
/**********************************************************************/
inline RecoveryManager* GetRecoveryManager()
inline IRecoveryManager* GetRecoveryManager()
{
return m_recoveryManager;
}
inline InProcessTransactions& GetInProcessTransactions()
{
return m_inProcessTransactions;
}
inline bool CreateRecoverySessionContext()
{
MOT_ASSERT(m_recoveryManager);
@ -356,8 +362,8 @@ public:
inline bool IsInProcessTx(uint64_t id)
{
MOT_ASSERT(m_recoveryManager);
return m_recoveryManager->IsInProcessTx(id);
uint64_t intId;
return m_inProcessTransactions.FindTransactionId(id, intId, false);
}
inline uint64_t PerformInProcessTx(uint64_t id, bool isCommit)
@ -535,7 +541,7 @@ private:
SurrogateKeyManager* m_surrogateKeyManager;
/** @var The recovery manager. */
RecoveryManager* m_recoveryManager;
IRecoveryManager* m_recoveryManager;
/** @var The redo-log handler. */
RedoLogHandler* m_redoLogHandler;
@ -543,6 +549,9 @@ private:
/** @var The checkpoint manager. */
CheckpointManager* m_checkpointManager;
/** @var The In-ProcessTransactions container. */
InProcessTransactions m_inProcessTransactions;
// record initialization failure point, so that Destroy can be called at any point of failure
enum InitPhase {
INIT_CFG_PHASE,
@ -630,7 +639,7 @@ inline SurrogateKeyManager* GetSurrogateKeyManager()
}
/** @brief Retrieves the recovery manager. */
inline RecoveryManager* GetRecoveryManager()
inline IRecoveryManager* GetRecoveryManager()
{
return MOTEngine::GetInstance()->GetRecoveryManager();
}

View File

@ -0,0 +1,735 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* checkpoint_recovery.cpp
* Handles recovery from checkpoint.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/checkpoint_recovery.cpp
*
* -------------------------------------------------------------------------
*/
#include <thread>
#include "mot_engine.h"
#include "checkpoint_recovery.h"
#include "checkpoint_utils.h"
#include "irecovery_manager.h"
#include "redo_log_transaction_iterator.h"
namespace MOT {
DECLARE_LOGGER(CheckpointRecovery, Recovery);
bool CheckpointRecovery::Recover()
{
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
if (engine == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Checkpoint Recover Initialization", "No MOT Engine object");
return false;
}
m_tasksList.clear();
if (CheckpointControlFile::GetCtrlFile() == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Checkpoint Recovery Initialization", "Failed to allocate ctrlfile object");
return false;
}
if (CheckpointControlFile::GetCtrlFile()->GetId() == CheckpointControlFile::invalidId) {
m_checkpointId = CheckpointControlFile::invalidId; // no mot control was found.
} else {
if (IsCheckpointValid(CheckpointControlFile::GetCtrlFile()->GetId())) {
m_checkpointId = CheckpointControlFile::GetCtrlFile()->GetId();
m_lsn = CheckpointControlFile::GetCtrlFile()->GetLsn();
m_lastReplayLsn = CheckpointControlFile::GetCtrlFile()->GetLastReplayLsn();
} else {
MOT_LOG_ERROR("CheckpointRecovery:: no valid checkpoint exist");
return false;
}
}
if (m_checkpointId != CheckpointControlFile::invalidId) {
if (m_lsn >= m_lastReplayLsn) {
MOT_LOG_INFO("CheckpointRecovery LSN Check: will use the LSN (%lu), ignoring the lastReplayLSN (%lu)",
m_lsn,
m_lastReplayLsn);
} else {
MOT_LOG_WARN("CheckpointRecovery LSN Check: will use the lastReplayLSN (%lu), ignoring the LSN (%lu)",
m_lastReplayLsn,
m_lsn);
m_lsn = m_lastReplayLsn;
}
}
if (!CheckpointUtils::SetWorkingDir(m_workingDir, m_checkpointId)) {
MOT_LOG_ERROR("CheckpointRecovery:: failed to obtain checkpoint's working dir");
return false;
}
int taskFillStat = FillTasksFromMapFile();
if (taskFillStat < 0) {
MOT_LOG_INFO("CheckpointRecovery:: failed to read map file");
return false; // error was already set
} else if (taskFillStat == 0) { // fresh install
return true;
}
if (m_tasksList.size() > 0) {
if (GetGlobalConfiguration().m_enableIncrementalCheckpoint) {
MOT_LOG_ERROR(
"CheckpointRecovery: recovery of MOT tables failed. MOT does not support incremental checkpoint");
return false;
}
MOT_LOG_INFO("CheckpointRecovery: starting to recover %lu tables from checkpoint id: %lu",
m_tableIds.size(),
m_checkpointId);
for (auto it = m_tableIds.begin(); it != m_tableIds.end(); ++it) {
if (!RecoverTableMetadata(*it)) {
MOT_LOG_ERROR("CheckpointRecovery: recovery of table %lu's metadata failed", *it);
return false;
}
}
std::vector<std::thread> threadPool;
for (uint32_t i = 0; i < m_numWorkers; ++i) {
threadPool.push_back(std::thread(CheckpointRecoveryWorker, this));
}
MOT_LOG_DEBUG("CheckpointRecovery:: waiting for all tasks to finish");
while (HaveTasks() && m_stopWorkers == false) {
sleep(1);
}
MOT_LOG_DEBUG("CheckpointRecovery: tasks finished (%s)", m_errorSet ? "error" : "ok");
for (auto& worker : threadPool) {
if (worker.joinable()) {
worker.join();
}
}
if (m_errorSet) {
MOT_LOG_ERROR("Checkpoint recovery failed! error: %u:%s", m_errorCode, RcToString(m_errorCode));
return false;
}
}
if (!RecoverInProcessTxns()) {
MOT_LOG_ERROR("Failed to recover the in-process transactions from the checkpoint");
return false;
}
// set the current valid id in the checkpoint manager in case
// we will need to retrieve it before a new checkpoint is created
engine->GetCheckpointManager()->SetId(m_checkpointId);
MOT_LOG_INFO("Checkpoint Recovery: finished recovering %lu tables from checkpoint id: %lu",
m_tableIds.size(),
m_checkpointId);
m_tableIds.clear();
MOTEngine::GetInstance()->GetCheckpointManager()->RemoveOldCheckpoints(m_checkpointId);
return true;
}
int CheckpointRecovery::FillTasksFromMapFile()
{
if (m_checkpointId == CheckpointControlFile::invalidId) {
return 0; // fresh install probably. no error
}
std::string mapFile;
CheckpointUtils::MakeMapFilename(mapFile, m_workingDir, m_checkpointId);
int fd = -1;
if (!CheckpointUtils::OpenFileRead(mapFile, fd)) {
MOT_LOG_ERROR("CheckpointRecovery::fillTasksFromMapFile: failed to open map file '%s'", mapFile.c_str());
return -1;
}
CheckpointUtils::MapFileHeader mapFileHeader;
if (CheckpointUtils::ReadFile(fd, (char*)&mapFileHeader, sizeof(CheckpointUtils::MapFileHeader)) !=
sizeof(CheckpointUtils::MapFileHeader)) {
MOT_LOG_ERROR("CheckpointRecovery::fillTasksFromMapFile: failed to read map file '%s' header", mapFile.c_str());
CheckpointUtils::CloseFile(fd);
return -1;
}
if (mapFileHeader.m_magic != CP_MGR_MAGIC) {
MOT_LOG_ERROR("CheckpointRecovery::fillTasksFromMapFile: failed to verify map file'%s'", mapFile.c_str());
CheckpointUtils::CloseFile(fd);
return -1;
}
CheckpointManager::MapFileEntry entry;
for (uint64_t i = 0; i < mapFileHeader.m_numEntries; i++) {
if (CheckpointUtils::ReadFile(fd, (char*)&entry, sizeof(CheckpointManager::MapFileEntry)) !=
sizeof(CheckpointManager::MapFileEntry)) {
MOT_LOG_ERROR("CheckpointRecovery::fillTasksFromMapFile: failed to read map file '%s' entry: %lu",
mapFile.c_str(),
i);
CheckpointUtils::CloseFile(fd);
return -1;
}
m_tableIds.insert(entry.m_id);
for (uint32_t i = 0; i <= entry.m_numSegs; i++) {
Task* recoveryTask = new (std::nothrow) Task(entry.m_id, i);
if (recoveryTask == nullptr) {
CheckpointUtils::CloseFile(fd);
MOT_LOG_ERROR("CheckpointRecovery::fillTasksFromMapFile: failed to allocate task object");
return -1;
}
m_tasksList.push_back(recoveryTask);
}
}
CheckpointUtils::CloseFile(fd);
MOT_LOG_INFO("CheckpointRecovery::fillTasksFromMapFile: filled %lu tasks", m_tasksList.size());
return 1;
}
bool CheckpointRecovery::RecoverTableMetadata(uint32_t tableId)
{
int fd = -1;
std::string fileName;
CheckpointUtils::MakeMdFilename(tableId, fileName, m_workingDir);
if (!CheckpointUtils::OpenFileRead(fileName, fd)) {
MOT_LOG_ERROR("CheckpointRecovery::recoverTableMetadata: failed to open file: %s", fileName.c_str());
return false;
}
CheckpointUtils::MetaFileHeader mFileHeader;
size_t reader = CheckpointUtils::ReadFile(fd, (char*)&mFileHeader, sizeof(CheckpointUtils::MetaFileHeader));
if (reader != sizeof(CheckpointUtils::MetaFileHeader)) {
MOT_LOG_ERROR("CheckpointRecovery::recoverTableMetadata: failed to read meta file header, reader %lu", reader);
CheckpointUtils::CloseFile(fd);
return false;
}
if (mFileHeader.m_fileHeader.m_magic != CP_MGR_MAGIC || mFileHeader.m_fileHeader.m_tableId != tableId) {
MOT_LOG_ERROR("CheckpointRecovery::recoverTableMetadata: file: %s is corrupted", fileName.c_str());
CheckpointUtils::CloseFile(fd);
return false;
}
char* dataBuf = new (std::nothrow) char[mFileHeader.m_entryHeader.m_dataLen];
if (dataBuf == nullptr) {
MOT_LOG_ERROR("CheckpointRecovery::recoverTableMetadata: failed to allocate table buffer");
CheckpointUtils::CloseFile(fd);
return false;
}
reader = CheckpointUtils::ReadFile(fd, dataBuf, mFileHeader.m_entryHeader.m_dataLen);
if (reader != mFileHeader.m_entryHeader.m_dataLen) {
MOT_LOG_ERROR("CheckpointRecovery::recoverTableMetadata: failed to read table entry (%u), reader %lu",
mFileHeader.m_entryHeader.m_dataLen,
reader);
CheckpointUtils::CloseFile(fd);
delete[] dataBuf;
return false;
}
CheckpointUtils::CloseFile(fd);
bool status = CreateTable(dataBuf);
delete[] dataBuf;
return status;
}
bool CheckpointRecovery::CreateTable(char* data)
{
string name;
string longName;
uint32_t intId = 0;
uint64_t extId = 0;
Table::DeserializeNameAndIds((const char*)data, intId, extId, name, longName);
MOT_LOG_INFO("CheckpointRecovery::CreateTable: got intId: %u, extId: %lu, %s/%s",
intId,
extId,
name.c_str(),
longName.c_str());
if (GetTableManager()->VerifyTableExists(intId, extId, name, longName)) {
MOT_LOG_ERROR("CheckpointRecovery::CreateTable: table %u already exists", intId);
return false;
}
Table* table = new (std::nothrow) Table();
if (table == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Checkpoint Recovery Create Table", "failed to allocate table object");
return false;
}
table->Deserialize((const char*)data);
do {
if (!table->IsDeserialized()) {
MOT_LOG_ERROR("CheckpointRecovery::CreateTable: failed to de-serialize table");
break;
}
if (!GetTableManager()->AddTable(table)) {
MOT_LOG_ERROR("CheckpointRecovery::CreateTable: failed to add table to engine");
break;
}
MOT_LOG_INFO("CheckpointRecovery::CreateTable: table %s [internal id %u] created",
table->GetLongTableName().c_str(),
table->GetTableId());
return true;
} while (0);
MOT_LOG_ERROR("CheckpointRecovery::CreateTable: failed to recover table");
delete table;
return false;
}
void CheckpointRecovery::OnError(RC errCode, const char* errMsg, const char* optionalMsg)
{
m_errorLock.lock();
m_stopWorkers = true;
if (!m_errorSet) {
m_errorCode = errCode;
m_errorMessage.clear();
m_errorMessage.append(errMsg);
if (optionalMsg != nullptr) {
m_errorMessage.append(" ");
m_errorMessage.append(optionalMsg);
}
m_errorSet = true;
}
m_errorLock.unlock();
}
void CheckpointRecovery::CheckpointRecoveryWorker(CheckpointRecovery* checkpointRecovery)
{
// since this is a non-kernel thread we must set-up our own u_sess struct for the current thread
MOT_DECLARE_NON_KERNEL_THREAD();
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
SessionContext* sessionContext = GetSessionManager()->CreateSessionContext();
int threadId = MOTCurrThreadId;
// in a thread-pooled envelope the affinity could be disabled, so we use task affinity here
if (GetGlobalConfiguration().m_enableNuma && !GetTaskAffinity().SetAffinity(threadId)) {
MOT_LOG_WARN("Failed to set affinity of checkpoint recovery worker, recovery from checkpoint performance may be"
" affected");
}
SurrogateState sState;
if (sState.IsValid() == false) {
checkpointRecovery->OnError(
RC_MEMORY_ALLOCATION_ERROR, "CheckpointRecovery::WorkerFunc failed to allocate surrogate state");
return;
}
MOT_LOG_DEBUG("CheckpointRecovery::WorkerFunc start [%u] on cpu %lu", (unsigned)MOTCurrThreadId, sched_getcpu());
uint64_t maxCsn = 0;
char* keyData = (char*)malloc(MAX_KEY_SIZE);
if (keyData == nullptr) {
MOT_LOG_ERROR("RecoveryManager::WorkerFunc: failed to allocate key buffer");
checkpointRecovery->OnError(
RC_MEMORY_ALLOCATION_ERROR, "CheckpointRecovery::WorkerFunc failed to allocate key data");
}
char* entryData = (char*)malloc(MAX_TUPLE_SIZE);
if (entryData == nullptr) {
MOT_LOG_ERROR("CheckpointRecovery::WorkerFunc: failed to allocate row buffer");
checkpointRecovery->OnError(
RC_MEMORY_ALLOCATION_ERROR, "CheckpointRecovery::WorkerFunc failed to allocate row buffer");
}
RC status = RC_OK;
while (checkpointRecovery->ShouldStopWorkers() == false) {
CheckpointRecovery::Task* task = checkpointRecovery->GetTask();
if (task != nullptr) {
bool hadError = false;
if (!checkpointRecovery->RecoverTableRows(task, keyData, entryData, maxCsn, sState, status)) {
MOT_LOG_ERROR("CheckpointRecovery::WorkerFunc recovery of table %lu's data failed", task->m_id);
checkpointRecovery->OnError(status,
"CheckpointRecovery::WorkerFunc failed to recover table: ",
std::to_string(task->m_id).c_str());
hadError = true;
}
delete task;
if (hadError) {
break;
}
} else {
break;
}
}
if (entryData != nullptr) {
free(entryData);
}
if (keyData != nullptr) {
free(keyData);
}
(GetRecoveryManager())->SetCsn(maxCsn);
if (sState.IsEmpty() == false) {
(GetRecoveryManager())->AddSurrogateArrayToList(sState);
}
GetSessionManager()->DestroySessionContext(sessionContext);
engine->OnCurrentThreadEnding();
MOT_LOG_DEBUG("CheckpointRecovery::WorkerFunc end [%u] on cpu %lu", (unsigned)MOTCurrThreadId, sched_getcpu());
}
bool CheckpointRecovery::RecoverTableRows(
Task* task, char* keyData, char* entryData, uint64_t& maxCsn, SurrogateState& sState, RC& status)
{
if (task == nullptr) {
MOT_LOG_ERROR("CheckpointRecovery::RecoverTableRows: no task given");
return false;
}
int fd = -1;
uint32_t seg = task->m_seg;
uint32_t tableId = task->m_id;
Table* table = GetTableManager()->GetTable(tableId);
if (table == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INTERNAL, "CheckpointRecovery::RecoverTableRows", "Table %llu does not exist", tableId);
return false;
}
std::string fileName;
CheckpointUtils::MakeCpFilename(tableId, fileName, m_workingDir, seg);
if (!CheckpointUtils::OpenFileRead(fileName, fd)) {
MOT_LOG_ERROR("CheckpointRecovery::RecoverTableRows: failed to open file: %s", fileName.c_str());
return false;
}
CheckpointUtils::FileHeader fileHeader;
size_t reader = CheckpointUtils::ReadFile(fd, (char*)&fileHeader, sizeof(CheckpointUtils::FileHeader));
if (reader != sizeof(CheckpointUtils::FileHeader)) {
MOT_LOG_ERROR("CheckpointRecovery::RecoverTableRows: failed to read file header, reader %lu", reader);
CheckpointUtils::CloseFile(fd);
return false;
}
if (fileHeader.m_magic != CP_MGR_MAGIC || fileHeader.m_tableId != tableId) {
MOT_LOG_ERROR("CheckpointRecovery::RecoverTableRows: file: %s is corrupted", fileName.c_str());
CheckpointUtils::CloseFile(fd);
return false;
}
uint64_t tableExId = table->GetTableExId();
if (tableExId != fileHeader.m_exId) {
MOT_LOG_ERROR(
"CheckpointRecovery::RecoverTableRows: exId mismatch: my %lu - pkt %lu", tableExId, fileHeader.m_exId);
return false;
}
if (IsMemoryLimitReached(m_numWorkers, GetGlobalConfiguration().m_checkpointSegThreshold)) {
MOT_LOG_ERROR("CheckpointRecovery::RecoverTableRows: Memory hard limit reached. Cannot recover datanode");
return false;
}
CheckpointUtils::EntryHeader entry;
for (uint64_t i = 0; i < fileHeader.m_numOps; i++) {
reader = CheckpointUtils::ReadFile(fd, (char*)&entry, sizeof(CheckpointUtils::EntryHeader));
if (reader != sizeof(CheckpointUtils::EntryHeader)) {
MOT_LOG_ERROR(
"CheckpointRecovery::RecoverTableRows: failed to read entry header (elem: %lu / %lu), reader %lu",
i,
fileHeader.m_numOps,
reader);
status = RC_ERROR;
break;
}
if (entry.m_keyLen > MAX_KEY_SIZE || entry.m_dataLen > MAX_TUPLE_SIZE) {
MOT_LOG_ERROR(
"CheckpointRecovery::RecoverTableRows: invalid entry (elem: %lu / %lu), keyLen %u, dataLen %u",
i,
fileHeader.m_numOps,
entry.m_keyLen,
entry.m_dataLen);
status = RC_ERROR;
break;
}
reader = CheckpointUtils::ReadFile(fd, keyData, entry.m_keyLen);
if (reader != entry.m_keyLen) {
MOT_LOG_ERROR(
"CheckpointRecovery::RecoverTableRows: failed to read entry key (elem: %lu / %lu), reader %lu",
i,
fileHeader.m_numOps,
reader);
status = RC_ERROR;
break;
}
reader = CheckpointUtils::ReadFile(fd, entryData, entry.m_dataLen);
if (reader != entry.m_dataLen) {
MOT_LOG_ERROR(
"CheckpointRecovery::RecoverTableRows: failed to read entry data (elem: %lu / %lu), reader %lu",
i,
fileHeader.m_numOps,
reader);
status = RC_ERROR;
break;
}
InsertRow(table,
keyData,
entry.m_keyLen,
entryData,
entry.m_dataLen,
entry.m_csn,
MOTCurrThreadId,
sState,
status,
entry.m_rowId);
if (status != RC_OK) {
MOT_LOG_ERROR(
"CheckpointRecovery: failed to insert row %s (error code: %d)", RcToString(status), (int)status);
break;
}
MOT_LOG_DEBUG("Inserted into table %u row with CSN %" PRIu64, tableId, entry.m_csn);
if (entry.m_csn > maxCsn)
maxCsn = entry.m_csn;
}
CheckpointUtils::CloseFile(fd);
MOT_LOG_DEBUG("[%u] CheckpointRecovery::RecoverTableRows table %u:%u, %lu rows recovered (%s)",
MOTCurrThreadId,
tableId,
seg,
fileHeader.m_numOps,
(status == RC_OK) ? "OK" : "Error");
return (status == RC_OK);
}
CheckpointRecovery::Task* CheckpointRecovery::GetTask()
{
Task* task = nullptr;
do {
m_tasksLock.lock();
if (m_tasksList.empty()) {
break;
}
task = m_tasksList.front();
m_tasksList.pop_front();
} while (0);
m_tasksLock.unlock();
return task;
}
uint32_t CheckpointRecovery::HaveTasks()
{
m_tasksLock.lock();
bool noMoreTasks = m_tasksList.empty();
m_tasksLock.unlock();
return !noMoreTasks;
}
bool CheckpointRecovery::IsMemoryLimitReached(uint32_t numThreads, uint32_t neededBytes)
{
uint64_t memoryRequiredBytes = numThreads * neededBytes;
if (MOTEngine::GetInstance()->GetCurrentMemoryConsumptionBytes() + memoryRequiredBytes >=
MOTEngine::GetInstance()->GetHardMemoryLimitBytes()) {
MOT_LOG_WARN("CheckpointRecovery::IsMemoryLimitReached: memory limit reached "
"current memory: %lu, required memory: %lu, hard limit memory: %lu",
MOTEngine::GetInstance()->GetCurrentMemoryConsumptionBytes(),
memoryRequiredBytes,
MOTEngine::GetInstance()->GetHardMemoryLimitBytes());
return true;
} else {
return false;
}
}
void CheckpointRecovery::InsertRow(Table* table, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen,
uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId)
{
MaxKey key;
Row* row = table->CreateNewRow();
if (row == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create row");
return;
}
row->CopyData((const uint8_t*)rowData, rowLen);
row->SetCommitSequenceNumber(csn);
row->SetRowId(rowId);
MOT::Index* ix = table->GetPrimaryIndex();
if (ix->IsFakePrimary()) {
row->SetSurrogateKey(*(uint64_t*)keyData);
sState.UpdateMaxKey(rowId);
}
key.CpKey((const uint8_t*)keyData, keyLen);
status = table->InsertRowNonTransactional(row, tid, &key);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row");
table->DestroyRow(row);
}
}
bool CheckpointRecovery::RecoverInProcessTxns()
{
int fd = -1;
std::string fileName;
std::string workingDir;
bool ret = false;
do {
if (!CheckpointUtils::SetWorkingDir(workingDir, m_checkpointId)) {
break;
}
CheckpointUtils::MakeTpcFilename(fileName, workingDir, m_checkpointId);
if (!CheckpointUtils::OpenFileRead(fileName, fd)) {
MOT_LOG_ERROR("RecoveryManager::RecoverInProcessTxns: failed to open file '%s'", fileName.c_str());
break;
}
CheckpointUtils::TpcFileHeader tpcFileHeader;
if (CheckpointUtils::ReadFile(fd, (char*)&tpcFileHeader, sizeof(CheckpointUtils::TpcFileHeader)) !=
sizeof(CheckpointUtils::TpcFileHeader)) {
MOT_LOG_ERROR("RecoveryManager::RecoverInProcessTxns: failed to read file '%s' header", fileName.c_str());
CheckpointUtils::CloseFile(fd);
break;
}
if (tpcFileHeader.m_magic != CP_MGR_MAGIC) {
MOT_LOG_ERROR(
"RecoveryManager::RecoverInProcessTxns: failed to validate file's header ('%s')", fileName.c_str());
CheckpointUtils::CloseFile(fd);
break;
}
if (tpcFileHeader.m_numEntries == 0) {
MOT_LOG_INFO("RecoveryManager::RecoverInProcessTxns: no tpc entries to recover");
CheckpointUtils::CloseFile(fd);
ret = true;
break;
}
if (!DeserializeInProcessTxns(fd, tpcFileHeader.m_numEntries)) {
MOT_LOG_ERROR("RecoveryManager::RecoverInProcessTxns: failed to deserialize in-process transactions");
CheckpointUtils::CloseFile(fd);
break;
}
CheckpointUtils::CloseFile(fd);
ret = true;
} while (0);
return ret;
}
bool CheckpointRecovery::DeserializeInProcessTxns(int fd, uint64_t numEntries)
{
errno_t erc;
char* buf = nullptr;
size_t bufSize = 0;
uint32_t readEntries = 0;
bool success = false;
CheckpointUtils::TpcEntryHeader header;
if (fd == -1) {
MOT_LOG_ERROR("deserializeInProcessTxns: bad fd");
return false;
}
MOT_LOG_DEBUG("DeserializeInProcessTxns: n %d", numEntries);
while (readEntries < numEntries) {
success = false;
size_t sz = CheckpointUtils::ReadFile(fd, (char*)&header, sizeof(CheckpointUtils::TpcEntryHeader));
if (sz == 0) {
MOT_LOG_DEBUG("DeserializeInProcessTxns: eof, read %d entries", readEntries);
success = true;
break;
} else if (sz != sizeof(CheckpointUtils::TpcEntryHeader)) {
MOT_LOG_ERROR("DeserializeInProcessTxns: failed to read segment header", sz, errno, gs_strerror(errno));
break;
}
if (header.m_magic != CP_MGR_MAGIC || header.m_len > REDO_DEFAULT_BUFFER_SIZE) {
MOT_LOG_ERROR("DeserializeInProcessTxns: bad entry %lu - %lu", header.m_magic, header.m_len);
break;
}
MOT_LOG_DEBUG("DeserializeInProcessTxns: entry len %lu", header.m_len);
if (buf == nullptr || header.m_len > bufSize) {
if (buf != nullptr) {
free(buf);
}
buf = (char*)malloc(header.m_len);
MOT_LOG_DEBUG("DeserializeInProcessTxns: alloc %lu - %p", header.m_len, buf);
bufSize = header.m_len;
}
if (buf == nullptr) {
MOT_LOG_ERROR("DeserializeInProcessTxns: failed to allocate buffer (%lu bytes)", header.m_magic);
break;
}
if (CheckpointUtils::ReadFile(fd, buf, bufSize) != bufSize) {
MOT_LOG_ERROR("DeserializeInProcessTxns: failed to read data from file (%lu bytes)", bufSize);
break;
}
MOT::LogSegment* segment = new (std::nothrow) MOT::LogSegment();
if (segment == nullptr) {
MOT_LOG_ERROR("DeserializeInProcessTxns: failed to allocate segment");
break;
}
segment->m_replayLsn = 0;
erc = memcpy_s(&segment->m_len, sizeof(size_t), buf, sizeof(size_t));
securec_check(erc, "\0", "\0");
segment->m_data = new (std::nothrow) char[segment->m_len];
if (segment->m_data == nullptr) {
MOT_LOG_ERROR("DeserializeInProcessTxns: failed to allocate memory for segment data");
delete segment;
break;
}
segment->Deserialize(buf);
if (!MOTEngine::GetInstance()->GetInProcessTransactions().InsertLogSegment(segment)) {
MOT_LOG_ERROR("DeserializeInProcessTxns: insert log segment failed");
delete segment;
break;
}
readEntries++;
success = true;
}
if (buf != nullptr) {
free(buf);
}
return success;
}
bool CheckpointRecovery::IsCheckpointValid(uint64_t id)
{
int fd = -1;
std::string fileName;
std::string workingDir;
bool ret = false;
do {
if (!CheckpointUtils::SetWorkingDir(workingDir, id)) {
break;
}
CheckpointUtils::MakeEndFilename(fileName, workingDir, id);
if (!CheckpointUtils::FileExists(fileName)) {
MOT_LOG_ERROR("IsCheckpointValid: checkpoint id %lu is invalid", id);
break;
}
ret = true;
} while (0);
return ret;
}
} // namespace MOT

View File

@ -0,0 +1,217 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* checkpoint_recovery.h
* Handles recovery from checkpoint.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/checkpoint_recovery.h
*
* -------------------------------------------------------------------------
*/
#ifndef CHECKPOINT_RECOVERY_H
#define CHECKPOINT_RECOVERY_H
#include <set>
#include <list>
#include <mutex>
#include "global.h"
#include "spin_lock.h"
#include "table.h"
#include "surrogate_state.h"
namespace MOT {
class CheckpointRecovery {
public:
CheckpointRecovery()
: m_checkpointId(0),
m_lsn(0),
m_lastReplayLsn(0),
m_numWorkers(GetGlobalConfiguration().m_checkpointRecoveryWorkers),
m_stopWorkers(false),
m_errorSet(false),
m_errorCode(RC_OK)
{}
~CheckpointRecovery()
{}
/**
* @brief Recovers the database state from the last valid
* checkpoint
* @return Boolean value denoting success or failure.
*/
bool Recover();
/**
* @brief error callback.
*/
void OnError(RC errCode, const char* errMsg, const char* optionalMsg = nullptr);
bool ShouldStopWorkers() const
{
return m_stopWorkers;
}
/**
* @struct Task
* @brief Describes a checkpoint recovery task by its table id and
* segment file number.
*/
struct Task {
explicit Task(uint32_t id = 0, uint32_t seg = 0) : m_id(id), m_seg(seg)
{}
uint32_t m_id;
uint32_t m_seg;
};
/**
* @brief Pops a task from the tasks queue.
* @return The task that was retrieved from the queue.
*/
CheckpointRecovery::Task* GetTask();
/**
* @brief Reads and inserts rows from a checkpoint file
* @param task The task (tableid / segment) to recover from.
* @param keyData A key buffer.
* @param entryData A row buffer..
* @param maxCsn The returned maxCsn encountered during the recovery.
* @param sState Surrogate key state structure that will be filled.
* during the recovery.
* @param status RC returned from the Insert function.
* @return Boolean value denoting success or failure.
*/
bool RecoverTableRows(
Task* task, char* keyData, char* entryData, uint64_t& maxCsn, SurrogateState& sState, RC& status);
uint64_t GetLsn() const
{
return m_lsn;
}
/**
* @brief Implements the a checkpoint recovery worker
* @param checkpointRecovery The caller checkpoint recovery class
*/
static void CheckpointRecoveryWorker(CheckpointRecovery* checkpointRecovery);
private:
/**
* @brief Reads and creates a table's definition from a checkpoint
* metadata file
* @param tableId The table id to recover.
* @return Boolean value denoting success or failure.
*/
bool RecoverTableMetadata(uint32_t tableId);
/**
* @brief Reads the checkpoint map file and fills the tasks queue
* with the relevant information.
* @return Int value where 0 indicates no tasks (empty checkpoint),
* -1 denotes an error has occurred and 1 means a success.
*/
int FillTasksFromMapFile();
/**
* @brief Checks if there are any more tasks left in the queue
* @return Int value where 0 means failure and 1 success
*/
uint32_t HaveTasks();
/**
* @brief Recovers the in process two phase commit related transactions
* from the checkpoint data file.
* @return Boolean value denoting success or failure.
*/
bool RecoverInProcessTxns();
/**
* @brief Deserializes the in process two phase commit data from the
* checkpoint data file. called by RecoverTpc.
* @return Boolean value denoting success or failure.
*/
bool DeserializeInProcessTxns(int fd, uint64_t numEntries);
/**
* @brief Inserts a row into the database in a non transactional manner.
* @param table the table's object pointer.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param rowData row's data buffer.
* @param rowLen row's data buffer len.
* @param csn the operation's csn.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrogate state.
* @param status the returned status of the operation
* @param rowId the row's internal id
*/
void InsertRow(Table* table, char* keyData, uint16_t keyLen, char* rowData, uint64_t rowLen, uint64_t csn,
uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId);
/**
* @brief performs table creation.
* @param data the table's data
* @return Boolean value that represents that status of the operation.
*/
bool CreateTable(char* data);
/**
* @brief returns if a checkpoint is valid by its id.
* @param id the checkpoint's id.
* @return Boolean value that is true if the transaction is committed.
*/
bool IsCheckpointValid(uint64_t id);
/**
* @brief checks if we have enough space for a segment recovery.
* @param numThreads number of workers.
* @param neededBytes the segment size in bytes.
* @return Boolean value that is true if there is not enough memory for
* recovery.
*/
bool IsMemoryLimitReached(uint32_t numThreads, uint32_t neededMBs);
uint64_t m_checkpointId;
uint64_t m_lsn;
uint64_t m_lastReplayLsn;
uint32_t m_numWorkers;
std::string m_workingDir;
std::string m_errorMessage;
bool m_stopWorkers;
bool m_errorSet;
RC m_errorCode;
spin_lock m_errorLock;
std::mutex m_tasksLock;
std::set<uint32_t> m_tableIds;
std::list<Task*> m_tasksList;
};
} // namespace MOT
#endif /* CHECKPOINT_RECOVERY_H */

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* inprocess_transactions.cpp
* Implements a map that holds transactions which are pending commit or abort.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/inprocess_transactions.cpp
*
* -------------------------------------------------------------------------
*/
#include "inprocess_transactions.h"
namespace MOT {
DECLARE_LOGGER(RecoveryManager, InProcessTransactions);
InProcessTransactions::~InProcessTransactions()
{
if (m_numEntries) {
auto destroyLambda = [](RedoLogTransactionSegments* s, uint64_t) -> RC {
delete s;
return RC_OK;
};
ForEachTransaction(destroyLambda, true);
}
}
bool InProcessTransactions::InsertLogSegment(LogSegment* segment)
{
uint64_t transactionId = segment->m_controlBlock.m_internalTransactionId;
RedoLogTransactionSegments* transactionLogEntries = nullptr;
auto it = m_map.find(transactionId);
if (it == m_map.end()) {
// this is a new transaction. Not found in the map.
transactionLogEntries = new (std::nothrow) RedoLogTransactionSegments(transactionId);
if (transactionLogEntries == nullptr) {
return false;
}
if (!transactionLogEntries->Append(segment)) {
MOT_LOG_ERROR("InsertLogSegment: could not append log segment, error re-allocating log segments array");
return false;
}
m_map[transactionId] = transactionLogEntries;
m_numEntries++;
} else {
transactionLogEntries = it->second;
if (!transactionLogEntries->Append(segment)) {
MOT_LOG_ERROR("InsertLogSegment: could not append log segment, error re-allocating log segments array");
return false;
}
}
if (segment->m_controlBlock.m_externalTransactionId != INVALID_TRANSACTION_ID) {
m_extToInt[segment->m_controlBlock.m_externalTransactionId] = segment->m_controlBlock.m_internalTransactionId;
}
return true;
}
bool InProcessTransactions::FindTransactionId(uint64_t externalId, uint64_t& internalId, bool pop)
{
internalId = 0;
auto it = m_extToInt.find(externalId);
if (it != m_extToInt.end()) {
internalId = it->second;
if (pop) {
m_extToInt.erase(it);
}
return true;
}
return false;
}
} // namespace MOT

View File

@ -0,0 +1,128 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* inprocess_transactions.h
* Implements a map that holds transactions which are pending commit or abort.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/inprocess_transactions.h
*
* -------------------------------------------------------------------------
*/
#ifndef INPROCESS_TRANSACTIONS_H
#define INPROCESS_TRANSACTIONS_H
#include <cstdint>
#include <map>
#include <mutex>
#include "redo_log_transaction_segments.h"
namespace MOT {
class InProcessTransactions {
public:
InProcessTransactions() : m_numEntries(0)
{}
~InProcessTransactions();
bool InsertLogSegment(LogSegment* segment);
bool FindTransactionId(uint64_t externalId, uint64_t& internalId, bool pop = true);
template <typename T>
RC ForUniqueTransaction(uint64_t id, const T& func)
{
return OperateOnTransactionsMap(id, func, true, true);
}
template <typename T>
RC ForEachTransaction(const T& func, bool pop)
{
return OperateOnTransactionsMap(0, func, pop, false);
}
bool IsInProcessTx(uint64_t id)
{
return (m_extToInt.find(id) != m_extToInt.end());
}
void Lock()
{
m_lock.lock();
}
void Unlock()
{
m_lock.unlock();
}
void UpdateTxIdMap(uint64_t intTx, uint64_t extTx)
{
m_extToInt[extTx] = intTx;
}
size_t GetNumTxns() const
{
return m_map.size();
}
private:
template <typename T>
RC OperateOnTransactionsMap(uint64_t id, const T& func, bool pop, bool lock)
{
RC status = RC_OK;
if (lock) {
m_lock.lock();
}
auto it = id ? m_map.find(id) : m_map.begin();
while (it != m_map.end()) {
RedoLogTransactionSegments* segments = it->second;
if (pop) {
m_map.erase(it);
m_numEntries--;
}
if (lock) {
m_lock.unlock();
}
status = func(segments, it->first);
if (pop) {
delete segments;
}
if (id || status != RC_OK) {
return status;
} else {
++it;
}
if (lock) {
m_lock.lock();
}
}
if (lock) {
m_lock.unlock();
}
return RC_OK;
}
std::mutex m_lock;
std::map<uint64_t, RedoLogTransactionSegments*> m_map;
std::map<uint64_t, uint64_t> m_extToInt;
uint64_t m_numEntries;
};
} // namespace MOT
#endif /* INPROCESS_TRANSACTIONS_H */

View File

@ -0,0 +1,115 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* irecovery_manager.h
* Recovery manager interfaces.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/irecovery_manager.h
*
* -------------------------------------------------------------------------
*/
#ifndef IRECOVERY_MANAGER_H
#define IRECOVERY_MANAGER_H
#include "global.h"
#include "surrogate_state.h"
namespace MOT {
typedef TxnCommitStatus (*CommitLogStatusCallback)(uint64_t);
class IRecoveryManager {
public:
// destructor
virtual ~IRecoveryManager()
{}
virtual bool Initialize() = 0;
virtual void SetCommitLogCallback(CommitLogStatusCallback clogCallback) = 0;
/**
* @brief Cleans up the recovery object.
*/
virtual void CleanUp() = 0;
/**
* @brief Starts the recovery process which currently consists of
* checkpoint recovery.
* @return Boolean value denoting success or failure.
*/
virtual bool RecoverDbStart() = 0;
/**
* @brief Performs the post recovery operations: apply the in-process
* transactions and surrogate array, set the max csn and prints the stats
* @return Boolean value denoting success or failure.
*/
virtual bool RecoverDbEnd() = 0;
/**
* @brief attempts to insert a data chunk into the in-process
* transactions map and operate on it. Checks that the redo lsn is after the
* checkpoint snapshot lsn taken. Redo records that are prior snapshot are
* ignored.
* @return Boolean value denoting success or failure.
*/
virtual bool ApplyRedoLog(uint64_t redoLsn, char* data, size_t len) = 0;
/**
* @brief attempts to insert a data chunk into the in-process
* transactions map and operate on it
* @return Boolean value denoting success or failure.
*/
virtual bool ApplyLogSegmentFromData(char* data, size_t len, uint64_t replayLsn = 0) = 0;
/**
* @brief performs a commit on an in-process transaction,
* @return Boolean value denoting success or failure to commit.
*/
virtual bool CommitRecoveredTransaction(uint64_t externalTransactionId) = 0;
/**
* @brief performs a commit or abort on an in-process transaction
* @param id the transaction id.
* @param isCommit specifies commit or abort.
* @return Int indicates the internal transaction id or 0 in case
* the transaction id was not found
*/
virtual uint64_t PerformInProcessTx(uint64_t id, bool isCommit) = 0;
/**
* @brief applies a failed 2pc transaction according to its type.
* a detailed info is described in the function's implementation.
* @param internalTransactionId the transaction id to apply
* @return RC value denoting the operation's status
*/
virtual RC ApplyInProcessTransaction(uint64_t internalTransactionId) = 0;
virtual void SetLastReplayLsn(uint64_t lastReplayLsn) = 0;
virtual uint64_t GetLastReplayLsn() const = 0;
virtual bool IsErrorSet() const = 0;
virtual void AddSurrogateArrayToList(SurrogateState& surrogate) = 0;
virtual void SetCsn(uint64_t csn) = 0;
protected:
// constructor
IRecoveryManager()
{}
};
} // namespace MOT
#endif /* IRECOVERY_MANAGER_H */

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* log_segment.cpp
* Redo log data container.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/log_segment.cpp
*
* -------------------------------------------------------------------------
*/
#include "log_segment.h"
#include "redo_log_writer.h"
namespace MOT {
size_t LogSegment::SerializeSize()
{
return SerializableCharBuf::SerializeSize(m_len) + EndSegmentBlockSerializer::SerializeSize(&m_controlBlock);
}
void LogSegment::Serialize(char* dataOut)
{
dataOut = SerializableCharBuf::Serialize(dataOut, m_data, m_len);
EndSegmentBlockSerializer::Serialize(&m_controlBlock, dataOut);
}
void LogSegment::Deserialize(const char* in)
{
char* dataIn = (char*)in;
dataIn = SerializableCharBuf::Deserialize(dataIn, m_data, m_len);
EndSegmentBlockSerializer::Deserialize(&m_controlBlock, dataIn);
}
} // namespace MOT

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* log_segment.h
* Redo log data container.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/log_segment.h
*
* -------------------------------------------------------------------------
*/
#ifndef LOG_SEGMENT_H
#define LOG_SEGMENT_H
#include "redo_log_global.h"
#include "redo_log_writer.h"
#include "serializable.h"
namespace MOT {
/**
* @struct LogSegment
* @brief encapsulates a chunk of logging data
*/
struct LogSegment : public Serializable {
char* m_data;
size_t m_len;
EndSegmentBlock m_controlBlock;
uint64_t m_replayLsn;
~LogSegment()
{
if (m_data != nullptr) {
delete[] m_data;
}
}
/**
* @brief checks if this log segment is part of a two-phase transaction
* @return Boolean value denoting if it is part of a two-phase transaction or not.
*/
bool IsTwoPhase()
{
return (m_controlBlock.m_opCode == PREPARE_TX || m_controlBlock.m_opCode == COMMIT_PREPARED_TX);
}
/**
* @brief fetches the size of the log segment
* @return Size_t value denoting the size of the segment.
*/
virtual size_t SerializeSize();
/**
* @brief serialize the log segment into a given buffer
* @param dataOut the output buffer
*/
virtual void Serialize(char* dataOut);
/**
* @brief creates a log segment from a data buffer.
* @param dataIn the input buffer.
*/
virtual void Deserialize(const char* dataIn);
};
} // namespace MOT
#endif /* LOG_SEGMENT_H */

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* recovery_manager_factory.h
* Recovery manager factory interface to create recovery manager.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/recovery_manager_factory.h
*
* -------------------------------------------------------------------------
*/
#ifndef RECOVERY_MANAGER_FACTORY_H
#define RECOVERY_MANAGER_FACTORY_H
#include "recovery_manager.h"
namespace MOT {
class RecoveryManagerFactory {
public:
static IRecoveryManager* CreateRecoveryManager()
{
return new (std::nothrow) RecoveryManager();
}
private:
RecoveryManagerFactory()
{}
~RecoveryManagerFactory()
{}
};
} // namespace MOT
#endif /* RECOVERY_MANAGER_FACTORY_H */

View File

@ -36,39 +36,45 @@
namespace MOT {
DECLARE_LOGGER(RecoveryOps, Recovery);
uint32_t RecoveryManager::RecoverLogOperation(uint8_t* data, uint64_t csn, uint64_t transactionId, uint32_t tid,
SurrogateState& sState, RC& status, bool& wasCommit)
uint32_t RecoveryOps::RecoverLogOperation(TxnManager* txn, uint8_t* data, uint64_t csn, uint64_t transactionId,
uint32_t tid, SurrogateState& sState, RC& status, bool& wasCommit)
{
if (txn == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "%s: invalid TxnManager object", __FUNCTION__);
status = RC_ERROR;
return 0;
}
OperationCode opCode = *static_cast<OperationCode*>((void*)data);
switch (opCode) {
case CREATE_ROW:
return RecoverLogOperationInsert(data, csn, tid, sState, status);
return RecoverLogOperationInsert(txn, data, csn, tid, sState, status);
case UPDATE_ROW:
return RecoverLogOperationUpdate(data, csn, tid, status);
return RecoverLogOperationUpdate(txn, data, csn, tid, status);
case OVERWRITE_ROW:
return RecoverLogOperationOverwrite(data, csn, tid, sState, status);
return RecoverLogOperationOverwrite(txn, data, csn, tid, sState, status);
case REMOVE_ROW:
return RecoverLogOperationDelete(data, csn, tid, status);
return RecoverLogOperationDelete(txn, data, csn, tid, status);
case CREATE_TABLE:
return RecoverLogOperationCreateTable(data, status, COMMIT, transactionId);
return RecoverLogOperationCreateTable(txn, data, status, COMMIT, transactionId);
case CREATE_INDEX:
return RecoverLogOperationCreateIndex(data, tid, status, COMMIT);
return RecoverLogOperationCreateIndex(txn, data, tid, status, COMMIT);
case DROP_TABLE:
return RecoverLogOperationDropTable(data, status, COMMIT);
return RecoverLogOperationDropTable(txn, data, status, COMMIT);
case DROP_INDEX:
return RecoverLogOperationDropIndex(data, status, COMMIT);
return RecoverLogOperationDropIndex(txn, data, status, COMMIT);
case TRUNCATE_TABLE:
return RecoverLogOperationTruncateTable(data, status, COMMIT);
return RecoverLogOperationTruncateTable(txn, data, status, COMMIT);
case COMMIT_TX:
case COMMIT_PREPARED_TX:
wasCommit = true;
return RecoverLogOperationCommit(data, csn, tid, status);
return RecoverLogOperationCommit(txn, data, csn, tid, status);
case PARTIAL_REDO_TX:
case PREPARE_TX:
return sizeof(EndSegmentBlock);
case ROLLBACK_TX:
case ROLLBACK_PREPARED_TX:
return RecoverLogOperationRollback(data, csn, tid, status);
return RecoverLogOperationRollback(txn, data, csn, tid, status);
default:
MOT_LOG_ERROR("Unknown recovery redo record op-code: %u", (unsigned)opCode);
status = RC_ERROR;
@ -76,8 +82,8 @@ uint32_t RecoveryManager::RecoverLogOperation(uint8_t* data, uint64_t csn, uint6
}
}
uint32_t RecoveryManager::RecoverLogOperationCreateTable(
uint8_t* data, RC& status, RecoveryOpState state, uint64_t transactionId)
uint32_t RecoveryOps::RecoverLogOperationCreateTable(
TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state, uint64_t transactionId)
{
if (GetGlobalConfiguration().m_enableIncrementalCheckpoint) {
status = RC_ERROR;
@ -104,15 +110,16 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable(
switch (state) {
case COMMIT:
CreateTable((char*)data, status, table, TRANSACTIONAL);
MOT_LOG_DEBUG("RecoverLogOperationCreateTable: COMMIT");
CreateTable(txn, (char*)data, status, table, TRANSACTIONAL);
break;
case TPC_APPLY:
CreateTable((char*)data, status, table, DONT_ADD_TO_ENGINE);
CreateTable(txn, (char*)data, status, table, DONT_ADD_TO_ENGINE);
if (status == RC_OK && table != nullptr) {
tableInfo = new (std::nothrow) TableInfo(table, transactionId);
if (tableInfo != nullptr) {
MOT::GetRecoveryManager()->m_preCommitedTables[table->GetTableId()] = tableInfo;
((RecoveryManager*)GetRecoveryManager())->m_preCommitedTables[table->GetTableId()] = tableInfo;
} else {
status = RC_ERROR;
MOT_LOG_ERROR("RecoverLogOperationCreateTable: failed to create table info");
@ -129,8 +136,8 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable(
case TPC_COMMIT:
case TPC_ABORT:
Table::DeserializeNameAndIds((const char*)data, tableId, extId, tableName, longName);
it = GetRecoveryManager()->m_preCommitedTables.find(tableId);
if (it != GetRecoveryManager()->m_preCommitedTables.end()) {
it = ((RecoveryManager*)GetRecoveryManager())->m_preCommitedTables.find(tableId);
if (it != ((RecoveryManager*)GetRecoveryManager())->m_preCommitedTables.end()) {
tableInfo = (TableInfo*)it->second;
if (tableInfo != nullptr) {
if (state == TPC_COMMIT) {
@ -146,7 +153,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable(
delete tableInfo->m_table;
if (tableInfo != nullptr)
delete tableInfo;
GetRecoveryManager()->m_preCommitedTables.erase(it);
((RecoveryManager*)GetRecoveryManager())->m_preCommitedTables.erase(it);
} else {
MOT_LOG_ERROR(
"RecoverLogOperationCreateTable: could not find table [%lu] %s", tableId, tableName.c_str());
@ -163,7 +170,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable(
return sizeof(OperationCode) + sizeof(bufSize) + bufSize;
}
uint32_t RecoveryManager::RecoverLogOperationDropTable(uint8_t* data, RC& status, RecoveryOpState state)
uint32_t RecoveryOps::RecoverLogOperationDropTable(TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state)
{
OperationCode opCode = *(OperationCode*)data;
MOT_ASSERT(opCode == DROP_TABLE);
@ -171,7 +178,7 @@ uint32_t RecoveryManager::RecoverLogOperationDropTable(uint8_t* data, RC& status
switch (state) {
case COMMIT:
case TPC_COMMIT:
DropTable((char*)data, status);
DropTable(txn, (char*)data, status);
break;
case ABORT:
case TPC_APPLY:
@ -185,7 +192,8 @@ uint32_t RecoveryManager::RecoverLogOperationDropTable(uint8_t* data, RC& status
return sizeof(OperationCode) + sizeof(uint64_t);
}
uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t tid, RC& status, RecoveryOpState state)
uint32_t RecoveryOps::RecoverLogOperationCreateIndex(
TxnManager* txn, uint8_t* data, uint32_t tid, RC& status, RecoveryOpState state)
{
OperationCode opCode = *(OperationCode*)data;
MOT_ASSERT(opCode == CREATE_INDEX);
@ -195,7 +203,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t
switch (state) {
case COMMIT:
case TPC_COMMIT:
CreateIndex((char*)data, tid, status);
CreateIndex(txn, (char*)data, tid, status);
break;
case ABORT:
case TPC_APPLY:
@ -209,7 +217,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateIndex(uint8_t* data, uint32_t
return sizeof(OperationCode) + sizeof(bufSize) + sizeof(uint64_t) + bufSize; // sizeof(uint64_t) is for tableId
}
uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status, RecoveryOpState state)
uint32_t RecoveryOps::RecoverLogOperationDropIndex(TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state)
{
OperationCode opCode = *(OperationCode*)data;
MOT_ASSERT(opCode == DROP_INDEX);
@ -218,7 +226,7 @@ uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status
switch (state) {
case COMMIT:
case TPC_COMMIT:
DropIndex(extracted, status);
DropIndex(txn, extracted, status);
break;
case ABORT:
case TPC_APPLY:
@ -236,7 +244,8 @@ uint32_t RecoveryManager::RecoverLogOperationDropIndex(uint8_t* data, RC& status
return sizeof(OperationCode) + sizeof(uint64_t) + sizeof(size_t) + nameLen;
}
uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& status, RecoveryOpState state)
uint32_t RecoveryOps::RecoverLogOperationTruncateTable(
TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state)
{
OperationCode opCode = *(OperationCode*)data;
MOT_ASSERT(opCode == TRUNCATE_TABLE);
@ -244,7 +253,7 @@ uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& st
switch (state) {
case COMMIT:
case TPC_COMMIT:
TruncateTable((char*)data, status);
TruncateTable(txn, (char*)data, status);
break;
case ABORT:
case TPC_APPLY:
@ -258,8 +267,8 @@ uint32_t RecoveryManager::RecoverLogOperationTruncateTable(uint8_t* data, RC& st
return sizeof(OperationCode) + sizeof(uint64_t);
}
uint32_t RecoveryManager::RecoverLogOperationInsert(
uint8_t* data, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
uint32_t RecoveryOps::RecoverLogOperationInsert(
TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
{
uint64_t tableId, rowLength, exId;
uint16_t keyLength;
@ -278,14 +287,15 @@ uint32_t RecoveryManager::RecoverLogOperationInsert(
keyData = ExtractPtr(data, keyLength);
Extract(data, rowLength);
rowData = ExtractPtr(data, rowLength);
InsertRow(tableId, exId, (char*)keyData, keyLength, (char*)rowData, rowLength, csn, tid, sState, status, row_id);
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
MOT::GetRecoveryManager()->m_logStats->IncInsert(tableId);
InsertRow(
txn, tableId, exId, (char*)keyData, keyLength, (char*)rowData, rowLength, csn, tid, sState, status, row_id);
if (((RecoveryManager*)GetRecoveryManager())->m_logStats != nullptr)
((RecoveryManager*)GetRecoveryManager())->m_logStats->IncInsert(tableId);
return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(row_id) + sizeof(keyLength) + keyLength +
sizeof(rowLength) + rowLength;
}
uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
uint32_t RecoveryOps::RecoverLogOperationUpdate(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
{
uint64_t tableId;
uint64_t exId;
@ -302,7 +312,7 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
Extract(data, keyLength);
keyData = ExtractPtr(data, keyLength);
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
Table* table = txn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INVALID_ARG,
@ -322,7 +332,7 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
}
Index* index = table->GetPrimaryIndex();
Key* key = MOTCurrTxn->GetTxnKey(index);
Key* key = txn->GetTxnKey(index);
if (key == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Update Row", "failed to allocate key");
@ -330,10 +340,10 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
}
key->CpKey((const uint8_t*)keyData, keyLength);
Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key);
Row* row = txn->RowLookupByKey(table, RD_FOR_UPDATE, key);
if (row == nullptr) {
// Row not found. Error!!! Got an update for non existing row.
MOTCurrTxn->DestroyTxnKey(key);
txn->DestroyTxnKey(key);
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recovery Manager Update Row",
"row not found, key: %s, tableId: %lu",
@ -345,7 +355,7 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
if (row->GetCommitSequenceNumber() > csn) {
// Row CSN is newer. Error!!!
MOTCurrTxn->DestroyTxnKey(key);
txn->DestroyTxnKey(key);
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recovery Manager Update Row",
"row CSN is newer! %lu > %lu, key: %s, tableId: %lu",
@ -383,16 +393,16 @@ uint32_t RecoveryManager::RecoverLogOperationUpdate(uint8_t* data, uint64_t csn,
updated_columns_it.Next();
}
MOTCurrTxn->UpdateLastRowState(MOT::AccessType::WR);
MOTCurrTxn->DestroyTxnKey(key);
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
MOT::GetRecoveryManager()->m_logStats->IncUpdate(tableId);
txn->UpdateLastRowState(MOT::AccessType::WR);
txn->DestroyTxnKey(key);
if (((RecoveryManager*)GetRecoveryManager())->m_logStats != nullptr)
((RecoveryManager*)GetRecoveryManager())->m_logStats->IncUpdate(tableId);
return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(keyLength) + keyLength +
updated_columns.GetLength() + valid_columns.GetLength() + size;
}
uint32_t RecoveryManager::RecoverLogOperationOverwrite(
uint8_t* data, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
uint32_t RecoveryOps::RecoverLogOperationOverwrite(
TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
{
uint64_t tableId, rowLength, exId;
uint16_t keyLength;
@ -410,14 +420,14 @@ uint32_t RecoveryManager::RecoverLogOperationOverwrite(
Extract(data, rowLength);
rowData = ExtractPtr(data, rowLength);
UpdateRow(tableId, exId, (char*)keyData, keyLength, (char*)rowData, rowLength, csn, tid, sState, status);
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
MOT::GetRecoveryManager()->m_logStats->IncUpdate(tableId);
UpdateRow(txn, tableId, exId, (char*)keyData, keyLength, (char*)rowData, rowLength, csn, tid, sState, status);
if (((RecoveryManager*)GetRecoveryManager())->m_logStats != nullptr)
((RecoveryManager*)GetRecoveryManager())->m_logStats->IncUpdate(tableId);
return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(keyLength) + keyLength + sizeof(rowLength) +
rowLength;
}
uint32_t RecoveryManager::RecoverLogOperationDelete(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
uint32_t RecoveryOps::RecoverLogOperationDelete(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
{
uint64_t tableId, exId;
uint16_t keyLength;
@ -432,35 +442,37 @@ uint32_t RecoveryManager::RecoverLogOperationDelete(uint8_t* data, uint64_t csn,
Extract(data, keyLength);
keyData = ExtractPtr(data, keyLength);
DeleteRow(tableId, exId, (char*)keyData, keyLength, csn, tid, status);
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
MOT::GetRecoveryManager()->m_logStats->IncDelete(tableId);
DeleteRow(txn, tableId, exId, (char*)keyData, keyLength, csn, tid, status);
if (((RecoveryManager*)GetRecoveryManager())->m_logStats != nullptr)
((RecoveryManager*)GetRecoveryManager())->m_logStats->IncDelete(tableId);
return sizeof(OperationCode) + sizeof(tableId) + sizeof(exId) + sizeof(keyLength) + keyLength;
}
uint32_t RecoveryManager::RecoverLogOperationCommit(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
uint32_t RecoveryOps::RecoverLogOperationCommit(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
{
// OperationCode + CSN + transaction_type + commit_counter + transaction_id
if (MOT::GetRecoveryManager()->m_logStats != nullptr)
MOT::GetRecoveryManager()->m_logStats->m_tcls++;
status = MOT::GetRecoveryManager()->CommitTransaction(csn);
if (((RecoveryManager*)GetRecoveryManager())->m_logStats != nullptr)
((RecoveryManager*)GetRecoveryManager())->m_logStats->IncCommit();
status = CommitTransaction(txn, csn);
if (status != RC_OK) {
MOT_LOG_ERROR("Failed to commit row recovery from log: %s (error code: %d)", RcToString(status), (int)status);
}
return sizeof(EndSegmentBlock);
}
uint32_t RecoveryManager::RecoverLogOperationRollback(uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
uint32_t RecoveryOps::RecoverLogOperationRollback(
TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status)
{
// OperationCode + CSN + transaction_type + commit_counter + transaction_id
MOT::GetRecoveryManager()->RollbackTransaction();
RecoveryOps::RollbackTransaction(txn);
return sizeof(EndSegmentBlock);
}
void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData,
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId, bool insertLocked)
void RecoveryOps::InsertRow(TxnManager* txn, uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen,
char* rowData, uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId,
bool insertLocked)
{
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
Table* table = txn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recover Insert Row", "Table %" PRIu64 " does not exist", exId);
@ -486,7 +498,7 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData,
MOT::Index* ix = nullptr;
MOT::Key* cleanupKeys[table->GetNumIndexes()] = {nullptr};
ix = table->GetPrimaryIndex();
key = MOTCurrTxn->GetTxnKey(ix);
key = txn->GetTxnKey(ix);
if (key == nullptr) {
table->DestroyRow(row);
status = RC_ERROR;
@ -499,10 +511,10 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData,
sState.UpdateMaxKey(rowId);
}
key->CpKey((const uint8_t*)keyData, keyLen);
MOTCurrTxn->GetNextInsertItem()->SetItem(row, ix, key);
txn->GetNextInsertItem()->SetItem(row, ix, key);
for (uint16_t i = 1; i < table->GetNumIndexes(); i++) {
ix = table->GetSecondaryIndex(i);
key = MOTCurrTxn->GetTxnKey(ix);
key = txn->GetTxnKey(ix);
if (key == nullptr) {
status = RC_MEMORY_ALLOCATION_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM,
@ -511,18 +523,18 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData,
ix->GetName().c_str());
for (uint16_t j = 0; j < table->GetNumIndexes(); j++) {
if (cleanupKeys[j] != nullptr) {
MOTCurrTxn->DestroyTxnKey(cleanupKeys[j]);
txn->DestroyTxnKey(cleanupKeys[j]);
}
}
table->DestroyRow(row);
MOTCurrTxn->Rollback();
txn->Rollback();
return;
}
cleanupKeys[i] = key;
ix->BuildKey(table, row, key);
MOTCurrTxn->GetNextInsertItem()->SetItem(row, ix, key);
txn->GetNextInsertItem()->SetItem(row, ix, key);
}
status = MOTCurrTxn->InsertRow(row);
status = txn->InsertRow(row);
if (insertLocked == true) {
row->GetPrimarySentinel()->Lock(0);
@ -533,41 +545,14 @@ void RecoveryManager::InsertRow(uint64_t tableId, uint64_t exId, char* keyData,
}
}
void RecoveryManager::InsertRowFromCheckpoint(Table* table, char* keyData, uint16_t keyLen, char* rowData,
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId)
{
MaxKey key;
Row* row = table->CreateNewRow();
if (row == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to create row");
return;
}
row->CopyData((const uint8_t*)rowData, rowLen);
row->SetCommitSequenceNumber(csn);
row->SetRowId(rowId);
MOT::Index* ix = table->GetPrimaryIndex();
if (ix->IsFakePrimary()) {
row->SetSurrogateKey(*(uint64_t*)keyData);
sState.UpdateMaxKey(rowId);
}
key.CpKey((const uint8_t*)keyData, keyLen);
status = table->InsertRowNonTransactional(row, tid, &key);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Insert Row", "failed to insert row");
table->DestroyRow(row);
}
}
void RecoveryManager::DeleteRow(
uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, uint64_t csn, uint32_t tid, RC& status)
void RecoveryOps::DeleteRow(TxnManager* txn, uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen,
uint64_t csn, uint32_t tid, RC& status)
{
Row* row = nullptr;
Key* key = nullptr;
Index* index = nullptr;
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
Table* table = txn->GetTableByExternalId(exId);
if (table == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_ARG, "Recovery Manager Delete Row", "table %" PRIu64 " does not exist", exId);
@ -576,16 +561,16 @@ void RecoveryManager::DeleteRow(
}
index = table->GetPrimaryIndex();
key = MOTCurrTxn->GetTxnKey(index);
key = txn->GetTxnKey(index);
if (key == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Delete Row", "failed to create key");
status = RC_ERROR;
return;
}
key->CpKey((const uint8_t*)keyData, keyLen);
row = MOTCurrTxn->RowLookupByKey(table, WR, key);
row = txn->RowLookupByKey(table, WR, key);
if (row != nullptr) {
status = MOTCurrTxn->DeleteLastRow();
status = txn->DeleteLastRow();
if (status != RC_OK) {
if (MOT_IS_OOM()) {
MOT_REPORT_ERROR(
@ -603,14 +588,14 @@ void RecoveryManager::DeleteRow(
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "Recovery Manager Delete Row", "getData failed");
status = RC_ERROR;
}
MOTCurrTxn->DestroyTxnKey(key);
txn->DestroyTxnKey(key);
status = RC_OK;
}
void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen, char* rowData,
uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
void RecoveryOps::UpdateRow(TxnManager* txn, uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen,
char* rowData, uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status)
{
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
Table* table = txn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(
@ -627,7 +612,7 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
}
Index* index = table->GetPrimaryIndex();
Key* key = MOTCurrTxn->GetTxnKey(index);
Key* key = txn->GetTxnKey(index);
if (key == nullptr) {
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Recovery Manager Update Row", "failed to create key");
@ -635,16 +620,13 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
}
key->CpKey((const uint8_t*)keyData, keyLen);
Row* row = MOTCurrTxn->RowLookupByKey(table, RD_FOR_UPDATE, key);
Row* row = txn->RowLookupByKey(table, RD_FOR_UPDATE, key);
if (row == nullptr) {
// Row not found. Error!!! Got an update for non existing row.
MOTCurrTxn->DestroyTxnKey(key);
txn->DestroyTxnKey(key);
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"RecoveryManager::updateRow",
"row not found, key: %s, tableId: %lu",
key->GetKeyStr().c_str(),
tableId);
MOT_REPORT_ERROR(
MOT_ERROR_INTERNAL, "updateRow", "row not found, key: %s, tableId: %lu", key->GetKeyStr().c_str(), tableId);
return;
} else {
// CSNs can be equal if updated during the same transaction
@ -654,13 +636,13 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
if (row->IsAbsentRow()) {
row->UnsetAbsentRow();
}
MOTCurrTxn->UpdateLastRowState(MOT::AccessType::WR);
txn->UpdateLastRowState(MOT::AccessType::WR);
} else {
// Row CSN is newer. Error!!!
MOTCurrTxn->DestroyTxnKey(key);
txn->DestroyTxnKey(key);
status = RC_ERROR;
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"RecoveryManager::updateRow",
"updateRow",
"row CSN is newer! %lu > %lu, key: %s, tableId: %lu",
row->GetCommitSequenceNumber(),
csn,
@ -669,10 +651,10 @@ void RecoveryManager::UpdateRow(uint64_t tableId, uint64_t exId, char* keyData,
return;
}
}
MOTCurrTxn->DestroyTxnKey(key);
txn->DestroyTxnKey(key);
}
void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, CreateTableMethod method)
void RecoveryOps::CreateTable(TxnManager* txn, char* data, RC& status, Table*& table, CreateTableMethod method)
{
/* first verify that the table does not exists */
string name;
@ -696,14 +678,14 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, CreateT
table->Deserialize((const char*)data);
do {
if (!table->IsDeserialized()) {
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to de-serialize table");
MOT_LOG_ERROR("CreateTable: failed to de-serialize table");
status = RC_ERROR;
break;
}
switch (method) {
case TRANSACTIONAL:
status = MOTCurrTxn->CreateTable(table);
status = txn->CreateTable(table);
break;
case ADD_TO_ENGINE:
@ -717,21 +699,21 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, CreateT
}
if (status != RC_OK) {
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to add table %s (id: %u) to engine (method %u)",
MOT_LOG_ERROR("CreateTable: failed to add table %s (id: %u) to engine (method %u)",
table->GetLongTableName().c_str(),
table->GetTableId(),
method);
break;
}
MOT_LOG_DEBUG("RecoveryManager::CreateTable: table %s (id %u) created (method %u)",
MOT_LOG_DEBUG("CreateTable: table %s (id %u) created (method %u)",
table->GetLongTableName().c_str(),
table->GetTableId(),
method);
return;
} while (0);
MOT_LOG_ERROR("RecoveryManager::CreateTable: failed to recover table");
MOT_LOG_ERROR("CreateTable: failed to recover table");
delete table;
if (status == RC_OK) {
@ -740,7 +722,7 @@ void RecoveryManager::CreateTable(char* data, RC& status, Table*& table, CreateT
return;
}
void RecoveryManager::DropTable(char* data, RC& status)
void RecoveryOps::DropTable(TxnManager* txn, char* data, RC& status)
{
char* in = (char*)data;
uint64_t externalTableId;
@ -748,14 +730,14 @@ void RecoveryManager::DropTable(char* data, RC& status)
string tableName;
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
table = txn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
MOT_LOG_DEBUG("DropTable: could not find table %" PRIu64, externalTableId);
/* this might happen if we try to replay an outdated xlog entry - currently we do not error out */
return;
}
tableName.assign(table->GetLongTableName());
status = MOTCurrTxn->DropTable(table);
status = txn->DropTable(table);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recovery Manager Drop Table",
@ -763,10 +745,10 @@ void RecoveryManager::DropTable(char* data, RC& status)
tableName.c_str(),
externalTableId);
}
MOT_LOG_DEBUG("RecoveryManager::DropTable: table %s [%" PRIu64 "] dropped", tableName.c_str(), externalTableId);
MOT_LOG_DEBUG("DropTable: table %s [%" PRIu64 "] dropped", tableName.c_str(), externalTableId);
}
void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status)
void RecoveryOps::CreateIndex(TxnManager* txn, char* data, uint32_t tid, RC& status)
{
char* in = (char*)data;
uint64_t externalTableId;
@ -774,7 +756,7 @@ void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status)
Table::CommonIndexMeta idx;
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
table = txn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_ARG, "Recover Create Index", "Could not find table %" PRIu64, externalTableId);
@ -799,7 +781,7 @@ void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status)
status);
}
if (status == RC_OK) {
status = MOTCurrTxn->CreateIndex(table, index, primary);
status = txn->CreateIndex(table, index, primary);
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recover Create Index",
@ -811,17 +793,16 @@ void RecoveryManager::CreateIndex(char* data, uint32_t tid, RC& status)
}
}
void RecoveryManager::DropIndex(char* data, RC& status)
void RecoveryOps::DropIndex(TxnManager* txn, char* data, RC& status)
{
RC res;
char* in = (char*)data;
uint64_t externalTableId;
Table* table;
uint32_t indexNameLength;
string indexName;
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
table = txn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
/* this might happen if we try to replay an outdated xlog entry - currently we do not error out */
MOT_LOG_DEBUG("dropIndex: could not find table %" PRIu64, externalTableId);
@ -834,7 +815,7 @@ void RecoveryManager::DropIndex(char* data, RC& status)
res = RC_INDEX_NOT_FOUND;
} else {
table->WrLock();
res = MOTCurrTxn->DropIndex(index);
res = txn->DropIndex(index);
table->Unlock();
}
if (res != RC_OK) {
@ -846,15 +827,14 @@ void RecoveryManager::DropIndex(char* data, RC& status)
}
}
void RecoveryManager::TruncateTable(char* data, RC& status)
void RecoveryOps::TruncateTable(TxnManager* txn, char* data, RC& status)
{
RC res = RC_OK;
char* in = (char*)data;
uint64_t externalTableId;
Table* table;
in = SerializablePOD<uint64_t>::Deserialize(in, externalTableId);
table = MOTCurrTxn->GetTableByExternalId(externalTableId);
table = txn->GetTableByExternalId(externalTableId);
if (table == nullptr) {
/* this might happen if we try to replay an outdated xlog entry - currently we do not error out */
MOT_LOG_DEBUG("truncateTable: could not find table %" PRIu64, externalTableId);
@ -862,14 +842,20 @@ void RecoveryManager::TruncateTable(char* data, RC& status)
}
table->WrLock();
status = MOTCurrTxn->TruncateTable(table);
status = txn->TruncateTable(table);
table->Unlock();
}
// in-process (2pc) transactions recovery
uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data, uint64_t csn, uint64_t transactionId,
uint32_t tid, SurrogateState& sState, RC& status)
uint32_t RecoveryOps::TwoPhaseRecoverOp(TxnManager* txn, RecoveryOpState state, uint8_t* data, uint64_t csn,
uint64_t transactionId, uint32_t tid, SurrogateState& sState, RC& status)
{
if (txn == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "%s: invalid TxnManager object", __FUNCTION__);
status = RC_ERROR;
return 0;
}
OperationCode opCode = *static_cast<OperationCode*>((void*)data);
uint64_t tableId;
uint64_t rowLength = 0;
@ -890,19 +876,19 @@ uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data
// DDLs
if (opCode == CREATE_TABLE)
return RecoverLogOperationCreateTable(data, status, state, transactionId);
return RecoverLogOperationCreateTable(txn, data, status, state, transactionId);
if (opCode == DROP_TABLE)
return RecoverLogOperationDropTable(data, status, state);
return RecoverLogOperationDropTable(txn, data, status, state);
if (opCode == CREATE_INDEX)
return RecoverLogOperationCreateIndex(data, tid, status, state);
return RecoverLogOperationCreateIndex(txn, data, tid, status, state);
if (opCode == DROP_INDEX)
return RecoverLogOperationDropIndex(data, status, state);
return RecoverLogOperationDropIndex(txn, data, status, state);
if (opCode == TRUNCATE_TABLE)
return RecoverLogOperationTruncateTable(data, status, state);
return RecoverLogOperationTruncateTable(txn, data, status, state);
if (opCode == PREPARE_TX || opCode == COMMIT_PREPARED_TX)
return sizeof(EndSegmentBlock);
@ -927,10 +913,10 @@ uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data
if (opCode == CREATE_ROW)
ret += sizeof(uint64_t); // rowId
Table* table = MOTCurrTxn->GetTableByExternalId(exId);
Table* table = txn->GetTableByExternalId(exId);
if (table == nullptr) {
status = RC_ERROR;
MOT_LOG_ERROR("RecoveryManager::applyInProcessInsert: fetch table failed (id %lu)", tableId);
MOT_LOG_ERROR("applyInProcessInsert: fetch table failed (id %lu)", tableId);
return ret;
}
@ -948,7 +934,8 @@ uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data
switch (state) {
case RecoveryOpState::TPC_APPLY:
RecoverTwoPhaseApply(opCode,
RecoverTwoPhaseApply(txn,
opCode,
tableId,
exId,
csn,
@ -999,9 +986,9 @@ uint32_t RecoveryManager::TwoPhaseRecoverOp(RecoveryOpState state, uint8_t* data
* | DELETE | Lock | ERROR |
* +-----------+-----------------+---------------+
*/
void RecoveryManager::RecoverTwoPhaseApply(OperationCode opCode, uint64_t tableId, uint64_t exId, uint64_t csn,
uint8_t* keyData, uint64_t keyLength, uint8_t* rowData, uint64_t rowLength, uint64_t transactionId, uint32_t tid,
Row* row, uint64_t rowId, SurrogateState& sState, RC& status)
void RecoveryOps::RecoverTwoPhaseApply(TxnManager* txn, OperationCode opCode, uint64_t tableId, uint64_t exId,
uint64_t csn, uint8_t* keyData, uint64_t keyLength, uint8_t* rowData, uint64_t rowLength, uint64_t transactionId,
uint32_t tid, Row* row, uint64_t rowId, SurrogateState& sState, RC& status)
{
if (row == nullptr) {
if (opCode == UPDATE_ROW || opCode == OVERWRITE_ROW || opCode == REMOVE_ROW) {
@ -1009,7 +996,8 @@ void RecoveryManager::RecoverTwoPhaseApply(OperationCode opCode, uint64_t tableI
MOT_ERROR_INVALID_ARG, "Recovery Manager 2PC Apply", "got op %u but row does not exist!", opCode);
} else {
MOT_LOG_DEBUG("recoverTwoPhaseApply: insert row [%lu]", transactionId);
InsertRow(tableId,
InsertRow(txn,
tableId,
exId,
(char*)keyData,
keyLength,
@ -1043,7 +1031,7 @@ void RecoveryManager::RecoverTwoPhaseApply(OperationCode opCode, uint64_t tableI
* | DELETE | Delete row | ERROR |
* +-----------+---------------------+-----------+
*/
void RecoveryManager::RecoverTwoPhaseCommit(Table* table, OperationCode opCode, uint64_t csn, uint8_t* rowData,
void RecoveryOps::RecoverTwoPhaseCommit(Table* table, OperationCode opCode, uint64_t csn, uint8_t* rowData,
uint64_t rowLength, uint64_t transactionId, uint32_t tid, Row* row, RC& status)
{
if (row == nullptr) {
@ -1093,7 +1081,7 @@ void RecoveryManager::RecoverTwoPhaseCommit(Table* table, OperationCode opCode,
* | DELETE | unlock | ignore |
* +-----------+---------+-----------+
*/
void RecoveryManager::RecoverTwoPhaseAbort(
void RecoveryOps::RecoverTwoPhaseAbort(
Table* table, OperationCode opCode, uint64_t csn, uint64_t transactionId, uint32_t tid, Row* row, RC& status)
{
if (row == nullptr) {
@ -1123,59 +1111,58 @@ void RecoveryManager::RecoverTwoPhaseAbort(
}
}
bool RecoveryManager::BeginTransaction(uint64_t replayLsn /* = 0 */)
RC RecoveryOps::BeginTransaction(TxnManager* txn, uint64_t replayLsn)
{
SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT();
if (sessionContext == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot start recovery transaction: no session context");
return false;
}
MOT_LOG_DEBUG("Start recovery Transaction, replayLsn %lu", replayLsn);
TxnManager* txn = sessionContext->GetTxnManager();
txn->StartTransaction(INVALID_TRANSACTION_ID, READ_COMMITED);
txn->SetReplayLsn(replayLsn);
return true;
}
RC RecoveryManager::CommitTransaction(uint64_t csn)
{
SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT();
if (sessionContext == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot commit recovery transaction: no session context");
if (txn == nullptr) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "%s: invalid TxnManager object", __FUNCTION__);
return RC_ERROR;
}
txn->StartTransaction(INVALID_TRANSACTION_ID, READ_COMMITED);
txn->SetReplayLsn(replayLsn);
return RC_OK;
}
MOT_LOG_DEBUG("Commit recovery transaction, csn %lu", csn);
TxnManager* txn = sessionContext->GetTxnManager();
RC RecoveryOps::CommitTransaction(TxnManager* txn, uint64_t csn)
{
txn->SetCommitSequenceNumber(csn);
RC result = txn->Commit();
if (result != RC_OK) {
RC status = txn->Commit();
if (status != RC_OK) {
MOT_REPORT_ERROR(MOT_ERROR_INTERNAL,
"Recover DB",
"Failed to commit recovery transaction: %s (error code: %d)",
RcToString(result),
(int)result);
"Failed to commit recovery transaction: %s (error code: %u)",
RcToString(status),
status);
txn->Rollback();
} else {
txn->EndTransaction();
}
return result;
return status;
}
void RecoveryManager::RollbackTransaction()
void RecoveryOps::RollbackTransaction(TxnManager* txn)
{
SessionContext* sessionContext = MOT_GET_CURRENT_SESSION_CONTEXT();
if (sessionContext == nullptr) {
MOT_REPORT_ERROR(
MOT_ERROR_INVALID_STATE, "Recover DB", "Cannot rollback recovery transaction: no session context");
return;
}
TxnManager* txn = sessionContext->GetTxnManager();
txn->Rollback();
}
bool RecoveryOps::IsSupportedOp(OperationCode op)
{
switch (op) {
case CREATE_ROW:
case UPDATE_ROW:
case UPDATE_ROW_VARIABLE:
case OVERWRITE_ROW:
case REMOVE_ROW:
case PREPARE_TX:
case COMMIT_PREPARED_TX:
case CREATE_TABLE:
case DROP_TABLE:
case CREATE_INDEX:
case DROP_INDEX:
case TRUNCATE_TABLE:
return true;
default:
return false;
}
}
} // namespace MOT

View File

@ -0,0 +1,410 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* recovery_ops.h
* Recovery logic for various operation types.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/recovery_ops.h
*
* -------------------------------------------------------------------------
*/
#ifndef RECOVERY_OPS_H
#define RECOVERY_OPS_H
#include "redo_log_global.h"
#include "redo_log_transaction_iterator.h"
#include "txn.h"
#include "global.h"
#include "surrogate_state.h"
namespace MOT {
class RecoveryOps {
public:
/**
* @struct TableInfo
* @brief Describes a table by its id and the transaction
* id that it was created with.
*/
struct TableInfo {
public:
TableInfo(Table* t, uint64_t i) : m_table(t), m_transactionId(i)
{}
~TableInfo()
{}
Table* m_table;
uint64_t m_transactionId;
};
/**
* @brief a helper to extract a type from a buffer
* @param data the data buffer to extract from.
* @param out the output value.
*/
template <typename T>
static void Extract(uint8_t*& data, T& out)
{
T* temp = static_cast<T*>((void*)data);
data += sizeof(T);
out = *temp;
}
/**
* @brief a helper to extract a pointer type from a buffer
* @param data the data buffer to extract from.
* @param size the size of the buffer to extract.
* @return the pointer that was extracted.
*/
static uint8_t* ExtractPtr(uint8_t*& data, uint32_t size)
{
uint8_t* outptr = data;
data += size;
return outptr;
}
enum RecoveryOpState { COMMIT = 1, ABORT = 2, TPC_APPLY = 3, TPC_COMMIT = 4, TPC_ABORT = 5 };
enum CreateTableMethod { TRANSACTIONAL = 1, ADD_TO_ENGINE = 2, DONT_ADD_TO_ENGINE = 3 };
/**
* @brief performs a recovery operation on a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param csn the operations's csn.
* @param transactionId the transaction id
* @param tid the thread id of the recovering thread
* @param sState the returned surrogate state of this operation
* @param[out] status the returned status of the operation
* @param[out] Was this a commit operation (required for managing transactional state).
* @return Int value denoting the number of bytes recovered
*/
static uint32_t RecoverLogOperation(TxnManager* txn, uint8_t* data, uint64_t csn, uint64_t transactionId,
uint32_t tid, SurrogateState& sState, RC& status, bool& wasCommit);
/**
* @brief attempts to recover a 2pc operation.
* @param transaction manager object.
* @param state the state of the operation.
* @param data buffer to operate on.
* @param csn the operation's csn.
* @param transactionId the transaction's id.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrugate state.
* @param status the returned status of the operation.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t TwoPhaseRecoverOp(TxnManager* txn, RecoveryOpState state, uint8_t* data, uint64_t csn,
uint64_t transactionId, uint32_t tid, SurrogateState& sState, RC& status);
/**
* @brief Starts a new transaction for recovery operations.
* @param transaction manager object.
* @param replayLsn the redo LSN for this transaction during replay.
*/
static RC BeginTransaction(TxnManager* txn, uint64_t replayLsn = 0);
private:
/**
* @brief performs an insert operation of a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrogate state of this operation.
* @param status the returned status of the operation.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationInsert(
TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status);
/**
* @brief performs a delta update operation of a data buffer.
* @param data the buffer to recover.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param status the returned status of the operation.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationUpdate(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs an update operation of a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrogate state of this operation.
* @param status the returned status of the operation..
* @return Int value denoting the number of bytes recovered..
*/
static uint32_t RecoverLogOperationOverwrite(
TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status);
/**
* @brief performs a delete operation of a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param status the returned status of the operation.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationDelete(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs a commit operation of a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationCommit(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs a rollback operation of a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param csn The CSN of the operation.
* @param tid the thread id of the recovering thread.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationRollback(TxnManager* txn, uint8_t* data, uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs a create table operation from a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param status the returned status of the operation
* @param state the operation's state.
* @param transactionId the transaction id of the operation.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationCreateTable(
TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state, uint64_t transactionId);
/**
* @brief performs a drop table operation from a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param status the returned status of the operation
* @param state the operation's state.
* @return Int value denoting the number of bytes recovered
*/
static uint32_t RecoverLogOperationDropTable(TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state);
/**
* @brief performs a create index operation from a data buffer.
* @param data the buffer to recover.
* @param tid the thread id of the recovering thread.
* @param status the returned status of the operation.
* @param state the operation's state.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationCreateIndex(
TxnManager* txn, uint8_t* data, uint32_t tid, RC& status, RecoveryOpState state);
/**
* @brief performs a drop index operation from a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param status the returned status of the operation.
* @param state the operation's state.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationDropIndex(TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state);
/**
* @brief performs a truncate table operation from a data buffer.
* @param transaction manager object.
* @param data the buffer to recover.
* @param status the returned status of the operation.
* @param state the operation's state.
* @return Int value denoting the number of bytes recovered.
*/
static uint32_t RecoverLogOperationTruncateTable(TxnManager* txn, uint8_t* data, RC& status, RecoveryOpState state);
/**
* @brief performs the actual row deletion from the storage.
* @param transaction manager object.
* @param tableId the table's id.
* @param exId the the table's external id.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param status the returned status of the operation
*/
static void DeleteRow(TxnManager* txn, uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen,
uint64_t csn, uint32_t tid, RC& status);
/**
* @brief performs the actual row insertion to the storage.
* @param transaction manager object.
* @param tableId the table's id.
* @param exId the table's external id.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param rowData row's data buffer.
* @param rowLen row's data buffer len.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrugate state.
* @param status the returned status of the operation
* @param rowId the row's internal id
* @param insertLocked should this row be inserted locked
*/
static void InsertRow(TxnManager* txn, uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen,
char* rowData, uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status, uint64_t rowId,
bool insertLocked = false);
/**
* @brief performs the actual row update in the storage.
* @param transaction manager object.
* @param tableId the table's id.
* @param exId the the table's external id.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param rowData row's data buffer.
* @param rowLen row's data buffer len.
* @param csn the operations's csn.
* @param tid the thread id of the recovering thread.
* @param sState the returned surrugate state.
* @param status the returned status of the operation.
*/
static void UpdateRow(TxnManager* txn, uint64_t tableId, uint64_t exId, char* keyData, uint16_t keyLen,
char* rowData, uint64_t rowLen, uint64_t csn, uint32_t tid, SurrogateState& sState, RC& status);
/**
* @brief performs the actual table creation.
* @param transaction manager object.
* @param data the table's data.
* @param status the returned status of the operation.
* @param table the returned table object.
* @param method controls whether the table is added to the engine or not.
*/
static void CreateTable(TxnManager* txn, char* data, RC& status, Table*& table, CreateTableMethod method);
/**
* @brief performs the actual table deletion.
* @param transaction manager object.
* @param data the table's data.
* @param status the returned status of the operation.
*/
static void DropTable(TxnManager* txn, char* data, RC& status);
/**
* @brief performs the actual index creation.
* @param transaction manager object.
* @param data the table's data
* @param the thread identifier
* @param status the returned status of the operation
*/
static void CreateIndex(TxnManager* txn, char* data, uint32_t tid, RC& status);
/**
* @brief performs the actual index deletion.
* @param transaction manager object.
* @param data the table's data.
* @param status the returned status of the operation.
*/
static void DropIndex(TxnManager* txn, char* data, RC& status);
/**
* @brief performs the actual table truncation.
* @param transaction manager object.
* @param data the table's data.
* @param status the returned status of the operation.
*/
static void TruncateTable(TxnManager* txn, char* data, RC& status);
/**
* @brief Commits the current recovery transaction.
* @param transaction manager object.
* @param transaction's commit sequence number.
*/
static RC CommitTransaction(TxnManager* txn, uint64_t csn);
/**
* @brief Rolls back the current recovery transaction.
* @param transaction manager object.
*/
static void RollbackTransaction(TxnManager* txn);
/**
* @brief attempts to apply a 2pc operation.
* @param transaction manager object.
* @param transaction manager object.
* @param opCode the operation code.
* @param tableId the table id that the recovered op belongs to.
* @param exId the external table id that the recovered op belongs to.
* @param csn the operation's csn.
* @param keyData key's data buffer.
* @param keyLen key's data buffer len.
* @param rowData row's data buffer.
* @param rowLen row's data buffer len.
* @param transactionId the transaction's id.
* @param tid the thread id of the recovering thread.
* @param row a row pointer (if exists).
* @param rowId the row id.
* @param sState the returned surrugate state.
* @param status the returned status of the operation.
*/
static void RecoverTwoPhaseApply(TxnManager* txn, OperationCode opCode, uint64_t tableId, uint64_t exId,
uint64_t csn, uint8_t* keyData, uint64_t keyLength, uint8_t* rowData, uint64_t rowLength,
uint64_t transactionId, uint32_t tid, Row* row, uint64_t rowId, SurrogateState& sState, RC& status);
/**
* @brief attempts to commit a 2pc operation
* @param table a pointer to the table object that the op belongs to.
* @param opCode the operation code.
* @param csn the operation's csn.
* @param rowData row's data buffer.
* @param rowLen row's data buffer len.
* @param transactionId the transaction's id
* @param tid the thread id of the recovering thread.
* @param row a row pointer (if exists)
* @param status the returned status of the operation
*/
static void RecoverTwoPhaseCommit(Table* table, OperationCode opCode, uint64_t csn, uint8_t* rowData,
uint64_t rowLength, uint64_t transactionId, uint32_t tid, Row* row, RC& status);
/**
* @brief attempts to abort a 2pc operation
* @param table a pointer to the table object that the op belongs to.
* @param opCode the operation code.
* @param csn the operation's csn.
* @param transactionId the transaction's id.
* @param tid the thread id of the recovering thread.
* @param row a row pointer (if exists)
* @param status the returned status of the operation
*/
static void RecoverTwoPhaseAbort(
Table* table, OperationCode opCode, uint64_t csn, uint64_t transactionId, uint32_t tid, Row* row, RC& status);
/**
* @brief checks if an operation is supported by the recovery.
* @param op the operation code to check.
* @return Boolean value denoting if the op is supported.
*/
static bool IsSupportedOp(OperationCode op);
}; // class RecoveryOps
} // namespace MOT
#endif /* RECOVERY_OPS_H */

View File

@ -13,16 +13,16 @@
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* transaction_buffer_iterator.cpp
* redo_log_transaction_iterator.cpp
* Iterator for iterating over redo log transactions.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.cpp
* src/gausskernel/storage/mot/core/src/system/recovery/redo_log_transaction_iterator.cpp
*
* -------------------------------------------------------------------------
*/
#include "transaction_buffer_iterator.h"
#include "redo_log_transaction_iterator.h"
namespace MOT {
bool RedoLogTransactionIterator::End()
@ -72,22 +72,4 @@ LogSegment* RedoLogTransactionIterator::AllocRedoSegment(uint64_t replayLsn)
segment->m_replayLsn = replayLsn;
return segment;
}
size_t LogSegment::SerializeSize()
{
return SerializableCharBuf::SerializeSize(m_len) + EndSegmentBlockSerializer::SerializeSize(&m_controlBlock);
}
void LogSegment::Serialize(char* dataOut)
{
dataOut = SerializableCharBuf::Serialize(dataOut, m_data, m_len);
EndSegmentBlockSerializer::Serialize(&m_controlBlock, dataOut);
}
void LogSegment::Deserialize(const char* in)
{
char* dataIn = (char*)in;
dataIn = SerializableCharBuf::Deserialize(dataIn, m_data, m_len);
EndSegmentBlockSerializer::Deserialize(&m_controlBlock, dataIn);
}
} // namespace MOT

View File

@ -13,55 +13,22 @@
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* transaction_buffer_iterator.h
* redo_log_transaction_iterator.h
* Iterator for iterating over redo log transactions.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/transaction_buffer_iterator.h
* src/gausskernel/storage/mot/core/src/system/recovery/redo_log_transaction_iterator.h
*
* -------------------------------------------------------------------------
*/
#ifndef TRANSACTION_BUFFER_ITERATOR_H
#define TRANSACTION_BUFFER_ITERATOR_H
#ifndef REDO_LOG_TRANSACTION_ITERATOR_H
#define REDO_LOG_TRANSACTION_ITERATOR_H
#include "redo_log_global.h"
#include "redo_log_writer.h"
#include "serializable.h"
#include "log_segment.h"
namespace MOT {
/**
* @struct LogSegment
* @brief encapsulates a chunk of logging data
*/
struct LogSegment : public Serializable {
char* m_data;
size_t m_len;
EndSegmentBlock m_controlBlock;
uint64_t m_replayLsn;
/**
* @brief fetches the size of the log segment
* @return Size_t value denoting the size of the segment.
*/
virtual size_t SerializeSize();
/**
* @brief serialize the log segment into a given buffer
* @param dataOut the output buffer
*/
virtual void Serialize(char* dataOut);
/**
* @brief creates a log segment from a data buffer.
* @param dataIn the input buffer.
*/
virtual void Deserialize(const char* dataIn);
};
/**
* @class RedoLogBufferIterator
* @brief Iterator for iterating over redo log transactions.
@ -123,4 +90,4 @@ private:
};
} // namespace MOT
#endif /* TRANSACTION_BUFFER_ITERATOR_H */
#endif /* REDO_LOG_TRANSACTION_ITERATOR_H */

View File

@ -0,0 +1,85 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* redo_log_transaction_segments.cpp
* Implements an array of log segments that are part of a specific transaction id.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/redo_log_transaction_segments.cpp
*
* -------------------------------------------------------------------------
*/
#include "redo_log_transaction_segments.h"
namespace MOT {
RedoLogTransactionSegments::~RedoLogTransactionSegments()
{
if (m_segments != nullptr) {
for (uint32_t i = 0; i < m_count; i++) {
delete m_segments[i];
}
free(m_segments);
}
}
bool RedoLogTransactionSegments::Append(LogSegment* segment)
{
if (m_count == m_maxSegments) {
// max segments allocated, need to extend the number of allocated LogSegments pointers
uint32_t newMaxSegments = m_maxSegments + DEFAULT_SEGMENT_NUM;
LogSegment** newSegments = (LogSegment**)malloc(newMaxSegments * sizeof(LogSegment*));
if (newSegments != nullptr) {
if (m_segments != nullptr) {
errno_t erc = memcpy_s(
newSegments, newMaxSegments * sizeof(LogSegment*), m_segments, m_maxSegments * sizeof(LogSegment*));
securec_check(erc, "\0", "\0");
free(m_segments);
}
m_segments = newSegments;
m_maxSegments = newMaxSegments;
} else {
return false;
}
}
m_size += segment->m_len;
m_segments[m_count] = segment;
m_count += 1;
return true;
}
char* RedoLogTransactionSegments::GetData(size_t position, size_t length) const
{
if (position + length > m_size) {
return nullptr;
}
uint32_t currentEntry = 0;
while (currentEntry < m_count) {
if (position > m_segments[currentEntry]->m_len) {
position -= m_segments[currentEntry]->m_len;
currentEntry++;
} else {
if (position + length > m_segments[currentEntry]->m_len) {
// Cross segments is not supported for now
return nullptr;
}
return (m_segments[currentEntry]->m_data + position);
}
}
return nullptr;
}
} // namespace MOT

View File

@ -0,0 +1,81 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* redo_log_transaction_segments.h
* Implements an array of log segments that are part of a specific transaction id.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/redo_log_transaction_segments.h
*
* -------------------------------------------------------------------------
*/
#ifndef REDO_LOG_TRANSACTION_SEGMENTS_H
#define REDO_LOG_TRANSACTION_SEGMENTS_H
#include "log_segment.h"
namespace MOT {
class RedoLogTransactionSegments {
public:
explicit RedoLogTransactionSegments(TransactionId id)
: m_transactionId(id), m_segments(nullptr), m_count(0), m_size(0), m_maxSegments(0)
{}
~RedoLogTransactionSegments();
bool Append(LogSegment* segment);
uint64_t GetTransactionId() const
{
return m_transactionId;
}
size_t GetSize() const
{
return m_size;
}
char* GetData(size_t position, size_t length) const;
uint32_t GetCount() const
{
return m_count;
}
LogSegment* GetSegment(uint32_t index) const
{
if (index > m_count || m_count == 0) {
return nullptr;
}
return m_segments[index];
}
private:
static constexpr uint32_t DEFAULT_SEGMENT_NUM = 1024;
uint64_t m_transactionId;
LogSegment** m_segments;
uint32_t m_count;
size_t m_size;
uint32_t m_maxSegments;
};
} // namespace MOT
#endif /* REDO_LOG_TRANSACTION_SEGMENTS_H */

View File

@ -0,0 +1,95 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* surrogate_state.cpp
* Surrogate key info helper class.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/surrogate_state.cpp
*
* -------------------------------------------------------------------------
*/
#include "mot_engine.h"
#include "surrogate_state.h"
namespace MOT {
DECLARE_LOGGER(SurrogateState, Recovery);
SurrogateState::SurrogateState()
{
uint32_t maxConnections = GetGlobalConfiguration().m_maxConnections;
m_empty = true;
m_maxConnections = 0;
m_insertsArray = new (std::nothrow) uint64_t[maxConnections];
if (m_insertsArray != nullptr) {
m_maxConnections = maxConnections;
errno_t erc =
memset_s(m_insertsArray, m_maxConnections * sizeof(uint64_t), 0, m_maxConnections * sizeof(uint64_t));
securec_check(erc, "\0", "\0");
}
}
SurrogateState::~SurrogateState()
{
if (m_insertsArray != nullptr) {
delete[] m_insertsArray;
}
}
void SurrogateState::ExtractInfoFromKey(uint64_t key, uint64_t& pid, uint64_t& insertions)
{
pid = key >> SurrogateKeyGenerator::KEY_BITS;
insertions = key & 0x0000FFFFFFFFFFFFULL;
insertions++;
}
void SurrogateState::UpdateMaxInsertions(uint64_t insertions, uint32_t pid)
{
if (pid < m_maxConnections && m_insertsArray[pid] < insertions) {
m_insertsArray[pid] = insertions;
if (m_empty) {
m_empty = false;
}
}
}
bool SurrogateState::UpdateMaxKey(uint64_t key)
{
uint64_t pid = 0;
uint64_t insertions = 0;
ExtractInfoFromKey(key, pid, insertions);
if (pid >= m_maxConnections) {
MOT_LOG_WARN(
"SurrogateState::UpdateMaxKey: ConnectionId %lu exceeds max_connections %u", pid, m_maxConnections);
return false;
}
UpdateMaxInsertions(insertions, pid);
return true;
}
void SurrogateState::Merge(std::list<uint64_t*>& arrays, SurrogateState& global)
{
std::list<uint64_t*>::iterator i;
for (i = arrays.begin(); i != arrays.end(); ++i) {
for (uint32_t j = 0; j < global.GetMaxConnections(); ++j) {
global.UpdateMaxInsertions((*i)[j], j);
}
delete[](*i);
}
}
} // namespace MOT

View File

@ -0,0 +1,104 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* surrogate_state.h
* Surrogate key info helper class.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/system/recovery/surrogate_state.h
*
* -------------------------------------------------------------------------
*/
#ifndef SURROGATE_STATE_H
#define SURROGATE_STATE_H
#include <cstdint>
#include <list>
namespace MOT {
/**
* @class SurrogateState
* @brief Implements a surrogate state array in order to
* properly recover tables that were created with surrogate
* primary keys.
*/
class SurrogateState {
public:
SurrogateState();
/**
* @brief Extracts the surrogate counter from the key and updates
* the array according to the connection id
* @param key The key to extract.
* @return Boolean value denoting success or failure.
*/
bool UpdateMaxKey(uint64_t key);
/**
* @brief A helper that updates the max insertions of a thread id.
* @param insertions The insertions count.
* @param tid The thread id.
*/
void UpdateMaxInsertions(uint64_t insertions, uint32_t tid);
/**
* @brief A helper that extracts the insertions count and
* thread id from a key
* @param key The key to extract the info from.
* @param pid The returned thread id.
* @param insertions The returned number of insertions.
*/
inline void ExtractInfoFromKey(uint64_t key, uint64_t& pid, uint64_t& insertions);
/**
* @brief merges some max insertions arrays into a one single state
* @param arrays an arrays list.
* @param global the returned merged SurrogateState.
*/
static void Merge(std::list<uint64_t*>& arrays, SurrogateState& global);
bool IsEmpty() const
{
return m_empty;
}
const uint64_t* GetArray() const
{
return m_insertsArray;
}
uint32_t GetMaxConnections() const
{
return m_maxConnections;
}
bool IsValid() const
{
return (m_insertsArray != nullptr);
}
~SurrogateState();
private:
uint64_t* m_insertsArray;
bool m_empty;
uint32_t m_maxConnections;
};
} // namespace MOT
#endif /* SURROGATE_STATE_H */

View File

@ -1270,7 +1270,7 @@ RC TxnManager::SavePreparedData()
if (m_transactionId != INVALID_TRANSACTION_ID) {
MOT_LOG_DEBUG("mapping ext txid %lu to %lu", m_transactionId, m_internalTransactionId);
MOT::GetRecoveryManager()->UpdateTxIdMap(m_internalTransactionId, m_transactionId);
MOTEngine::GetInstance()->GetInProcessTransactions().UpdateTxIdMap(m_internalTransactionId, m_transactionId);
}
return RC_OK;

View File

@ -32,7 +32,7 @@
#include "mot_configuration.h"
namespace MOT {
DECLARE_LOGGER(AsyncRedoLogHandler, redolog)
DECLARE_LOGGER(AsyncRedoLogHandler, RedoLog)
AsyncRedoLogHandler::AsyncRedoLogHandler()
: m_bufferPool(),

View File

@ -29,7 +29,7 @@
#include "group_synchronous_redo_log_handler.h"
namespace MOT {
DECLARE_LOGGER(CommitGroup, redolog);
DECLARE_LOGGER(CommitGroup, RedoLog);
CommitGroup::CommitGroup(RedoLogBuffer* buffer, GroupSyncRedoLogHandler* handler, const uint8_t id)
: m_handlerId(id),

View File

@ -29,7 +29,7 @@
#include "session_context.h"
namespace MOT {
DECLARE_LOGGER(SegmentedGroupSyncRedoLogHandler, redolog);
DECLARE_LOGGER(SegmentedGroupSyncRedoLogHandler, RedoLog);
SegmentedGroupSyncRedoLogHandler::SegmentedGroupSyncRedoLogHandler()
: m_numaNodes(GetGlobalConfiguration().m_numaNodes), m_redoLogHandlerArray(NULL)

View File

@ -28,7 +28,7 @@
#include <string.h>
namespace MOT {
DECLARE_LOGGER(RedoLogHandlerType, redolog)
DECLARE_LOGGER(RedoLogHandlerType, RedoLog)
static const char* NONE_STR = "none";
static const char* SYNC_REDO_LOG_HANDLER_STR = "synchronous";

View File

@ -205,30 +205,6 @@ static inline List* BitmapSerialize(List* result, uint8_t* bitmap, int16_t len)
return result;
}
int MOTXlateRecoveryErr(int err)
{
int code = 0;
switch (err) {
case MOT::RecoveryManager::ErrCodes::NO_ERROR:
code = ERRCODE_SUCCESSFUL_COMPLETION;
break;
case MOT::RecoveryManager::ErrCodes::CP_SETUP:
code = ERRCODE_CONFIG_FILE_ERROR;
break;
case MOT::RecoveryManager::ErrCodes::CP_META:
code = ERRCODE_INVALID_TABLE_DEFINITION;
break;
case MOT::RecoveryManager::ErrCodes::CP_RECOVERY:
case MOT::RecoveryManager::ErrCodes::XLOG_SETUP:
case MOT::RecoveryManager::ErrCodes::XLOG_RECOVERY:
code = ERRCODE_INTERNAL_ERROR;
break;
default:
break;
}
return code;
}
void MOTRecover()
{
if (!MOTAdaptor::m_initialized) {
@ -239,9 +215,7 @@ void MOTRecover()
EnsureSafeThreadAccess();
if (!MOT::MOTEngine::GetInstance()->StartRecovery()) {
// we treat errors fatally.
ereport(FATAL,
(MOTXlateRecoveryErr(MOT::GetRecoveryManager()->GetErrorCode()),
errmsg("%s", MOT::GetRecoveryManager()->GetErrorString())));
ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("MOT checkpoint recovery failed.")));
}
if (!g_instance.attr.attr_common.enable_thread_pool) {
@ -260,10 +234,8 @@ void MOTRecoveryDone()
EnsureSafeThreadAccess();
if (!MOT::MOTEngine::GetInstance()->EndRecovery()) {
// we treat errors fatally.
ereport(FATAL,
(MOTXlateRecoveryErr(MOT::GetRecoveryManager()->GetErrorCode()),
errmsg("%s", MOT::GetRecoveryManager()->GetErrorString())));
ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("MOT recovery failed.")));
}
}
@ -2160,13 +2132,12 @@ bool MOTCheckpointExists(
return false;
}
MOT::RecoveryManager* recoveryManager = engine->GetRecoveryManager();
MOT::CheckpointManager* checkpointManager = engine->GetCheckpointManager();
if (recoveryManager == nullptr || checkpointManager == nullptr) {
if (checkpointManager == nullptr) {
return false;
}
if (recoveryManager->GetCheckpointId() == MOT::CheckpointControlFile::invalidId) {
if (checkpointManager->GetId() == MOT::CheckpointControlFile::invalidId) {
return false;
}

View File

@ -34,8 +34,6 @@
#include "recovery_manager.h"
#include "miscadmin.h"
extern int MOTXlateRecoveryErr(int err);
bool IsValidEntry(uint8 code)
{
return code == MOT_REDO_DATA;
@ -74,9 +72,7 @@ void MOTRedo(XLogReaderState* record)
}
if (MOT::GetRecoveryManager()->IsErrorSet() || !MOT::GetRecoveryManager()->ApplyRedoLog(lsn, data, len)) {
// we treat errors fatally.
ereport(FATAL,
(MOTXlateRecoveryErr(MOT::GetRecoveryManager()->GetErrorCode()),
errmsg("%s", MOT::GetRecoveryManager()->GetErrorString())));
ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("MOT recovery failed.")));
}
}

View File

@ -64,8 +64,6 @@
// for checking if LLVM_ENABLE_DUMP is defined and for using LLVM_VERSION_STRING
#include "llvm/Config/llvm-config.h"
extern bool GlobalCodeGenEnvironmentSuccess;
namespace JitExec {
/** @struct Holds instructions that evaluate in runtime to begin and end iterators of a cursor. */
struct JitLlvmRuntimeCursor {

View File

@ -66,4 +66,4 @@ extern int JitExecTvmQuery(JitContext* jitContext, ParamListInfo params, TupleTa
uint64_t* tuplesProcessed, int* scanEnded, int newScan);
} // namespace JitExec
#endif // JIT_TVM_EXEC_H
#endif /* JIT_TVM_QUERY_CODEGEN_H */

View File

@ -44,14 +44,9 @@
namespace JitExec {
// uncomment this to debug JIT execution
//#define MOT_JIT_DEBUG
// uncomment this to use advanced WHERE clause operators
//#define MOT_JIT_ADVANCED_WHERE_OP
// uncomment to enable features required for JIT testing
//#define MOT_JIT_TEST
// To debug JIT execution, #define MOT_JIT_DEBUG
// To use advanced WHERE clause operators, #define MOT_JIT_ADVANCED_WHERE_OP
// To enable features required for JIT testing, #define MOT_JIT_TEST
/** @enum JitCommandType Command types supported by jitted queries. */
enum JitCommandType : uint8_t {