Added support for direct connections to shardrouter.

This commit is contained in:
Markus Makela
2015-03-30 20:45:05 +03:00
parent 97a6bd1b4b
commit e42f171564
2 changed files with 237 additions and 94 deletions

View File

@ -119,7 +119,6 @@ static route_target_t get_shard_route_target(
static uint8_t getCapabilities(ROUTER* inst, void* router_session);
//bool parse_db_ignore_list(ROUTER_INSTANCE* router, char* param);
static void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state);
static void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state);
static bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target);
@ -414,8 +413,14 @@ bool subsvc_is_valid(SUBSERVICE* sub)
return false;
}
/**
* Map the databases of all subservices.
* @param inst router instance
* @param session router session
* @return 0 on success, 1 on error
*/
int
gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
gen_subsvc_dblist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
{
const char* query = "SHOW DATABASES;";
GWBUF *buffer, *clone;
@ -475,7 +480,6 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
if(sz > 0)
{
has_dbs = true;
for(i = 0; i < sz; i++)
{
@ -489,9 +493,8 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
else
{
skygw_log_write(LOGFILE_TRACE,"shardrouter: Query targets database '%s' on server '%s",dbnms[i],rval);
has_dbs = true;
}
for(j = i; j < sz; j++) free(dbnms[j]);
break;
}
free(dbnms[i]);
}
@ -554,6 +557,10 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF*
*/
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
if(rval)
{
skygw_log_write(LOGFILE_TRACE,"shardrouter: Using active database '%s'",client->rses_mysql_session->db);
}
}
return rval;
@ -612,64 +619,160 @@ filterReply(FILTER* instance, void *session, GWBUF *reply)
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) instance;
SUBSERVICE* subsvc;
int i, rv = 1;
bool mapped = true;
sescmd_cursor_t* scur;
GWBUF* tmp = NULL;
skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: filterReply mapped: %s",rses->hash_init ? "true" : "false");
if(!rses_begin_locked_router_action(rses))
{
tmp = reply;
while((tmp = gwbuf_consume(tmp,gwbuf_length(tmp))));
return 0;
}
subsvc = get_subsvc_from_ses(rses, session);
if(SUBSVC_IS_WAITING(subsvc))
if(rses->init & INIT_MAPPING)
{
subsvc_clear_state(subsvc, SUBSVC_WAITING_RESULT);
bool mapped = true, logged = false;
int i;
for(i = 0; i < rses->n_subservice; i++)
{
if(subsvc->session == rses->subservice[i]->session &&
!SUBSVC_IS_MAPPED(rses->subservice[i]))
{
rses->subservice[i]->state |= SUBSVC_MAPPED;
parse_mapping_response(rses,
rses->subservice[i]->service->name,
reply);
}
if(SUBSVC_IS_OK(rses->subservice[i]) &&
!SUBSVC_IS_MAPPED(rses->subservice[i]))
{
mapped = false;
if(!logged)
{
/*
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p",
bkrf[i].bref_backend->backend_server->unique_name,
rses->rses_client_dcb->session);
*/
logged = true;
}
}
}
if(mapped)
{
/*
* Check if the session is reconnecting with a database name
* that is not in the hashtable. If the database is not found
* then close the session.
*/
rses->init &= ~INIT_MAPPING;
if(rses->init & INIT_USE_DB)
{
char* target;
if((target = hashtable_fetch(rses->dbhash,
rses->connect_db)) == NULL)
{
skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Connecting to a non-existent database '%s'",
rses->connect_db);
rses->rses_closed = true;
if(rses->queue)
{
while((rses->queue = gwbuf_consume(
rses->queue,gwbuf_length(rses->queue))));
}
rses_end_locked_router_action(rses);
goto retblock;
}
/* Send a COM_INIT_DB packet to the server with the right database
* and set it as the client's active database */
unsigned int qlen;
GWBUF* buffer;
qlen = strlen(rses->connect_db);
buffer = gwbuf_alloc(qlen + 5);
if(buffer == NULL)
{
skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed.");
rses->rses_closed = true;
if(rses->queue)
gwbuf_free(rses->queue);
goto retblock;
}
gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1);
gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL);
*((unsigned char*)buffer->start + 3) = 0x0;
*((unsigned char*)buffer->start + 4) = 0x2;
memcpy(buffer->start+5,rses->connect_db,qlen);
DCB* dcb = NULL;
SESSION_ROUTE_QUERY(subsvc->session,buffer);
goto retblock;
}
if(rses->queue)
{
GWBUF* tmp = rses->queue;
rses->queue = rses->queue->next;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
rses->rses_client_dcb->session,
querystr);
poll_add_epollin_event_to_dcb(rses->routedcb,tmp);
free(querystr);
}
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
rses);
}
goto retblock;
}
subsvc_clear_state(subsvc, SUBSVC_QUERY_ACTIVE);
if(!rses->hash_init)
{
subsvc_set_state(subsvc, SUBSVC_MAPPED);
parse_mapping_response(rses, subsvc->service->name, reply);
for(i = 0; i < rses->n_subservice; i++)
{
if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i]))
{
mapped = false;
break;
}
}
gwbuf_free(reply);
if(mapped)
{
rses->hash_init = true;
if(rses->queue)
{
tmp = rses->queue;
rses->queue = rses->queue->next;
}
}
goto retblock;
}
if(rses->queue)
{
tmp = rses->queue;
rses->queue = rses->queue->next;
GWBUF* tmp = rses->queue;
rses->queue = rses->queue->next;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
rses->rses_client_dcb->session,
querystr);
poll_add_epollin_event_to_dcb(rses->routedcb,tmp);
free(querystr);
tmp = NULL;
}
if(rses->init & INIT_USE_DB)
{
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Reply to USE '%s' received for session %p",
rses->connect_db,
rses->rses_client_dcb->session);
rses->init &= ~INIT_USE_DB;
strcpy(rses->rses_mysql_session->db,rses->connect_db);
ss_dassert(rses->init == INIT_READY);
if(reply)
{
tmp = reply;
while((tmp = gwbuf_consume(tmp,gwbuf_length(tmp))));
tmp = NULL;
}
goto retblock;
}
scur = subsvc->scur;
@ -688,12 +791,8 @@ filterReply(FILTER* instance, void *session, GWBUF *reply)
rv = SESSION_ROUTE_REPLY(rses->session, reply);
retblock:
rses_end_locked_router_action(rses);
if(tmp)
{
poll_add_epollin_event_to_dcb(rses->queue_dcb,tmp);
}
retblock:
rses_end_locked_router_action(rses);
return rv;
}
@ -876,6 +975,14 @@ createInstance(SERVICE *service, char **options)
tok = strtok(services, ",");
*/
if(options == NULL)
{
free(router);
skygw_log_write(LOGFILE_ERROR, "Error : No 'subservice' router option found. Shardrouter requires at least %d "
"configured services listed in the 'subservices' router option to work.", min_nsvc);
return NULL;
}
while(options[i])
{
if(sz <= i)
@ -896,6 +1003,13 @@ createInstance(SERVICE *service, char **options)
}
res_svc[i] = service_find(options[i]);
if(res_svc[i] == NULL)
{
free(res_svc);
free(router);
skygw_log_write(LOGFILE_ERROR, "Error : No service named '%s' found.", options[i]);
return NULL;
}
i++;
}
/*
@ -907,7 +1021,7 @@ createInstance(SERVICE *service, char **options)
if(i < min_nsvc)
{
skygw_log_write(LOGFILE_ERROR, "Error : Not enough services. Shardrouter requires at least %d "
skygw_log_write(LOGFILE_ERROR, "Error : Not enough parameters for 'subservice' router option. Shardrouter requires at least %d "
"configured services to work.", min_nsvc);
free(router->services);
free(router);
@ -980,15 +1094,15 @@ newSession(
client_rses->rses_autocommit_enabled = true;
client_rses->rses_transaction_active = false;
client_rses->session = session;
client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dummy_dcb->func.read = fakeReply;
client_rses->dummy_dcb->state = DCB_STATE_POLLING;
client_rses->dummy_dcb->session = session;
client_rses->replydcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->replydcb->func.read = fakeReply;
client_rses->replydcb->state = DCB_STATE_POLLING;
client_rses->replydcb->session = session;
client_rses->queue_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->queue_dcb->func.read = fakeQuery;
client_rses->queue_dcb->state = DCB_STATE_POLLING;
client_rses->queue_dcb->session = session;
client_rses->routedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->routedcb->func.read = fakeQuery;
client_rses->routedcb->state = DCB_STATE_POLLING;
client_rses->routedcb->session = session;
spinlock_init(&client_rses->rses_lock);
@ -1168,10 +1282,10 @@ closeSession(
}
router_cli_ses->subservice[i]->state = SUBSVC_CLOSED;
}
router_cli_ses->dummy_dcb->session = NULL;
router_cli_ses->queue_dcb->session = NULL;
dcb_close(router_cli_ses->dummy_dcb);
dcb_close(router_cli_ses->queue_dcb);
router_cli_ses->replydcb->session = NULL;
router_cli_ses->routedcb->session = NULL;
dcb_close(router_cli_ses->replydcb);
dcb_close(router_cli_ses->routedcb);
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
@ -1480,32 +1594,46 @@ routeQuery(ROUTER* instance,
ret = 0;
goto retblock;
}
if(!router_cli_ses->hash_init)
{
gen_tablelist(inst, router_cli_ses);
skygw_log_write(LOGFILE_TRACE,"shardrouter: got a query while mapping databases.");
GWBUF* tmp = router_cli_ses->queue;
while(tmp && tmp->next)
if(!(rses_is_closed = router_cli_ses->rses_closed))
{
tmp = tmp->next;
if(router_cli_ses->init & INIT_UNINT)
{
/* Generate database list */
gen_subsvc_dblist(inst,router_cli_ses);
}
if(router_cli_ses->init & INIT_MAPPING)
{
char* querystr = modutil_get_SQL(querybuf);
skygw_log_write(LOGFILE_DEBUG,"shardrouter: Storing query for session %p: %s",
router_cli_ses->rses_client_dcb->session,
querystr);
free(querystr);
gwbuf_make_contiguous(querybuf);
GWBUF* ptr = router_cli_ses->queue;
while(ptr && ptr->next)
{
ptr = ptr->next;
}
if(ptr == NULL)
{
router_cli_ses->queue = querybuf;
}
else
{
ptr->next = querybuf;
}
rses_end_locked_router_action(router_cli_ses);
return 1;
}
}
if(tmp == NULL)
{
router_cli_ses->queue = querybuf;
}
else
{
tmp->next = querybuf;
}
rses_end_locked_router_action(router_cli_ses);
return 1;
}
rses_end_locked_router_action(router_cli_ses);
packet = GWBUF_DATA(querybuf);
@ -1608,7 +1736,7 @@ routeQuery(ROUTER* instance,
*/
GWBUF* dbres = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(router_cli_ses->dummy_dcb,dbres);
poll_add_epollin_event_to_dcb(router_cli_ses->replydcb,dbres);
ret = 1;
goto retblock;
}
@ -2982,7 +3110,7 @@ reply_error:
* Create an incoming event for randomly selected backend DCB which
* will then be notified and replied 'back' to the client.
*/
poll_add_epollin_event_to_dcb(rses->dummy_dcb,
poll_add_epollin_event_to_dcb(rses->replydcb,
gwbuf_clone(errbuf));
gwbuf_free(errbuf);
}