Files
openGauss-server/src/include/replication/logical.h
openGaussDev 1f0543f212 parallel decoding change toastMask to the highest bit
Offering: openGaussDev

More detail: parallel decoding fix initialization bug

Match-id-8e0cddece001059dedf185e00506ed2f72f370e8
2022-03-14 19:34:45 +08:00

337 lines
11 KiB
C

/* ---------------------------------------------------------------------------------------
*
* logical.h
* openGauss logical decoding coordination
*
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/include/replication/logical.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef LOGICAL_H
#define LOGICAL_H
#include "replication/slot.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "replication/output_plugin.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/extreme_rto/redo_item.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/extreme_rto/posix_semaphore.h"
#include "access/extreme_rto/spsc_blocking_queue.h"
#include "access/parallel_recovery/redo_item.h"
#include "nodes/replnodes.h"
#include "access/ustore/knl_utuple.h"
#include "replication/logical_queue.h"
#include "replication/parallel_reorderbuffer.h"
/* The number of extra digits displayed for floating-point values in logical decoding */
#define LOGICAL_DECODE_EXTRA_FLOAT_DIGITS 3
/* Maximum number of max parallel decode threads */
#define MAX_PARALLEL_DECODE_NUM 20
/* Maximum number of max replication slots */
#define MAX_REPLICATION_SLOT_NUM 100
/* FLag and mask for TOAST in parallel decoding */
#define TOAST_FLAG ((uint32)1 << 31)
#define TOAST_MASK (((uint32)1 << 31) - 1)
typedef void (*LogicalOutputPluginWriterWrite)(
struct LogicalDecodingContext* lr, XLogRecPtr Ptr, TransactionId xid, bool last_write);
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
typedef struct logicalLog logicalLog;
typedef struct LogicalDecodingContext {
/* memory context this is all allocated in */
MemoryContext context;
/* infrastructure pieces */
XLogReaderState* reader;
ReplicationSlot* slot;
struct ReorderBuffer* reorder;
struct SnapBuild* snapshot_builder;
/*
* Marks the logical decoding context as fast forward decoding one.
* Such a context does not have plugin loaded so most of the the following
* properties are unused.
*/
bool fast_forward;
OutputPluginCallbacks callbacks;
OutputPluginOptions options;
/*
* User specified options
*/
List* output_plugin_options;
/*
* User-Provided callback for writing/streaming out data.
*/
LogicalOutputPluginWriterPrepareWrite prepare_write;
LogicalOutputPluginWriterWrite write;
/*
* Output buffer.
*/
StringInfo out;
/*
* Private data pointer of the output plugin.
*/
void* output_plugin_private;
/*
* Private data pointer for the data writer.
*/
void* output_writer_private;
/*
* State for writing output.
*/
bool accept_writes;
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
bool random_mode;
} LogicalDecodingContext;
typedef struct chosenTable {
char *schema; /* NULL means any schema */
char *table; /* NULL means any table */
} chosenTable;
/* parallel decode callback signature */
typedef void(*ParallelDecodeChangeCB)(Relation relation, ParallelReorderBufferChange* change, logicalLog *logChange,
ParallelLogicalDecodingContext* ctx, int slotId);
typedef struct {
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
char decode_style; /* 'j' stands for json while 't' stands for text */
int parallel_decode_num;
int sending_batch;
ParallelDecodeChangeCB decode_change;
List *tableWhiteList;
} ParallelDecodeOption;
typedef struct {
MemoryContext context;
ParallelDecodeOption pOptions;
} ParallelDecodingData;
typedef struct {
MemoryContext context;
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
List *tableWhiteList;
} PluginTestDecodingData;
typedef struct ParallelLogicalDecodingContext {
/* memory context this is all allocated in */
MemoryContext context;
/* infrastructure pieces */
XLogReaderState* reader;
ReplicationSlot* slot;
ParallelReorderBuffer* reorder;
/*
* Marks the logical decoding context as fast forward decoding one.
* Such a context does not have plugin loaded so most of the the following
* properties are unused.
*/
bool fast_forward;
ParallelOutputPluginCallbacks callbacks;
OutputPluginOptions options;
/*
* User specified options
*/
List* output_plugin_options;
/*
* Output buffer.
*/
StringInfo out;
/*
* Private data pointer of the output plugin.
*/
void* output_plugin_private;
/*
* Private data pointer for the data writer.
*/
void* output_writer_private;
/*
* State for writing output.
*/
bool accept_writes;
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
bool random_mode;
bool isParallel;
} ParallelLogicalDecodingContext;
typedef struct ParallelDecodeWorker {
/* Worker id. */
uint32 id;
/* Thread id */
gs_thread_t tid;
int slotId;
/* To-be-replayed log-record-list queue. */
LogicalQueue* changeQueue;
LogicalQueue* LogicalLogQueue;
logicalLog* freeGetLogicalLogHead;
MemoryContext oldCtx;
char dbUser[NAMEDATALEN];
char dbName[NAMEDATALEN];
char slotname[NAMEDATALEN];
char decodeStyle;
} ParallelDecodeWorker;
typedef struct ParallelDecodeReaderWorker {
/* Worker id. */
uint32 id;
/* Thread id */
ThreadId tid;
int slotId;
char dbUser[NAMEDATALEN];
char dbName[NAMEDATALEN];
char slotname[NAMEDATALEN];
StartReplicationCmd *cmd;
XLogRecPtr current_lsn;
XLogRecPtr restart_lsn;
XLogRecPtr candidate_oldest_xmin_lsn;
XLogRecPtr candidate_oldest_xmin;
XLogRecPtr flushLSN;
/* To-be-replayed log-record-list queue. */
LogicalQueue* queue;
MemoryContext oldCtx;
ParallelDecodingData data;
slock_t rwlock;
} ParallelDecodeReaderWorker;
typedef struct LogicalDispatcher {
MemoryContext oldCtx;
int decodeWorkerId;
ParallelDecodeWorker** decodeWorkers; /* Array of parallel decode workers. */
ParallelDecodeOption pOptions;
int totalWorkerCount; /* Number of parallel decode workers started. */
ParallelDecodeReaderWorker* readWorker; /* Txn reader worker. */
ParallelReorderBufferChange* freeChangeHead; /* Head of freed-item list. */
ParallelReorderBufferChange* freeGetChangeHead;
ReorderBufferTupleBuf* freeTupleHead; /* Head of freed-item list. */
ReorderBufferTupleBuf* freeGetTupleHead;
logicalLog* freeLogicalLogHead; /* Head of freed-item list. */
char slotName[NAMEDATALEN];
int32 pendingCount; /* Number of records pending. */
int32 pendingMax; /* The max. pending count per batch. */
int exitCode; /* Thread exit code. */
uint64 totalCostTime;
uint64 txnCostTime; /* txn cost time */
uint64 pprCostTime;
uint64 sentPtr;
uint32 curChangeNum;
uint32 curTupleNum;
uint32 curLogNum;
uint64 num;
uint32* chosedWorkerIds;
uint32 chosedWorkerCount;
uint32 readyWorkerCnt;
int id;
TimestampTz decodeTime;
bool remainPatch;
bool checkpointNeedFullSync;
bool active;
bool firstLoop;
bool abnormal;
XLogRecPtr startpoint;
struct ReplicationSlot* MyReplicationSlot;
} LogicalDispatcher;
#define QUEUE_RESULT_LEN 256
typedef struct ParallelStatusData {
char slotName[NAMEDATALEN];
int parallelDecodeNum;
char readQueueLen[QUEUE_RESULT_LEN];
char decodeQueueLen[QUEUE_RESULT_LEN];
} ParallelStatusData;
extern LogicalDispatcher g_Logicaldispatcher[];
extern bool firstCreateDispatcher;
extern void CheckLogicalDecodingRequirements(Oid databaseId);
extern void ParallelReorderBufferQueueChange(ParallelReorderBuffer *rb, logicalLog *change, int slotId);
extern void ParallelReorderBufferForget(ParallelReorderBuffer *rb, int slotId, ParallelReorderBufferTXN *txn);
extern void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change, int slotId,
ParallelReorderBufferTXN *txn);
extern LogicalDecodingContext* CreateInitDecodingContext(const char* plugin, List* output_plugin_options,
bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
extern LogicalDecodingContext* CreateInitDecodingContextInternal(char* plugin, List* output_plugin_options,
XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, bool set_xmin_horizon, ReplicationSlot* slot);
extern LogicalDecodingContext* CreateDecodingContext(XLogRecPtr start_lsn, List* output_plugin_options,
bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
extern LogicalDecodingContext* CreateDecodingContextForArea(XLogRecPtr start_lsn, const char* plugin,List* output_plugin_options,
bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
extern ParallelLogicalDecodingContext *ParallelCreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options,
bool fast_forward, XLogPageReadCB read_page, int slotId);
extern void DecodingContextFindStartpoint(LogicalDecodingContext* ctx);
extern bool DecodingContextReady(LogicalDecodingContext* ctx);
extern void FreeDecodingContext(LogicalDecodingContext* ctx);
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext* ctx, RepOriginId origin_id);
extern void CloseLogicalAdvanceConnect();
extern void NotifyPrimaryAdvance(XLogRecPtr restart, XLogRecPtr flush);
extern void ParallelDecodeWorkerMain(void* point);
extern void LogicalReadWorkerMain(void* point);
extern void ParseProcessRecord(ParallelLogicalDecodingContext *ctx, XLogReaderState *record, ParallelDecodeReaderWorker
*worker);
extern void XLogSendPararllelLogical();
extern int StartLogicalLogWorkers(char* dbUser, char* dbName, char* slotname, List *options, int parallelDecodeNum);
extern void CheckBooleanOption(DefElem *elem, bool *booleanOption, bool defaultValue);
extern int ParseParallelDecodeNumOnly(List *options);
extern bool CheckWhiteList(const List *whiteList, const char *schema, const char *table);
extern bool ParseStringToWhiteList(char *tableString, List **tableWhiteList);
extern void ParseWhiteList(List **whiteList, DefElem* elem);
extern void ParseDecodingOptionPlugin(ListCell* option, PluginTestDecodingData* data, OutputPluginOptions* opt);
extern ParallelStatusData *GetParallelDecodeStatus(uint32 *num);
extern void PrintLiteral(StringInfo s, Oid typid, char* outputstr);
#endif