!5831 新增Generic WAL接口(为实现自定义访问方法的扩展供访问WAL的接口)
Merge pull request !5831 from 吉文克/datavec-0.7.2
This commit is contained in:
@ -25,6 +25,7 @@ set(CMAKE_MODULE_PATH
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/gc_fdw
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/ndpplugin
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/spq_plugin
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/datavec
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/gms_stats
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/gms_profiler
|
||||
)
|
||||
@ -53,4 +54,7 @@ add_subdirectory(ndpplugin)
|
||||
if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/spq_plugin)
|
||||
add_subdirectory(spq_plugin)
|
||||
endif()
|
||||
if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/datavec)
|
||||
add_subdirectory(datavec)
|
||||
endif()
|
||||
add_subdirectory(gms_profiler)
|
||||
|
@ -4,6 +4,7 @@ execute_process(
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/barrierdesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/barrierdesc.cpp
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/clogdesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/clogdesc.cpp
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/dbasedesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/dbasedesc.cpp
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/genericdesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/genericdesc.cpp
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/gindesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gindesc.cpp
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/gistdesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gistdesc.cpp
|
||||
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/rmgrdesc/hashdesc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hashdesc.cpp
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include "knl/knl_variable.h"
|
||||
|
||||
#include "access/clog.h"
|
||||
#include "access/generic_xlog.h"
|
||||
#include "access/gin.h"
|
||||
#include "access/gist_private.h"
|
||||
#include "access/hash.h"
|
||||
|
@ -4340,6 +4340,7 @@ IndexStmt* transformIndexStmt(Oid relid, IndexStmt* stmt, const char* queryStrin
|
||||
(0 != pg_strcasecmp(stmt->accessMethod, DEFAULT_GIN_INDEX_TYPE)) &&
|
||||
(0 != pg_strcasecmp(stmt->accessMethod, DEFAULT_GIST_INDEX_TYPE)) &&
|
||||
(0 != pg_strcasecmp(stmt->accessMethod, DEFAULT_IVFFLAT_INDEX_TYPE)) &&
|
||||
(0 != pg_strcasecmp(stmt->accessMethod, DEFAULT_HNSW_INDEX_TYPE)) &&
|
||||
(0 != pg_strcasecmp(stmt->accessMethod, DEFAULT_USTORE_INDEX_TYPE)) &&
|
||||
(0 != pg_strcasecmp(stmt->accessMethod, DEFAULT_HASH_INDEX_TYPE))) {
|
||||
/* row store only support btree/ubtree/gin/gist/hash index */
|
||||
|
@ -76,7 +76,7 @@ bool will_shutdown = false;
|
||||
*
|
||||
********************************************/
|
||||
|
||||
const uint32 GRAND_VERSION_NUM = 92942;
|
||||
const uint32 GRAND_VERSION_NUM = 92943;
|
||||
|
||||
/********************************************
|
||||
* 2.VERSION NUM FOR EACH FEATURE
|
||||
@ -96,6 +96,7 @@ const uint32 PUBLICATION_DDL_VERSION_NUM = 92921;
|
||||
const uint32 UPSERT_ALIAS_VERSION_NUM = 92920;
|
||||
const uint32 SUPPORT_GS_DEPENDENCY_VERSION_NUM = 92916;
|
||||
const uint32 SPQ_VERSION_NUM = 92915;
|
||||
const uint32 GENERICXLOG_VERSION_NUM = 92943;
|
||||
const uint32 PARTITION_ACCESS_EXCLUSIVE_LOCK_UPGRADE_VERSION = 92913;
|
||||
const uint32 PAGE_DIST_VERSION_NUM = 92912;
|
||||
const uint32 NODE_REFORM_INFO_VERSION_NUM = 92911;
|
||||
|
@ -11,12 +11,12 @@ ifneq "$(MAKECMDGOALS)" "clean"
|
||||
endif
|
||||
|
||||
ifeq ($(enable_mot), yes)
|
||||
OBJS = barrierdesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
|
||||
OBJS = barrierdesc.o clogdesc.o dbasedesc.o genericdesc.o gindesc.o gistdesc.o \
|
||||
hashdesc.o heapdesc.o motdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
|
||||
replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o \
|
||||
xactdesc.o xlogdesc.o slotdesc.o undologdesc.o uheapdesc.o segpagedesc.o logicalddlmsgdesc.o
|
||||
else
|
||||
OBJS = barrierdesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
|
||||
OBJS = barrierdesc.o clogdesc.o dbasedesc.o genericdesc.o gindesc.o gistdesc.o \
|
||||
hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
|
||||
replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o \
|
||||
xactdesc.o xlogdesc.o slotdesc.o undologdesc.o uheapdesc.o segpagedesc.o logicalddlmsgdesc.o
|
||||
|
59
src/gausskernel/storage/access/rmgrdesc/genericdesc.cpp
Normal file
59
src/gausskernel/storage/access/rmgrdesc/genericdesc.cpp
Normal file
@ -0,0 +1,59 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* genericdesc.cpp
|
||||
* rmgr descriptor routines for access/transam/generic_xlog.cpp
|
||||
*
|
||||
* Portions Copyright (c) 2024 Huawei Technologies Co.,Ltd.
|
||||
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* src/gausskernel/storage/access/rmgrdesc/genericdesc.cpp
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/generic_xlog.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "storage/smgr/relfilenode.h"
|
||||
|
||||
/*
|
||||
* Description of generic xlog record: write page regions that this record
|
||||
* overrides.
|
||||
*/
|
||||
void
|
||||
generic_desc(StringInfo buf, XLogReaderState *record)
|
||||
{
|
||||
errno_t ret = EOK;
|
||||
Pointer ptr = XLogRecGetData(record),
|
||||
end = ptr + XLogRecGetDataLen(record);
|
||||
|
||||
while (ptr < end) {
|
||||
OffsetNumber offset, length;
|
||||
|
||||
ret = memcpy_s(&offset, sizeof(offset), ptr, sizeof(offset));
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += sizeof(offset);
|
||||
ret = memcpy_s(&length, sizeof(length), ptr, sizeof(length));
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += sizeof(length);
|
||||
ptr += length;
|
||||
|
||||
if (ptr < end)
|
||||
appendStringInfo(buf, "offset %u, length %u; ", offset, length);
|
||||
else
|
||||
appendStringInfo(buf, "offset %u, length %u", offset, length);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Identification of generic xlog record: we don't distinguish any subtypes
|
||||
* inside generic xlog records.
|
||||
*/
|
||||
const char *
|
||||
generic_identify(uint8 info)
|
||||
{
|
||||
return "Generic";
|
||||
}
|
@ -6,6 +6,7 @@ list(APPEND TGT_transam_SRC
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/csnlog.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/double_write.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/extreme_rto_redo_api.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/generic_xlog.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/single_double_write.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/multi_redo_api.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/multi_redo_settings.cpp
|
||||
|
@ -12,12 +12,12 @@ endif
|
||||
ifeq ($(enable_multiple_nodes), yes)
|
||||
OBJS = clog.o multixact.o rmgr.o slru.o csnlog.o transam.o twophase.o \
|
||||
twophase_rmgr.o varsup.o double_write.o single_double_write.o seg_double_write.o redo_statistic.o multi_redo_api.o multi_redo_settings.o\
|
||||
xact.o xlog.o xlogfuncs.o extreme_rto_redo_api.o \
|
||||
xact.o xlog.o xlogfuncs.o extreme_rto_redo_api.o generic_xlog.o \
|
||||
xloginsert.o xlogreader.o xlogutils.o cbmparsexlog.o cbmfuncs.o
|
||||
else
|
||||
OBJS = clog.o gtm_single.o multixact.o rmgr.o slru.o csnlog.o transam.o twophase.o \
|
||||
twophase_rmgr.o varsup.o double_write.o single_double_write.o seg_double_write.o redo_statistic.o multi_redo_api.o multi_redo_settings.o\
|
||||
xact.o xlog.o xlogfuncs.o extreme_rto_redo_api.o \
|
||||
xact.o xlog.o xlogfuncs.o extreme_rto_redo_api.o generic_xlog.o \
|
||||
xloginsert.o xlogreader.o xlogutils.o cbmparsexlog.o cbmfuncs.o
|
||||
endif
|
||||
|
||||
|
430
src/gausskernel/storage/access/transam/generic_xlog.cpp
Normal file
430
src/gausskernel/storage/access/transam/generic_xlog.cpp
Normal file
@ -0,0 +1,430 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* generic_xlog.cpp
|
||||
* Implementation of generic xlog records.
|
||||
*
|
||||
* Portions Copyright (c) 2024 Huawei Technologies Co.,Ltd.
|
||||
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* src/gausskernel/storage/access/transam/generic_xlog.cpp
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/generic_xlog.h"
|
||||
#include "access/xlogproc.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
* Internally, a delta between pages consists of a set of fragments. Each
|
||||
* fragment represents changes made in a given region of a page. A fragment
|
||||
* is made up as follows:
|
||||
*
|
||||
* - offset of page region (OffsetNumber)
|
||||
* - length of page region (OffsetNumber)
|
||||
* - data - the data to place into the region ('length' number of bytes)
|
||||
*
|
||||
* Unchanged regions of a page are not represented in its delta. As a
|
||||
* result, a delta can be more compact than the full page image. But having
|
||||
* an unchanged region in the middle of two fragments that is smaller than
|
||||
* the fragment header (offset and length) does not pay off in terms of the
|
||||
* overall size of the delta. For this reason, we break fragments only if
|
||||
* the unchanged region is bigger than MATCH_THRESHOLD.
|
||||
*
|
||||
* The worst case for delta sizes occurs when we did not find any unchanged
|
||||
* region in the page. The size of the delta will be the size of the page plus
|
||||
* the size of the fragment header in that case.
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#define FRAGMENT_HEADER_SIZE (2 * sizeof(OffsetNumber))
|
||||
#define MATCH_THRESHOLD FRAGMENT_HEADER_SIZE
|
||||
#define MAX_DELTA_SIZE BLCKSZ + FRAGMENT_HEADER_SIZE
|
||||
|
||||
/* Struct of generic xlog data for single page */
|
||||
typedef struct
|
||||
{
|
||||
Buffer buffer; /* registered buffer */
|
||||
char image[BLCKSZ]; /* copy of page image for modification */
|
||||
char data[MAX_DELTA_SIZE]; /* delta between page images */
|
||||
int dataLen; /* space consumed in data field */
|
||||
int flags; /* flags for this buffer */
|
||||
} PageData;
|
||||
|
||||
/* State of generic xlog record construction */
|
||||
struct GenericXLogState
|
||||
{
|
||||
bool isLogged;
|
||||
PageData pages[MAX_GENERIC_XLOG_PAGES];
|
||||
};
|
||||
|
||||
static void writeFragment(PageData *pageData, OffsetNumber offset,
|
||||
OffsetNumber len, Pointer data);
|
||||
static void writeDelta(PageData *pageData);
|
||||
static void applyPageRedo(Page page, Pointer data, Size dataSize);
|
||||
|
||||
/*
|
||||
* Write next fragment into delta.
|
||||
*/
|
||||
static void
|
||||
writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
|
||||
Pointer data)
|
||||
{
|
||||
errno_t ret = EOK;
|
||||
Pointer ptr = pageData->data + pageData->dataLen;
|
||||
|
||||
/* Check if we have enough space */
|
||||
Assert(pageData->dataLen + sizeof(offset) +
|
||||
sizeof(length) + length <= sizeof(pageData->data));
|
||||
|
||||
/* Write fragment data */
|
||||
ret = memcpy_s(ptr, MAX_DELTA_SIZE, &offset, sizeof(offset));
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += sizeof(offset);
|
||||
ret = memcpy_s(ptr, MAX_DELTA_SIZE - sizeof(offset), &length, sizeof(length));
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += sizeof(length);
|
||||
ret = memcpy_s(ptr, MAX_DELTA_SIZE - sizeof(offset) - sizeof(length), data, length);
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += length;
|
||||
|
||||
pageData->dataLen = ptr - pageData->data;
|
||||
}
|
||||
|
||||
/*
|
||||
* Make delta for given page.
|
||||
*/
|
||||
static void
|
||||
writeDelta(PageData *pageData)
|
||||
{
|
||||
Page page = BufferGetPage(pageData->buffer),
|
||||
image = (Page) pageData->image;
|
||||
int i, fragmentBegin = -1, fragmentEnd = -1;
|
||||
uint16 pageLower = ((PageHeader) page)->pd_lower,
|
||||
pageUpper = ((PageHeader) page)->pd_upper,
|
||||
imageLower = ((PageHeader) image)->pd_lower,
|
||||
imageUpper = ((PageHeader) image)->pd_upper;
|
||||
|
||||
for (i = 0; i < BLCKSZ; i++) {
|
||||
bool match;
|
||||
|
||||
/*
|
||||
* Check if bytes in old and new page images match. We do not care
|
||||
* about data in the unallocated area between pd_lower and pd_upper.
|
||||
* We assume the unallocated area to expand with unmatched bytes.
|
||||
* Bytes inside the unallocated area are assumed to always match.
|
||||
*/
|
||||
if (i < pageLower) {
|
||||
if (i < imageLower)
|
||||
match = (page[i] == image[i]);
|
||||
else
|
||||
match = false;
|
||||
} else if (i >= pageUpper) {
|
||||
if (i >= imageUpper)
|
||||
match = (page[i] == image[i]);
|
||||
else
|
||||
match = false;
|
||||
} else {
|
||||
match = true;
|
||||
}
|
||||
|
||||
if (match) {
|
||||
if (fragmentBegin >= 0) {
|
||||
/* Matched byte is potentially part of a fragment. */
|
||||
if (fragmentEnd < 0)
|
||||
fragmentEnd = i;
|
||||
|
||||
/*
|
||||
* Write next fragment if sequence of matched bytes is longer
|
||||
* than MATCH_THRESHOLD.
|
||||
*/
|
||||
if (i - fragmentEnd >= MATCH_THRESHOLD) {
|
||||
writeFragment(pageData, fragmentBegin,
|
||||
fragmentEnd - fragmentBegin,
|
||||
page + fragmentBegin);
|
||||
fragmentBegin = -1;
|
||||
fragmentEnd = -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* On unmatched byte, start new fragment if it is not done yet */
|
||||
if (fragmentBegin < 0)
|
||||
fragmentBegin = i;
|
||||
fragmentEnd = -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (fragmentBegin >= 0)
|
||||
writeFragment(pageData, fragmentBegin, BLCKSZ - fragmentBegin, page + fragmentBegin);
|
||||
|
||||
#ifdef WAL_DEBUG
|
||||
/*
|
||||
* If xlog debug is enabled, then check produced delta. Result of delta
|
||||
* application to saved image should be the same as current page state.
|
||||
*/
|
||||
if (XLOG_DEBUG) {
|
||||
errno_t ret = EOK;
|
||||
char tmp[BLCKSZ];
|
||||
ret = memcpy_s(tmp, BLCKSZ, image, BLCKSZ);
|
||||
securec_check(ret, "\0", "\0");
|
||||
applyPageRedo(tmp, pageData->data, pageData->dataLen);
|
||||
if (memcmp(tmp, page, pageLower)
|
||||
|| memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper))
|
||||
elog(ERROR, "result of generic xlog apply does not match");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Start new generic xlog record.
|
||||
*/
|
||||
GenericXLogState *
|
||||
GenericXLogStart(Relation relation)
|
||||
{
|
||||
int i;
|
||||
GenericXLogState *state;
|
||||
|
||||
if (t_thrd.proc->workingVersionNum < GENERICXLOG_VERSION_NUM) {
|
||||
elog(ERROR, "workingVersionNum is lowwer than GENERICXLOG_VERSION_NUM, not supported!");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
|
||||
|
||||
state->isLogged = RelationNeedsWAL(relation);
|
||||
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
|
||||
state->pages[i].buffer = InvalidBuffer;
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register new buffer for generic xlog record.
|
||||
*/
|
||||
Page
|
||||
GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags)
|
||||
{
|
||||
errno_t ret = EOK;
|
||||
int block_id;
|
||||
|
||||
if (state == NULL) {
|
||||
elog(ERROR, "GenericXLogState invalid!");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Place new buffer to unused slot in array */
|
||||
for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++) {
|
||||
PageData *page = &state->pages[block_id];
|
||||
if (BufferIsInvalid(page->buffer)) {
|
||||
page->buffer = buffer;
|
||||
ret = memcpy_s(page->image, BLCKSZ, BufferGetPage(buffer), BLCKSZ);
|
||||
securec_check(ret, "\0", "\0");
|
||||
page->dataLen = 0;
|
||||
page->flags = flags;
|
||||
return (Page)page->image;
|
||||
} else if (page->buffer == buffer) {
|
||||
/*
|
||||
* Buffer is already registered. Just return the image, which is
|
||||
* already prepared.
|
||||
*/
|
||||
return (Page)page->image;
|
||||
}
|
||||
}
|
||||
|
||||
elog(ERROR, "maximum number of %d generic xlog buffers is exceeded",
|
||||
MAX_GENERIC_XLOG_PAGES);
|
||||
|
||||
/* keep compiler quiet */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Unregister particular buffer for generic xlog record.
|
||||
*/
|
||||
void
|
||||
GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
|
||||
{
|
||||
int block_id;
|
||||
|
||||
if (state == NULL) {
|
||||
elog(ERROR, "GenericXLogState invalid!");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Find block in array to unregister */
|
||||
for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++) {
|
||||
if (state->pages[block_id].buffer == buffer) {
|
||||
/*
|
||||
* Preserve order of pages in array because it could matter for
|
||||
* concurrency.
|
||||
*/
|
||||
int ret = memmove_s(&state->pages[block_id], (MAX_GENERIC_XLOG_PAGES - block_id - 1) * sizeof(PageData), &state->pages[block_id + 1],
|
||||
(MAX_GENERIC_XLOG_PAGES - block_id - 1) * sizeof(PageData));
|
||||
securec_check(ret, "\0", "\0");
|
||||
state->pages[MAX_GENERIC_XLOG_PAGES - 1].buffer = InvalidBuffer;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
elog(ERROR, "registered generic xlog buffer not found");
|
||||
}
|
||||
|
||||
/*
|
||||
* Put all changes in registered buffers to generic xlog record.
|
||||
*/
|
||||
XLogRecPtr
|
||||
GenericXLogFinish(GenericXLogState *state)
|
||||
{
|
||||
XLogRecPtr lsn = InvalidXLogRecPtr;
|
||||
int i;
|
||||
errno_t ret = EOK;
|
||||
|
||||
Assert(state != NULL);
|
||||
|
||||
if (state->isLogged) {
|
||||
/* Logged relation: make xlog record in critical section. */
|
||||
XLogBeginInsert();
|
||||
|
||||
START_CRIT_SECTION();
|
||||
|
||||
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++) {
|
||||
char tmp[BLCKSZ];
|
||||
PageData *page = &state->pages[i];
|
||||
|
||||
if (BufferIsInvalid(page->buffer))
|
||||
continue;
|
||||
|
||||
/* Swap current and saved page image. */
|
||||
ret = memcpy_s(tmp, BLCKSZ, page->image, BLCKSZ);
|
||||
securec_check(ret, "\0", "\0");
|
||||
ret = memcpy_s(page->image, BLCKSZ, BufferGetPage(page->buffer), BLCKSZ);
|
||||
securec_check(ret, "\0", "\0");
|
||||
ret = memcpy_s(BufferGetPage(page->buffer), BLCKSZ, tmp, BLCKSZ);
|
||||
securec_check(ret, "\0", "\0");
|
||||
|
||||
if (page->flags & GENERIC_XLOG_FULL_IMAGE) {
|
||||
/* A full page image does not require anything special */
|
||||
XLogRegisterBuffer(i, page->buffer, REGBUF_FORCE_IMAGE);
|
||||
} else {
|
||||
/*
|
||||
* In normal mode, calculate delta and write it as data
|
||||
* associated with this page.
|
||||
*/
|
||||
XLogRegisterBuffer(i, page->buffer, REGBUF_STANDARD);
|
||||
writeDelta(page);
|
||||
XLogRegisterBufData(i, page->data, page->dataLen);
|
||||
}
|
||||
}
|
||||
|
||||
/* Insert xlog record */
|
||||
lsn = XLogInsert(RM_GENERIC_ID, 0);
|
||||
|
||||
/* Set LSN and mark buffers dirty */
|
||||
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++) {
|
||||
PageData *page = &state->pages[i];
|
||||
|
||||
if (BufferIsInvalid(page->buffer))
|
||||
continue;
|
||||
PageSetLSN(BufferGetPage(page->buffer), lsn);
|
||||
MarkBufferDirty(page->buffer);
|
||||
}
|
||||
END_CRIT_SECTION();
|
||||
} else {
|
||||
/* Unlogged relation: skip xlog-related stuff */
|
||||
START_CRIT_SECTION();
|
||||
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++) {
|
||||
PageData *page = &state->pages[i];
|
||||
|
||||
if (BufferIsInvalid(page->buffer))
|
||||
continue;
|
||||
ret = memcpy_s(BufferGetPage(page->buffer), BLCKSZ, page->image, BLCKSZ);
|
||||
securec_check(ret, "\0", "\0");
|
||||
MarkBufferDirty(page->buffer);
|
||||
}
|
||||
END_CRIT_SECTION();
|
||||
}
|
||||
|
||||
pfree(state);
|
||||
|
||||
return lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Abort generic xlog record.
|
||||
*/
|
||||
void
|
||||
GenericXLogAbort(GenericXLogState *state)
|
||||
{
|
||||
pfree(state);
|
||||
}
|
||||
|
||||
/*
|
||||
* Apply delta to given page image.
|
||||
*/
|
||||
static void
|
||||
applyPageRedo(Page page, Pointer data, Size dataSize)
|
||||
{
|
||||
errno_t ret = EOK;
|
||||
Pointer ptr = data, end = data + dataSize;
|
||||
|
||||
while (ptr < end) {
|
||||
OffsetNumber offset, length;
|
||||
|
||||
ret = memcpy_s(&offset, sizeof(offset), ptr, sizeof(offset));
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += sizeof(offset);
|
||||
ret =memcpy_s(&length, sizeof(length), ptr, sizeof(length));
|
||||
securec_check(ret, "\0", "\0");
|
||||
ptr += sizeof(length);
|
||||
|
||||
ret = memcpy_s(page + offset, length, ptr, length);
|
||||
securec_check(ret, "\0", "\0");
|
||||
|
||||
ptr += length;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Redo function for generic xlog record.
|
||||
*/
|
||||
void
|
||||
generic_redo(XLogReaderState *record)
|
||||
{
|
||||
uint8 block_id;
|
||||
RedoBufferInfo buffers[MAX_GENERIC_XLOG_PAGES];
|
||||
XLogRecPtr lsn = record->EndRecPtr;
|
||||
|
||||
Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
|
||||
|
||||
/* Iterate over blocks */
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++) {
|
||||
XLogRedoAction action;
|
||||
|
||||
if (!XLogRecHasBlockRef(record, block_id))
|
||||
continue;
|
||||
|
||||
action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
|
||||
|
||||
/* Apply redo to given block if needed */
|
||||
if (action == BLK_NEEDS_REDO) {
|
||||
Pointer blockData;
|
||||
Size blockDataSize;
|
||||
Page page;
|
||||
|
||||
page = BufferGetPage(buffers[block_id].buf);
|
||||
blockData = XLogRecGetBlockData(record, block_id, &blockDataSize);
|
||||
applyPageRedo(page, blockData, blockDataSize);
|
||||
|
||||
PageSetLSN(page, lsn);
|
||||
MarkBufferDirty(buffers[block_id].buf);
|
||||
}
|
||||
}
|
||||
|
||||
/* Changes are done: unlock and release all buffers */
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++) {
|
||||
if (BufferIsValid(buffers[block_id].buf))
|
||||
UnlockReleaseBuffer(buffers[block_id].buf);
|
||||
}
|
||||
}
|
@ -26,6 +26,7 @@
|
||||
#include "knl/knl_variable.h"
|
||||
|
||||
#include "access/clog.h"
|
||||
#include "access/generic_xlog.h"
|
||||
#include "access/gin.h"
|
||||
#include "access/gist_private.h"
|
||||
#include "access/hash.h"
|
||||
|
39
src/include/access/generic_xlog.h
Normal file
39
src/include/access/generic_xlog.h
Normal file
@ -0,0 +1,39 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* generic_xlog.h
|
||||
* Generic xlog API definition.
|
||||
*
|
||||
* src/include/access/generic_xlog.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef GENERIC_XLOG_H
|
||||
#define GENERIC_XLOG_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xloginsert.h"
|
||||
#include "storage/buf/bufpage.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
#define MAX_GENERIC_XLOG_PAGES XLR_NORMAL_MAX_BLOCK_ID
|
||||
#define GENERIC_XLOG_FULL_IMAGE 0x0001 /* write full-page image */
|
||||
|
||||
/* state of generic xlog record construction */
|
||||
struct GenericXLogState;
|
||||
typedef struct GenericXLogState GenericXLogState;
|
||||
|
||||
/* API for construction of generic xlog records */
|
||||
extern GenericXLogState *GenericXLogStart(Relation relation);
|
||||
extern Page GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer,
|
||||
int flags);
|
||||
extern void GenericXLogUnregister(GenericXLogState *state, Buffer buffer);
|
||||
extern XLogRecPtr GenericXLogFinish(GenericXLogState *state);
|
||||
extern void GenericXLogAbort(GenericXLogState *state);
|
||||
|
||||
/* functions defined for rmgr */
|
||||
extern void generic_redo(XLogReaderState *record);
|
||||
extern const char *generic_identify(uint8 info);
|
||||
extern void generic_desc(StringInfo buf, XLogReaderState *record);
|
||||
|
||||
#endif /* GENERIC_XLOG_H */
|
@ -82,3 +82,4 @@ PG_RMGR(RM_COMPRESSION_REL_ID, "CompressionRelation", CfsShrinkRedo, CfsShrinkDe
|
||||
CfsShrinkTypeName)
|
||||
PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, NULL, NULL, NULL, NULL, NULL, \
|
||||
logicalddlmsg_type_name)
|
||||
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, NULL, NULL, NULL, NULL, NULL, NULL)
|
||||
|
@ -29,6 +29,7 @@
|
||||
#define CSTORE_GINBTREE_INDEX_TYPE "cgin"
|
||||
#define DEFAULT_USTORE_INDEX_TYPE "ubtree"
|
||||
#define DEFAULT_IVFFLAT_INDEX_TYPE "ivfflat"
|
||||
#define DEFAULT_HNSW_INDEX_TYPE "hnsw"
|
||||
|
||||
/* Typedef for callback function for IndexBuildHeapScan */
|
||||
typedef void (*IndexBuildCallback)(Relation index, HeapTuple htup, Datum *values, const bool *isnull,
|
||||
|
@ -300,23 +300,6 @@ SH_SCOPE void SH_STAT(SH_TYPE *tb);
|
||||
|
||||
#endif
|
||||
|
||||
/* calculate ceil(log base 2) of num */
|
||||
static inline uint64 sh_log2(uint64 num)
|
||||
{
|
||||
int i;
|
||||
uint64 limit;
|
||||
|
||||
for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
|
||||
;
|
||||
return i;
|
||||
}
|
||||
|
||||
/* calculate first power of 2 >= num */
|
||||
static inline uint64 sh_pow2(uint64 num)
|
||||
{
|
||||
return ((uint64)1) << sh_log2(num);
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute sizing parameters for hashtable. Called when creating and growing
|
||||
* the hashtable.
|
||||
@ -1143,4 +1126,4 @@ SH_SCOPE void SH_STAT(SH_TYPE *tb)
|
||||
#undef SH_DISTANCE_FROM_OPTIMAL
|
||||
#undef SH_ENTRY_HASH
|
||||
#undef SH_INSERT_HASH_INTERNAL
|
||||
#undef SH_LOOKUP_HASH_INTERNAL
|
||||
#undef SH_LOOKUP_HASH_INTERNAL
|
||||
|
@ -141,6 +141,7 @@ extern const uint32 CREATE_TABLE_AS_VERSION_NUM;
|
||||
extern const uint32 GB18030_2022_VERSION_NUM;
|
||||
extern const uint32 PARTITION_ACCESS_EXCLUSIVE_LOCK_UPGRADE_VERSION;
|
||||
extern const uint32 SPQ_VERSION_NUM;
|
||||
extern const uint32 GENERICXLOG_VERSION_NUM;
|
||||
extern const uint32 UPSERT_ALIAS_VERSION_NUM;
|
||||
extern const uint32 SELECT_STMT_HAS_USERVAR;
|
||||
extern const uint32 PUBLICATION_DDL_VERSION_NUM;
|
||||
|
@ -159,4 +159,21 @@ static inline uint32 pg_rotate_right32(uint32 word, int n)
|
||||
return (word >> n) | (word << (sizeof(word) * BITS_PER_BYTE - n));
|
||||
}
|
||||
|
||||
/* calculate ceil(log base 2) of num */
|
||||
static inline uint64 sh_log2(uint64 num)
|
||||
{
|
||||
int i;
|
||||
uint64 limit;
|
||||
|
||||
for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
|
||||
;
|
||||
return i;
|
||||
}
|
||||
|
||||
/* calculate first power of 2 >= num */
|
||||
static inline uint64 sh_pow2(uint64 num)
|
||||
{
|
||||
return ((uint64)1) << sh_log2(num);
|
||||
}
|
||||
|
||||
#endif /* PG_BITUTILS_H */
|
||||
|
Reference in New Issue
Block a user