diff --git a/doc/src/sgml/catalogs.sgmlin b/doc/src/sgml/catalogs.sgmlin index 04230398f..be88ef21c 100644 --- a/doc/src/sgml/catalogs.sgmlin +++ b/doc/src/sgml/catalogs.sgmlin @@ -1387,6 +1387,16 @@ Password expiry time (only used for password authentication); null if no expiration + + + rolpasswordext + text + + Password for other database encrypt rule if exists. + If we created a B compatibility database, sha1(sha1(password)) will + be stored here according to mysql_native_password strategy. + + diff --git a/src/common/backend/libpq/pqcomm.cpp b/src/common/backend/libpq/pqcomm.cpp index 13a47b31a..6fd1e595e 100644 --- a/src/common/backend/libpq/pqcomm.cpp +++ b/src/common/backend/libpq/pqcomm.cpp @@ -111,6 +111,9 @@ extern GlobalNodeDefinition* global_node_definition; +extern ProtocolExtensionConfig* ListenConfig[MAXLISTEN]; +extern ProtocolExtensionConfig default_protocol_config; + /* * Buffers for low-level I/O. * @@ -132,7 +135,6 @@ extern GlobalNodeDefinition* global_node_definition; void pq_close(int code, Datum arg); /* Internal functions */ -static int internal_putbytes(const char* s, size_t len); static int internal_flush(void); static void pq_set_nonblocking(bool nonblocking); static void pq_disk_generate_checking_header( @@ -561,8 +563,8 @@ static void StreamDoUnlink(int code, Datum arg) */ int StreamServerPort(int family, char* hostName, unsigned short portNumber, const char* unixSocketName, pgsocket ListenSocket[], int MaxListen, bool add_localaddr_flag, - bool is_create_psql_sock, bool is_create_libcomm_sock) -{ + bool is_create_psql_sock, bool is_create_libcomm_sock, + ProtocolExtensionConfig* protocol_config) { #define RETRY_SLEEP_TIME 1000000L pgsocket fd = PGINVALID_SOCKET; int err; @@ -802,6 +804,7 @@ int StreamServerPort(int family, char* hostName, unsigned short portNumber, cons goto errhandle; } ListenSocket[listen_index] = fd; + ListenConfig[listen_index] = protocol_config; added++; if (add_localaddr_flag == true) { struct sockaddr* sinp = NULL; @@ -812,25 +815,25 @@ int StreamServerPort(int family, char* hostName, unsigned short portNumber, cons result = inet_net_ntop(AF_INET6, &((struct sockaddr_in6*)sinp)->sin6_addr, 128, - t_thrd.postmaster_cxt.LocalAddrList[t_thrd.postmaster_cxt.LocalIpNum], + g_instance.listen_cxt.LocalAddrList[g_instance.listen_cxt.LocalIpNum], IP_LEN); } else if (addr->ai_family == AF_INET) { result = inet_net_ntop(AF_INET, &((struct sockaddr_in*)sinp)->sin_addr, 32, - t_thrd.postmaster_cxt.LocalAddrList[t_thrd.postmaster_cxt.LocalIpNum], + g_instance.listen_cxt.LocalAddrList[g_instance.listen_cxt.LocalIpNum], IP_LEN); } if (result == NULL) { ereport(WARNING, (errmsg("inet_net_ntop failed, error: %d", EAFNOSUPPORT))); } else { - t_thrd.postmaster_cxt.LocalIpNum++; + g_instance.listen_cxt.LocalIpNum++; } } if (is_create_psql_sock) { - t_thrd.postmaster_cxt.listen_sock_type[listen_index] = PSQL_LISTEN_SOCKET; + g_instance.listen_cxt.listen_sock_type[listen_index] = PSQL_LISTEN_SOCKET; } else { - t_thrd.postmaster_cxt.listen_sock_type[listen_index] = HA_LISTEN_SOCKET; + g_instance.listen_cxt.listen_sock_type[listen_index] = HA_LISTEN_SOCKET; } continue; @@ -1339,7 +1342,7 @@ int pq_getbytes(char* s, size_t len) * returns 0 if OK, EOF if trouble * -------------------------------- */ -static int pq_discardbytes(size_t len) +int pq_discardbytes(size_t len) { size_t amount; @@ -1524,7 +1527,7 @@ int pq_putbytes(const char* s, size_t len) return res; } -static int internal_putbytes(const char* s, size_t len) +int internal_putbytes(const char* s, size_t len) { size_t amount; diff --git a/src/common/backend/utils/error/elog.cpp b/src/common/backend/utils/error/elog.cpp index 71df30241..43a59725e 100644 --- a/src/common/backend/utils/error/elog.cpp +++ b/src/common/backend/utils/error/elog.cpp @@ -146,7 +146,6 @@ static const int CREATE_ALTER_SUBSCRIPTION = 16; static void log_line_prefix(StringInfo buf, ErrorData* edata); static void send_message_to_server_log(ErrorData* edata); -static void send_message_to_frontend(ErrorData* edata); static char* expand_fmt_string(const char* fmt, ErrorData* edata); static const char* useful_strerror(int errnum); static const char* error_severity(int elevel); @@ -1639,7 +1638,9 @@ void EmitErrorReport(void) if (can_skip && need_skip_by_retry) { /* skip sending messsage to front, do noting for now */ } else { - send_message_to_frontend(edata); + if (u_sess->proc_cxt.MyProcPort) { + u_sess->proc_cxt.MyProcPort->protocol_config->fn_send_message(edata); + } } } @@ -3451,7 +3452,7 @@ int combiner_errdata(RemoteErrorData* pErrData) /* * Write error report to client */ -static void send_message_to_frontend(ErrorData* edata) +void send_message_to_frontend(ErrorData* edata) { StringInfoData msgbuf; diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index 6bc900ada..ab6b37510 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -114,6 +114,7 @@ const uint32 LOGICAL_DECODE_FLATTEN_TOAST_VERSION_NUM = 92614; const uint32 SWITCH_ROLE_VERSION_NUM = 92618; const uint32 PLAN_SELECT_VERSION_NUM = 92776; const uint32 REPLACE_INTO_VERSION_NUM = 92778; +const uint32 PG_AUTHID_PASSWORDEXT_VERSION_NUM = 92780; /* Version number of the guc parameter backend_version added in V500R001C20 */ diff --git a/src/common/backend/utils/init/postinit.cpp b/src/common/backend/utils/init/postinit.cpp index 94517e3ff..32586eded 100644 --- a/src/common/backend/utils/init/postinit.cpp +++ b/src/common/backend/utils/init/postinit.cpp @@ -309,7 +309,7 @@ static void PerformAuthentication(Port* port) /* * Now perform authentication exchange. */ - ClientAuthentication(port); /* might not return, if failure */ + port->protocol_config->fn_authenticate(port); /* * recover the signal mask before call ClientAuthentication. diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index bf5617eeb..11cb258db 100755 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -6381,15 +6381,12 @@ void BeginReportingGUCOptions(void) static void ReportGUCOption(struct config_generic* record) { if (u_sess->utils_cxt.reporting_enabled && (record->flags & GUC_REPORT)) { - char* val = _ShowOption(record, false, true); - StringInfoData msgbuf; - - pq_beginmessage(&msgbuf, 'S'); - pq_sendstring(&msgbuf, record->name); - pq_sendstring(&msgbuf, val); - pq_endmessage(&msgbuf); - - pfree(val); + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (MyProcPort && MyProcPort->protocol_config->fn_report_param_status) { + char* val = _ShowOption(record, false, true); + (MyProcPort->protocol_config->fn_report_param_status)(record->name, val); + pfree(val); + } } } diff --git a/src/gausskernel/optimizer/commands/user.cpp b/src/gausskernel/optimizer/commands/user.cpp index 9c2af46f0..a62f8fb02 100755 --- a/src/gausskernel/optimizer/commands/user.cpp +++ b/src/gausskernel/optimizer/commands/user.cpp @@ -1386,6 +1386,7 @@ void CreateRole(CreateRoleStmt* stmt) } new_record_nulls[Anum_pg_authid_rolexcpdata - 1] = true; + new_record_nulls[Anum_pg_authid_rolpasswordext - 1] = true; HeapTuple tuple = heap_form_tuple(pg_authid_dsc, new_record, new_record_nulls); diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 9a4c26966..a535ca404 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -363,7 +363,7 @@ static void getInstallationPaths(const char* argv0); static void checkDataDir(void); static void CheckGUCConflicts(void); static Port* ConnCreateToRecvGssock(pollfd* ufds, int idx, int* nSockets); -static Port* ConnCreate(int serverFd); +static Port* ConnCreate(int serverFd, int idx); static void reset_shared(int port); static void SIGHUP_handler(SIGNAL_ARGS); static void pmdie(SIGNAL_ARGS); @@ -485,7 +485,6 @@ typedef struct { Port port; pgsocket portsocket; char DataDir[MAXPGPATH]; - pgsocket ListenSocket[MAXLISTEN]; long MyCancelKey; int MyPMChildSlot; #ifndef WIN32 @@ -499,8 +498,6 @@ typedef struct { LWLock* mainLWLockArray; PMSignalData* PMSignalState; - char LocalAddrList[MAXLISTEN][IP_LEN]; - int LocalIpNum; HaShmemData* HaShmData; TimestampTz PgStartTime; @@ -536,6 +533,32 @@ static bool save_backend_variables(BackendParameters* param, Port* port, HANDLE static void BackendArrayAllocation(void); static void BackendArrayRemove(Backend* bn); +extern void libpq_send_cancel_key(int32 pid, int32 key); +extern void libpq_report_param_status(const char *name, char *val); + +extern int ClientConnInitilize(Port* port); +extern void send_message_to_frontend(ErrorData* edata); +extern int SocketBackend(StringInfo inBuf); +extern DestReceiver* printtup_create_DR(CommandDest dest); + +ProtocolExtensionConfig* ListenConfig[MAXLISTEN]; + +ProtocolExtensionConfig default_protocol_config = { + false, + pq_init, + StartupPacketInitialize, + ClientAuthentication, + send_message_to_frontend, + libpq_send_cancel_key, + pq_comm_reset, + ReadyForQuery, + SocketBackend, + printtup_create_DR, /* use libpq defaults for printtup*() */ + NULL, + NULL, + libpq_report_param_status +}; + PMStateInfo pmStateDescription[] = {{PM_INIT, "PM_INIT"}, {PM_STARTUP, "PM_STARTUP"}, {PM_RECOVERY, "PM_RECOVERY"}, @@ -1068,7 +1091,7 @@ static void SetListenSocket(ReplConnInfo **replConnArray, bool *listen_addr_save replConnArray[i]->localhost, (unsigned short)replConnArray[i]->localport, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, false, false, @@ -1691,15 +1714,15 @@ int PostmasterMain(int argc, char* argv[]) ledger_hook_init(); /* - * process any libraries that should be preloaded at postmaster start - */ + * process any libraries that should be preloaded at postmaster start + */ process_shared_preload_libraries(); /* - * Establish input sockets. - */ + * Establish input sockets. + */ for (i = 0; i < MAXLISTEN; i++) - t_thrd.postmaster_cxt.ListenSocket[i] = PGINVALID_SOCKET; + g_instance.listen_cxt.ListenSocket[i] = PGINVALID_SOCKET; if (g_instance.attr.attr_network.ListenAddresses && !dummyStandbyMode) { char* rawstring = NULL; @@ -1708,8 +1731,8 @@ int PostmasterMain(int argc, char* argv[]) int success = 0; /* - * start commproxy if needed - */ + * start commproxy if needed + */ if (CommProxyNeedSetup()) { CommProxyStartUp(); } @@ -1746,7 +1769,7 @@ int PostmasterMain(int argc, char* argv[]) NULL, (unsigned short)g_instance.attr.attr_network.PostPortNumber, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, true, true, @@ -1756,7 +1779,7 @@ int PostmasterMain(int argc, char* argv[]) curhost, (unsigned short)g_instance.attr.attr_network.PostPortNumber, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, true, true, @@ -1785,7 +1808,7 @@ int PostmasterMain(int argc, char* argv[]) NULL, (unsigned short)g_instance.attr.attr_network.PoolerPort, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, false, false, @@ -1795,7 +1818,7 @@ int PostmasterMain(int argc, char* argv[]) curhost, (unsigned short)g_instance.attr.attr_network.PoolerPort, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, false, false, @@ -1835,7 +1858,7 @@ int PostmasterMain(int argc, char* argv[]) #ifdef USE_BONJOUR /* Register for Bonjour only if we opened TCP socket(s) */ - if (g_instance.attr.attr_common.enable_bonjour && t_thrd.postmaster_cxt.ListenSocket[0] != PGINVALID_SOCKET) { + if (g_instance.attr.attr_common.enable_bonjour && g_instance.listen_cxt.ListenSocket[0] != PGINVALID_SOCKET) { DNSServiceErrorType err; /* @@ -1878,7 +1901,7 @@ int PostmasterMain(int argc, char* argv[]) NULL, (unsigned short)g_instance.attr.attr_network.PostPortNumber, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, false, true, @@ -1895,7 +1918,7 @@ int PostmasterMain(int argc, char* argv[]) NULL, (unsigned short)g_instance.attr.attr_network.PoolerPort, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, false, false, @@ -1919,7 +1942,7 @@ int PostmasterMain(int argc, char* argv[]) NULL, (unsigned short)g_instance.attr.attr_network.comm_sctp_port, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, false, true, @@ -1932,9 +1955,9 @@ int PostmasterMain(int argc, char* argv[]) #endif /* - * check that we have some socket to listen on - */ - if (t_thrd.postmaster_cxt.ListenSocket[0] == PGINVALID_SOCKET) { + * check that we have some socket to listen on + */ + if (g_instance.listen_cxt.ListenSocket[0] == PGINVALID_SOCKET) { ereport(FATAL, (errmsg("no socket created for listening"))); } @@ -2350,6 +2373,26 @@ int PostmasterMain(int argc, char* argv[]) return 0; /* not reached */ } +void libpq_send_cancel_key(int32 pid, int32 key) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'K'); + pq_sendint32(&buf, pid); + pq_sendint32(&buf, key); + pq_endmessage(&buf); +} + +void libpq_report_param_status(const char *name, char *val) +{ + StringInfoData msgbuf; + + pq_beginmessage(&msgbuf, 'S'); + pq_sendstring(&msgbuf, name); + pq_sendstring(&msgbuf, val); + pq_endmessage(&msgbuf); +} + /* * Compute and check the directory paths to files that are part of the * installation (as deduced from the postgres executable's own location) @@ -2818,6 +2861,15 @@ static int ServerLoop(void) #endif } + if (g_instance.listen_cxt.reload_fds) { +#ifdef HAVE_POLL + nSockets = initPollfd(ufds); +#else + nSockets = initMasks(&readmask); +#endif + g_instance.listen_cxt.reload_fds = false; + } + /* * Wait for a connection request to arrive. * @@ -2919,7 +2971,7 @@ static int ServerLoop(void) ufds[i].revents = 0; #else - if (FD_ISSET(t_thrd.postmaster_cxt.ListenSocket[i], &rmask)) { + if (FD_ISSET(g_instance.listen_cxt.ListenSocket[i], &rmask)) { #endif Port* port = NULL; @@ -2928,16 +2980,16 @@ static int ServerLoop(void) if (IS_FD_TO_RECV_GSSOCK(ufds[i].fd)) { port = ConnCreateToRecvGssock(ufds, i, &nSockets); } else { - port = ConnCreate(ufds[i].fd); + port = ConnCreate(ufds[i].fd, i); } if (port != NULL) { int result = STATUS_OK; bool isConnectHaPort = - (i < MAXLISTEN) && (t_thrd.postmaster_cxt.listen_sock_type[i] == HA_LISTEN_SOCKET); + (i < MAXLISTEN) && (g_instance.listen_cxt.listen_sock_type[i] == HA_LISTEN_SOCKET); /* * Since at present, HA only uses TCP sockets, we can directly compare - * the corresponding enty in t_thrd.postmaster_cxt.listen_sock_type, even + * the corresponding enty in g_instance.listen_cxt.listen_sock_type, even * though ufds are not one-to-one mapped to tcp and sctp socket array. * If HA adopts STCP sockets later, we will need to maintain socket type * array for ufds in initPollfd. @@ -3383,7 +3435,7 @@ static int initPollfd(struct pollfd* ufds) } for (i = 0; i < MAXLISTEN; i++) { - fd = t_thrd.postmaster_cxt.ListenSocket[i]; + fd = g_instance.listen_cxt.ListenSocket[i]; if (fd == PGINVALID_SOCKET) break; @@ -3412,7 +3464,7 @@ static int initMasks(fd_set* rmask) FD_ZERO(rmask); for (i = 0; i < MAXLISTEN; i++) { - int fd = t_thrd.postmaster_cxt.ListenSocket[i]; + int fd = g_instance.listen_cxt.ListenSocket[i]; if (fd == PGINVALID_SOCKET) { continue; @@ -4284,7 +4336,7 @@ static Port* ConnCreateToRecvGssock(pollfd* ufds, int idx, int* nSockets) * so receiver flow ctrl make connection and listening fd is polled up */ if (ufds[idx].fd == t_thrd.libpq_cxt.listen_fd_for_recv_flow_ctrl) { - port = ConnCreate(ufds[idx].fd); + port = ConnCreate(ufds[idx].fd, idx); if (port == NULL) return port; @@ -4345,7 +4397,7 @@ static Port* ConnCreateToRecvGssock(pollfd* ufds, int idx, int* nSockets) * * Returns NULL on failure, other than out-of-memory which is fatal. */ -static Port* ConnCreate(int serverFd) +static Port* ConnCreate(int serverFd, int idx) { Port* port = NULL; @@ -4360,6 +4412,11 @@ static Port* ConnCreate(int serverFd) port->sock = PGINVALID_SOCKET; port->gs_sock = GS_INVALID_GSOCK; + ProtocolExtensionConfig* protocol = ListenConfig[idx]; + + Assert(protocol != NULL); + port->protocol_config = protocol; + if (StreamConnection(serverFd, port) != STATUS_OK) { if (port->sock >= 0) StreamClose(port->sock); @@ -4462,9 +4519,9 @@ void ClosePostmasterPorts(bool am_syslogger) /* Close the listen sockets */ for (i = 0; i < MAXLISTEN; i++) { - if (t_thrd.postmaster_cxt.ListenSocket[i] != PGINVALID_SOCKET) { - StreamClose(t_thrd.postmaster_cxt.ListenSocket[i]); - t_thrd.postmaster_cxt.ListenSocket[i] = PGINVALID_SOCKET; + if (g_instance.listen_cxt.ListenSocket[i] != PGINVALID_SOCKET) { + StreamClose(g_instance.listen_cxt.ListenSocket[i]); + g_instance.listen_cxt.ListenSocket[i] = PGINVALID_SOCKET; } } /* If using syslogger, close the read side of the pipe */ @@ -4509,9 +4566,9 @@ static void CloseServerPorts(int status, Datum arg) * condition if a new postmaster wants to re-use the TCP port number. */ for (i = 0; i < MAXLISTEN; i++) { - if (t_thrd.postmaster_cxt.ListenSocket[i] != PGINVALID_SOCKET) { - StreamClose(t_thrd.postmaster_cxt.ListenSocket[i]); - t_thrd.postmaster_cxt.ListenSocket[i] = PGINVALID_SOCKET; + if (g_instance.listen_cxt.ListenSocket[i] != PGINVALID_SOCKET) { + StreamClose(g_instance.listen_cxt.ListenSocket[i]); + g_instance.listen_cxt.ListenSocket[i] = PGINVALID_SOCKET; } } @@ -4525,13 +4582,13 @@ void socket_close_on_exec(void) { /* Close the listen sockets */ for (int i = 0; i < MAXLISTEN; i++) { - if (t_thrd.postmaster_cxt.ListenSocket[i] != PGINVALID_SOCKET) { - int flags = comm_fcntl(t_thrd.postmaster_cxt.ListenSocket[i], F_GETFD); + if (g_instance.listen_cxt.ListenSocket[i] != PGINVALID_SOCKET) { + int flags = comm_fcntl(g_instance.listen_cxt.ListenSocket[i], F_GETFD); if (flags < 0) ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("fcntl F_GETFD failed!"))); flags |= FD_CLOEXEC; - if (comm_fcntl(t_thrd.postmaster_cxt.ListenSocket[i], F_SETFD, flags) < 0) + if (comm_fcntl(g_instance.listen_cxt.ListenSocket[i], F_SETFD, flags) < 0) ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("fcntl F_SETFD failed!"))); } } @@ -7802,7 +7859,11 @@ static void BackendInitialize(Port* port) * to the client. Must do this now because authentication uses libpq to * send messages. */ - pq_init(); + if (port->protocol_config && port->protocol_config->fn_init) { + port->protocol_config->fn_init(); + } else { + pq_init(); + } /* now safe to ereport to client */ t_thrd.postgres_cxt.whereToSendOutput = DestRemote; @@ -7822,7 +7883,6 @@ static void BackendInitialize(Port* port) return; int status = ClientConnInitilize(port); - if (status == STATUS_EOF) return; else if (status == STATUS_ERROR) @@ -7865,7 +7925,7 @@ int ClientConnInitilize(Port* port) ereport(FATAL, (errmsg("could not set timer for startup packet timeout"))); } #endif - int status = StartupPacketInitialize(port); + int status = port->protocol_config->fn_start(port); /* * Stop here if it was bad or a cancel packet. ProcessStartupPacket @@ -8074,7 +8134,7 @@ void CheckClientIp(Port* port) /* Check whether the client ip is configured in pg_hba.conf */ char ip[IP_LEN] = {'\0'}; if (!check_ip_whitelist(port, ip, IP_LEN)) { - pq_init(); /* initialize libpq to talk to client */ + port->protocol_config->fn_init(); /* initialize libpq to talk to client */ t_thrd.postgres_cxt.whereToSendOutput = DestRemote; /* now safe to ereport to client */ ereport(FATAL, (errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION), @@ -9978,12 +10038,6 @@ static bool save_backend_variables(BackendParameters* param, Port* port, HANDLE strlcpy(param->DataDir, t_thrd.proc_cxt.DataDir, MAXPGPATH); - ss_rc = memcpy_s(¶m->ListenSocket, - sizeof(param->ListenSocket), - &t_thrd.postmaster_cxt.ListenSocket, - sizeof(t_thrd.postmaster_cxt.ListenSocket)); - securec_check(ss_rc, "\0", "\0"); - param->MyCancelKey = t_thrd.proc_cxt.MyCancelKey; param->MyPMChildSlot = t_thrd.proc_cxt.MyPMChildSlot; @@ -9996,9 +10050,6 @@ static bool save_backend_variables(BackendParameters* param, Port* port, HANDLE param->mainLWLockArray = (LWLock*)t_thrd.shemem_ptr_cxt.mainLWLockArray; param->PMSignalState = t_thrd.shemem_ptr_cxt.PMSignalState; - param->LocalIpNum = t_thrd.postmaster_cxt.LocalIpNum; - int rc = memcpy_s(param->LocalAddrList, (MAXLISTEN * IP_LEN), t_thrd.postmaster_cxt.LocalAddrList, (MAXLISTEN * IP_LEN)); - securec_check(rc, "", ""); param->HaShmData = t_thrd.postmaster_cxt.HaShmData; param->PgStartTime = t_thrd.time_cxt.pg_start_time; @@ -10138,12 +10189,6 @@ static void restore_backend_variables(BackendParameters* param, Port* port) SetDataDir(param->DataDir); - int ss_rc = memcpy_s(&t_thrd.postmaster_cxt.ListenSocket, - sizeof(t_thrd.postmaster_cxt.ListenSocket), - ¶m->ListenSocket, - sizeof(t_thrd.postmaster_cxt.ListenSocket)); - securec_check(ss_rc, "\0", "\0"); - t_thrd.proc_cxt.MyCancelKey = param->MyCancelKey; t_thrd.proc_cxt.MyPMChildSlot = param->MyPMChildSlot; @@ -10156,9 +10201,6 @@ static void restore_backend_variables(BackendParameters* param, Port* port) t_thrd.shemem_ptr_cxt.mainLWLockArray = (LWLockPadded*)param->mainLWLockArray; t_thrd.shemem_ptr_cxt.PMSignalState = param->PMSignalState; - t_thrd.postmaster_cxt.LocalIpNum = param->LocalIpNum; - rc = memcpy_s(t_thrd.postmaster_cxt.LocalAddrList, (MAXLISTEN * IP_LEN), param->LocalAddrList, (MAXLISTEN * IP_LEN)); - securec_check(rc, "", ""); t_thrd.postmaster_cxt.HaShmData = param->HaShmData; t_thrd.time_cxt.pg_start_time = param->PgStartTime; t_thrd.time_cxt.pg_reload_time = param->PgReloadTime; @@ -10174,7 +10216,7 @@ static void restore_backend_variables(BackendParameters* param, Port* port) PostmasterHandle = param->PostmasterHandle; pgwin32_initial_signal_pipe = param->initial_signal_pipe; #endif - ss_rc = memcpy_s(&t_thrd.postmaster_cxt.syslogPipe, + int ss_rc = memcpy_s(&t_thrd.postmaster_cxt.syslogPipe, sizeof(t_thrd.postmaster_cxt.syslogPipe), ¶m->syslogPipe, sizeof(t_thrd.postmaster_cxt.syslogPipe)); @@ -10481,10 +10523,10 @@ bool IsLocalAddr(Port* port) if (AF_UNIX == laddr->sa_family) { return true; } - for (i = 0; i != t_thrd.postmaster_cxt.LocalIpNum; ++i) { - if (0 == strcmp(local_ip, t_thrd.postmaster_cxt.LocalAddrList[i]) || - (AF_INET == laddr->sa_family && 0 == strcmp("0.0.0.0", t_thrd.postmaster_cxt.LocalAddrList[i])) || - (AF_INET6 == laddr->sa_family && 0 == strcmp("::", t_thrd.postmaster_cxt.LocalAddrList[i]))) { + for (i = 0; i != g_instance.listen_cxt.LocalIpNum; ++i) { + if (0 == strcmp(local_ip, g_instance.listen_cxt.LocalAddrList[i]) || + (AF_INET == laddr->sa_family && 0 == strcmp("0.0.0.0", g_instance.listen_cxt.LocalAddrList[i])) || + (AF_INET6 == laddr->sa_family && 0 == strcmp("::", g_instance.listen_cxt.LocalAddrList[i]))) { return true; } @@ -10494,8 +10536,8 @@ bool IsLocalAddr(Port* port) } return false; #else - for (i = 0; i != t_thrd.postmaster_cxt.LocalIpNum; ++i) { - ereport(DEBUG1, (errmsg("LocalAddrIP %s\n", t_thrd.postmaster_cxt.LocalAddrList[i]))); + for (i = 0; i != g_instance.listen_cxt.LocalIpNum; ++i) { + ereport(DEBUG1, (errmsg("LocalAddrIP %s\n", g_instance.listen_cxt.LocalAddrList[i]))); } return true; #endif @@ -10735,7 +10777,7 @@ static bool IsAlreadyListen(const char* ip, int port) } for (listen_index = 0; listen_index != MAXLISTEN; ++listen_index) { - if (t_thrd.postmaster_cxt.ListenSocket[listen_index] != PGINVALID_SOCKET) { + if (g_instance.listen_cxt.ListenSocket[listen_index] != PGINVALID_SOCKET) { struct sockaddr_storage saddr; socklen_t slen = 0; char* result = NULL; @@ -10743,7 +10785,7 @@ static bool IsAlreadyListen(const char* ip, int port) securec_check(rc, "\0", "\0"); slen = sizeof(saddr); - if (comm_getsockname(t_thrd.postmaster_cxt.ListenSocket[listen_index], + if (comm_getsockname(g_instance.listen_cxt.ListenSocket[listen_index], (struct sockaddr*)&saddr, (socklen_t*)&slen) < 0) { ereport(LOG, (errmsg("Error in getsockname int IsAlreadyListen()"))); @@ -10900,7 +10942,7 @@ void CreateServerSocket( NULL, (unsigned short)portNumber, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, add_localaddr_flag, is_create_psql_sock, @@ -10910,7 +10952,7 @@ void CreateServerSocket( ipaddr, (unsigned short)portNumber, g_instance.attr.attr_network.UnixSocketDir, - t_thrd.postmaster_cxt.ListenSocket, + g_instance.listen_cxt.ListenSocket, MAXLISTEN, add_localaddr_flag, is_create_psql_sock, @@ -10991,13 +11033,13 @@ static void CreateHaListenSocket(void) * if already has been created, just set the createmodel and continue. */ for (i = 0; i < MAXLISTEN; i++) { - if (PGINVALID_SOCKET == t_thrd.postmaster_cxt.ListenSocket[i] || - t_thrd.postmaster_cxt.listen_sock_type[i] != HA_LISTEN_SOCKET) { + if (PGINVALID_SOCKET == g_instance.listen_cxt.ListenSocket[i] || + g_instance.listen_cxt.listen_sock_type[i] != HA_LISTEN_SOCKET) { continue; } s_len = sizeof(sock_addr); - if (comm_getsockname(t_thrd.postmaster_cxt.ListenSocket[i], (struct sockaddr*)&sock_addr, (socklen_t*)&s_len) < 0) { + if (comm_getsockname(g_instance.listen_cxt.ListenSocket[i], (struct sockaddr*)&sock_addr, (socklen_t*)&s_len) < 0) { ereport(LOG, (errmsg("Error in getsockname int IsAlreadyListen()"))); } success = 0; @@ -11027,9 +11069,9 @@ static void CreateHaListenSocket(void) /* if the socket is not match all the IP-Port pair in newListenAddrs, close it and set listen_sock_type */ if (!success) { - (void)comm_closesocket(t_thrd.postmaster_cxt.ListenSocket[i]); - t_thrd.postmaster_cxt.ListenSocket[i] = PGINVALID_SOCKET; - t_thrd.postmaster_cxt.listen_sock_type[i] = UNUSED_LISTEN_SOCKET; + (void)comm_closesocket(g_instance.listen_cxt.ListenSocket[i]); + g_instance.listen_cxt.ListenSocket[i] = PGINVALID_SOCKET; + g_instance.listen_cxt.listen_sock_type[i] = UNUSED_LISTEN_SOCKET; } } @@ -11089,9 +11131,9 @@ static void IntArrayRegulation(int array[], int len, int def) */ static void ListenSocketRegulation(void) { - IntArrayRegulation((int*)t_thrd.postmaster_cxt.ListenSocket, MAXLISTEN, (int)PGINVALID_SOCKET); + IntArrayRegulation((int*)g_instance.listen_cxt.ListenSocket, MAXLISTEN, (int)PGINVALID_SOCKET); - IntArrayRegulation((int*)t_thrd.postmaster_cxt.listen_sock_type, MAXLISTEN, UNUSED_LISTEN_SOCKET); + IntArrayRegulation((int*)g_instance.listen_cxt.listen_sock_type, MAXLISTEN, UNUSED_LISTEN_SOCKET); } DbState get_local_dbstate_sub(WalRcvData* walrcv, ServerMode mode) diff --git a/src/gausskernel/process/tcop/dest.cpp b/src/gausskernel/process/tcop/dest.cpp index 66a57cd25..49586e9bb 100644 --- a/src/gausskernel/process/tcop/dest.cpp +++ b/src/gausskernel/process/tcop/dest.cpp @@ -109,7 +109,7 @@ DestReceiver* CreateDestReceiver(CommandDest dest) switch (dest) { case DestRemote: case DestRemoteExecute: - return printtup_create_DR(dest); + return u_sess->proc_cxt.MyProcPort->protocol_config->fn_printtup_create_DR(dest); case DestNone: return &donothingDR; diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index bb8fcb67d..4a1440340 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -266,7 +266,6 @@ static XLogRecPtr xlogCopyStart = InvalidXLogRecPtr; */ static int InteractiveBackend(StringInfo inBuf); static int interactive_getc(void); -static int SocketBackend(StringInfo inBuf); static int ReadCommand(StringInfo inBuf); static List* pg_rewrite_query(Query* query); bool check_log_statement(List* stmt_list); @@ -497,7 +496,7 @@ static int interactive_getc(void) * EOF is returned if the connection is lost. * ---------------- */ -static int SocketBackend(StringInfo inBuf) +int SocketBackend(StringInfo inBuf) { int qtype; #ifdef ENABLE_MULTIPLE_NODES @@ -765,7 +764,7 @@ static int ReadCommand(StringInfo inBuf) #endif if (t_thrd.postgres_cxt.whereToSendOutput == DestRemote) - result = SocketBackend(inBuf); + result = u_sess->proc_cxt.MyProcPort->protocol_config->fn_read_command(inBuf); else if (t_thrd.postgres_cxt.whereToSendOutput == DestDebug) result = InteractiveBackend(inBuf); else @@ -7846,12 +7845,12 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam */ if (t_thrd.postgres_cxt.whereToSendOutput == DestRemote && PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2 && !IS_THREAD_POOL_WORKER) { - StringInfoData buf; - pq_beginmessage(&buf, 'K'); - pq_sendint32(&buf, (int32)t_thrd.proc_cxt.MyPMChildSlot); - pq_sendint32(&buf, (int32)t_thrd.proc_cxt.MyCancelKey); - pq_endmessage(&buf); + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (MyProcPort && MyProcPort->protocol_config->fn_send_cancel_key) { + MyProcPort->protocol_config->fn_send_cancel_key((int32)t_thrd.proc_cxt.MyPMChildSlot, + (int32)t_thrd.proc_cxt.MyCancelKey); + } /* DN send thread pid to CN */ if (IsConnFromCoord() && IS_PGXC_DATANODE && (t_thrd.proc && t_thrd.proc->workingVersionNum >= 92060)) { @@ -8020,8 +8019,11 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam t_thrd.postgres_cxt.DoingCommandRead = false; /* Make sure libpq is in a good state */ - pq_comm_reset(); - + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (MyProcPort && MyProcPort->protocol_config->fn_comm_reset) { + MyProcPort->protocol_config->fn_comm_reset(); + } + /* statement retry phase : long jump */ if (IsStmtRetryEnabled()) { bool is_extended_query = u_sess->postgres_cxt.doing_extended_query_message; @@ -8384,7 +8386,8 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam /* * If connection to client is lost, we do not need to send message to client. */ - if (IS_CLIENT_CONN_VALID(u_sess->proc_cxt.MyProcPort) && (!t_thrd.int_cxt.ClientConnectionLost)) { + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (IS_CLIENT_CONN_VALID(MyProcPort) && (!t_thrd.int_cxt.ClientConnectionLost)) { /* * before send 'Z' to frontend, we should check if INTERRUPTS happends or not. * In SyncRepWaitForLSN() function, take HOLD_INTERRUPTS() to prevent interrupt happending and set @@ -8393,7 +8396,7 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam * interrupt. So frontend will be reveiving message forever and do nothing, which is wrong. */ CHECK_FOR_INTERRUPTS(); - ReadyForQuery((CommandDest)t_thrd.postgres_cxt.whereToSendOutput); + MyProcPort->protocol_config->fn_send_ready_for_query((CommandDest)t_thrd.postgres_cxt.whereToSendOutput); } #ifdef ENABLE_MULTIPLE_NODES /* @@ -8560,6 +8563,16 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam ResetGotPoolReload(false); } + /* + * call the protocol hook to process the request. + */ + if (firstchar != EOF && u_sess->proc_cxt.MyProcPort && + u_sess->proc_cxt.MyProcPort->protocol_config->fn_process_command) { + firstchar = u_sess->proc_cxt.MyProcPort->protocol_config->fn_process_command(&input_message); + send_ready_for_query = true; + continue; + } + /* * (7) process the command. But ignore it if we're skipping till * Sync. diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index bb72f9001..ddf814664 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1527,7 +1527,6 @@ static void knl_t_postmaster_init(knl_t_postmaster_context* postmaster_cxt) securec_check(rc, "\0", "\0"); postmaster_cxt->HaShmData = NULL; - postmaster_cxt->LocalIpNum = 0; postmaster_cxt->IsRPCWorkerThread = false; postmaster_cxt->audit_primary_start = true; postmaster_cxt->audit_primary_failover = false; diff --git a/src/gausskernel/process/threadpool/threadpool_listener.cpp b/src/gausskernel/process/threadpool/threadpool_listener.cpp index a52d68a4a..eb92d7daa 100644 --- a/src/gausskernel/process/threadpool/threadpool_listener.cpp +++ b/src/gausskernel/process/threadpool/threadpool_listener.cpp @@ -213,6 +213,13 @@ void ThreadPoolListener::AddEpoll(knl_session_context* session) * we find an input event of the socket, so we use one_shot mode. */ ev.events = EPOLLRDHUP | EPOLLIN | EPOLLET | EPOLLONESHOT; + +#ifndef ENABLE_MULTIPLE_NODES + if (session->proc_cxt.MyProcPort->protocol_config->server_handshake_first && session->status == KNL_SESS_UNINIT) { + ev.events |= EPOLLOUT; + } +#endif + ev.data.ptr = (void*)session; if (session->status != KNL_SESS_UNINIT) { /* CommProxy Support */ @@ -404,7 +411,11 @@ knl_session_context* ThreadPoolListener::GetSessionBaseOnEvent(struct epoll_even session->status = KNL_SESS_CLOSE; } return session; - } else if (ev->events & EPOLLIN) { +#ifndef ENABLE_MULTIPLE_NODES + } else if (ev->events & (EPOLLIN | EPOLLOUT)) { +#else + } else if (ev->events & EPOLLIN) { +#endif return session; } return NULL; diff --git a/src/gausskernel/process/threadpool/threadpool_worker.cpp b/src/gausskernel/process/threadpool/threadpool_worker.cpp index 4b796dc45..e330fc3bc 100644 --- a/src/gausskernel/process/threadpool/threadpool_worker.cpp +++ b/src/gausskernel/process/threadpool/threadpool_worker.cpp @@ -907,7 +907,7 @@ static bool InitSession(knl_session_context* session) (unsigned int)maxChunksPerProcess << (chunkSizeInBits - BITS_IN_MB)))); } - ReadyForQuery((CommandDest)t_thrd.postgres_cxt.whereToSendOutput); + session->proc_cxt.MyProcPort->protocol_config->fn_send_ready_for_query((CommandDest)t_thrd.postgres_cxt.whereToSendOutput); return true; } @@ -937,12 +937,12 @@ static void SendSessionIdxToClient() GenerateCancelKey(true); if (t_thrd.postgres_cxt.whereToSendOutput == DestRemote && PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { - StringInfoData buf; - - pq_beginmessage(&buf, 'K'); - pq_sendint32(&buf, (uint32)u_sess->session_ctr_index); - pq_sendint32(&buf, (uint32)u_sess->cancel_key); - pq_endmessage(&buf); + + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (MyProcPort && MyProcPort->protocol_config->fn_send_cancel_key) { + MyProcPort->protocol_config->fn_send_cancel_key((uint32)u_sess->session_ctr_index, + (uint32)u_sess->cancel_key); + } } } diff --git a/src/gausskernel/runtime/executor/spi.cpp b/src/gausskernel/runtime/executor/spi.cpp index 848e348aa..e58367495 100644 --- a/src/gausskernel/runtime/executor/spi.cpp +++ b/src/gausskernel/runtime/executor/spi.cpp @@ -2704,7 +2704,11 @@ static int _SPI_execute_plan0(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot s my_res = SPI_ERROR_COPY; goto fail; } - } else if (IsA(stmt, TransactionStmt)) { + } else if (IsA(stmt, TransactionStmt) +#ifndef ENABLE_MULTIPLE_NODES + && !u_sess->attr.attr_sql.dolphin +#endif + ) { my_res = SPI_ERROR_TRANSACTION; goto fail; } @@ -2731,6 +2735,13 @@ static int _SPI_execute_plan0(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot s QueryDesc *qdesc = NULL; Snapshot snap = ActiveSnapshotSet() ? GetActiveSnapshot() : InvalidSnapshot; +#ifndef ENABLE_MULTIPLE_NODES + Port *MyPort = u_sess->proc_cxt.MyProcPort; + if (MyPort && MyPort->protocol_config->fn_set_DR_params) { + MyPort->protocol_config->fn_set_DR_params(dest, ((PlannedStmt *)stmt)->planTree->targetlist); + } +#endif + qdesc = CreateQueryDesc((PlannedStmt *)stmt, plansource->query_string, snap, crosscheck_snapshot, dest, paramLI, 0); res = _SPI_pquery(qdesc, fire_triggers, canSetTag ? tcount : 0, from_lock); @@ -3063,7 +3074,11 @@ static int _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount, bo switch (operation) { case CMD_SELECT: Assert(queryDesc->plannedstmt->utilityStmt == NULL); - if (queryDesc->dest->mydest != DestSPI) { + if (queryDesc->dest->mydest != DestSPI +#ifndef ENABLE_MULTIPLE_NODES + && queryDesc->dest->mydest != DestRemote +#endif + ) { /* Don't return SPI_OK_SELECT if we're discarding result */ res = SPI_OK_UTILITY; } else diff --git a/src/include/access/printtup.h b/src/include/access/printtup.h index 070e51572..ffca29d68 100644 --- a/src/include/access/printtup.h +++ b/src/include/access/printtup.h @@ -55,6 +55,9 @@ typedef struct { int nattrs; PrinttupAttrInfo* myinfo; /* Cached info about each attr */ int16* formats; /* format code for each column */ +#ifndef ENABLE_MULTIPLE_NODES + List* target_list; /* output target list item */ +#endif } DR_printtup; typedef struct { diff --git a/src/include/catalog/pg_authid.h b/src/include/catalog/pg_authid.h index 8aea32860..7ba0732dc 100644 --- a/src/include/catalog/pg_authid.h +++ b/src/include/catalog/pg_authid.h @@ -76,6 +76,9 @@ CATALOG(pg_authid,1260) BKI_SHARED_RELATION BKI_ROWTYPE_OID(2842) BKI_SCHEMA_MAC bool rolmonitoradmin; bool roloperatoradmin; bool rolpolicyadmin; +#ifdef CATALOG_VARLEN + text rolpasswordext; /* passwordext, if any for other database encrypt rule, dolphin etc. */ +#endif } FormData_pg_authid; #undef timestamptz @@ -92,7 +95,7 @@ typedef FormData_pg_authid *Form_pg_authid; * compiler constants for pg_authid * ---------------- */ -#define Natts_pg_authid 26 +#define Natts_pg_authid 27 #define Anum_pg_authid_rolname 1 #define Anum_pg_authid_rolsuper 2 #define Anum_pg_authid_rolinherit 3 @@ -119,6 +122,7 @@ typedef FormData_pg_authid *Form_pg_authid; #define Anum_pg_authid_rolmonitoradmin 24 #define Anum_pg_authid_roloperatoradmin 25 #define Anum_pg_authid_rolpolicyadmin 26 +#define Anum_pg_authid_rolpasswordext 27 /* ---------------- * initial contents of pg_authid @@ -127,15 +131,15 @@ typedef FormData_pg_authid *Form_pg_authid; * user choices. * ---------------- */ -DATA(insert OID = 10 ( "POSTGRES" t t t t t t t t t -1 _null_ _null_ _null_ "default_pool" t 0 _null_ n 0 _null_ _null_ _null_ t t t)); -DATA(insert OID = 1044 ( "gs_role_copy_files" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1045 ( "gs_role_signal_backend" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1046 ( "gs_role_tablespace" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1047 ( "gs_role_replication" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1048 ( "gs_role_account_lock" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1055 ( "gs_role_pldebugger" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1056 ( "gs_role_directory_create" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); -DATA(insert OID = 1059 ( "gs_role_directory_drop" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f)); +DATA(insert OID = 10 ( "POSTGRES" t t t t t t t t t -1 _null_ _null_ _null_ "default_pool" t 0 _null_ n 0 _null_ _null_ _null_ t t t _null_)); +DATA(insert OID = 1044 ( "gs_role_copy_files" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1045 ( "gs_role_signal_backend" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1046 ( "gs_role_tablespace" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1047 ( "gs_role_replication" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1048 ( "gs_role_account_lock" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1055 ( "gs_role_pldebugger" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1056 ( "gs_role_directory_create" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); +DATA(insert OID = 1059 ( "gs_role_directory_drop" f t f f f f f f f -1 _null_ _null_ _null_ "default_pool" f 0 _null_ n 0 _null_ _null_ _null_ f f f _null_)); #define BOOTSTRAP_SUPERUSERID 10 #define DEFAULT_ROLE_COPY_FILES 1044 diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index e5ecb9110..f4fa14b4e 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -1124,6 +1124,17 @@ typedef struct knl_g_streaming_dr_context { slock_t mutex; /* locks shared variables shown above */ } knl_g_streaming_dr_context; +typedef struct knl_g_listen_context { + #define MAXLISTEN 64 + #define IP_LEN 64 + /* The socket(s) we're listening to. */ + pgsocket ListenSocket[MAXLISTEN]; + char LocalAddrList[MAXLISTEN][IP_LEN]; + int LocalIpNum; + int listen_sock_type[MAXLISTEN]; /* ori type: enum ListenSocketType */ + bool reload_fds; +} knl_g_listen_context; + typedef struct knl_g_startup_context { uint32 remoteReadPageNum; HTAB *badPageHashTbl; @@ -1275,6 +1286,7 @@ typedef struct knl_instance_context { knl_g_audit_context audit_cxt; knl_g_abo_context abo_cxt; + knl_g_listen_context listen_cxt; } knl_instance_context; extern long random(); diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 9f1904175..f7c0dc8ff 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2821,8 +2821,6 @@ typedef enum { typedef struct knl_t_postmaster_context { /* Notice: the value is same sa GUC_MAX_REPLNODE_NUM */ #define MAX_REPLNODE_NUM 9 -#define MAXLISTEN 64 -#define IP_LEN 64 /* flag when process startup packet for logic conn */ bool ProcessStartupPacketForLogicConn; @@ -2846,11 +2844,6 @@ typedef struct knl_t_postmaster_context { bool CrossClusterReplConnChanged[MAX_REPLNODE_NUM]; struct hashmemdata* HaShmData; - /* The socket(s) we're listening to. */ - pgsocket ListenSocket[MAXLISTEN]; - char LocalAddrList[MAXLISTEN][IP_LEN]; - int LocalIpNum; - int listen_sock_type[MAXLISTEN]; /* ori type: enum ListenSocketType */ gs_thread_t CurExitThread; bool IsRPCWorkerThread; diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 13118ec06..96b920191 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -65,10 +65,12 @@ typedef struct { #endif /* ENABLE_SSPI */ #include "datatype/timestamp.h" +#include "lib/stringinfo.h" #include "libpq/hba.h" #include "libpq/pqcomm.h" #include "libpq/sha2.h" #include "libcomm/libcomm.h" +#include "tcop/dest.h" typedef enum CAC_state { CAC_OK, CAC_STARTUP, CAC_SHUTDOWN, CAC_RECOVERY, CAC_TOOMANY, CAC_WAITBACKUP, CAC_OOM } CAC_state; @@ -87,6 +89,27 @@ typedef struct { } pg_gssinfo; #endif +/* + * ProtocolExtensionConfig + * + * All the callbacks implementing a specific wire protocol + */ +typedef struct ProtocolExtensionConfig { + bool server_handshake_first; + void (*fn_init)(void); + int (*fn_start)(struct Port *port); + void (*fn_authenticate)(struct Port *port); + void (*fn_send_message)(ErrorData *edata); + void (*fn_send_cancel_key)(int32 pid, int32 key); + void (*fn_comm_reset)(void); + void (*fn_send_ready_for_query)(CommandDest dest); + int (*fn_read_command)(StringInfo inBuf); + DestReceiver* (*fn_printtup_create_DR)(CommandDest dest); + void (*fn_set_DR_params)(DestReceiver* self, List* target_list); + int (*fn_process_command)(StringInfo inBuf); + void (*fn_report_param_status)(const char *name, char *val); +} ProtocolExtensionConfig; + /* * This is used by the postmaster in its communication with frontends. It * contains all state information needed during this communication before the @@ -121,6 +144,8 @@ typedef struct Port { CAC_state canAcceptConnections; /* postmaster connection status */ + ProtocolExtensionConfig *protocol_config; /* wire protocol functions */ + /* * Information that needs to be saved from the startup packet and passed * into backend execution. "char *" fields are NULL if not set. diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 3d364cf18..6f69c3204 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -35,6 +35,8 @@ typedef struct { } u; } PQArgBlock; +extern ProtocolExtensionConfig default_protocol_config; + /* * External functions. */ @@ -44,13 +46,15 @@ typedef struct { */ extern int StreamServerPort(int family, char* hostName, unsigned short portNumber, const char* unixSocketName, pgsocket ListenSocket[], int MaxListen, bool add_localaddr_flag, - bool is_create_psql_sock, bool is_create_libcomm_sock); + bool is_create_psql_sock, bool is_create_libcomm_sock, + ProtocolExtensionConfig* protocol_config = &default_protocol_config); extern int StreamConnection(pgsocket server_fd, Port* port); extern void StreamClose(pgsocket sock); extern void TouchSocketFile(void); extern void pq_init(void); extern void pq_comm_reset(void); extern int pq_getbytes(char* s, size_t len); +extern int pq_discardbytes(size_t len); extern int pq_getstring(StringInfo s); extern int pq_getmessage(StringInfo s, int maxlen); extern int pq_getbyte(void); diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index b1e2ee250..35695f282 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -22,6 +22,7 @@ #include #endif #include +#include #ifdef HAVE_STRUCT_SOCKADDR_STORAGE @@ -245,4 +246,6 @@ typedef struct StopRequestPacket { uint32 query_id_end; /* query id of back 4 bytes */ } StopRequestPacket; +extern int internal_putbytes(const char* s, size_t len); + #endif /* PQCOMM_H */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 13e06c297..89c501575 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -113,6 +113,7 @@ extern const uint32 COMMENT_SUPPORT_VERSION_NUM; extern const uint32 PLAN_SELECT_VERSION_NUM; extern const uint32 ON_UPDATE_TIMESTAMP_VERSION_NUM; extern const uint32 STANDBY_STMTHIST_VERSION_NUM; +extern const uint32 PG_AUTHID_PASSWORDEXT_VERSION_NUM; extern void register_backend_version(uint32 backend_version); extern bool contain_backend_version(uint32 version_number); diff --git a/src/include/postgres.h b/src/include/postgres.h index c2dfcbb90..b7a1b70f1 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -54,6 +54,7 @@ #include "utils/elog.h" #include "utils/palloc.h" #include "storage/spin.h" +#include "lib/stringinfo.h" #ifndef WIN32 #include @@ -970,6 +971,9 @@ extern long codegenIRloadProcessCount; extern pthread_mutex_t nodeDefCopyLock; +/* Returns the message type code, and loads message body data into inBuf */ +extern int SocketBackend(StringInfo inBuf); + /* Job worker Process, execute procedure */ extern void execute_simple_query(const char* query_string); @@ -1017,7 +1021,6 @@ typedef enum { } ClusterRunMode; #ifdef ENABLE_UT -#include "lib/stringinfo.h" extern void exec_describe_statement_message(const char* stmt_name); extern void exec_get_ddl_params(StringInfo input_message); #endif diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 7842c49b0..95548ab45 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -531,6 +531,7 @@ extern void ReThrowError(ErrorData* edata) __attribute__((noreturn)); extern void pg_re_throw(void) __attribute__((noreturn)); extern void PgRethrowAsFatal(void); extern char* pg_strdup(const char* in); +extern void send_message_to_frontend(ErrorData* edata); /* GUC-configurable parameters */ diff --git a/src/test/regress/output/predefined_roles.source b/src/test/regress/output/predefined_roles.source index 24fbd0af3..58df46a40 100755 --- a/src/test/regress/output/predefined_roles.source +++ b/src/test/regress/output/predefined_roles.source @@ -1,15 +1,15 @@ -- default roles SELECT oid, * FROM pg_authid WHERE rolname like 'gs_role_%' ORDER BY oid; - oid | rolname | rolsuper | rolinherit | rolcreaterole | rolcreatedb | rolcatupdate | rolcanlogin | rolreplication | rolauditadmin | rolsystemadmin | rolconnlimit | rolpassword | rolvalidbegin | rolvaliduntil | rolrespool | roluseft | rolparentid | roltabspace | rolkind | rolnodegroup | roltempspace | rolspillspace | rolexcpdata | rolmonitoradmin | roloperatoradmin | rolpolicyadmin -------+--------------------------+----------+------------+---------------+-------------+--------------+-------------+----------------+---------------+----------------+--------------+-------------+---------------+---------------+--------------+----------+-------------+-------------+---------+--------------+--------------+---------------+-------------+-----------------+------------------+---------------- - 1044 | gs_role_copy_files | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1045 | gs_role_signal_backend | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1046 | gs_role_tablespace | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1047 | gs_role_replication | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1048 | gs_role_account_lock | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1055 | gs_role_pldebugger | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1056 | gs_role_directory_create | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f - 1059 | gs_role_directory_drop | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f + oid | rolname | rolsuper | rolinherit | rolcreaterole | rolcreatedb | rolcatupdate | rolcanlogin | rolreplication | rolauditadmin | rolsystemadmin | rolconnlimit | rolpassword | rolvalidbegin | rolvaliduntil | rolrespool | roluseft | rolparentid | roltabspace | rolkind | rolnodegroup | roltempspace | rolspillspace | rolexcpdata | rolmonitoradmin | roloperatoradmin | rolpolicyadmin | rolpasswordext +------+--------------------------+----------+------------+---------------+-------------+--------------+-------------+----------------+---------------+----------------+--------------+-------------+---------------+---------------+--------------+----------+-------------+-------------+---------+--------------+--------------+---------------+-------------+-----------------+------------------+----------------+---------------- + 1044 | gs_role_copy_files | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1045 | gs_role_signal_backend | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1046 | gs_role_tablespace | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1047 | gs_role_replication | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1048 | gs_role_account_lock | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1055 | gs_role_pldebugger | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1056 | gs_role_directory_create | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | + 1059 | gs_role_directory_drop | f | t | f | f | f | f | f | f | f | -1 | | | | default_pool | f | 0 | | n | 0 | | | | f | f | f | (8 rows) \du gs_role_signal_backend