From 2e15bb43ec1d45a1abdc5ded49216c062ed081dc Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Wed, 22 Jun 2022 21:31:59 +0800 Subject: [PATCH] fix tablesync worker does not send csn after restart --- src/gausskernel/storage/replication/libpqwalreceiver.cpp | 4 ++++ src/gausskernel/storage/replication/logical/worker.cpp | 1 + src/gausskernel/storage/replication/pgoutput/pgoutput.cpp | 7 +++++++ 3 files changed, 12 insertions(+) diff --git a/src/gausskernel/storage/replication/libpqwalreceiver.cpp b/src/gausskernel/storage/replication/libpqwalreceiver.cpp index 52a62f621..f0773a4ba 100755 --- a/src/gausskernel/storage/replication/libpqwalreceiver.cpp +++ b/src/gausskernel/storage/replication/libpqwalreceiver.cpp @@ -266,6 +266,10 @@ void StartRemoteStreaming(const LibpqrcvConnectParam *options) ereport(DEBUG5, (errmsg("append binary true"))); } + if (options->useSnapshot) { + appendStringInfoString(&cmd, ", usesnapshot 'true'"); + } + appendStringInfoChar(&cmd, ')'); } diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index 05272c941..c607366e7 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -1606,6 +1606,7 @@ void ApplyWorkerMain() options.protoVersion = LOGICALREP_PROTO_VERSION_NUM; options.publicationNames = t_thrd.applyworker_cxt.mySubscription->publications; options.binary = t_thrd.applyworker_cxt.mySubscription->binary; + options.useSnapshot = AM_TABLESYNC_WORKER; /* Start normal logical streaming replication. */ (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_startstreaming(&options); diff --git a/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp b/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp index f9f070473..d15515f7b 100644 --- a/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp +++ b/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp @@ -84,6 +84,7 @@ static void parse_output_parameters(List *options, PGOutputData *data) bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool use_snapshot_given = false; data->binary = false; @@ -121,6 +122,12 @@ static void parse_output_parameters(List *options, PGOutputData *data) binary_option_given = true; data->binary = defGetBoolean(defel); + } else if (strcmp(defel->defname, "usesnapshot") == 0) { + if (use_snapshot_given) + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + use_snapshot_given = true; + + t_thrd.walsender_cxt.isUseSnapshot = true; } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); }