!110 【Logical Replication】fix bugs for logical replication
Merge pull request !110 from Zelin_Zheng/master
This commit is contained in:
@ -241,6 +241,7 @@ static void print_literal(StringInfo s, Oid typid, char* outputstr)
|
||||
const char* valptr = NULL;
|
||||
|
||||
switch (typid) {
|
||||
case INT1OID:
|
||||
case INT2OID:
|
||||
case INT4OID:
|
||||
case INT8OID:
|
||||
@ -366,6 +367,7 @@ static void pg_decode_change(
|
||||
MemoryContext old;
|
||||
char* res = NULL;
|
||||
data = (TestDecodingData*)ctx->output_plugin_private;
|
||||
u_sess->attr.attr_common.extra_float_digits = 0;
|
||||
|
||||
/* output BEGIN if we haven't yet */
|
||||
if (data->skip_empty_xacts && !data->xact_wrote_changes) {
|
||||
|
@ -224,6 +224,7 @@ static void print_literal(StringInfo s, Oid typid, char* outputstr)
|
||||
const char* valptr = NULL;
|
||||
|
||||
switch (typid) {
|
||||
case INT1OID:
|
||||
case INT2OID:
|
||||
case INT4OID:
|
||||
case INT8OID:
|
||||
@ -351,6 +352,7 @@ static void pg_decode_change(
|
||||
MemoryContext old;
|
||||
|
||||
data = (TestDecodingData*)ctx->output_plugin_private;
|
||||
u_sess->attr.attr_common.extra_float_digits = 0;
|
||||
|
||||
/* output BEGIN if we haven't yet */
|
||||
if (data->skip_empty_xacts && !data->xact_wrote_changes) {
|
||||
|
@ -1617,6 +1617,58 @@ bool cmdStringCheck(const char* cmd_string)
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check cmdString length.
|
||||
*/
|
||||
static bool cmdStringLengthCheck(const char* cmd_string)
|
||||
{
|
||||
const size_t cmd_length_limit = 200;
|
||||
const size_t slotname_limit = 64;
|
||||
char comd[cmd_length_limit] = {'\0'};
|
||||
char* sub_cmd = NULL;
|
||||
char* rm_cmd = NULL;
|
||||
char* slot_name = NULL;
|
||||
|
||||
size_t cmd_length = strlen(cmd_string);
|
||||
if (cmd_length == 0) {
|
||||
return true;
|
||||
}
|
||||
if (cmd_length >= cmd_length_limit) {
|
||||
return false;
|
||||
}
|
||||
errno_t ret = memset_s(comd, cmd_length_limit, 0, cmd_length_limit);
|
||||
securec_check_c(ret, "\0", "\0");
|
||||
ret = strncpy_s(comd, cmd_length_limit, cmd_string, cmd_length);
|
||||
securec_check_c(ret, "\0", "\0");
|
||||
|
||||
if (cmd_length > strlen("START_REPLICATION") &&
|
||||
strncmp(cmd_string, "START_REPLICATION", strlen("START_REPLICATION")) == 0) {
|
||||
sub_cmd = strtok_r(comd, " ", &rm_cmd);
|
||||
sub_cmd = strtok_r(NULL, " ", &rm_cmd);
|
||||
if (strlen(sub_cmd) != strlen("SLOT") ||
|
||||
strncmp(sub_cmd, "SLOT", strlen("SLOT")) != 0) {
|
||||
return true;
|
||||
} else {
|
||||
slot_name = strtok_r(NULL, " ", &rm_cmd);
|
||||
}
|
||||
} else if (cmd_length > strlen("CREATE_REPLICATION_SLOT") &&
|
||||
strncmp(cmd_string, "CREATE_REPLICATION_SLOT", strlen("CREATE_REPLICATION_SLOT")) == 0) {
|
||||
sub_cmd = strtok_r(comd, " ", &rm_cmd);
|
||||
slot_name = strtok_r(NULL, " ", &rm_cmd);
|
||||
} else if (cmd_length > strlen("DROP_REPLICATION_SLOT") &&
|
||||
strncmp(cmd_string, "DROP_REPLICATION_SLOT", strlen("DROP_REPLICATION_SLOT")) == 0) {
|
||||
sub_cmd = strtok_r(comd, " ", &rm_cmd);
|
||||
slot_name = strtok_r(NULL, " ", &rm_cmd);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (strlen(slot_name) >= slotname_limit) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Execute an incoming replication command.
|
||||
*/
|
||||
@ -1642,6 +1694,11 @@ static bool HandleWalReplicationCommand(const char* cmd_string)
|
||||
(errmsg_internal("replication command, syntax error."))));
|
||||
}
|
||||
|
||||
if (cmdStringLengthCheck(cmd_string) == false) {
|
||||
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
|
||||
(errmsg_internal("replication slot name should be shorter than %d.", NAMEDATALEN))));
|
||||
}
|
||||
|
||||
cmd_context = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Replication command context",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
|
Reference in New Issue
Block a user