Cleaned up the clientReply function of the schemarouter.

This commit is contained in:
Markus Makela
2015-09-19 17:21:08 +03:00
parent bff6db96a1
commit af2bc91383
2 changed files with 447 additions and 432 deletions

View File

@ -214,6 +214,12 @@ int process_show_shards(ROUTER_CLIENT_SES* rses);
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg);
int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses,
backend_ref_t *bref,
GWBUF** wbuf);
bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses);
void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses);
static int hashkeyfun(void* key)
{
if(key == NULL){
@ -2151,18 +2157,11 @@ static int routeQuery(
{
sprintf(errbuf + strlen(errbuf)," ([%lu]: DB change failed)",router_cli_ses->rses_client_dcb->session->ses_id);
}
GWBUF* error = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND, errbuf);
if (error == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
return 0;
}
/** Set flags that help router to identify session commands reply */
router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,error);
write_error_to_client(router_cli_ses->rses_client_dcb,
SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND,
errbuf);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -2555,8 +2554,7 @@ diagnostic(ROUTER *instance, DCB *dcb)
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void clientReply (
ROUTER* instance,
static void clientReply(ROUTER* instance,
void* router_session,
GWBUF* buffer,
DCB* backend_dcb)
@ -2577,7 +2575,7 @@ static void clientReply (
if (!rses_begin_locked_router_action(router_cli_ses))
{
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
goto lock_failed;
return;
}
/** Holding lock ensures that router session remains open */
ss_dassert(backend_dcb->session != NULL);
@ -2585,30 +2583,13 @@ static void clientReply (
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
/**
* 1. Check if backend received reply to sescmd.
* 2. Check sescmd's state whether OK_PACKET has been
* sent to client already and if not, lock property cursor,
* reply to client, and move property cursor forward. Finally
* release the lock.
* 3. If reply for this sescmd is sent, lock property cursor
* and
*/
if (client_dcb == NULL)
if (client_dcb == NULL || !rses_begin_locked_router_action(router_cli_ses))
{
while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
/** Log that client was closed before reply */
goto lock_failed;
}
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
goto lock_failed;
return;
}
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
if (bref == NULL)
@ -2616,7 +2597,7 @@ static void clientReply (
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
goto lock_failed;
return;
}
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Reply from [%s] session [%p]"
@ -2631,212 +2612,51 @@ static void clientReply (
if (router_cli_ses->init & INIT_MAPPING)
{
bool mapped = true, logged = false;
int i;
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
int rc = inspect_backend_mapping_states(router_cli_ses, bref, &writebuf);
while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
{
if (bref->map_queue)
{
writebuf = gwbuf_append(bref->map_queue, writebuf);
bref->map_queue = NULL;
}
int rc = parse_showdb_response(router_cli_ses,
&router_cli_ses->rses_backend_ref[i],
&writebuf);
if (rc == 1)
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else if (rc == 0)
{
bref->map_queue = writebuf;
writebuf = NULL;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received partial SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else
{
while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
DCB* client_dcb = NULL;
if((router_cli_ses->init & INIT_FAILED) == 0)
{
skygw_log_write(LE, "Error: Duplicate databases found, closing session.");
client_dcb = router_cli_ses->rses_client_dcb;
/** This is the first response to the database mapping which
* has duplicate database conflict. Set the initialization bitmask
* to INIT_FAILED */
router_cli_ses->init |= INIT_FAILED;
/** Send the client an error about duplicate databases
* if there is a queued query from the client. */
if (router_cli_ses->queue)
{
GWBUF* error = modutil_create_mysql_err_msg(1, 0,
SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERRSTR_DUPLICATEDB,
"Error: duplicate databases found on two different shards.");
if (error)
{
client_dcb->func.write(client_dcb, error);
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
}
}
}
rses_end_locked_router_action(router_cli_ses);
if(client_dcb)
dcb_close(client_dcb);
return;
}
}
if(BREF_IS_IN_USE(&bkrf[i]) &&
!BREF_IS_MAPPED(&bkrf[i]))
{
mapped = false;
if(!logged)
{
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p",
bkrf[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
logged = true;
}
}
}
while(writebuf && (writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
if(mapped)
{
/*
* Check if the session is reconnecting with a database name
* that is not in the hashtable. If the database is not found
* then close the session.
*/
router_cli_ses->init &= ~INIT_MAPPING;
if (router_cli_ses->init & INIT_USE_DB)
{
char* target;
if((target = hashtable_fetch(router_cli_ses->dbhash,
router_cli_ses->connect_db)) == NULL)
{
/** Unknown database, hang up on the client*/
skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Connecting to a non-existent database '%s'",
router_cli_ses->connect_db);
char errmsg[128 + MYSQL_DATABASE_MAXLEN+1];
sprintf(errmsg,"Unknown database '%s'",router_cli_ses->connect_db);
if(router_cli_ses->rses_config.debug)
{
sprintf(errmsg + strlen(errmsg)," ([%lu]: DB not found on connect)",router_cli_ses->rses_client_dcb->session->ses_id);
}
GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND, errmsg);
router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,errbuff);
if(router_cli_ses->queue)
{
while((router_cli_ses->queue = gwbuf_consume(
router_cli_ses->queue,gwbuf_length(router_cli_ses->queue))));
}
bool success = handle_default_db(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
router_cli_ses->rses_client_dcb->func.hangup(router_cli_ses->rses_client_dcb);
return;
}
/* Send a COM_INIT_DB packet to the server with the right database
* and set it as the client's active database */
unsigned int qlen;
GWBUF* buffer;
qlen = strlen(router_cli_ses->connect_db);
buffer = gwbuf_alloc(qlen + 5);
if(buffer == NULL)
if (!success)
{
skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed.");
router_cli_ses->rses_closed = true;
if(router_cli_ses->queue)
gwbuf_free(router_cli_ses->queue);
rses_end_locked_router_action(router_cli_ses);
return;
dcb_close(router_cli_ses->rses_client_dcb);
}
gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1);
gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL);
*((unsigned char*)buffer->start + 3) = 0x0;
*((unsigned char*)buffer->start + 4) = 0x2;
memcpy(buffer->start+5,router_cli_ses->connect_db,qlen);
DCB* dcb = NULL;
if(get_shard_dcb(&dcb,router_cli_ses,target))
{
dcb->func.write(dcb,buffer);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: USE '%s' sent to %s for session %p",
router_cli_ses->connect_db,
target,
router_cli_ses->rses_client_dcb->session);
}
else
{
skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Couldn't find target DCB for '%s'.",target);
router_cli_ses->rses_closed = true;
if(router_cli_ses->queue)
gwbuf_free(router_cli_ses->queue);
}
rses_end_locked_router_action(router_cli_ses);
return;
}
if (router_cli_ses->queue)
{
GWBUF* tmp = router_cli_ses->queue;
router_cli_ses->queue = router_cli_ses->queue->next;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
router_cli_ses->rses_client_dcb->session,
querystr);
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp);
free(querystr);
route_queued_query(router_cli_ses);
}
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
skygw_log_write_flush(LOGFILE_DEBUG,
"session [%p] database map finished.",
router_cli_ses);
}
rses_end_locked_router_action(router_cli_ses);
if (rc == -1)
{
dcb_close(router_cli_ses->rses_client_dcb);
}
return;
}
if (router_cli_ses->queue)
{
GWBUF* tmp = router_cli_ses->queue;
router_cli_ses->queue = router_cli_ses->queue->next;
tmp->next = NULL;
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s",
router_cli_ses->rses_client_dcb->session,
querystr);
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp);
free(querystr);
route_queued_query(router_cli_ses);
}
if (router_cli_ses->init & INIT_USE_DB)
@ -2849,7 +2669,9 @@ static void clientReply (
ss_dassert(router_cli_ses->init == INIT_READY);
rses_end_locked_router_action(router_cli_ses);
if (writebuf)
{
while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
}
return;
}
@ -2940,7 +2762,7 @@ static void clientReply (
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
goto lock_failed;
return;
}
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
@ -2985,7 +2807,6 @@ static void clientReply (
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
lock_failed:
return;
}
@ -4647,3 +4468,197 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
hashtable_iterator_free(iter);
return 0;
}
/**
*
* @param dcb
* @param errnum
* @param mysqlstate
* @param errmsg
*/
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg)
{
GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, errnum, mysqlstate, errmsg);
if (errbuff)
{
if (dcb->func.write(dcb, errbuff) != 1)
{
skygw_log_write(LE, "Error: Failed to write error packet to client.");
}
}
else
{
skygw_log_write(LE, "Error: Memory allocation failed when creating error packet.");
}
}
/**
*
* @param router_cli_ses
* @return
*/
bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses)
{
char* target;
if ((target = hashtable_fetch(router_cli_ses->dbhash,
router_cli_ses->connect_db)) == NULL)
{
/** Unknown database, hang up on the client*/
skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Connecting to a non-existent database '%s'",
router_cli_ses->connect_db);
char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1];
sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db);
if (router_cli_ses->rses_config.debug)
{
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", router_cli_ses->rses_client_dcb->session->ses_id);
}
write_error_to_client(router_cli_ses->rses_client_dcb,
SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND,
errmsg);
return false;
}
/* Send a COM_INIT_DB packet to the server with the right database
* and set it as the client's active database */
unsigned int qlen;
GWBUF* buffer;
qlen = strlen(router_cli_ses->connect_db);
buffer = gwbuf_alloc(qlen + 5);
if (buffer == NULL)
{
skygw_log_write_flush(LOGFILE_ERROR, "Error : Buffer allocation failed.");
return false;
}
gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1);
gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL);
*((unsigned char*) buffer->start + 3) = 0x0;
*((unsigned char*) buffer->start + 4) = 0x2;
memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen);
DCB* dcb = NULL;
if (get_shard_dcb(&dcb, router_cli_ses, target))
{
dcb->func.write(dcb, buffer);
skygw_log_write(LOGFILE_DEBUG, "schemarouter: USE '%s' sent to %s for session %p",
router_cli_ses->connect_db,
target,
router_cli_ses->rses_client_dcb->session);
}
else
{
skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Couldn't find target DCB for '%s'.", target);
return false;
}
return true;
}
void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses)
{
GWBUF* tmp = router_cli_ses->queue;
router_cli_ses->queue = router_cli_ses->queue->next;
tmp->next = NULL;
#ifdef SS_DEBUG
char* querystr = modutil_get_SQL(tmp);
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Sending queued buffer for session %p: %s",
router_cli_ses->rses_client_dcb->session,
querystr);
free(querystr);
#endif
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route, tmp);
}
/**
*
* @param router_cli_ses Router client session
* @return 1 if mapping is done, 0 if it is still ongoing and -1 on error
*/
int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses,
backend_ref_t *bref,
GWBUF** wbuf)
{
bool mapped = true;
GWBUF* writebuf = *wbuf;
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if (bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
{
if (bref->map_queue)
{
writebuf = gwbuf_append(bref->map_queue, writebuf);
bref->map_queue = NULL;
}
int rc = parse_showdb_response(router_cli_ses,
&router_cli_ses->rses_backend_ref[i],
&writebuf);
if (rc == 1)
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else if (rc == 0)
{
bref->map_queue = writebuf;
writebuf = NULL;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received partial SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else
{
DCB* client_dcb = NULL;
if ((router_cli_ses->init & INIT_FAILED) == 0)
{
skygw_log_write(LE, "Error: Duplicate databases found, closing session.");
client_dcb = router_cli_ses->rses_client_dcb;
/** This is the first response to the database mapping which
* has duplicate database conflict. Set the initialization bitmask
* to INIT_FAILED */
router_cli_ses->init |= INIT_FAILED;
/** Send the client an error about duplicate databases
* if there is a queued query from the client. */
if (router_cli_ses->queue)
{
GWBUF* error = modutil_create_mysql_err_msg(1, 0,
SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERRSTR_DUPLICATEDB,
"Error: duplicate databases found on two different shards.");
if (error)
{
client_dcb->func.write(client_dcb, error);
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
}
}
}
*wbuf = writebuf;
return -1;
}
}
if (BREF_IS_IN_USE(&bkrf[i]) &&
!BREF_IS_MAPPED(&bkrf[i]))
{
mapped = false;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p",
bkrf[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
}
*wbuf = writebuf;
return mapped ? 1 : 0;
}