Merge branch 'develop' into plainrouter

This commit is contained in:
Markus Makela
2015-03-31 12:56:29 +03:00
18 changed files with 779 additions and 309 deletions

View File

@ -89,6 +89,8 @@
#include <mysql_client_server_protocol.h>
#include "modutil.h"
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
@ -535,6 +537,7 @@ BACKEND *master_host = NULL;
pthread_self(),
candidate->server->port,
candidate->current_connection_count)));
/*
* Open a backend connection, putting the DCB for this
* connection in the client_rses->backend_dcb
@ -564,7 +567,13 @@ BACKEND *master_host = NULL;
spinlock_release(&inst->lock);
CHK_CLIENT_RSES(client_rses);
skygw_log_write(
LOGFILE_TRACE,
"Readconnroute: New session for server %s. "
"Connections : %d",
candidate->server->unique_name,
candidate->current_connection_count);
return (void *)client_rses;
}
@ -718,10 +727,19 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
"Error : Failed to route MySQL command %d to backend "
"server.",
mysql_command)));
skygw_log_write(
LOGFILE_ERROR,
"Error : Failed to route MySQL command %d to backend "
"server %s.",
mysql_command,
router_cli_ses->backend->server->unique_name);
rc = 0;
goto return_rc;
}
char* trc = NULL;
switch(mysql_command) {
case MYSQL_COM_CHANGE_USER:
rc = backend_dcb->func.auth(
@ -730,7 +748,8 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
backend_dcb->session,
queue);
break;
case MYSQL_COM_QUERY:
LOGIF(LOGFILE_TRACE,(trc = modutil_get_SQL(queue)));
default:
rc = backend_dcb->func.write(backend_dcb, queue);
break;
@ -745,6 +764,15 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
mysql_command,
backend_dcb,
rc)));
LOGIF(LOGFILE_TRACE,skygw_log_write(
LOGFILE_TRACE,
"Routed command [%#x] to '%s'%s%s",
mysql_command,
backend_dcb->server->unique_name,
trc?": ":".",
trc?trc:""));
free(trc);
return_rc:
return rc;
}

View File

@ -402,7 +402,7 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
rval);
}
}
gwbuf_free(buffer);
return !rval;
}
@ -514,6 +514,10 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
*/
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
if(rval)
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: Using active database '%s'",client->rses_mysql_session->db);
}
}
return rval;
@ -1932,6 +1936,7 @@ static int routeQuery(
*/
route_target = TARGET_ANY;
skygw_log_write(LOGFILE_TRACE,"schemarouter: Routing query to first available backend.");
}
else
@ -2008,6 +2013,9 @@ static int routeQuery(
/**No valid backends alive*/
skygw_log_write(LOGFILE_TRACE,"schemarouter: No backends are running");
skygw_log_write(LOGFILE_ERROR,
"Error: Schemarouter: Failed to route query, "
"no backends are available.");
rses_end_locked_router_action(router_cli_ses);
ret = 0;
goto retblock;
@ -2215,6 +2223,7 @@ static void clientReply (
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
goto lock_failed;
}
/** Holding lock ensures that router session remains open */
@ -2244,6 +2253,7 @@ static void clientReply (
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
goto lock_failed;
}
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
@ -2252,6 +2262,7 @@ static void clientReply (
{
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
goto lock_failed;
}
@ -2299,7 +2310,7 @@ static void clientReply (
}
}
gwbuf_free(writebuf);
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
if(mapped)
{
@ -2322,7 +2333,10 @@ static void clientReply (
router_cli_ses->connect_db);
router_cli_ses->rses_closed = true;
if(router_cli_ses->queue)
gwbuf_free(router_cli_ses->queue);
{
while((router_cli_ses->queue = gwbuf_consume(
router_cli_ses->queue,gwbuf_length(router_cli_ses->queue))));
}
rses_end_locked_router_action(router_cli_ses);
return;
}
@ -2376,7 +2390,7 @@ static void clientReply (
{
GWBUF* tmp = router_cli_ses->queue;
router_cli_ses->queue = router_cli_ses->queue->next;
tmp->next = NULL;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
router_cli_ses->rses_client_dcb->session,
@ -2416,6 +2430,8 @@ static void clientReply (
strcpy(router_cli_ses->rses_mysql_session->db,router_cli_ses->connect_db);
ss_dassert(router_cli_ses->init == INIT_READY);
rses_end_locked_router_action(router_cli_ses);
if(writebuf)
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
return;
}

View File

@ -119,7 +119,6 @@ static route_target_t get_shard_route_target(
static uint8_t getCapabilities(ROUTER* inst, void* router_session);
//bool parse_db_ignore_list(ROUTER_INSTANCE* router, char* param);
static void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state);
static void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state);
static bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target);
@ -414,8 +413,14 @@ bool subsvc_is_valid(SUBSERVICE* sub)
return false;
}
/**
* Map the databases of all subservices.
* @param inst router instance
* @param session router session
* @return 0 on success, 1 on error
*/
int
gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
gen_subsvc_dblist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
{
const char* query = "SHOW DATABASES;";
GWBUF *buffer, *clone;
@ -475,7 +480,6 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
if(sz > 0)
{
has_dbs = true;
for(i = 0; i < sz; i++)
{
@ -489,9 +493,8 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
else
{
skygw_log_write(LOGFILE_TRACE,"shardrouter: Query targets database '%s' on server '%s",dbnms[i],rval);
has_dbs = true;
}
for(j = i; j < sz; j++) free(dbnms[j]);
break;
}
free(dbnms[i]);
}
@ -554,6 +557,10 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
*/
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
if(rval)
{
skygw_log_write(LOGFILE_TRACE,"shardrouter: Using active database '%s'",client->rses_mysql_session->db);
}
}
return rval;
@ -612,64 +619,160 @@ filterReply(FILTER* instance, void *session, GWBUF *reply)
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) instance;
SUBSERVICE* subsvc;
int i, rv = 1;
bool mapped = true;
sescmd_cursor_t* scur;
GWBUF* tmp = NULL;
skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: filterReply mapped: %s",rses->hash_init ? "true" : "false");
if(!rses_begin_locked_router_action(rses))
{
tmp = reply;
while((tmp = gwbuf_consume(tmp,gwbuf_length(tmp))));
return 0;
}
subsvc = get_subsvc_from_ses(rses, session);
if(SUBSVC_IS_WAITING(subsvc))
if(rses->init & INIT_MAPPING)
{
subsvc_clear_state(subsvc, SUBSVC_WAITING_RESULT);
bool mapped = true, logged = false;
int i;
for(i = 0; i < rses->n_subservice; i++)
{
if(subsvc->session == rses->subservice[i]->session &&
!SUBSVC_IS_MAPPED(rses->subservice[i]))
{
rses->subservice[i]->state |= SUBSVC_MAPPED;
parse_mapping_response(rses,
rses->subservice[i]->service->name,
reply);
}
if(SUBSVC_IS_OK(rses->subservice[i]) &&
!SUBSVC_IS_MAPPED(rses->subservice[i]))
{
mapped = false;
if(!logged)
{
/*
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p",
bkrf[i].bref_backend->backend_server->unique_name,
rses->rses_client_dcb->session);
*/
logged = true;
}
}
}
if(mapped)
{
/*
* Check if the session is reconnecting with a database name
* that is not in the hashtable. If the database is not found
* then close the session.
*/
rses->init &= ~INIT_MAPPING;
if(rses->init & INIT_USE_DB)
{
char* target;
if((target = hashtable_fetch(rses->dbhash,
rses->connect_db)) == NULL)
{
skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Connecting to a non-existent database '%s'",
rses->connect_db);
rses->rses_closed = true;
if(rses->queue)
{
while((rses->queue = gwbuf_consume(
rses->queue,gwbuf_length(rses->queue))));
}
rses_end_locked_router_action(rses);
goto retblock;
}
/* Send a COM_INIT_DB packet to the server with the right database
* and set it as the client's active database */
unsigned int qlen;
GWBUF* buffer;
qlen = strlen(rses->connect_db);
buffer = gwbuf_alloc(qlen + 5);
if(buffer == NULL)
{
skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed.");
rses->rses_closed = true;
if(rses->queue)
gwbuf_free(rses->queue);
goto retblock;
}
gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1);
gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL);
*((unsigned char*)buffer->start + 3) = 0x0;
*((unsigned char*)buffer->start + 4) = 0x2;
memcpy(buffer->start+5,rses->connect_db,qlen);
DCB* dcb = NULL;
SESSION_ROUTE_QUERY(subsvc->session,buffer);
goto retblock;
}
if(rses->queue)
{
GWBUF* tmp = rses->queue;
rses->queue = rses->queue->next;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
rses->rses_client_dcb->session,
querystr);
poll_add_epollin_event_to_dcb(rses->routedcb,tmp);
free(querystr);
}
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
rses);
}
goto retblock;
}
subsvc_clear_state(subsvc, SUBSVC_QUERY_ACTIVE);
if(!rses->hash_init)
{
subsvc_set_state(subsvc, SUBSVC_MAPPED);
parse_mapping_response(rses, subsvc->service->name, reply);
for(i = 0; i < rses->n_subservice; i++)
{
if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i]))
{
mapped = false;
break;
}
}
gwbuf_free(reply);
if(mapped)
{
rses->hash_init = true;
if(rses->queue)
{
tmp = rses->queue;
rses->queue = rses->queue->next;
}
}
goto retblock;
}
if(rses->queue)
{
tmp = rses->queue;
rses->queue = rses->queue->next;
GWBUF* tmp = rses->queue;
rses->queue = rses->queue->next;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
rses->rses_client_dcb->session,
querystr);
poll_add_epollin_event_to_dcb(rses->routedcb,tmp);
free(querystr);
tmp = NULL;
}
if(rses->init & INIT_USE_DB)
{
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Reply to USE '%s' received for session %p",
rses->connect_db,
rses->rses_client_dcb->session);
rses->init &= ~INIT_USE_DB;
strcpy(rses->rses_mysql_session->db,rses->connect_db);
ss_dassert(rses->init == INIT_READY);
if(reply)
{
tmp = reply;
while((tmp = gwbuf_consume(tmp,gwbuf_length(tmp))));
tmp = NULL;
}
goto retblock;
}
scur = subsvc->scur;
@ -688,12 +791,8 @@ filterReply(FILTER* instance, void *session, GWBUF *reply)
rv = SESSION_ROUTE_REPLY(rses->session, reply);
retblock:
rses_end_locked_router_action(rses);
if(tmp)
{
poll_add_epollin_event_to_dcb(rses->queue_dcb,tmp);
}
retblock:
rses_end_locked_router_action(rses);
return rv;
}
@ -843,7 +942,7 @@ static ROUTER *
createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE* router;
char *services, *tok;
char *services, *tok, *saveptr;
SERVICE **res_svc, **temp;
CONFIG_PARAMETER* conf;
int i = 0, sz;
@ -858,7 +957,6 @@ createInstance(SERVICE *service, char **options)
spinlock_init(&router->lock);
conf = config_get_param(service->svc_config_param, "subservices");
/*
if(conf == NULL)
{
@ -869,14 +967,20 @@ createInstance(SERVICE *service, char **options)
}
services = strdup(conf->value);
*/
sz = 2;
res_svc = calloc(sz, sizeof(SERVICE*));
/*
tok = strtok(services, ",");
*/
while(options[i])
if((res_svc = calloc(sz, sizeof(SERVICE*))) == NULL)
{
free(router);
skygw_log_write(LOGFILE_ERROR,"Error: Memory allocation failed.");
return NULL;
}
tok = strtok_r(services, ",",&saveptr);
while(tok)
{
if(sz <= i)
{
@ -884,9 +988,9 @@ createInstance(SERVICE *service, char **options)
if(temp == NULL)
{
skygw_log_write(LOGFILE_ERROR, "Error : Memory reallocation failed.");
skygw_log_write(LOGFILE_DEBUG, "shardrouter.c: realloc returned NULL. "
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, "shardrouter.c: realloc returned NULL. "
"service count[%d] buffer size [%u] tried to allocate [%u]",
sz, sizeof(SERVICE*)*(sz), sizeof(SERVICE*)*(sz * 2));
sz, sizeof(SERVICE*)*(sz), sizeof(SERVICE*)*(sz * 2))));
free(res_svc);
free(router);
return NULL;
@ -895,19 +999,27 @@ createInstance(SERVICE *service, char **options)
res_svc = temp;
}
res_svc[i] = service_find(options[i]);
res_svc[i] = service_find(tok);
if(res_svc[i] == NULL)
{
free(res_svc);
free(router);
skygw_log_write(LOGFILE_ERROR, "Error : No service named '%s' found.", options[i]);
return NULL;
}
i++;
tok = strtok_r(NULL,",",&saveptr);
}
/*
free(services);
*/
router->services = res_svc;
router->n_services = i;
if(i < min_nsvc)
{
skygw_log_write(LOGFILE_ERROR, "Error : Not enough services. Shardrouter requires at least %d "
skygw_log_write(LOGFILE_ERROR, "Error : Not enough parameters for 'subservice' router option. Shardrouter requires at least %d "
"configured services to work.", min_nsvc);
free(router->services);
free(router);
@ -980,15 +1092,15 @@ newSession(
client_rses->rses_autocommit_enabled = true;
client_rses->rses_transaction_active = false;
client_rses->session = session;
client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dummy_dcb->func.read = fakeReply;
client_rses->dummy_dcb->state = DCB_STATE_POLLING;
client_rses->dummy_dcb->session = session;
client_rses->replydcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->replydcb->func.read = fakeReply;
client_rses->replydcb->state = DCB_STATE_POLLING;
client_rses->replydcb->session = session;
client_rses->queue_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->queue_dcb->func.read = fakeQuery;
client_rses->queue_dcb->state = DCB_STATE_POLLING;
client_rses->queue_dcb->session = session;
client_rses->routedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->routedcb->func.read = fakeQuery;
client_rses->routedcb->state = DCB_STATE_POLLING;
client_rses->routedcb->session = session;
spinlock_init(&client_rses->rses_lock);
@ -1168,10 +1280,10 @@ closeSession(
}
router_cli_ses->subservice[i]->state = SUBSVC_CLOSED;
}
router_cli_ses->dummy_dcb->session = NULL;
router_cli_ses->queue_dcb->session = NULL;
dcb_close(router_cli_ses->dummy_dcb);
dcb_close(router_cli_ses->queue_dcb);
router_cli_ses->replydcb->session = NULL;
router_cli_ses->routedcb->session = NULL;
dcb_close(router_cli_ses->replydcb);
dcb_close(router_cli_ses->routedcb);
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
@ -1480,32 +1592,46 @@ routeQuery(ROUTER* instance,
ret = 0;
goto retblock;
}
if(!router_cli_ses->hash_init)
{
gen_tablelist(inst, router_cli_ses);
skygw_log_write(LOGFILE_TRACE,"shardrouter: got a query while mapping databases.");
GWBUF* tmp = router_cli_ses->queue;
while(tmp && tmp->next)
if(!(rses_is_closed = router_cli_ses->rses_closed))
{
tmp = tmp->next;
if(router_cli_ses->init & INIT_UNINT)
{
/* Generate database list */
gen_subsvc_dblist(inst,router_cli_ses);
}
if(router_cli_ses->init & INIT_MAPPING)
{
char* querystr = modutil_get_SQL(querybuf);
skygw_log_write(LOGFILE_DEBUG,"shardrouter: Storing query for session %p: %s",
router_cli_ses->rses_client_dcb->session,
querystr);
free(querystr);
gwbuf_make_contiguous(querybuf);
GWBUF* ptr = router_cli_ses->queue;
while(ptr && ptr->next)
{
ptr = ptr->next;
}
if(ptr == NULL)
{
router_cli_ses->queue = querybuf;
}
else
{
ptr->next = querybuf;
}
rses_end_locked_router_action(router_cli_ses);
return 1;
}
}
if(tmp == NULL)
{
router_cli_ses->queue = querybuf;
}
else
{
tmp->next = querybuf;
}
rses_end_locked_router_action(router_cli_ses);
return 1;
}
rses_end_locked_router_action(router_cli_ses);
packet = GWBUF_DATA(querybuf);
@ -1608,7 +1734,7 @@ routeQuery(ROUTER* instance,
*/
GWBUF* dbres = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(router_cli_ses->dummy_dcb,dbres);
poll_add_epollin_event_to_dcb(router_cli_ses->replydcb,dbres);
ret = 1;
goto retblock;
}
@ -2982,7 +3108,7 @@ reply_error:
* Create an incoming event for randomly selected backend DCB which
* will then be notified and replied 'back' to the client.
*/
poll_add_epollin_event_to_dcb(rses->dummy_dcb,
poll_add_epollin_event_to_dcb(rses->replydcb,
gwbuf_clone(errbuf));
gwbuf_free(errbuf);
}