@ -669,9 +669,9 @@ static int getOptions(const int argc, char* const* argv)
|
||||
break;
|
||||
case 'p':
|
||||
check_env_value_c(optarg);
|
||||
if (atoi(optarg) <= 0) {
|
||||
if (checkIsDigit(optarg) == 0) {
|
||||
fprintf(stderr, _("%s: invalid port number \"%s\"\n"), progname, optarg);
|
||||
return 1;
|
||||
exit(1);
|
||||
}
|
||||
dbport = pg_strdup(optarg);
|
||||
break;
|
||||
@ -736,8 +736,7 @@ static int getOptions(const int argc, char* const* argv)
|
||||
|
||||
fsync_interval = atoi(optarg) * 1000;
|
||||
if (fsync_interval < 0) {
|
||||
fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), progname, optarg);
|
||||
return -1;
|
||||
fsync_interval = 0;
|
||||
}
|
||||
break;
|
||||
case 'S':
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include "knl/knl_variable.h"
|
||||
#include "utils/timestamp.h"
|
||||
#include "streamutil.h"
|
||||
#include "libpq/libpq-int.h"
|
||||
#include <sys/time.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
@ -138,6 +139,17 @@ static void CalculateArgCount(int *argcount)
|
||||
return;
|
||||
}
|
||||
|
||||
static void CheckConnectionHost(PGconn* tmpconn)
|
||||
{
|
||||
if (tmpconn == NULL || tmpconn->sock < 0) {
|
||||
fprintf(stderr, "failed to connect %s:%s.\n",
|
||||
(dbhost == NULL) ? "Unknown" : dbhost,
|
||||
(dbport == NULL) ? "Unknown" : dbport);
|
||||
PQfinish(tmpconn);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Connect to the server. Returns a valid PGconn pointer if connected,
|
||||
* or NULL on non-permanent error. On permanent error, the function will
|
||||
@ -212,10 +224,7 @@ PGconn* GetConnection(void)
|
||||
* If there is too little memory even to allocate the PGconn object
|
||||
* and PQconnectdbParams returns NULL, we call exit(1) directly.
|
||||
*/
|
||||
if (tmpconn == NULL) {
|
||||
fprintf(stderr, _("%s: could not connect to server\n"), progname);
|
||||
exit(1);
|
||||
}
|
||||
CheckConnectionHost(tmpconn);
|
||||
|
||||
if (PQstatus(tmpconn) == CONNECTION_BAD && PQconnectionNeedsPassword(tmpconn) && dbgetpassword != -1) {
|
||||
dbgetpassword = 1; /* ask for password next time */
|
||||
|
||||
@ -4060,6 +4060,9 @@ Node* parseParamRef(ParseState* pstate, ParamRef* pref)
|
||||
pfree_ext(val);
|
||||
if (pref->number == params_info->numParams && u_sess->parser_cxt.ddl_pbe_context != NULL) {
|
||||
MemoryContextDelete(u_sess->parser_cxt.ddl_pbe_context);
|
||||
if (((IS_PGXC_COORDINATOR || IS_PGXC_DATANODE) && IsConnFromCoord())) {
|
||||
u_sess->parser_cxt.param_info = NULL;
|
||||
}
|
||||
u_sess->parser_cxt.ddl_pbe_context = NULL;
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -591,6 +591,11 @@ void PortalDrop(Portal portal, bool isTopCommit)
|
||||
MemoryContextDelete(portal->copyCxt);
|
||||
|
||||
u_sess->parser_cxt.param_message = NULL;
|
||||
|
||||
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
|
||||
u_sess->parser_cxt.param_info = NULL;
|
||||
}
|
||||
|
||||
/* release portal struct (it's in u_sess->portal_mem_cxt) */
|
||||
pfree(portal);
|
||||
}
|
||||
|
||||
@ -4849,14 +4849,6 @@ static void exec_execute_message(const char* portal_name, long max_rows)
|
||||
u_sess->xact_cxt.pbe_execute_complete = false;
|
||||
}
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
if (u_sess->xact_cxt.pbe_execute_complete == true) {
|
||||
/* Set sync point for waiting all stream threads complete. */
|
||||
StreamNodeGroup::syncQuit(STREAM_COMPLETE);
|
||||
UnRegisterStreamSnapshots();
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ENABLE_WORKLOAD_CONTROL) {
|
||||
if (g_instance.wlm_cxt->dynamic_workload_inited) {
|
||||
if (t_thrd.wlm_cxt.parctl_state.simple == 0)
|
||||
|
||||
@ -1253,7 +1253,7 @@ static void alloc_context_from_top(knl_session_context* sess, MemoryContext top_
|
||||
"SessionCacheMemoryContext",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
(1024 * 1024)); /* set max block size to 1MB */
|
||||
sess->temp_mem_cxt = AllocSetContextCreate(top_mem_cxt,
|
||||
"SessionTempMemoryContext",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
|
||||
@ -386,7 +386,7 @@ void ReplicationSlotCreate(const char *name, ReplicationSlotPersistency persiste
|
||||
/*
|
||||
* Find a previously created slot and mark it as used by this backend.
|
||||
*/
|
||||
void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool isDrop)
|
||||
void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDrop)
|
||||
{
|
||||
ReplicationSlot *slot = NULL;
|
||||
int i;
|
||||
@ -421,7 +421,7 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool isDrop)
|
||||
if (active) {
|
||||
if ((slot->data.database != InvalidOid || isDummyStandby != slot->data.isDummyStandby)
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
&& !isDrop
|
||||
&& !allowDrop
|
||||
#endif
|
||||
)
|
||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is already active", name)));
|
||||
|
||||
@ -207,7 +207,7 @@ extern void ReplicationSlotCreate(const char* name, ReplicationSlotPersistency p
|
||||
Oid databaseId, XLogRecPtr restart_lsn, char* extra_content = NULL, bool encrypted = false);
|
||||
extern void ReplicationSlotPersist(void);
|
||||
extern void ReplicationSlotDrop(const char* name, bool for_backup = false);
|
||||
extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool isDrop = false);
|
||||
extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false);
|
||||
extern bool IsReplicationSlotActive(const char *name);
|
||||
bool ReplicationSlotFind(const char* name);
|
||||
extern void ReplicationSlotRelease(void);
|
||||
|
||||
Reference in New Issue
Block a user