/*------ * Module: connection.c * * Description: This module contains routines related to * connecting to and disconnecting from the Postgres DBMS. * * Classes: ConnectionClass (Functions prefix: "CC_") * * API functions: SQLAllocConnect, SQLConnect, SQLDisconnect, SQLFreeConnect, * SQLBrowseConnect(NI) * * Comments: See "readme.txt" for copyright and license information. *------- */ /* Multibyte support Eiji Tokuya 2001-03-15 */ /* TryEnterCritiaclSection needs the following #define */ #ifndef _WIN32_WINNT #define _WIN32_WINNT 0x0400 #endif /* _WIN32_WINNT */ #include #include #include #include #include #include #include /* for htonl */ #ifdef WIN32 #include #include #else #include #endif #ifndef WIN32 #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #ifndef __USE_GNU #define __USE_GNU #endif #include #include #endif #include "connection.h" #include "misc.h" #include "environ.h" #include "statement.h" #include "qresult.h" #include "lobj.h" #include "dlg_specific.h" #include "loadlib.h" #include "multibyte.h" #include "pgapifunc.h" #define SAFE_STR(s) (NULL != (s) ? (s) : "(null)") /* how many statement holders to allocate * at a time */ #define STMT_INCREMENT 16 #define MAX_CN 128 /* the maximum number of CN is 128 */ #define EMPTY 0 #define CORRECT 1 #define WRONG 2 #define lenNameType 63 BOOL conn_inited = FALSE; BOOL conn_precheck = FALSE; pthread_rwlock_t init_lock = PTHREAD_RWLOCK_INITIALIZER; int refresh_flag = 0; unsigned long int pgxc_node_thread_id; typedef struct CnEntry { char ip_list[MAX_CN][MEDIUM_REGISTRY_LEN]; /* a char array to store IPs */ int ip_status[MAX_CN]; /* an integer array that indicates the status of IPs */ char port_list[MAX_CN][SMALL_REGISTRY_LEN]; /* a char array that stores port */ int port_status[MAX_CN]; /* an integer array that indicates the status of IPs */ int ip_count; /* define integer to record the number of IPs stored in the IP_list */ int port_count; /* * define integer to record the number of IPs stored in the IP_list * this should be equal to IP_count */ int step[MAX_CN]; /* record the offset for roundRobin when autobalance is on */ BOOL is_usable; pthread_rwlock_t ip_list_lock; /* define a lock to isolate read and write to ip_list */ pthread_rwlock_t step_lock[MAX_CN]; /* define a lock to isolate read and write to step */ } CnEntry; typedef struct dsn_time { char *DSN; int timeinterval; } dsn_time; CnEntry orig_entry; CnEntry pgxc_entry; static int LIBPQ_connect(ConnectionClass *self); #ifdef WIN32 DWORD WINAPI read_pgxc_node(LPVOID arg) #else static void *read_pgxc_node(void *arg) #endif { dsn_time read_cn; read_cn = *(dsn_time *) arg; char *DSN = malloc(strlen(read_cn.DSN) + 1); if (DSN == NULL) { exit(1); } strncpy_null(DSN, read_cn.DSN, strlen(read_cn.DSN) + 1); read_cn.DSN = DSN; int time = read_cn.timeinterval; #ifdef WIN32 pgxc_node_thread_id = GetCurrentThreadId(); #else pgxc_node_thread_id = pthread_self(); #endif int refresh_count = 0; /* read CNs' IP from pgxc_node */ for (;;) { MYLOG(0, "REFRESH starts\n"); SQLHENV hEnv = SQL_NULL_HENV; SQLHDBC hDbc = SQL_NULL_HDBC; SQLHSTMT hStmt = SQL_NULL_HSTMT; SQLRETURN rc = SQL_SUCCESS; SQLINTEGER RETCODE = 0; char node_port[SMALL_REGISTRY_LEN]; char node_host[lenNameType]; SQLLEN lenPort=0, lenHost=0; RETCODE = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hEnv); if (RETCODE != SQL_SUCCESS) { continue; } SQLSetEnvAttr(hEnv, SQL_ATTR_ODBC_VERSION, (void*) SQL_OV_ODBC3, 0); RETCODE = SQLAllocHandle(SQL_HANDLE_DBC, hEnv, &hDbc); if (RETCODE != SQL_SUCCESS) { SQLFreeHandle(SQL_HANDLE_ENV, hEnv); continue; } RETCODE = SQLConnect(hDbc, // Connect handle (SQLCHAR *)DSN, //DSN SQL_NTS, // DSN is nul-terminated NULL, // Null UID 0 , NULL, // Null Auth string 0); if (RETCODE != SQL_SUCCESS) { SQLFreeHandle(SQL_HANDLE_DBC, hDbc); SQLFreeHandle(SQL_HANDLE_ENV, hEnv); continue; } RETCODE = SQLAllocHandle(SQL_HANDLE_STMT, hDbc, &hStmt); if (RETCODE != SQL_SUCCESS) { SQLFreeHandle(SQL_HANDLE_DBC, hDbc); SQLFreeHandle(SQL_HANDLE_ENV, hEnv); continue; } RETCODE = SQLExecDirect(hStmt,"select node_port, node_host from pgxc_node where node_type = 'C' and nodeis_active order by node_name",SQL_NTS); if (RETCODE != SQL_SUCCESS) { SQLFreeHandle(SQL_HANDLE_STMT, hStmt); SQLDisconnect(hDbc); SQLFreeHandle(SQL_HANDLE_DBC, hDbc); SQLFreeHandle(SQL_HANDLE_ENV, hEnv); continue; } RETCODE = SQLBindCol(hStmt, 1, SQL_C_CHAR, node_port, SMALL_REGISTRY_LEN, &lenPort); RETCODE = SQLBindCol(hStmt, 2, SQL_C_CHAR, node_host, lenNameType + 1, &lenHost); if (RETCODE != SQL_SUCCESS) { SQLFreeHandle(SQL_HANDLE_STMT, hStmt); SQLDisconnect(hDbc); SQLFreeHandle(SQL_HANDLE_DBC, hDbc); SQLFreeHandle(SQL_HANDLE_ENV, hEnv); continue; } int count = 0; char IP_list_temp[MAX_CN][MEDIUM_REGISTRY_LEN]; char port_list_temp[MAX_CN][SMALL_REGISTRY_LEN]; char port_temp[SMALL_REGISTRY_LEN]; while ((rc = SQLFetch(hStmt)) == SQL_SUCCESS) { refresh_flag = 1; STRCPY_FIXED(IP_list_temp[count], node_host); STRCPY_FIXED(port_list_temp[count++], node_port); pgxc_entry.ip_count = count; } refresh_count++; if (refresh_count > 10 && rc == SQL_ERROR) { refresh_flag = 1; MYLOG(0, "Refresh failed for ten times, change signal and unlock other threads.\n"); } int i; if (count != 0 && pthread_rwlock_wrlock(&pgxc_entry.ip_list_lock) == 0) { for (i = 0; i < pgxc_entry.ip_count; i++) { STRCPY_FIXED(pgxc_entry.ip_list[i], IP_list_temp[i]); STRCPY_FIXED(pgxc_entry.port_list[i], port_list_temp[i]); MYLOG(0, "ip = %s, port = %s\n", pgxc_entry.ip_list[i], pgxc_entry.port_list[i]); } MYLOG(0, "CN list has been refreshed.\n"); if (pthread_rwlock_unlock(&pgxc_entry.ip_list_lock) != 0) { SQLFreeHandle(SQL_HANDLE_STMT, hStmt); SQLDisconnect(hDbc); SQLFreeHandle(SQL_HANDLE_DBC, hDbc); SQLFreeHandle(SQL_HANDLE_ENV, hEnv); MYLOG(0, "Unlock failed. Exit process.\n"); exit(1); } } SQLDisconnect(hDbc); sleep(time); } } static BOOL check_IP_connection(ConnectionClass *conn, CnEntry *entry) { int i; int pqret; BOOL ret = FALSE; ConnInfo *ci = &conn->connInfo; if (conn == NULL) { return FALSE; } MYLOG(0, "Start checking the connection for each pair of IP and PORT.\n"); if (entry == &pgxc_entry) { pthread_rwlock_rdlock(&entry->ip_list_lock); } for (i = 0; i < entry->ip_count; i++) { STRCPY_FIXED(ci->server, entry->ip_list[i]); STRCPY_FIXED(ci->port, entry->port_list[i]); if ((pqret = LIBPQ_connect(conn)) <= 0) { /* connection failed, kick out the wrong IP from IP_list and write the wrong IP into log */ MYLOG(0, "Cannot establish connection via IP: %s\n", entry->ip_list[i]); entry->ip_status[i] = WRONG; } else { /* connection successful, current IP remains in IP_list but disconnect */ entry->ip_status[i] = CORRECT; PQfinish(conn->pqconn); ret = TRUE; } } MYLOG(0, "Check finished.\n"); if (entry == &pgxc_entry) { pthread_rwlock_unlock(&entry->ip_list_lock); } return ret; } static void start_new_thread(dsn_time *read_cn) { #ifdef WIN32 CreateThread(NULL, 0, read_pgxc_node, (LPVOID)(read_cn), 0, NULL); #else pthread_t ntid; pthread_create(&ntid, NULL, read_pgxc_node, read_cn); #endif } static RETCODE init_conn(ConnectionClass *conn) { RETCODE ret = SQL_SUCCESS; ConnInfo *ci = &conn->connInfo; if (conn == NULL) { return SQL_ERROR; } /* initialize */ memset(&pgxc_entry, 0, sizeof(CnEntry)); memset(&orig_entry, 0, sizeof(CnEntry)); pgxc_entry.is_usable = TRUE; orig_entry.is_usable = TRUE; int i; for (i = 0; i < MAX_CN; i++) { pthread_rwlock_init(&pgxc_entry.step_lock[i], NULL); pthread_rwlock_init(&orig_entry.step_lock[i], NULL); } pthread_rwlock_init(&pgxc_entry.ip_list_lock, NULL); pthread_rwlock_init(&orig_entry.ip_list_lock, NULL); /* make a copy of ci->server and ci->port to prevent changing the original conn when parsing it */ char server[LARGE_REGISTRY_LEN]; STRCPY_FIXED(server, ci->server); char port[LARGE_REGISTRY_LEN]; STRCPY_FIXED(port, ci->port); /* parsing ci->server to seperate IPs and store them into IP_list */ char *p = strtok(server, ","); while (p != NULL) { STRCPY_FIXED(orig_entry.ip_list[orig_entry.ip_count++], p); STRCPY_FIXED(pgxc_entry.ip_list[pgxc_entry.ip_count++], p); p = strtok(NULL, ","); } /* parsing ci->port to seperate PORTs and store them into port_list */ p = strtok(port, ","); while (p != NULL) { STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], p); STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], p); p = strtok(NULL, ","); } /* if only one port was configured, then each CN has the same port by default */ if (orig_entry.port_count == 1) { for (i = 1; i < orig_entry.ip_count; i++) { STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], orig_entry.port_list[0]); STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], pgxc_entry.port_list[0]); } } /* if serverl ports were configured, the number of ports has to be equal to the number of IPs */ if (orig_entry.ip_count != orig_entry.port_count) { MYLOG(0, "The number of IP %d does not match the number of Port %d.\n", orig_entry.ip_count, orig_entry.port_count); return SQL_ERROR; } /* check the connection of each pair of IP and port and update the list and status */ if (!check_IP_connection(conn, &orig_entry)) { return SQL_ERROR; } memcpy(pgxc_entry.ip_status, orig_entry.ip_status, sizeof(orig_entry.ip_status)); /* start new thread to connect to datbase and select node_port from pgxc_node */ dsn_time read_cn; read_cn.DSN = ci->dsn; if (ci->refreshcnlisttime == 0) { read_cn.timeinterval = 10; } else { read_cn.timeinterval = ci->refreshcnlisttime; } start_new_thread(&read_cn); return ret; } int get_location(BOOL *visited, CnEntry *entry, int *visited_count) { if (visited == NULL || entry == NULL || visited_count == NULL) { return -1; } /* select random IP from ip_list */ srand(pthread_self()); unsigned int ind = rand() % entry->ip_count; /* record the offset of each IP for roundRobin */ int *offset = &entry->step[ind]; pthread_rwlock_t *offset_lock = &entry->step_lock[ind]; /* * if the selected IP can be connected and has not been visited, connect to this IP * else enter the while loop to choose another IP for connect * use visited_count to record the number of IPs that have been visited * visited_count equals to ip_count means reaching the limit */ while ((entry->ip_status[ind] == WRONG || visited[ind] == TRUE) && (*visited_count) != entry->ip_count) { if (visited[ind] == FALSE) { visited[ind] = TRUE; (*visited_count)++; } pthread_rwlock_wrlock(offset_lock); while (visited[ind] == TRUE && (*visited_count) != entry->ip_count) { ind = ind + *offset; (*offset)++; ind = ind % entry->ip_count; } pthread_rwlock_unlock(offset_lock); } /* * the selected IP after the while loop should not have been visited * if it has been visited, return error */ if (visited[ind] == TRUE) { return -1; } visited[ind] = TRUE; (*visited_count)++; return ind; } static RETCODE connect_random_IP(ConnectionClass *conn, CnEntry *entry) { RETCODE ret = SQL_ERROR; ConnInfo *ci = &conn->connInfo; CSTR func = "PGAPI_Connect"; char fchar; BOOL visited[MAX_CN] = {FALSE}; int visited_count = 0; BOOL check_ret = check_IP_connection(conn, entry); /* only connection successful and all connection failed will break the while loop */ while (ret == SQL_ERROR) { if (entry == &pgxc_entry && pthread_rwlock_rdlock(&entry->ip_list_lock) != 0) { return SQL_ERROR; } int ind = get_location(visited, entry, &visited_count); if (ind == -1) { pthread_rwlock_unlock(&entry->ip_list_lock); return SQL_ERROR; } STRCPY_FIXED(ci->server, entry->ip_list[ind]); STRCPY_FIXED(ci->port, entry->port_list[ind]); while (entry == &pgxc_entry && pthread_rwlock_unlock(&entry->ip_list_lock) != 0); if ((fchar = CC_connect(conn, NULL)) <= 0) { /* Error messages are filled in */ CC_log_error(func, "Error on CC_connect", conn); ret = SQL_ERROR; } else { ret = SQL_SUCCESS; } } if (ret == SQL_SUCCESS && fchar == 2) { ret = SQL_SUCCESS_WITH_INFO; } MYLOG(0, "leaving..%d.\n", ret); /* Empty the password stored in memory to avoid password leak */ if (NAME_IS_VALID(ci->password)) memset(ci->password.name, 0, strlen(ci->password.name)); return ret; } static RETCODE connect_IP(ConnectionClass *conn) { if (conn->connInfo.priority == 1 && orig_entry.is_usable && connect_random_IP(conn, &orig_entry) != SQL_ERROR) { return SQL_SUCCESS; } return connect_random_IP(conn, &pgxc_entry); } static RETCODE check_and_init(ConnectionClass *conn) { if (conn_precheck) { return SQL_SUCCESS; } if (pthread_rwlock_rdlock(&init_lock)) { return SQL_ERROR; } if (conn_inited) { pthread_rwlock_unlock(&init_lock); return SQL_SUCCESS; } pthread_rwlock_unlock(&init_lock); if (pthread_rwlock_wrlock(&init_lock)) { return SQL_ERROR; } if (conn_inited) { pthread_rwlock_unlock(&init_lock); return SQL_SUCCESS; } if (init_conn(conn) != SQL_SUCCESS) { pthread_rwlock_unlock(&init_lock); return SQL_ERROR; } else { conn_inited = TRUE; } pthread_rwlock_unlock(&init_lock); conn_precheck = TRUE; return SQL_SUCCESS; } static SQLRETURN CC_lookup_lo(ConnectionClass *self); static int CC_close_eof_cursors(ConnectionClass *self); static void LIBPQ_update_transaction_status(ConnectionClass *self); static void CC_set_error_if_not_set(ConnectionClass *self, int errornumber, const char *errormsg, const char *func) { int errornum = CC_get_errornumber(self); const char *errmsg = CC_get_errormsg(self); if (errornumber == 0) return; if (errornumber > 0) { if (errornum <= 0) CC_set_error(self, errornumber, errormsg, func); else if (!errmsg) CC_set_errormsg(self, errormsg); } else if (errornum == 0) CC_set_error(self, errornumber, errormsg, func); else if (errornum < 0 && !errmsg) CC_set_errormsg(self, errormsg); } RETCODE SQL_API PGAPI_AllocConnect(HENV henv, HDBC * phdbc) { EnvironmentClass *env = (EnvironmentClass *) henv; ConnectionClass *conn; CSTR func = "PGAPI_AllocConnect"; MYLOG(0, "entering...\n"); conn = CC_Constructor(); MYLOG(0, "**** henv = %p, conn = %p\n", henv, conn); if (!conn) { env->errormsg = "Couldn't allocate memory for Connection object."; env->errornumber = ENV_ALLOC_ERROR; *phdbc = SQL_NULL_HDBC; EN_log_error(func, "", env); return SQL_ERROR; } if (!EN_add_connection(env, conn)) { env->errormsg = "Maximum number of connections exceeded."; env->errornumber = ENV_ALLOC_ERROR; CC_Destructor(conn); *phdbc = SQL_NULL_HDBC; EN_log_error(func, "", env); return SQL_ERROR; } if (phdbc) *phdbc = (HDBC) conn; return SQL_SUCCESS; } RETCODE SQL_API PGAPI_Connect(HDBC hdbc, const SQLCHAR * szDSN, SQLSMALLINT cbDSN, const SQLCHAR * szUID, SQLSMALLINT cbUID, const SQLCHAR * szAuthStr, SQLSMALLINT cbAuthStr) { ConnectionClass *conn = (ConnectionClass *) hdbc; ConnInfo *ci; CSTR func = "PGAPI_Connect"; RETCODE ret = SQL_SUCCESS; char fchar, *tmpstr; if (!conn) { CC_log_error(func, "", NULL); return SQL_INVALID_HANDLE; } ci = &conn->connInfo; CC_conninfo_init(ci, INIT_GLOBALS); make_string(szDSN, cbDSN, ci->dsn, sizeof(ci->dsn)); /* get the values for the DSN from the registry */ getDSNinfo(ci, NULL); logs_on_off(1, ci->drivers.debug, ci->drivers.commlog); /* initialize pg_version from connInfo.protocol */ CC_initialize_pg_version(conn); MYLOG(0, "entering..cbDSN=%hi.\n", cbDSN); /* * override values from DSN info with UID and authStr(pwd) This only * occurs if the values are actually there. */ fchar = ci->username[0]; /* save the first byte */ make_string(szUID, cbUID, ci->username, sizeof(ci->username)); if ('\0' == ci->username[0]) /* an empty string is specified */ ci->username[0] = fchar; /* restore the original username */ tmpstr = make_string(szAuthStr, cbAuthStr, NULL, 0); if (tmpstr) { if (tmpstr[0]) /* non-empty string is specified */ STR_TO_NAME(ci->password, tmpstr); free(tmpstr); } MYLOG(0, "conn = %p (DSN='%s', UID='%s', PWD='%s')\n", conn, ci->dsn, ci->username, NAME_IS_VALID(ci->password) ? "xxxxx" : ""); if (ci->autobalance == 1) { if (check_and_init(conn) != SQL_SUCCESS) { return SQL_ERROR; } #ifdef WIN32 if (GetCurrentThreadId() != pgxc_node_thread_id) #else if (pthread_self() != pgxc_node_thread_id) #endif { while (refresh_flag != 1) { #ifdef WIN32 sleep(10); #else usleep(10000); #endif } } ret = connect_IP(conn); } else { if ((fchar = CC_connect(conn, NULL)) <= 0) { CC_log_error(func, "Error on CC_connect", conn); ret = SQL_ERROR; } if (SQL_SUCCESS == ret && 2 == fchar) { ret = SQL_SUCCESS_WITH_INFO; } MYLOG(0, "leaving..%d.\n", ret); if (NAME_IS_VALID(ci->password)) { memset(ci->password.name, 0, strlen(ci->password.name)); } } return ret; } RETCODE SQL_API PGAPI_BrowseConnect(HDBC hdbc, const SQLCHAR * szConnStrIn, SQLSMALLINT cbConnStrIn, SQLCHAR * szConnStrOut, SQLSMALLINT cbConnStrOutMax, SQLSMALLINT * pcbConnStrOut) { CSTR func = "PGAPI_BrowseConnect"; ConnectionClass *conn = (ConnectionClass *) hdbc; MYLOG(0, "entering...\n"); CC_set_error(conn, CONN_NOT_IMPLEMENTED_ERROR, "Function not implemented", func); return SQL_ERROR; } /* Drop any hstmts open on hdbc and disconnect from database */ RETCODE SQL_API PGAPI_Disconnect(HDBC hdbc) { ConnectionClass *conn = (ConnectionClass *) hdbc; CSTR func = "PGAPI_Disconnect"; MYLOG(0, "entering...\n"); if (!conn) { CC_log_error(func, "", NULL); return SQL_INVALID_HANDLE; } if (conn->status == CONN_EXECUTING) { CC_set_error(conn, CONN_IN_USE, "A transaction is currently being executed", func); return SQL_ERROR; } logs_on_off(-1, conn->connInfo.drivers.debug, conn->connInfo.drivers.commlog); MYLOG(0, "about to CC_cleanup\n"); /* Close the connection and free statements */ CC_cleanup(conn, FALSE); MYLOG(0, "done CC_cleanup\n"); MYLOG(0, "leaving...\n"); return SQL_SUCCESS; } RETCODE SQL_API PGAPI_FreeConnect(HDBC hdbc) { ConnectionClass *conn = (ConnectionClass *) hdbc; CSTR func = "PGAPI_FreeConnect"; EnvironmentClass *env; MYLOG(0, "entering...hdbc=%p\n", hdbc); if (!conn) { CC_log_error(func, "", NULL); return SQL_INVALID_HANDLE; } /* Remove the connection from the environment */ if (NULL != (env = CC_get_env(conn)) && !EN_remove_connection(env, conn)) { CC_set_error(conn, CONN_IN_USE, "A transaction is currently being executed", func); return SQL_ERROR; } CC_Destructor(conn); MYLOG(0, "leaving...\n"); return SQL_SUCCESS; } /* * IMPLEMENTATION CONNECTION CLASS */ static void reset_current_schema(ConnectionClass *self) { if (self->current_schema) { free(self->current_schema); self->current_schema = NULL; } self->current_schema_valid = FALSE; } static ConnectionClass * CC_alloc(void) { return (ConnectionClass *) calloc(sizeof(ConnectionClass), 1); } static void CC_lockinit(ConnectionClass *self) { INIT_CONNLOCK(self); INIT_CONN_CS(self); } static ConnectionClass * CC_initialize(ConnectionClass *rv, BOOL lockinit) { size_t clear_size; #if defined(WIN_MULTITHREAD_SUPPORT) || defined(POSIX_THREADMUTEX_SUPPORT) clear_size = (char *)&(rv->cs) - (char *)rv; #else clear_size = sizeof(ConnectionClass); #endif /* WIN_MULTITHREAD_SUPPORT */ memset(rv, 0, clear_size); rv->status = CONN_NOT_CONNECTED; rv->transact_status = CONN_IN_AUTOCOMMIT; /* autocommit by default */ rv->unnamed_prepared_stmt = NULL; rv->stmts = (StatementClass **) malloc(sizeof(StatementClass *) * STMT_INCREMENT); if (!rv->stmts) goto cleanup; memset(rv->stmts, 0, sizeof(StatementClass *) * STMT_INCREMENT); rv->num_stmts = STMT_INCREMENT; rv->descs = (DescriptorClass **) malloc(sizeof(DescriptorClass *) * STMT_INCREMENT); if (!rv->descs) goto cleanup; memset(rv->descs, 0, sizeof(DescriptorClass *) * STMT_INCREMENT); rv->num_descs = STMT_INCREMENT; rv->lobj_type = PG_TYPE_LO_UNDEFINED; if (isMsAccess()) rv->ms_jet = 1; rv->isolation = 0; // means initially unknown server's default isolation rv->mb_maxbyte_per_char = 1; rv->max_identifier_length = -1; rv->autocommit_public = SQL_AUTOCOMMIT_ON; /* Initialize statement options to defaults */ /* Statements under this conn will inherit these options */ InitializeStatementOptions(&rv->stmtOptions); InitializeARDFields(&rv->ardOptions); InitializeAPDFields(&rv->apdOptions); #ifdef _HANDLE_ENLIST_IN_DTC_ rv->asdum = NULL; rv->gTranInfo = 0; #endif /* _HANDLE_ENLIST_IN_DTC_ */ if (lockinit) CC_lockinit(rv); return rv; cleanup: CC_Destructor(rv); return NULL; } ConnectionClass * CC_Constructor() { ConnectionClass *rv, *retrv = NULL; if (rv = CC_alloc(), NULL != rv) retrv = CC_initialize(rv, TRUE); return retrv; } char CC_Destructor(ConnectionClass *self) { MYLOG(0, "entering self=%p\n", self); if (self->status == CONN_EXECUTING) return 0; CC_cleanup(self, FALSE); /* cleanup socket and statements */ MYLOG(0, "after CC_Cleanup\n"); /* Free up statement holders */ if (self->stmts) { free(self->stmts); self->stmts = NULL; } if (self->descs) { free(self->descs); self->descs = NULL; } MYLOG(0, "after free statement holders\n"); NULL_THE_NAME(self->schemaIns); NULL_THE_NAME(self->tableIns); CC_conninfo_release(&self->connInfo); if (self->__error_message) free(self->__error_message); DELETE_CONN_CS(self); DELETE_CONNLOCK(self); free(self); MYLOG(0, "leaving\n"); return 1; } /* Return how many cursors are opened on this connection */ int CC_cursor_count(ConnectionClass *self) { StatementClass *stmt; int i, count = 0; QResultClass *res; MYLOG(0, "self=%p, num_stmts=%d\n", self, self->num_stmts); CONNLOCK_ACQUIRE(self); for (i = 0; i < self->num_stmts; i++) { stmt = self->stmts[i]; if (stmt && (res = SC_get_Result(stmt)) && QR_get_cursor(res)) count++; } CONNLOCK_RELEASE(self); MYLOG(0, "leaving %d\n", count); return count; } void CC_clear_error(ConnectionClass *self) { if (!self) return; CONNLOCK_ACQUIRE(self); self->__error_number = 0; if (self->__error_message) { free(self->__error_message); self->__error_message = NULL; } self->sqlstate[0] = '\0'; CONNLOCK_RELEASE(self); } void CC_examine_global_transaction(ConnectionClass *self) { if (!self) return; #ifdef _HANDLE_ENLIST_IN_DTC_ if (CC_is_in_global_trans(self)) CALL_IsolateDtcConn(self, TRUE); #endif /* _HANDLE_ENLIST_IN_DTC_ */ } CSTR bgncmd = "START TRANSACTION"; CSTR cmtcmd = "COMMIT"; CSTR rbkcmd = "ROLLBACK"; CSTR svpcmd = "SAVEPOINT"; CSTR per_query_svp = "_per_query_svp_"; CSTR rlscmd = "RELEASE"; /* * Used to begin a transaction. */ char CC_begin(ConnectionClass *self) { char ret = TRUE; if (!CC_is_in_trans(self)) { QResultClass *res = CC_send_query(self, bgncmd, NULL, 0, NULL); MYLOG(0, " sending BEGIN!\n"); ret = QR_command_maybe_successful(res); QR_Destructor(res); } return ret; } /* * Used to commit a transaction. * We are almost always in the middle of a transaction. */ char CC_commit(ConnectionClass *self) { char ret = TRUE; if (CC_is_in_trans(self)) { if (!CC_is_in_error_trans(self)) CC_close_eof_cursors(self); if (CC_is_in_trans(self)) { QResultClass *res = CC_send_query(self, cmtcmd, NULL, 0, NULL); MYLOG(0, " sending COMMIT!\n"); ret = QR_command_maybe_successful(res); QR_Destructor(res); } } return ret; } /* * Used to cancel a transaction. * We are almost always in the middle of a transaction. */ char CC_abort(ConnectionClass *self) { char ret = TRUE; if (CC_is_in_trans(self)) { QResultClass *res = CC_send_query(self, rbkcmd, NULL, 0, NULL); MYLOG(0, " sending ABORT!\n"); ret = QR_command_maybe_successful(res); QR_Destructor(res); } return ret; } /* This is called by SQLSetConnectOption etc also */ char CC_set_autocommit(ConnectionClass *self, BOOL on) { BOOL currsts = CC_is_in_autocommit(self); if ((on && currsts) || (!on && !currsts)) return on; MYLOG(0, " %d->%d\n", currsts, on); if (CC_is_in_trans(self)) CC_commit(self); if (on) self->transact_status |= CONN_IN_AUTOCOMMIT; else self->transact_status &= ~CONN_IN_AUTOCOMMIT; return on; } /* Clear cached table info */ static void CC_clear_col_info(ConnectionClass *self, BOOL destroy) { if (self->col_info) { int i; COL_INFO *coli; for (i = 0; i < self->ntables; i++) { if (coli = self->col_info[i], NULL != coli) { if (destroy || coli->refcnt == 0) { free_col_info_contents(coli); free(coli); self->col_info[i] = NULL; } else coli->acc_time = 0; } } self->ntables = 0; if (destroy) { free(self->col_info); self->col_info = NULL; self->coli_allocated = 0; } } } static void CC_set_locale_encoding(ConnectionClass *self, const char * encoding) { char *currenc = self->locale_encoding; if (encoding) self->locale_encoding = strdup(encoding); else self->locale_encoding = NULL; if (currenc) free(currenc); } static void CC_determine_locale_encoding(ConnectionClass *self) { const char *dbencoding = PQparameterStatus(self->pqconn, "client_encoding"); const char *encoding; QLOG(0, "PQparameterStatus(%p, \"client_encoding\")=%s\n", self->pqconn, SAFE_STR(dbencoding)); if (self->locale_encoding) /* already set */ return; encoding = derive_locale_encoding(dbencoding); if (!encoding) encoding = "SQL_ASCII"; CC_set_locale_encoding(self, encoding); } static void CC_set_client_encoding(ConnectionClass *self, const char * encoding) { char *currenc = self->original_client_encoding; if (encoding) { self->original_client_encoding = strdup(encoding); self->ccsc = pg_CS_code(encoding); } else { self->original_client_encoding = NULL; self->ccsc = SQL_ASCII; } self->mb_maxbyte_per_char = pg_mb_maxlen(self->ccsc); if (currenc) free(currenc); } int CC_send_client_encoding(ConnectionClass *self, const char * encoding) { const char *dbencoding = PQparameterStatus(self->pqconn, "client_encoding"); if (encoding && (!dbencoding || stricmp(encoding, dbencoding))) { char query[64]; QResultClass *res; BOOL cmd_success; SPRINTF_FIXED(query, "set client_encoding to '%s'", encoding); res = CC_send_query(self, query, NULL, 0, NULL); cmd_success = QR_command_maybe_successful(res); QR_Destructor(res); if (!cmd_success) return SQL_ERROR; } CC_set_client_encoding(self, encoding); return SQL_SUCCESS; } /* This is called by SQLDisconnect also */ char CC_cleanup(ConnectionClass *self, BOOL keepCommunication) { int i; StatementClass *stmt; DescriptorClass *desc; if (self->status == CONN_EXECUTING) return FALSE; MYLOG(0, "entering self=%p\n", self); ENTER_CONN_CS(self); /* Cancel an ongoing transaction */ /* We are always in the middle of a transaction, */ /* even if we are in auto commit. */ if (self->pqconn) { QLOG(0, "PQfinish: %p\n", self->pqconn); PQfinish(self->pqconn); self->pqconn = NULL; } MYLOG(0, "after PQfinish\n"); /* Free all the stmts on this connection */ for (i = 0; i < self->num_stmts; i++) { stmt = self->stmts[i]; if (stmt) { stmt->hdbc = NULL; /* prevent any more dbase interactions */ SC_Destructor(stmt); self->stmts[i] = NULL; } } /* Free all the descs on this connection */ for (i = 0; i < self->num_descs; i++) { desc = self->descs[i]; if (desc) { DC_get_conn(desc) = NULL; /* prevent any more dbase interactions */ DC_Destructor(desc); free(desc); self->descs[i] = NULL; } } /* Check for translation dll */ #ifdef WIN32 if (!keepCommunication && self->translation_handle) { FreeLibrary(self->translation_handle); self->translation_handle = NULL; } #endif if (!keepCommunication) { self->status = CONN_NOT_CONNECTED; self->transact_status = CONN_IN_AUTOCOMMIT; self->unnamed_prepared_stmt = NULL; } if (!keepCommunication) { CC_conninfo_init(&(self->connInfo), CLEANUP_FOR_REUSE); if (self->original_client_encoding) { free(self->original_client_encoding); self->original_client_encoding = NULL; } if (self->locale_encoding) { free(self->locale_encoding); self->locale_encoding = NULL; } if (self->server_encoding) { free(self->server_encoding); self->server_encoding = NULL; } reset_current_schema(self); } /* Free cached table info */ CC_clear_col_info(self, TRUE); if (self->num_discardp > 0 && self->discardp) { for (i = 0; i < self->num_discardp; i++) free(self->discardp[i]); self->num_discardp = 0; } if (self->discardp) { free(self->discardp); self->discardp = NULL; } LEAVE_CONN_CS(self); MYLOG(0, "leaving\n"); return TRUE; } int CC_set_translation(ConnectionClass *self) { #ifdef WIN32 CSTR func = "CC_set_translation"; if (self->translation_handle != NULL) { FreeLibrary(self->translation_handle); self->translation_handle = NULL; } if (self->connInfo.translation_dll[0] == 0) return TRUE; self->translation_option = atoi(self->connInfo.translation_option); self->translation_handle = LoadLibrary(self->connInfo.translation_dll); if (self->translation_handle == NULL) { CC_set_error(self, CONN_UNABLE_TO_LOAD_DLL, "Could not load the translation DLL.", func); return FALSE; } self->DataSourceToDriver = (DataSourceToDriverProc) GetProcAddress(self->translation_handle, "SQLDataSourceToDriver"); self->DriverToDataSource = (DriverToDataSourceProc) GetProcAddress(self->translation_handle, "SQLDriverToDataSource"); if (self->DataSourceToDriver == NULL || self->DriverToDataSource == NULL) { CC_set_error(self, CONN_UNABLE_TO_LOAD_DLL, "Could not find translation DLL functions.", func); return FALSE; } #endif return TRUE; } #ifndef PG_DIAG_SEVERITY_NONLOCALIZED #define PG_DIAG_SEVERITY_NONLOCALIZED 'V' #endif void handle_pgres_error(ConnectionClass *self, const PGresult *pgres, const char *comment, QResultClass *res, BOOL error_not_a_notice) { char *errseverity; char *errseverity_nonloc = NULL; char *errprimary = NULL; char *errmsg = NULL; size_t errmsglen; char *sqlstate = NULL; int level = MIN_LOG_LEVEL; MYLOG(DETAIL_LOG_LEVEL, "entering\n"); sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE); if (res && pgres) { if (sqlstate) STRCPY_FIXED(res->sqlstate, sqlstate); } if (NULL == pgres && NULL == self->pqconn) { const char *errmsg = "The connection has been lost"; MYLOG(0, "setting error message=%s\n", errmsg); QLOG(0, "\t%ssetting error message=%s\n", __FUNCTION__, errmsg); if (CC_get_errornumber(self) <= 0) CC_set_error(self, CONNECTION_COMMUNICATION_ERROR, errmsg, comment); if (res) { QR_set_rstatus(res, PORES_FATAL_ERROR); QR_set_message(res, errmsg); } goto cleanup; } /* * The full message with details and context and everything could * be obtained with PQresultErrorMessage(). I think that would be * more user-friendly, but for now, construct a message with * severity and primary message, which is backwards compatible. */ errseverity = PQresultErrorField(pgres, PG_DIAG_SEVERITY); if (PG_VERSION_GE(self, 9.6)) { errseverity_nonloc = PQresultErrorField(pgres, PG_DIAG_SEVERITY_NONLOCALIZED); MYLOG(0, "PG_DIAG_SEVERITY_NONLOCALIZED=%s\n", SAFE_STR(errseverity_nonloc)); } if (!error_not_a_notice) { if (errseverity_nonloc) { if (stricmp(errseverity_nonloc, "NOTICE") != 0) level = 1; } else if (errseverity) { if (stricmp(errseverity, "NOTICE") != 0) level = 1; } } errprimary = PQresultErrorField(pgres, PG_DIAG_MESSAGE_PRIMARY); if (errseverity_nonloc) QLOG(level, "\t%s(%s) %s '%s'\n", errseverity_nonloc, SAFE_STR(errseverity), SAFE_STR(sqlstate), SAFE_STR(errprimary)); else QLOG(level, "\t(%s) %s '%s'\n", SAFE_STR(errseverity), SAFE_STR(sqlstate), SAFE_STR(errprimary)); if (errprimary == NULL) { /* Hmm. got no primary message. Check if there's a connection error */ if (self->pqconn) errprimary = PQerrorMessage(self->pqconn); if (errprimary == NULL) errprimary = "no error information"; } if (errseverity && errprimary) { errmsglen = strlen(errseverity) + 2 + strlen(errprimary) + 1; errmsg = malloc(errmsglen); if (errmsg) snprintf(errmsg, errmsglen, "%s: %s", errseverity, errprimary); } if (errmsg == NULL) errmsg = errprimary; if (!error_not_a_notice) /* warning, notice, log etc */ { MYLOG(0, "notice message %s\n", errmsg); if (res) { if (QR_command_successful(res)) QR_set_rstatus(res, PORES_NONFATAL_ERROR); /* notice or warning */ QR_add_notice(res, errmsg); /* will dup this string */ } goto cleanup; } MYLOG(0, "error message=%s(" FORMAT_SIZE_T ")\n", errmsg, strlen(errmsg)); if (res) { QR_set_rstatus(res, PORES_FATAL_ERROR); /* error or fatal */ if (errmsg[0]) QR_set_message(res, errmsg); QR_set_aborted(res, TRUE); } /* * If the error is continuable after rollback? */ if (PQstatus(self->pqconn) == CONNECTION_BAD) { CC_set_errornumber(self, CONNECTION_COMMUNICATION_ERROR); CC_on_abort(self, CONN_DEAD); /* give up the connection */ } else if ((errseverity_nonloc && strcmp(errseverity_nonloc, "FATAL") == 0) || (NULL == errseverity_nonloc && errseverity && strcmp(errseverity, "FATAL") == 0)) /* no */ { CC_set_errornumber(self, CONNECTION_SERVER_REPORTED_SEVERITY_FATAL); CC_on_abort(self, CONN_DEAD); /* give up the connection */ } else /* yes */ { CC_set_errornumber(self, CONNECTION_SERVER_REPORTED_SEVERITY_ERROR); if (CC_is_in_trans(self)) CC_set_in_error_trans(self); } /* If any error/warning/notice happened, there should be a message in connection, * espacially while a connection is being created. * While a connection is being created, postgresql never returns any error. But in * our cluster, there maybe a warning or a notice when GTM is in trouble. */ CC_set_errormsg(self, errmsg); cleanup: if (errmsg != errprimary) free(errmsg); LIBPQ_update_transaction_status(self); } /* * This is a libpq notice receiver callback, for handling incoming NOTICE * messages while processing a query. */ typedef struct { ConnectionClass *conn; const char *comment; QResultClass *res; } notice_receiver_arg; void receive_libpq_notice(void *arg, const PGresult *pgres) { if (arg != NULL) { notice_receiver_arg *nrarg = (notice_receiver_arg *) arg; handle_pgres_error(nrarg->conn, pgres, nrarg->comment, nrarg->res, FALSE); } } static char CC_initial_log(ConnectionClass *self, const char *func) { const ConnInfo *ci = &self->connInfo; char *encoding, vermsg[128]; snprintf(vermsg, sizeof(vermsg), "Driver Version='%s,%s'" #ifdef WIN32 " linking %d" #ifdef _MT #ifdef _DLL " dynamic" #else " static" #endif /* _DLL */ " Multithread" #else " Singlethread" #endif /* _MT */ #ifdef _DEBUG " Debug" #endif /* DEBUG */ " library" #endif /* WIN32 */ "\n", POSTGRESDRIVERVERSION, __DATE__ #ifdef _MSC_VER , _MSC_VER #endif /* _MSC_VER */ ); QLOG(0, "%s", vermsg); MYLOG(DETAIL_LOG_LEVEL, "Global Options: fetch=%d, unknown_sizes=%d, max_varchar_size=%d, max_longvarchar_size=%d\n", ci->drivers.fetch_max, ci->drivers.unknown_sizes, ci->drivers.max_varchar_size, ci->drivers.max_longvarchar_size); MYLOG(DETAIL_LOG_LEVEL, " unique_index=%d, use_declarefetch=%d\n", ci->drivers.unique_index, ci->drivers.use_declarefetch); MYLOG(DETAIL_LOG_LEVEL, " text_as_longvarchar=%d, unknowns_as_longvarchar=%d, bools_as_char=%d NAMEDATALEN=%d\n", ci->drivers.text_as_longvarchar, ci->drivers.unknowns_as_longvarchar, ci->drivers.bools_as_char, TABLE_NAME_STORAGE_LEN); if (NULL == self->locale_encoding) { encoding = check_client_encoding(ci->conn_settings); CC_set_locale_encoding(self, encoding); MYLOG(DETAIL_LOG_LEVEL, " extra_systable_prefixes='%s', conn_settings='%s' conn_encoding='%s'\n", ci->drivers.extra_systable_prefixes, PRINT_NAME(ci->conn_settings), encoding ? encoding : ""); if (NULL != encoding) { free(encoding); } } if (self->status == CONN_DOWN) { CC_set_error_if_not_set(self, CONN_OPENDB_ERROR, "Connection broken.", func); return 0; } else if (self->status != CONN_NOT_CONNECTED) { CC_set_error_if_not_set(self, CONN_OPENDB_ERROR, "Already connected.", func); return 0; } MYLOG(0, "DSN = '%s', server = '%s', port = '%s', database = '%s'\n", ci->dsn, ci->server, ci->port, ci->database); return 1; } static int handle_show_results(const QResultClass *res); #define TRANSACTION_ISOLATION "transaction_isolation" #define ISOLATION_SHOW_QUERY "show " TRANSACTION_ISOLATION static char LIBPQ_CC_connect(ConnectionClass *self, char *salt_para) { int ret; CSTR func = "LIBPQ_CC_connect"; QResultClass *res; MYLOG(0, "entering...\n"); if (0 == CC_initial_log(self, func)) return 0; if (ret = LIBPQ_connect(self), ret <= 0) return ret; res = CC_send_query(self, "SET DateStyle = 'ISO';SET extra_float_digits = 2;" ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL); if (QR_command_maybe_successful(res)) { handle_show_results(res); ret = 1; } else ret = 0; QR_Destructor(res); return ret; } RETCODE CC_detect_batch_proto(ConnectionClass *self) { const char *value = NULL; const char *query = "select count(*) from pg_settings where name = 'support_batch_bind' and setting = 'on';"; PGresult *res = NULL; ExecStatusType restype; int cnt = 0; RETCODE ret = SQL_SUCCESS; CSTR func = "CC_detect_batch_proto"; mylog("%s: entering...\n", func); qlog(" [%s : conn = %p, query = select count(*) from pg_settings where name = 'support_batch_bind' and setting = 'on']\n", func, self); res = PQexec(self->pqconn, query); if (NULL == res) { self->connInfo.backend_support_batch_proto = 0; mylog("%s: NULL result detected, backend_support_batch_proto set to 0", func); qlog(" [%s : conn = %p, query result = NULL]\n", func, self); return SQL_ERROR; } restype = PQresultStatus(res); if (restype != PGRES_TUPLES_OK && restype != PGRES_SINGLE_TUPLE) { self->connInfo.backend_support_batch_proto = 0; PQclear(res); mylog("%s: invalid result type %d detected, backend_support_batch_proto set to 0", func, restype); qlog(" [%s : conn = %p, query result type = %d]\n", func, self, restype); return SQL_ERROR; } if (PQntuples(res) < 1 || PQnfields(res) < 1) { self->connInfo.backend_support_batch_proto = 0; mylog("%s: invalid result rowsCount(%d) or colsCount(%d) detected, backend_support_batch_proto set to 0", func, PQntuples(res), PQnfields(res)); qlog(" [%s : conn = %p, invalid query result : %d rows, %d columns]\n", func, self, PQntuples(res), PQnfields(res)); PQclear(res); return SQL_ERROR; } value = PQgetvalue(res, 0, 0); cnt = (value ? atoi(value) : 0); PQclear(res); if (cnt < 1) self->connInfo.backend_support_batch_proto = 0; else self->connInfo.backend_support_batch_proto = 1; if (0 == self->connInfo.backend_support_batch_proto && self->connInfo.use_batch_protocol) { CC_set_error(self, CONN_UNSUPPORTED_OPTION, "Backend does not support batch bind protocol, \"" INI_USEBATCHPROTOCOL "\" disabled.", func); ret = SQL_SUCCESS_WITH_INFO; } mylog("%s: query result: %d, backend_support_batch_proto set to %d", func, cnt, self->connInfo.backend_support_batch_proto); qlog(" [%s : conn = %p, query result : %d ]\n", func, self, cnt); mylog("%s: exiting\n", func); return ret; } char CC_connect(ConnectionClass *self, char *salt_para) { ConnInfo *ci = &(self->connInfo); CSTR func = "CC_connect"; char ret, *saverr = NULL, retsend; const char *errmsg = NULL; RETCODE batchDetectRet = 0; MYLOG(0, "entering...sslmode=%s\n", self->connInfo.sslmode); ret = LIBPQ_CC_connect(self, salt_para); if (ret <= 0) return ret; CC_set_translation(self); /* * Send any initial settings */ /* * Since these functions allocate statements, and since the connection * is not established yet, it would violate odbc state transition * rules. Therefore, these functions call the corresponding local * function instead. */ /* Per Datasource settings */ retsend = CC_send_settings(self, GET_NAME(self->connInfo.conn_settings)); if (CONN_DOWN == self->status) { ret = 0; goto cleanup; } if (CC_get_errornumber(self) > 0 && NULL != (errmsg = CC_get_errormsg(self))) saverr = strdup(errmsg); CC_clear_error(self); /* clear any error */ if (!SQL_SUCCEEDED(CC_lookup_lo(self))) /* a hack to get the oid of our large object oid type */ { ret = 0; goto cleanup; } /* * Multibyte handling * * Send 'UTF8' when required Unicode behavior, otherwise send * locale encodings. */ CC_clear_error(self); CC_determine_locale_encoding(self); /* determine the locale_encoding */ #ifdef UNICODE_SUPPORT if (CC_is_in_unicode_driver(self)) { if (!SQL_SUCCEEDED(CC_send_client_encoding(self, "UTF8"))) { ret = 0; goto cleanup; } } else /* for unicode drivers require ANSI behavior */ #endif /* UNICODE_SUPPORT */ { if (!SQL_SUCCEEDED(CC_send_client_encoding(self, self->locale_encoding))) { ret = 0; goto cleanup; } } CC_clear_error(self); if (self->server_isolation != self->isolation) if (!CC_set_transact(self, self->isolation)) { ret = 0; goto cleanup; } ci_updatable_cursors_set(ci); if (CC_get_errornumber(self) > 0) CC_clear_error(self); /* clear any initial command errors */ self->status = CONN_CONNECTED; if (CC_is_in_unicode_driver(self) && (CC_is_in_ansi_app(self) || 0 < ci->bde_environment)) self->unicode |= CONN_DISALLOW_WCHAR; MYLOG(0, "conn->unicode=%d Client Encoding='%s' (Code %d)\n", self->unicode, self->original_client_encoding, self->ccsc); batchDetectRet = CC_detect_batch_proto(self); if (batchDetectRet == SQL_ERROR) goto cleanup; else if (batchDetectRet == SQL_SUCCESS_WITH_INFO) { /* Caller will recognize 2 as SQL_SUCCESS_WITH_INFO. */ ret = 2; goto cleanup; } ret = 1; cleanup: MYLOG(0, "leaving...%d\n", ret); if (NULL != saverr) { if (ret > 0 && CC_get_errornumber(self) <= 0) CC_set_error(self, -1, saverr, func); free(saverr); } if (1 == ret && FALSE == retsend) ret = 2; return ret; } char CC_add_statement(ConnectionClass *self, StatementClass *stmt) { int i; char ret = TRUE; MYLOG(0, "self=%p, stmt=%p\n", self, stmt); CONNLOCK_ACQUIRE(self); for (i = 0; i < self->num_stmts; i++) { if (!self->stmts[i]) { stmt->hdbc = self; self->stmts[i] = stmt; break; } } if (i >= self->num_stmts) /* no more room -- allocate more memory */ { StatementClass **newstmts; Int2 new_num_stmts; new_num_stmts = STMT_INCREMENT + self->num_stmts; if (new_num_stmts > 0) newstmts = (StatementClass **) realloc(self->stmts, sizeof(StatementClass *) * new_num_stmts); else newstmts = NULL; /* num_stmts overflowed */ if (!newstmts) ret = FALSE; else { self->stmts = newstmts; memset(&self->stmts[self->num_stmts], 0, sizeof(StatementClass *) * STMT_INCREMENT); stmt->hdbc = self; self->stmts[self->num_stmts] = stmt; self->num_stmts = new_num_stmts; } } CONNLOCK_RELEASE(self); return ret; } static void CC_set_error_statements(ConnectionClass *self) { int i; MYLOG(0, "entering self=%p\n", self); for (i = 0; i < self->num_stmts; i++) { if (NULL != self->stmts[i]) SC_ref_CC_error(self->stmts[i]); } } char CC_remove_statement(ConnectionClass *self, StatementClass *stmt) { int i; char ret = FALSE; CONNLOCK_ACQUIRE(self); for (i = 0; i < self->num_stmts; i++) { if (self->stmts[i] == stmt && stmt->status != STMT_EXECUTING) { self->stmts[i] = NULL; ret = TRUE; break; } } CONNLOCK_RELEASE(self); return ret; } char CC_get_escape(const ConnectionClass *self) { const char *scf; static const ConnectionClass *conn = NULL; scf = PQparameterStatus(self->pqconn, "standard_conforming_strings"); if (self != conn) { QLOG(0, "PQparameterStatus(%p, \"standard_conforming_strings\")=%s\n", self->pqconn, SAFE_STR(scf)); conn = self; } if (scf == NULL) { /* we're connected to a pre-8.1 server, and E'' is not supported */ return '\0'; } if (strcmp(scf, "on") != 0) return ESCAPE_IN_LITERAL; else return '\0'; } int CC_get_max_idlen(ConnectionClass *self) { int len = self->max_identifier_length; if (len < 0) { QResultClass *res; res = CC_send_query(self, "show max_identifier_length", NULL, READ_ONLY_QUERY, NULL); if (QR_command_maybe_successful(res)) len = self->max_identifier_length = QR_get_value_backend_int(res, 0, 0, FALSE); QR_Destructor(res); } MYLOG(0, "max_identifier_length=%d\n", len); return len < 0 ? 0 : len; } static SQLINTEGER isolation_str_to_enum(const char *str_isolation) { SQLINTEGER isolation = 0; if (strnicmp(str_isolation, "seri", 4) == 0) isolation = SQL_TXN_SERIALIZABLE; else if (strnicmp(str_isolation, "repe", 4) == 0) isolation = SQL_TXN_REPEATABLE_READ; else if (strnicmp(str_isolation, "read com", 8) == 0) isolation = SQL_TXN_READ_COMMITTED; else if (strnicmp(str_isolation, "read unc", 8) == 0) isolation = SQL_TXN_READ_UNCOMMITTED; return isolation; } static int handle_show_results(const QResultClass *res) { int count = 0; const QResultClass *qres; ConnectionClass *conn = QR_get_conn(res); for (qres = res; qres; qres = qres->next) { if (!qres->command || stricmp(qres->command, "SHOW") != 0) continue; if (strcmp(QR_get_fieldname(qres, 0), TRANSACTION_ISOLATION) == 0) { conn->server_isolation = isolation_str_to_enum(QR_get_value_backend_text(qres, 0, 0)); MYLOG(0, "isolation %d to be %d\n", conn->server_isolation, conn->isolation); if (0 == conn->isolation) conn->isolation = conn->server_isolation; if (0 == conn->default_isolation) conn->default_isolation = conn->server_isolation; count++; } } return count; } /* * This function may not be called as long as ISOLATION_SHOW_QUERY is * issued in LIBPQ_CC_connect. */ SQLUINTEGER CC_get_isolation(ConnectionClass *self) { SQLUINTEGER isolation = 0; QResultClass *res; res = CC_send_query(self, ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL); if (QR_command_maybe_successful(res)) { handle_show_results(res); isolation = self->server_isolation; } QR_Destructor(res); MYLOG(0, "isolation=%d\n", isolation); return isolation; } void CC_set_error(ConnectionClass *self, int number, const char *message, const char *func) { CONNLOCK_ACQUIRE(self); if (self->__error_message) free(self->__error_message); self->__error_number = number; self->__error_message = message ? strdup(message) : NULL; if (0 != number) CC_set_error_statements(self); if (func && number != 0) CC_log_error(func, "", self); CONNLOCK_RELEASE(self); } void CC_set_errormsg(ConnectionClass *self, const char *message) { CONNLOCK_ACQUIRE(self); if (self->__error_message) free(self->__error_message); self->__error_message = message ? strdup(message) : NULL; CONNLOCK_RELEASE(self); } char CC_get_error(ConnectionClass *self, int *number, char **message) { int rv; MYLOG(0, "entering\n"); CONNLOCK_ACQUIRE(self); if (CC_get_errornumber(self)) { *number = CC_get_errornumber(self); *message = CC_get_errormsg(self); } rv = (CC_get_errornumber(self) != 0); CONNLOCK_RELEASE(self); MYLOG(0, "leaving\n"); return rv; } static int CC_close_eof_cursors(ConnectionClass *self) { int i, ccount = 0; StatementClass *stmt; QResultClass *res; if (!self->ncursors) return ccount; CONNLOCK_ACQUIRE(self); for (i = 0; i < self->num_stmts; i++) { if (stmt = self->stmts[i], NULL == stmt) continue; if (res = SC_get_Result(stmt), NULL == res) continue; if (NULL != QR_get_cursor(res) && QR_is_withhold(res) && QR_once_reached_eof(res)) { if (QR_get_num_cached_tuples(res) >= QR_get_num_total_tuples(res) || SQL_CURSOR_FORWARD_ONLY == stmt->options.cursor_type) { QR_close(res); ccount++; } } } CONNLOCK_RELEASE(self); return ccount; } static void CC_clear_cursors(ConnectionClass *self, BOOL on_abort) { int i; StatementClass *stmt; QResultClass *res; if (!self->ncursors) return; CONNLOCK_ACQUIRE(self); for (i = 0; i < self->num_stmts; i++) { stmt = self->stmts[i]; if (stmt && (res = SC_get_Result(stmt)) && (NULL != QR_get_cursor(res))) { /* * non-holdable cursors are automatically closed * at commit time. * all non-permanent cursors are automatically closed * at rollback time. */ if ((on_abort && !QR_is_permanent(res)) || !QR_is_withhold(res)) { QR_on_close_cursor(res); } else if (!QR_is_permanent(res)) { QResultClass *wres; char cmd[64]; if (QR_needs_survival_check(res)) { SPRINTF_FIXED(cmd, "MOVE 0 in \"%s\"", QR_get_cursor(res)); CONNLOCK_RELEASE(self); wres = CC_send_query(self, cmd, NULL, ROLLBACK_ON_ERROR | IGNORE_ABORT_ON_CONN | READ_ONLY_QUERY, NULL); QR_set_no_survival_check(res); if (QR_command_maybe_successful(wres) && CONN_ERROR_IGNORED != CC_get_errornumber(self)) QR_set_permanent(res); else QR_set_cursor(res, NULL); QR_Destructor(wres); CONNLOCK_ACQUIRE(self); MYLOG(DETAIL_LOG_LEVEL, "%p->permanent -> %d %p\n", res, QR_is_permanent(res), QR_get_cursor(res)); } else QR_set_permanent(res); } } } CONNLOCK_RELEASE(self); } static void CC_mark_cursors_doubtful(ConnectionClass *self) { int i; StatementClass *stmt; QResultClass *res; if (!self->ncursors) return; CONNLOCK_ACQUIRE(self); for (i = 0; i < self->num_stmts; i++) { stmt = self->stmts[i]; if (NULL != stmt && NULL != (res = SC_get_Result(stmt)) && NULL != QR_get_cursor(res) && !QR_is_permanent(res)) QR_set_survival_check(res); } CONNLOCK_RELEASE(self); } void CC_on_commit(ConnectionClass *conn) { if (conn->on_commit_in_progress) return; conn->on_commit_in_progress = 1; CONNLOCK_ACQUIRE(conn); if (CC_is_in_trans(conn)) { CC_set_no_trans(conn); CC_set_no_manual_trans(conn); } CC_svp_init(conn); CC_start_stmt(conn); CC_clear_cursors(conn, FALSE); CONNLOCK_RELEASE(conn); CC_discard_marked_objects(conn); CONNLOCK_ACQUIRE(conn); if (conn->result_uncommitted) { CONNLOCK_RELEASE(conn); ProcessRollback(conn, FALSE, FALSE); CONNLOCK_ACQUIRE(conn); conn->result_uncommitted = 0; } CONNLOCK_RELEASE(conn); conn->on_commit_in_progress = 0; } void CC_on_abort(ConnectionClass *conn, unsigned int opt) { BOOL set_no_trans = FALSE; MYLOG(0, "entering opt=%x\n", opt); CONNLOCK_ACQUIRE(conn); if (0 != (opt & CONN_DEAD)) /* CONN_DEAD implies NO_TRANS also */ opt |= NO_TRANS; if (CC_is_in_trans(conn)) { if (0 != (opt & NO_TRANS)) { CC_set_no_trans(conn); CC_set_no_manual_trans(conn); set_no_trans = TRUE; } } CC_svp_init(conn); CC_start_stmt(conn); CC_clear_cursors(conn, TRUE); if (0 != (opt & CONN_DEAD)) { conn->status = CONN_DOWN; if (conn->pqconn) { CONNLOCK_RELEASE(conn); QLOG(0, "PQfinish: %p\n", conn->pqconn); PQfinish(conn->pqconn); CONNLOCK_ACQUIRE(conn); conn->pqconn = NULL; } } else if (set_no_trans) { CONNLOCK_RELEASE(conn); CC_discard_marked_objects(conn); CONNLOCK_ACQUIRE(conn); } if (conn->result_uncommitted) { CONNLOCK_RELEASE(conn); ProcessRollback(conn, TRUE, FALSE); CONNLOCK_ACQUIRE(conn); conn->result_uncommitted = 0; } CONNLOCK_RELEASE(conn); } void CC_on_abort_partial(ConnectionClass *conn) { MYLOG(0, "entering\n"); CONNLOCK_ACQUIRE(conn); ProcessRollback(conn, TRUE, TRUE); CC_discard_marked_objects(conn); CONNLOCK_RELEASE(conn); } static BOOL is_setting_search_path(const char *query) { const char *q = query; if (strnicmp(q, "set", 3) != 0) return FALSE; q += 3; while (isspace(*q)) q++; for (; *q;) { if (IS_NOT_SPACE(*q)) { if (strnicmp(q, "search_path", 11) == 0) return TRUE; q++; while (IS_NOT_SPACE(*q)) q++; } else q++; } return FALSE; } static BOOL CC_from_PGresult(QResultClass *res, StatementClass *stmt, ConnectionClass *conn, const char *cursor, PGresult **pgres) { BOOL success = TRUE; if (!QR_from_PGresult(res, stmt, conn, cursor, pgres)) { QLOG(0, "\tGetting result from PGresult failed\n"); success = FALSE; if (0 >= CC_get_errornumber(conn)) { switch (QR_get_rstatus(res)) { case PORES_NO_MEMORY_ERROR: CC_set_error(conn, CONN_NO_MEMORY_ERROR, NULL, __FUNCTION__); break; case PORES_BAD_RESPONSE: CC_set_error(conn, CONNECTION_COMMUNICATION_ERROR, "communication error occured", __FUNCTION__); break; default: CC_set_error(conn, CONN_EXEC_ERROR, QR_get_message(res), __FUNCTION__); break; } } } return success; } int CC_internal_rollback(ConnectionClass *self, int rollback_type, BOOL ignore_abort) { int ret = 0; char cmd[128]; PGresult *pgres = NULL; if (!CC_is_in_error_trans(self)) return 1; switch (rollback_type) { case PER_STATEMENT_ROLLBACK: GenerateSvpCommand(self, INTERNAL_ROLLBACK_OPERATION, cmd, sizeof(cmd)); QLOG(0, "PQexec: %p '%s'\n", self->pqconn, cmd); pgres = PQexec(self->pqconn, cmd); switch (PQresultStatus(pgres)) { case PGRES_COMMAND_OK: QLOG(0, "\tok: - 'C' - %s\n", PQcmdStatus(pgres)); case PGRES_NONFATAL_ERROR: ret = 1; if (ignore_abort) CC_set_no_error_trans(self); LIBPQ_update_transaction_status(self); break; default: handle_pgres_error(self, pgres, __FUNCTION__, NULL, TRUE); break; } break; case PER_QUERY_ROLLBACK: SPRINTF_FIXED(cmd, "%s TO %s;%s %s" , rbkcmd, per_query_svp , rlscmd, per_query_svp); QLOG(0, "PQsendQuery: %p '%s'\n", self->pqconn, cmd); PQsendQuery(self->pqconn, cmd); ret = 0; while (self->pqconn && (pgres = PQgetResult(self->pqconn)) != NULL) { switch (PQresultStatus(pgres)) { case PGRES_COMMAND_OK: QLOG(0, "\tok: - 'C' - %s\n", PQcmdTuples(pgres)); ret = 1; break; case PGRES_NONFATAL_ERROR: ret = 1; default: handle_pgres_error(self, pgres, __FUNCTION__, NULL, !ret); } } if (!ret) { if (ignore_abort) CC_set_no_error_trans(self); else MYLOG(0, " return error\n"); } LIBPQ_update_transaction_status(self); break; } if (pgres) PQclear(pgres); return ret; } /* * The "result_in" is only used by QR_next_tuple() to fetch another group of rows into * the same existing QResultClass (this occurs when the tuple cache is depleted and * needs to be re-filled). * * The "cursor" is used by SQLExecute to associate a statement handle as the cursor name * (i.e., C3326857) for SQL select statements. This cursor is then used in future * 'declare cursor C3326857 for ...' and 'fetch 100 in C3326857' statements. * * * If issue_begin, send "BEGIN" * * if needed, send "SAVEPOINT ..." * * Send "query", read result * * Send appendq, read result. * */ QResultClass * CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UDWORD flag, StatementClass *stmt, const char *appendq) { CSTR func = "CC_send_query"; QResultClass *cmdres = NULL, *retres = NULL, *res = NULL; BOOL ignore_abort_on_conn = ((flag & IGNORE_ABORT_ON_CONN) != 0), create_keyset = ((flag & CREATE_KEYSET) != 0), issue_begin = ((flag & GO_INTO_TRANSACTION) != 0 && !CC_is_in_trans(self)), rollback_on_error, query_rollback, end_with_commit, read_only, prepend_savepoint = FALSE, ignore_roundtrip_time = ((self->connInfo.extra_opts & BIT_IGNORE_ROUND_TRIP_TIME) != 0); char *ptr; BOOL ReadyToReturn = FALSE, query_completed = FALSE, aborted = FALSE, used_passed_result_object = FALSE, discard_next_begin = FALSE, discard_next_savepoint = FALSE, discard_next_release = FALSE, consider_rollback; BOOL discardTheRest = FALSE; int func_cs_count = 0; PQExpBufferData query_buf = {0}; size_t query_len; /* QR_set_command() dups this string so doesn't need static */ char *cmdbuffer; PGresult *pgres = NULL; notice_receiver_arg nrarg; if (appendq) { MYLOG(0, "conn=%p, query='%s'+'%s'\n", self, query, appendq); } else { MYLOG(0, "conn=%p, query='%s'\n", self, query); } if (!self->pqconn) { PQExpBufferData pbuf = {0}; initPQExpBuffer(&pbuf); appendPQExpBuffer(&pbuf, "The connection is down\nFailed to send '%s'", query); CC_set_error(self, CONNECTION_COULD_NOT_SEND, pbuf.data, func); termPQExpBuffer(&pbuf); return NULL; } ENTER_INNER_CONN_CS(self, func_cs_count); /* Indicate that we are sending a query to the backend */ if ((NULL == query) || (query[0] == '\0')) { CLEANUP_FUNC_CONN_CS(func_cs_count, self); return NULL; } /* * In case the round trip time can be ignored, the query * and the appeneded query would be issued separately. * Otherwise a multiple command query would be issued. */ if (appendq && ignore_roundtrip_time) { res = CC_send_query_append(self, query, qi, flag, stmt, NULL); if (QR_command_maybe_successful(res)) { cmdres = CC_send_query_append(self, appendq, qi, flag & (~(GO_INTO_TRANSACTION)), stmt, NULL); if (QR_command_maybe_successful(cmdres)) res->next = cmdres; else { QR_Destructor(res); res = cmdres; } } CLEANUP_FUNC_CONN_CS(func_cs_count, self); return res; } rollback_on_error = (flag & ROLLBACK_ON_ERROR) != 0; end_with_commit = (flag & END_WITH_COMMIT) != 0; read_only = (flag & READ_ONLY_QUERY) != 0; #define return DONT_CALL_RETURN_FROM_HERE??? consider_rollback = (issue_begin || (CC_is_in_trans(self) && !CC_is_in_error_trans(self)) || strnicmp(query, "begin", 5) == 0); if (rollback_on_error) rollback_on_error = consider_rollback; query_rollback = (rollback_on_error && !end_with_commit && PG_VERSION_GE(self, 8.0)); if (!query_rollback && consider_rollback && !end_with_commit) { if (stmt) { StatementClass *astmt = SC_get_ancestor(stmt); unsigned int svpopt = 0; if (read_only) svpopt |= SVPOPT_RDONLY; if (!ignore_roundtrip_time) svpopt |= SVPOPT_REDUCE_ROUNDTRIP; if (!CC_started_rbpoint(self)) { if (SQL_ERROR == SetStatementSvp(astmt, svpopt)) { SC_set_error(stmt, STMT_INTERNAL_ERROR, "internal savepoint error", func); goto cleanup; } } } } /* prepend internal savepoint command ? */ if (PREPEND_IN_PROGRESS == self->internal_op) prepend_savepoint = TRUE; /* append all these together, to avoid round-trips */ query_len = strlen(query); MYLOG(0, "query_len=" FORMAT_SIZE_T "\n", query_len); initPQExpBuffer(&query_buf); /* issue_begin, query_rollback and prepend_savepoint are exclusive */ if (issue_begin) { appendPQExpBuffer(&query_buf, "%s;", bgncmd); discard_next_begin = TRUE; } else if (query_rollback && !self->connInfo.drivers.for_extension_connector) { appendPQExpBuffer(&query_buf, "%s %s;", svpcmd, per_query_svp); discard_next_savepoint = TRUE; } else if (prepend_savepoint) { char prepend_cmd[128]; GenerateSvpCommand(self, INTERNAL_SAVEPOINT_OPERATION, prepend_cmd, sizeof(prepend_cmd)); appendPQExpBuffer(&query_buf, "%s;", prepend_cmd); self->internal_op = SAVEPOINT_IN_PROGRESS; } appendPQExpBufferStr(&query_buf, query); if (appendq) { appendPQExpBuffer(&query_buf, ";%s", appendq); } if (query_rollback && !self->connInfo.drivers.for_extension_connector) { appendPQExpBuffer(&query_buf, ";%s %s", rlscmd, per_query_svp); } if (PQExpBufferDataBroken(query_buf)) { CC_set_error(self, CONN_NO_MEMORY_ERROR, "Couldn't alloc buffer for query.", ""); goto cleanup; } /* Set up notice receiver */ nrarg.conn = self; nrarg.comment = func; nrarg.res = NULL; PQsetNoticeReceiver(self->pqconn, receive_libpq_notice, &nrarg); QLOG(0, "PQsendQuery: %p '%s'\n", self->pqconn, query_buf.data); if (!PQsendQuery(self->pqconn, query_buf.data)) { char *errmsg = PQerrorMessage(self->pqconn); QLOG(0, "\nCommunication Error: %s\n", SAFE_STR(errmsg)); CC_set_error(self, CONNECTION_COMMUNICATION_ERROR, errmsg, func); goto cleanup; } PQsetSingleRowMode(self->pqconn); cmdres = qi ? qi->result_in : NULL; if (cmdres) used_passed_result_object = TRUE; else { cmdres = QR_Constructor(); if (!cmdres) { CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func); goto cleanup; } } res = cmdres; if (qi) { res->cmd_fetch_size = qi->fetch_size; res->cache_size = qi->row_size; } nrarg.res = res; while (self->pqconn && (pgres = PQgetResult(self->pqconn)) != NULL) { int status = PQresultStatus(pgres); if (discardTheRest) continue; switch (status) { case PGRES_COMMAND_OK: /* portal query command, no tuples returned */ /* read in the return message from the backend */ cmdbuffer = PQcmdStatus(pgres); QLOG(0, "\tok: - 'C' - %s\n", cmdbuffer); if (query_completed) /* allow for "show" style notices */ { res->next = QR_Constructor(); if (!res->next) { CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func); ReadyToReturn = TRUE; retres = NULL; break; } res = res->next; nrarg.res = res; } MYLOG(0, " setting cmdbuffer = '%s'\n", cmdbuffer); my_trim(cmdbuffer); /* get rid of trailing space */ if (strnicmp(cmdbuffer, bgncmd, strlen(bgncmd)) == 0) { CC_set_in_trans(self); CC_set_in_manual_trans(self); if (discard_next_begin) /* discard the automatically issued BEGIN */ { discard_next_begin = FALSE; break; /* discard the result */ } } /* * There are 2 risks to RELEASE an internal savepoint. * One is to RELEASE the savepoint invalitated * due to manually issued ROLLBACK or RELEASE. * Another is to invalitate manual SAVEPOINTs unexpectedly * by RELEASing the internal savepoint. */ else if (strnicmp(cmdbuffer, svpcmd, strlen(svpcmd)) == 0) { if (discard_next_savepoint) { discard_next_savepoint = FALSE; discard_next_release = TRUE; MYLOG(DETAIL_LOG_LEVEL, "Discarded a SAVEPOINT result\n"); break; /* discard the result */ } if (SAVEPOINT_IN_PROGRESS == self->internal_op) { CC_start_rbpoint(self); self->internal_op = 0; break; /* discard the result */ } /* Don't release the internal savepoint */ self->internal_svp = 0; } else if (strnicmp(cmdbuffer, rbkcmd, strlen(rbkcmd)) == 0) { CC_mark_cursors_doubtful(self); CC_set_in_error_trans(self); /* mark the transaction error in case of manual rollback */ self->internal_svp = 0; /* possibly an internal savepoint is invalid */ self->opt_previous = 0; /* unknown */ CC_init_opt_in_progress(self); } else if (strnicmp(cmdbuffer, rlscmd, strlen(rlscmd)) == 0) { if (discard_next_release) { MYLOG(DETAIL_LOG_LEVEL, "Discarded a RELEASE result\n"); discard_next_release = FALSE; break; /* discard the result */ } self->internal_svp = 0; if (SAVEPOINT_IN_PROGRESS == self->internal_op) break; /* discard the result */ } /* * DROP TABLE or ALTER TABLE may change * the table definition. So clear the * col_info cache though it may be too simple. */ else if (strnicmp(cmdbuffer, "DROP TABLE", 10) == 0 || strnicmp(cmdbuffer, "ALTER TABLE", 11) == 0) CC_clear_col_info(self, FALSE); else { ptr = strrchr(cmdbuffer, ' '); if (ptr) res->recent_processed_row_count = atoi(ptr + 1); else res->recent_processed_row_count = -1; if (self->current_schema_valid && strnicmp(cmdbuffer, "SET", 3) == 0) { if (is_setting_search_path(query)) reset_current_schema(self); } } if (QR_command_successful(res)) QR_set_rstatus(res, PORES_COMMAND_OK); QR_set_command(res, cmdbuffer); query_completed = TRUE; MYLOG(0, " returning res = %p\n", res); break; case PGRES_EMPTY_QUERY: /* We return the empty query */ QR_set_rstatus(res, PORES_EMPTY_QUERY); break; case PGRES_NONFATAL_ERROR: handle_pgres_error(self, pgres, "send_query", res, FALSE); break; case PGRES_BAD_RESPONSE: case PGRES_FATAL_ERROR: handle_pgres_error(self, pgres, "send_query", res, TRUE); /* We should report that an error occured. Zoltan */ aborted = TRUE; query_completed = TRUE; break; case PGRES_TUPLES_OK: QLOG(0, "\tok: - 'T' - %s\n", PQcmdStatus(pgres)); case PGRES_SINGLE_TUPLE: if (query_completed) { res->next = QR_Constructor(); if (!res->next) { CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func); ReadyToReturn = TRUE; retres = NULL; break; } if (create_keyset) { QR_set_haskeyset(res->next); if (stmt) res->next->num_key_fields = stmt->num_key_fields; } MYLOG(0, " 'T' no result_in: res = %p\n", res->next); res = res->next; nrarg.res = res; if (qi) { QR_set_cache_size(res, qi->row_size); res->cmd_fetch_size = qi->fetch_size; } } if (!used_passed_result_object) { const char *cursor = qi ? qi->cursor : NULL; if (create_keyset) { QR_set_haskeyset(res); if (stmt) res->num_key_fields = stmt->num_key_fields; if (cursor && cursor[0]) QR_set_synchronize_keys(res); } if (CC_from_PGresult(res, stmt, self, cursor, &pgres)) query_completed = TRUE; else { aborted = TRUE; if (QR_command_maybe_successful(res)) retres = NULL; else retres = cmdres; } } else { /* next fetch, so reuse an existing result */ const char *cursor = res->cursor_name; /* * called from QR_next_tuple and must return * immediately. */ if (!CC_from_PGresult(res, stmt, NULL, cursor, &pgres)) { retres = NULL; break; } retres = cmdres; } if (res->rstatus == PORES_TUPLES_OK && res->notice) { QR_set_rstatus(res, PORES_NONFATAL_ERROR); } else if (PORES_NO_MEMORY_ERROR == QR_get_rstatus(res)) { PGcancel *cancel = PQgetCancel(self->pqconn); char dummy[8]; discardTheRest = TRUE; if (cancel != NULL) { PQcancel(cancel, dummy, sizeof(dummy)); PQfreeCancel(cancel); } else goto cleanup; } break; case PGRES_COPY_OUT: /* XXX: We used to read from stdin here. Does that make any sense? */ case PGRES_COPY_IN: if (query_completed) { res->next = QR_Constructor(); if (!res->next) { CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func); ReadyToReturn = TRUE; retres = NULL; break; } res = res->next; nrarg.res = res; } QR_set_rstatus(res, PORES_COPY_IN); ReadyToReturn = TRUE; retres = cmdres; break; case PGRES_COPY_BOTH: default: /* skip the unexpected response if possible */ CC_set_error(self, CONNECTION_BACKEND_CRAZY, "Unexpected result status (send_query)", func); handle_pgres_error(self, pgres, "send_query", res, TRUE); CC_on_abort(self, CONN_DEAD); MYLOG(0, " error - %s\n", CC_get_errormsg(self)); ReadyToReturn = TRUE; retres = NULL; break; } if (pgres) { PQclear(pgres); pgres = NULL; } } cleanup: if (self->pqconn) PQsetNoticeReceiver(self->pqconn, receive_libpq_notice, NULL); if (pgres != NULL) { PQclear(pgres); pgres = NULL; } MYLOG(DETAIL_LOG_LEVEL, " rollback_on_error=%d CC_is_in_trans=%d discard_next_savepoint=%d query_rollback=%d\n", rollback_on_error, CC_is_in_trans(self), discard_next_savepoint, query_rollback); if (rollback_on_error && CC_is_in_trans(self) && !discard_next_savepoint) { if (query_rollback) { if (!CC_internal_rollback(self, PER_QUERY_ROLLBACK, ignore_abort_on_conn)) ignore_abort_on_conn = FALSE; } else if (CC_is_in_error_trans(self)) { QLOG(0, "PQexec: %p '%s'\n", self->pqconn, rbkcmd); pgres = PQexec(self->pqconn, rbkcmd); } /* * XXX: we don't check the result here. Should we? We're rolling back, * so it's not clear what else we can do on error. Giving an error * message to the application would be nice though. */ if (pgres != NULL) { PQclear(pgres); pgres = NULL; } } CLEANUP_FUNC_CONN_CS(func_cs_count, self); #undef return /* * Break before being ready to return. */ if (!ReadyToReturn) retres = cmdres; if (!PQExpBufferDataBroken(query_buf)) termPQExpBuffer(&query_buf); /* * Cleanup garbage results before returning. */ if (cmdres && retres != cmdres && !used_passed_result_object) QR_Destructor(cmdres); /* * Cleanup the aborted result if specified */ if (retres) { if (aborted) { /** if (ignore_abort_on_conn) { if (!used_passed_result_object) { QR_Destructor(retres); retres = NULL; } } **/ if (retres) { /* * discard results other than errors. */ QResultClass *qres; for (qres = retres; qres->next; qres = retres) { if (QR_get_aborted(qres)) break; retres = qres->next; qres->next = NULL; QR_Destructor(qres); } /* * If error message isn't set */ if (ignore_abort_on_conn) { CC_set_errornumber(self, CONN_ERROR_IGNORED); if (retres) QR_set_rstatus(retres, PORES_NONFATAL_ERROR); MYLOG(DETAIL_LOG_LEVEL, " ignored abort_on_conn\n"); } else if (retres) { if (NULL == CC_get_errormsg(self) || !CC_get_errormsg(self)[0]) CC_set_errormsg(self, QR_get_message(retres)); if (!self->sqlstate[0]) STRCPY_FIXED(self->sqlstate, retres->sqlstate); } } } } /* * Update our copy of the transaction status. * * XXX: Once we stop using the socket directly, and do everything with * libpq, we can get rid of the transaction_status field altogether * and always ask libpq for it. */ LIBPQ_update_transaction_status(self); if (retres) QR_set_conn(retres, self); return retres; } #define MAX_SEND_FUNC_ARGS 3 static const char *func_param_str[MAX_SEND_FUNC_ARGS + 1] = { "()", "($1)", "($1, $2)", "($1, $2, $3)" }; static Int8 odbc_hton64(Int8 h64) { union { Int8 n64; UInt4 i32[2]; } u; u.i32[0] = htonl((UInt4) (h64 >> 32)); u.i32[1] = htonl((UInt4) h64); return u.n64; } static Int8 odbc_ntoh64(Int8 n64) { union { Int8 h64; UInt4 i32[2]; } u; Int8 result; u.h64 = n64; result = ntohl(u.i32[0]); result <<= 32; result |= ntohl(u.i32[1]); return result; } int CC_send_function(ConnectionClass *self, const char *fn_name, void *result_buf, int *actual_result_len, int result_is_int, LO_ARG *args, int nargs) { int i; int ret = FALSE; int func_cs_count = 0; char sqlbuffer[1000]; PGresult *pgres = NULL; Oid paramTypes[MAX_SEND_FUNC_ARGS]; char *paramValues[MAX_SEND_FUNC_ARGS]; int paramLengths[MAX_SEND_FUNC_ARGS]; int paramFormats[MAX_SEND_FUNC_ARGS]; Int4 intParamBufs[MAX_SEND_FUNC_ARGS]; Int8 int8ParamBufs[MAX_SEND_FUNC_ARGS]; MYLOG(0, "conn=%p, fn_name=%s, result_is_int=%d, nargs=%d\n", self, fn_name, result_is_int, nargs); /* Finish the pending extended query first */ #define return DONT_CALL_RETURN_FROM_HERE??? ENTER_INNER_CONN_CS(self, func_cs_count); SPRINTF_FIXED(sqlbuffer, "SELECT pg_catalog.%s%s", fn_name, func_param_str[nargs]); for (i = 0; i < nargs; ++i) { MYLOG(0, " arg[%d]: len = %d, isint = %d, integer = " FORMATI64 ", ptr = %p\n", i, args[i].len, args[i].isint, args[i].isint == 2 ? args[i].u.integer64 : args[i].u.integer, args[i].u.ptr); /* integers are sent as binary, others as text */ if (args[i].isint == 2) { paramTypes[i] = PG_TYPE_INT8; int8ParamBufs[i] = odbc_hton64(args[i].u.integer64); paramValues[i] = (char *) &int8ParamBufs[i]; paramLengths[i] = 8; paramFormats[i] = 1; } else if (args[i].isint) { paramTypes[i] = PG_TYPE_INT4; intParamBufs[i] = htonl(args[i].u.integer); paramValues[i] = (char *) &intParamBufs[i]; paramLengths[i] = 4; paramFormats[i] = 1; } else { paramTypes[i] = 0; paramValues[i] = args[i].u.ptr; paramLengths[i] = args[i].len; paramFormats[i] = 1; } } QLOG(0, "PQexecParams: %p '%s' nargs=%d\n", self->pqconn, sqlbuffer, nargs); pgres = PQexecParams(self->pqconn, sqlbuffer, nargs, paramTypes, (const char * const *) paramValues, paramLengths, paramFormats, 1); MYLOG(0, "done sending function\n"); if (PQresultStatus(pgres) == PGRES_TUPLES_OK) QLOG(0, "\tok: - 'T' - %s\n", PQcmdStatus(pgres)); else { handle_pgres_error(self, pgres, "send_query", NULL, TRUE); goto cleanup; } if (PQnfields(pgres) != 1 || PQntuples(pgres) != 1) { CC_set_errormsg(self, "unexpected result set from large_object function"); goto cleanup; } *actual_result_len = PQgetlength(pgres, 0, 0); QLOG(0, "\tgot result with length: %d\n", *actual_result_len); if (*actual_result_len > 0) { char *value = PQgetvalue(pgres, 0, 0); if (result_is_int == 2) { Int8 int8val; memcpy(&int8val, value, sizeof(Int8)); int8val = odbc_ntoh64(int8val); memcpy(result_buf, &int8val, sizeof(Int8)); MYLOG(0, "int8 result=" FORMATI64 "\n", int8val); } else if (result_is_int) { Int4 int4val; memcpy(&int4val, value, sizeof(Int4)); int4val = ntohl(int4val); memcpy(result_buf, &int4val, sizeof(Int4)); } else memcpy(result_buf, value, *actual_result_len); } ret = TRUE; cleanup: #undef return CLEANUP_FUNC_CONN_CS(func_cs_count, self); if (pgres) PQclear(pgres); return ret; } char CC_send_settings(ConnectionClass *self, const char *set_query) { HSTMT hstmt; RETCODE result; char status = TRUE; char *cs, *ptr; #ifdef HAVE_STRTOK_R char *last; #endif /* HAVE_STRTOK_R */ CSTR func = "CC_send_settings"; MYLOG(0, "entering...\n"); if (set_query == NULL) return TRUE; /* * This function must use the local odbc API functions since the odbc state * has not transitioned to "connected" yet. */ result = PGAPI_AllocStmt(self, &hstmt, 0); if (!SQL_SUCCEEDED(result)) return FALSE; /* non-external handle ensures no BEGIN/COMMIT/ABORT stuff */ cs = strdup(set_query); if (cs == NULL) { CC_set_error(self, CONN_NO_MEMORY_ERROR, "Couldn't alloc buffer for query.", func); return FALSE; } #ifdef HAVE_STRTOK_R ptr = strtok_r(cs, ";", &last); #else ptr = strtok(cs, ";"); #endif /* HAVE_STRTOK_R */ while (ptr) { result = PGAPI_ExecDirect(hstmt, (SQLCHAR *) ptr, SQL_NTS, 0); if (!SQL_SUCCEEDED(result)) status = FALSE; MYLOG(0, "result %d, status %d from '%s'\n", result, status, ptr); #ifdef HAVE_STRTOK_R ptr = strtok_r(NULL, ";", &last); #else ptr = strtok(NULL, ";"); #endif /* HAVE_STRTOK_R */ } free(cs); PGAPI_FreeStmt(hstmt, SQL_DROP); return status; } /* * This function is just a hack to get the oid of our Large Object oid type. * If a real Large Object oid type is made part of Postgres, this function * will go away and the define 'PG_TYPE_LO' will be updated. */ static SQLRETURN CC_lookup_lo(ConnectionClass *self) { SQLRETURN ret = SQL_SUCCESS; QResultClass *res; MYLOG(0, "entering...\n"); res = CC_send_query(self, "select oid, typbasetype from pg_type where typname = '" PG_TYPE_LO_NAME "'", NULL, READ_ONLY_QUERY, NULL); if (!QR_command_maybe_successful(res)) ret = SQL_ERROR; else if (QR_command_maybe_successful(res) && QR_get_num_cached_tuples(res) > 0) { OID basetype; self->lobj_type = QR_get_value_backend_int(res, 0, 0, NULL); basetype = QR_get_value_backend_int(res, 0, 1, NULL); if (PG_TYPE_OID == basetype) self->lo_is_domain = 1; else if (0 != basetype) self->lobj_type = 0; } QR_Destructor(res); MYLOG(0, "Got the large object oid: %d\n", self->lobj_type); return ret; } /* * This function initializes the version of PostgreSQL from * connInfo.protocol that we're connected to. * h-inoue 01-2-2001 */ void CC_initialize_pg_version(ConnectionClass *self) { STRCPY_FIXED(self->pg_version, "7.4"); self->pg_version_major = 7; self->pg_version_minor = 4; } void CC_log_error(const char *func, const char *desc, const ConnectionClass *self) { #define NULLCHECK(a) (a ? a : "(NULL)") if (self) { MYLOG(0, "CONN ERROR: func=%s, desc='%s', errnum=%d, errmsg='%s'\n", func, desc, self->__error_number, NULLCHECK(self->__error_message)); MYLOG(DETAIL_LOG_LEVEL, " ------------------------------------------------------------\n"); MYLOG(DETAIL_LOG_LEVEL, " henv=%p, conn=%p, status=%u, num_stmts=%d\n", self->henv, self, self->status, self->num_stmts); MYLOG(DETAIL_LOG_LEVEL, " pqconn=%p, stmts=%p, lobj_type=%d\n", self->pqconn, self->stmts, self->lobj_type); } else { MYLOG(0, "INVALID CONNECTION HANDLE ERROR: func=%s, desc='%s'\n", func, desc); } } /* * This doesn't really return the CURRENT SCHEMA * but there's no alternative. */ const char * CC_get_current_schema(ConnectionClass *conn) { if (!conn->current_schema_valid) { QResultClass *res; if (res = CC_send_query(conn, "select current_schema()", NULL, READ_ONLY_QUERY, NULL), QR_command_maybe_successful(res)) { if (QR_get_num_total_tuples(res) == 1) { char *curschema = QR_get_value_backend_text(res, 0, 0); if (curschema) conn->current_schema = strdup(curschema); } if (conn->current_schema) conn->current_schema_valid = TRUE; } QR_Destructor(res); } return (const char *) conn->current_schema; } int CC_mark_a_object_to_discard(ConnectionClass *conn, int type, const char *plan) { int cnt = conn->num_discardp + 1, plansize; char *pname; CC_REALLOC_return_with_error(conn->discardp, char *, (cnt * sizeof(char *)), conn, "Couldn't alloc discardp.", -1); plansize = strlen(plan) + 2; CC_MALLOC_return_with_error(pname, char, plansize, conn, "Couldn't alloc discardp mem.", -1); pname[0] = (char) type; /* 's':prepared statement 'p':cursor */ strncpy_null(pname + 1, plan, plansize - 1); conn->discardp[conn->num_discardp++] = pname; return 1; } int CC_discard_marked_objects(ConnectionClass *conn) { int i, cnt; QResultClass *res; char *pname, cmd[64]; if ((cnt = conn->num_discardp) <= 0) return 0; for (i = cnt - 1; i >= 0; i--) { pname = conn->discardp[i]; if ('s' == pname[0]) SPRINTF_FIXED(cmd, "DEALLOCATE \"%s\"", pname + 1); else SPRINTF_FIXED(cmd, "CLOSE \"%s\"", pname + 1); res = CC_send_query(conn, cmd, NULL, ROLLBACK_ON_ERROR | IGNORE_ABORT_ON_CONN | READ_ONLY_QUERY, NULL); QR_Destructor(res); free(conn->discardp[i]); /* CodeDEX with CID=12044 */ conn->discardp[i] = NULL; conn->num_discardp--; } return 1; } static void LIBPQ_update_transaction_status(ConnectionClass *self) { if (!self->pqconn) return; switch (PQtransactionStatus(self->pqconn)) { case PQTRANS_IDLE: if (CC_is_in_trans(self)) { if (CC_is_in_error_trans(self)) CC_on_abort(self, NO_TRANS); else CC_on_commit(self); } break; case PQTRANS_INTRANS: CC_set_in_trans(self); if (CC_is_in_error_trans(self)) { CC_set_no_error_trans(self); CC_on_abort_partial(self); } break; case PQTRANS_INERROR: CC_set_in_trans(self); CC_set_in_error_trans(self); break; case PQTRANS_ACTIVE: /* * A query is still executing. It might have already aborted, * but all we know for sure is that we're in a transaction. */ CC_set_in_trans(self); break; default: /* unknown status */ break; } } static void CC_getLibpath(char *libpath, int libpathLen) { const char *fname = NULL; int fnameIndex = 0; int libpathIndex = 0; #ifndef WIN32 Dl_info dl_info; int ret = 0; if (libpath == NULL || libpathLen <= 0) { return ; } ret = dladdr((void*)CC_getLibpath, &dl_info); if (ret != 0) { fname = dl_info.dli_fname; } #else MEMORY_BASIC_INFORMATION mbi; char fpath[4096] = {'\0'}; if (libpath == NULL || libpathLen <= 0) { return ; } if ((VirtualQuery(CC_getLibpath, &mbi, sizeof(mbi)) != 0) && GetModuleFileName((HMODULE)mbi.AllocationBase, fpath, sizeof(fpath))) { fname = fpath; } #endif while ((fname != NULL) && (fname[fnameIndex] != '\0') && (libpathIndex < libpathLen - 1)) { if (fname[fnameIndex] == '\'') { libpath[libpathIndex++] = '\''; } else if ((fname[fnameIndex] == '"') || (fname[fnameIndex] == '\\')) { libpath[libpathIndex++] = '\\'; } libpath[libpathIndex++] = fname[fnameIndex++]; } libpath[libpathIndex] = '\0'; } static void CC_getOSUser(char *username, int usernameLen) { #ifndef WIN32 struct passwd *pw = NULL; if (username == NULL || usernameLen <= 0) { return ; } pw = getpwuid(geteuid()); if (pw == NULL) { username[0] = '\0'; } else { strncpy(username, pw->pw_name, strlen(pw->pw_name)); } #else DWORD len = usernameLen; if (username == NULL || usernameLen <= 0) { return ; } if(!GetUserName(username, &len)) { username[0] = '\0'; return ; } #endif } #define PROTOCOL3_OPTS_MAX 30 static int LIBPQ_connect(ConnectionClass *self) { CSTR func = "LIBPQ_connect"; ConnInfo *ci = &(self->connInfo); char ret = 0; void *pqconn = NULL; int pqret; int pversion; const char *opts[PROTOCOL3_OPTS_MAX], *vals[PROTOCOL3_OPTS_MAX]; PQconninfoOption *conninfoOption = NULL, *pqopt; int i, cnt; char login_timeout_str[20]; char keepalive_idle_str[20]; char keepalive_interval_str[20]; char *errmsg = NULL; char local_conninfo[8192]; MYLOG(0, "connecting to the database using %s as the server and pqopt={%s}\n", self->connInfo.server, SAFE_NAME(ci->pqopt)); if (NULL == (conninfoOption = PQconninfoParse(SAFE_NAME(ci->pqopt), &errmsg))) { char emsg[200]; if (errmsg != NULL) SPRINTF_FIXED(emsg, "libpq connection parameter error:%s", errmsg); else STRCPY_FIXED(emsg, "memory error? in PQconninfoParse"); CC_set_error(self, CONN_OPENDB_ERROR, emsg, func); goto cleanup; } /* Build arrays of keywords & values, for PQconnectDBParams */ cnt = 0; if (ci->server[0]) { opts[cnt] = "host"; vals[cnt++] = ci->server; } if (ci->port[0]) { opts[cnt] = "port"; vals[cnt++] = ci->port; } if (ci->database[0]) { opts[cnt] = "dbname"; vals[cnt++] = ci->database; } if (ci->username[0]) { opts[cnt] = "user"; vals[cnt++] = ci->username; } switch (ci->sslmode[0]) { case '\0': break; case SSLLBYTE_VERIFY: opts[cnt] = "sslmode"; switch (ci->sslmode[1]) { case 'f': vals[cnt++] = SSLMODE_VERIFY_FULL; break; case 'c': vals[cnt++] = SSLMODE_VERIFY_CA; break; default: vals[cnt++] = ci->sslmode; } break; default: opts[cnt] = "sslmode"; vals[cnt++] = ci->sslmode; } if (NAME_IS_VALID(ci->password)) { opts[cnt] = "password"; vals[cnt++] = SAFE_NAME(ci->password); } if (ci->disable_keepalive) { opts[cnt] = "keepalives"; vals[cnt++] = "0"; } if (self->login_timeout > 0) { SPRINTF_FIXED(login_timeout_str, "%u", (unsigned int) self->login_timeout); opts[cnt] = "connect_timeout"; vals[cnt++] = login_timeout_str; } if (self->connInfo.keepalive_idle > 0) { ITOA_FIXED(keepalive_idle_str, self->connInfo.keepalive_idle); opts[cnt] = "keepalives_idle"; vals[cnt++] = keepalive_idle_str; } if (self->connInfo.keepalive_interval > 0) { ITOA_FIXED(keepalive_interval_str, self->connInfo.keepalive_interval); opts[cnt] = "keepalives_interval"; vals[cnt++] = keepalive_interval_str; } if ((odbcVersionString != NULL) && (odbcVersionString[0] != '\0')) { if (self->connInfo.connection_extra_info > 0) { char libpath[4096] = {'\0'}; char username[128] = {'\0'}; (void)CC_getLibpath(libpath, sizeof(libpath)); (void)CC_getOSUser(username, sizeof(username)); snprintf(local_conninfo, sizeof(local_conninfo), "{\"driver_name\":\"ODBC\",\"driver_version\":\"%s\",\"driver_path\":\"%s\",\"os_user\":\"%s\"}", odbcVersionString, libpath, username); } else { snprintf(local_conninfo, sizeof(local_conninfo), "{\"driver_name\":\"ODBC\",\"driver_version\":\"%s\"}", odbcVersionString); } opts[cnt] = "connection_info"; vals[cnt++] = local_conninfo; } opts[cnt] = "target_session_attrs"; vals[cnt++] = "primary"; if (conninfoOption != NULL) { const char *keyword, *val; int j; for (i = 0, pqopt = conninfoOption; (keyword = pqopt->keyword) != NULL; i++, pqopt++) { if ((val = pqopt->val) != NULL) { for (j = 0; j < cnt; j++) { if (stricmp(opts[j], keyword) == 0) { char emsg[100]; if (vals[j] != NULL && strcmp(vals[j], val) == 0) continue; SPRINTF_FIXED(emsg, "%s parameter in pqopt option conflicts with other ordinary option", keyword); CC_set_error(self, CONN_OPENDB_ERROR, emsg, func); goto cleanup; } } if (j >= cnt && cnt < PROTOCOL3_OPTS_MAX - 1) { opts[cnt] = keyword; vals[cnt++] = val; } } } } opts[cnt] = vals[cnt] = NULL; /* Ok, we're all set to connect */ if (get_qlog() > 0 || get_mylog() > 0) { const char **popt, **pval; const char* pwdKey = "password"; QLOG(0, "PQconnectdbParams:"); for (popt = opts, pval = vals; *popt; popt++, pval++) { if (strcmp(pwdKey, *popt) == 0) { QPRINTF(0, " %s='xxxxx'", *popt); } else { QPRINTF(0, " %s='%s'", *popt, *pval); } } QPRINTF(0, "\n"); } pqconn = PQconnectdbParams(opts, vals, FALSE); if (!pqconn) { CC_set_error(self, CONN_OPENDB_ERROR, "PQconnectdb error", func); goto cleanup; } self->pqconn = pqconn; pqret = PQstatus(pqconn); if (pqret == CONNECTION_BAD && PQconnectionNeedsPassword(pqconn)) { const char *errmsg; MYLOG(0, "password retry\n"); errmsg = PQerrorMessage(pqconn); CC_set_error(self, CONNECTION_SERVER_NOT_REACHED, errmsg, func); QLOG(0, "PQfinish: %p\n", pqconn); PQfinish(pqconn); self->pqconn = NULL; self->connInfo.password_required = TRUE; ret = -1; goto cleanup; } if (CONNECTION_OK != pqret) { const char *errmsg; MYLOG(DETAIL_LOG_LEVEL, "status=%d\n", pqret); errmsg = PQerrorMessage(pqconn); CC_set_error(self, CONNECTION_SERVER_NOT_REACHED, errmsg, func); MYLOG(0, "Could not establish connection to the database; LIBPQ returned -> %s\n", errmsg); goto cleanup; } if (PQpass(pqconn) && strlen(PQpass(pqconn))) { char *pwd = PQpass(pqconn); memset(pwd, 0, strlen(pwd)); } MYLOG(0, "libpq connection to the database established.(IP: %s)\n", PQhost(pqconn)); pversion = PQprotocolVersion(pqconn); if (pversion < 3) { MYLOG(0, "Protocol version %d is not supported\n", pversion); goto cleanup; } MYLOG(0, "protocol=%d\n", pversion); pversion = PQserverVersion(pqconn); self->pg_version_major = pversion / 10000; self->pg_version_minor = (pversion % 10000) / 100; SPRINTF_FIXED(self->pg_version, "%d.%d.%d", self->pg_version_major, self->pg_version_minor, pversion % 100); MYLOG(0, "Server version=%s\n", self->pg_version); if (!CC_get_username(self)[0]) { MYLOG(0, "PQuser=%s\n", PQuser(pqconn)); STRCPY_FIXED(self->connInfo.username, PQuser(pqconn)); } ret = 1; cleanup: if (errmsg != NULL) free(errmsg); PQconninfoFree(conninfoOption); if (ret != 1) { if (self->pqconn) { QLOG(0, "PQfinish: %p\n", self->pqconn); PQfinish(self->pqconn); } self->pqconn = NULL; } MYLOG(0, "leaving %d\n", ret); return ret; } int CC_send_cancel_request(const ConnectionClass *conn) { int ret = 0; char errbuf[256]; void *cancel; /* Check we have an open connection */ if (!conn || !conn->pqconn) return FALSE; cancel = PQgetCancel(conn->pqconn); if (!cancel) return FALSE; ret = PQcancel(cancel, errbuf, sizeof(errbuf)); PQfreeCancel(cancel); if (1 == ret) return TRUE; else return FALSE; } const char *CurrCat(const ConnectionClass *conn) { /* * Returning the database name causes problems in MS Query. It * generates query like: "SELECT DISTINCT a FROM byronnbad3 * bad3" */ if (isMsQuery()) /* MS Query */ return NULL; return conn->connInfo.database; } const char *CurrCatString(const ConnectionClass *conn) { const char *cat = CurrCat(conn); if (!cat) cat = NULL_STRING; return cat; } /*------ * Create a null terminated lower-case string if the * original string contains upper-case characters. * The SQL_NTS length is considered. *------ */ SQLCHAR * make_lstring_ifneeded(ConnectionClass *conn, const SQLCHAR *s, ssize_t len, BOOL ifallupper) { ssize_t length = len; char *str = NULL; const char *ccs = (const char *) s; if (s && (len > 0 || (len == SQL_NTS && (length = strlen(ccs)) > 0))) { int i; UCHAR tchar; encoded_str encstr; make_encoded_str(&encstr, conn, ccs); for (i = 0; i < length; i++) { tchar = encoded_nextchar(&encstr); if (MBCS_NON_ASCII(encstr)) continue; if (ifallupper && islower(tchar)) { if (str) { free(str); str = NULL; } break; } if (tolower(tchar) != tchar) { if (!str) { str = malloc(length + 1); if (!str) return NULL; memcpy(str, s, length); str[length] = '\0'; } str[i] = tolower(tchar); } } } return (SQLCHAR *) str; } /* * Concatenate a single formatted argument to a given buffer handling the SQL_NTS thing. * "fmt" must contain somewhere in it the single form '%.*s'. * This is heavily used in creating queries for info routines (SQLTables, SQLColumns). * This routine could be modified to use vsprintf() to handle multiple arguments. */ static int my_str(char *buf, int buflen, const char *fmt, const char *s, ssize_t len) { if (s && (len > 0 || (len == SQL_NTS && *s != 0))) { size_t length = (len > 0) ? len : strlen(s); return snprintf(buf, buflen, fmt, length, s); } buf[0] = '\0'; return 0; } int schema_str(char *buf, int buflen, const SQLCHAR *s, SQLLEN len, BOOL table_is_valid, ConnectionClass *conn) { CSTR fmt = "%.*s"; buf[0] = '\0'; if (!s || 0 == len) { /* * Note that this driver assumes the implicit schema is * the CURRENT_SCHEMA() though it doesn't worth the * naming. */ if (table_is_valid) return my_str(buf, buflen, fmt, CC_get_current_schema(conn), SQL_NTS); return 0; } return my_str(buf, buflen, fmt, (char *) s, len); } static void my_appendPQExpBuffer(PQExpBufferData *buf, const char *fmt, const char *s, ssize_t len) { if (s && (len > 0 || (len == SQL_NTS && *s != 0))) { size_t length = (len > 0) ? len : strlen(s); appendPQExpBuffer(buf, fmt, length, s); } } /* * my_appendPQExpBuffer1 is a extension of my_appendPQExpBuffer. * It can have 1 more parameter than my_aapendPQExpBuffer. */ static void my_appendPQExpBuffer1(PQExpBufferData *buf, const char *fmt, const char *s1, const char *s) { if (s && s[0] != '\0') { ssize_t length = strlen(s); if (s1) appendPQExpBuffer(buf, fmt, s1, length, s); else appendPQExpBuffer(buf, fmt, length, s); } } void schema_appendPQExpBuffer(PQExpBufferData *buf, const char *fmt, const SQLCHAR *s, SQLLEN len, BOOL table_is_valid, ConnectionClass *conn) { if (!s || 0 == len) { /* * Note that this driver assumes the implicit schema is * the CURRENT_SCHEMA() though it doesn't worth the * naming. */ if (table_is_valid) my_appendPQExpBuffer(buf, fmt, CC_get_current_schema(conn), SQL_NTS); return; } my_appendPQExpBuffer(buf, fmt, (char *) s, len); } void schema_appendPQExpBuffer1(PQExpBufferData *buf, const char *fmt, const char *s1, const char *s, BOOL table_is_valid, ConnectionClass *conn) { if (!s || s[0] == '\0') { if (table_is_valid) my_appendPQExpBuffer1(buf, fmt, s1, CC_get_current_schema(conn)); return; } my_appendPQExpBuffer1(buf, fmt, s1, s); } #ifdef _HANDLE_ENLIST_IN_DTC_ /* * Export the following functions so that the pgenlist dll * can handle ConnectionClass objects as opaque ones. */ #define _PGDTC_FUNCS_IMPLEMENT_ #include "connexp.h" #define SYNC_AUTOCOMMIT(conn) \ (SQL_AUTOCOMMIT_OFF != conn->autocommit_public ? \ (conn->transact_status |= CONN_IN_AUTOCOMMIT) : \ (conn->transact_status &= ~CONN_IN_AUTOCOMMIT)) DLL_DECLARE void PgDtc_create_connect_string(void *self, char *connstr, int strsize) { ConnectionClass *conn = (ConnectionClass *) self; ConnInfo *ci = &(conn->connInfo); const char *drivername = ci->drivername; char xaOptStr[32]; #if defined(_WIN32) && !defined(_WIN64) /* * If this is an x86 driver running on an x64 host then the driver name * passed to MSDTC must be the (x64) driver but the client app will be * using the 32-bit driver name. So MSDTC.exe will fail to find the driver * and we'll fail to recover XA transactions. * * IsWow64Process(...) would be the ideal function for this, but is only * available on Windows 6+ (Vista/2k8). We'd use GetNativeSystemInfo, which * is supported on XP and 2k3, instead, but that won't link with older * SDKs. * * It's impler to just test the PROCESSOR_ARCHITEW6432 environment * variable. * * See http://www.postgresql.org/message-id/53A45B59.70303@2ndquadrant.com * for details on this issue. */ const char * const procenv = getenv("PROCESSOR_ARCHITEW6432"); if (procenv != NULL && strcmp(procenv, "AMD64") == 0) { /* * We're a 32-bit binary running under SysWow64 on a 64-bit host and need * to pass a different driver name. * * To avoid playing memory management games, just return a different * string constant depending on the unicode-ness of the driver. * * It probably doesn't matter whether we use the Unicode or ANSI driver * for the XA transaction manager, but pick the same as the client driver * to keep things as similar as possible. */ if (strcmp(drivername, DBMS_NAME) == 0) #ifdef UNICODE_SUPPORT drivername = DBMS_NAME_UNICODE"(x64)"; #else drivername = DBMS_NAME_ANSI"(x64)"; #endif } #endif // _WIN32 && !_WIN64 if (0 >= ci->xa_opt) return; switch (ci->xa_opt) { case DTC_CHECK_LINK_ONLY: case DTC_CHECK_BEFORE_LINK: SPRINTF_FIXED(xaOptStr, KEYWORD_DTC_CHECK "=0;"); break; case DTC_CHECK_RM_CONNECTION: SPRINTF_FIXED(xaOptStr, KEYWORD_DTC_CHECK "=1;"); break; default: *xaOptStr = '\0'; break; } snprintf(connstr, strsize, "DRIVER={%s};%s" "SERVER=%s;PORT=%s;DATABASE=%s;" "UID=%s;PWD=%s;" ABBR_SSLMODE "=%s", drivername, xaOptStr, ci->server, ci->port, ci->database, ci->username, SAFE_NAME(ci->password), ci->sslmode ); return; } #define SECURITY_WIN32 #include DLL_DECLARE int PgDtc_is_recovery_available(void *self, char *reason, int rsize) { ConnectionClass *conn = (ConnectionClass *) self; ConnInfo *ci = &(conn->connInfo); int ret = -1; // inknown LONG nameSize; char loginUser[256]; BOOL outReason = FALSE; BOOL doubtRootCert = TRUE, doubtCert = TRUE; const char *delim; /* * Root certificate is used? */ if (NULL != reason && rsize > 0) outReason = TRUE; /* * Root certificate is used? */ doubtRootCert = FALSE; if (0 == stricmp(ci->sslmode, SSLMODE_VERIFY_CA) || 0 == stricmp(ci->sslmode, SSLMODE_VERIFY_FULL)) { if (outReason) strncpy_null(reason, "sslmode verify-[ca|full]", rsize); return 0; } /* * Did we use SSL client certificate, SSPI, Kerberos or similar * authentication methods? * There seems no way to check it directly. */ doubtCert = FALSE; if (PQgetssl(conn->pqconn) != NULL) doubtCert = TRUE; nameSize = sizeof(loginUser); if (GetUserNameEx(NameUserPrincipal, loginUser, &nameSize)) { MYLOG(0, "loginUser=%s\n", loginUser); } else { int err = GetLastError(); switch (err) { case ERROR_NONE_MAPPED: MYLOG(0, "The user name is unavailable in the specified format\n"); break; case ERROR_NO_SUCH_DOMAIN: MYLOG(0, "The domain controller is unavailable to perform the lookup\n"); break; case ERROR_MORE_DATA: MYLOG(0, "The buffer is too small\n"); break; default: MYLOG(0, "GetUserNameEx error=%d\n", err); break; } } ret = 1; if (outReason) *reason = '\0'; delim = ""; if (doubtRootCert) { if (outReason) snprintf(reason, rsize, "%s%ssslmode verify-[ca|full]", reason, delim); delim = ", "; ret = -1; } if (doubtCert) { if (outReason) snprintf(reason, rsize, "%s%scertificate", reason, delim); delim = ", "; ret = -1; } return ret; } DLL_DECLARE void PgDtc_set_async(void *self, void *async) { ConnectionClass *conn = (ConnectionClass *) self; if (!conn) return; CONNLOCK_ACQUIRE(conn); if (NULL != async) CC_set_autocommit(conn, FALSE); else SYNC_AUTOCOMMIT(conn); conn->asdum = async; CONNLOCK_RELEASE(conn); } DLL_DECLARE void *PgDtc_get_async(void *self) { ConnectionClass *conn = (ConnectionClass *) self; return conn->asdum; } DLL_DECLARE void PgDtc_set_property(void *self, int property, void *value) { ConnectionClass *conn = (ConnectionClass *) self; CONNLOCK_ACQUIRE(conn); switch (property) { case inprogress: if (NULL != value) CC_set_dtc_executing(conn); else CC_no_dtc_executing(conn); break; case enlisted: if (NULL != value) CC_set_dtc_enlisted(conn); else CC_no_dtc_enlisted(conn); break; case prepareRequested: if (NULL != value) CC_set_dtc_prepareRequested(conn); else CC_no_dtc_prepareRequested(conn); break; } CONNLOCK_RELEASE(conn); } DLL_DECLARE void PgDtc_set_error(void *self, const char *message, const char *func) { ConnectionClass *conn = (ConnectionClass *) self; CC_set_error(conn, CONN_UNSUPPORTED_OPTION, message, func); } DLL_DECLARE int PgDtc_get_property(void *self, int property) { ConnectionClass *conn = (ConnectionClass *) self; int ret; CONNLOCK_ACQUIRE(conn); switch (property) { case inprogress: ret = CC_is_dtc_executing(conn); break; case enlisted: ret = CC_is_dtc_enlisted(conn); break; case inTrans: ret = CC_is_in_trans(conn); break; case errorNumber: ret = CC_get_errornumber(conn); break; case idleInGlobalTransaction: ret = CC_is_idle_in_global_transaction(conn); break; case connected: ret = (CONN_CONNECTED == conn->status); break; case prepareRequested: ret = CC_is_dtc_prepareRequested(conn); break; } CONNLOCK_RELEASE(conn); return ret; } DLL_DECLARE BOOL PgDtc_connect(void *self) { CSTR func = "PgDtc_connect"; ConnectionClass *conn = (ConnectionClass *) self; if (CONN_CONNECTED == conn->status) return TRUE; if (CC_connect(conn, NULL) <= 0) { /* Error messages are filled in */ CC_log_error(func, "Error on CC_connect", conn); return FALSE; } return TRUE; } DLL_DECLARE void PgDtc_free_connect(void *self) { ConnectionClass *conn = (ConnectionClass *) self; PGAPI_FreeConnect(conn); } DLL_DECLARE BOOL PgDtc_one_phase_operation(void *self, int operation) { ConnectionClass *conn = (ConnectionClass *) self; BOOL ret, is_in_progress = CC_is_dtc_executing(conn); if (!is_in_progress) CC_set_dtc_executing(conn); switch (operation) { case ONE_PHASE_COMMIT: ret = CC_commit(conn); break; default: ret = CC_abort(conn); break; } if (!is_in_progress) CC_no_dtc_executing(conn); return ret; } DLL_DECLARE BOOL PgDtc_two_phase_operation(void *self, int operation, const char *gxid) { ConnectionClass *conn = (ConnectionClass *) self; QResultClass *qres; BOOL ret = TRUE; char cmd[512]; switch (operation) { case PREPARE_TRANSACTION: SPRINTF_FIXED(cmd, "PREPARE TRANSACTION '%s'", gxid); break; case COMMIT_PREPARED: SPRINTF_FIXED(cmd, "COMMIT PREPARED '%s'", gxid); break; case ROLLBACK_PREPARED: SPRINTF_FIXED(cmd, "ROLLBACK PREPARED '%s'", gxid); break; } qres = CC_send_query(conn, cmd, NULL, 0, NULL); if (!QR_command_maybe_successful(qres)) ret = FALSE; QR_Destructor(qres); return ret; } DLL_DECLARE BOOL PgDtc_lock_cntrl(void *self, BOOL acquire, BOOL bTrial) { ConnectionClass *conn = (ConnectionClass *) self; BOOL ret = TRUE; if (acquire) if (bTrial) ret = TRY_ENTER_CONN_CS(conn); else ENTER_CONN_CS(conn); else LEAVE_CONN_CS(conn); return ret; } static ConnectionClass * CC_Copy(const ConnectionClass *conn) { ConnectionClass *newconn = CC_alloc(); if (newconn) { memcpy(newconn, conn, sizeof(ConnectionClass)); CC_lockinit(newconn); } return newconn; } DLL_DECLARE void * PgDtc_isolate(void *self, DWORD option) { BOOL disposingConn = (0 != (disposingConnection & option)); ConnectionClass *sconn = (ConnectionClass *) self, *newconn = NULL; if (0 == (useAnotherRoom & option)) { HENV henv = sconn->henv; CC_cleanup(sconn, TRUE); if (newconn = CC_Copy(sconn), NULL == newconn) return newconn; MYLOG(0, "newconn=%p from %p\n", newconn, sconn); CC_initialize(sconn, FALSE); if (!disposingConn) CC_copy_conninfo(&sconn->connInfo, &newconn->connInfo); CC_initialize_pg_version(sconn); sconn->henv = henv; newconn->henv = NULL; SYNC_AUTOCOMMIT(sconn); return newconn; } newconn = CC_Constructor(); if (!newconn) return NULL; CC_copy_conninfo(&newconn->connInfo, &sconn->connInfo); CC_initialize_pg_version(newconn); newconn->asdum = sconn->asdum; newconn->gTranInfo = sconn->gTranInfo; CC_set_dtc_isolated(newconn); sconn->asdum = NULL; SYNC_AUTOCOMMIT(sconn); CC_set_dtc_clear(sconn); MYLOG(0, "generated connection=%p with %p\n", newconn, newconn->asdum); return newconn; } #endif /* _HANDLE_ENLIST_IN_DTC_ */ BOOL CC_set_transact(ConnectionClass *self, UInt4 isolation) { char *query; QResultClass *res; BOOL bShow = FALSE; if (PG_VERSION_LT(self, 8.0) && (isolation == SQL_TXN_READ_UNCOMMITTED || isolation == SQL_TXN_REPEATABLE_READ)) { CC_set_error(self, CONN_NOT_IMPLEMENTED_ERROR, "READ_UNCOMMITTED or REPEATABLE_READ is not supported by the server", __FUNCTION__); return FALSE; } switch (isolation) { case SQL_TXN_SERIALIZABLE: query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE"; break; case SQL_TXN_REPEATABLE_READ: query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ"; break; case SQL_TXN_READ_UNCOMMITTED: query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"; break; default: query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"; break; } if (self->default_isolation == 0) bShow = TRUE; if (bShow) res = CC_send_query_append(self, ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL, query); else res = CC_send_query(self, query, NULL, READ_ONLY_QUERY, NULL); if (!QR_command_maybe_successful(res)) { CC_set_error(self, CONN_EXEC_ERROR, "ISOLATION change request to the server error", __FUNCTION__); QR_Destructor(res); return FALSE; } if (bShow) handle_show_results(res); QR_Destructor(res); self->server_isolation = isolation; return TRUE; }