Fix async redo performance issue and sync to standby issue

This commit is contained in:
Vinoth
2020-08-05 17:34:33 +08:00
parent 937bc72dec
commit 1951058d4e
20 changed files with 220 additions and 69 deletions

View File

@ -41,7 +41,7 @@
#enable_redo_log = true #enable_redo_log = true
# Specifies whether to use group commit. # Specifies whether to use group commit.
# This option is relevant only when GaussDB is configured to use synchronous commit (i.e. when # This option is relevant only when openGauss is configured to use synchronous commit (i.e. when
# postgresql.conf has synchronous_commit configured to any value other than 'off'). # postgresql.conf has synchronous_commit configured to any value other than 'off').
#enable_group_commit = false #enable_group_commit = false
@ -54,6 +54,13 @@
#group_commit_size = 16 #group_commit_size = 16
#group_commit_timeout = 10 ms #group_commit_timeout = 10 ms
# Specifies the number of redo log buffers to use for asynchronous commit mode.
# Allowed range of values for this configuration is [8, 128]. The size of one buffer is 128 MB.
# This option is relevant only when openGauss is configured to use asynchronous commit (i.e. when
# postgresql.conf has synchronous_commit configured to 'off').
#
#async_log_buffer_count = 24
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# CHECKPOINT # CHECKPOINT
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------

View File

@ -335,6 +335,9 @@ void CheckpointManager::MoveToNextPhase()
// hold the redo log lock to avoid inserting additional entries to the // hold the redo log lock to avoid inserting additional entries to the
// log. Once snapshot is taken, this lock will be released in SnapshotReady(). // log. Once snapshot is taken, this lock will be released in SnapshotReady().
m_redoLogHandler->WrLock(); m_redoLogHandler->WrLock();
// write all buffer entries before taking the LSN position
// relevant for asynchronous logging or group commit
m_redoLogHandler->Flush();
} }
} }

View File

@ -300,6 +300,10 @@ inline void Prefetch(const void* ptr)
/** @define Constant denoting indentation used for MOT printouts. */ /** @define Constant denoting indentation used for MOT printouts. */
#define PRINT_REPORT_INDENT 2 #define PRINT_REPORT_INDENT 2
/** @define Min and Max values for asynchronous redo log buffer array count. */
#define MIN_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT 8
#define MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT 128
} // namespace MOT } // namespace MOT
#endif // MOT_GLOBAL_H #endif // MOT_GLOBAL_H

View File

@ -39,6 +39,7 @@ MOTConfiguration MOTConfiguration::motGlobalConfiguration;
constexpr bool MOTConfiguration::DEFAULT_ENABLE_REDO_LOG; constexpr bool MOTConfiguration::DEFAULT_ENABLE_REDO_LOG;
constexpr LoggerType MOTConfiguration::DEFAULT_LOGGER_TYPE; constexpr LoggerType MOTConfiguration::DEFAULT_LOGGER_TYPE;
constexpr RedoLogHandlerType MOTConfiguration::DEFAULT_REDO_LOG_HANDLER_TYPE; constexpr RedoLogHandlerType MOTConfiguration::DEFAULT_REDO_LOG_HANDLER_TYPE;
constexpr uint32_t MOTConfiguration::DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT;
constexpr bool MOTConfiguration::DEFAULT_ENABLE_GROUP_COMMIT; constexpr bool MOTConfiguration::DEFAULT_ENABLE_GROUP_COMMIT;
constexpr uint64_t MOTConfiguration::DEFAULT_GROUP_COMMIT_SIZE; constexpr uint64_t MOTConfiguration::DEFAULT_GROUP_COMMIT_SIZE;
constexpr const char* MOTConfiguration::DEFAULT_GROUP_COMMIT_TIMEOUT; constexpr const char* MOTConfiguration::DEFAULT_GROUP_COMMIT_TIMEOUT;
@ -381,6 +382,7 @@ MOTConfiguration::MOTConfiguration()
: m_enableRedoLog(DEFAULT_ENABLE_REDO_LOG), : m_enableRedoLog(DEFAULT_ENABLE_REDO_LOG),
m_loggerType(DEFAULT_LOGGER_TYPE), m_loggerType(DEFAULT_LOGGER_TYPE),
m_redoLogHandlerType(DEFAULT_REDO_LOG_HANDLER_TYPE), m_redoLogHandlerType(DEFAULT_REDO_LOG_HANDLER_TYPE),
m_asyncRedoLogBufferArrayCount(DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT),
m_enableGroupCommit(DEFAULT_ENABLE_GROUP_COMMIT), m_enableGroupCommit(DEFAULT_ENABLE_GROUP_COMMIT),
m_groupCommitSize(DEFAULT_GROUP_COMMIT_SIZE), m_groupCommitSize(DEFAULT_GROUP_COMMIT_SIZE),
m_groupCommitTimeoutUSec(DEFAULT_GROUP_COMMIT_TIMEOUT_USEC), m_groupCommitTimeoutUSec(DEFAULT_GROUP_COMMIT_TIMEOUT_USEC),
@ -475,6 +477,7 @@ bool MOTConfiguration::SetFlag(const std::string& name, const std::string& value
if (ParseBool(name, "enable_redo_log", value, &m_enableRedoLog)) { if (ParseBool(name, "enable_redo_log", value, &m_enableRedoLog)) {
} else if (ParseLoggerType(name, "logger_type", value, &m_loggerType)) { } else if (ParseLoggerType(name, "logger_type", value, &m_loggerType)) {
} else if (ParseRedoLogHandlerType(name, "redo_log_handler_type", value, &m_redoLogHandlerType)) { } else if (ParseRedoLogHandlerType(name, "redo_log_handler_type", value, &m_redoLogHandlerType)) {
} else if (ParseUint32(name, "async_log_buffer_count", value, &m_asyncRedoLogBufferArrayCount)) {
} else if (ParseBool(name, "enable_group_commit", value, &m_enableGroupCommit)) { } else if (ParseBool(name, "enable_group_commit", value, &m_enableGroupCommit)) {
} else if (ParseUint64(name, "group_commit_size", value, &m_groupCommitSize)) { } else if (ParseUint64(name, "group_commit_size", value, &m_groupCommitSize)) {
} else if (ParseUint64(name, "group_commit_timeout_usec", value, &m_groupCommitTimeoutUSec)) { } else if (ParseUint64(name, "group_commit_timeout_usec", value, &m_groupCommitTimeoutUSec)) {
@ -603,6 +606,9 @@ int MOTConfiguration::GetMappedCore(int logicId) const
#define UPDATE_INT_CFG(var, cfgPath, defaultValue) \ #define UPDATE_INT_CFG(var, cfgPath, defaultValue) \
UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath) UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath)
#define UPDATE_INT_CFG_BOUNDS(var, cfgPath, defaultValue, lowerBound, upperBound) \
UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath, lowerBound, upperBound)
#define UPDATE_MEM_CFG(var, cfgPath, defaultValue, scale) \ #define UPDATE_MEM_CFG(var, cfgPath, defaultValue, scale) \
do { \ do { \
uint64_t memoryValueBytes = \ uint64_t memoryValueBytes = \
@ -633,6 +639,11 @@ void MOTConfiguration::LoadConfig()
UPDATE_CFG(m_enableRedoLog, "enable_redo_log", DEFAULT_ENABLE_REDO_LOG); UPDATE_CFG(m_enableRedoLog, "enable_redo_log", DEFAULT_ENABLE_REDO_LOG);
UPDATE_USER_CFG(m_loggerType, "logger_type", DEFAULT_LOGGER_TYPE); UPDATE_USER_CFG(m_loggerType, "logger_type", DEFAULT_LOGGER_TYPE);
UPDATE_USER_CFG(m_redoLogHandlerType, "redo_log_handler_type", DEFAULT_REDO_LOG_HANDLER_TYPE); UPDATE_USER_CFG(m_redoLogHandlerType, "redo_log_handler_type", DEFAULT_REDO_LOG_HANDLER_TYPE);
UPDATE_INT_CFG_BOUNDS(m_asyncRedoLogBufferArrayCount,
"async_log_buffer_count",
DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT,
MIN_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT,
MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT);
// commit configuration // commit configuration
UPDATE_CFG(m_enableGroupCommit, "enable_group_commit", DEFAULT_ENABLE_GROUP_COMMIT); UPDATE_CFG(m_enableGroupCommit, "enable_group_commit", DEFAULT_ENABLE_GROUP_COMMIT);

View File

@ -123,6 +123,9 @@ public:
/** Determines the redo log handler type (not configurable, but derived). */ /** Determines the redo log handler type (not configurable, but derived). */
RedoLogHandlerType m_redoLogHandlerType; RedoLogHandlerType m_redoLogHandlerType;
/** Determines the number of asynchronous redo log buffer arrays. */
uint32_t m_asyncRedoLogBufferArrayCount;
/**********************************************************************/ /**********************************************************************/
// Commit configuration // Commit configuration
/**********************************************************************/ /**********************************************************************/
@ -397,6 +400,9 @@ private:
/** @var Default redo log handler type. */ /** @var Default redo log handler type. */
static constexpr RedoLogHandlerType DEFAULT_REDO_LOG_HANDLER_TYPE = RedoLogHandlerType::SYNC_REDO_LOG_HANDLER; static constexpr RedoLogHandlerType DEFAULT_REDO_LOG_HANDLER_TYPE = RedoLogHandlerType::SYNC_REDO_LOG_HANDLER;
/** @var Default asynchronous redo log buffer array count. */
static constexpr uint32_t DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT = 24;
// default commit configuration // default commit configuration
/** @var Default enable group commit. */ /** @var Default enable group commit. */
static constexpr bool DEFAULT_ENABLE_GROUP_COMMIT = false; static constexpr bool DEFAULT_ENABLE_GROUP_COMMIT = false;
@ -630,9 +636,12 @@ private:
} }
template <typename T> template <typename T>
static void UpdateConfigItem(uint32_t& oldValue, T newValue, const char* name) static void UpdateConfigItem(uint32_t& oldValue, T newValue, const char* name,
uint32_t lowerBound = 0, uint32_t upperBound = UINT_MAX)
{ {
if (newValue > UINT_MAX) { if (newValue > upperBound) {
MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue);
} else if (lowerBound > 0 && newValue < lowerBound) {
MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue); MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue);
} else if (oldValue != newValue) { } else if (oldValue != newValue) {
MOT_LOG_TRACE("Configuration of %s changed: %u --> %u", name, oldValue, (uint32_t)newValue); MOT_LOG_TRACE("Configuration of %s changed: %u --> %u", name, oldValue, (uint32_t)newValue);

View File

@ -1343,7 +1343,7 @@ void RecoveryManager::ClearTableCache()
while (it != m_tableDeletesStat.end()) { while (it != m_tableDeletesStat.end()) {
auto table = *it; auto table = *it;
if (table.second > NUM_DELETE_MAX_INC) { if (table.second > NUM_DELETE_MAX_INC) {
MOT_LOG_INFO("RecoveryManager::ClearTableCache: Table = %s items = %lu\n", MOT_LOG_TRACE("RecoveryManager::ClearTableCache: Table = %s items = %lu\n",
table.first->GetTableName().c_str(), table.first->GetTableName().c_str(),
table.second); table.second);
table.first->ClearRowCache(); table.first->ClearRowCache();

View File

@ -93,7 +93,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable(
Table* table = nullptr; Table* table = nullptr;
switch (state) { switch (state) {
case COMMIT: case COMMIT:
MOT_LOG_INFO("RecoverLogOperationCreateTable: COMMIT"); MOT_LOG_DEBUG("RecoverLogOperationCreateTable: COMMIT");
CreateTable((char*)data, status, table, true); CreateTable((char*)data, status, table, true);
break; break;

View File

@ -29,19 +29,42 @@
#include "global.h" #include "global.h"
#include "utilities.h" #include "utilities.h"
#include "mot_atomic_ops.h" #include "mot_atomic_ops.h"
#include "mot_configuration.h"
namespace MOT { namespace MOT {
DECLARE_LOGGER(AsyncRedoLogHandler, redolog) DECLARE_LOGGER(AsyncRedoLogHandler, redolog)
AsyncRedoLogHandler::AsyncRedoLogHandler() : m_bufferPool(), m_activeBuffer(0) AsyncRedoLogHandler::AsyncRedoLogHandler()
: m_bufferPool(),
m_redoLogBufferArrayCount(GetGlobalConfiguration().m_asyncRedoLogBufferArrayCount),
m_activeBuffer(0),
m_initialized(false)
{} {}
AsyncRedoLogHandler::~AsyncRedoLogHandler() AsyncRedoLogHandler::~AsyncRedoLogHandler()
{} {
// wait for all redo log buffers to be written
while (!m_writeQueue.empty()) {
usleep(WRITE_LOG_WAIT_INTERVAL);
}
if (m_initialized) {
pthread_mutex_destroy(&m_writeLock);
}
m_initialized = false;
}
bool AsyncRedoLogHandler::Init() bool AsyncRedoLogHandler::Init()
{ {
return m_bufferPool.Init(); bool result = false;
int rc = pthread_mutex_init(&m_writeLock, nullptr);
result = (rc == 0);
if (result != true) {
MOT_LOG_ERROR("Error initializing async redolog handler lock");
return result;
}
result = m_bufferPool.Init();
m_initialized = true;
return result;
} }
RedoLogBuffer* AsyncRedoLogHandler::CreateBuffer() RedoLogBuffer* AsyncRedoLogHandler::CreateBuffer()
@ -56,16 +79,18 @@ void AsyncRedoLogHandler::DestroyBuffer(RedoLogBuffer* buffer)
RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer) RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
{ {
int position = 0; int position = -1;
do { do {
uint64_t activeBuffer = MOT_ATOMIC_LOAD(m_activeBuffer) % WRITE_LOG_BUFFER_COUNT; m_switchLock.RdLock();
position = m_tripleBuffer[activeBuffer].PushBack(buffer); int activeBuffer = m_activeBuffer;
if (position == MAX_BUFFERS / 2) { position = m_redoLogBufferArrayArray[m_activeBuffer].PushBack(buffer);
WakeupWalWriter(); m_switchLock.RdUnlock();
} else if (position == -1) { if (position == -1) {
// async redo log ternary buffer is full, waiting for write thread usleep(1000);
// to flush the buffer } else if (position == (int)(MAX_BUFFERS - 1)) {
usleep(WRITE_LOG_WAIT_INTERVAL); if (TrySwitchBuffers(activeBuffer)) {
WriteSingleBuffer();
}
} }
} while (position == -1); } while (position == -1);
return CreateBuffer(); return CreateBuffer();
@ -73,34 +98,89 @@ RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
void AsyncRedoLogHandler::Write() void AsyncRedoLogHandler::Write()
{ {
// buffer list switch logic: m_switchLock.RdLock();
// while writers write to list 0, we increment index to 1 and then write list 2 to log int activeBuffer = m_activeBuffer;
// while writers write to list 1, we increment index to 2 and then write list 0 to log m_switchLock.RdUnlock();
// while writers write to list 2, we increment index to 0 and then write list 1 to log if (TrySwitchBuffers(activeBuffer)) {
uint64_t currentIndex = MOT_ATOMIC_LOAD(m_activeBuffer); WriteSingleBuffer();
uint64_t nextIndex = (currentIndex + 1) % WRITE_LOG_BUFFER_COUNT;
uint64_t prevIndex = (currentIndex + 2) % WRITE_LOG_BUFFER_COUNT;
// allow writers to start writing to next buffer list
MOT_ATOMIC_STORE(m_activeBuffer, nextIndex);
// in the meantime we write and flush previous list to log
// writers might still be writing to current list, but we do not touch it in this cycle
RedoLogBufferArray& prevBufferArray = m_tripleBuffer[prevIndex];
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
if (!prevBufferArray.Empty()) {
m_logger->AddToLog(prevBufferArray);
m_logger->FlushLog();
FreeBuffers(prevBufferArray);
prevBufferArray.Reset();
} }
} }
bool AsyncRedoLogHandler::TrySwitchBuffers(int index)
{
bool result = false;
int nextIndex = (index + 1) % m_redoLogBufferArrayCount;
m_switchLock.WrLock();
while (index == m_activeBuffer && !m_redoLogBufferArrayArray[index].Empty()) {
if (!m_redoLogBufferArrayArray[nextIndex].Empty()) {
// the next buffer was not yet written to the log
// wait until write complete
m_switchLock.WrUnlock();
usleep(WRITE_LOG_WAIT_INTERVAL);
m_switchLock.WrLock();
continue;
}
m_writeQueue.push(m_activeBuffer);
m_activeBuffer = nextIndex;
result = true;
}
m_switchLock.WrUnlock();
return result;
}
void AsyncRedoLogHandler::FreeBuffers(RedoLogBufferArray& bufferArray) void AsyncRedoLogHandler::FreeBuffers(RedoLogBufferArray& bufferArray)
{ {
for (uint32_t i = 0; i < bufferArray.Size(); i++) { for (uint32_t i = 0; i < bufferArray.Size(); i++) {
m_bufferPool.Free(bufferArray[i]); m_bufferPool.Free(bufferArray[i]);
} }
} }
void AsyncRedoLogHandler::WriteSingleBuffer()
{
uint32_t writeBufferIndex;
pthread_mutex_lock(&m_writeLock);
if (!m_writeQueue.empty()) {
writeBufferIndex = m_writeQueue.front();
m_writeQueue.pop();
RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex];
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
if (!bufferArray.Empty()) {
m_logger->AddToLog(bufferArray);
m_logger->FlushLog();
FreeBuffers(bufferArray);
bufferArray.Reset();
}
}
pthread_mutex_unlock(&m_writeLock);
}
void AsyncRedoLogHandler::WriteAllBuffers()
{
uint32_t writeBufferIndex;
pthread_mutex_lock(&m_writeLock);
while (!m_writeQueue.empty()) {
writeBufferIndex = m_writeQueue.front();
m_writeQueue.pop();
RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex];
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
if (!bufferArray.Empty()) {
m_logger->AddToLog(bufferArray);
m_logger->FlushLog();
FreeBuffers(bufferArray);
bufferArray.Reset();
}
}
pthread_mutex_unlock(&m_writeLock);
}
void AsyncRedoLogHandler::Flush()
{
m_switchLock.RdLock();
int activeBuffer = m_activeBuffer;
m_switchLock.RdUnlock();
TrySwitchBuffers(activeBuffer);
WriteAllBuffers();
}
} // namespace MOT } // namespace MOT

View File

@ -26,8 +26,10 @@
#ifndef ASYNCHRONOUS_REDO_LOG_HANDLER_H #ifndef ASYNCHRONOUS_REDO_LOG_HANDLER_H
#define ASYNCHRONOUS_REDO_LOG_HANDLER_H #define ASYNCHRONOUS_REDO_LOG_HANDLER_H
#include <queue>
#include "redo_log_handler.h" #include "redo_log_handler.h"
#include "redo_log_buffer_pool.h" #include "redo_log_buffer_pool.h"
#include "rw_lock.h"
namespace MOT { namespace MOT {
class TxnManager; class TxnManager;
@ -62,6 +64,8 @@ public:
*/ */
RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
void Flush();
/** /**
* @brief switches the buffers and flushes the log * @brief switches the buffers and flushes the log
*/ */
@ -72,17 +76,26 @@ public:
~AsyncRedoLogHandler(); ~AsyncRedoLogHandler();
private: private:
static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 10000; // micro seconds static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 1000; // micro second
static constexpr unsigned int WRITE_LOG_BUFFER_COUNT = 3;
/** /**
* @brief free all the RedoLogBuffers in the array and return them to the pool * @brief free all the RedoLogBuffers in the array and return them to the pool
*/ */
void FreeBuffers(RedoLogBufferArray& bufferArray); void FreeBuffers(RedoLogBufferArray& bufferArray);
bool TrySwitchBuffers(int index);
void WriteSingleBuffer();
void WriteAllBuffers();
RedoLogBufferPool m_bufferPool; RedoLogBufferPool m_bufferPool;
RedoLogBufferArray m_tripleBuffer[WRITE_LOG_BUFFER_COUNT]; // 3 buffer arrays for switching in cyclic manner. // array of RedoLogBufferArray for switching in cyclic manner.
volatile uint64_t m_activeBuffer; RedoLogBufferArray m_redoLogBufferArrayArray[MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT];
uint32_t m_redoLogBufferArrayCount;
volatile int m_activeBuffer;
bool m_initialized;
RwLock m_switchLock;
pthread_mutex_t m_writeLock;
std::queue<uint32_t> m_writeQueue;
}; };
} // namespace MOT } // namespace MOT
#endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */ #endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */

View File

@ -30,7 +30,7 @@
#include "global.h" #include "global.h"
#include "redo_log_buffer.h" #include "redo_log_buffer.h"
#define MAX_BUFFERS 1000 #define MAX_BUFFERS 128
namespace MOT { namespace MOT {
class RedoLogBufferArray { class RedoLogBufferArray {
@ -91,6 +91,10 @@ public:
return m_array; return m_array;
} }
static uint32_t MaxSize()
{
return MAX_BUFFERS;
}
private: private:
std::atomic<uint32_t> m_nextFree; std::atomic<uint32_t> m_nextFree;
RedoLogBuffer* m_array[MAX_BUFFERS]; RedoLogBuffer* m_array[MAX_BUFFERS];

View File

@ -72,6 +72,11 @@ public:
*/ */
void Commit(bool isLeader, std::shared_ptr<CommitGroup> groupRef); void Commit(bool isLeader, std::shared_ptr<CommitGroup> groupRef);
inline bool IsCommitted()
{
return m_commited;
}
private: private:
const uint8_t m_handlerId; const uint8_t m_handlerId;
GroupSyncRedoLogHandler* m_handler; GroupSyncRedoLogHandler* m_handler;

View File

@ -155,4 +155,17 @@ RedoLogBuffer* GroupSyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
joinedGroup->Commit(leader, joinedGroup); joinedGroup->Commit(leader, joinedGroup);
return buffer; return buffer;
} }
void GroupSyncRedoLogHandler::Flush()
{
std::shared_ptr<CommitGroup> flushGroup = m_currentGroup;
if (flushGroup == nullptr) {
return;
}
while (!flushGroup->IsCommitted()) {
// spin until group is flushed
cpu_relax();
}
}
} // namespace MOT } // namespace MOT

View File

@ -63,6 +63,8 @@ public:
*/ */
virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
void Flush();
/** /**
* @brief initializes group params * @brief initializes group params
* @param group the group to work on * @param group the group to work on

View File

@ -74,4 +74,11 @@ void SegmentedGroupSyncRedoLogHandler::SetLogger(ILogger* logger)
for (unsigned int i = 0; i < m_numaNodes; i++) for (unsigned int i = 0; i < m_numaNodes; i++)
m_redoLogHandlerArray[i].SetLogger(logger); m_redoLogHandlerArray[i].SetLogger(logger);
} }
void SegmentedGroupSyncRedoLogHandler::Flush()
{
for (unsigned int i = 0; i < m_numaNodes; i++) {
m_redoLogHandlerArray[i].Flush();
}
}
} // namespace MOT } // namespace MOT

View File

@ -61,6 +61,7 @@ public:
* @return The next buffer to write to, or null in case of failure. * @return The next buffer to write to, or null in case of failure.
*/ */
virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
virtual void Flush();
virtual void SetLogger(ILogger* logger); virtual void SetLogger(ILogger* logger);
private: private:

View File

@ -68,6 +68,11 @@ public:
*/ */
virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer) = 0; virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer) = 0;
/**
* @brief flush all buffers (if exist) to log
*/
virtual void Flush() = 0;
/** /**
* @brief flushes the the log * @brief flushes the the log
*/ */

View File

@ -60,4 +60,7 @@ RedoLogBuffer* SynchronousRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
m_logger->FlushLog(); m_logger->FlushLog();
return buffer; return buffer;
} }
void SynchronousRedoLogHandler::Flush()
{}
} // namespace MOT } // namespace MOT

View File

@ -57,6 +57,7 @@ public:
* @return The next buffer to write to, or null in case of failure. * @return The next buffer to write to, or null in case of failure.
*/ */
RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer); RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
void Flush();
SynchronousRedoLogHandler(const SynchronousRedoLogHandler& orig) = delete; SynchronousRedoLogHandler(const SynchronousRedoLogHandler& orig) = delete;
SynchronousRedoLogHandler& operator=(const SynchronousRedoLogHandler& orig) = delete; SynchronousRedoLogHandler& operator=(const SynchronousRedoLogHandler& orig) = delete;
~SynchronousRedoLogHandler(); ~SynchronousRedoLogHandler();

View File

@ -80,6 +80,13 @@ void MOTRedo(XLogReaderState* record)
} }
} }
uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer** redoLogBufferArray, uint32_t size)
{
uint64_t written = MOT::ILogger::AddToLog(redoLogBufferArray, size);
XLogSetAsyncXactLSN(t_thrd.xlog_cxt.XactLastRecEnd);
return written;
}
uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size) uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size)
{ {
XLogBeginInsert(); XLogBeginInsert();
@ -88,29 +95,6 @@ uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size)
return size; return size;
} }
uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer* redoBuffer)
{
uint32_t length;
uint8_t* data = redoBuffer->Serialize(&length);
return AddToLog(data, length);
}
uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer** redoTransactionArray, uint32_t size)
{
uint32_t written = 0;
// ensure that we have enough space to add all transaction buffers
XLogEnsureRecordSpace(0, size);
XLogBeginInsert();
for (uint32_t i = 0; i < size; i++) {
uint32_t length;
uint8_t* data = redoTransactionArray[i]->Serialize(&length);
XLogRegisterData((char*)data, length);
written += length;
}
XLogInsert(RM_MOT_ID, MOT_REDO_DATA);
return written;
}
void XLOGLogger::FlushLog() void XLOGLogger::FlushLog()
{} {}

View File

@ -45,9 +45,8 @@ public:
inline ~XLOGLogger() inline ~XLOGLogger()
{} {}
uint64_t AddToLog(MOT::RedoLogBuffer** redoLogBufferArray, uint32_t size);
uint64_t AddToLog(uint8_t* data, uint32_t size); uint64_t AddToLog(uint8_t* data, uint32_t size);
uint64_t AddToLog(MOT::RedoLogBuffer* redoBuffer);
uint64_t AddToLog(MOT::RedoLogBuffer** redoBufferArray, uint32_t size);
void FlushLog(); void FlushLog();
void CloseLog(); void CloseLog();
void ClearLog(); void ClearLog();