!74 Fix async redo performance issue and sync to standby issue

Merge pull request !74 from Vinoth/master
This commit is contained in:
opengauss-bot
2020-08-06 21:26:39 +08:00
committed by Gitee
20 changed files with 220 additions and 69 deletions

View File

@ -41,7 +41,7 @@
#enable_redo_log = true
# Specifies whether to use group commit.
# This option is relevant only when GaussDB is configured to use synchronous commit (i.e. when
# This option is relevant only when openGauss is configured to use synchronous commit (i.e. when
# postgresql.conf has synchronous_commit configured to any value other than 'off').
#enable_group_commit = false
@ -54,6 +54,13 @@
#group_commit_size = 16
#group_commit_timeout = 10 ms
# Specifies the number of redo log buffers to use for asynchronous commit mode.
# Allowed range of values for this configuration is [8, 128]. The size of one buffer is 128 MB.
# This option is relevant only when openGauss is configured to use asynchronous commit (i.e. when
# postgresql.conf has synchronous_commit configured to 'off').
#
#async_log_buffer_count = 24
#------------------------------------------------------------------------------
# CHECKPOINT
#------------------------------------------------------------------------------

View File

@ -335,6 +335,9 @@ void CheckpointManager::MoveToNextPhase()
// hold the redo log lock to avoid inserting additional entries to the
// log. Once snapshot is taken, this lock will be released in SnapshotReady().
m_redoLogHandler->WrLock();
// write all buffer entries before taking the LSN position
// relevant for asynchronous logging or group commit
m_redoLogHandler->Flush();
}
}

View File

@ -300,6 +300,10 @@ inline void Prefetch(const void* ptr)
/** @define Constant denoting indentation used for MOT printouts. */
#define PRINT_REPORT_INDENT 2
/** @define Min and Max values for asynchronous redo log buffer array count. */
#define MIN_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT 8
#define MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT 128
} // namespace MOT
#endif // MOT_GLOBAL_H

View File

@ -39,6 +39,7 @@ MOTConfiguration MOTConfiguration::motGlobalConfiguration;
constexpr bool MOTConfiguration::DEFAULT_ENABLE_REDO_LOG;
constexpr LoggerType MOTConfiguration::DEFAULT_LOGGER_TYPE;
constexpr RedoLogHandlerType MOTConfiguration::DEFAULT_REDO_LOG_HANDLER_TYPE;
constexpr uint32_t MOTConfiguration::DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT;
constexpr bool MOTConfiguration::DEFAULT_ENABLE_GROUP_COMMIT;
constexpr uint64_t MOTConfiguration::DEFAULT_GROUP_COMMIT_SIZE;
constexpr const char* MOTConfiguration::DEFAULT_GROUP_COMMIT_TIMEOUT;
@ -381,6 +382,7 @@ MOTConfiguration::MOTConfiguration()
: m_enableRedoLog(DEFAULT_ENABLE_REDO_LOG),
m_loggerType(DEFAULT_LOGGER_TYPE),
m_redoLogHandlerType(DEFAULT_REDO_LOG_HANDLER_TYPE),
m_asyncRedoLogBufferArrayCount(DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT),
m_enableGroupCommit(DEFAULT_ENABLE_GROUP_COMMIT),
m_groupCommitSize(DEFAULT_GROUP_COMMIT_SIZE),
m_groupCommitTimeoutUSec(DEFAULT_GROUP_COMMIT_TIMEOUT_USEC),
@ -475,6 +477,7 @@ bool MOTConfiguration::SetFlag(const std::string& name, const std::string& value
if (ParseBool(name, "enable_redo_log", value, &m_enableRedoLog)) {
} else if (ParseLoggerType(name, "logger_type", value, &m_loggerType)) {
} else if (ParseRedoLogHandlerType(name, "redo_log_handler_type", value, &m_redoLogHandlerType)) {
} else if (ParseUint32(name, "async_log_buffer_count", value, &m_asyncRedoLogBufferArrayCount)) {
} else if (ParseBool(name, "enable_group_commit", value, &m_enableGroupCommit)) {
} else if (ParseUint64(name, "group_commit_size", value, &m_groupCommitSize)) {
} else if (ParseUint64(name, "group_commit_timeout_usec", value, &m_groupCommitTimeoutUSec)) {
@ -603,6 +606,9 @@ int MOTConfiguration::GetMappedCore(int logicId) const
#define UPDATE_INT_CFG(var, cfgPath, defaultValue) \
UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath)
#define UPDATE_INT_CFG_BOUNDS(var, cfgPath, defaultValue, lowerBound, upperBound) \
UpdateConfigItem(var, cfg->GetIntegerConfigValue(cfgPath, defaultValue), cfgPath, lowerBound, upperBound)
#define UPDATE_MEM_CFG(var, cfgPath, defaultValue, scale) \
do { \
uint64_t memoryValueBytes = \
@ -633,6 +639,11 @@ void MOTConfiguration::LoadConfig()
UPDATE_CFG(m_enableRedoLog, "enable_redo_log", DEFAULT_ENABLE_REDO_LOG);
UPDATE_USER_CFG(m_loggerType, "logger_type", DEFAULT_LOGGER_TYPE);
UPDATE_USER_CFG(m_redoLogHandlerType, "redo_log_handler_type", DEFAULT_REDO_LOG_HANDLER_TYPE);
UPDATE_INT_CFG_BOUNDS(m_asyncRedoLogBufferArrayCount,
"async_log_buffer_count",
DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT,
MIN_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT,
MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT);
// commit configuration
UPDATE_CFG(m_enableGroupCommit, "enable_group_commit", DEFAULT_ENABLE_GROUP_COMMIT);

View File

@ -123,6 +123,9 @@ public:
/** Determines the redo log handler type (not configurable, but derived). */
RedoLogHandlerType m_redoLogHandlerType;
/** Determines the number of asynchronous redo log buffer arrays. */
uint32_t m_asyncRedoLogBufferArrayCount;
/**********************************************************************/
// Commit configuration
/**********************************************************************/
@ -397,6 +400,9 @@ private:
/** @var Default redo log handler type. */
static constexpr RedoLogHandlerType DEFAULT_REDO_LOG_HANDLER_TYPE = RedoLogHandlerType::SYNC_REDO_LOG_HANDLER;
/** @var Default asynchronous redo log buffer array count. */
static constexpr uint32_t DEFAULT_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT = 24;
// default commit configuration
/** @var Default enable group commit. */
static constexpr bool DEFAULT_ENABLE_GROUP_COMMIT = false;
@ -630,9 +636,12 @@ private:
}
template <typename T>
static void UpdateConfigItem(uint32_t& oldValue, T newValue, const char* name)
static void UpdateConfigItem(uint32_t& oldValue, T newValue, const char* name,
uint32_t lowerBound = 0, uint32_t upperBound = UINT_MAX)
{
if (newValue > UINT_MAX) {
if (newValue > upperBound) {
MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue);
} else if (lowerBound > 0 && newValue < lowerBound) {
MOT_LOG_WARN("Configuration of %s overflowed: keeping default value %u", name, oldValue);
} else if (oldValue != newValue) {
MOT_LOG_TRACE("Configuration of %s changed: %u --> %u", name, oldValue, (uint32_t)newValue);

View File

@ -1343,7 +1343,7 @@ void RecoveryManager::ClearTableCache()
while (it != m_tableDeletesStat.end()) {
auto table = *it;
if (table.second > NUM_DELETE_MAX_INC) {
MOT_LOG_INFO("RecoveryManager::ClearTableCache: Table = %s items = %lu\n",
MOT_LOG_TRACE("RecoveryManager::ClearTableCache: Table = %s items = %lu\n",
table.first->GetTableName().c_str(),
table.second);
table.first->ClearRowCache();

View File

@ -93,7 +93,7 @@ uint32_t RecoveryManager::RecoverLogOperationCreateTable(
Table* table = nullptr;
switch (state) {
case COMMIT:
MOT_LOG_INFO("RecoverLogOperationCreateTable: COMMIT");
MOT_LOG_DEBUG("RecoverLogOperationCreateTable: COMMIT");
CreateTable((char*)data, status, table, true);
break;

View File

@ -29,19 +29,42 @@
#include "global.h"
#include "utilities.h"
#include "mot_atomic_ops.h"
#include "mot_configuration.h"
namespace MOT {
DECLARE_LOGGER(AsyncRedoLogHandler, redolog)
AsyncRedoLogHandler::AsyncRedoLogHandler() : m_bufferPool(), m_activeBuffer(0)
AsyncRedoLogHandler::AsyncRedoLogHandler()
: m_bufferPool(),
m_redoLogBufferArrayCount(GetGlobalConfiguration().m_asyncRedoLogBufferArrayCount),
m_activeBuffer(0),
m_initialized(false)
{}
AsyncRedoLogHandler::~AsyncRedoLogHandler()
{}
{
// wait for all redo log buffers to be written
while (!m_writeQueue.empty()) {
usleep(WRITE_LOG_WAIT_INTERVAL);
}
if (m_initialized) {
pthread_mutex_destroy(&m_writeLock);
}
m_initialized = false;
}
bool AsyncRedoLogHandler::Init()
{
return m_bufferPool.Init();
bool result = false;
int rc = pthread_mutex_init(&m_writeLock, nullptr);
result = (rc == 0);
if (result != true) {
MOT_LOG_ERROR("Error initializing async redolog handler lock");
return result;
}
result = m_bufferPool.Init();
m_initialized = true;
return result;
}
RedoLogBuffer* AsyncRedoLogHandler::CreateBuffer()
@ -56,16 +79,18 @@ void AsyncRedoLogHandler::DestroyBuffer(RedoLogBuffer* buffer)
RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
{
int position = 0;
int position = -1;
do {
uint64_t activeBuffer = MOT_ATOMIC_LOAD(m_activeBuffer) % WRITE_LOG_BUFFER_COUNT;
position = m_tripleBuffer[activeBuffer].PushBack(buffer);
if (position == MAX_BUFFERS / 2) {
WakeupWalWriter();
} else if (position == -1) {
// async redo log ternary buffer is full, waiting for write thread
// to flush the buffer
usleep(WRITE_LOG_WAIT_INTERVAL);
m_switchLock.RdLock();
int activeBuffer = m_activeBuffer;
position = m_redoLogBufferArrayArray[m_activeBuffer].PushBack(buffer);
m_switchLock.RdUnlock();
if (position == -1) {
usleep(1000);
} else if (position == (int)(MAX_BUFFERS - 1)) {
if (TrySwitchBuffers(activeBuffer)) {
WriteSingleBuffer();
}
}
} while (position == -1);
return CreateBuffer();
@ -73,34 +98,89 @@ RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
void AsyncRedoLogHandler::Write()
{
// buffer list switch logic:
// while writers write to list 0, we increment index to 1 and then write list 2 to log
// while writers write to list 1, we increment index to 2 and then write list 0 to log
// while writers write to list 2, we increment index to 0 and then write list 1 to log
uint64_t currentIndex = MOT_ATOMIC_LOAD(m_activeBuffer);
uint64_t nextIndex = (currentIndex + 1) % WRITE_LOG_BUFFER_COUNT;
uint64_t prevIndex = (currentIndex + 2) % WRITE_LOG_BUFFER_COUNT;
// allow writers to start writing to next buffer list
MOT_ATOMIC_STORE(m_activeBuffer, nextIndex);
// in the meantime we write and flush previous list to log
// writers might still be writing to current list, but we do not touch it in this cycle
RedoLogBufferArray& prevBufferArray = m_tripleBuffer[prevIndex];
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
if (!prevBufferArray.Empty()) {
m_logger->AddToLog(prevBufferArray);
m_logger->FlushLog();
FreeBuffers(prevBufferArray);
prevBufferArray.Reset();
m_switchLock.RdLock();
int activeBuffer = m_activeBuffer;
m_switchLock.RdUnlock();
if (TrySwitchBuffers(activeBuffer)) {
WriteSingleBuffer();
}
}
bool AsyncRedoLogHandler::TrySwitchBuffers(int index)
{
bool result = false;
int nextIndex = (index + 1) % m_redoLogBufferArrayCount;
m_switchLock.WrLock();
while (index == m_activeBuffer && !m_redoLogBufferArrayArray[index].Empty()) {
if (!m_redoLogBufferArrayArray[nextIndex].Empty()) {
// the next buffer was not yet written to the log
// wait until write complete
m_switchLock.WrUnlock();
usleep(WRITE_LOG_WAIT_INTERVAL);
m_switchLock.WrLock();
continue;
}
m_writeQueue.push(m_activeBuffer);
m_activeBuffer = nextIndex;
result = true;
}
m_switchLock.WrUnlock();
return result;
}
void AsyncRedoLogHandler::FreeBuffers(RedoLogBufferArray& bufferArray)
{
for (uint32_t i = 0; i < bufferArray.Size(); i++) {
m_bufferPool.Free(bufferArray[i]);
}
}
void AsyncRedoLogHandler::WriteSingleBuffer()
{
uint32_t writeBufferIndex;
pthread_mutex_lock(&m_writeLock);
if (!m_writeQueue.empty()) {
writeBufferIndex = m_writeQueue.front();
m_writeQueue.pop();
RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex];
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
if (!bufferArray.Empty()) {
m_logger->AddToLog(bufferArray);
m_logger->FlushLog();
FreeBuffers(bufferArray);
bufferArray.Reset();
}
}
pthread_mutex_unlock(&m_writeLock);
}
void AsyncRedoLogHandler::WriteAllBuffers()
{
uint32_t writeBufferIndex;
pthread_mutex_lock(&m_writeLock);
while (!m_writeQueue.empty()) {
writeBufferIndex = m_writeQueue.front();
m_writeQueue.pop();
RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex];
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
if (!bufferArray.Empty()) {
m_logger->AddToLog(bufferArray);
m_logger->FlushLog();
FreeBuffers(bufferArray);
bufferArray.Reset();
}
}
pthread_mutex_unlock(&m_writeLock);
}
void AsyncRedoLogHandler::Flush()
{
m_switchLock.RdLock();
int activeBuffer = m_activeBuffer;
m_switchLock.RdUnlock();
TrySwitchBuffers(activeBuffer);
WriteAllBuffers();
}
} // namespace MOT

View File

@ -26,8 +26,10 @@
#ifndef ASYNCHRONOUS_REDO_LOG_HANDLER_H
#define ASYNCHRONOUS_REDO_LOG_HANDLER_H
#include <queue>
#include "redo_log_handler.h"
#include "redo_log_buffer_pool.h"
#include "rw_lock.h"
namespace MOT {
class TxnManager;
@ -62,6 +64,8 @@ public:
*/
RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
void Flush();
/**
* @brief switches the buffers and flushes the log
*/
@ -72,17 +76,26 @@ public:
~AsyncRedoLogHandler();
private:
static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 10000; // micro seconds
static constexpr unsigned int WRITE_LOG_BUFFER_COUNT = 3;
static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 1000; // micro second
/**
* @brief free all the RedoLogBuffers in the array and return them to the pool
*/
void FreeBuffers(RedoLogBufferArray& bufferArray);
bool TrySwitchBuffers(int index);
void WriteSingleBuffer();
void WriteAllBuffers();
RedoLogBufferPool m_bufferPool;
RedoLogBufferArray m_tripleBuffer[WRITE_LOG_BUFFER_COUNT]; // 3 buffer arrays for switching in cyclic manner.
volatile uint64_t m_activeBuffer;
// array of RedoLogBufferArray for switching in cyclic manner.
RedoLogBufferArray m_redoLogBufferArrayArray[MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT];
uint32_t m_redoLogBufferArrayCount;
volatile int m_activeBuffer;
bool m_initialized;
RwLock m_switchLock;
pthread_mutex_t m_writeLock;
std::queue<uint32_t> m_writeQueue;
};
} // namespace MOT
#endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */
#endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */

View File

@ -30,7 +30,7 @@
#include "global.h"
#include "redo_log_buffer.h"
#define MAX_BUFFERS 1000
#define MAX_BUFFERS 128
namespace MOT {
class RedoLogBufferArray {
@ -91,6 +91,10 @@ public:
return m_array;
}
static uint32_t MaxSize()
{
return MAX_BUFFERS;
}
private:
std::atomic<uint32_t> m_nextFree;
RedoLogBuffer* m_array[MAX_BUFFERS];

View File

@ -72,6 +72,11 @@ public:
*/
void Commit(bool isLeader, std::shared_ptr<CommitGroup> groupRef);
inline bool IsCommitted()
{
return m_commited;
}
private:
const uint8_t m_handlerId;
GroupSyncRedoLogHandler* m_handler;

View File

@ -155,4 +155,17 @@ RedoLogBuffer* GroupSyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
joinedGroup->Commit(leader, joinedGroup);
return buffer;
}
void GroupSyncRedoLogHandler::Flush()
{
std::shared_ptr<CommitGroup> flushGroup = m_currentGroup;
if (flushGroup == nullptr) {
return;
}
while (!flushGroup->IsCommitted()) {
// spin until group is flushed
cpu_relax();
}
}
} // namespace MOT

View File

@ -63,6 +63,8 @@ public:
*/
virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
void Flush();
/**
* @brief initializes group params
* @param group the group to work on

View File

@ -74,4 +74,11 @@ void SegmentedGroupSyncRedoLogHandler::SetLogger(ILogger* logger)
for (unsigned int i = 0; i < m_numaNodes; i++)
m_redoLogHandlerArray[i].SetLogger(logger);
}
void SegmentedGroupSyncRedoLogHandler::Flush()
{
for (unsigned int i = 0; i < m_numaNodes; i++) {
m_redoLogHandlerArray[i].Flush();
}
}
} // namespace MOT

View File

@ -61,6 +61,7 @@ public:
* @return The next buffer to write to, or null in case of failure.
*/
virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
virtual void Flush();
virtual void SetLogger(ILogger* logger);
private:

View File

@ -68,6 +68,11 @@ public:
*/
virtual RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer) = 0;
/**
* @brief flush all buffers (if exist) to log
*/
virtual void Flush() = 0;
/**
* @brief flushes the the log
*/

View File

@ -60,4 +60,7 @@ RedoLogBuffer* SynchronousRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
m_logger->FlushLog();
return buffer;
}
void SynchronousRedoLogHandler::Flush()
{}
} // namespace MOT

View File

@ -57,6 +57,7 @@ public:
* @return The next buffer to write to, or null in case of failure.
*/
RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
void Flush();
SynchronousRedoLogHandler(const SynchronousRedoLogHandler& orig) = delete;
SynchronousRedoLogHandler& operator=(const SynchronousRedoLogHandler& orig) = delete;
~SynchronousRedoLogHandler();

View File

@ -80,6 +80,13 @@ void MOTRedo(XLogReaderState* record)
}
}
uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer** redoLogBufferArray, uint32_t size)
{
uint64_t written = MOT::ILogger::AddToLog(redoLogBufferArray, size);
XLogSetAsyncXactLSN(t_thrd.xlog_cxt.XactLastRecEnd);
return written;
}
uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size)
{
XLogBeginInsert();
@ -88,29 +95,6 @@ uint64_t XLOGLogger::AddToLog(uint8_t* data, uint32_t size)
return size;
}
uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer* redoBuffer)
{
uint32_t length;
uint8_t* data = redoBuffer->Serialize(&length);
return AddToLog(data, length);
}
uint64_t XLOGLogger::AddToLog(MOT::RedoLogBuffer** redoTransactionArray, uint32_t size)
{
uint32_t written = 0;
// ensure that we have enough space to add all transaction buffers
XLogEnsureRecordSpace(0, size);
XLogBeginInsert();
for (uint32_t i = 0; i < size; i++) {
uint32_t length;
uint8_t* data = redoTransactionArray[i]->Serialize(&length);
XLogRegisterData((char*)data, length);
written += length;
}
XLogInsert(RM_MOT_ID, MOT_REDO_DATA);
return written;
}
void XLOGLogger::FlushLog()
{}

View File

@ -45,9 +45,8 @@ public:
inline ~XLOGLogger()
{}
uint64_t AddToLog(MOT::RedoLogBuffer** redoLogBufferArray, uint32_t size);
uint64_t AddToLog(uint8_t* data, uint32_t size);
uint64_t AddToLog(MOT::RedoLogBuffer* redoBuffer);
uint64_t AddToLog(MOT::RedoLogBuffer** redoBufferArray, uint32_t size);
void FlushLog();
void CloseLog();
void ClearLog();