fix tablesync worker does not send csn after restart

This commit is contained in:
chenxiaobin19
2022-06-22 21:31:59 +08:00
parent 00b088690d
commit 2e15bb43ec
3 changed files with 12 additions and 0 deletions

View File

@ -266,6 +266,10 @@ void StartRemoteStreaming(const LibpqrcvConnectParam *options)
ereport(DEBUG5, (errmsg("append binary true")));
}
if (options->useSnapshot) {
appendStringInfoString(&cmd, ", usesnapshot 'true'");
}
appendStringInfoChar(&cmd, ')');
}

View File

@ -1606,6 +1606,7 @@ void ApplyWorkerMain()
options.protoVersion = LOGICALREP_PROTO_VERSION_NUM;
options.publicationNames = t_thrd.applyworker_cxt.mySubscription->publications;
options.binary = t_thrd.applyworker_cxt.mySubscription->binary;
options.useSnapshot = AM_TABLESYNC_WORKER;
/* Start normal logical streaming replication. */
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_startstreaming(&options);

View File

@ -84,6 +84,7 @@ static void parse_output_parameters(List *options, PGOutputData *data)
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
bool use_snapshot_given = false;
data->binary = false;
@ -121,6 +122,12 @@ static void parse_output_parameters(List *options, PGOutputData *data)
binary_option_given = true;
data->binary = defGetBoolean(defel);
} else if (strcmp(defel->defname, "usesnapshot") == 0) {
if (use_snapshot_given)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
use_snapshot_given = true;
t_thrd.walsender_cxt.isUseSnapshot = true;
} else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}