diff --git a/server/modules/monitor/galera_mon.c b/server/modules/monitor/galera_mon.c index 61d7cd15b..46b8bf4bc 100644 --- a/server/modules/monitor/galera_mon.c +++ b/server/modules/monitor/galera_mon.c @@ -542,6 +542,12 @@ int log_no_members = 1; ptr->server->port, STRSRVSTATUS(ptr->server)))); } + + if (!(SERVER_IS_RUNNING(ptr->server)) || + !(SERVER_IS_IN_CLUSTER(ptr->server))) + { + dcb_call_foreach(DCB_REASON_NOT_RESPONDING); + } if (SERVER_IS_DOWN(ptr->server)) { diff --git a/server/modules/routing/dbshard/dbshard.c b/server/modules/routing/dbshard/dbshard.c index e41439d84..e8d61dbd6 100644 --- a/server/modules/routing/dbshard/dbshard.c +++ b/server/modules/routing/dbshard/dbshard.c @@ -44,6 +44,7 @@ MODULE_INFO info = { #if defined(SS_DEBUG) # include + #endif /** Defined in log_manager.cc */ @@ -278,7 +279,7 @@ static void* hfree(void* fval) bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) { - int rval = 0; + int rval = 0,i; RESULTSET* rset; RSET_ROW* row; @@ -292,7 +293,29 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) while(row) { - hashtable_add(rses->dbhash,row->data[0],target); + if(hashtable_add(rses->dbhash,row->data[0],target)) + { + skygw_log_write(LOGFILE_TRACE,"dbshard: <%s, %s>",target,row->data[0]); + } + else + { + char* oldval = strdup(hashtable_fetch(rses->dbhash,row->data[0])); + + for(i=0;irses_nbackends;i++) + { + if(strcmp(oldval,rses->rses_backend_ref[i].bref_backend->backend_server->unique_name) == 0 && + BREF_IS_CLOSED(&rses->rses_backend_ref[i])) + { + hashtable_delete(rses->dbhash,row->data[0]); + hashtable_add(rses->dbhash,row->data[0],target); + skygw_log_write(LOGFILE_TRACE,"dbshard: <%s, %s> (replaced %s)",target,row->data[0],oldval); + free(oldval); + oldval = NULL; + break; + } + } + free(oldval); + } row = row->next; } resultset_free(rset); @@ -303,7 +326,7 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) return rval; } -int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) +int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) { DCB* dcb; const char* query = "SHOW DATABASES;"; @@ -324,15 +347,18 @@ int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) *((unsigned char*)buffer->start + 4) = 0x03; memcpy(buffer->start + 5,query,strlen(query)); - for(i = 0;irses_nbackends;i++) { clone = gwbuf_clone(buffer); dcb = backends[i].bref_dcb; - if(BREF_IS_IN_USE(&backends[i])) + if(BREF_IS_IN_USE(&backends[i]) && !BREF_IS_CLOSED(&backends[i])) { rval = dcb->func.write(dcb,clone); - } + } + else + { + gwbuf_free(clone); + } } return !rval; @@ -347,6 +373,7 @@ int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) * @return True if all database and server names were successfully retrieved * otherwise false */ +/* bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable) { const unsigned int connect_timeout = 1; @@ -408,7 +435,7 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha rval = false; goto cleanup; } - /** Plain-text password used for authentication for now */ + // Plain-text password used for authentication for now user = server->monuser; pwd = server->monpw; @@ -432,9 +459,9 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha rval = false; goto cleanup; } - /** - * The server was successfully connected to, proceed to query for database names - */ + + //The server was successfully connected to, proceed to query for database names + if((result = mysql_list_dbs(handle,NULL)) == NULL) { @@ -456,11 +483,13 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha server->name))); goto cleanup; } + */ /** * Walk through the list of databases in this backend * and insert them into the hashtable. If the value is already in the hashtable * but the backend isn't in the list of backends it is replaced with the first found backend. */ +/* while((row = mysql_fetch_row(result))) { unsigned long *lengths; @@ -471,12 +500,6 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha dbnm = (char*)calloc(lengths[0] + 1,sizeof(char)); memcpy(dbnm,row[0],lengths[0]); - /*if(is_ignored_database(inst,dbnm)) - { - free(dbnm); - continue; - }*/ - servnm = strdup(server->unique_name); if(hashtable_add(hashtable,dbnm,servnm) == 0) @@ -511,11 +534,13 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha for(j = 0;backends[j];j++) { + * */ /** * See if the old backend is still * alive. If not then update * the hashtable with the current backend's name. */ +/* if(strcmp(backends[j]->backend_server->unique_name,old_backend) == 0 && SERVER_IS_RUNNING(backends[j]->backend_server)) { @@ -544,8 +569,8 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha } } } - } /*< hashtable_add failed */ - } /*< while */ + } + } cleanup: if(result) @@ -554,42 +579,11 @@ cleanup: } result = NULL; mysql_close(handle); - } /*< for */ + } return rval; } - -/** - * Allocates a new hashtable and inserts database names and where to find them - * into it. - * @param inst Router instance - * @param backends Backends to query for database names - * @return Pointer to the newly allocated hashtable or NULL if an error occurred - */ -void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends) -{ - HASHTABLE* htbl = hashtable_alloc(512,hashkeyfun,hashcmpfun); - - if(htbl == NULL) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: hashtable allocation failed."))); - return NULL; - } - /**Update the new hashtable with the key-value pairs*/ - if(!update_dbnames_hash(inst,backends,htbl)) - { - /** - * Log if there were some errors during the database configuration. - */ - - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Warning : Errors occurred while resolving shard locations."))); - } - return htbl; -} +*/ /** * Check the hashtable for the right backend for this query. @@ -602,7 +596,7 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, HASHTABLE* ht = client->dbhash; int sz = 0,i,j; char** dbnms = NULL; - char* rval = NULL; + char* rval = NULL,*query, *tmp = NULL; bool has_dbs = false; /**If the query targets any database other than the current one*/ if(!query_is_parsed(buffer)){ @@ -615,7 +609,8 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, has_dbs = true; for(i = 0; i < sz; i++){ - if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){ + if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){ + skygw_log_write(LOGFILE_TRACE,"dbshard: Query targets specific database (%s)",rval); for(j = i;j < sz;j++) free(dbnms[j]); break; } @@ -624,6 +619,59 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, free(dbnms); } + /* Check if the query is a show tables query with a specific database */ + + if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES)) + { + query = modutil_get_SQL(buffer); + if((tmp = strstr(query,"from"))) + { + char* tok = strtok(tmp, " ;"); + tok = strtok(NULL," ;"); + ss_dassert(tok != NULL); + tmp = (char*) hashtable_fetch(ht, tok); + } + free(query); + + if(tmp == NULL) + { + rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db); + } + else + { + rval = tmp; + has_dbs = true; + skygw_log_write(LOGFILE_TRACE,"dbshard: SHOW TABLES with specific database (%s)", tmp); + } + } + + if(buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) + { + for(i = 0; i < client->rses_nbackends; i++) + { + + char *srvnm = client->rses_backend_ref[i].bref_backend->backend_server->unique_name; + if(strcmp(srvnm,buffer->hint->data) == 0) + { + rval = srvnm; + skygw_log_write(LOGFILE_TRACE,"dbshard: Routing hint found (%s)",srvnm); + + } + } + } + + if(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0') + { + /** + * If the query contains no explicitly stated databases proceed to + * check if the session has an active database and if it is sharded. + */ + + rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db); + } + + + /** * If the query contains no explicitly stated databases proceed to * check if the session has an active database and if it is sharded. @@ -782,7 +830,7 @@ static void refreshInstance( } /** - * Create an instance of dbshard statement router within the MaxScale. + * Create an instance of dbshard router within the MaxScale. * * * @param service The service this router is being create for @@ -929,13 +977,7 @@ static void* newSession( bool succp; int router_nservers = 0; /*< # of servers in total */ int i; -#if 0 - /** - * It could be possibe to accept new session if some of the servers are - * not reachable - */ - const int min_nservers = 1; /*< hard-coded for now */ -#endif + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) @@ -960,7 +1002,6 @@ static void* newSession( spinlock_release(&router->lock); /** * Set defaults to session variables. - * ??? tarvitaanko */ client_rses->rses_autocommit_enabled = true; client_rses->rses_transaction_active = false; @@ -1028,6 +1069,12 @@ static void* newSession( router_nservers, session, router); + + client_rses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun); + hashtable_memory_fns(client_rses->dbhash,(HASHMEMORYFN)strdup, + (HASHMEMORYFN)strdup, + (HASHMEMORYFN)free, + (HASHMEMORYFN)free); rses_end_locked_router_action(client_rses); @@ -1046,6 +1093,18 @@ static void* newSession( client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ router->stats.n_sessions += 1; + if (!(succp = rses_begin_locked_router_action(client_rses))) + { + free(client_rses->rses_backend_ref); + free(client_rses); + + client_rses = NULL; + goto return_rses; + } + + /* Generate database list */ + gen_databaselist(router,client_rses); + rses_end_locked_router_action(client_rses); /** * Version is bigger than zero once initialized. */ @@ -1766,15 +1825,9 @@ static int routeQuery( } ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); - if(router_cli_ses->dbhash == NULL && !router_cli_ses->hash_init) + if(!router_cli_ses->hash_init) { - router_cli_ses->queue = querybuf; - router_cli_ses->dbhash = hashtable_alloc(7, hashkeyfun, hashcmpfun); - hashtable_memory_fns(router_cli_ses->dbhash,(HASHMEMORYFN)strdup, - (HASHMEMORYFN)strdup, - (HASHMEMORYFN)free, - (HASHMEMORYFN)free); - gen_tablelist(inst,router_cli_ses); + router_cli_ses->queue = querybuf; return 1; } packet = GWBUF_DATA(querybuf); @@ -1941,22 +1994,20 @@ static int routeQuery( backend_ref_t* backend = NULL; DCB* backend_dcb = NULL; - //update_dbnames_hash(inst,inst->servers,inst->dbnames_hash); - for(i = 0;i < router_cli_ses->rses_nbackends;i++) { if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server)) { backend = &router_cli_ses->rses_backend_ref[i]; backend_dcb = backend->bref_dcb; - break; + break; } } if(backend) { - GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses); - poll_add_epollin_event_to_dcb(backend_dcb,fake); + GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses); + poll_add_epollin_event_to_dcb(backend_dcb,fake); ret = 1; } else @@ -2396,6 +2447,51 @@ static void clientReply ( } #endif + if(!router_cli_ses->hash_init) + { + bool mapped = true; + int i; + backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; + + for(i = 0; i < router_cli_ses->rses_nbackends; i++) + { + + if(bref->bref_dcb == bkrf[i].bref_dcb) + { + router_cli_ses->rses_backend_ref[i].bref_mapped = true; + parse_showdb_response(router_cli_ses, + router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, + writebuf); + skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.", + router_cli_ses, + bref->bref_backend->backend_server->unique_name); + } + + if(BREF_IS_IN_USE(&bkrf[i]) && + !BREF_IS_MAPPED(&bkrf[i])) + { + mapped = false; + } + } + + gwbuf_free(writebuf); + rses_end_locked_router_action(router_cli_ses); + + if(mapped) + { + router_cli_ses->hash_init = true; + if(router_cli_ses->queue) + { + routeQuery(instance,router_session,router_cli_ses->queue); + router_cli_ses->queue = NULL; + } + skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.", + router_cli_ses); + } + + return; + } + CHK_BACKEND_REF(bref); scur = &bref->bref_sescmd_cur; /** @@ -2464,52 +2560,7 @@ static void clientReply ( bref_clear_state(bref, BREF_WAITING_RESULT); } - if(!router_cli_ses->hash_init) - { - bool mapped = true; - int i; - backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; - - for(i = 0; i < router_cli_ses->rses_nbackends; i++) - { - - if(bref->bref_dcb == bkrf[i].bref_dcb) - { - router_cli_ses->rses_backend_ref[i].bref_mapped = true; - parse_showdb_response(router_cli_ses, - router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, - writebuf); - skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.", - router_cli_ses, - bref->bref_backend->backend_server->unique_name); - } - - if(BREF_IS_IN_USE(&bkrf[i]) && - !BREF_IS_MAPPED(&bkrf[i])) - { - mapped = false; - skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases not yet mapped.", - router_cli_ses, - bkrf[i].bref_backend->backend_server->unique_name); - //break; - } - } - - gwbuf_free(writebuf); - rses_end_locked_router_action(router_cli_ses); - - if(mapped) - { - router_cli_ses->hash_init = true; - routeQuery(instance,router_session,router_cli_ses->queue); - router_cli_ses->queue = NULL; - skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.", - router_cli_ses); - } - - return; - } - else if (writebuf != NULL && client_dcb != NULL) + if (writebuf != NULL && client_dcb != NULL) { /** Write reply to client DCB */ SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); @@ -2770,7 +2821,8 @@ static bool connect_backend_servers( router->bitvalue)) { servers_found += 1; - + + /** Server is already connected */ if (BREF_IS_IN_USE((&backend_ref[i]))) { @@ -2812,6 +2864,11 @@ static bool connect_backend_servers( * of dcb_close. */ atomic_add(&b->backend_conn_count, 1); + + dcb_add_callback(backend_ref[i].bref_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)&backend_ref[i]); } else { @@ -3717,8 +3774,12 @@ static void handleError ( SESSION* session; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; - + CHK_DCB(backend_dcb); + if(succp == NULL || action == ERRACT_RESET) + { + return; + } /** Don't handle same error twice on same DCB */ if (backend_dcb->dcb_errhandle_called) { @@ -3809,6 +3870,22 @@ static void handle_error_reply_client( } } +bool have_servers(ROUTER_CLIENT_SES* rses) +{ + int i; + + for(i=0;irses_nbackends;i++) + { + if(BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && + !BREF_IS_CLOSED(&rses->rses_backend_ref[i])) + { + return true; + } + } + + return false; +} + /** * Check if there is backend reference pointing at failed DCB, and reset its * flags. Then clear DCB's callback and finally try to reconnect. @@ -3829,7 +3906,7 @@ static bool handle_error_new_connection( GWBUF* errmsg) { SESSION* ses; - int router_nservers; + int router_nservers,i; backend_ref_t* bref; @@ -3896,6 +3973,22 @@ static bool handle_error_new_connection( router_nservers, ses, inst); + + if(!have_servers(rses)) + { + skygw_log_write(LOGFILE_ERROR,"Error : No more valid servers, closing session"); + succp = false; + goto return_succp; + } + + rses->hash_init = false; + for(i = 0;irses_nbackends;i++) + { + bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED); + } + + skygw_log_write(LOGFILE_TRACE,"dbshard: Re-mapping databases"); + gen_databaselist(rses->router,rses); return_succp: return succp; @@ -4037,7 +4130,7 @@ static int router_handle_state_switch( switch (reason) { case DCB_REASON_NOT_RESPONDING: - dcb->func.hangup(dcb); + dcb->func.hangup(dcb); break; default: @@ -4080,7 +4173,7 @@ static bool change_current_db( bool succp; uint8_t* packet; unsigned int plen; - int message_len,i; + int message_len; char* fail_str; if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) diff --git a/server/modules/routing/dbshard/shardrouter.c b/server/modules/routing/dbshard/shardrouter.c index 6eee8e78a..46cb46b2c 100644 --- a/server/modules/routing/dbshard/shardrouter.c +++ b/server/modules/routing/dbshard/shardrouter.c @@ -275,7 +275,10 @@ parse_mapping_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) while(row) { - hashtable_add(rses->dbhash, row->data[0], target); + if(hashtable_add(rses->dbhash, row->data[0], target)) + { + skygw_log_write(LOGFILE_TRACE,"shardrouter: <%s, %s>",target,row->data[0]); + } row = row->next; } resultset_free(rset);