diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 7b2ec8b99..82cf85b71 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -764,7 +764,6 @@ static int is_autocommit_stmt( { String str(target, sizeof(target), system_charset_info); String* res = item->val_str(&str); - int rc; if ((rc = find_type(&bool_typelib, res->ptr(), res->length(), false))) { diff --git a/server/core/gateway.c b/server/core/gateway.c index bd773c7f9..2bd592fe7 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -136,7 +136,7 @@ static void usage(void); static char* get_expanded_pathname( char** abs_path, char* input_path, - char* fname); + const char* fname); static void print_log_n_stderr( bool do_log, bool do_stderr, @@ -725,7 +725,7 @@ static bool file_is_writable( static char* get_expanded_pathname( char** output_path, char* relative_path, - char* fname) + const char* fname) { char* cnf_file_buf = NULL; char* expanded_path; @@ -1228,7 +1228,7 @@ int main(int argc, char **argv) datadir))); LOGIF(LM, (skygw_log_write_flush( LOGFILE_MESSAGE, - "Configuration file : %s", + "Configuration file : %s", cnf_file_path))); /*< Update the server options */ diff --git a/server/include/config.h b/server/include/config.h index af91bbf5a..09f2459e1 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -78,17 +78,18 @@ typedef struct { int n_threads; /**< Number of polling threads */ } GATEWAY_CONF; -extern int config_load(char *); -extern int config_reload(); -extern int config_threadcount(); -CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); +extern int config_load(char *); +extern int config_reload(); +extern int config_threadcount(); +CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); +config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param); +CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); bool config_set_qualified_param( CONFIG_PARAMETER* param, void* val, config_param_type_t type); -CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); int config_get_valint( CONFIG_PARAMETER* param, diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 9a63ab6f0..408743c25 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,7 +31,6 @@ #include - typedef enum backend_type_t { BE_UNDEFINED=-1, BE_MASTER, @@ -105,27 +104,45 @@ typedef struct sescmd_cursor_st { * Internal structure used to define the set of backend servers we are routing * connections to. This provides the storage for routing module specific data * that is required for each of the backend servers. + * + * Owned by router_instance, referenced by each routing session. */ -typedef struct backend { +typedef struct backend_st { #if defined(SS_DEBUG) skygw_chk_t be_chk_top; #endif SERVER* backend_server; /*< The server itself */ int backend_conn_count; /*< Number of connections to the server */ bool be_valid; /*< valid when belongs to the router's configuration */ - DCB* be_dcb; - /*< cursor is pointer and status variable to current session command */ - sescmd_cursor_t be_sescmd_cursor; #if defined(SS_DEBUG) skygw_chk_t be_chk_tail; #endif } BACKEND; + +/** + * Reference to BACKEND. + * + * Owned by router client session. + */ +typedef struct backend_ref_st { +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_top; +#endif + BACKEND* bref_backend; + DCB* bref_dcb; + sescmd_cursor_t bref_sescmd_cur; +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_tail; +#endif +} backend_ref_t; + + typedef struct rwsplit_config_st { int rw_max_slave_conn_percent; int rw_max_slave_conn_count; } rwsplit_config_t; - + /** * The client session structure used within this router. @@ -139,11 +156,13 @@ struct router_client_session { bool rses_closed; /*< true when closeSession is called */ /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; - BACKEND* rses_master; /*< Pointer to master */ - BACKEND** rses_backend; /*< All backends used by client session */ - rwsplit_config_t rses_config; /*< copied config info from router instance */ + backend_ref_t* rses_master_ref; + backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */ + rwsplit_config_t rses_config; /*< copied config info from router instance */ int rses_nbackends; int rses_capabilities; /*< input type, for example */ + bool rses_autocommit_enabled; + bool rses_transaction_active; struct router_client_session* next; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; @@ -178,4 +197,8 @@ typedef struct router_instance { struct router_instance* next; /*< Next router on the list */ } ROUTER_INSTANCE; +#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ + (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : \ + (SERVER_IS_JOINED((b)->backend_server) ? BE_JOINED : BE_UNDEFINED))); + #endif /*< _RWSPLITROUTER_H */ diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index f056da724..de4d803a5 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -70,13 +70,13 @@ static void clientReply( DCB* backend_dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); -static int backend_cmp( - const void* be_1, - const void* be_2); +int bref_cmp( + const void* bref1, + const void* bref2); static bool select_connect_backend_servers( - BACKEND** p_master, - BACKEND** b, + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, int router_nservers, int max_nslaves, SESSION* session, @@ -131,7 +131,7 @@ static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); static bool execute_sescmd_in_backend( - BACKEND* backend); + backend_ref_t* backend_ref); static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, @@ -154,13 +154,11 @@ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, sescmd_cursor_t* scur); -#if 0 /*< disabled for now due multiple slaves changes */ static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf); -#endif static bool route_session_write( ROUTER_CLIENT_SES* router_client_ses, @@ -255,13 +253,12 @@ static ROUTER* createInstance( if (options != NULL) { - LOGIF(LM, (skygw_log_write_flush( + LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, "Router options supplied to read/write statement router " "module but none are supported. The options will be " "ignored."))); } - /** * Create an array of the backend servers in the router structure to * maintain a count of the number of connections to each @@ -284,14 +281,13 @@ static ROUTER* createInstance( router->servers[nservers]->backend_server = server; router->servers[nservers]->backend_conn_count = 0; router->servers[nservers]->be_valid = false; - router->servers[nservers]->be_dcb = NULL; #if defined(SS_DEBUG) router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; #endif nservers += 1; server = server->nextdb; - } + } router->servers[nservers] = NULL; /** @@ -369,45 +365,87 @@ static ROUTER* createInstance( * @param session The session itself * @return Session specific data for this session */ - static void* newSession( ROUTER* router_inst, SESSION* session) { - BACKEND** pp_backend; + backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ + backend_ref_t* master_ref = NULL; /*< pointer to selected master */ BACKEND** b; - BACKEND* master = NULL; /*< pointer to selected master */ ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; int router_nservers = 0; /*< # of servers in total */ int max_nslaves; /*< max # of slaves used in this session */ int conf_max_nslaves; /*< value from configuration file */ - - b = router->servers; - - /** count servers */ - while (*(b++) != NULL) router_nservers++; - - /** Master + Slave is minimum requirement */ - if (router_nservers < 2) - { - /** log */ - goto return_rses; - } + int i; + static int min_nservers = 1; /*< hard-coded for now */ + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); - + if (client_rses == NULL) { ss_dassert(false); goto return_rses; } +#if defined(SS_DEBUG) + client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; + client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; +#endif /** Copy config struct from router instance */ client_rses->rses_config = router->rwsplit_config; /** - * Either copy direct count of slave connections or calculate the count - * from percentage value. + * Set defaults to session variables. + */ + client_rses->rses_autocommit_enabled = true; + client_rses->rses_transaction_active = false; + + /** count servers */ + b = router->servers; + while (*(b++) != NULL) router_nservers++; + + /** With too few servers session is not created */ + if (router_nservers < min_nservers) + { + /** log this */ + goto return_rses; + } + /** + * Create backend reference objects for this session. + */ + backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t)); + + if (backend_ref == NULL) + { + /** log this */ + free(client_rses); + goto return_rses; + } + /** + * Initialize backend references with BACKEND ptr. + * Initialize session command cursors for each backend reference. + */ + for (i=0; i< router_nservers; i++) + { +#if defined(SS_DEBUG) + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + backend_ref[i].bref_backend = router->servers[i]; + /** store pointers to sescmd list to both cursors */ + backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; + backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; + backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; + } + /** + * Find out the number of read backend servers. + * Depending on the configuration value type, either copy direct count + * of slave connections or calculate the count from percentage value. */ if (client_rses->rses_config.rw_max_slave_conn_count > 0) { @@ -417,48 +455,22 @@ static void* newSession( { conf_max_nslaves = (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; - } - max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); - pp_backend = (BACKEND **)calloc(1, (router_nservers)*sizeof(BACKEND *)); - - /** - * Copy backend pointer array from global router instance to private use. - */ - memcpy(pp_backend, router->servers, router_nservers*sizeof(BACKEND *)); - ss_dassert(pp_backend[router_nservers] == NULL); - - spinlock_init(&client_rses->rses_lock); -#if defined(SS_DEBUG) - client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; - client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; -#endif - b = pp_backend; - - /** Set up ses cmd objects for each backend */ - while (*b != NULL) - { - /** store pointers to sescmd list to both cursors */ - (*b)->be_sescmd_cursor.scmd_cur_rses = client_rses; - (*b)->be_sescmd_cursor.scmd_cur_active = false; - (*b)->be_sescmd_cursor.scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - (*b)->be_sescmd_cursor.scmd_cur_cmd = NULL; -#if defined(SS_DEBUG) - (*b)->be_sescmd_cursor.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; - (*b)->be_sescmd_cursor.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; -#endif - b++; } - /** + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + + spinlock_init(&client_rses->rses_lock); + client_rses->rses_backend_ref = backend_ref; + + /** * Find a backend servers to connect to. */ - succp = select_connect_backend_servers(&master, - pp_backend, - router_nservers, - max_nslaves, - session, - router); - + succp = select_connect_backend_servers(&master_ref, + backend_ref, + router_nservers, + max_nslaves, + session, + router); + /** Both Master and at least 1 slave must be found */ if (!succp) { free(client_rses); @@ -466,11 +478,12 @@ static void* newSession( goto return_rses; } /** Copy backend pointers to router session. */ - client_rses->rses_master = master; - client_rses->rses_backend = pp_backend; + client_rses->rses_master_ref = master_ref; + client_rses->rses_backend_ref = backend_ref; client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; router->stats.n_sessions += 1; + /** * Version is bigger than zero once initialized. */ @@ -504,12 +517,12 @@ static void closeSession( void* router_session) { ROUTER_CLIENT_SES* router_cli_ses; - BACKEND** b; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); - b = router_cli_ses->rses_backend; + backend_ref = router_cli_ses->rses_backend_ref; /** * Lock router client session for secure read and update. */ @@ -524,28 +537,27 @@ static void closeSession( * whithout checking this first. */ router_cli_ses->rses_closed = true; - - while (*b != NULL) + + for (i=0; irses_nbackends; i++) { /** decrease server current connection counters */ - atomic_add(&(*b)->backend_server->stats.n_current, -1); + atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); /** Close those which had been connected */ - if ((*b)->be_dcb != NULL) + if (backend_ref[i].bref_dcb != NULL) { - CHK_DCB((*b)->be_dcb); - dcbs[i] = (*b)->be_dcb; - (*b)->be_dcb = NULL; /*< prevent new uses of DCB */ + CHK_DCB(backend_ref[i].bref_dcb); + dcbs[i] = backend_ref[i].bref_dcb; + backend_ref[i].bref_dcb = + (DCB *)0xdeadbeef; /*< prevent new uses of DCB */ dcbs[i]->func.close(dcbs[i]); } - b++; } /** Unlock */ rses_end_locked_router_action(router_cli_ses); } } - static void freeSession( ROUTER* router_instance, void* router_client_session) @@ -553,13 +565,16 @@ static void freeSession( ROUTER_CLIENT_SES* router_cli_ses; ROUTER_INSTANCE* router; int i; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; - router = (ROUTER_INSTANCE *)router_instance; - - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1); - + router = (ROUTER_INSTANCE *)router_instance; + backend_ref = router_cli_ses->rses_backend_ref; + + for (i=0; irses_nbackends; i++) + { + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } spinlock_acquire(&router->lock); if (router->connections == router_cli_ses) { @@ -602,49 +617,58 @@ static void freeSession( return; } + static bool get_dcb( DCB** p_dcb, ROUTER_CLIENT_SES* rses, backend_type_t btype) { - BACKEND** b; - int smallest_nconn = -1; - bool succp = false; + backend_ref_t* backend_ref; + int smallest_nconn = -1; + int i; + bool succp = false; CHK_CLIENT_RSES(rses); - ss_dassert(*(p_dcb) == NULL); - b = rses->rses_backend; + ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); + + if (p_dcb == NULL) + { + goto return_succp; + } + backend_ref = rses->rses_backend_ref; if (btype == BE_SLAVE) { - while (*b != NULL) - { - if ((*b)->be_dcb != NULL && - SERVER_IS_SLAVE((*b)->backend_server) && + for (i=0; irses_nbackends; i++) + { + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + SERVER_IS_SLAVE(b->backend_server) && (smallest_nconn == -1 || - (*b)->backend_conn_count < smallest_nconn)) + b->backend_conn_count < smallest_nconn)) { - *p_dcb = (*b)->be_dcb; - smallest_nconn = (*b)->backend_conn_count; + *p_dcb = backend_ref[i].bref_dcb; + smallest_nconn = b->backend_conn_count; succp = true; } - b++; } ss_dassert(succp); } else if (btype == BE_MASTER || BE_JOINED) { - while (*b != NULL) + for (i=0; irses_nbackends; i++) { - if ((*b)->be_dcb != NULL && - (SERVER_IS_MASTER((*b)->backend_server) || - SERVER_IS_JOINED((*b)->backend_server))) + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) { - *p_dcb = (*b)->be_dcb; + *p_dcb = backend_ref[i].bref_dcb; succp = true; goto return_succp; } - b++; } } return_succp: @@ -673,23 +697,19 @@ static int routeQuery( void* router_session, GWBUF* querybuf) { - skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - GWBUF* plainsqlbuf = NULL; - char* querystr = NULL; + skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; + GWBUF* plainsqlbuf = NULL; + char* querystr = NULL; char* startpos; unsigned char packet_type; uint8_t* packet; int ret = 0; - DCB* master_dcb = NULL; - DCB* slave_dcb = NULL; + DCB* master_dcb = NULL; + DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; bool rses_is_closed = false; size_t len; - /** if false everything goes to master and session commands to slave too */ - static bool autocommit_enabled = true; - /** if true everything goes to master and session commands to slave too */ - static bool transaction_active = false; CHK_CLIENT_RSES(router_cli_ses); @@ -719,7 +739,7 @@ static int routeQuery( inst->stats.n_queries++; startpos = (char *)&packet[5]; - master_dcb = router_cli_ses->rses_master->be_dcb; + master_dcb = router_cli_ses->rses_master_ref->bref_dcb; CHK_DCB(master_dcb); switch(packet_type) { @@ -778,36 +798,36 @@ static int routeQuery( * transaction becomes active and master gets all statements until * transaction is committed and autocommit is enabled again. */ - if (autocommit_enabled && + if (router_cli_ses->rses_autocommit_enabled && QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) { - autocommit_enabled = false; + router_cli_ses->rses_autocommit_enabled = false; - if (!transaction_active) + if (!router_cli_ses->rses_transaction_active) { - transaction_active = true; + router_cli_ses->rses_transaction_active = true; } } - else if (!transaction_active && + else if (!router_cli_ses->rses_transaction_active && QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX)) { - transaction_active = true; + router_cli_ses->rses_transaction_active = true; } /** * Explicit COMMIT and ROLLBACK, implicit COMMIT. */ - if (autocommit_enabled && - transaction_active && + if (router_cli_ses->rses_autocommit_enabled && + router_cli_ses->rses_transaction_active && (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) || QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK))) { - transaction_active = false; + router_cli_ses->rses_transaction_active = false; } - else if (!autocommit_enabled && + else if (!router_cli_ses->rses_autocommit_enabled && QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT)) { - autocommit_enabled = true; - transaction_active = false; + router_cli_ses->rses_autocommit_enabled = true; + router_cli_ses->rses_transaction_active = false; } /** * Session update is always routed in the same way. @@ -829,7 +849,8 @@ static int routeQuery( ss_dassert(ret == 1); goto return_ret; } - else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !transaction_active) + else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && + !router_cli_ses->rses_transaction_active) { bool succp; @@ -857,7 +878,7 @@ static int routeQuery( if (LOG_IS_ENABLED(LOGFILE_TRACE)) { - if (transaction_active) /*< all to master */ + if (router_cli_ses->rses_transaction_active) /*< all to master */ { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, @@ -1026,8 +1047,8 @@ static void clientReply( DCB* client_dcb; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; - backend_type_t be_type; - BACKEND** be; + backend_ref_t* backend_ref; + int i; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -1066,30 +1087,29 @@ static void clientReply( /** Log that client was closed before reply */ goto lock_failed; } - be = router_cli_ses->rses_backend; - - while (*be !=NULL) - { - if ((*be)->be_dcb == backend_dcb) - { - be_type = (SERVER_IS_MASTER((*be)->backend_server) ? BE_MASTER : - (SERVER_IS_SLAVE((*be)->backend_server) ? BE_SLAVE : - (SERVER_IS_JOINED((*be)->backend_server) ? BE_JOINED : BE_UNDEFINED))); - break; - } - be++; - } -// LOGIF(LT, tracelog_routed_query(router_cli_ses, -// "reply_by_statement", -// backend_dcb, -// gwbuf_clone(writebuf))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } - scur = &(*be)->be_sescmd_cursor; + backend_ref = router_cli_ses->rses_backend_ref; + + /** find backend_dcb's corresponding BACKEND */ + i = 0; + while (irses_nbackends && + backend_ref[i].bref_dcb != backend_dcb) + { + i++; + } + ss_dassert(backend_ref[i].bref_dcb == backend_dcb); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "reply_by_statement", + &backend_ref[i], + gwbuf_clone(writebuf))); + + scur = &backend_ref[i].bref_sescmd_cur; /** * Active cursor means that reply is from session command * execution. Majority of the time there are no session commands @@ -1123,13 +1143,13 @@ lock_failed: } -int backend_cmp( - const void* be_1, - const void* be_2) +int bref_cmp( + const void* bref1, + const void* bref2) { - BACKEND* b1 = *(BACKEND **)be_1; - BACKEND* b2 = *(BACKEND **)be_2; - + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; + return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); } @@ -1138,12 +1158,12 @@ int backend_cmp( * @node Search suitable backend servers from those of router instance. * * Parameters: - * @param p_master - in, use, out - * Pointer to location where master's address is to be stored. + * @param p_master_ref - in, use, out + * Pointer to location where master's backend reference is to be stored. * NULL is not allowed. * - * @param b - in, use, out - * Pointer to location where all backend server pointers are stored. + * @param backend_ref - in, use, out + * Pointer to backend server reference object array. * NULL is not allowed. * * @param router_nservers - in, use @@ -1165,63 +1185,85 @@ int backend_cmp( * a router instance. As a result, the first master found is chosen. */ static bool select_connect_backend_servers( - BACKEND** p_master, - BACKEND** b, + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, int router_nservers, int max_nslaves, SESSION* session, ROUTER_INSTANCE* router) { - bool succp = true; - bool master_found = false; - bool master_connected = false; - int slaves_found = 0; - int slaves_connected = 0; + bool succp = true; + bool master_found = false; + bool master_connected = false; + int slaves_found = 0; + int slaves_connected = 0; + int i; + const int min_nslaves = 0; /*< not configurable at the time */ + bool is_synced_master; + + if (p_master_ref == NULL || backend_ref == NULL) + { + ss_dassert(FALSE); + succp = false; + goto return_succp; + } + + if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ + { + is_synced_master = true; + } + else + { + is_synced_master = false; + } /** * Sort the pointer list to servers according to connection counts. As * a consequence those backends having least connections are in the * beginning of the list. */ - qsort((void *)b, (size_t)router_nservers, sizeof(void*), backend_cmp); + qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), bref_cmp); /** * Choose at least 1+1 (master and slave) and at most 1+max_nslaves * servers from the sorted list. First master found is selected. */ - while (*b != NULL && (slaves_connected < max_nslaves || !master_connected)) - { + for (i=0; + ibitvalue is %d", pthread_self(), - (*b)->backend_server->name, - (*b)->backend_server->port, - (*b)->backend_conn_count, - (*b)->backend_server->status, - router->bitmask))); + b->backend_server->name, + b->backend_server->port, + b->backend_conn_count, + b->backend_server->status, + router->bitmask))); - if (SERVER_IS_RUNNING((*b)->backend_server) && - (((*b)->backend_server->status & router->bitmask) == + if (SERVER_IS_RUNNING(b->backend_server) && + ((b->backend_server->status & router->bitmask) == router->bitvalue)) { if (slaves_found < max_nslaves && - SERVER_IS_SLAVE((*b)->backend_server)) + SERVER_IS_SLAVE(b->backend_server)) { slaves_found += 1; - - (*b)->be_dcb = dcb_connect( - (*b)->backend_server, + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, session, - (*b)->backend_server->protocol); + b->backend_server->protocol); - if ((*b)->be_dcb != NULL) + if (backend_ref[i].bref_dcb != NULL) { slaves_connected += 1; /** Increase backend connection counter */ - atomic_add(&(*b)->backend_conn_count, 1); + atomic_add(&b->backend_conn_count, 1); } else { @@ -1229,22 +1271,22 @@ static bool select_connect_backend_servers( } } else if (!master_connected && - (SERVER_IS_MASTER((*b)->backend_server) || - SERVER_IS_JOINED((*b)->backend_server))) + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) { master_found = true; - (*b)->be_dcb = dcb_connect( - (*b)->backend_server, + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, session, - (*b)->backend_server->protocol); + b->backend_server->protocol); - if ((*b)->be_dcb != NULL) + if (backend_ref[i].bref_dcb != NULL) { master_connected = true; - *p_master = *b; + *p_master_ref = &backend_ref[i]; /** Increase backend connection counter */ - atomic_add(&(*b)->backend_conn_count, 1); + atomic_add(&b->backend_conn_count, 1); } else { @@ -1252,79 +1294,170 @@ static bool select_connect_backend_servers( } } } - b++; - } /*< while */ + } /*< for */ - if (master_connected && slaves_connected > 0 && slaves_connected <= max_nslaves) + /** + * Successful cases + */ + if (master_connected && + slaves_connected >= min_nslaves && + slaves_connected <= max_nslaves) { succp = true; + + if (slaves_connected == 0 && slaves_found > 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); + } + else if (slaves_found == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + } + else if (slaves_connected < max_nslaves) + { + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Note : Couldn't connect to maximum number of " + "slaves. Connected successfully to %d slaves " + "of %d of them.", + slaves_connected, + slaves_found))); + } + + if (LOG_IS_ENABLED(LT)) + { + for (i=0; ibackend_server->name, + b->backend_server->port))); + } + } /* for */ + } } + /** + * Failure cases + */ else - { - /** disconnect and clean up */ + { + if (!master_found) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + } + else if (!master_connected) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + } + + if (slaves_connected < min_nslaves) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't establish required amount of " + "slave connections for router session."))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "*Error : Couldn't establish required amount of " + "slave connections for router session."))); + } + + /** Clean up connections */ + for (i=0; ifunc.close(backend_ref[i].bref_dcb); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } + } + master_connected = false; + slaves_connected = 0; } -#if 0 - if (router->bitvalue != 0 && - p_master != NULL && - local_backend[BE_JOINED] == NULL) - { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find a Joined Galera node from %d " - "candidates.", - i))); - goto return_succp; - } - - if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Slave from %d " - "candidates.", - i))); - } - - if (p_master != NULL && local_backend[BE_MASTER] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Master from %d " - "candidates.", - i))); - } - - if (local_backend[BE_SLAVE] != NULL) { - *p_slave = local_backend[BE_SLAVE]; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Slave %s:%d from %d candidates.", - pthread_self(), - local_backend[BE_SLAVE]->backend_server->name, - local_backend[BE_SLAVE]->backend_server->port, - i))); - } - if (local_backend[BE_MASTER] != NULL) { - *p_master = local_backend[BE_MASTER]; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Master %s:%d " - "from %d candidates.", - pthread_self(), - local_backend[BE_MASTER]->backend_server->name, - local_backend[BE_MASTER]->backend_server->port, - i))); - } -#endif return_succp: + return succp; } - /** * Create a generic router session property strcture. */ @@ -1650,26 +1783,26 @@ static GWBUF* sescmd_cursor_clone_querybuf( * Router session must be locked. */ static bool execute_sescmd_in_backend( - BACKEND* backend) + backend_ref_t* backend_ref) { DCB* dcb; bool succp = true; int rc = 0; sescmd_cursor_t* scur; - if (backend->be_dcb == NULL) + if (backend_ref->bref_dcb == NULL) { goto return_succp; } - dcb = backend->be_dcb; + dcb = backend_ref->bref_dcb; CHK_DCB(dcb); - CHK_BACKEND(backend); + CHK_BACKEND_REF(backend_ref); /** * Get cursor pointer and copy of command buffer to cursor. */ - scur = &backend->be_sescmd_cursor; + scur = &backend_ref->bref_sescmd_cur; /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) @@ -1683,12 +1816,12 @@ static bool execute_sescmd_in_backend( /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); } - /* - LOGIF(LT, tracelog_routed_query(rses, + + LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses, "execute_sescmd_in_backend", - dcb, + backend_ref, sescmd_cursor_clone_querybuf(scur))); - */ + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { case COM_CHANGE_USER: rc = dcb->func.auth( @@ -1800,11 +1933,10 @@ static rses_property_t* mysql_sescmd_get_property( return scmd->my_sescmd_prop; } -#if 0 static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf) { uint8_t* packet = GWBUF_DATA(buf); @@ -1813,20 +1945,18 @@ static void tracelog_routed_query( size_t buflen = GWBUF_LENGTH(buf); char* querystr; char* startpos = (char *)&packet[5]; + BACKEND* b; backend_type_t be_type; - - if (rses->rses_dcb[BE_MASTER] == dcb) - { - be_type = BE_MASTER; - } - else if (rses->rses_dcb[BE_SLAVE] == dcb) - { - be_type = BE_SLAVE; - } - else - { - be_type = BE_UNDEFINED; - } + DCB* dcb; + + CHK_BACKEND_REF(bref); + b = bref->bref_backend; + CHK_BACKEND(b); + dcb = bref->bref_dcb; + CHK_DCB(dcb); + + be_type = BACKEND_TYPE(b); + if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL) { len = packet[0]; @@ -1845,23 +1975,16 @@ static void tracelog_routed_query( funcname, buflen, querystr, - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->name : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->name : - "Target DCB is neither of the backends. This is error")), - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->port : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->port : - -1)), + b->backend_server->name, + b->backend_server->port, STRBETYPE(be_type), dcb))); } } gwbuf_free(buf); } -#endif + + /** * Return rc, rc < 0 if router session is closed. rc == 0 if there are no * capabilities specified, rc > 0 when there are capabilities. @@ -1907,8 +2030,9 @@ static bool route_session_write( { bool succp; rses_property_t* prop; - BACKEND** b; - + backend_ref_t* backend_ref; + int i; + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Session write, query type\t%s, packet type %s, " @@ -1916,7 +2040,7 @@ static bool route_session_write( STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); - b = router_cli_ses->rses_backend; + backend_ref = router_cli_ses->rses_backend_ref; /** * COM_QUIT is one-way message. Server doesn't respond to that. @@ -1930,9 +2054,9 @@ static bool route_session_write( succp = true; - while (*b != NULL) + for (i=0; irses_nbackends; i++) { - DCB* dcb = (*b)->be_dcb; + DCB* dcb = backend_ref[i].bref_dcb; if (dcb != NULL) { @@ -1943,7 +2067,6 @@ static bool route_session_write( succp = false; } } - b++; } gwbuf_free(querybuf); goto return_succp; @@ -1965,10 +2088,10 @@ static bool route_session_write( } /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - - while (*b != NULL) + + for (i=0; irses_nbackends; i++) { - succp = execute_sescmd_in_backend((*b)); + succp = execute_sescmd_in_backend(&backend_ref[i]); if (!succp) { @@ -1976,7 +2099,6 @@ static bool route_session_write( rses_end_locked_router_action(router_cli_ses); goto return_succp; } - b++; } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1986,3 +2108,4 @@ static bool route_session_write( return_succp: return succp; } + diff --git a/server/modules/routing/readwritesplit/test/rwsplit.sh b/server/modules/routing/readwritesplit/test/rwsplit.sh index fc0c56822..d64fcf495 100755 --- a/server/modules/routing/readwritesplit/test/rwsplit.sh +++ b/server/modules/routing/readwritesplit/test/rwsplit.sh @@ -169,6 +169,8 @@ else echo "$TINPUT PASSED">>$TLOG ; fi +# Disable autocommit in the first session and then test in new session that +# it is again enabled. TINPUT=test_autocommit_disabled2.sql TRETVAL=1 a=`$RUNCMD < ./$TINPUT` @@ -178,3 +180,14 @@ else echo "$TINPUT PASSED">>$TLOG ; fi +TINPUT=set_autocommit_disabled.sql +`$RUNCMD < ./$TINPUT` +TINPUT=test_after_autocommit_disabled.sql +TRETVAL=$TMASTER_ID +a=`$RUNCMD < ./$TINPUT` +if [ "$a" == "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when it was not accetable">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi + diff --git a/server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql b/server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql new file mode 100644 index 000000000..a182b4922 --- /dev/null +++ b/server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql @@ -0,0 +1,7 @@ +use test; +drop table if exists t1; +create table t1 (id integer); +set autocommit=0; -- open transaction +begin; +insert into t1 values(1); -- write to master +commit; diff --git a/server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql b/server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql new file mode 100644 index 000000000..f10c5eb8c --- /dev/null +++ b/server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql @@ -0,0 +1,2 @@ +use test; +select @@server_id; \ No newline at end of file diff --git a/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql b/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql new file mode 100644 index 000000000..04be4024e --- /dev/null +++ b/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql @@ -0,0 +1,9 @@ +use test; +drop table if exists t1; +create table t1 (id integer); +set autocommit=0; -- open transaction +begin; +insert into t1 values(1); -- write to master +commit; +select count(*) from t1; -- read from master since autocommit is disabled +drop table t1; diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index faaf268d4..5d0f9955f 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -121,7 +121,8 @@ typedef enum skygw_chk_t { CHK_NUM_MY_SESCMD, CHK_NUM_ROUTER_PROPERTY, CHK_NUM_SESCMD_CUR, - CHK_NUM_BACKEND + CHK_NUM_BACKEND, + CHK_NUM_BACKEND_REF } skygw_chk_t; # define STRBOOL(b) ((b) ? "true" : "false") @@ -459,7 +460,14 @@ typedef enum skygw_chk_t { (b)->be_chk_tail == CHK_NUM_BACKEND, \ "BACKEND has invalid check fields"); \ } - + +#define CHK_BACKEND_REF(r) { \ + ss_info_dassert((r)->bref_chk_top == CHK_NUM_BACKEND_REF && \ + (r)->bref_chk_tail == CHK_NUM_BACKEND_REF, \ + "Backend reference has invalid check fields"); \ +} + + #if defined(SS_DEBUG) bool conn_open[10240]; #endif