修改打开按需回放时,数据库启动阶段就申请HashMap所需内存,防止真正回放时内存申请不足导致回放失败
This commit is contained in:
@ -3349,6 +3349,22 @@ static void CheckExtremeRtoGUCConflicts(void)
|
||||
TRXN_REDO_MANAGER_NUM + TRXN_REDO_WORKER_NUM + XLOG_READER_NUM, MAX_RECOVERY_THREAD_NUM)));
|
||||
}
|
||||
#endif
|
||||
|
||||
if (g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery) {
|
||||
if (!g_instance.attr.attr_storage.dms_attr.enable_dms) {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||
errmsg("ondemand extreme rto only support in shared storage mode."),
|
||||
errhint("Either turn on ss_enable_dms, or turn off ss_enable_ondemand_recovery.")));
|
||||
}
|
||||
|
||||
if (g_instance.attr.attr_storage.recovery_parse_workers <= 1) {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYSTEM_ERROR),
|
||||
errmsg("extreme rto param should be set in ondemand extreme rto mode."),
|
||||
errhint("Either turn off ss_enable_ondemand_recovery, or set extreme rto param.")));
|
||||
}
|
||||
}
|
||||
}
|
||||
static void CheckRecoveryParaConflict()
|
||||
{
|
||||
|
||||
@ -1428,6 +1428,7 @@ static void knl_t_storage_init(knl_t_storage_context* storage_cxt)
|
||||
storage_cxt->max_userdatafiles = 8192 - 1000;
|
||||
storage_cxt->timeoutRemoteOpera = 0;
|
||||
storage_cxt->dmsBufCtl = NULL;
|
||||
storage_cxt->ondemandXLogMem = NULL;
|
||||
}
|
||||
|
||||
static void knl_t_port_init(knl_t_port_context* port_cxt)
|
||||
|
||||
@ -27,48 +27,46 @@
|
||||
#include "access/ondemand_extreme_rto/redo_utils.h"
|
||||
#include "storage/lock/lwlock.h"
|
||||
|
||||
Size OndemandRecoveryShmemSize(void)
|
||||
{
|
||||
Size size = 0;
|
||||
|
||||
size = add_size(size, (Size)g_instance.attr.attr_storage.dms_attr.ondemand_recovery_mem_size << BITS_IN_KB);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
void OndemandRecoveryShmemInit(void)
|
||||
{
|
||||
bool found = false;
|
||||
t_thrd.storage_cxt.ondemandXLogMem =
|
||||
(char *)ShmemInitStruct("Ondemand Recovery HashMap", OndemandRecoveryShmemSize(), &found);
|
||||
|
||||
if (!found) {
|
||||
/* The memory of the memset sometimes exceeds 2 GB. so, memset_s cannot be used. */
|
||||
MemSet(t_thrd.storage_cxt.ondemandXLogMem, 0, OndemandRecoveryShmemSize());
|
||||
}
|
||||
}
|
||||
|
||||
/* add for batch redo mem manager */
|
||||
void *OndemandXLogMemCtlInit(RedoMemManager *memctl, Size itemsize, int itemnum)
|
||||
{
|
||||
void *allocdata = NULL;
|
||||
RedoMemSlot *nextfreeslot = NULL;
|
||||
OndemandParseAllocCtrl *ctrl;
|
||||
Assert(PARSEBUFFER_SIZE == itemsize);
|
||||
Size dataSize = (itemsize + sizeof(RedoMemSlot)) * itemnum;
|
||||
|
||||
allocdata = (void *)palloc(sizeof(OndemandParseAllocCtrl));
|
||||
ctrl = (OndemandParseAllocCtrl *)allocdata;
|
||||
ctrl->allocNum = itemnum / ONDEMAND_MAX_PARSEBUFF_PREPALLOC;
|
||||
if ((int)(ctrl->allocNum * ONDEMAND_MAX_PARSEBUFF_PREPALLOC) != itemnum) {
|
||||
ctrl->allocNum++;
|
||||
}
|
||||
ctrl->memslotEntry = (void *)palloc(sizeof(RedoMemSlot) * itemnum);
|
||||
Assert(t_thrd.storage_cxt.ondemandXLogMem != NULL);
|
||||
Assert(dataSize <= OndemandRecoveryShmemSize());
|
||||
|
||||
// palloc all parse mem entry
|
||||
for (int i = 0; i < ctrl->allocNum; i++) {
|
||||
ctrl->allocEntry[i] = (void *)palloc(ONDEMAND_MAX_PARSESIZE_PREPALLOC);
|
||||
if (ctrl->allocEntry[i] == NULL) {
|
||||
ereport(PANIC,
|
||||
(errmodule(MOD_REDO), errcode(ERRCODE_LOG),
|
||||
errmsg("[SS] XLogMemCtlInit Allocated buffer failed!, totalblknum:%d, itemsize:%lu",
|
||||
itemnum, itemsize)));
|
||||
/* panic */
|
||||
}
|
||||
errno_t rc = memset_s(ctrl->allocEntry[i], ONDEMAND_MAX_PARSESIZE_PREPALLOC, 0,
|
||||
ONDEMAND_MAX_PARSESIZE_PREPALLOC);
|
||||
securec_check(rc, "\0", "\0");
|
||||
}
|
||||
memctl->totalblknum = itemnum;
|
||||
memctl->usedblknum = 0;
|
||||
memctl->itemsize = itemsize;
|
||||
memctl->memslot = (RedoMemSlot *)ctrl->memslotEntry;
|
||||
nextfreeslot = memctl->memslot;
|
||||
memctl->memslot = (RedoMemSlot *)(t_thrd.storage_cxt.ondemandXLogMem + (itemsize * itemnum));
|
||||
for (int i = memctl->totalblknum; i > 0; --i) {
|
||||
memctl->memslot[i - 1].buf_id = i; /* start from 1 , 0 is invalidbuffer */
|
||||
memctl->memslot[i - 1].freeNext = i - 1;
|
||||
}
|
||||
memctl->firstfreeslot = memctl->totalblknum;
|
||||
memctl->firstreleaseslot = InvalidBuffer;
|
||||
return allocdata;
|
||||
return (void *)t_thrd.storage_cxt.ondemandXLogMem;
|
||||
}
|
||||
|
||||
RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl)
|
||||
@ -136,26 +134,11 @@ void OndemandXLogParseBufferInit(RedoParseManager *parsemanager, int buffernum,
|
||||
void OndemandXLogParseBufferDestory(RedoParseManager *parsemanager)
|
||||
{
|
||||
g_parseManager = NULL;
|
||||
OndemandParseAllocCtrl *ctrl = (OndemandParseAllocCtrl *)parsemanager->parsebuffers;
|
||||
|
||||
if (ctrl != NULL) {
|
||||
for (int i = 0; i < ctrl->allocNum; i++) {
|
||||
pfree(ctrl->allocEntry[i]);
|
||||
}
|
||||
pfree(ctrl->memslotEntry);
|
||||
pfree(ctrl);
|
||||
parsemanager->parsebuffers = NULL;
|
||||
}
|
||||
// do not free parsebuffers, which is managed in shared memory
|
||||
parsemanager->parsebuffers = NULL;
|
||||
parsemanager->memctl.isInit = false;
|
||||
}
|
||||
|
||||
ParseBufferDesc *OndemandGetParseMemSlot(OndemandParseAllocCtrl *ctrl, int itemIndex)
|
||||
{
|
||||
int entryIndex = itemIndex / ONDEMAND_MAX_PARSEBUFF_PREPALLOC;
|
||||
int entryOffset = (itemIndex - (entryIndex * ONDEMAND_MAX_PARSEBUFF_PREPALLOC)) * PARSEBUFFER_SIZE;
|
||||
return (ParseBufferDesc *)((char *)ctrl->allocEntry[entryIndex] + entryOffset);
|
||||
}
|
||||
|
||||
XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanager, XLogRecParseState *blkstatehead,
|
||||
void *record)
|
||||
{
|
||||
@ -175,7 +158,7 @@ XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanag
|
||||
pg_read_barrier();
|
||||
Assert(allocslot->buf_id != InvalidBuffer);
|
||||
Assert(memctl->itemsize == (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc)));
|
||||
descstate = OndemandGetParseMemSlot((OndemandParseAllocCtrl *)parsemanager->parsebuffers, allocslot->buf_id - 1);
|
||||
descstate = (ParseBufferDesc *)((char *)parsemanager->parsebuffers + memctl->itemsize * (allocslot->buf_id - 1));
|
||||
descstate->buff_id = allocslot->buf_id;
|
||||
Assert(descstate->state == 0);
|
||||
descstate->state = 1;
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "access/ustore/undo/knl_uundoapi.h"
|
||||
#include "access/ustore/knl_undoworker.h"
|
||||
#include "access/ustore/knl_undorequest.h"
|
||||
#include "access/ondemand_extreme_rto/redo_utils.h"
|
||||
#include "commands/tablespace.h"
|
||||
#include "commands/async.h"
|
||||
#include "commands/matview.h"
|
||||
@ -195,6 +196,10 @@ Size ComputeTotalSizeOfShmem()
|
||||
|
||||
/* csf shrinker backend shared memory */
|
||||
size = add_size(size, CfsShrinkerShmemSize());
|
||||
|
||||
if (g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery) {
|
||||
size = add_size(size, OndemandRecoveryShmemSize());
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
@ -444,6 +449,10 @@ void CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
|
||||
FileRepairHashTblInit();
|
||||
initRepairBadBlockStat();
|
||||
|
||||
if (g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery) {
|
||||
OndemandRecoveryShmemInit();
|
||||
}
|
||||
|
||||
if (g_instance.ckpt_cxt_ctl->prune_queue_lock == NULL) {
|
||||
g_instance.ckpt_cxt_ctl->prune_queue_lock = LWLockAssign(LWTRANCHE_PRUNE_DIRTY_QUEUE);
|
||||
}
|
||||
|
||||
@ -26,19 +26,8 @@
|
||||
|
||||
#include "access/xlogproc.h"
|
||||
|
||||
#define PARSEBUFFER_SIZE (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc))
|
||||
#define ONDEMAND_MAX_PARSEBUFF_PREPALLOC ((1024 * 1024 * 1024 - 1) / PARSEBUFFER_SIZE)
|
||||
#define ONDEMAND_MAX_PARSESIZE_PREPALLOC (ONDEMAND_MAX_PARSEBUFF_PREPALLOC * PARSEBUFFER_SIZE)
|
||||
#define ONDEMAND_MAX_PARSEBUFF_ALLOCSIZE 100 // 100GB
|
||||
|
||||
typedef struct
|
||||
{
|
||||
int allocNum;
|
||||
void *allocEntry[ONDEMAND_MAX_PARSEBUFF_ALLOCSIZE];
|
||||
void *memslotEntry;
|
||||
} OndemandParseAllocCtrl;
|
||||
|
||||
|
||||
Size OndemandRecoveryShmemSize(void);
|
||||
void OndemandRecoveryShmemInit(void);
|
||||
void OndemandXLogParseBufferInit(RedoParseManager *parsemanager, int buffernum, RefOperate *refOperate,
|
||||
InterruptFunc interruptOperte);
|
||||
void OndemandXLogParseBufferDestory(RedoParseManager *parsemanager);
|
||||
|
||||
@ -2805,6 +2805,7 @@ typedef struct knl_t_storage_context {
|
||||
int timeoutRemoteOpera;
|
||||
char* PcaBufferBlocks;
|
||||
dms_buf_ctrl_t* dmsBufCtl;
|
||||
char* ondemandXLogMem;
|
||||
} knl_t_storage_context;
|
||||
|
||||
typedef struct knl_t_port_context {
|
||||
|
||||
Reference in New Issue
Block a user