Fixed SHOW TABLES FROM ... queries being routed to the active database instead of the one defined in the query.

This commit is contained in:
Markus Makela
2015-01-31 12:38:39 +02:00
parent a69e1d5f01
commit d9ab0261b9

View File

@ -35,6 +35,7 @@
#include <modutil.h> #include <modutil.h>
#include <mysql_client_server_protocol.h> #include <mysql_client_server_protocol.h>
MODULE_INFO info = { MODULE_INFO info = {
MODULE_API_ROUTER, MODULE_API_ROUTER,
MODULE_BETA_RELEASE, MODULE_BETA_RELEASE,
@ -303,7 +304,8 @@ bool subsvc_is_valid(SUBSERVICE* sub)
spinlock_release(&sub->service->spin); spinlock_release(&sub->service->spin);
if(ses_state == SESSION_STATE_ROUTER_READY && if(ses_state == SESSION_STATE_ROUTER_READY &&
svc_state == SERVICE_STATE_STARTED) (svc_state != SERVICE_STATE_FAILED ||
svc_state != SERVICE_STATE_STOPPED))
{ {
return true; return true;
} }
@ -359,7 +361,8 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
HASHTABLE* ht = client->dbhash; HASHTABLE* ht = client->dbhash;
int sz = 0, i, j; int sz = 0, i, j;
char** dbnms = NULL; char** dbnms = NULL;
char* rval = NULL; char *rval = NULL;
char *query = NULL,*tmp = NULL;
bool has_dbs = false; /**If the query targets any database other than the current one*/ bool has_dbs = false; /**If the query targets any database other than the current one*/
if(!query_is_parsed(buffer)) if(!query_is_parsed(buffer))
@ -385,14 +388,35 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
free(dbnms); free(dbnms);
} }
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES))
{
query = modutil_get_SQL(buffer);
if((tmp = strstr(query,"from")))
{
char* tok = strtok(tmp, " ;");
tok = strtok(NULL," ;");
ss_dassert(tok != NULL);
tmp = (char*) hashtable_fetch(ht, tok);
}
free(query);
if(tmp == NULL)
{
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
}
else
{
rval = tmp;
}
}
if(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0')
{
/** /**
* If the query contains no explicitly stated databases proceed to * If the query contains no explicitly stated databases proceed to
* check if the session has an active database and if it is sharded. * check if the session has an active database and if it is sharded.
*/ */
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) ||
(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0'))
{
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db); rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
} }
@ -446,36 +470,37 @@ tokenize_string(char* str)
* @return returns 1 for success and 0 for error * @return returns 1 for success and 0 for error
*/ */
static int static int
filterReply (FILTER* instance, void *session, GWBUF *reply) filterReply(FILTER* instance, void *session, GWBUF *reply)
{ {
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) instance;
SUBSERVICE* subsvc; SUBSERVICE* subsvc;
int i,rv = 1; int i, rv = 1;
bool mapped = true; bool mapped = true;
sescmd_cursor_t* scur; sescmd_cursor_t* scur;
GWBUF* tmp = NULL; GWBUF* tmp = NULL;
if(!rses_begin_locked_router_action(rses)) if(!rses_begin_locked_router_action(rses))
{ {
return 0; return 0;
} }
subsvc = get_subsvc_from_ses(rses,session); subsvc = get_subsvc_from_ses(rses, session);
if(SUBSVC_IS_WAITING(subsvc)) if(SUBSVC_IS_WAITING(subsvc))
{ {
subsvc_clear_state(subsvc,SUBSVC_WAITING_RESULT); subsvc_clear_state(subsvc, SUBSVC_WAITING_RESULT);
} }
subsvc_clear_state(subsvc,SUBSVC_QUERY_ACTIVE); subsvc_clear_state(subsvc, SUBSVC_QUERY_ACTIVE);
if(!rses->hash_init) if(!rses->hash_init)
{ {
subsvc_set_state(subsvc,SUBSVC_MAPPED); subsvc_set_state(subsvc, SUBSVC_MAPPED);
parse_mapping_response(rses,subsvc->service->name,reply); parse_mapping_response(rses, subsvc->service->name, reply);
for(i = 0;i<rses->n_subservice;i++) for(i = 0; i < rses->n_subservice; i++)
{ {
if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i])) if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i]))
{ {
@ -490,20 +515,32 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
if(mapped) if(mapped)
{ {
rses->hash_init = true; rses->hash_init = true;
if(rses->queue)
{
tmp = rses->queue; tmp = rses->queue;
rses->queue = rses->queue->next;
}
} }
goto retblock; goto retblock;
} }
if(rses->queue)
{
tmp = rses->queue;
rses->queue = rses->queue->next;
}
scur = subsvc->scur; scur = subsvc->scur;
if(sescmd_cursor_is_active(scur)) if(sescmd_cursor_is_active(scur))
{ {
if(!sescmd_cursor_next(scur)) if(!sescmd_cursor_next(scur))
{ {
sescmd_cursor_set_active(scur,false); sescmd_cursor_set_active(scur, false);
} }
else else
{ {
@ -514,11 +551,11 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
rv = SESSION_ROUTE_REPLY(rses->session, reply); rv = SESSION_ROUTE_REPLY(rses->session, reply);
retblock: retblock:
rses_end_locked_router_action(rses); rses_end_locked_router_action(rses);
if(tmp) if(tmp)
{ {
routeQuery((ROUTER*)rses->router,rses,tmp); rv = routeQuery((ROUTER*) rses->router, rses, tmp);
} }
return rv; return rv;
} }
@ -1262,12 +1299,40 @@ routeQuery(ROUTER* instance,
} }
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
/** Lock router session */
if(!rses_begin_locked_router_action(router_cli_ses))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Route query aborted! Routing session is closed <")));
ret = 0;
goto retblock;
}
if(!router_cli_ses->hash_init) if(!router_cli_ses->hash_init)
{
GWBUF* tmp = router_cli_ses->queue;
while(tmp && tmp->next)
{
tmp = tmp->next;
}
if(tmp == NULL)
{ {
router_cli_ses->queue = querybuf; router_cli_ses->queue = querybuf;
}
else
{
tmp->next = querybuf;
}
rses_end_locked_router_action(router_cli_ses);
return 1; return 1;
} }
rses_end_locked_router_action(router_cli_ses);
packet = GWBUF_DATA(querybuf); packet = GWBUF_DATA(querybuf);
packet_type = packet[4]; packet_type = packet[4];
@ -1419,28 +1484,9 @@ routeQuery(ROUTER* instance,
else if(route_target != TARGET_ALL && else if(route_target != TARGET_ALL &&
(tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL) (tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL)
{ {
bool shard_ok = true;
if(shard_ok)
{
route_target = TARGET_NAMED_SERVER; route_target = TARGET_NAMED_SERVER;
} }
else
{
/**
* Shard is not a viable target right now so we check
* for an alternate backend with the database. If this is not found
* the target is undefined and an error will be returned to the client.
*/
if((tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL)
{
route_target = TARGET_NAMED_SERVER;
}
}
}
if(TARGET_IS_UNDEFINED(route_target)) if(TARGET_IS_UNDEFINED(route_target))
{ {