add 3.0.0 patch file

This commit is contained in:
zhang_xubo
2022-03-11 17:10:31 +08:00
parent 5074a6f668
commit 5b71f51471
29 changed files with 3988 additions and 69 deletions

View File

@ -19,16 +19,18 @@
#define _WIN32_WINNT 0x0400
#endif /* _WIN32_WINNT */
#include "connection.h"
#include "misc.h"
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
/* for htonl */
#ifdef WIN32
#include <Winsock2.h>
#include <windows.h>
#else
#include <arpa/inet.h>
#endif
@ -45,6 +47,8 @@
#include <pwd.h>
#endif
#include "connection.h"
#include "misc.h"
#include "environ.h"
#include "statement.h"
#include "qresult.h"
@ -58,15 +62,427 @@
#define SAFE_STR(s) (NULL != (s) ? (s) : "(null)")
#define STMT_INCREMENT 16 /* how many statement holders to allocate
* at a time */
/* how many statement holders to allocate
* at a time
*/
#define STMT_INCREMENT 16
#define MAX_CN 128 /* the maximum number of CN is 128 */
#define EMPTY 0
#define CORRECT 1
#define WRONG 2
#define lenNameType 63
BOOL conn_inited = FALSE;
BOOL conn_precheck = FALSE;
pthread_rwlock_t init_lock = PTHREAD_RWLOCK_INITIALIZER;
int refresh_flag = 0;
unsigned long int pgxc_node_thread_id;
typedef struct CnEntry {
char ip_list[MAX_CN][MEDIUM_REGISTRY_LEN]; /* a char array to store IPs */
int ip_status[MAX_CN]; /* an integer array that indicates the status of IPs */
char port_list[MAX_CN][SMALL_REGISTRY_LEN]; /* a char array that stores port */
int port_status[MAX_CN]; /* an integer array that indicates the status of IPs */
int ip_count; /* define integer to record the number of IPs stored in the IP_list */
int port_count; /*
* define integer to record the number of IPs stored in the IP_list
* this should be equal to IP_count
*/
int step[MAX_CN]; /* record the offset for roundRobin when autobalance is on */
BOOL is_usable;
pthread_rwlock_t ip_list_lock; /* define a lock to isolate read and write to ip_list */
pthread_rwlock_t step_lock[MAX_CN]; /* define a lock to isolate read and write to step */
} CnEntry;
typedef struct dsn_time {
char *DSN;
int timeinterval;
} dsn_time;
CnEntry orig_entry;
CnEntry pgxc_entry;
static int LIBPQ_connect(ConnectionClass *self);
#ifdef WIN32
DWORD WINAPI read_pgxc_node(LPVOID arg)
#else
static void *read_pgxc_node(void *arg)
#endif
{
dsn_time read_cn;
read_cn = *(dsn_time *) arg;
char *DSN = malloc(strlen(read_cn.DSN) + 1);
if (DSN == NULL) {
exit(1);
}
strncpy_null(DSN, read_cn.DSN, strlen(read_cn.DSN) + 1);
read_cn.DSN = DSN;
int time = read_cn.timeinterval;
#ifdef WIN32
pgxc_node_thread_id = GetCurrentThreadId();
#else
pgxc_node_thread_id = pthread_self();
#endif
int refresh_count = 0;
/* read CNs' IP from pgxc_node */
for (;;) {
MYLOG(0, "REFRESH starts\n");
SQLHENV hEnv = SQL_NULL_HENV;
SQLHDBC hDbc = SQL_NULL_HDBC;
SQLHSTMT hStmt = SQL_NULL_HSTMT;
SQLRETURN rc = SQL_SUCCESS;
SQLINTEGER RETCODE = 0;
char node_port[SMALL_REGISTRY_LEN];
char node_host[lenNameType];
SQLLEN lenPort=0, lenHost=0;
RETCODE = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hEnv);
if (RETCODE != SQL_SUCCESS) {
continue;
}
SQLSetEnvAttr(hEnv, SQL_ATTR_ODBC_VERSION, (void*) SQL_OV_ODBC3, 0);
RETCODE = SQLAllocHandle(SQL_HANDLE_DBC, hEnv, &hDbc);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLConnect(hDbc, // Connect handle
(SQLCHAR *)DSN, //DSN
SQL_NTS, // DSN is nul-terminated
NULL, // Null UID
0 ,
NULL, // Null Auth string
0);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLAllocHandle(SQL_HANDLE_STMT, hDbc, &hStmt);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLExecDirect(hStmt,"select node_port, node_host from pgxc_node where node_type = 'C' and nodeis_active order by node_name",SQL_NTS);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLBindCol(hStmt, 1, SQL_C_CHAR, node_port, SMALL_REGISTRY_LEN, &lenPort);
RETCODE = SQLBindCol(hStmt, 2, SQL_C_CHAR, node_host, lenNameType + 1, &lenHost);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
int count = 0;
char IP_list_temp[MAX_CN][MEDIUM_REGISTRY_LEN];
char port_list_temp[MAX_CN][SMALL_REGISTRY_LEN];
char port_temp[SMALL_REGISTRY_LEN];
while ((rc = SQLFetch(hStmt)) == SQL_SUCCESS) {
refresh_flag = 1;
STRCPY_FIXED(IP_list_temp[count], node_host);
STRCPY_FIXED(port_list_temp[count++], node_port);
pgxc_entry.ip_count = count;
}
refresh_count++;
if (refresh_count > 10 && rc == SQL_ERROR) {
refresh_flag = 1;
MYLOG(0, "Refresh failed for ten times, change signal and unlock other threads.\n");
}
int i;
if (count != 0 && pthread_rwlock_wrlock(&pgxc_entry.ip_list_lock) == 0) {
for (i = 0; i < pgxc_entry.ip_count; i++) {
STRCPY_FIXED(pgxc_entry.ip_list[i], IP_list_temp[i]);
STRCPY_FIXED(pgxc_entry.port_list[i], port_list_temp[i]);
MYLOG(0, "ip = %s, port = %s\n", pgxc_entry.ip_list[i], pgxc_entry.port_list[i]);
}
MYLOG(0, "CN list has been refreshed.\n");
if (pthread_rwlock_unlock(&pgxc_entry.ip_list_lock) != 0) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
MYLOG(0, "Unlock failed. Exit process.\n");
exit(1);
}
}
SQLDisconnect(hDbc);
sleep(time);
}
}
static BOOL check_IP_connection(ConnectionClass *conn, CnEntry *entry)
{
int i;
int pqret;
BOOL ret = FALSE;
ConnInfo *ci = &conn->connInfo;
if (conn == NULL) {
return FALSE;
}
MYLOG(0, "Start checking the connection for each pair of IP and PORT.\n");
if (entry == &pgxc_entry) {
pthread_rwlock_rdlock(&entry->ip_list_lock);
}
for (i = 0; i < entry->ip_count; i++) {
STRCPY_FIXED(ci->server, entry->ip_list[i]);
STRCPY_FIXED(ci->port, entry->port_list[i]);
if ((pqret = LIBPQ_connect(conn)) <= 0) {
/* connection failed, kick out the wrong IP from IP_list and write the wrong IP into log */
MYLOG(0, "Cannot establish connection via IP: %s\n", entry->ip_list[i]);
entry->ip_status[i] = WRONG;
} else {
/* connection successful, current IP remains in IP_list but disconnect */
entry->ip_status[i] = CORRECT;
PQfinish(conn->pqconn);
ret = TRUE;
}
}
MYLOG(0, "Check finished.\n");
if (entry == &pgxc_entry) {
pthread_rwlock_unlock(&entry->ip_list_lock);
}
return ret;
}
static void start_new_thread(dsn_time *read_cn)
{
#ifdef WIN32
CreateThread(NULL, 0, read_pgxc_node, (LPVOID)(read_cn), 0, NULL);
#else
pthread_t ntid;
pthread_create(&ntid, NULL, read_pgxc_node, read_cn);
#endif
}
static RETCODE init_conn(ConnectionClass *conn)
{
RETCODE ret = SQL_SUCCESS;
ConnInfo *ci = &conn->connInfo;
if (conn == NULL) {
return SQL_ERROR;
}
/* initialize */
memset(&pgxc_entry, 0, sizeof(CnEntry));
memset(&orig_entry, 0, sizeof(CnEntry));
pgxc_entry.is_usable = TRUE;
orig_entry.is_usable = TRUE;
int i;
for (i = 0; i < MAX_CN; i++) {
pthread_rwlock_init(&pgxc_entry.step_lock[i], NULL);
pthread_rwlock_init(&orig_entry.step_lock[i], NULL);
}
pthread_rwlock_init(&pgxc_entry.ip_list_lock, NULL);
pthread_rwlock_init(&orig_entry.ip_list_lock, NULL);
/* make a copy of ci->server and ci->port to prevent changing the original conn when parsing it */
char server[LARGE_REGISTRY_LEN];
STRCPY_FIXED(server, ci->server);
char port[LARGE_REGISTRY_LEN];
STRCPY_FIXED(port, ci->port);
/* parsing ci->server to seperate IPs and store them into IP_list */
char *p = strtok(server, ",");
while (p != NULL) {
STRCPY_FIXED(orig_entry.ip_list[orig_entry.ip_count++], p);
STRCPY_FIXED(pgxc_entry.ip_list[pgxc_entry.ip_count++], p);
p = strtok(NULL, ",");
}
/* parsing ci->port to seperate PORTs and store them into port_list */
p = strtok(port, ",");
while (p != NULL) {
STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], p);
STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], p);
p = strtok(NULL, ",");
}
/* if only one port was configured, then each CN has the same port by default */
if (orig_entry.port_count == 1) {
for (i = 1; i < orig_entry.ip_count; i++) {
STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], orig_entry.port_list[0]);
STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], pgxc_entry.port_list[0]);
}
}
/* if serverl ports were configured, the number of ports has to be equal to the number of IPs */
if (orig_entry.ip_count != orig_entry.port_count) {
MYLOG(0, "The number of IP %d does not match the number of Port %d.\n", orig_entry.ip_count, orig_entry.port_count);
return SQL_ERROR;
}
/* check the connection of each pair of IP and port and update the list and status */
if (!check_IP_connection(conn, &orig_entry)) {
return SQL_ERROR;
}
memcpy(pgxc_entry.ip_status, orig_entry.ip_status, sizeof(orig_entry.ip_status));
/* start new thread to connect to datbase and select node_port from pgxc_node */
dsn_time read_cn;
read_cn.DSN = ci->dsn;
if (ci->refreshcnlisttime == 0) {
read_cn.timeinterval = 10;
} else {
read_cn.timeinterval = ci->refreshcnlisttime;
}
start_new_thread(&read_cn);
return ret;
}
int get_location(BOOL *visited, CnEntry *entry, int *visited_count)
{
if (visited == NULL || entry == NULL || visited_count == NULL) {
return -1;
}
/* select random IP from ip_list */
srand(pthread_self());
unsigned int ind = rand() % entry->ip_count;
/* record the offset of each IP for roundRobin */
int *offset = &entry->step[ind];
pthread_rwlock_t *offset_lock = &entry->step_lock[ind];
/*
* if the selected IP can be connected and has not been visited, connect to this IP
* else enter the while loop to choose another IP for connect
* use visited_count to record the number of IPs that have been visited
* visited_count equals to ip_count means reaching the limit
*/
while ((entry->ip_status[ind] == WRONG || visited[ind] == TRUE) && (*visited_count) != entry->ip_count) {
if (visited[ind] == FALSE) {
visited[ind] = TRUE;
(*visited_count)++;
}
pthread_rwlock_wrlock(offset_lock);
while (visited[ind] == TRUE && (*visited_count) != entry->ip_count) {
ind = ind + *offset;
(*offset)++;
ind = ind % entry->ip_count;
}
pthread_rwlock_unlock(offset_lock);
}
/*
* the selected IP after the while loop should not have been visited
* if it has been visited, return error
*/
if (visited[ind] == TRUE) {
return -1;
}
visited[ind] = TRUE;
(*visited_count)++;
return ind;
}
static RETCODE connect_random_IP(ConnectionClass *conn, CnEntry *entry)
{
RETCODE ret = SQL_ERROR;
ConnInfo *ci = &conn->connInfo;
CSTR func = "PGAPI_Connect";
char fchar;
BOOL visited[MAX_CN] = {FALSE};
int visited_count = 0;
BOOL check_ret = check_IP_connection(conn, entry);
/* only connection successful and all connection failed will break the while loop */
while (ret == SQL_ERROR) {
if (entry == &pgxc_entry && pthread_rwlock_rdlock(&entry->ip_list_lock) != 0) {
return SQL_ERROR;
}
int ind = get_location(visited, entry, &visited_count);
if (ind == -1) {
pthread_rwlock_unlock(&entry->ip_list_lock);
return SQL_ERROR;
}
STRCPY_FIXED(ci->server, entry->ip_list[ind]);
STRCPY_FIXED(ci->port, entry->port_list[ind]);
while (entry == &pgxc_entry && pthread_rwlock_unlock(&entry->ip_list_lock) != 0);
if ((fchar = CC_connect(conn, NULL)) <= 0) {
/* Error messages are filled in */
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
} else {
ret = SQL_SUCCESS;
}
}
if (ret == SQL_SUCCESS && fchar == 2) {
ret = SQL_SUCCESS_WITH_INFO;
}
MYLOG(0, "leaving..%d.\n", ret);
/* Empty the password stored in memory to avoid password leak */
if (NAME_IS_VALID(ci->password))
memset(ci->password.name, 0, strlen(ci->password.name));
return ret;
}
static RETCODE connect_IP(ConnectionClass *conn)
{
if (conn->connInfo.priority == 1 && orig_entry.is_usable && connect_random_IP(conn, &orig_entry) != SQL_ERROR) {
return SQL_SUCCESS;
}
return connect_random_IP(conn, &pgxc_entry);
}
static RETCODE check_and_init(ConnectionClass *conn)
{
if (conn_precheck) {
return SQL_SUCCESS;
}
if (pthread_rwlock_rdlock(&init_lock)) {
return SQL_ERROR;
}
if (conn_inited) {
pthread_rwlock_unlock(&init_lock);
return SQL_SUCCESS;
}
pthread_rwlock_unlock(&init_lock);
if (pthread_rwlock_wrlock(&init_lock)) {
return SQL_ERROR;
}
if (conn_inited) {
pthread_rwlock_unlock(&init_lock);
return SQL_SUCCESS;
}
if (init_conn(conn) != SQL_SUCCESS) {
pthread_rwlock_unlock(&init_lock);
return SQL_ERROR;
} else {
conn_inited = TRUE;
}
pthread_rwlock_unlock(&init_lock);
conn_precheck = TRUE;
return SQL_SUCCESS;
}
static SQLRETURN CC_lookup_lo(ConnectionClass *self);
static int CC_close_eof_cursors(ConnectionClass *self);
static void LIBPQ_update_transaction_status(ConnectionClass *self);
static void CC_set_error_if_not_set(ConnectionClass *self, int errornumber, const char *errormsg, const char *func)
{
int errornum = CC_get_errornumber(self);
@ -141,8 +557,6 @@ PGAPI_Connect(HDBC hdbc,
RETCODE ret = SQL_SUCCESS;
char fchar, *tmpstr;
MYLOG(0, "entering..cbDSN=%hi.\n", cbDSN);
if (!conn)
{
CC_log_error(func, "", NULL);
@ -161,6 +575,8 @@ PGAPI_Connect(HDBC hdbc,
/* initialize pg_version from connInfo.protocol */
CC_initialize_pg_version(conn);
MYLOG(0, "entering..cbDSN=%hi.\n", cbDSN);
/*
* override values from DSN info with UID and authStr(pwd) This only
* occurs if the values are actually there.
@ -179,20 +595,40 @@ PGAPI_Connect(HDBC hdbc,
MYLOG(0, "conn = %p (DSN='%s', UID='%s', PWD='%s')\n", conn, ci->dsn, ci->username, NAME_IS_VALID(ci->password) ? "xxxxx" : "");
if ((fchar = CC_connect(conn, NULL)) <= 0)
{
/* Error messages are filled in */
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
if (ci->autobalance == 1) {
if (check_and_init(conn) != SQL_SUCCESS) {
return SQL_ERROR;
}
#ifdef WIN32
if (GetCurrentThreadId() != pgxc_node_thread_id)
#else
if (pthread_self() != pgxc_node_thread_id)
#endif
{
while (refresh_flag != 1) {
#ifdef WIN32
sleep(10);
#else
usleep(10000);
#endif
}
}
ret = connect_IP(conn);
}
else {
if ((fchar = CC_connect(conn, NULL)) <= 0) {
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
}
if (SQL_SUCCESS == ret && 2 == fchar) {
ret = SQL_SUCCESS_WITH_INFO;
}
MYLOG(0, "leaving..%d.\n", ret);
if (NAME_IS_VALID(ci->password)) {
memset(ci->password.name, 0, strlen(ci->password.name));
}
}
if (SQL_SUCCESS == ret && 2 == fchar)
ret = SQL_SUCCESS_WITH_INFO;
MYLOG(0, "leaving..%d.\n", ret);
/* Empty the password stored in memory to avoid password leak */
if (NAME_IS_VALID(ci->password))
memset(ci->password.name, 0, strlen(ci->password.name));
return ret;
}
@ -1055,7 +1491,6 @@ static int handle_show_results(const QResultClass *res);
#define TRANSACTION_ISOLATION "transaction_isolation"
#define ISOLATION_SHOW_QUERY "show " TRANSACTION_ISOLATION
static int LIBPQ_connect(ConnectionClass *self);
static char
LIBPQ_CC_connect(ConnectionClass *self, char *salt_para)
{
@ -3003,6 +3438,10 @@ LIBPQ_connect(ConnectionClass *self)
}
opts[cnt] = "connection_info"; vals[cnt++] = local_conninfo;
}
opts[cnt] = "target_session_attrs";
vals[cnt++] = "primary";
if (conninfoOption != NULL)
{
const char *keyword, *val;
@ -3032,17 +3471,25 @@ LIBPQ_connect(ConnectionClass *self)
}
}
}
opts[cnt] = vals[cnt] = NULL;
/* Ok, we're all set to connect */
if (get_qlog() > 0 || get_mylog() > 0)
{
const char **popt, **pval;
const char* pwdKey = "password";
QLOG(0, "PQconnectdbParams:");
for (popt = opts, pval = vals; *popt; popt++, pval++)
QPRINTF(0, " %s='%s'", *popt, *pval);
QPRINTF(0, "\n");
for (popt = opts, pval = vals; *popt; popt++, pval++) {
if (strcmp(pwdKey, *popt) == 0) {
QPRINTF(0, " %s='xxxxx'", *popt);
} else {
QPRINTF(0, " %s='%s'", *popt, *pval);
}
}
QPRINTF(0, "\n");
}
pqconn = PQconnectdbParams(opts, vals, FALSE);
if (!pqconn)
@ -3084,7 +3531,7 @@ MYLOG(DETAIL_LOG_LEVEL, "status=%d\n", pqret);
memset(pwd, 0, strlen(pwd));
}
MYLOG(0, "libpq connection to the database established.\n");
MYLOG(0, "libpq connection to the database established.(IP: %s)\n", PQhost(pqconn));
pversion = PQprotocolVersion(pqconn);
if (pversion < 3)
{