Files
openGauss-server/src/include/libcomm/libcomm.h
quemingjian cee1cfe056 【资源池化】
1.SPQ多机并行协调线程尽可能固定扫描范围
2.修复#I8B8KZ pooler_port参数回归原生情况
2023-10-31 19:55:52 +08:00

752 lines
23 KiB
C++

/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* libcomm.h
*
*
* IDENTIFICATION
* src/include/libcomm/libcomm.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef _GS_LIBCOMM_H_
#define _GS_LIBCOMM_H_
#include <stdio.h>
#include <netinet/in.h>
#include "sys/epoll.h"
#ifndef WIN32
#include <pthread.h>
#else
#include "pthread-win32.h"
#endif
#include <stdlib.h>
#include "c.h"
#include "cipher.h"
#include "utils/palloc.h"
#ifdef USE_SSL
#include "openssl/err.h"
#include "openssl/ssl.h"
#include "openssl/rand.h"
#include "openssl/ossl_typ.h"
#include "openssl/obj_mac.h"
#include "openssl/dh.h"
#include "openssl/bn.h"
#include "openssl/x509.h"
#include "openssl/x509_vfy.h"
#include "openssl/opensslconf.h"
#include "openssl/crypto.h"
#include "openssl/bio.h"
#endif /* USE_SSL */
#define ECOMMTCPARGSINVAL 1001
#define ECOMMTCPMEMALLOC 1002
#define ECOMMTCPCVINIT 1003
#define ECOMMTCPCVDESTROY 1004
#define ECOMMTCPLOCKINIT 1005
#define ECOMMTCPLOCKDESTROY 1006
#define ECOMMTCPNODEIDFD 1007
#define ECOMMTCPNODEIDXTCPFD 1008
#define ECOMMTCPSETSTREAMIDX 1009
#define ECOMMTCPBUFFQSIZE 1010
#define ECOMMTCPQUTOASZIE 1011
#define ECOMMTCPSEMINIT 1012
#define ECOMMTCPSEMPOST 1013
#define ECOMMTCPSEMWAIT 1014
#define ECOMMTCPEPOLLINIT 1015
#define ECOMMTCPEPOLLHNDL 1016
#define ECOMMTCPSCTPADRINIT 1017
#define ECOMMTCPSCTPLISTEN 1018
#define ECOMMTCPCMAILBOXINIT 1019
#define ECOMMTCPNODEIDXSCTPPORT 1020
#define ECOMMTCPSTREAMIDX 1021
#define ECOMMTCPTCPFD 1022
#define ECOMMTCPEPOLLEVNT 1023
#define ECOMMTCPCTRLMSG 1024
#define ECOMMTCPCTRLMSGWR 1025
#define ECOMMTCPCTRLMSGRD 1026
#define ECOMMTCPSTREAMIDXINVAL 1027
#define ECOMMTCPINVALNODEID 1028
#define ECOMMTCPCTRLMSGSIZE 1029
#define ECOMMTCPCVSIGNAL 1030
#define ECOMMTCPEPOLLLST 1031
#define ECOMMTCPCTRLCONN 1032
#define ECOMMTCPSND 1033
#define ECOMMTCPEPOLLCLOSE 1034
#define ECOMMTCPEPOLLTIMEOUT 1035
#define ECOMMTCPHASHENTRYDEL 1036
#define ECOMMTCPTHREADSTOP 1037
#define ECOMMTCPMAILBOXCLOSE 1038
#define ECOMMTCPSTREAMSTATE 1039
#define ECOMMTCPSCTPFDINVAL 1040
#define ECOMMTCPNODATA 1041
#define ECOMMTCPTHREADSTART 1042
#define ECOMMTCPHASHENTRYADD 1043
#define ECOMMTCPBUILDSCTPASSOC 1044
#define ECOMMTCPWRONGSTREAMKEY 1045
#define ECOMMTCPRELEASEMEM 1046
#define ECOMMTCPTCPDISCONNECT 1047
#define ECOMMTCPDISCONNECT 1048
#define ECOMMTCPREMOETECLOSE 1049
#define ECOMMTCPAPPCLOSE 1050
#define ECOMMTCPLOCALCLOSEPOLL 1051
#define ECOMMTCPPEERCLOSEPOLL 1052
#define ECOMMTCPCONNFAIL 1054
#define ECOMMTCPSTREAMCONNFAIL 1055
#define ECOMMTCPREJECTSTREAM 1056
#define ECOMMTCPCONNTIMEOUT 1057
#define ECOMMTCPWAITQUOTAFAIL 1058
#define ECOMMTCPWAITPOLLERROR 1059
#define ECOMMTCPPEERCHANGED 1060
#define ECOMMTCPGSSAUTHFAIL 1061
#define ECOMMTCPSENDTIMEOUT 1062
#define ECOMMTCPNOTINTERNALIP 1063
// Structure definitions.
// Stream key is using to associate producers and consumers
//
#define HOST_ADDRSTRLEN INET6_ADDRSTRLEN
#define HOST_LEN_OF_HTAB 64
#define NAMEDATALEN 64
#define MSG_TIME_LEN 30
#define MAX_DN_NODE_NUM 8192
#define MAX_CN_NODE_NUM 1024
#define MAX_CN_DN_NODE_NUM (MAX_DN_NODE_NUM + MAX_CN_NODE_NUM) //(MaxCoords+MaxDataNodes)
#define MIN_CN_DN_NODE_NUM (1 + 1) //(1 CN + 1 DN)
#define DOUBLE_NAMEDATALEN 128
#define SEC_TO_MICRO_SEC 1000
typedef enum {
LIBCOMM_NONE,
LIBCOMM_SEND_CTRL,
LIBCOMM_RECV_CTRL,
LIBCOMM_RECV_LOOP,
LIBCOMM_AUX
} LibcommThreadTypeDef;
/* send and recv message type */
typedef enum
{
SEND_SOME = 0,
SECURE_READ,
SECURE_WRITE,
READ_DATA,
READ_DATA_FROM_LOGIC
} CommMsgOper;
typedef enum
{
POSTMASTER = 0,
GS_SEND_flow,
GS_RECV_FLOW,
GS_RECV_LOOP,
} CommThreadUsed;
typedef struct CommStreamKey {
uint64 queryId; /* Plan id of current query. */
uint32 planNodeId; /* Plan node id of stream node. */
uint32 producerSmpId; /* Smp id for producer. */
uint32 consumerSmpId; /* Smp id for consumer. */
} TcpStreamKey;
// struct of libcomm logic addr
// idx gives the node idx of backend
// sid gives the logic conn idx with specific node
// ver gives the version of this logic addr, once it is closed ver++
// type is GSOCK_TYPE
typedef struct {
uint16 idx;
uint16 sid;
uint16 ver;
uint16 type;
} gsocket;
struct StreamConnInfo;
// statistic structure
//
typedef struct {
char remote_node[NAMEDATALEN];
char remote_host[HOST_ADDRSTRLEN];
int idx;
int stream_id;
const char* stream_state;
int tcp_sock;
uint64 query_id;
TcpStreamKey stream_key;
long quota_size;
unsigned long buff_usize;
long bytes;
long time;
long speed;
unsigned long local_thread_id;
unsigned long peer_thread_id;
} CommRecvStreamStatus;
typedef struct {
char remote_node[NAMEDATALEN];
char remote_host[HOST_ADDRSTRLEN];
int idx;
int stream_id;
const char* stream_state;
int tcp_sock;
int packet_count;
int quota_count;
uint64 query_id;
TcpStreamKey stream_key;
long bytes;
long time;
long speed;
long quota_size;
long wait_quota;
long send_overhead;
unsigned long local_thread_id;
unsigned long peer_thread_id;
} CommSendStreamStatus;
typedef struct {
long recv_speed;
long send_speed;
int recv_count_speed;
int send_count_speed;
long buffer;
long mem_libcomm;
long mem_libpq;
int postmaster;
int gs_sender_flow;
int gs_receiver_flow;
int gs_receiver_loop;
int stream_conn_num;
} CommStat;
typedef struct {
char remote_node[NAMEDATALEN];
char remote_host[HOST_ADDRSTRLEN];
int idx;
int stream_num;
uint32 min_delay;
uint32 dev_delay;
uint32 max_delay;
} CommDelayInfo;
#ifdef USE_SSL
typedef struct SSL_INFO {
SSL* ssl;
X509* peer;
char* peer_cn;
unsigned long count;
int sock;
} SSL_INFO;
typedef struct libcommsslinfo {
SSL_INFO node;
struct libcommsslinfo* next;
} libcomm_sslinfo;
#endif
typedef struct {
int socket;
char *libcommhost; /* the machine on which the server is running */
char *sslcert; /* client certificate filename */
char *sslcrl; /* certificate revocation list filename */
char *sslkey; /* client key filename */
char *sslrootcert; /* root certificte filename */
char *remote_nodename; /* remote datanode name */
bool sigpipe_so; /* have we masked SIGPIPE via SO_NOSIGPIPE? */
#ifdef USE_SSL
SSL *ssl;
X509 *peer; /* X509 cert of server */
#endif
char* sslmode; /* SSL mode (require,prefer,allow,disable) */
bool sigpipe_flag; /* can we mask SIGPIPE via MSG_NOSIGNAL? */
unsigned char cipher_passwd[CIPHER_LEN + 1];
} LibCommConn;
// sctp address infomation
//
typedef struct libcommaddrinfo {
char* host; // host ip
char nodename[NAMEDATALEN]; // datanode name
int ctrl_port; // control tcp listening port
int listen_port; // listening port
int status; // status of the address info,
// -1:closed, 0:need send, 1:send finish
int nodeIdx; // datanode index, like PGXCNodeId
CommStreamKey streamKey; // stream key, use plan id,plan node id, producer smp id and consumer smp id
unsigned int qid; // query index
bool parallel_send_mode; // this connection use parallel mode to send
int addr_list_size; // how many node in addr info list, only the head node set it
libcommaddrinfo* addr_list_next; // point to next addr info node, libcomm use it to build addr info list
gsocket gs_sock; // libcomm logic addr
} libcomm_addrinfo;
/* cn and dn send and recv message log */
typedef struct MessageIpcLog
{
char type; /* the incomplete message type parsed last time */
int msg_cursor;
int msg_len; /* When the message parsed last time is incomplete, record the true length of the message */
int len_cursor; /* When msglen is parsed to be less than 4 bytes, the received bytes count are recorded */
uint32 len_cache;
/* For consecutive parses to the same message, record the msgtype, length, and last parsed time and same message count. */
char last_msg_type; /* the type of the previous message */
int last_msg_len; /* the message of the previous message */
int last_msg_count; /* same message count */
char last_msg_time[MSG_TIME_LEN]; // the time of the previous message
}MessageIpcLog;
typedef struct MessageCommLog
{
MessageIpcLog recv_ipc_log;
MessageIpcLog send_ipc_log;
}MessageCommLog;
typedef enum {
GSOCK_INVALID,
GSOCK_PRODUCER,
GSOCK_CONSUMER,
GSOCK_DAUL_CHANNEL,
} GSOCK_TYPE;
extern gsocket gs_invalid_gsock;
#define GS_INVALID_GSOCK gs_invalid_gsock
void mc_elog(int elevel, const char* fmt, ...) __attribute__((format(printf, 2, 3)));
#define LIBCOMM_DEBUG_LOG(format, ...) \
do { \
; \
} while (0)
// the role of current node
//
typedef enum { ROLE_PRODUCER, ROLE_CONSUMER, ROLE_MAX_TYPE } SctpNodeRole;
// the connection state of IP+PORT
//
typedef enum { CONNSTATEFAIL, CONNSTATECONNECTING, CONNSTATESUCCEED } ConnectionState;
// the channel type
//
typedef enum { DATA_CHANNEL, CTRL_CHANNEL } ChannelType;
typedef bool (*wakeup_hook_type)(TcpStreamKey key, StreamConnInfo connInfo);
// Set basic initialization information for communication,
// calling by each datanode for initializing the communication layer
//
extern int gs_set_basic_info(const char* local_host, // local ip of the datanode, it can be used "localhost" for simple
const char* local_node_name, // local node name of the datanode, like PGXCNodeName
int node_num, // total number of datanodes, maximum value is 1024, it could be set in postgresql.conf with
// parameter name comm_max_datanode
char* sock_path); // unix domain path
// Connect to destination datanode, and get the sctp stream index for sending
// called by Sender
//
// return: stream index
//
extern int gs_connect(libcommaddrinfo** sctp_addrinfo, // destination address
int addr_num, // connection number
int timeout // timeout threshold, default is 10 min
);
// Regist call back function of receiver for waking up Consumer in executor,
// if a sender connect successfully to receiver, wake up the consumer once
//
extern void gs_connect_regist_callback(wakeup_hook_type wakeup_callback);
// Send message through sctp channel using a sctp stream
// called by Sender
//
// return: transmitted data size
//
extern int gs_send(gsocket* gs_sock, // destination address
char* message, // message to send
int m_len, // message length
int time_out, // timeout threshold, default is -1
bool block_mode // is wait quota
);
extern int gs_broadcast_send(struct libcommaddrinfo* sctp_addrinfo, char* message, int m_len, int time_out);
// Receive message from an array of sctp channels,
// however, we will get message data from just one channel,
// and copy the data into buffer.
//
// return: received data size
//
extern int gs_recv(
gsocket* gs_sock, // array of node index, which labeled the datanodes will send data to this datanode
void* buff, // buffer to copy data message, allocated by caller
int buff_size // size of buffer
);
// Simulate linux poll interface,
// to notify the Consumer thread data have been received into the inner buffer, you can come to take it
//
// return: the number of mailbox which has data
//
extern int gs_wait_poll(gsocket* gs_sock_array, // array of producers node index
int nproducer, // number of producers
int* producer, // producers number triggers poll
int timeout, // time out in seconds, 0 for block mode
bool close_expected // is logic connection closed by remote is an expected result
);
/* Handle the same message and print message receiving and sending log */
extern void gs_comm_ipc_print(MessageIpcLog *ipc_log, char *remotenode, gsocket *gs_sock, CommMsgOper msg_oper);
/* Send and receive data between nodes, for performance problem location */
extern MessageCommLog* gs_comm_ipc_performance(MessageCommLog *msgLog,
void *ptr,
int n,
char *remotenode,
gsocket *gs_sock,
CommMsgOper logType);
// Receiver close sctp stream
//
extern int gs_r_close_stream(int sctp_idx, // node index
int sctp_sid, // stream index
int version // stream key(the plan id and plan node id), associated the pair of Consumer and Producer
);
extern int gs_r_close_stream(gsocket* gsock);
// Sender close sctp stream
//
extern int gs_s_close_stream(int sctp_idx, // node index
int sctp_sid, // node index
int version // stream key(the plan id and plan node id), associated the pair of Consumer and Producer
);
extern void gs_close_gsocket(gsocket* gsock);
extern void gs_poll_close();
extern int gs_s_close_stream(gsocket* gsock);
extern int gs_poll(int time_out);
extern int gs_poll_create();
extern int gs_close_all_stream_by_debug_id(uint64 query_id);
extern bool gs_stop_query(gsocket* gsock, uint32 remote_pid);
// Shudown the communication layer
//
extern void gs_shutdown_comm();
// Do internal cancel when cancelling is requested
//
extern void gs_r_cancel();
// set t_thrd proc workingVersionNum
extern void gs_set_working_version_num(int num);
// export communication layer status
//
extern void gs_log_comm_status();
// check if the kernel version is reliable
//
extern int gs_check_SLESSP2_version();
// get the assigned stream number
//
extern int gs_get_stream_num();
// get the error information
//
const char* gs_comm_strerror();
// get communication layer stream status at receiver as a tuple for pg_comm_recv_stream
//
extern bool get_next_recv_stream_status(CommRecvStreamStatus* stream_status);
// get communication layer stream status at sender as a tuple for pg_comm_send_stream
//
extern bool get_next_send_stream_status(CommSendStreamStatus* stream_status);
// get communication layer stream status as a tuple for pg_comm_status
//
extern bool gs_get_comm_stat(CommStat* comm_stat);
// get communication layer sctp delay infomation as a tuple for pg_comm_delay
//
extern bool get_next_comm_delay_info(CommDelayInfo* delay_info);
extern void gs_set_debug_mode(bool mod);
extern void gs_set_stat_mode(bool mod);
extern void gs_set_timer_mode(bool mod);
extern void gs_set_no_delay(bool mod);
extern void gs_set_ackchk_time(int mod);
extern void gs_set_libcomm_used_rate(int rate);
extern void init_libcomm_cpu_rate();
// set t_thrd proc workingVersionNum
extern void gs_set_working_version_num(int num);
// get availabe memory of communication layer
//
extern long gs_get_comm_used_memory(void);
extern long gs_get_comm_peak_memory(void);
extern Size gs_get_comm_context_memory(void);
extern int gs_release_comm_memory();
// interface function for postmaster read msg by unix domain
extern int gs_recv_msg_by_unix_domain(int fd, gsocket* gs_sock);
// check mailbox version for connection reused by poolmgr
extern bool gs_test_libcomm_conn(gsocket* gs_sock);
// reset cmailbox for pooler reuse
extern void gs_clean_cmailbox(gsocket gs_sock);
extern bool gs_check_mailbox(uint16 version1, uint16 version2);
extern void gs_r_reset_cmailbox(struct c_mailbox* cmailbox, int close_reason);
extern void gs_s_reset_pmailbox(struct p_mailbox* pmailbox, int close_reason);
extern bool gs_mailbox_build(int idx);
extern void gs_mailbox_destory(int idx);
// for capacity expansion
extern void gs_change_capacity(int newval);
extern int gs_get_cur_node();
extern void commSenderFlowMain();
extern void commReceiverFlowMain();
extern void commAuxiliaryMain();
extern void commPoolCleanerMain();
extern void commReceiverMain(void* tid_callback);
extern void gs_init_adapt_layer();
extern void gs_senders_struct_set();
extern void init_comm_buffer_size();
extern void gs_set_local_host(const char* host);
extern void gs_broadcast_poll();
extern void gs_set_kerberos_keyfile();
extern void gs_receivers_struct_init(int ctrl_port, int data_port);
extern void gs_set_comm_session();
extern int gs_get_node_idx(char* node_name);
extern int gs_memory_pool_queue_initial_success(uint32 index);
extern struct mc_lqueue_item* gs_memory_pool_queue_pop(char* iov);
extern bool gs_memory_pool_queue_push(char* item);
extern ThreadId startCommSenderFlow(void);
extern ThreadId startCommReceiverFlow();
extern ThreadId startCommAuxiliary();
extern ThreadId startCommReceiver(int* tid);
extern void startCommReceiverWorker(ThreadId* threadid);
extern void gs_init_hash_table();
extern int mc_tcp_connect_nonblock(const char* host, int port);
/* Network adaptation interface of the libcomm for the ThreadPoolListener. */
extern void CommResourceInit();
extern int CommEpollCreate(int size);
extern int CommEpollCtl(int epfd, int op, int fd, struct epoll_event *event);
extern int CommEpollWait(int epfd, struct epoll_event *event, int maxevents, int timeout);
extern int CommEpollClose(int epfd);
extern void InitCommLogicResource();
extern void ProcessCommLogicTearDown();
class CommEpollFd;
struct HTAB;
typedef struct LogicFd {
int idx;
int streamid;
} LogicFd;
typedef struct CommEpFdInfo {
/* hash key first */
int epfd;
CommEpollFd *comm_epfd;
} CommEpFdInfo;
typedef struct CommFd {
int fd;
gsocket logic_fd;
} CommFd;
struct knl_session_context;
class CommEpollFd;
typedef struct SessionInfo {
LogicFd logic_fd; // HASH key first
CommFd commfd;
int wakeup_cnt;
int handle_wakeup_cnt;
volatile bool err_occurs;
CommEpollFd *comm_epfd_ptr;
knl_session_context *session_ptr;
int is_idle;
} SessionInfo;
#ifndef NO_COMPILE_CLASS_FDCOLLECTION
class FdCollection : public BaseObject {
public:
FdCollection();
~FdCollection();
void Init();
void DeInit();
int AddEpfd(int epfd, int size);
int DelEpfd(int epfd);
void CleanEpfd();
inline CommEpollFd* GetCommEpollFd(int epfd);
int AddLogicFd(CommEpollFd *comm_epfd, void *session_ptr);
int DelLogicFd(const LogicFd *logic_fd);
void CleanLogicFd();
SessionInfo* GetSessionInfo(const LogicFd *logic_fd);
private:
inline bool IsEpfdValid(int fd);
/* SockfdAPI** m_sockfd_map */
HTAB *m_epfd_htab;
pthread_mutex_t m_epfd_htab_lock;
/* logicfd info */
HTAB *m_logicfd_htab;
pthread_mutex_t m_logicfd_htab_lock;
int m_logicfd_nums;
};
#endif
extern void WakeupSession(SessionInfo *session_info, bool err_occurs, const char* caller_name);
/*
* LIBCOMM_CHECK is defined when make commcheck
*/
#ifdef LIBCOMM_CHECK
#define LIBCOMM_FAULT_INJECTION_ENABLE
#define LIBCOMM_SPEED_TEST_ENABLE
#else
#undef LIBCOMM_FAULT_INJECTION_ENABLE
#undef LIBCOMM_SPEED_TEST_ENABLE
#endif
/*
* Libcomm Speed Test Framework
* Before start: 1, Must run a stream query to get DN connections.
* 2, add new guc params to $GAUSSHOME/bin/cluster_guc.conf
* Start: Use gs_guc reload to set test thread num.
* For example: gs_guc reload -Z datanode -N all -I all -c "comm_test_thread_num=1"
* Stop: Use gs_guc reload to set test thread num=0.
*/
#ifdef LIBCOMM_SPEED_TEST_ENABLE
extern void gs_set_test_thread_num(int newval);
extern void gs_set_test_msg_len(int newval);
extern void gs_set_test_send_sleep(int newval);
extern void gs_set_test_send_once(int newval);
extern void gs_set_test_recv_sleep(int newval);
extern void gs_set_test_recv_once(int newval);
#endif
/*Libcomm fault injection Framework
* Before start: 1, enable LIBCOMM_FAULT_INJECTION_ENABLE.
* 2, add new guc params to $GAUSSHOME/bin/cluster_guc.conf
* Start: Use gs_guc reload to set FI num.
* For instance: gs_guc reload -Z datanode -N all -I all -c "comm_fault_injection=6"
* Stop: Use gs_guc reload to set comm_fault_injection=0.
*/
#ifdef LIBCOMM_FAULT_INJECTION_ENABLE
typedef enum {
LIBCOMM_FI_NONE,
LIBCOMM_FI_R_TCP_DISCONNECT,
LIBCOMM_FI_R_SCTP_DISCONNECT,
LIBCOMM_FI_S_TCP_DISCONNECT,
LIBCOMM_FI_S_SCTP_DISCONNECT,
LIBCOMM_FI_RELEASE_MEMORY,
LIBCOMM_FI_FAILOVER, //
LIBCOMM_FI_NO_STREAMID,
LIBCOMM_FI_CONSUMER_REJECT,
LIBCOMM_FI_R_APP_CLOSE, //
LIBCOMM_FI_S_APP_CLOSE, //
LIBCOMM_FI_CANCEL_SIGNAL, //
LIBCOMM_FI_CLOSE_BY_VIEW, //
LIBCOMM_FI_GSS_TCP_FAILED,
LIBCOMM_FI_GSS_SCTP_FAILED,
LIBCOMM_FI_R_PACKAGE_SPLIT,
LIBCOMM_FI_MALLOC_FAILED,
LIBCOMM_FI_MC_TCP_READ_FAILED,
LIBCOMM_FI_MC_TCP_READ_BLOCK_FAILED,
LIBCOMM_FI_MC_TCP_READ_NONBLOCK_FAILED,
LIBCOMM_FI_MC_TCP_WRITE_FAILED,
LIBCOMM_FI_MC_TCP_WRITE_NONBLOCK_FAILED,
LIBCOMM_FI_MC_TCP_ACCEPT_FAILED,
LIBCOMM_FI_MC_TCP_LISTEN_FAILED,
LIBCOMM_FI_MC_TCP_CONNECT_FAILED,
LIBCOMM_FI_FD_SOCKETVERSION_FAILED,
LIBCOMM_FI_SOCKID_NODEIDX_FAILED,
LIBCOMM_FI_POLLER_ADD_FD_FAILED,
LIBCOMM_FI_NO_NODEIDX,
LIBCOMM_FI_CREATE_POLL_FAILED,
LIBCOMM_FI_DYNAMIC_CAPACITY_FAILED,
LIBCOMM_FI_MAX
} LibcommFaultInjection;
extern void gs_set_fault_injection(int newval);
extern void set_comm_fault_injection(int type);
extern bool is_comm_fault_injection(LibcommFaultInjection type);
#endif
#ifdef USE_SPQ
constexpr uint16 SPQ_QE_CONNECTION = 0;
constexpr uint16 SPQ_QC_CONNECTION = 1;
struct QCConnKey {
uint64 query_id;
uint32 plan_node_id;
uint16 node_id;
uint16 type;
};
struct QCConnEntry {
QCConnKey key;
uint64 streamcap;
gsocket forward;
gsocket backward;
int scannedPageNum;
int internal_node_id;
};
struct BackConnInfo {
uint16 node_idx;
uint16 version;
uint64 streamcap;
uint64 query_id;
CommStreamKey stream_key;
gsocket *backward;
};
extern int gs_r_build_reply_connection(BackConnInfo* fcmsgr, int local_version, uint16 *sid);
#endif
#endif //_GS_LIBCOMM_H_