diff --git a/server/modules/routing/dbshard/shardrouter.c b/server/modules/routing/dbshard/shardrouter.c index 9b32ccec2..0784fdb16 100644 --- a/server/modules/routing/dbshard/shardrouter.c +++ b/server/modules/routing/dbshard/shardrouter.c @@ -35,6 +35,7 @@ #include #include + MODULE_INFO info = { MODULE_API_ROUTER, MODULE_BETA_RELEASE, @@ -303,7 +304,8 @@ bool subsvc_is_valid(SUBSERVICE* sub) spinlock_release(&sub->service->spin); if(ses_state == SESSION_STATE_ROUTER_READY && - svc_state == SERVICE_STATE_STARTED) + (svc_state != SERVICE_STATE_FAILED || + svc_state != SERVICE_STATE_STOPPED)) { return true; } @@ -359,7 +361,8 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* HASHTABLE* ht = client->dbhash; int sz = 0, i, j; char** dbnms = NULL; - char* rval = NULL; + char *rval = NULL; + char *query = NULL,*tmp = NULL; bool has_dbs = false; /**If the query targets any database other than the current one*/ if(!query_is_parsed(buffer)) @@ -385,17 +388,38 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* free(dbnms); } - /** - * If the query contains no explicitly stated databases proceed to - * check if the session has an active database and if it is sharded. - */ - - if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || - (rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0')) + if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES)) { + query = modutil_get_SQL(buffer); + if((tmp = strstr(query,"from"))) + { + char* tok = strtok(tmp, " ;"); + tok = strtok(NULL," ;"); + ss_dassert(tok != NULL); + tmp = (char*) hashtable_fetch(ht, tok); + } + free(query); + + if(tmp == NULL) + { + rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db); + } + else + { + rval = tmp; + } + } + + if(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0') + { + /** + * If the query contains no explicitly stated databases proceed to + * check if the session has an active database and if it is sharded. + */ + rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db); } - + return rval; } @@ -446,79 +470,92 @@ tokenize_string(char* str) * @return returns 1 for success and 0 for error */ static int -filterReply (FILTER* instance, void *session, GWBUF *reply) +filterReply(FILTER* instance, void *session, GWBUF *reply) { - ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*)instance; + ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) instance; SUBSERVICE* subsvc; - int i,rv = 1; + int i, rv = 1; bool mapped = true; sescmd_cursor_t* scur; GWBUF* tmp = NULL; - + + if(!rses_begin_locked_router_action(rses)) { return 0; } - - subsvc = get_subsvc_from_ses(rses,session); - + + subsvc = get_subsvc_from_ses(rses, session); + if(SUBSVC_IS_WAITING(subsvc)) { - subsvc_clear_state(subsvc,SUBSVC_WAITING_RESULT); + subsvc_clear_state(subsvc, SUBSVC_WAITING_RESULT); } - - subsvc_clear_state(subsvc,SUBSVC_QUERY_ACTIVE); - + + subsvc_clear_state(subsvc, SUBSVC_QUERY_ACTIVE); + if(!rses->hash_init) { - subsvc_set_state(subsvc,SUBSVC_MAPPED); - parse_mapping_response(rses,subsvc->service->name,reply); - - for(i = 0;in_subservice;i++) + subsvc_set_state(subsvc, SUBSVC_MAPPED); + parse_mapping_response(rses, subsvc->service->name, reply); + + for(i = 0; i < rses->n_subservice; i++) { if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i])) { mapped = false; break; } - + } - + gwbuf_free(reply); - + if(mapped) { rses->hash_init = true; - tmp = rses->queue; + if(rses->queue) + { + tmp = rses->queue; + rses->queue = rses->queue->next; + } } - - goto retblock; + + goto retblock; } - - - scur = subsvc->scur; - - if(sescmd_cursor_is_active(scur)) + + + + if(rses->queue) + { + tmp = rses->queue; + rses->queue = rses->queue->next; + } + + + scur = subsvc->scur; + + if(sescmd_cursor_is_active(scur)) + { + if(!sescmd_cursor_next(scur)) { - if(!sescmd_cursor_next(scur)) - { - sescmd_cursor_set_active(scur,false); - } - else - { - execute_sescmd_in_backend(subsvc); - goto retblock; - } + sescmd_cursor_set_active(scur, false); } - + else + { + execute_sescmd_in_backend(subsvc); + goto retblock; + } + } + rv = SESSION_ROUTE_REPLY(rses->session, reply); - - retblock: + +retblock: rses_end_locked_router_action(rses); if(tmp) { - routeQuery((ROUTER*)rses->router,rses,tmp); + rv = routeQuery((ROUTER*) rses->router, rses, tmp); } return rv; } @@ -1262,12 +1299,40 @@ routeQuery(ROUTER* instance, } ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); + /** Lock router session */ + if(!rses_begin_locked_router_action(router_cli_ses)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Route query aborted! Routing session is closed <"))); + ret = 0; + goto retblock; + } + if(!router_cli_ses->hash_init) { - router_cli_ses->queue = querybuf; + GWBUF* tmp = router_cli_ses->queue; + + while(tmp && tmp->next) + { + tmp = tmp->next; + } + + if(tmp == NULL) + { + router_cli_ses->queue = querybuf; + } + else + { + tmp->next = querybuf; + } + + rses_end_locked_router_action(router_cli_ses); return 1; } + rses_end_locked_router_action(router_cli_ses); + packet = GWBUF_DATA(querybuf); packet_type = packet[4]; @@ -1345,7 +1410,7 @@ routeQuery(ROUTER* instance, default: break; } /**< switch by packet type */ - + if(packet_type == MYSQL_COM_INIT_DB) { if(!(change_successful = change_current_db(inst, router_cli_ses, querybuf))) @@ -1419,27 +1484,8 @@ routeQuery(ROUTER* instance, else if(route_target != TARGET_ALL && (tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL) { - bool shard_ok = true; - - if(shard_ok) - { - route_target = TARGET_NAMED_SERVER; - } - else - { - - /** - * Shard is not a viable target right now so we check - * for an alternate backend with the database. If this is not found - * the target is undefined and an error will be returned to the client. - */ - - if((tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL) - { - route_target = TARGET_NAMED_SERVER; - } - } + route_target = TARGET_NAMED_SERVER; } if(TARGET_IS_UNDEFINED(route_target))