Fix MOT ckpt and DDL concurrency issues

This commit is contained in:
Vinoth
2020-07-22 10:23:14 +08:00
parent 03d74e42be
commit 696a3d6f37
20 changed files with 308 additions and 310 deletions

View File

@ -14,7 +14,7 @@
* -------------------------------------------------------------------------
*
* rw_lock.cpp
* Implements a reader/writer lock used in the checkpoint manager.
* Implements a reader/writer lock using spinlock.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.cpp

View File

@ -14,7 +14,7 @@
* -------------------------------------------------------------------------
*
* rw_lock.h
* Implements a reader/writer lock used in the checkpoint manager.
* Implements a reader/writer lock using spinlock.
*
* IDENTIFICATION
* src/gausskernel/storage/mot/core/src/infra/synchronization/rw_lock.h
@ -30,7 +30,7 @@ typedef unsigned Spinlock;
/**
* @class RwLock
* @brief this class implements a reader/writer lock used in the checkpint manager
* @brief this class implements a reader/writer lock using spinlock
*/
class RwLock {
public:

View File

@ -67,10 +67,21 @@ Table::~Table()
if (m_rowPool) {
ObjAllocInterface::FreeObjPool(&m_rowPool);
}
int destroyRc = pthread_rwlock_destroy(&m_rwLock);
if (destroyRc != 0) {
MOT_LOG_ERROR("~Table: rwlock destroy failed (%d)", destroyRc);
}
}
bool Table::Init(const char* tableName, const char* longName, unsigned int fieldCnt, uint64_t tableExId)
{
int initRc = pthread_rwlock_init(&m_rwLock, NULL);
if (initRc != 0) {
MOT_LOG_ERROR("failed to initialize Table %s, could not init rwlock (%d)", tableName, initRc);
return false;
}
m_tableName.assign(tableName);
m_longTableName.assign(longName);
@ -676,7 +687,7 @@ void Table::PrintSchema()
void Table::Truncate(TxnManager* txn)
{
uint32_t pid = txn->GetThdId();
m_mutex.lock();
(void)pthread_rwlock_wrlock(&m_rwLock);
// first destroy secondary index data
for (int i = 1; i < m_numIndexes; i++) {
@ -696,7 +707,7 @@ void Table::Truncate(TxnManager* txn)
m_longTableName.c_str());
}
m_mutex.unlock();
(void)pthread_rwlock_unlock(&m_rwLock);
}
void Table::Compact(TxnManager* txn)
@ -1039,7 +1050,7 @@ RC Table::DropImpl()
if (m_numIndexes == 0)
return res;
m_mutex.lock();
(void)pthread_rwlock_wrlock(&m_rwLock);
do {
m_secondaryIndexes.clear();
MOT_LOG_DEBUG("DropImpl numIndexes = %d \n", m_numIndexes);
@ -1056,7 +1067,7 @@ RC Table::DropImpl()
}
m_numIndexes = 0;
} while (0);
m_mutex.unlock();
(void)pthread_rwlock_unlock(&m_rwLock);
return res;
}

View File

@ -30,6 +30,7 @@
#include <string>
#include <iostream>
#include <memory>
#include <pthread.h>
#include "global.h"
#include "sentinel.h"
#include "surrogate_key_generator.h"
@ -704,19 +705,39 @@ public:
}
/**
* @brief locks that table for modifications
* @brief takes a read lock on the table.
*/
void Lock()
void RdLock()
{
m_mutex.lock();
(void)pthread_rwlock_rdlock(&m_rwLock);
}
/**
* @brief unlocks the table
* @brief tries to takes a write lock on the table.
* @return True on success, False if the lock could not be acquired.
*/
bool WrTryLock()
{
if (pthread_rwlock_trywrlock(&m_rwLock) != 0) {
return false;
}
return true;
}
/**
* @brief takes a write lock on the table.
*/
void WrLock()
{
(void)pthread_rwlock_wrlock(&m_rwLock);
}
/**
* @brief releases the table lock.
*/
void Unlock()
{
m_mutex.unlock();
(void)pthread_rwlock_unlock(&m_rwLock);
}
bool IsDeserialized() const
@ -785,8 +806,8 @@ private:
/** @var Secondary index map accessed by name. */
SecondaryIndexMap m_secondaryIndexes;
/** @var Lock that guards against deletion while checkpointing. */
std::mutex m_mutex;
/** @var RW Lock that guards against deletion during checkpoint/vacuum. */
pthread_rwlock_t m_rwLock;
string m_tableName;

View File

@ -47,7 +47,6 @@ CheckpointManager::CheckpointManager()
m_availableBit(true),
m_numCpTasks(0),
m_numThreads(GetGlobalConfiguration().m_checkpointWorkers),
m_checkpointValidation(GetGlobalConfiguration().m_validateCheckpoint),
m_cpSegThreshold(GetGlobalConfiguration().m_checkpointSegThreshold),
m_stopFlag(false),
m_checkpointEnded(false),
@ -56,10 +55,22 @@ CheckpointManager::CheckpointManager()
m_counters{{0}},
m_lsn(0),
m_id(0),
m_inProgressId(0),
m_lastReplayLsn(0),
m_emptyCheckpoint(false)
{}
bool CheckpointManager::Initialize()
{
int initRc = pthread_rwlock_init(&m_fetchLock, NULL);
if (initRc != 0) {
MOT_LOG_ERROR("Failed to initialize CheckpointManager, could not init rwlock (%d)", initRc);
return false;
}
return true;
}
void CheckpointManager::ResetFlags()
{
m_checkpointEnded = false;
@ -74,26 +85,24 @@ CheckpointManager::~CheckpointManager()
delete m_checkpointers;
m_checkpointers = nullptr;
}
(void)pthread_rwlock_destroy(&m_fetchLock);
}
bool CheckpointManager::CreateSnapShot()
{
if (!CheckpointManager::CreateCheckpointId(m_id)) {
if (!CheckpointManager::CreateCheckpointId(m_inProgressId)) {
MOT_LOG_ERROR("Could not begin checkpoint, checkpoint id creation failed");
OnError(CheckpointWorkerPool::ErrCodes::CALC, "Could not begin checkpoint, checkpoint id creation failed");
return false;
}
MOT_LOG_INFO("Creating MOT checkpoint snapshot: id: %lu", GetId());
MOT_LOG_INFO("Creating MOT checkpoint snapshot: id: %lu", m_inProgressId);
if (m_phase != CheckpointPhase::REST) {
MOT_LOG_WARN("Could not begin checkpoint, checkpoint is already running");
OnError(CheckpointWorkerPool::ErrCodes::CALC, "Could not begin checkpoint, checkpoint is already running");
return false;
}
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
engine->LockDDLForCheckpoint();
ResetFlags();
// Ensure that there are no transactions that started in Checkpoint COMPLETE
@ -114,29 +123,36 @@ bool CheckpointManager::CreateSnapShot()
bool CheckpointManager::SnapshotReady(uint64_t lsn)
{
MOT_LOG_INFO("MOT snapshot ready. id: %lu, lsn: %lu", GetId(), m_lsn);
MOT_LOG_INFO("MOT snapshot ready. id: %lu, lsn: %lu", m_inProgressId, m_lsn);
if (m_phase != CheckpointPhase::CAPTURE) {
MOT_LOG_ERROR("BAD Checkpoint state. Checkpoint ID: %lu, expected: 'CAPTURE', actual: %s",
GetId(),
m_inProgressId,
PhaseToString(m_phase));
m_errorSet = true;
} else {
SetLsn(lsn);
if (m_redoLogHandler != nullptr)
m_redoLogHandler->WrUnlock();
MOT_LOG_DEBUG("Checkpoint snapshot ready. Checkpoint ID: %lu, LSN: %lu", GetId(), GetLsn());
MOT_LOG_DEBUG("Checkpoint snapshot ready. Checkpoint ID: %lu, LSN: %lu", m_inProgressId, GetLsn());
}
return !m_errorSet;
}
bool CheckpointManager::BeginCheckpoint()
{
MOT_LOG_INFO("MOT begin checkpoint capture. id: %lu, lsn: %lu", GetId(), m_lsn);
MOT_LOG_INFO("MOT begin checkpoint capture. id: %lu, lsn: %lu", m_inProgressId, m_lsn);
Capture();
while (!m_checkpointEnded) {
usleep(100000L);
if (m_finishedTasks.empty() == false) {
std::lock_guard<std::mutex> guard(m_tasksMutex);
UnlockAndClearTables(m_finishedTasks);
}
}
// No locking required here, as the checkpoint workers have already exited.
UnlockAndClearTables(m_finishedTasks);
// Move to complete.
// No need to wait for transactions started in previous checkpoint phase
// to complete since there are no transactions that start in RESOLVE
@ -145,9 +161,13 @@ bool CheckpointManager::BeginCheckpoint()
m_lock.WrUnlock();
if (!m_errorSet) {
CompleteCheckpoint(GetId());
CompleteCheckpoint();
}
// No locking required here, as the checkpoint workers have already exited.
UnlockAndClearTables(m_tasksList);
m_numCpTasks = 0;
// Ensure that there are no transactions that started in Checkpoint CAPTURE
// phase that are not yet completed before moving to REST phase
WaitPrevPhaseCommittedTxnComplete();
@ -157,8 +177,6 @@ bool CheckpointManager::BeginCheckpoint()
MoveToNextPhase();
m_lock.WrUnlock();
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
engine->UnlockDDLForCheckpoint();
return !m_errorSet;
}
@ -183,13 +201,15 @@ bool CheckpointManager::Abort()
// phase that are not yet completed before moving to REST phase
WaitPrevPhaseCommittedTxnComplete();
// No locking required here, as there no checkpoint workers when the control reaches here.
UnlockAndClearTables(m_tasksList);
UnlockAndClearTables(m_finishedTasks);
m_numCpTasks = 0;
// Move to rest
m_lock.WrLock();
MoveToNextPhase();
m_lock.WrUnlock();
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
engine->UnlockDDLForCheckpoint();
}
return true;
}
@ -250,8 +270,10 @@ void CheckpointManager::CommitTransaction(TxnManager* txn, int writeSetSize)
void CheckpointManager::TransactionCompleted(TxnManager* txn)
{
if (txn->m_checkpointPhase == CheckpointPhase::NONE)
if (txn->m_checkpointPhase == CheckpointPhase::NONE) {
return;
}
m_lock.RdLock();
CheckpointPhase current_phase = m_phase;
if (txn->m_checkpointPhase == m_phase) { // current phase
@ -302,14 +324,18 @@ void CheckpointManager::MoveToNextPhase()
m_phase = (CheckpointPhase)nextPhase;
m_cntBit = !m_cntBit;
if (m_phase == CheckpointPhase::CAPTURE && m_redoLogHandler != nullptr) {
// hold the redo log lock to avoid inserting additional entries to the
// log. Once snapshot is taken, this lock will be released in SnapshotReady().
m_redoLogHandler->WrLock();
if (m_phase == CheckpointPhase::PREPARE) {
// Obtain a list of all tables. The tables are read locked
// in order to avoid delete/truncate during checkpoint.
FillTasksQueue();
}
if (m_phase == PREPARE && m_checkpointValidation == true) {
Checkbits();
if (m_phase == CheckpointPhase::CAPTURE) {
if (m_redoLogHandler != nullptr) {
// hold the redo log lock to avoid inserting additional entries to the
// log. Once snapshot is taken, this lock will be released in SnapshotReady().
m_redoLogHandler->WrLock();
}
}
// there are no open transactions from previous phase, we can move forward to next phase
@ -318,41 +344,6 @@ void CheckpointManager::MoveToNextPhase()
}
}
void CheckpointManager::Checkbits()
{
GetTableManager()->AddTableIdsToList(m_tasksList);
m_numCpTasks = m_tasksList.size();
while (!m_tasksList.empty()) {
uint32_t curId = m_tasksList.front();
MOT_LOG_DEBUG("checkbits - %u", curId);
Table* table = GetTableManager()->GetTable(curId);
if (table == nullptr) {
MOT_LOG_ERROR("could not find tableId %u", curId);
continue;
}
m_tasksList.pop_front();
m_numCpTasks--;
Index* index = table->GetPrimaryIndex();
if (index == nullptr) {
MOT_LOG_ERROR("could not get primary index for tableId %u", curId);
continue;
}
IndexIterator* it = index->Begin(0);
while (it != nullptr && it->IsValid()) {
MOT::Sentinel* Sentinel = it->GetPrimarySentinel();
MOT::Row* r = Sentinel->GetData();
if (!r->IsAbsentRow() && Sentinel->GetStableStatus() == m_availableBit)
MOT_LOG_ERROR("CHECKPOINT, AVAILABLE BIT IS SET");
if (Sentinel->GetStable() != nullptr)
MOT_LOG_ERROR("CHECKPOINT, HAS STABLE DATA!!!");
it->Next();
}
}
MOT_LOG_DEBUG("checkbits - done");
}
const char* CheckpointManager::PhaseToString(CheckpointPhase phase)
{
switch (phase) {
@ -375,6 +366,7 @@ const char* CheckpointManager::PhaseToString(CheckpointPhase phase)
bool CheckpointManager::ApplyWrite(TxnManager* txnMan, Row* origRow, AccessType type)
{
CheckpointPhase startPhase = txnMan->m_checkpointPhase;
MOT_ASSERT(startPhase != RESOLVE);
Sentinel* s = origRow->GetPrimarySentinel();
MOT_ASSERT(s);
if (s == nullptr) {
@ -385,32 +377,36 @@ bool CheckpointManager::ApplyWrite(TxnManager* txnMan, Row* origRow, AccessType
bool statusBit = s->GetStableStatus();
switch (startPhase) {
case REST:
if (type == INS)
if (type == INS) {
s->SetStableStatus(!m_availableBit);
}
break;
case PREPARE:
if (type == INS)
if (type == INS) {
s->SetStableStatus(!m_availableBit);
else if (statusBit == !m_availableBit) {
if (!CheckpointUtils::SetStableRow(origRow))
} else if (statusBit == !m_availableBit) {
if (!CheckpointUtils::SetStableRow(origRow)) {
return false;
}
}
break;
case RESOLVE:
case CAPTURE:
if (type == INS)
if (type == INS) {
s->SetStableStatus(m_availableBit);
else {
} else {
if (statusBit == !m_availableBit) {
if (!CheckpointUtils::SetStableRow(origRow))
if (!CheckpointUtils::SetStableRow(origRow)) {
return false;
}
s->SetStableStatus(m_availableBit);
}
}
break;
case COMPLETE:
if (type == INS)
if (type == INS) {
s->SetStableStatus(!txnMan->m_checkpointNABit);
}
break;
default:
MOT_LOG_ERROR("Unknown transaction start phase: %s", CheckpointManager::PhaseToString(startPhase));
@ -426,22 +422,40 @@ void CheckpointManager::FillTasksQueue()
OnError(CheckpointWorkerPool::ErrCodes::CALC, "CheckpointManager::fillTasksQueue: queue is not empty!");
return;
}
GetTableManager()->AddTableIdsToList(m_tasksList);
GetTableManager()->AddTablesToList(m_tasksList);
m_numCpTasks = m_tasksList.size();
m_mapfileInfo.clear();
MOT_LOG_DEBUG("CheckpointManager::fillTasksQueue:: got %d tasks", m_tasksList.size());
}
void CheckpointManager::TaskDone(uint32_t tableId, uint32_t numSegs, bool success)
void CheckpointManager::UnlockAndClearTables(std::list<Table *>& tables)
{
std::list<Table *>::iterator it;
for (it = tables.begin(); it != tables.end(); ++it) {
Table *table = *it;
if (table != nullptr) {
table->Unlock();
}
}
tables.clear();
}
void CheckpointManager::TaskDone(Table* table, uint32_t numSegs, bool success)
{
MOT_ASSERT(table);
if (success) { /* only successful tasks are added to the map file */
if (table == nullptr) {
OnError(CheckpointWorkerPool::ErrCodes::MEMORY, "Got a null table on task done");
return;
}
MapFileEntry* entry = new (std::nothrow) MapFileEntry();
if (entry != nullptr) {
entry->m_id = tableId;
entry->m_id = table->GetTableId();
entry->m_numSegs = numSegs;
MOT_LOG_DEBUG("TaskDone %lu: %u %u segs", GetId(), tableId, numSegs);
std::lock_guard<std::mutex> guard(m_mapfileMutex);
MOT_LOG_DEBUG("TaskDone %lu: %u %u segs", m_inProgressId, entry->m_id, numSegs);
std::lock_guard<std::mutex> guard(m_tasksMutex);
m_mapfileInfo.push_back(entry);
m_finishedTasks.push_back(table);
} else {
OnError(CheckpointWorkerPool::ErrCodes::MEMORY, "Failed to allocate map file entry");
return;
@ -453,7 +467,7 @@ void CheckpointManager::TaskDone(uint32_t tableId, uint32_t numSegs, bool succes
}
}
void CheckpointManager::CompleteCheckpoint(uint64_t checkpointId)
void CheckpointManager::CompleteCheckpoint()
{
if (m_emptyCheckpoint == true && CreateEmptyCheckpoint() == false) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create empty checkpoint");
@ -466,12 +480,12 @@ void CheckpointManager::CompleteCheckpoint(uint64_t checkpointId)
return;
}
if (!CreateCheckpointMap(checkpointId)) {
if (!CreateCheckpointMap()) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create map file");
return;
}
if (!CreateTpcRecoveryFile(checkpointId)) {
if (!CreateTpcRecoveryFile()) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create 2pc recovery file");
return;
}
@ -481,22 +495,32 @@ void CheckpointManager::CompleteCheckpoint(uint64_t checkpointId)
return;
}
m_fetchLock.WrLock();
if (!ctrlFile->Update(checkpointId, GetLsn(), GetLastReplayLsn())) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to update control file");
bool finishedUpdatingFiles = false;
(void)pthread_rwlock_wrlock(&m_fetchLock);
do {
if (!CreateEndFile()) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create completion file");
break;
}
if (!ctrlFile->Update(m_inProgressId, GetLsn(), GetLastReplayLsn())) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to update control file");
break;
}
// Update checkpoint Id
SetId(m_inProgressId);
GetRecoveryManager()->SetCheckpointId(m_id);
finishedUpdatingFiles = true;
} while (0);
(void)pthread_rwlock_unlock(&m_fetchLock);
if (!finishedUpdatingFiles) {
return;
}
GetRecoveryManager()->SetCheckpointId(checkpointId);
if (!CreateEndFile(checkpointId)) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "Failed to create completion file");
return;
}
m_fetchLock.WrUnlock();
RemoveOldCheckpoints(checkpointId);
MOT_LOG_INFO("Checkpoint [%lu] completed", checkpointId);
RemoveOldCheckpoints(m_inProgressId);
MOT_LOG_INFO("Checkpoint [%lu] completed", m_inProgressId);
}
void CheckpointManager::DestroyCheckpointers()
@ -510,18 +534,12 @@ void CheckpointManager::DestroyCheckpointers()
void CheckpointManager::CreateCheckpointers()
{
m_checkpointers = new (std::nothrow)
CheckpointWorkerPool(m_numThreads, !m_availableBit, m_tasksList, m_cpSegThreshold, m_id, *this);
CheckpointWorkerPool(m_numThreads, !m_availableBit, m_tasksList, m_cpSegThreshold, m_inProgressId, *this);
}
void CheckpointManager::Capture()
{
MOT_LOG_DEBUG("CheckpointManager::capture");
if (m_numCpTasks) {
MOT_LOG_ERROR("The number of tasks is %d, cannot start capture!", m_numCpTasks.load());
return;
}
FillTasksQueue();
if (m_numCpTasks == 0) {
MOT_LOG_INFO("No tasks in queue - empty checkpoint");
@ -628,7 +646,7 @@ void CheckpointManager::RemoveCheckpointDir(uint64_t checkpointId)
free(buf);
}
bool CheckpointManager::CreateCheckpointMap(uint64_t checkpointId)
bool CheckpointManager::CreateCheckpointMap()
{
int fd = -1;
std::string fileName;
@ -636,10 +654,11 @@ bool CheckpointManager::CreateCheckpointMap(uint64_t checkpointId)
bool ret = false;
do {
if (!CheckpointUtils::SetWorkingDir(workingDir, checkpointId))
if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) {
break;
}
CheckpointUtils::MakeMapFilename(fileName, workingDir, checkpointId);
CheckpointUtils::MakeMapFilename(fileName, workingDir, m_inProgressId);
if (!CheckpointUtils::OpenFileWrite(fileName, fd)) {
MOT_LOG_ERROR("createCheckpointMap: failed to create file '%s' - %d - %s",
fileName.c_str(),
@ -704,7 +723,7 @@ bool CheckpointManager::CreateEmptyCheckpoint()
{
std::string workingDir;
if (!CheckpointUtils::SetWorkingDir(workingDir, m_id)) {
if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) {
OnError(CheckpointWorkerPool::ErrCodes::FILE_IO, "failed to setup working dir");
return false;
}
@ -763,7 +782,7 @@ bool CheckpointManager::CreateCheckpointDir(std::string& dir)
return true;
}
bool CheckpointManager::CreateTpcRecoveryFile(uint64_t checkpointId)
bool CheckpointManager::CreateTpcRecoveryFile()
{
int fd = -1;
std::string fileName;
@ -771,10 +790,11 @@ bool CheckpointManager::CreateTpcRecoveryFile(uint64_t checkpointId)
bool ret = false;
do {
if (!CheckpointUtils::SetWorkingDir(workingDir, checkpointId))
if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) {
break;
}
CheckpointUtils::MakeTpcFilename(fileName, workingDir, checkpointId);
CheckpointUtils::MakeTpcFilename(fileName, workingDir, m_inProgressId);
if (!CheckpointUtils::OpenFileWrite(fileName, fd)) {
MOT_LOG_ERROR("create2PCRecoveryFile: failed to create file '%s' - %d - %s",
fileName.c_str(),
@ -819,7 +839,7 @@ bool CheckpointManager::CreateTpcRecoveryFile(uint64_t checkpointId)
return ret;
}
bool CheckpointManager::CreateEndFile(uint64_t checkpointId)
bool CheckpointManager::CreateEndFile()
{
int fd = -1;
std::string fileName;
@ -827,11 +847,11 @@ bool CheckpointManager::CreateEndFile(uint64_t checkpointId)
bool ret = false;
do {
if (!CheckpointUtils::SetWorkingDir(workingDir, checkpointId)) {
if (!CheckpointUtils::SetWorkingDir(workingDir, m_inProgressId)) {
break;
}
CheckpointUtils::MakeEndFilename(fileName, workingDir, checkpointId);
CheckpointUtils::MakeEndFilename(fileName, workingDir, m_inProgressId);
if (!CheckpointUtils::OpenFileWrite(fileName, fd)) {
MOT_LOG_ERROR(
"CreateEndFile: failed to create file '%s' - %d - %s", fileName.c_str(), errno, gs_strerror(errno));

View File

@ -27,6 +27,7 @@
#include <atomic>
#include <iostream>
#include <pthread.h>
#include "rw_lock.h"
#include "global.h"
#include "txn.h"
@ -48,10 +49,7 @@ public:
virtual ~CheckpointManager();
void SetValidation(bool val)
{
m_checkpointValidation = val;
}
bool Initialize();
/**
* @brief Starts an MOT checkpoint snapshot operation.
@ -112,11 +110,11 @@ public:
/**
* @brief Checkpoint task completion callback
* @param checkpointId The checkpoint's id.
* @param tableId The table's id.
* @param table The table's pointer.
* @param numSegs number of segments written.
* @param success Indicates a success or a failure.
*/
virtual void TaskDone(uint32_t tableId, uint32_t numSegs, bool success);
virtual void TaskDone(Table* table, uint32_t numSegs, bool success);
virtual bool ShouldStop() const
{
@ -178,12 +176,12 @@ public:
void FetchRdLock()
{
m_fetchLock.RdLock();
(void)pthread_rwlock_rdlock(&m_fetchLock);
}
void FetchRdUnlock()
{
m_fetchLock.RdUnlock();
(void)pthread_rwlock_unlock(&m_fetchLock);
}
bool GetCheckpointDirName(std::string& dirName);
@ -209,14 +207,17 @@ private:
volatile CheckpointPhase m_phase;
// NA 'bit' handling
volatile std::atomic_bool m_availableBit;
std::atomic_bool m_availableBit;
// Counts the number of table ids that we are checkpointing.
// When this reaches 0, the checkpoint is complete;
std::atomic<uint32_t> m_numCpTasks;
// Holds table IDs to checkpoint
std::list<uint32_t> m_tasksList;
// Holds tables to checkpoint to be passed to the checkpoint threads
std::list<Table*> m_tasksList;
// Holds finished (checkpointed) tables that can be released by the main thread
std::list<Table*> m_finishedTasks;
// the checkpoint workers pool
CheckpointWorkerPool* m_checkpointers = nullptr;
@ -224,11 +225,8 @@ private:
// Number of threads to run
int m_numThreads;
// Enable checkpoint validation - checkbits
bool m_checkpointValidation;
// Checkpoint map file information
std::mutex m_mapfileMutex;
// mutex for safeguarding mapfile and tasks queues access
std::mutex m_tasksMutex;
std::list<MapFileEntry*> m_mapfileInfo;
@ -259,16 +257,19 @@ private:
// Envelope's checkpoint lsn
uint64_t m_lsn;
// Current Checkpoint's ID
// Last Valid (completed) Checkpoint ID
uint64_t m_id;
// Current (in-progress) Checkpoint ID
uint64_t m_inProgressId;
// last seen recovery lsn
uint64_t m_lastReplayLsn;
bool m_emptyCheckpoint;
// this lock guards gs_ctl checkpoint fetching
RwLock m_fetchLock;
pthread_rwlock_t m_fetchLock;
void SetId(uint64_t id)
{
@ -297,7 +298,7 @@ private:
static const char* PhaseToString(CheckpointPhase phase);
void SwapAvailableAndNotAvailable()
inline void SwapAvailableAndNotAvailable()
{
m_availableBit = !m_availableBit;
}
@ -309,12 +310,11 @@ private:
bool CreateEmptyCheckpoint();
/**
* @brief Performs a checkpoint completion tasks:
* @brief Performs checkpoint completion tasks:
* updates control file, creates the map file
* and 2pc recovery file
* @param checkpointId The checkpoint's id.
*/
void CompleteCheckpoint(uint64_t checkpointId);
void CompleteCheckpoint();
/**
* @brief Performs the checkpoint's Capture phase
@ -327,6 +327,12 @@ private:
*/
void FillTasksQueue();
/**
* @brief Unlocks tables and clear the tables' list
* @param tables Tables list to clear
*/
void UnlockAndClearTables(std::list<Table *>& tables);
/**
* @brief Destroys all the checkpoint threads
*/
@ -357,35 +363,25 @@ private:
return false;
}
/**
* @brief A utility function to validate the checkpoint.
* should not be enabled by default since it can
* have an impact on checkpoint's completion time
*/
void Checkbits();
/**
* @brief Creates the checkpoint's map file - where all
* the metadata is stored in
* @param id The new checkpoint id
* @return Boolean value denoting success or failure.
*/
bool CreateCheckpointMap(uint64_t checkpointId);
bool CreateCheckpointMap();
/**
* @brief Saves the in-process transaction data for 2pc recovery
* purposes during the checkpoint.
* @param id The new checkpoint id
* @return Boolean value denoting success or failure.
*/
bool CreateTpcRecoveryFile(uint64_t checkpointId);
bool CreateTpcRecoveryFile();
/**
* @brief Creates a file that indicates checkpoint completion.
* @param id The checkoint id
* @return Boolean value denoting success or failure.
*/
bool CreateEndFile(uint64_t checkpointId);
bool CreateEndFile();
void ResetFlags();

View File

@ -146,11 +146,9 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd,
do {
if (statusBit == !m_na) { /* has stable version */
if (!deleted && stableRow == nullptr)
if (stableRow == nullptr) {
break;
if (deleted && stableRow == nullptr)
break;
if (stableRow != nullptr) {
} else {
if (!Write(buffer, stableRow, fd)) {
wrote = -1;
} else {
@ -172,13 +170,13 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd,
break;
}
sentinel->SetStableStatus(!m_na);
if (!Write(buffer, mainRow, fd))
if (!Write(buffer, mainRow, fd)) {
wrote = -1; // we failed to write, set error
else
} else {
wrote = 1;
}
break;
}
if (stableRow != nullptr) { /* should not happen! */
} else { /* should not happen! */
wrote = -1;
m_cpManager.OnError(ErrCodes::CALC, "Calc logic error - stable row");
}
@ -190,19 +188,18 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd,
return wrote;
}
bool CheckpointWorkerPool::GetTask(uint32_t& task)
Table* CheckpointWorkerPool::GetTask()
{
bool ret = false;
Table* table = nullptr;
m_tasksLock.lock();
do {
m_tasksLock.lock();
if (m_tasksList.empty())
break;
task = m_tasksList.front();
table = m_tasksList.front();
m_tasksList.pop_front();
ret = true;
} while (0);
m_tasksLock.unlock();
return ret;
return table;
}
void CheckpointWorkerPool::WorkerFunc()
@ -232,22 +229,17 @@ void CheckpointWorkerPool::WorkerFunc()
bool taskSucceeded = false;
Table* table = nullptr;
if (m_cpManager.ShouldStop())
if (m_cpManager.ShouldStop()) {
break;
}
bool haveWork = GetTask(tableId);
if (haveWork) {
table = GetTask();
if (table != nullptr) {
int fd = -1;
do {
uint32_t overallOps = 0;
table = GetTableManager()->GetTableSafe(tableId);
if (table == nullptr) {
MOT_LOG_INFO(
"CheckpointWorkerPool::workerFunc:Table %u does not exist - probably deleted already", tableId);
break;
}
tableId = table->GetTableId();
exId = table->GetTableExId();
size_t tableSize = table->SerializeSize();
char* tableBuf = new (std::nothrow) char[tableSize];
@ -438,16 +430,7 @@ void CheckpointWorkerPool::WorkerFunc()
}
}
if (table != nullptr) {
table->Unlock();
m_cpManager.TaskDone(tableId, seg, taskSucceeded);
} else {
/* taskSucceeded is false, so this table won't be added to the map file. */
m_cpManager.TaskDone(tableId, seg, taskSucceeded);
/* Table is dropped, but we need to continue processing other tables. */
taskSucceeded = true;
}
m_cpManager.TaskDone(table, seg, taskSucceeded);
if (!taskSucceeded) {
break;

View File

@ -47,11 +47,11 @@ public:
/**
* @brief Checkpoint task completion callback
* @param checkpointId The checkpoint's id.
* @param tableId The table's id.
* @param table The table's pointer.
* @param numSegs number of segments written.
* @param success Indicates a success or a failure.
*/
virtual void TaskDone(uint32_t tableId, uint32_t numSegs, bool success) = 0;
virtual void TaskDone(Table* table, uint32_t numSegs, bool success) = 0;
/**
* @brief Checks if the thread should terminate it work
@ -77,7 +77,7 @@ public:
*/
class CheckpointWorkerPool {
public:
CheckpointWorkerPool(int n, bool b, std::list<uint32_t>& l, uint32_t s, uint64_t id, CheckpointManagerCallbacks& m)
CheckpointWorkerPool(int n, bool b, std::list<Table*>& l, uint32_t s, uint64_t id, CheckpointManagerCallbacks& m)
: m_numWorkers(n), m_tasksList(l), m_checkpointId(id), m_na(b), m_cpManager(m), m_checkpointSegsize(s)
{
Start();
@ -117,10 +117,10 @@ private:
int Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid);
/**
* @brief Pops a task (table id) from the tasks queue.
* @return true if a task was fetched, false if the queue was empty.
* @brief Pops a task (table pointer) from the tasks queue.
* @return the address of the pop'd table, or nullptr if the queue was empty.
*/
bool GetTask(uint32_t& task);
Table* GetTask();
/**
* @brief Creates a checkpoint id for the current checkpoint
@ -153,8 +153,8 @@ private:
volatile std::atomic<uint32_t> m_numWorkers;
// Holds table IDs to checkpoint
std::list<uint32_t>& m_tasksList;
// Holds tables to checkpoint
std::list<Table*>& m_tasksList;
// Guards tasksList pops
std::mutex m_tasksLock;

View File

@ -38,7 +38,7 @@ DECLARE_LOGGER(SessionManager, System)
static void PrintActiveSession(SessionId sessionId, SessionContext* sessionContext)
{
MOT_LOG_ERROR("Still active session: %u", sessionId);
MOT_LOG_WARN("Still active session: %u", sessionId);
}
bool SessionManager::Initialize(uint32_t nodeCount, uint32_t threadCount)
@ -359,7 +359,7 @@ void SessionManager::DestroySessionContext(SessionContext* sessionContext)
void SessionManager::ReportActiveSessions()
{
if (!m_sessionContextMap.empty()) {
MOT_LOG_PANIC("Attempting to Destroy MOT Engine while there are still %u active sessions",
MOT_LOG_WARN("Attempting to Destroy MOT Engine while there are still %u active sessions",
(unsigned)m_sessionContextMap.size());
m_sessionContextMap.for_each(PrintActiveSession);
}

View File

@ -25,7 +25,7 @@
#include "table_manager.h"
namespace MOT {
IMPLEMENT_CLASS_LOGGER(TableManager, System)
IMPLEMENT_CLASS_LOGGER(TableManager, System);
bool TableManager::AddTable(Table* table)
{
@ -33,7 +33,6 @@ bool TableManager::AddTable(Table* table)
"Adding table %s with external id: %" PRIu64, table->GetLongTableName().c_str(), table->GetTableExId());
m_rwLock.WrLock();
InternalTableMap::iterator it = m_tablesById.find(table->GetTableId());
if (it != m_tablesById.end()) {
m_rwLock.WrUnlock();
MOT_LOG_ERROR(
@ -54,7 +53,7 @@ void TableManager::ClearTablesThreadMemoryCache()
InternalTableMap::iterator it = m_tablesById.begin();
while (it != m_tablesById.end()) {
// lock table to prevent concurrent ddl and truncate operations
it->second->Lock();
it->second->RdLock();
it->second->ClearThreadMemoryCache();
it->second->Unlock();
it++;

View File

@ -63,11 +63,13 @@ public:
*/
inline RC DropTable(Table* table, SessionContext* sessionContext)
{
if (table == nullptr) {
return RC_ERROR;
}
MOT_LOG_INFO("Dropping table %s", table->GetLongTableName().c_str());
RC status = DropTableInternal(table, sessionContext);
if (table != nullptr) {
delete table;
}
delete table;
return status;
}
@ -82,7 +84,6 @@ public:
Table* table = nullptr;
m_rwLock.RdLock();
InternalTableMap::iterator it = m_tablesById.find(tableId);
if (it != m_tablesById.end())
table = it->second;
@ -93,20 +94,19 @@ public:
/**
* @brief Retrieves a locked table from the engine. Caller is responsible for unlocking the table when done using
* it, by calling @ref Table::Unlock.
* @param tableId The internal (engine-given) identifier of the table to retrieve.
* @param tableId The external (envelope) identifier of the table to retrieve.
* @return The table object or null pointer if not found.
* @note It is assumed that the envelope guards against concurrent removal of the table.
*/
inline Table* GetTableSafe(InternalTableId tableId)
inline Table* GetTableSafeByExId(ExternalTableId tableId)
{
Table* table = nullptr;
m_rwLock.RdLock();
InternalTableMap::iterator it = m_tablesById.find(tableId);
if (it != m_tablesById.end()) {
ExternalTableMap::iterator it = m_tablesByExId.find(tableId);
if (it != m_tablesByExId.end()) {
table = it->second;
if (table != nullptr) {
table->Lock();
table->RdLock();
}
}
m_rwLock.RdUnlock();
@ -124,10 +124,10 @@ public:
InternalTableId internalId, ExternalTableId externalId, const std::string& name, const std::string& longName)
{
bool ret = false;
Table* table = GetTableSafe(internalId);
Table* table = GetTableSafeByExId(externalId);
if (table != nullptr) {
ret = ((table->GetTableName() == name) && (table->GetLongTableName() == longName) &&
(table->GetTableExId() == externalId));
(table->GetTableId() == internalId));
table->Unlock();
}
return ret;
@ -179,18 +179,20 @@ public:
}
/**
* @brief Copies the internal identifiers of all tables into a list.
* @param[out] idQueue Receives all the table identifiers.
* @return The number of table identifiers copied.
* @brief Adds the pointers of all tables into a list.
* @param[out] tablesQueue Receives all the tables.
* @return The number of tables added.
*/
inline uint32_t AddTableIdsToList(std::list<uint32_t>& idQueue)
inline uint32_t AddTablesToList(std::list<Table*>& tablesQueue)
{
m_rwLock.RdLock();
for (InternalTableMap::iterator it = m_tablesById.begin(); it != m_tablesById.end(); ++it) {
idQueue.push_back(it->first);
tablesQueue.push_back(it->second);
// lock the table, so it won't get deleted/truncated
it->second->RdLock();
}
m_rwLock.RdUnlock();
return (uint32_t)idQueue.size();
return (uint32_t)tablesQueue.size();
}
/** @brief Clears all object-pool table caches for the current thread. */

View File

@ -50,7 +50,6 @@ constexpr const char* MOTConfiguration::DEFAULT_CHECKPOINT_DIR;
constexpr const char* MOTConfiguration::DEFAULT_CHECKPOINT_SEGSIZE;
constexpr uint32_t MOTConfiguration::DEFAULT_CHECKPOINT_SEGSIZE_BYTES;
constexpr uint32_t MOTConfiguration::DEFAULT_CHECKPOINT_WORKERS;
constexpr bool MOTConfiguration::DEFAULT_VALIDATE_CHECKPOINT;
// recovery configuration members
constexpr uint32_t MOTConfiguration::DEFAULT_CHECKPOINT_RECOVERY_WORKERS;
constexpr bool MOTConfiguration::DEFAULT_ENABLE_LOG_RECOVERY_STATS;
@ -390,7 +389,6 @@ MOTConfiguration::MOTConfiguration()
m_checkpointDir(DEFAULT_CHECKPOINT_DIR),
m_checkpointSegThreshold(DEFAULT_CHECKPOINT_SEGSIZE_BYTES),
m_checkpointWorkers(DEFAULT_CHECKPOINT_WORKERS),
m_validateCheckpoint(DEFAULT_VALIDATE_CHECKPOINT),
m_checkpointRecoveryWorkers(DEFAULT_CHECKPOINT_RECOVERY_WORKERS),
m_abortBufferEnable(true),
m_preAbort(true),
@ -485,7 +483,6 @@ bool MOTConfiguration::SetFlag(const std::string& name, const std::string& value
} else if (ParseString(name, "checkpoint_dir", value, &m_checkpointDir)) {
} else if (ParseUint32(name, "checkpoint_segsize", value, &m_checkpointSegThreshold)) {
} else if (ParseUint32(name, "checkpoint_workers", value, &m_checkpointWorkers)) {
} else if (ParseBool(name, "validate_checkpoint", value, &m_validateCheckpoint)) {
} else if (ParseUint32(name, "checkpoint_recovery_workers", value, &m_checkpointRecoveryWorkers)) {
} else if (ParseBool(name, "abort_buffer_enable", value, &m_abortBufferEnable)) {
} else if (ParseBool(name, "pre_abort", value, &m_preAbort)) {
@ -647,7 +644,6 @@ void MOTConfiguration::LoadConfig()
UPDATE_STRING_CFG(m_checkpointDir, "checkpoint_dir", DEFAULT_CHECKPOINT_DIR);
UPDATE_MEM_CFG(m_checkpointSegThreshold, "checkpoint_segsize", DEFAULT_CHECKPOINT_SEGSIZE, 1);
UPDATE_INT_CFG(m_checkpointWorkers, "checkpoint_workers", DEFAULT_CHECKPOINT_WORKERS);
UPDATE_CFG(m_validateCheckpoint, "validate_checkpoint", DEFAULT_VALIDATE_CHECKPOINT);
// Recovery configuration
UPDATE_INT_CFG(m_checkpointRecoveryWorkers, "checkpoint_recovery_workers", DEFAULT_CHECKPOINT_RECOVERY_WORKERS);

View File

@ -153,9 +153,6 @@ public:
/** @var number of worker threads to spawn to perform checkpoint. */
uint32_t m_checkpointWorkers;
/** @var Do checkpoints bit validations - use it for debugging only */
bool m_validateCheckpoint;
/**********************************************************************/
// Recovery configuration
/**********************************************************************/
@ -430,9 +427,6 @@ private:
/** @var Default number of worker threads to spawn */
static constexpr uint32_t DEFAULT_CHECKPOINT_WORKERS = 3;
/** @var Default enable checkpoint validation. */
static constexpr bool DEFAULT_VALIDATE_CHECKPOINT = false;
// default recovery configuration
/** @var Default number of workers used in recovery from checkpoint. */
static constexpr uint32_t DEFAULT_CHECKPOINT_RECOVERY_WORKERS = 3;

View File

@ -367,11 +367,6 @@ bool MOTEngine::InitializeCoreServices()
CHECK_INIT_STATUS(result, "Failed to Initialize garbage collection sub-system");
m_initCoreStack.push(INIT_GC_PHASE);
int rc = pthread_mutex_init(&m_DDLCheckpointGuard, nullptr);
result = (rc == 0);
CHECK_SYS_INIT_STATUS(rc, pthread_mutex_init, "Failed to Initialize global DDL lock");
m_initCoreStack.push(INIT_DDL_LOCK_PHASE);
result = InitializeDebugUtils();
CHECK_INIT_STATUS(result, "Failed to Initialize debug utilities");
m_initCoreStack.push(INIT_DEBUG_UTILS);
@ -454,10 +449,6 @@ void MOTEngine::DestroyCoreServices()
DestroyDebugUtils();
break;
case INIT_DDL_LOCK_PHASE:
pthread_mutex_destroy(&m_DDLCheckpointGuard);
break;
case INIT_GC_PHASE:
break;
@ -771,6 +762,13 @@ bool MOTEngine::InitializeCheckpointManager()
return false;
}
if (!m_checkpointManager->Initialize()) {
MOT_REPORT_ERROR(MOT_ERROR_OOM, "MOT Engine Startup", "Failed to initialize checkpoint manager");
delete m_checkpointManager;
m_checkpointManager = nullptr;
return false;
}
MOT_LOG_INFO("Startup: Checkpoint manager initialized successfully");
return true;
}

View File

@ -272,21 +272,6 @@ public:
return nullptr;
}
int LockDDLForCheckpoint()
{
return pthread_mutex_lock(&m_DDLCheckpointGuard);
}
int TryLockDDLForCheckpoint()
{
return pthread_mutex_trylock(&m_DDLCheckpointGuard);
}
int UnlockDDLForCheckpoint()
{
return pthread_mutex_unlock(&m_DDLCheckpointGuard);
}
/**
* @brief Order the engine to write all of its redo log records to the log file.
*/
@ -524,9 +509,6 @@ private:
/** @var The commit sequence number handler (CSN). */
CSNManager m_csnManager;
/** ddl <> checkpoint sync */
pthread_mutex_t m_DDLCheckpointGuard;
/** @var Global flag for soft memory limit. */
uint32_t m_softMemoryLimitReached;
@ -575,7 +557,6 @@ private:
INIT_TABLE_MANAGER_PHASE,
INIT_SURROGATE_KEY_MANAGER_PHASE,
INIT_GC_PHASE,
INIT_DDL_LOCK_PHASE,
INIT_DEBUG_UTILS,
INIT_CORE_DONE
};

View File

@ -570,7 +570,7 @@ void TxnManager::WriteDDLChanges()
case DDL_ACCESS_TRUNCATE_TABLE:
indexes = (Index**)ddl_access->GetEntry();
table = indexes[0]->GetTable();
table->Lock();
table->WrLock();
table->m_rowCount = 0;
for (int i = 0; i < table->GetNumIndexes(); i++) {
index = indexes[i];
@ -589,7 +589,7 @@ void TxnManager::WriteDDLChanges()
if (index->IsPrimaryKey())
break;
table = index->GetTable();
table->Lock();
table->WrLock();
table->RemoveSecondaryIndex((char*)index->GetName().c_str(), this);
table->Unlock();
break;
@ -1232,7 +1232,7 @@ RC TxnManager::CreateIndex(Table* table, Index* index, bool is_primary)
// is should only be added on successful commit. Assuming that if
// a client did a create index, all other clients are waiting on a lock
// until the changes are either commited or aborted
table->Lock(); // for concurrent access
table->WrLock(); // for concurrent access
if (table->GetNumIndexes() == MAX_NUM_INDEXES) {
table->Unlock();
MOT_REPORT_ERROR(MOT_ERROR_RESOURCE_LIMIT,

View File

@ -532,10 +532,10 @@ private:
GcManager* m_gcSession;
/** @var Checkpoint phase captured during transaction start. */
CheckpointPhase m_checkpointPhase;
volatile CheckpointPhase m_checkpointPhase;
/** @var Checkpoint not available capture during being transaction. */
bool m_checkpointNABit;
volatile bool m_checkpointNABit;
/** @var CSN taken at the commit stage. */
uint64_t m_csn;

View File

@ -76,7 +76,7 @@ std::string HexStr(const uint8_t* data, uint16_t len);
#define HIGH_NIBBLE(byte) (((byte) >> 4) & 0x0F)
/** @define Low nibble of a byte. */
#define LOW_NIBBLE(byte) ((byte)&0x0F)
#define LOW_NIBBLE(byte) ((byte) & 0x0F)
/** @define Compile-time conversion of identifier to string literal. */
#define stringify(name) #name

View File

@ -1497,7 +1497,7 @@ static MOT::RC TableFieldType(const ColumnDef* colDef, MOT::MOT_CATALOG_FIELD_TY
MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
{
MOT::RC rc = MOT::RC_OK;
MOT::RC res;
EnsureSafeThreadAccessInline();
MOT::TxnManager* txn = GetSafeTxn();
txn->SetTransactionId(tid);
@ -1508,7 +1508,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
(errmodule(MOD_MM),
errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("Table not found for oid %u", index->relation->foreignOid)));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
if (table->GetNumIndexes() == MAX_NUM_INDEXES) {
@ -1516,7 +1516,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
(errmodule(MOD_MM),
errcode(ERRCODE_FDW_TOO_MANY_INDEXES),
errmsg("Can not create index, max number of indexes %u reached", MAX_NUM_INDEXES)));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
elog(LOG,
@ -1537,7 +1537,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
flavor = MOT::GetGlobalConfiguration().m_indexTreeFlavor;
} else {
ereport(ERROR, (errmodule(MOD_MM), errmsg("MOT supports indexes of type BTREE only (btree or btree_art)")));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
if (list_length(index->indexParams) > (int)MAX_KEY_COLUMNS) {
@ -1547,7 +1547,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
errmsg("Can't create index"),
errdetail(
"Number of columns exceeds %d max allowed %u", list_length(index->indexParams), MAX_KEY_COLUMNS)));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
// check if we have primary and delete previous definition
@ -1576,7 +1576,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
errmsg("Can't create index on field"),
errdetail("Specified column not found in table definition")));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
MOT::Column* col = table->GetField(colid);
@ -1589,7 +1589,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
errcode(ERRCODE_FDW_INDEX_ON_NULLABLE_COLUMN_NOT_ALLOWED),
errmsg("Can't create index on nullable columns"),
errdetail("Column %s is nullable", col->m_name)));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
// Temp solution, we have to support DECIMAL and NUMERIC indexes as well
@ -1600,7 +1600,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Can't create index on field"),
errdetail("INDEX on NUMERIC or DECIMAL fields not supported yet")));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
if (col->m_keySize > MAX_KEY_SIZE) {
delete ix;
@ -1609,7 +1609,7 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
errmsg("Can't create index on field"),
errdetail("Column size is greater than maximum index size")));
return MOT::RC_OK;
return MOT::RC_ERROR;
}
keyLength += col->m_keySize;
@ -1619,13 +1619,13 @@ MOT::RC MOTAdaptor::CreateIndex(IndexStmt* index, ::TransactionId tid)
ix->SetNumIndexFields(count);
if ((rc = ix->IndexInit(keyLength, index->unique, index->idxname, nullptr)) != MOT::RC_OK) {
if ((res = ix->IndexInit(keyLength, index->unique, index->idxname, nullptr)) != MOT::RC_OK) {
delete ix;
report_pg_error(rc, txn);
return rc;
report_pg_error(res, txn);
return res;
}
MOT::RC res = txn->CreateIndex(table, ix, index->primary);
res = txn->CreateIndex(table, ix, index->primary);
if (res != MOT::RC_OK) {
delete ix;
if (res == MOT::RC_TABLE_EXCEEDS_MAX_INDEXES) {
@ -1670,10 +1670,13 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
// prepare table name
dbname = get_database_name(u_sess->proc_cxt.MyDatabaseId);
if (dbname == nullptr) {
delete currentTable;
currentTable = nullptr;
ereport(ERROR,
(errmodule(MOD_MM),
errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database with OID %u does not exist", u_sess->proc_cxt.MyDatabaseId)));
break;
}
tname.append(dbname);
tname.append("_");
@ -1686,11 +1689,23 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
tname.append("_");
tname.append(table->base.relation->relname);
currentTable->Init(table->base.relation->relname, tname.c_str(), columnCount, table->base.relation->foreignOid);
if (!currentTable->Init(
table->base.relation->relname, tname.c_str(), columnCount, table->base.relation->foreignOid)) {
delete currentTable;
currentTable = nullptr;
report_pg_error(MOT::RC_MEMORY_ALLOCATION_ERROR, txn);
break;
}
// the null fields are copied verbatim because we have to give them back at some point
currentTable->AddColumn(
res = currentTable->AddColumn(
"null_bytes", BITMAPLEN(columnCount - 1), MOT::MOT_CATALOG_FIELD_TYPES::MOT_TYPE_NULLBYTES);
if (res != MOT::RC_OK) {
delete currentTable;
currentTable = nullptr;
report_pg_error(MOT::RC_MEMORY_ALLOCATION_ERROR, txn);
break;
}
ListCell* cell;
@ -1701,6 +1716,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
ColumnDef* colDef = (ColumnDef*)lfirst(cell);
if (colDef == nullptr || colDef->typname == nullptr) {
delete currentTable;
currentTable = nullptr;
ereport(ERROR,
(errmodule(MOD_MM),
errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
@ -1711,6 +1728,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
res = TableFieldType(colDef, colType, &typeLen, isBlob);
if (res != MOT::RC_OK) {
delete currentTable;
currentTable = nullptr;
report_pg_error(res, txn, colDef, (void*)(int64)typeLen);
break;
}
@ -1763,6 +1782,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
}
res = currentTable->AddColumn(colDef->colname, typeLen, colType, colDef->is_not_null);
if (res != MOT::RC_OK) {
delete currentTable;
currentTable = nullptr;
report_pg_error(res, txn, colDef, (void*)(int64)typeLen);
break;
}
@ -1776,6 +1797,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
uint32_t tupleSize = currentTable->GetTupleSize();
if (tupleSize > (unsigned int)MAX_TUPLE_SIZE) {
delete currentTable;
currentTable = nullptr;
ereport(ERROR,
(errmodule(MOD_MM),
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1787,6 +1810,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
}
if (!currentTable->InitRowPool()) {
delete currentTable;
currentTable = nullptr;
report_pg_error(MOT::RC_MEMORY_ALLOCATION_ERROR, txn);
break;
}
@ -1806,6 +1831,8 @@ MOT::RC MOTAdaptor::CreateTable(CreateForeignTableStmt* table, ::TransactionId t
rc,
nullptr);
if (rc != MOT::RC_OK) {
delete currentTable;
currentTable = nullptr;
report_pg_error(rc, txn);
break;
}
@ -1891,11 +1918,6 @@ MOT::RC MOTAdaptor::TruncateTable(Relation rel, ::TransactionId tid)
EnsureSafeThreadAccessInline();
if (DdlCpTryLock() != 0) {
elog(LOG, "Cannot perform: truncateTable, a checkpoint is in progress");
return MOT::RC_NA;
}
MOT::TxnManager* txn = GetSafeTxn();
txn->SetTransactionId(tid);
@ -1907,10 +1929,11 @@ MOT::RC MOTAdaptor::TruncateTable(Relation rel, ::TransactionId tid)
break;
}
tab->WrLock();
res = txn->TruncateTable(tab);
tab->Unlock();
} while (0);
DdlCpUnlock();
return res;
}
@ -1919,23 +1942,20 @@ MOT::RC MOTAdaptor::VacuumTable(Relation rel, ::TransactionId tid)
MOT::RC res = MOT::RC_OK;
MOT::Table* tab = nullptr;
EnsureSafeThreadAccessInline();
MOT::MOTEngine::GetInstance()->LockDDLForCheckpoint();
MOT::TxnManager* txn = GetSafeTxn();
txn->SetTransactionId(tid);
elog(LOG, "vacuuming table %s, oid: %u", NameStr(rel->rd_rel->relname), rel->rd_id);
do {
tab = txn->GetTableByExternalId(rel->rd_id);
tab = MOT::GetTableManager()->GetTableSafeByExId(rel->rd_id);
if (tab == nullptr) {
elog(LOG, "Vacuum table %s error, table oid %u not found.", NameStr(rel->rd_rel->relname), rel->rd_id);
break;
}
tab->Lock();
tab->Compact(txn);
tab->Unlock();
} while (0);
MOT::MOTEngine::GetInstance()->UnlockDDLForCheckpoint();
return res;
}
@ -2436,25 +2456,6 @@ void MOTAdaptor::DatumToMOTKey(
}
}
// ddl <> checkpoint sync
int MOTAdaptor::DdlCpTryLock()
{
EnsureSafeThreadAccessInline();
if (m_engine != nullptr) {
return m_engine->TryLockDDLForCheckpoint();
}
return 0;
}
int MOTAdaptor::DdlCpUnlock()
{
EnsureSafeThreadAccessInline();
if (m_engine != nullptr) {
return m_engine->UnlockDDLForCheckpoint();
}
return 0;
}
bool MatchIndex::IsSameOper(KEY_OPER op1, KEY_OPER op2) const
{
bool res = true;

View File

@ -402,10 +402,6 @@ public:
MOTFdwStateSt* festate, MatchIndexArr* marr, int numClauses, bool setLocal = true);
inline static int32_t AddParam(List** params, Expr* expr);
// ddl <> checkpoint sync
static int DdlCpTryLock();
static int DdlCpUnlock();
static MOT::MOTEngine* m_engine;
static bool m_initialized;
static bool m_callbacks_initialized;