Fix reclamation in MOT ckpt

This commit is contained in:
Vinoth Veeraraghavan
2020-10-13 15:38:50 +08:00
parent 399a957141
commit 81ceea7497
12 changed files with 178 additions and 34 deletions

View File

@ -373,7 +373,9 @@ bool OccTransactionManager::WriteChanges(TxnManager* txMan)
access->m_origSentinel->SetNextPtr(access->m_auxRow->GetPrimarySentinel());
}
// upgrade should not change the reference count!
access->m_origSentinel->SetUpgradeCounter();
if (access->m_origSentinel->IsCommited()) {
access->m_origSentinel->SetUpgradeCounter();
}
}
}
}
@ -403,6 +405,8 @@ bool OccTransactionManager::WriteChanges(TxnManager* txMan)
}
}
CleanRowsFromIndexes(txMan);
if (cfg.m_enableCheckpoint) {
GetCheckpointManager()->CommitTransaction(txMan, m_rowsSetSize);
}

View File

@ -134,11 +134,7 @@ void RowHeader::WriteChangesToRow(const Access* access, uint64_t csn)
case DEL:
MOT_ASSERT(access->m_origSentinel->IsCommited() == true);
if (access->m_params.IsPrimarySentinel()) {
if (row->GetPrimarySentinel()->GetStable() == nullptr) {
m_csnWord = (csn | LOCK_BIT | ABSENT_BIT | LATEST_VER_BIT);
} else {
m_csnWord = (csn | LOCK_BIT | ABSENT_BIT); // if has stable, avoid destroying the row
}
m_csnWord = (csn | LOCK_BIT | ABSENT_BIT | LATEST_VER_BIT);
// and allow reuse of the original row
}
// Invalidate sentinel - row is still locked!

View File

@ -192,12 +192,12 @@ private:
}
/**
* @brief Queries whether the row is valid.
* @return True if the absent bit and the latet vsersion are not set for the row.
* @brief Queries whether the row is deleted.
* @return True if the absent bit and the latet vsersion are set for the row.
*/
bool IsRowValid() const
bool IsRowDeleted() const
{
return (m_csnWord & (ABSENT_BIT | LATEST_VER_BIT)) == 0;
return (m_csnWord & (ABSENT_BIT | LATEST_VER_BIT)) == (ABSENT_BIT | LATEST_VER_BIT);
}
/** @brief Sets the absent bit in the row. */

View File

@ -40,7 +40,7 @@ GcLock g_gcGlobalEpochLock;
GcManager* GcManager::allGcManagers = nullptr;
inline GcManager::GcManager(int purpose, int getThreadId, int rcuMaxFreeCount)
inline GcManager::GcManager(GC_TYPE purpose, int getThreadId, int rcuMaxFreeCount)
: m_rcuFreeCount(rcuMaxFreeCount), m_tid(getThreadId), m_purpose(purpose)
{}
@ -80,7 +80,7 @@ bool GcManager::Initialize()
return result;
}
GcManager* GcManager::Make(int purpose, int threadId, int rcuMaxFreeCount)
GcManager* GcManager::Make(GC_TYPE purpose, int threadId, int rcuMaxFreeCount)
{
GcManager* gc = nullptr;
@ -247,7 +247,11 @@ void GcManager::CleanIndexItems(uint32_t indexId, bool dropIndex)
}
m_managerLock.unlock();
if (counter) {
MOT_LOG_INFO("threadId = %d cleaned from index id = %d items = %d\n", m_tid, indexId, counter);
MOT_LOG_INFO("Entity:%s threadId = %d cleaned from index id = %d items = %d\n",
enGcTypes[m_purpose],
m_tid,
indexId,
counter);
}
}

View File

@ -134,6 +134,9 @@ struct LimboGroup {
inline unsigned CleanIndexItemPerGroup(GcManager& ti, uint32_t indexId, bool dropIndex);
};
static const char* const enGcTypes[] = {
stringify(GC_MAIN), stringify(GC_INDEX), stringify(GC_LOG), stringify(GC_CHECKPOINT)};
/**
* @class GcManager
* @brief Garbage-collector manager per-session
@ -180,7 +183,7 @@ public:
* @param rcuMaxFreeCount How many objects to reclaim
* @return Pointer to instance of a GC manager
*/
static GcManager* Make(int purpose, int threadId, int rcuMaxFreeCount = MASSTREE_OBJECT_COUNT_PER_CLEANUP);
static GcManager* Make(GC_TYPE purpose, int threadId, int rcuMaxFreeCount = MASSTREE_OBJECT_COUNT_PER_CLEANUP);
/**
* @brief Add Current manager to the global list
@ -210,6 +213,21 @@ public:
return m_tid;
}
void SetGcType(GC_TYPE type)
{
m_purpose = type;
}
const char* GetGcTypeStr()
{
return enGcTypes[m_purpose];
}
GC_TYPE GetGcType()
{
return m_purpose;
}
void GcStartTxnMTtests()
{
if (m_gcEpoch != GetGlobalEpoch())
@ -307,7 +325,37 @@ public:
m_managerLock.lock();
ShrinkMem();
m_managerLock.unlock();
MOT_LOG_DEBUG("THD_ID:%d closed session cleaned %d elements from limbo!\n", m_tid, inuseElements);
MOT_LOG_DEBUG("Entity:%s THD_ID:%d closed session cleaned %d elements from limbo!\n",
enGcTypes[m_purpose],
m_tid,
inuseElements);
}
/** @brief Clean all object at the end of the session */
inline void GcCheckPointClean()
{
if (m_isGcEnabled == false) {
return;
}
// End CP Txn
m_gcEpoch = 0;
m_isTxnStarted = false;
// Increase the global epoch to insure all elements are from a lower epoch
SetGlobalEpoch(GetGlobalEpoch() + 1);
m_managerLock.lock();
HardQuiesce(m_totalLimboInuseElements);
m_managerLock.unlock();
#ifdef MOT_DEBUG
uint32_t inuseElements = m_totalLimboInuseElements;
uint32_t cleandObjects = inuseElements - m_totalLimboInuseElements;
if (cleandObjects) {
MOT_LOG_INFO(
"Entity:%s THD_ID:%d cleaned %d elements from limbo!\n", enGcTypes[m_purpose], m_tid, (cleandObjects));
}
#endif
}
/**
@ -455,7 +503,7 @@ private:
uint16_t m_tid;
/** @var GC manager type */
uint8_t m_purpose;
GC_TYPE m_purpose;
/** @brief Calculate the minimum epoch among all active GC Managers.
* @return The minimum epoch among all active GC Managers.
@ -468,7 +516,7 @@ private:
}
/** @brief Constructor */
inline GcManager(int purpose, int index, int rcuMaxFreeCount);
inline GcManager(GC_TYPE purpose, int index, int rcuMaxFreeCount);
/** @brief Initialize GC Manager's structures.
* @return True for success

View File

@ -222,9 +222,9 @@ public:
* @brief Queries the validity of the row header.
* @return Boolean value denoting whether the row is valid or not.
*/
inline bool IsRowValid() const
inline bool IsRowDeleted() const
{
return m_rowHeader.IsRowValid();
return m_rowHeader.IsRowDeleted();
}
/**

View File

@ -323,13 +323,14 @@ private:
Index* m_index = nullptr;
/** @var m_stable A Container for the row and its status bit */
uint64_t m_stable = 0;
volatile uint64_t m_stable = 0;
/** @var m_refCount A counter of concurrent inserters of the same key */
volatile uint32_t m_refCount = 0;
inline void DecCounter()
{
MOT_ASSERT(GetCounter() > 0);
uint32_t counter = GetCounter() - 1;
m_refCount = (m_refCount & ~S_COUNTER_MASK) | (counter & S_COUNTER_MASK);
}

View File

@ -582,6 +582,16 @@ Row* Table::RemoveKeyFromIndex(Row* row, Sentinel* sentinel, uint64_t tid, GcMan
if (likely(gc != nullptr)) {
if (ix->GetIndexOrder() == IndexOrder::INDEX_ORDER_PRIMARY) {
OutputRow = currSentinel->GetData();
if (currSentinel->GetStable()) {
#ifdef MOT_DEBUG
if (gc->GetGcType() != GcManager::GC_CHECKPOINT) {
MOT_LOG_ERROR("ERROR!!!! Txn Delete with a stable ROW Type = %c\n", gc->GetGcTypeStr());
MOT_ASSERT(false);
}
#endif
gc->GcRecordObject(
ix->GetIndexId(), currSentinel->GetStable(), nullptr, Row::RowDtor, ROW_SIZE_FROM_POOL(this));
}
gc->GcRecordObject(ix->GetIndexId(), currSentinel, nullptr, Index::SentinelDtor, SENTINEL_SIZE);
gc->GcRecordObject(ix->GetIndexId(), OutputRow, nullptr, Row::RowDtor, ROW_SIZE_FROM_POOL(this));
} else {

View File

@ -290,6 +290,31 @@ public:
*/
void Compact(TxnManager* txn);
/**
* @brief Count number of absent sentinels in a table
* @param none
*/
#ifdef MOT_DEBUG
void CountAbsents()
{
uint64_t absentCounter = 0;
MOT_LOG_INFO("Testing table %s \n", GetTableName().c_str());
for (uint16_t i = 0; i < m_numIndexes; i++) {
Index* index = GetIndex(i);
IndexIterator* it = index->Begin(0);
while (it->IsValid()) {
Sentinel* Sentinel = it->GetPrimarySentinel();
if (Sentinel->IsDirty()) {
absentCounter++;
}
it->Next();
}
MOT_LOG_INFO("Found %lu Absents in index %s\n", absentCounter, index->GetName().c_str());
absentCounter = 0;
}
}
#endif
void ClearRowCache()
{
m_rowPool->ClearFreeCache();

View File

@ -123,10 +123,12 @@ bool CheckpointWorkerPool::Write(Buffer* buffer, Row* row, int fd)
return true;
}
int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid)
int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid, bool& isDeleted)
{
Row* mainRow = sentinel->GetData();
Row* stableRow = nullptr;
int wrote = 0;
isDeleted = false;
if (mainRow != nullptr) {
bool headerLocked = sentinel->TryLock(tid);
@ -138,11 +140,25 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd,
}
sentinel->Lock(tid);
}
stableRow = sentinel->GetStable();
if (mainRow->IsRowDeleted()) {
if (stableRow) {
// Truely deleted and was not removed by txn manager
isDeleted = true;
} else {
MOT_LOG_DEBUG("Detected Deleted row without Stable Row!");
}
}
} else {
return 0;
}
if (unlikely(stableRow == nullptr)) {
stableRow = sentinel->GetStable();
}
Row* stableRow = sentinel->GetStable();
bool deleted = !sentinel->IsCommited(); /* this currently indicates if the row is deleted or not */
bool statusBit = sentinel->GetStableStatus();
bool deleted = !sentinel->IsCommited(); /* this currently indicates if the row is deleted or not */
do {
if (statusBit == !m_na) { /* has stable version */
@ -152,8 +168,10 @@ int CheckpointWorkerPool::Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd,
if (!Write(buffer, stableRow, fd)) {
wrote = -1;
} else {
CheckpointUtils::DestroyStableRow(stableRow);
sentinel->SetStable(nullptr);
if (isDeleted == false) {
CheckpointUtils::DestroyStableRow(stableRow);
sentinel->SetStable(nullptr);
}
wrote = 1;
}
break;
@ -202,6 +220,19 @@ Table* CheckpointWorkerPool::GetTask()
return table;
}
void CheckpointWorkerPool::ExecuteMicroGcTransaction(
Sentinel** deletedList, GcManager* gcSession, Table* table, uint16_t& deletedCounter, uint16_t limit)
{
if (deletedCounter == limit) {
gcSession->GcStartTxn();
for (uint16_t i = 0; i < limit; i++) {
Row* out = table->RemoveKeyFromIndex(deletedList[i]->GetData(), deletedList[i], 0, gcSession);
}
gcSession->GcCheckPointClean();
deletedCounter = 0;
}
}
void CheckpointWorkerPool::WorkerFunc()
{
MOT_DECLARE_NON_KERNEL_THREAD();
@ -215,12 +246,23 @@ void CheckpointWorkerPool::WorkerFunc()
return;
}
SessionContext* sessionContext = GetSessionManager()->CreateSessionContext();
GcManager* gcSession = sessionContext->GetTxnManager()->GetGcSession();
gcSession->SetGcType(GcManager::GC_CHECKPOINT);
int threadId = MOTCurrThreadId;
if (GetGlobalConfiguration().m_enableNuma && !GetTaskAffinity().SetAffinity(threadId)) {
MOT_LOG_WARN("Failed to set affinity for checkpoint worker, checkpoint performance may be affected");
}
Sentinel** deletedList = new (nothrow) Sentinel*[DELETE_LIST_SIZE];
uint16_t deletedListLocation = 0;
if (!deletedList) {
MOT_LOG_ERROR("CheckpointWorkerPool::workerFunc: Failed to initialize buffer");
m_cpManager.OnError(ErrCodes::MEMORY, "Memory allocation failure");
MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding();
MOT_LOG_DEBUG("thread exiting");
return;
}
while (true) {
uint32_t tableId = 0;
uint64_t exId = 0;
@ -325,6 +367,7 @@ void CheckpointWorkerPool::WorkerFunc()
}
bool iterationSucceeded = true;
bool isDeleted = false;
while (it->IsValid()) {
MOT::Sentinel* Sentinel = it->GetPrimarySentinel();
MOT_ASSERT(Sentinel);
@ -333,8 +376,11 @@ void CheckpointWorkerPool::WorkerFunc()
it->Next();
continue;
}
int ckptStatus = Checkpoint(&buffer, Sentinel, fd, threadId);
int ckptStatus = Checkpoint(&buffer, Sentinel, fd, threadId, isDeleted);
if (isDeleted) {
deletedList[deletedListLocation++] = Sentinel;
ExecuteMicroGcTransaction(deletedList, gcSession, table, deletedListLocation, DELETE_LIST_SIZE);
}
if (ckptStatus == 1) {
numOps++;
curSegLen += table->GetTupleSize() + sizeof(CheckpointUtils::EntryHeader);
@ -429,6 +475,13 @@ void CheckpointWorkerPool::WorkerFunc()
MOT_LOG_ERROR("CheckpointWorkerPool::finishFile: failed to close file (id: %u)", tableId);
}
}
if (deletedListLocation > 0) {
MOT_LOG_DEBUG("Checkpoint worker Clean Leftovers Table %c deletedListLocation = %u\n",
table->GetTableName().c_str(),
deletedListLocation);
ExecuteMicroGcTransaction(deletedList, gcSession, table, deletedListLocation, deletedListLocation);
}
table->ClearThreadMemoryCache();
m_cpManager.TaskDone(table, seg, taskSucceeded);
@ -439,7 +492,7 @@ void CheckpointWorkerPool::WorkerFunc()
break;
}
}
delete[] deletedList;
GetSessionManager()->DestroySessionContext(sessionContext);
MOT::MOTEngine::GetInstance()->OnCurrentThreadEnding();
MOT_LOG_DEBUG("thread exiting");

View File

@ -89,6 +89,8 @@ public:
enum ErrCodes { NO_ERROR = 0, FILE_IO = 1, MEMORY = 2, TABLE = 3, INDEX = 4, CALC = 5 };
static constexpr uint16_t DELETE_LIST_SIZE = 1000;
private:
/**
* @brief The main worker function
@ -112,9 +114,10 @@ private:
* @param sentinel The sentinel that holds to row.
* @param fd The file descriptor to write to.
* @param tid The thread id.
* @param isDeleted The row delete status
* @return Int equal to -1 on error, 0 if nothing was written and 1 if the row was written.
*/
int Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid);
int Checkpoint(Buffer* buffer, Sentinel* sentinel, int fd, int tid, bool& isDeleted);
/**
* @brief Pops a task (table pointer) from the tasks queue.
@ -148,6 +151,9 @@ private:
*/
bool FinishFile(int& fd, uint32_t tableId, uint64_t numOps, uint64_t exId);
void ExecuteMicroGcTransaction(
Sentinel** deletedList, GcManager* gcSession, Table* table, uint16_t& deletedCounter, uint16_t limit);
// Workers
void* m_workers;

View File

@ -248,7 +248,6 @@ RC TxnManager::CommitInternal(uint64_t csn)
if (!GetGlobalConfiguration().m_enableRedoLog ||
GetGlobalConfiguration().m_redoLogHandlerType == RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) {
m_occManager.ReleaseLocks(this);
m_occManager.CleanRowsFromIndexes(this);
}
return RC_OK;
}
@ -313,7 +312,6 @@ RC TxnManager::CommitPrepared(uint64_t transactionId)
if (!GetGlobalConfiguration().m_enableRedoLog ||
GetGlobalConfiguration().m_redoLogHandlerType == RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) {
m_occManager.ReleaseLocks(this);
m_occManager.CleanRowsFromIndexes(this);
}
MOT::DbSessionStatisticsProvider::GetInstance().AddCommitPreparedTxn();
return RC_OK;
@ -340,7 +338,6 @@ RC TxnManager::EndTransaction()
GetGlobalConfiguration().m_redoLogHandlerType != RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER &&
IsFailedCommitPrepared() == false) {
m_occManager.ReleaseLocks(this);
m_occManager.CleanRowsFromIndexes(this);
}
CleanDDLChanges();
Cleanup();
@ -638,7 +635,7 @@ Row* TxnManager::RemoveKeyFromIndex(Row* row, Sentinel* sentinel)
Table* table = row->GetTable();
Row* outputRow = nullptr;
if (row->GetStable() == nullptr) {
if (sentinel->GetStable() == nullptr) {
outputRow = table->RemoveKeyFromIndex(row, sentinel, m_threadId, GetGcSession());
} else {
// Checkpoint works on primary-sentinel only!