!536 Removed MOT async redo handler to fully align with upper layer and fixed serialization of transaction DMLs
Merge pull request !536 from Vinoth Veeraraghavan/master
This commit is contained in:
@ -66,12 +66,6 @@
|
||||
#define NANOSECONDS_PER_MILLISECOND 1000000L
|
||||
#define NANOSECONDS_PER_SECOND 1000000000L
|
||||
|
||||
typedef struct WALCallbackItem {
|
||||
struct WALCallbackItem* next;
|
||||
WALCallback callback;
|
||||
void* arg;
|
||||
} WALCallbackItem;
|
||||
|
||||
/* Signal handlers */
|
||||
static void wal_quickdie(SIGNAL_ARGS);
|
||||
static void WalSigHupHandler(SIGNAL_ARGS);
|
||||
@ -283,9 +277,6 @@ void WalWriterMain(void)
|
||||
proc_exit(0); /* done */
|
||||
}
|
||||
|
||||
/* execute callbacks (i.e. write data from MOT) */
|
||||
CallWALCallback();
|
||||
|
||||
wrote_something = XLogBackgroundFlush();
|
||||
|
||||
if (!wrote_something && ++times_wrote_nothing > g_instance.attr.attr_storage.xlog_idle_flushes_before_sleep) {
|
||||
@ -415,23 +406,3 @@ static void walwriter_sigusr1_handler(SIGNAL_ARGS)
|
||||
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
void RegisterWALCallback(WALCallback callback, void* arg)
|
||||
{
|
||||
WALCallbackItem* item;
|
||||
|
||||
item = (WALCallbackItem*)MemoryContextAlloc(g_instance.instance_context, sizeof(WALCallbackItem));
|
||||
item->callback = callback;
|
||||
item->arg = arg;
|
||||
item->next = g_instance.xlog_cxt.walCallback;
|
||||
g_instance.xlog_cxt.walCallback = item;
|
||||
}
|
||||
|
||||
void CallWALCallback()
|
||||
{
|
||||
WALCallbackItem* item;
|
||||
|
||||
for (item = g_instance.xlog_cxt.walCallback; item; item = item->next) {
|
||||
(*item->callback) (item->arg);
|
||||
}
|
||||
}
|
||||
|
@ -242,7 +242,6 @@ static void knl_g_executor_init(knl_g_executor_context* exec_cxt)
|
||||
static void knl_g_xlog_init(knl_g_xlog_context *xlog_cxt)
|
||||
{
|
||||
xlog_cxt->num_locks_in_group = 0;
|
||||
xlog_cxt->walCallback = NULL;
|
||||
xlog_cxt->redoCommitCallback = NULL;
|
||||
}
|
||||
|
||||
|
@ -1417,7 +1417,7 @@ static inline bool IsTimeUnitMicroSeconds(mot_string& suffix)
|
||||
|
||||
uint64_t MOTConfiguration::ParseTimeValueMicros(const char* timeValue, uint64_t defaultValue, const char* cfgPath)
|
||||
{
|
||||
uint64_t timeValueMicors = defaultValue;
|
||||
uint64_t timeValueMicros = defaultValue;
|
||||
char* endptr = NULL;
|
||||
unsigned long long value = strtoull(timeValue, &endptr, 0);
|
||||
if (endptr == timeValue) {
|
||||
@ -1425,27 +1425,27 @@ uint64_t MOTConfiguration::ParseTimeValueMicros(const char* timeValue, uint64_t
|
||||
} else if (*endptr == 0) {
|
||||
MOT_LOG_WARN("Invalid %s time value format: %s (expecting unit type after value)", cfgPath, timeValue);
|
||||
} else {
|
||||
// get unit type and convert to milli-seconds
|
||||
// get unit type and convert to micro-seconds
|
||||
mot_string suffix(endptr);
|
||||
suffix.trim();
|
||||
if (IsTimeUnitDays(suffix)) {
|
||||
MOT_LOG_TRACE("Loaded %s: %u days", cfgPath, value);
|
||||
timeValueMicors = ((uint64_t)value) * 24ull * 60ull * 60ull * 1000ull * 1000ull;
|
||||
timeValueMicros = ((uint64_t)value) * 24ull * 60ull * 60ull * 1000ull * 1000ull;
|
||||
} else if (IsTimeUnitHours(suffix)) {
|
||||
MOT_LOG_TRACE("Loaded %s: %u hours", cfgPath, value);
|
||||
timeValueMicors = ((uint64_t)value) * 60ull * 60ull * 1000ull * 1000ull;
|
||||
timeValueMicros = ((uint64_t)value) * 60ull * 60ull * 1000ull * 1000ull;
|
||||
} else if (IsTimeUnitMinutes(suffix)) {
|
||||
MOT_LOG_TRACE("Loaded %s: %u minutes", cfgPath, value);
|
||||
timeValueMicors = ((uint64_t)value) * 60ull * 1000ull * 1000ull;
|
||||
timeValueMicros = ((uint64_t)value) * 60ull * 1000ull * 1000ull;
|
||||
} else if (IsTimeUnitSeconds(suffix)) {
|
||||
MOT_LOG_TRACE("Loaded %s: %u seconds", cfgPath, value);
|
||||
timeValueMicors = ((uint64_t)value) * 1000ull * 1000ull;
|
||||
timeValueMicros = ((uint64_t)value) * 1000ull * 1000ull;
|
||||
} else if (IsTimeUnitMilliSeconds(suffix)) {
|
||||
MOT_LOG_TRACE("Loaded %s: %u milli-seconds", cfgPath, value);
|
||||
timeValueMicors = ((uint64_t)value) * 1000ull;
|
||||
timeValueMicros = ((uint64_t)value) * 1000ull;
|
||||
} else if (IsTimeUnitMicroSeconds(suffix)) {
|
||||
MOT_LOG_TRACE("Loaded %s: %u micro-seconds", cfgPath, value);
|
||||
timeValueMicors = ((uint64_t)value);
|
||||
timeValueMicros = ((uint64_t)value);
|
||||
} else {
|
||||
MOT_LOG_WARN("Invalid %s time value format: %s (invalid unit specifier '%s' - should be one of d, h, m, s, "
|
||||
"ms or us)",
|
||||
@ -1454,7 +1454,7 @@ uint64_t MOTConfiguration::ParseTimeValueMicros(const char* timeValue, uint64_t
|
||||
suffix.c_str());
|
||||
}
|
||||
}
|
||||
return timeValueMicors;
|
||||
return timeValueMicros;
|
||||
}
|
||||
|
||||
bool MOTConfiguration::CheckHyperThreads()
|
||||
|
@ -36,7 +36,8 @@
|
||||
#include "commit_sequence_number.h"
|
||||
#include "checkpoint_manager.h"
|
||||
#include "utilities.h"
|
||||
#include "asynchronous_redo_log_handler.h"
|
||||
#include "redo_log_handler.h"
|
||||
#include "mot_configuration.h"
|
||||
#include "irecovery_manager.h"
|
||||
#include "table_manager.h"
|
||||
#include "session_manager.h"
|
||||
|
@ -247,8 +247,7 @@ void TxnManager::CommitInternal()
|
||||
GetCheckpointManager()->EndCommit(this);
|
||||
}
|
||||
|
||||
if (!GetGlobalConfiguration().m_enableRedoLog ||
|
||||
GetGlobalConfiguration().m_redoLogHandlerType == RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) {
|
||||
if (!GetGlobalConfiguration().m_enableRedoLog) {
|
||||
m_occManager.ReleaseLocks(this);
|
||||
}
|
||||
}
|
||||
@ -299,8 +298,7 @@ void TxnManager::CommitPrepared()
|
||||
GetCheckpointManager()->EndCommit(this);
|
||||
}
|
||||
|
||||
if (!GetGlobalConfiguration().m_enableRedoLog ||
|
||||
GetGlobalConfiguration().m_redoLogHandlerType == RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) {
|
||||
if (!GetGlobalConfiguration().m_enableRedoLog) {
|
||||
m_occManager.ReleaseLocks(this);
|
||||
}
|
||||
MOT::DbSessionStatisticsProvider::GetInstance().AddCommitPreparedTxn();
|
||||
@ -320,7 +318,6 @@ void TxnManager::LiteCommitPrepared()
|
||||
void TxnManager::EndTransaction()
|
||||
{
|
||||
if (GetGlobalConfiguration().m_enableRedoLog &&
|
||||
GetGlobalConfiguration().m_redoLogHandlerType != RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER &&
|
||||
IsFailedCommitPrepared() == false) {
|
||||
m_occManager.ReleaseLocks(this);
|
||||
}
|
||||
|
@ -310,15 +310,13 @@ private:
|
||||
*/
|
||||
void ReleaseAccess(Access* ac)
|
||||
{
|
||||
m_allocatedAc--;
|
||||
m_rowCnt--;
|
||||
uint32_t release_id = ac->GetBufferId();
|
||||
// Swap buffer_id's
|
||||
ac->SwapId(*m_accessesSetBuff[m_allocatedAc]);
|
||||
m_accessesSetBuff[release_id] = m_accessesSetBuff[m_allocatedAc];
|
||||
m_accessesSetBuff[m_allocatedAc] = nullptr;
|
||||
ac->SwapId(*m_accessesSetBuff[m_rowCnt]);
|
||||
m_accessesSetBuff[release_id] = m_accessesSetBuff[m_rowCnt];
|
||||
m_accessesSetBuff[m_rowCnt] = ac;
|
||||
DestroyAccess(ac);
|
||||
delete ac;
|
||||
}
|
||||
|
||||
/** @brief Doubles the size of the access set. */
|
||||
|
@ -1,188 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
||||
*
|
||||
* openGauss is licensed under Mulan PSL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||||
* You may obtain a copy of Mulan PSL v2 at:
|
||||
*
|
||||
* http://license.coscl.org.cn/MulanPSL2
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PSL v2 for more details.
|
||||
* -------------------------------------------------------------------------
|
||||
*
|
||||
* asynchronous_redo_log_handler.cpp
|
||||
* Implements an asynchronous redo log.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/gausskernel/storage/mot/core/src/system/transaction_logger/
|
||||
* asynchronous_redo_log/asynchronous_redo_log_handler.cpp
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "asynchronous_redo_log_handler.h"
|
||||
|
||||
#include "txn.h"
|
||||
#include "global.h"
|
||||
#include "utilities.h"
|
||||
#include "mot_atomic_ops.h"
|
||||
#include "mot_configuration.h"
|
||||
|
||||
namespace MOT {
|
||||
DECLARE_LOGGER(AsyncRedoLogHandler, RedoLog)
|
||||
|
||||
AsyncRedoLogHandler::AsyncRedoLogHandler()
|
||||
: m_bufferPool(),
|
||||
m_redoLogBufferArrayCount(GetGlobalConfiguration().m_asyncRedoLogBufferArrayCount),
|
||||
m_activeBuffer(0),
|
||||
m_initialized(false)
|
||||
{}
|
||||
|
||||
AsyncRedoLogHandler::~AsyncRedoLogHandler()
|
||||
{
|
||||
// wait for all redo log buffers to be written
|
||||
while (!m_writeQueue.empty()) {
|
||||
usleep(WRITE_LOG_WAIT_INTERVAL);
|
||||
}
|
||||
if (m_initialized) {
|
||||
pthread_mutex_destroy(&m_writeLock);
|
||||
}
|
||||
m_initialized = false;
|
||||
}
|
||||
|
||||
bool AsyncRedoLogHandler::Init()
|
||||
{
|
||||
bool result = false;
|
||||
int rc = pthread_mutex_init(&m_writeLock, nullptr);
|
||||
result = (rc == 0);
|
||||
if (result != true) {
|
||||
MOT_LOG_ERROR("Error initializing async redolog handler lock");
|
||||
return result;
|
||||
}
|
||||
result = m_bufferPool.Init();
|
||||
m_initialized = true;
|
||||
return result;
|
||||
}
|
||||
|
||||
RedoLogBuffer* AsyncRedoLogHandler::CreateBuffer()
|
||||
{
|
||||
return m_bufferPool.Alloc();
|
||||
}
|
||||
|
||||
void AsyncRedoLogHandler::DestroyBuffer(RedoLogBuffer* buffer)
|
||||
{
|
||||
buffer->Reset();
|
||||
m_bufferPool.Free(buffer);
|
||||
}
|
||||
|
||||
RedoLogBuffer* AsyncRedoLogHandler::WriteToLog(RedoLogBuffer* buffer)
|
||||
{
|
||||
int position = -1;
|
||||
do {
|
||||
m_switchLock.RdLock();
|
||||
int activeBuffer = m_activeBuffer;
|
||||
position = m_redoLogBufferArrayArray[m_activeBuffer].PushBack(buffer);
|
||||
m_switchLock.RdUnlock();
|
||||
if (position == -1) {
|
||||
usleep(1000);
|
||||
} else if (position == (int)(MAX_BUFFERS - 1)) {
|
||||
if (TrySwitchBuffers(activeBuffer)) {
|
||||
WriteSingleBuffer();
|
||||
}
|
||||
}
|
||||
} while (position == -1);
|
||||
return CreateBuffer();
|
||||
}
|
||||
|
||||
void AsyncRedoLogHandler::Write()
|
||||
{
|
||||
m_switchLock.RdLock();
|
||||
int activeBuffer = m_activeBuffer;
|
||||
m_switchLock.RdUnlock();
|
||||
if (TrySwitchBuffers(activeBuffer)) {
|
||||
WriteSingleBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
bool AsyncRedoLogHandler::TrySwitchBuffers(int index)
|
||||
{
|
||||
bool result = false;
|
||||
int nextIndex = (index + 1) % m_redoLogBufferArrayCount;
|
||||
m_switchLock.WrLock();
|
||||
while (index == m_activeBuffer && !m_redoLogBufferArrayArray[index].Empty()) {
|
||||
if (!m_redoLogBufferArrayArray[nextIndex].Empty()) {
|
||||
// the next buffer was not yet written to the log
|
||||
// wait until write complete
|
||||
m_switchLock.WrUnlock();
|
||||
usleep(WRITE_LOG_WAIT_INTERVAL);
|
||||
m_switchLock.WrLock();
|
||||
continue;
|
||||
}
|
||||
m_writeQueue.push(m_activeBuffer);
|
||||
m_activeBuffer = nextIndex;
|
||||
result = true;
|
||||
}
|
||||
m_switchLock.WrUnlock();
|
||||
return result;
|
||||
}
|
||||
|
||||
void AsyncRedoLogHandler::FreeBuffers(RedoLogBufferArray& bufferArray)
|
||||
{
|
||||
for (uint32_t i = 0; i < bufferArray.Size(); i++) {
|
||||
bufferArray[i]->Reset();
|
||||
m_bufferPool.Free(bufferArray[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncRedoLogHandler::WriteSingleBuffer()
|
||||
{
|
||||
uint32_t writeBufferIndex;
|
||||
pthread_mutex_lock(&m_writeLock);
|
||||
if (!m_writeQueue.empty()) {
|
||||
writeBufferIndex = m_writeQueue.front();
|
||||
m_writeQueue.pop();
|
||||
RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex];
|
||||
|
||||
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
|
||||
if (!bufferArray.Empty()) {
|
||||
m_logger->AddToLog(bufferArray);
|
||||
m_logger->FlushLog();
|
||||
FreeBuffers(bufferArray);
|
||||
bufferArray.Reset();
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&m_writeLock);
|
||||
}
|
||||
|
||||
void AsyncRedoLogHandler::WriteAllBuffers()
|
||||
{
|
||||
uint32_t writeBufferIndex;
|
||||
pthread_mutex_lock(&m_writeLock);
|
||||
while (!m_writeQueue.empty()) {
|
||||
writeBufferIndex = m_writeQueue.front();
|
||||
m_writeQueue.pop();
|
||||
RedoLogBufferArray& bufferArray = m_redoLogBufferArrayArray[writeBufferIndex];
|
||||
|
||||
// invoke logger only if something happened, otherwise we just juggle with empty buffer arrays
|
||||
if (!bufferArray.Empty()) {
|
||||
m_logger->AddToLog(bufferArray);
|
||||
m_logger->FlushLog();
|
||||
FreeBuffers(bufferArray);
|
||||
bufferArray.Reset();
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&m_writeLock);
|
||||
}
|
||||
|
||||
void AsyncRedoLogHandler::Flush()
|
||||
{
|
||||
m_switchLock.RdLock();
|
||||
int activeBuffer = m_activeBuffer;
|
||||
m_switchLock.RdUnlock();
|
||||
TrySwitchBuffers(activeBuffer);
|
||||
WriteAllBuffers();
|
||||
}
|
||||
} // namespace MOT
|
@ -1,102 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
||||
*
|
||||
* openGauss is licensed under Mulan PSL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||||
* You may obtain a copy of Mulan PSL v2 at:
|
||||
*
|
||||
* http://license.coscl.org.cn/MulanPSL2
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PSL v2 for more details.
|
||||
* -------------------------------------------------------------------------
|
||||
*
|
||||
* asynchronous_redo_log_handler.h
|
||||
* Implements an asynchronous redo log.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/gausskernel/storage/mot/core/src/system/transaction_logger/
|
||||
* asynchronous_redo_log/asynchronous_redo_log_handler.h
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef ASYNCHRONOUS_REDO_LOG_HANDLER_H
|
||||
#define ASYNCHRONOUS_REDO_LOG_HANDLER_H
|
||||
|
||||
#include <queue>
|
||||
#include "redo_log_handler.h"
|
||||
#include "redo_log_buffer_pool.h"
|
||||
#include "rw_lock.h"
|
||||
#include "mot_configuration.h"
|
||||
|
||||
namespace MOT {
|
||||
class TxnManager;
|
||||
|
||||
/**
|
||||
* @class AsyncRedoLogHandler
|
||||
* @brief implements an asynchronous redo log
|
||||
*/
|
||||
class AsyncRedoLogHandler : public RedoLogHandler {
|
||||
public:
|
||||
AsyncRedoLogHandler();
|
||||
|
||||
/** @brief Initializes the redo-log handler. */
|
||||
bool Init();
|
||||
|
||||
/**
|
||||
* @brief creates a new Buffer object
|
||||
* @return a Buffer
|
||||
*/
|
||||
RedoLogBuffer* CreateBuffer();
|
||||
|
||||
/**
|
||||
* @brief destroys a Buffer object
|
||||
* @param buffer pointer to be destroyed and de-allocated
|
||||
*/
|
||||
void DestroyBuffer(RedoLogBuffer* buffer);
|
||||
|
||||
/**
|
||||
* @brief Inserts the data to the buffer.
|
||||
* @param buffer The buffer to write to log.
|
||||
* @return The next buffer to write to, or null in case of failure.
|
||||
*/
|
||||
RedoLogBuffer* WriteToLog(RedoLogBuffer* buffer);
|
||||
|
||||
void Flush();
|
||||
|
||||
/**
|
||||
* @brief switches the buffers and flushes the log
|
||||
*/
|
||||
void Write();
|
||||
|
||||
AsyncRedoLogHandler(const AsyncRedoLogHandler& orig) = delete;
|
||||
AsyncRedoLogHandler& operator=(const AsyncRedoLogHandler& orig) = delete;
|
||||
~AsyncRedoLogHandler();
|
||||
|
||||
private:
|
||||
static constexpr unsigned int WRITE_LOG_WAIT_INTERVAL = 1000; // micro second
|
||||
|
||||
/**
|
||||
* @brief free all the RedoLogBuffers in the array and return them to the pool
|
||||
*/
|
||||
void FreeBuffers(RedoLogBufferArray& bufferArray);
|
||||
bool TrySwitchBuffers(int index);
|
||||
void WriteSingleBuffer();
|
||||
void WriteAllBuffers();
|
||||
|
||||
RedoLogBufferPool m_bufferPool;
|
||||
// array of RedoLogBufferArray for switching in cyclic manner.
|
||||
RedoLogBufferArray m_redoLogBufferArrayArray[MOTConfiguration::MAX_ASYNC_REDO_LOG_BUFFER_ARRAY_COUNT];
|
||||
uint32_t m_redoLogBufferArrayCount;
|
||||
volatile int m_activeBuffer;
|
||||
bool m_initialized;
|
||||
RwLock m_switchLock;
|
||||
pthread_mutex_t m_writeLock;
|
||||
std::queue<uint32_t> m_writeQueue;
|
||||
};
|
||||
} // namespace MOT
|
||||
|
||||
#endif /* ASYNCHRONOUS_REDO_LOG_HANDLER_H */
|
@ -1,105 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
||||
*
|
||||
* openGauss is licensed under Mulan PSL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||||
* You may obtain a copy of Mulan PSL v2 at:
|
||||
*
|
||||
* http://license.coscl.org.cn/MulanPSL2
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PSL v2 for more details.
|
||||
* -------------------------------------------------------------------------
|
||||
*
|
||||
* redo_log_buffer_array.h
|
||||
* Implements an array of redo log buffers.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_array.h
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef REDO_LOG_BUFFER_ARRAY_H
|
||||
#define REDO_LOG_BUFFER_ARRAY_H
|
||||
|
||||
#include <atomic>
|
||||
#include "utilities.h"
|
||||
#include "global.h"
|
||||
#include "redo_log_buffer.h"
|
||||
|
||||
#define MAX_BUFFERS 128
|
||||
|
||||
namespace MOT {
|
||||
class RedoLogBufferArray {
|
||||
public:
|
||||
RedoLogBufferArray() : m_nextFree(0)
|
||||
{
|
||||
for (uint32_t i = 0; i < MAX_BUFFERS; i++) {
|
||||
m_array[i] = nullptr;
|
||||
}
|
||||
}
|
||||
RedoLogBufferArray(const RedoLogBufferArray& orig) = delete;
|
||||
virtual ~RedoLogBufferArray(){};
|
||||
|
||||
uint32_t Size() const
|
||||
{
|
||||
return m_nextFree;
|
||||
}
|
||||
|
||||
bool Empty() const
|
||||
{
|
||||
return (m_nextFree == 0);
|
||||
}
|
||||
|
||||
void Reset()
|
||||
{
|
||||
m_nextFree = 0;
|
||||
}
|
||||
|
||||
RedoLogBuffer* Front()
|
||||
{
|
||||
return m_array[0];
|
||||
}
|
||||
|
||||
RedoLogBuffer* Back()
|
||||
{
|
||||
return m_array[m_nextFree - 1];
|
||||
}
|
||||
|
||||
RedoLogBuffer* operator[](size_t idx)
|
||||
{
|
||||
return m_array[idx];
|
||||
}
|
||||
|
||||
int PushBack(RedoLogBuffer* val)
|
||||
{
|
||||
uint32_t index = m_nextFree.fetch_add(1);
|
||||
if (index >= MAX_BUFFERS) {
|
||||
m_nextFree.fetch_sub(1);
|
||||
return -1;
|
||||
} else {
|
||||
m_array[index] = std::move(val);
|
||||
return (int)index;
|
||||
}
|
||||
}
|
||||
|
||||
RedoLogBuffer** GetEntries()
|
||||
{
|
||||
return m_array;
|
||||
}
|
||||
|
||||
static uint32_t MaxSize()
|
||||
{
|
||||
return MAX_BUFFERS;
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<uint32_t> m_nextFree;
|
||||
RedoLogBuffer* m_array[MAX_BUFFERS];
|
||||
};
|
||||
} // namespace MOT
|
||||
|
||||
#endif /* REDO_LOG_BUFFER_ARRAY_H */
|
@ -1,116 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
||||
*
|
||||
* openGauss is licensed under Mulan PSL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||||
* You may obtain a copy of Mulan PSL v2 at:
|
||||
*
|
||||
* http://license.coscl.org.cn/MulanPSL2
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PSL v2 for more details.
|
||||
* -------------------------------------------------------------------------
|
||||
*
|
||||
* redo_log_buffer_pool.cpp
|
||||
* Manages a simple thread-safe pool of RedoLogBuffer objects with memory buffers managed externally.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_pool.cpp
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "redo_log_buffer_pool.h"
|
||||
#include "utilities.h"
|
||||
|
||||
namespace MOT {
|
||||
DECLARE_LOGGER(RedoLogBufferPool, RedoLog);
|
||||
|
||||
RedoLogBufferPool::RedoLogBufferPool(
|
||||
uint32_t bufferSize /* = REDO_DEFAULT_BUFFER_SIZE */, uint32_t growSize /* = REDO_DEFAULT_GROW_SIZE */)
|
||||
: m_bufferSize(bufferSize), m_growSize(growSize), m_freeList(nullptr), m_objectPool(nullptr)
|
||||
{}
|
||||
|
||||
RedoLogBufferPool::~RedoLogBufferPool()
|
||||
{
|
||||
ClearFreeList();
|
||||
ObjAllocInterface::FreeObjPool(&m_objectPool);
|
||||
}
|
||||
|
||||
bool RedoLogBufferPool::Init()
|
||||
{
|
||||
// we create a global object pool even though pool access is guarded by lock,
|
||||
// because we do not want to rely on session-local memory for two main reasons:
|
||||
// 1. At this point of pool creation there might not be a session (but we can still create one if we want)
|
||||
// 2. The pool is accessed by many threads, and therefore whenever pool needs to be refilled, each allocation
|
||||
// will be made by a different session (and this will lead to core dump during de-allocation, if
|
||||
// de-allocation takes place at the wrong session).
|
||||
bool result = true;
|
||||
m_objectPool = ObjAllocInterface::GetObjPool(sizeof(RedoLogBuffer), false);
|
||||
if (m_objectPool == nullptr) {
|
||||
MOT_REPORT_ERROR(MOT_ERROR_OOM, "Load Redo-Log Handler", "Failed to allocate object pool");
|
||||
result = false;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
RedoLogBuffer* RedoLogBufferPool::Alloc()
|
||||
{
|
||||
RedoLogBuffer* redoLogBuffer = nullptr;
|
||||
std::unique_lock<std::mutex> lock(m_lock);
|
||||
if (m_freeList == nullptr) {
|
||||
RefillFreeList();
|
||||
}
|
||||
|
||||
redoLogBuffer = m_freeList;
|
||||
if (m_freeList != nullptr) {
|
||||
m_freeList = m_freeList->GetNext();
|
||||
}
|
||||
return redoLogBuffer;
|
||||
}
|
||||
|
||||
void RedoLogBufferPool::Free(RedoLogBuffer* buffer)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_lock);
|
||||
buffer->SetNext(m_freeList);
|
||||
m_freeList = buffer;
|
||||
}
|
||||
|
||||
void RedoLogBufferPool::RefillFreeList()
|
||||
{
|
||||
for (uint32_t i = 0; i < m_growSize; ++i) {
|
||||
uint8_t* buffer = new (std::nothrow) uint8_t[m_bufferSize];
|
||||
if (buffer == nullptr) {
|
||||
break;
|
||||
}
|
||||
RedoLogBuffer* redoLogBuffer = m_objectPool->Alloc<RedoLogBuffer>(buffer, m_bufferSize);
|
||||
if (redoLogBuffer != nullptr) {
|
||||
redoLogBuffer->SetNext(m_freeList);
|
||||
m_freeList = redoLogBuffer;
|
||||
} else {
|
||||
delete[] buffer;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RedoLogBufferPool::ClearFreeList()
|
||||
{
|
||||
while (m_freeList != nullptr) {
|
||||
// remove next redo-log buffer from the free list
|
||||
RedoLogBuffer* redoLogBuffer = m_freeList;
|
||||
m_freeList = m_freeList->GetNext();
|
||||
|
||||
// get memory buffer and detach it (so it will not be deallocated in Buffer destructor)
|
||||
uint8_t* buffer = redoLogBuffer->Detach();
|
||||
|
||||
// get rid of the object
|
||||
m_objectPool->Release(redoLogBuffer);
|
||||
|
||||
// and get rid of the memory buffer
|
||||
delete[] buffer;
|
||||
}
|
||||
}
|
||||
} // namespace MOT
|
@ -1,89 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
||||
*
|
||||
* openGauss is licensed under Mulan PSL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||||
* You may obtain a copy of Mulan PSL v2 at:
|
||||
*
|
||||
* http://license.coscl.org.cn/MulanPSL2
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PSL v2 for more details.
|
||||
* -------------------------------------------------------------------------
|
||||
*
|
||||
* redo_log_buffer_pool.h
|
||||
* Manages a simple thread-safe pool of RedoLogBuffer objects with memory buffers managed externally.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/gausskernel/storage/mot/core/src/system/transaction_logger/asynchronous_redo_log/redo_log_buffer_pool.h
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef REDO_LOG_BUFFER_POOL_H
|
||||
#define REDO_LOG_BUFFER_POOL_H
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <mutex>
|
||||
|
||||
#include "object_pool.h"
|
||||
#include "redo_log_buffer.h"
|
||||
|
||||
namespace MOT {
|
||||
/** @define By default add 8 buffers each time pool is depleted. */
|
||||
#define REDO_DEFAULT_GROW_SIZE 8
|
||||
|
||||
/**
|
||||
* @brief Manages a simple thread-safe pool of RedoLogBuffer objects with memory buffers managed externally (i.e. not
|
||||
* allocated and destroyed by parent Buffer class).
|
||||
*/
|
||||
class RedoLogBufferPool {
|
||||
public:
|
||||
/**
|
||||
* @brief Constructor.
|
||||
* @param[opt] bufferSize The size of each memory buffer within each RedoLogBuffer object.
|
||||
* @param[opt] growSize The amount of RedoLogBuffer objects to add to the pool each time the pool is depleted.
|
||||
*/
|
||||
explicit RedoLogBufferPool(
|
||||
uint32_t bufferSize = REDO_DEFAULT_BUFFER_SIZE, uint32_t growSize = REDO_DEFAULT_GROW_SIZE);
|
||||
|
||||
/** @brief Destructor. */
|
||||
virtual ~RedoLogBufferPool();
|
||||
|
||||
/** @brief Initializes the buffer pool. */
|
||||
bool Init();
|
||||
|
||||
/** @brief Allocates a RedoLogBuffer object from the pool. */
|
||||
RedoLogBuffer* Alloc();
|
||||
|
||||
/** @brief Returns a RedoLogBuffer object into the pool. */
|
||||
void Free(RedoLogBuffer* buffer);
|
||||
|
||||
private:
|
||||
/** @var The size of each memory buffer within each RedoLogBuffer object. */
|
||||
uint32_t m_bufferSize;
|
||||
|
||||
/** @var The amount of RedoLogBuffer objects to add to the pool each time the pool is depleted. */
|
||||
uint32_t m_growSize;
|
||||
|
||||
/** @var Synchronizes access to the free list. */
|
||||
std::mutex m_lock;
|
||||
|
||||
/** @var The list of ready to use RedoLogBuffer objects. */
|
||||
RedoLogBuffer* m_freeList;
|
||||
|
||||
/** @var Global pool for RedoLogBuffer objects (inside memory buffers are allocated externally). */
|
||||
ObjAllocInterface* m_objectPool;
|
||||
|
||||
/** @brief Refills the free list. */
|
||||
void RefillFreeList();
|
||||
|
||||
/** @brief Clears the free list and deallocates all associated objects and buffers. */
|
||||
void ClearFreeList();
|
||||
};
|
||||
} // namespace MOT
|
||||
|
||||
#endif /* REDO_LOG_BUFFER_POOL_H */
|
@ -26,7 +26,6 @@
|
||||
#define ILOGGER_H
|
||||
|
||||
#include "redo_log_writer.h"
|
||||
#include "redo_log_buffer_array.h"
|
||||
|
||||
namespace MOT {
|
||||
/**
|
||||
@ -56,16 +55,6 @@ public:
|
||||
return AddToLog(data, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a RedoLogBuffer array into the logger.
|
||||
* @param redoLogBufferArray The RedoLogBuffers to write to the logger.
|
||||
* @return The amount of bytes written.
|
||||
*/
|
||||
virtual uint64_t AddToLog(RedoLogBufferArray& redoLogBufferArray)
|
||||
{
|
||||
return AddToLog(redoLogBufferArray.GetEntries(), redoLogBufferArray.Size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a RedoLogBuffer array into the logger.
|
||||
* @param redoLogBufferArray The RedoLogBuffers to write to the logger.
|
||||
|
@ -30,8 +30,6 @@
|
||||
#include "logger_factory.h"
|
||||
#include "logger_type.h"
|
||||
#include "synchronous_redo_log_handler.h"
|
||||
#include "asynchronous_redo_log_handler.h"
|
||||
#include "group_synchronous_redo_log_handler.h"
|
||||
#include "segmented_group_synchronous_redo_log_handler.h"
|
||||
#include "mot_error.h"
|
||||
|
||||
@ -56,16 +54,9 @@ RedoLogHandler* RedoLogHandlerFactory::CreateRedoLogHandler()
|
||||
case RedoLogHandlerType::SYNC_REDO_LOG_HANDLER:
|
||||
handler = new (std::nothrow) SynchronousRedoLogHandler();
|
||||
break;
|
||||
case RedoLogHandlerType::GROUP_SYNC_REDO_LOG_HANDLER:
|
||||
// GroupSyncRedoLogHandler was replaced with SegmentedGroupSyncRedoLogHandler
|
||||
handler = new (std::nothrow) SegmentedGroupSyncRedoLogHandler();
|
||||
break;
|
||||
case RedoLogHandlerType::SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER:
|
||||
handler = new (std::nothrow) SegmentedGroupSyncRedoLogHandler();
|
||||
break;
|
||||
case RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER:
|
||||
handler = new (std::nothrow) AsyncRedoLogHandler();
|
||||
break;
|
||||
default:
|
||||
MOT_REPORT_PANIC(MOT_ERROR_INTERNAL,
|
||||
"Redo Log Handler Initialization",
|
||||
|
@ -32,16 +32,12 @@ DECLARE_LOGGER(RedoLogHandlerType, RedoLog)
|
||||
|
||||
static const char* NONE_STR = "none";
|
||||
static const char* SYNC_REDO_LOG_HANDLER_STR = "synchronous";
|
||||
static const char* GROUP_SYNC_REDO_LOG_HANDLER_STR = "group_synchronous";
|
||||
static const char* SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER_STR = "segmented_group_synchronous";
|
||||
static const char* ASYNC_REDO_LOG_HANDLER_STR = "asynchronous";
|
||||
static const char* INVALID_REDO_LOG_HANDLER_STR = "INVALID";
|
||||
|
||||
static const char* redoLogHandlerTypeNames[] = {NONE_STR,
|
||||
SYNC_REDO_LOG_HANDLER_STR,
|
||||
GROUP_SYNC_REDO_LOG_HANDLER_STR,
|
||||
SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER_STR,
|
||||
ASYNC_REDO_LOG_HANDLER_STR};
|
||||
SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER_STR};
|
||||
|
||||
RedoLogHandlerType RedoLogHandlerTypeFromString(const char* redoLogHandlerType)
|
||||
{
|
||||
@ -51,12 +47,8 @@ RedoLogHandlerType RedoLogHandlerTypeFromString(const char* redoLogHandlerType)
|
||||
handlerType = RedoLogHandlerType::NONE;
|
||||
} else if (strcmp(redoLogHandlerType, SYNC_REDO_LOG_HANDLER_STR) == 0) {
|
||||
handlerType = RedoLogHandlerType::SYNC_REDO_LOG_HANDLER;
|
||||
} else if (strcmp(redoLogHandlerType, GROUP_SYNC_REDO_LOG_HANDLER_STR) == 0) {
|
||||
handlerType = RedoLogHandlerType::GROUP_SYNC_REDO_LOG_HANDLER;
|
||||
} else if (strcmp(redoLogHandlerType, SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER_STR) == 0) {
|
||||
handlerType = RedoLogHandlerType::SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER;
|
||||
} else if (strcmp(redoLogHandlerType, ASYNC_REDO_LOG_HANDLER_STR) == 0) {
|
||||
handlerType = RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER;
|
||||
} else {
|
||||
MOT_LOG_ERROR("Invalid redo log handler type: %s", redoLogHandlerType);
|
||||
}
|
||||
|
@ -36,15 +36,9 @@ enum class RedoLogHandlerType : uint32_t {
|
||||
/** @var Denotes SynchronousRedoLogHandler. */
|
||||
SYNC_REDO_LOG_HANDLER,
|
||||
|
||||
/** @var Denotes GroupSyncRedoLogHandler. */
|
||||
GROUP_SYNC_REDO_LOG_HANDLER,
|
||||
|
||||
/** @var Denotes SegmentedGroupSyncRedoLogHandler. */
|
||||
SEGMENTED_GROUP_SYNC_REDO_LOG_HANDLER,
|
||||
|
||||
/** @var Denotes AsyncRedoLogHandler. */
|
||||
ASYNC_REDO_LOG_HANDLER,
|
||||
|
||||
/** @var Denotes invalid handler type. */
|
||||
INVALID_REDO_LOG_HANDLER
|
||||
};
|
||||
|
@ -141,9 +141,13 @@ private:
|
||||
{
|
||||
bool result = true;
|
||||
if (u_sess->attr.attr_storage.guc_synchronous_commit == SYNCHRONOUS_COMMIT_OFF) {
|
||||
/*
|
||||
* In MOT, for asynchronous redo log mode also we use SYNC_REDO_LOG_HANDLER.
|
||||
* Asynchronous behavior is handled by the envelope.
|
||||
*/
|
||||
MOT_LOG_INFO("Configuring asynchronous redo-log handler due to synchronous_commit=off");
|
||||
result = AddExtTypedConfigItem<MOT::RedoLogHandlerType>(
|
||||
"", "redo_log_handler_type", MOT::RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER);
|
||||
"", "redo_log_handler_type", MOT::RedoLogHandlerType::SYNC_REDO_LOG_HANDLER);
|
||||
} else if (MOT::GetGlobalConfiguration().m_enableGroupCommit) {
|
||||
MOT_LOG_INFO("Configuring segmented-group redo-log handler");
|
||||
result = AddExtTypedConfigItem<MOT::RedoLogHandlerType>(
|
||||
|
@ -68,7 +68,6 @@
|
||||
#include "access/sysattr.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "postmaster/bgwriter.h"
|
||||
#include "postmaster/walwriter.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/ipc.h"
|
||||
|
||||
@ -163,7 +162,6 @@ static MotSessionMemoryDetail* MOTGetForeignSessionMemSize(uint32_t* sessionCoun
|
||||
static void MOTNotifyForeignConfigChange();
|
||||
|
||||
static void MOTCheckpointCallback(CheckpointEvent checkpointEvent, uint64_t lsn, void* arg);
|
||||
static void MOTWalCallback(void* arg);
|
||||
|
||||
/*
|
||||
* Helper functions
|
||||
@ -1788,16 +1786,6 @@ static void MOTCheckpointCallback(CheckpointEvent checkpointEvent, uint64_t lsn,
|
||||
}
|
||||
}
|
||||
|
||||
static void MOTWalCallback(void* arg)
|
||||
{
|
||||
MOT::MOTEngine* engine = MOT::MOTEngine::GetInstance();
|
||||
if (engine == nullptr) {
|
||||
elog(INFO, "Failed to create MOT engine");
|
||||
return;
|
||||
}
|
||||
engine->WriteLog();
|
||||
}
|
||||
|
||||
/* @MOT
|
||||
* brief: Validate table definition
|
||||
* input param @obj: A Obj including infomation to validate when alter tabel and create table.
|
||||
@ -1950,11 +1938,6 @@ static void InitMOTHandler()
|
||||
if (MOT::GetGlobalConfiguration().m_enableIncrementalCheckpoint == false) {
|
||||
// Register our checkpoint and redo callbacks to the envelope.
|
||||
if (!MOTAdaptor::m_callbacks_initialized) {
|
||||
if (MOT::GetGlobalConfiguration().m_enableRedoLog &&
|
||||
MOT::GetGlobalConfiguration().m_redoLogHandlerType == MOT::RedoLogHandlerType::ASYNC_REDO_LOG_HANDLER) {
|
||||
RegisterWALCallback(MOTWalCallback, NULL);
|
||||
}
|
||||
|
||||
if (MOT::GetGlobalConfiguration().m_enableCheckpoint) {
|
||||
RegisterCheckpointCallback(MOTCheckpointCallback, NULL);
|
||||
} else {
|
||||
|
@ -523,7 +523,6 @@ typedef struct knl_g_executor_context {
|
||||
|
||||
typedef struct knl_g_xlog_context {
|
||||
int num_locks_in_group;
|
||||
struct WALCallbackItem* walCallback;
|
||||
struct RedoCommitCallbackItem* redoCommitCallback;
|
||||
} knl_g_xlog_context;
|
||||
|
||||
|
@ -12,11 +12,6 @@
|
||||
#ifndef _WALWRITER_H
|
||||
#define _WALWRITER_H
|
||||
|
||||
typedef void (*WALCallback)(void* arg);
|
||||
|
||||
extern void RegisterWALCallback(WALCallback callback, void* arg);
|
||||
extern void CallWALCallback();
|
||||
|
||||
extern void WalWriterMain(void);
|
||||
|
||||
#endif /* _WALWRITER_H */
|
||||
|
Reference in New Issue
Block a user