datavec限制extreme rto场景

This commit is contained in:
jiwenke
2025-03-15 14:27:43 +08:00
committed by Lin_qiang
parent 4d1e225396
commit 14a4bf392b
6 changed files with 40 additions and 2 deletions

View File

@ -173,6 +173,9 @@ Datum hnswhandler(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnswbuild);
Datum hnswbuild(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "hnsw index do not support extreme rto.");
}
Relation heap = (Relation)PG_GETARG_POINTER(0);
Relation index = (Relation)PG_GETARG_POINTER(1);
IndexInfo *indexinfo = (IndexInfo *)PG_GETARG_POINTER(2);
@ -184,6 +187,9 @@ Datum hnswbuild(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnswbuildempty);
Datum hnswbuildempty(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "hnsw index do not support extreme rto.");
}
Relation index = (Relation)PG_GETARG_POINTER(0);
hnswbuildempty_internal(index);
@ -193,6 +199,9 @@ Datum hnswbuildempty(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnswinsert);
Datum hnswinsert(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "hnsw index do not support extreme rto.");
}
Relation rel = (Relation)PG_GETARG_POINTER(0);
Datum *values = (Datum *)PG_GETARG_POINTER(1);
bool *isnull = reinterpret_cast<bool *>(PG_GETARG_POINTER(2));
@ -207,6 +216,9 @@ Datum hnswinsert(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnswbulkdelete);
Datum hnswbulkdelete(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "hnsw index do not support extreme rto.");
}
IndexVacuumInfo *info = (IndexVacuumInfo *)PG_GETARG_POINTER(0);
IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *)PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback)PG_GETARG_POINTER(2);
@ -219,6 +231,9 @@ Datum hnswbulkdelete(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnswvacuumcleanup);
Datum hnswvacuumcleanup(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "hnsw index do not support extreme rto.");
}
IndexVacuumInfo *info = (IndexVacuumInfo *)PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *)PG_GETARG_POINTER(1);
stats = hnswvacuumcleanup_internal(info, stats);

View File

@ -902,8 +902,9 @@ static void UpdateNeighborsInMemory(char *base, FmgrInfo *procinfo, Oid collatio
HnswCandidate *hc = &neighbors->items[i];
HnswElement neighborElement = (HnswElement)HnswPtrAccess(base, hc->element);
/* Keep scan-build happy on Mac x86-64 */
Assert(neighborElement);
if (neighborElement == NULL) {
continue;
}
/* Use element for lock instead of hc since hc can be replaced */
LWLockAcquire(&neighborElement->lock, LW_EXCLUSIVE);

View File

@ -488,6 +488,7 @@ void FlushPQInfoInternal(Relation index, char* table, BlockNumber startBlkno, ui
{
Buffer buf;
Page page;
PageHeader p;
uint32 curFlushSize;
for (uint16 i = 0; i < nblks; i++) {
curFlushSize = (i == nblks - 1) ?
@ -498,6 +499,8 @@ void FlushPQInfoInternal(Relation index, char* table, BlockNumber startBlkno, ui
errno_t err = memcpy_s(PageGetContents(page), curFlushSize,
table + i * HNSW_PQTABLE_STORAGE_SIZE, curFlushSize);
securec_check(err, "\0", "\0");
p = (PageHeader)page;
p->pd_lower += curFlushSize;
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
}

View File

@ -204,6 +204,9 @@ Datum ivfflathandler(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(ivfflatbuild);
Datum ivfflatbuild(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "ivfflat index do not support extreme rto.");
}
Relation heap = (Relation)PG_GETARG_POINTER(0);
Relation index = (Relation)PG_GETARG_POINTER(1);
IndexInfo *indexinfo = (IndexInfo *)PG_GETARG_POINTER(2);
@ -215,6 +218,9 @@ Datum ivfflatbuild(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(ivfflatbuildempty);
Datum ivfflatbuildempty(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "ivfflat index do not support extreme rto.");
}
Relation index = (Relation)PG_GETARG_POINTER(0);
ivfflatbuildempty_internal(index);
@ -224,6 +230,9 @@ Datum ivfflatbuildempty(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(ivfflatinsert);
Datum ivfflatinsert(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "ivfflat index do not support extreme rto.");
}
Relation rel = (Relation)PG_GETARG_POINTER(0);
Datum *values = (Datum *)PG_GETARG_POINTER(1);
bool *isnull = reinterpret_cast<bool *>(PG_GETARG_POINTER(2));
@ -238,6 +247,9 @@ Datum ivfflatinsert(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(ivfflatbulkdelete);
Datum ivfflatbulkdelete(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "ivfflat index do not support extreme rto.");
}
IndexVacuumInfo *info = (IndexVacuumInfo *)PG_GETARG_POINTER(0);
IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *)PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback)PG_GETARG_POINTER(2);
@ -250,6 +262,9 @@ Datum ivfflatbulkdelete(PG_FUNCTION_ARGS)
PGDLLEXPORT PG_FUNCTION_INFO_V1(ivfflatvacuumcleanup);
Datum ivfflatvacuumcleanup(PG_FUNCTION_ARGS)
{
if (IsExtremeRedo()) {
elog(ERROR, "ivfflat index do not support extreme rto.");
}
IndexVacuumInfo *info = (IndexVacuumInfo *)PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *)PG_GETARG_POINTER(1);
stats = ivfflatvacuumcleanup_internal(info, stats);

View File

@ -532,6 +532,7 @@ void IvfFlushPQInfoInternal(Relation index, char* table, BlockNumber startBlkno,
{
Buffer buf;
Page page;
PageHeader p;
uint32 curFlushSize;
GenericXLogState *state;
@ -545,6 +546,8 @@ void IvfFlushPQInfoInternal(Relation index, char* table, BlockNumber startBlkno,
errno_t err = memcpy_s(PageGetContents(page), curFlushSize,
table + i * IVF_PQTABLE_STORAGE_SIZE, curFlushSize);
securec_check(err, "\0", "\0");
p = (PageHeader)page;
p->pd_lower += curFlushSize;
MarkBufferDirty(buf);
IvfflatCommitBuffer(buf, state);
}

View File

@ -2,6 +2,7 @@
#define UTILS_H
#include "postgres.h"
#include "fmgr/fmgr_comp.h"
#include "access/multi_redo_api.h"
#include <vector>
#define GENERIC_DEFAULT_ENABLE_PQ false