!1746 Masstree OOM feature
Merge pull request !1746 from Vinoth Veeraraghavan/master
@@ -238,16 +238,20 @@ public:
        return m_purpose;
    }

    void GcStartTxnMTtests()
    GcEpochType GcStartInnerTxn()
    {
        if (m_gcEpoch != GetGlobalEpoch())
            m_gcEpoch = GetGlobalEpoch();

        return m_gcEpoch;
    }

    void GcEndTxnMTtests()
    void GcEndInnerTxn(bool clean_gc)
    {
        if (clean_gc) {
            RunQuicese();
        }
        m_gcEpoch = 0;
    }

    void GcStartTxn()
    {
@@ -272,6 +276,7 @@ public:
            RunQuicese();
            m_managerLock.unlock();
        }
        m_gcEpoch = 0;
        m_isTxnStarted = false;
    }

@@ -313,7 +318,6 @@ public:
    {
        if (m_performGcEpoch != g_gcActiveEpoch)
            HardQuiesce(m_rcuFreeCount);
        m_gcEpoch = 0;
    }

    /** @brief Clean all object at the end of the session */
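A minimal caller sketch of how the new inner-transaction hooks are meant to be paired; the gcSession variable is hypothetical, obtained in this diff via GetSessionManager()->GetCurrentSessionContext()->GetTxnManager()->GetGcSession():

    // Sketch only, assuming gcSession points at the per-session GC manager shown above.
    GcEpochType epoch = gcSession->GcStartInnerTxn();  // refresh m_gcEpoch to the current global epoch and pin it
    // ... reclaim-sensitive Masstree work runs here under the pinned epoch ...
    gcSession->GcEndInnerTxn(false);                   // false: skip RunQuicese(), just clear m_gcEpoch
    (void)epoch;                                       // the returned epoch is available for diagnostics if needed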
@@ -258,6 +258,15 @@ bool Index::IndexInsert(Sentinel*& outputSentinel, const Key* key, uint32_t pid,
    outputSentinel = IndexInsertImpl(key, sentinel, inserted, pid);
    // sync between rollback/delete and insert
    if (inserted == false) {
        if (unlikely(outputSentinel == nullptr)) {
            MOT_REPORT_ERROR(
                MOT_ERROR_OOM, "Index Insert", "Failed to insert sentinel to index %s", m_name.c_str());
            rc = RC_MEMORY_ALLOCATION_ERROR;
            m_sentinelPool->Release<Sentinel>(sentinel);
            sentinel = nullptr;
            return false;
        }

        // Spin if the counter is 0 - aborting in parallel or sentinel is marks for commit
        if (outputSentinel->RefCountUpdate(INC, pid) == RC_OK)
            retryInsert = false;
@@ -300,8 +309,16 @@ Sentinel* Index::IndexInsert(const Key* key, Row* row, uint32_t pid)
        // no need to report to full error stack
        SetLastError(MOT_ERROR_UNIQUE_VIOLATION, MOT_SEVERITY_NORMAL);
        m_sentinelPool->Release<Sentinel>(sentinel);
        sentinel = nullptr;
        return nullptr;
    } else {
        if (inserted == false) {
            MOT_REPORT_ERROR(MOT_ERROR_OOM, "Index Insert", "Failed to insert sentinel to index %s", m_name.c_str());
            m_sentinelPool->Release<Sentinel>(sentinel);
            sentinel = nullptr;
            return nullptr;
        }

        if (GetIndexOrder() == IndexOrder::INDEX_ORDER_PRIMARY) {
            sentinel->SetPrimaryIndex();
            sentinel->SetNextPtr(row);
@@ -343,9 +360,6 @@ Sentinel* Index::IndexReadHeader(const Key* key, uint32_t pid) const
Sentinel* Index::IndexRemove(const Key* key, uint32_t pid)
{
    Sentinel* sentinel = IndexRemoveImpl(key, pid);

    MOT_ASSERT(sentinel != nullptr);
    MOT_ASSERT(sentinel->GetCounter() == 0);
    return sentinel;
}

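The early return added above gives Index::IndexInsert two distinct failure modes; a minimal caller-side sketch of separating them (the rc parameter and the exact call shape are inferred from the truncated signature in the hunk header, so treat this as an assumption):

    Sentinel* outputSentinel = nullptr;
    RC rc = RC_OK;
    // assumed call shape; the real signature is cut off after 'pid' above
    if (!index->IndexInsert(outputSentinel, key, pid, rc)) {
        if (rc == RC_MEMORY_ALLOCATION_ERROR) {
            // OOM: the sentinel was already released inside IndexInsert, just propagate the error
            return rc;
        }
        // otherwise the insert is resolved against an existing sentinel (duplicate key path)
    }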
@@ -56,12 +56,22 @@ void* basic_table<P>::insert(MOT::Key const* const& key, void* const& entry, boo
       5. Update the key slice, keylen, key suffix and key's value in the
       leaf
       6. Add the key's location in permutation's back (key is not visible for
       readers yet) As key's location is not part of the permutation yet, the key
       readers yet) as key's location is not part of the permutation yet, the key
       is not reachable (aka not present). In addition, the leaf is still locked.
       Unlocking the node and entering the key into the permutation will be done
       later in finish_insert (called from lp.finish). */
       later in finish_insert (done in lp.finish function). */

    bool found = lp.find_insert(*mtSessionThreadInfo);
    bool found = false;
    if (!lp.find_insert(*mtSessionThreadInfo, found)) {
        // Failed to insert key due to memory allocation failure.
        MOT_ASSERT(!mtSessionThreadInfo->non_disruptive_error());
        MOT_ASSERT(found == false);
        lp.finish(0, *mtSessionThreadInfo);
        result = false;
        return nullptr;
    }

    MOT_ASSERT(mtSessionThreadInfo->non_disruptive_error());

    // If the key is new (not previously existing) then we record the entry under
    // that key
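For clarity, the contract change to lp.find_insert() made above, shown side by side (only the call shape is relevant here; this restates the hunk, it is not additional behavior):

    // Old contract: the return value only said whether the key was already present, so an
    // allocation failure inside find_insert was indistinguishable from "not found".
    bool found = lp.find_insert(*mtSessionThreadInfo);

    // New contract: the return value reports success of the insert attempt itself, while
    // "already present" is delivered through the out-parameter.
    bool found = false;
    if (!lp.find_insert(*mtSessionThreadInfo, found)) {
        lp.finish(0, *mtSessionThreadInfo);  // release the leaf locked by find_insert before bailing out
        return nullptr;                      // the OOM is propagated to the MOT index layer
    }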
@@ -33,15 +33,15 @@ namespace Masstree {
template <typename P>
struct gc_layer_rcu_callback_ng : public P::threadinfo_type::mrcu_callback {
    typedef typename P::threadinfo_type threadinfo;
    node_base<P>* root_;
    node_base<P>** root_ref_;
    int len_;
    size_t size_;
    MOT::MasstreePrimaryIndex* index_;
    char s_[0];
    gc_layer_rcu_callback_ng(node_base<P>* root, Str prefix, size_t size)
        : root_(root), len_(prefix.length()), size_(size), index_(mtSessionThreadInfo->get_working_index())
    gc_layer_rcu_callback_ng(node_base<P>** root_ref, Str prefix, size_t size)
        : root_ref_(root_ref), len_(prefix.length()), size_(size), index_(mtSessionThreadInfo->get_working_index())
    {
        errno_t erc = memcpy_s(s_, size_, prefix.data(), len_);
        errno_t erc = memcpy_s(s_, len_, prefix.data(), len_);
        securec_check(erc, "\0", "\0");
    }
    size_t operator()(bool drop_index);
@@ -51,7 +51,7 @@ struct gc_layer_rcu_callback_ng : public P::threadinfo_type::mrcu_callback {
        return size_;
    }

    static void make(node_base<P>* root, Str prefix, threadinfo& ti);
    static void make(node_base<P>** root_ref, Str prefix, threadinfo& ti);
};

template <typename P>
@@ -60,8 +60,8 @@ size_t gc_layer_rcu_callback_ng<P>::operator()(bool drop_index)
    // If drop_index == true, all index's pools are going to be cleaned, so we can skip gc_layer call (which might add
    // more elements into GC)
    if (drop_index == false) {
        // GC layer remove might delete elements from tree and add them to the limbolist. Index must be provided to
        // allow access to the memory pools.
        // GC layer remove might delete elements from tree and might create new gc layer removal requests and add them to GC.
        // Index must be provided to allow access to the memory pools.
        mtSessionThreadInfo->set_working_index(index_);
        (*this)(*mtSessionThreadInfo);
        mtSessionThreadInfo->set_working_index(NULL);
@@ -73,30 +73,33 @@ size_t gc_layer_rcu_callback_ng<P>::operator()(bool drop_index)
template <typename P>
void gc_layer_rcu_callback_ng<P>::operator()(threadinfo& ti)
{
    // root_ node while creating gc_layer_rcu_callback_ng might not be the current root. Find updated tree's root.
    while (!root_->is_root()) {
        root_ = root_->maybe_parent();
    }
    masstree_invariant(root_ref_);

    // If root was already deleted, do nothing.
    if (root_->deleted()) {
        return;
    }

    tcursor<P> node_cursor(root_, s_, len_);
    if (!node_cursor.gc_layer(ti) || !node_cursor.finish_remove(ti)) {
    tcursor<P> node_cursor(root_ref_, s_, len_);
    bool do_remove = node_cursor.gc_layer(ti);
    if (!do_remove || !node_cursor.finish_remove(ti)) {
        node_cursor.n_->unlock();
    }
    ti.add_nodes_to_gc();
}

template <typename P>
void gc_layer_rcu_callback_ng<P>::make(node_base<P>* root, Str prefix, threadinfo& ti)
void gc_layer_rcu_callback_ng<P>::make(node_base<P>** root_ref, Str prefix, threadinfo& ti)
{
    size_t sz = prefix.len + sizeof(gc_layer_rcu_callback_ng<P>);
    // As we are using slab allocator to allocate the memory, sz will be updated in ti.allocate with the real allocated
    // size
    void* data = ti.allocate(sz, memtag_masstree_gc, &sz /*OUT PARAM*/);
    gc_layer_rcu_callback_ng<P>* cb = new (data) gc_layer_rcu_callback_ng<P>(root, prefix, sz);
    // As we are using slab allocator for allocation, sz will be updated by ti.allocate with the real allocation
    // size. We need this size for the GC deallocation size report.
    void* data = ti.allocate(sz, memtag_masstree_gc, &sz /* IN/OUT PARAM */);
    if (!data) {
        // If allocation fails, the gc layer removal command will not be added to GC and this layer won't be removed.
        // We might deal with this issue in the future by replacing the current mechanism with one of the following options:
        // 1. Use a thread-local GC layer removal object (per threadinfo) and keep a list of key suffixes to clean (also in threadinfo)
        // 2. Move this feature to the VACUUM process: create a special iterator that adds GC layer callbacks when it finds empty layers
        ti.set_last_error(MT_MERR_GC_LAYER_REMOVAL_MAKE);
        return;
    }

    gc_layer_rcu_callback_ng<P>* cb = new (data) gc_layer_rcu_callback_ng<P>(root_ref, prefix, sz);
    ti.rcu_register(cb, sz);
}

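One consequence worth flagging: after this change make() can fail without any effect on the tree itself (the empty layer is simply left in place), and the only signal is the threadinfo last-error. A sketch of how a call site could surface that condition; get_last_error() is assumed here as the counterpart of the set_last_error() call above, not confirmed by this diff:

    gc_layer_rcu_callback_ng<P>::make(root_ref, prefix, ti);
    if (ti.get_last_error() == MT_MERR_GC_LAYER_REMOVAL_MAKE) {
        // Allocation of the RCU callback failed; the empty layer stays in the tree and will only be
        // reclaimed if a later removal in the same layer triggers make() again.
    }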
@@ -69,14 +69,17 @@ Sentinel* MasstreePrimaryIndex::IndexInsertImpl(const Key* key, Sentinel* sentin
    mtSessionThreadInfo->set_gc_session(
        MOTEngine::GetInstance()->GetCurrentGcSession()); // set current GC session in thread-pooled envelope

    mtSessionThreadInfo->set_last_error(MT_MERR_OK);

    existingItem = m_index.insert(key, sentinel, inserted, pid);

    mtSessionThreadInfo->set_gc_session(NULL);
    mtSessionThreadInfo->set_working_index(NULL);

    if (!inserted) { // key mapping already exists in unique index
    if (!inserted && existingItem) { // key mapping already exists in unique index
        result = reinterpret_cast<Sentinel*>(existingItem);
    } // otherwise return null pointer
    } // otherwise return null pointer (if !inserted && !existingItem, the key does not exist and the insertion
      // failed due to a memory issue)

    return result;
}
@@ -108,6 +111,8 @@ Sentinel* MasstreePrimaryIndex::IndexRemoveImpl(const Key* key, uint32_t pid)
    mtSessionThreadInfo->set_gc_session(
        MOTEngine::GetInstance()->GetCurrentGcSession()); // set current GC session in thread-pooled envelope

    mtSessionThreadInfo->set_last_error(MT_MERR_OK);

    output = m_index.remove(key->GetKeyBuf(), key->GetKeyLength(), result, pid);

    mtSessionThreadInfo->set_gc_session(NULL);
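The three-way return contract that the `!inserted && existingItem` check introduces, restated compactly (derived from this hunk together with the Index::IndexInsert changes earlier in the diff):

    // inserted == true                       -> the key was inserted with the caller's new sentinel
    // inserted == false && existingItem != 0 -> duplicate key; the existing sentinel is returned
    // inserted == false && existingItem == 0 -> Masstree allocation failure; Index::IndexInsert maps this
    //                                           case to MOT_ERROR_OOM / RC_MEMORY_ALLOCATION_ERROR and
    //                                           releases the caller's sentinel back to m_sentinelPool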
@@ -37,6 +37,7 @@
#include "masstree/mot_masstree_struct.hpp"
#include "masstree/mot_masstree_iterator.hpp"
#include <cmath>
#include "mot_engine.h"

namespace MOT {
/**
@@ -302,11 +303,38 @@ public:
    /**
     * @brief Print Masstree pools memory consumption details to log.
     */
    virtual void PrintPoolsStats()
    virtual void PrintPoolsStats(LogLevel level = LogLevel::LL_DEBUG)
    {
        m_leafsPool->Print("Leafs pool: ");
        m_internodesPool->Print("Internode pool: ");
        m_ksuffixSlab->Print("Ksuffix slab: ");
        m_leafsPool->Print("Leafs pool", level);
        m_internodesPool->Print("Internode pool", level);
        m_ksuffixSlab->Print("Ksuffix slab", level);
    }

    virtual void GetLeafsPoolStats(uint64_t& objSize, uint64_t& numUsedObj, uint64_t& totalSize, uint64_t& netto)
    {
        PoolStatsSt stats = {};
        m_leafsPool->GetStats(stats);

        objSize = stats.m_objSize;
        numUsedObj = stats.m_totalObjCount - stats.m_freeObjCount;
        totalSize = stats.m_poolCount * stats.m_poolGrossSize;
        netto = numUsedObj * objSize;
    }

    virtual void GetInternodesPoolStats(uint64_t& objSize, uint64_t& numUsedObj, uint64_t& totalSize, uint64_t& netto)
    {
        PoolStatsSt stats = {};
        m_internodesPool->GetStats(stats);

        objSize = stats.m_objSize;
        numUsedObj = stats.m_totalObjCount - stats.m_freeObjCount;
        totalSize = stats.m_poolCount * stats.m_poolGrossSize;
        netto = numUsedObj * objSize;
    }

    virtual PoolStatsSt* GetKsuffixSlabStats()
    {
        return m_ksuffixSlab->GetStats();
    }

    /**
@@ -316,6 +344,8 @@ public:
    {
        m_initialized = false;
        DestroyPools();
        // remove masstree's root pointer (not valid anymore)
        *(m_index.root_ref()) = nullptr;

        return IndexInitImpl(NULL);
    }
@@ -332,7 +362,7 @@ public:
     * @param tag Hint to determine which pool to use.
     * @return Pointer to allocated memory.
     */
    void* AllocateMem(int& size, enum memtag tag)
    virtual void* AllocateMem(int& size, enum memtag tag)
    {
        switch (tag) {
            case memtag_masstree_leaf:
@@ -360,7 +390,7 @@ public:
     * @param Pointer to allocated memory.
     * @return True if deallocation succeeded.
     */
    bool DeallocateMem(void* ptr, int size, enum memtag tag)
    virtual bool DeallocateMem(void* ptr, int size, enum memtag tag)
    {
        switch (tag) {
            case memtag_masstree_leaf:
@@ -431,10 +461,16 @@ public:
    {
        // If dropIndex == true, all index's pools are going to be cleaned, so we skip the release here
        mtSessionThreadInfo->set_gc_session(GetCurrentGcSession());
        GcEpochType local_epoch =
            GetSessionManager()->GetCurrentSessionContext()->GetTxnManager()->GetGcSession()->GcStartInnerTxn();

        size_t allocationSize = (*static_cast<mrcu_callback*>(gcRemoveLayerFuncObjPtr))(dropIndex);

        if (dropIndex == false) {
            ((SlabAllocator*)slab)->Release(gcRemoveLayerFuncObjPtr, allocationSize);
        }

        GetSessionManager()->GetCurrentSessionContext()->GetTxnManager()->GetGcSession()->GcEndInnerTxn(false);
        mtSessionThreadInfo->set_gc_session(NULL);
        return allocationSize;
    }
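A short sketch of consuming the new per-pool statistics getters to estimate an index's Masstree memory footprint; the aggregation below is illustrative only, and `index` is assumed to be a MasstreePrimaryIndex pointer:

    uint64_t objSize = 0;
    uint64_t usedObjs = 0;
    uint64_t grossSize = 0;
    uint64_t netSize = 0;
    uint64_t totalGross = 0;
    uint64_t totalNet = 0;

    index->GetLeafsPoolStats(objSize, usedObjs, grossSize, netSize);       // leaf node pool
    totalGross += grossSize;
    totalNet += netSize;

    index->GetInternodesPoolStats(objSize, usedObjs, grossSize, netSize);  // internode pool
    totalGross += grossSize;
    totalNet += netSize;

    // totalGross counts whole pool allocations; totalNet counts only the bytes of objects currently in use.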
@@ -582,6 +582,7 @@ Row* Table::RemoveKeyFromIndex(Row* row, Sentinel* sentinel, uint64_t tid, GcMan
#endif
    currSentinel = ix->IndexRemove(&key, tid);
    MOT_ASSERT(currSentinel == sentinel);
    MOT_ASSERT(currSentinel->GetCounter() == 0);
    if (likely(gc != nullptr)) {
        if (ix->GetIndexOrder() == IndexOrder::INDEX_ORDER_PRIMARY) {
            OutputRow = currSentinel->GetData();
@@ -413,6 +413,8 @@ RC TxnManager::RollbackInsert(Access* ac)
#endif
    outputSen = index_->IndexRemove(&m_key, GetThdId());
    MOT_ASSERT(outputSen != nullptr);
    MOT_ASSERT(outputSen->GetCounter() == 0);

    GcSessionRecordRcu(index_->GetIndexId(), outputSen, nullptr, Index::SentinelDtor, SENTINEL_SIZE(index_));
    // If we are the owner of the key and insert on top of a deleted row,
    // lets check if we can reclaim the deleted row
@@ -820,6 +822,8 @@ void TxnInsertAction::CleanupOptimisticInsert(
    MOT_ASSERT(pIndexInsertResult->GetCounter() == 0);
    Sentinel* outputSen = index_->IndexRemove(currentItem->m_key, m_manager->GetThdId());
    MOT_ASSERT(outputSen != nullptr);
    MOT_ASSERT(outputSen->GetCounter() == 0);

    m_manager->GcSessionRecordRcu(
        index_->GetIndexId(), outputSen, nullptr, Index::SentinelDtor, SENTINEL_SIZE(index_));
    m_manager->m_accessMgr->IncreaseTableStat(table);