From af2bc91383ea76fe3d0fa5bba0a4db1e98111174 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Sat, 19 Sep 2015 17:21:08 +0300 Subject: [PATCH] Cleaned up the clientReply function of the schemarouter. --- server/modules/include/schemarouter.h | 2 +- .../routing/schemarouter/schemarouter.c | 877 +++++++++--------- 2 files changed, 447 insertions(+), 432 deletions(-) diff --git a/server/modules/include/schemarouter.h b/server/modules/include/schemarouter.h index cfb4eb825..8d1dad390 100644 --- a/server/modules/include/schemarouter.h +++ b/server/modules/include/schemarouter.h @@ -66,7 +66,7 @@ typedef enum bref_state { #define SCHEMA_ERR_DUPLICATEDB 5000 #define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" -#define SCHEMA_ERR_DBNOTFOUND 1049 +#define SCHEMA_ERR_DBNOTFOUND 1049 #define SCHEMA_ERRSTR_DBNOTFOUND "42000" /** * The type of the backend server diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index 43d92e567..6c15d51b0 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -214,6 +214,12 @@ int process_show_shards(ROUTER_CLIENT_SES* rses); static int hashkeyfun(void* key); static int hashcmpfun (void *, void *); +void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); +int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses, + backend_ref_t *bref, + GWBUF** wbuf); +bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses); +void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses); static int hashkeyfun(void* key) { if(key == NULL){ @@ -2151,18 +2157,11 @@ static int routeQuery( { sprintf(errbuf + strlen(errbuf)," ([%lu]: DB change failed)",router_cli_ses->rses_client_dcb->session->ses_id); } - GWBUF* error = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, errbuf); - if (error == NULL) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Creating buffer for error message failed."))); - return 0; - } - /** Set flags that help router to identify session commands reply */ - router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,error); + write_error_to_client(router_cli_ses->rses_client_dcb, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errbuf); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -2551,442 +2550,264 @@ diagnostic(ROUTER *instance, DCB *dcb) * The routine will reply to client for session change with master server data * * @param instance The router instance - * @param router_session The router session + * @param router_session The router session * @param backend_dcb The backend DCB * @param queue The GWBUF with reply data */ -static void clientReply ( - ROUTER* instance, - void* router_session, - GWBUF* buffer, - DCB* backend_dcb) +static void clientReply(ROUTER* instance, + void* router_session, + GWBUF* buffer, + DCB* backend_dcb) { - DCB* client_dcb; - ROUTER_CLIENT_SES* router_cli_ses; - sescmd_cursor_t* scur = NULL; - backend_ref_t* bref; - GWBUF* writebuf = buffer; + DCB* client_dcb; + ROUTER_CLIENT_SES* router_cli_ses; + sescmd_cursor_t* scur = NULL; + backend_ref_t* bref; + GWBUF* writebuf = buffer; - router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - CHK_CLIENT_RSES(router_cli_ses); + router_cli_ses = (ROUTER_CLIENT_SES *) router_session; + CHK_CLIENT_RSES(router_cli_ses); + /** + * Lock router client session for secure read of router session members. + * Note that this could be done without lock by using version # + */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); + return; + } + /** Holding lock ensures that router session remains open */ + ss_dassert(backend_dcb->session != NULL); + client_dcb = backend_dcb->session->client; + + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); + + if (client_dcb == NULL || !rses_begin_locked_router_action(router_cli_ses)) + { + while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); + return; + } + + bref = get_bref_from_dcb(router_cli_ses, backend_dcb); + + if (bref == NULL) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); + return; + } + + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Reply from [%s] session [%p]" + " mapping [%s] queries queued [%s]", + bref->bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session, + router_cli_ses->init & INIT_MAPPING ? "true" : "false", + router_cli_ses->queue == NULL ? "none" : + router_cli_ses->queue->next ? "multiple" : "one"); + + + + if (router_cli_ses->init & INIT_MAPPING) + { + int rc = inspect_backend_mapping_states(router_cli_ses, bref, &writebuf); + + while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); + + if (rc == 1) + { + /* + * Check if the session is reconnecting with a database name + * that is not in the hashtable. If the database is not found + * then close the session. + */ + router_cli_ses->init &= ~INIT_MAPPING; + + if (router_cli_ses->init & INIT_USE_DB) + { + bool success = handle_default_db(router_cli_ses); + rses_end_locked_router_action(router_cli_ses); + if (!success) + { + dcb_close(router_cli_ses->rses_client_dcb); + } + return; + } + + if (router_cli_ses->queue) + { + route_queued_query(router_cli_ses); + } + skygw_log_write_flush(LOGFILE_DEBUG, + "session [%p] database map finished.", + router_cli_ses); + } + + rses_end_locked_router_action(router_cli_ses); + + if (rc == -1) + { + dcb_close(router_cli_ses->rses_client_dcb); + } + return; + } + + if (router_cli_ses->queue) + { + route_queued_query(router_cli_ses); + } + + if (router_cli_ses->init & INIT_USE_DB) + { + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Reply to USE '%s' received for session %p", + router_cli_ses->connect_db, + router_cli_ses->rses_client_dcb->session); + router_cli_ses->init &= ~INIT_USE_DB; + strcpy(router_cli_ses->rses_mysql_session->db, router_cli_ses->connect_db); + ss_dassert(router_cli_ses->init == INIT_READY); + rses_end_locked_router_action(router_cli_ses); + if (writebuf) + { + while ((writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); + } + return; + } + + CHK_BACKEND_REF(bref); + scur = &bref->bref_sescmd_cur; + /** + * Active cursor means that reply is from session command + * execution. + */ + if (sescmd_cursor_is_active(scur)) + { + if (LOG_IS_ENABLED(LOGFILE_ERROR) && + MYSQL_IS_ERROR_PACKET(((uint8_t *) GWBUF_DATA(writebuf)))) + { + uint8_t* buf = + (uint8_t *) GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf)); + uint8_t* replybuf = (uint8_t *) GWBUF_DATA(writebuf); + size_t len = MYSQL_GET_PACKET_LEN(buf); + size_t replylen = MYSQL_GET_PACKET_LEN(replybuf); + char* cmdstr = strndup(&((char *) buf)[5], len - 4); + char* err = strndup(&((char *) replybuf)[8], 5); + char* replystr = strndup(&((char *) replybuf)[13], + replylen - 4 - 5); + + ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf)); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to execute %s in %s:%d. %s %s", + cmdstr, + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port, + err, + replystr))); + + free(cmdstr); + free(err); + free(replystr); + } + + if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) + { + /** + * Discard all those responses that have already been sent to + * the client. Return with buffer including response that + * needs to be sent to client or NULL. + */ + writebuf = sescmd_cursor_process_replies(writebuf, bref); + } /** - * Lock router client session for secure read of router session members. - * Note that this could be done without lock by using version # + * If response will be sent to client, decrease waiter count. + * This applies to session commands only. Counter decrement + * for other type of queries is done outside this block. */ - if (!rses_begin_locked_router_action(router_cli_ses)) + if (writebuf != NULL && client_dcb != NULL) { - while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf)))); - goto lock_failed; - } - /** Holding lock ensures that router session remains open */ - ss_dassert(backend_dcb->session != NULL); - client_dcb = backend_dcb->session->client; - - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - /** - * 1. Check if backend received reply to sescmd. - * 2. Check sescmd's state whether OK_PACKET has been - * sent to client already and if not, lock property cursor, - * reply to client, and move property cursor forward. Finally - * release the lock. - * 3. If reply for this sescmd is sent, lock property cursor - * and - */ - if (client_dcb == NULL) - { - while ((writebuf = gwbuf_consume( - writebuf, - GWBUF_LENGTH(writebuf))) != NULL); - /** Log that client was closed before reply */ - goto lock_failed; - } - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - /** Log to debug that router was closed */ - while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf)))); - goto lock_failed; + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); } - bref = get_bref_from_dcb(router_cli_ses, backend_dcb); - - if (bref == NULL) - { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf)))); - goto lock_failed; - } - - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Reply from [%s] session [%p]" - " mapping [%s] queries queued [%s]", - bref->bref_backend->backend_server->unique_name, - router_cli_ses->rses_client_dcb->session, - router_cli_ses->init & INIT_MAPPING?"true":"false", - router_cli_ses->queue == NULL ? "none" : - router_cli_ses->queue->next ? "multiple":"one"); - - - - if(router_cli_ses->init & INIT_MAPPING) - { - bool mapped = true, logged = false; - int i; - backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; - - for(i = 0; i < router_cli_ses->rses_nbackends; i++) - { - if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) - { - if (bref->map_queue) - { - writebuf = gwbuf_append(bref->map_queue, writebuf); - bref->map_queue = NULL; - } - int rc = parse_showdb_response(router_cli_ses, - &router_cli_ses->rses_backend_ref[i], - &writebuf); - if (rc == 1) - { - router_cli_ses->rses_backend_ref[i].bref_mapped = true; - skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else if (rc == 0) - { - bref->map_queue = writebuf; - writebuf = NULL; - skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received partial SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else - { - while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); - DCB* client_dcb = NULL; - - if((router_cli_ses->init & INIT_FAILED) == 0) - { - skygw_log_write(LE, "Error: Duplicate databases found, closing session."); - client_dcb = router_cli_ses->rses_client_dcb; - - /** This is the first response to the database mapping which - * has duplicate database conflict. Set the initialization bitmask - * to INIT_FAILED */ - router_cli_ses->init |= INIT_FAILED; - - /** Send the client an error about duplicate databases - * if there is a queued query from the client. */ - if (router_cli_ses->queue) - { - GWBUF* error = modutil_create_mysql_err_msg(1, 0, - SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERRSTR_DUPLICATEDB, - "Error: duplicate databases found on two different shards."); - - if (error) - { - client_dcb->func.write(client_dcb, error); - } - else - { - LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Error : Creating buffer for error message failed."))); - } - } - } - rses_end_locked_router_action(router_cli_ses); - if(client_dcb) - dcb_close(client_dcb); - return; - } - } - - if(BREF_IS_IN_USE(&bkrf[i]) && - !BREF_IS_MAPPED(&bkrf[i])) - { - mapped = false; - if(!logged) - { - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p", - bkrf[i].bref_backend->backend_server->unique_name, - router_cli_ses->rses_client_dcb->session); - logged = true; - } - } - } - - while(writebuf && (writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf)))); - - if(mapped) - { - /* - * Check if the session is reconnecting with a database name - * that is not in the hashtable. If the database is not found - * then close the session. - */ - - router_cli_ses->init &= ~INIT_MAPPING; - - if(router_cli_ses->init & INIT_USE_DB) - { - char* target; - - if((target = hashtable_fetch(router_cli_ses->dbhash, - router_cli_ses->connect_db)) == NULL) - { - /** Unknown database, hang up on the client*/ - skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Connecting to a non-existent database '%s'", - router_cli_ses->connect_db); - char errmsg[128 + MYSQL_DATABASE_MAXLEN+1]; - sprintf(errmsg,"Unknown database '%s'",router_cli_ses->connect_db); - if(router_cli_ses->rses_config.debug) - { - sprintf(errmsg + strlen(errmsg)," ([%lu]: DB not found on connect)",router_cli_ses->rses_client_dcb->session->ses_id); - } - GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, errmsg); - - router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,errbuff); - if(router_cli_ses->queue) - { - while((router_cli_ses->queue = gwbuf_consume( - router_cli_ses->queue,gwbuf_length(router_cli_ses->queue)))); - } - rses_end_locked_router_action(router_cli_ses); - router_cli_ses->rses_client_dcb->func.hangup(router_cli_ses->rses_client_dcb); - return; - } - - /* Send a COM_INIT_DB packet to the server with the right database - * and set it as the client's active database */ - - unsigned int qlen; - GWBUF* buffer; - - qlen = strlen(router_cli_ses->connect_db); - buffer = gwbuf_alloc(qlen + 5); - if(buffer == NULL) - { - skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed."); - router_cli_ses->rses_closed = true; - if(router_cli_ses->queue) - gwbuf_free(router_cli_ses->queue); - rses_end_locked_router_action(router_cli_ses); - return; - } - - gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1); - gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL); - *((unsigned char*)buffer->start + 3) = 0x0; - *((unsigned char*)buffer->start + 4) = 0x2; - memcpy(buffer->start+5,router_cli_ses->connect_db,qlen); - DCB* dcb = NULL; - - if(get_shard_dcb(&dcb,router_cli_ses,target)) - { - dcb->func.write(dcb,buffer); - skygw_log_write(LOGFILE_DEBUG,"schemarouter: USE '%s' sent to %s for session %p", - router_cli_ses->connect_db, - target, - router_cli_ses->rses_client_dcb->session); - } - else - { - skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Couldn't find target DCB for '%s'.",target); - router_cli_ses->rses_closed = true; - if(router_cli_ses->queue) - gwbuf_free(router_cli_ses->queue); - } - - rses_end_locked_router_action(router_cli_ses); - return; - } - - if(router_cli_ses->queue) - { - GWBUF* tmp = router_cli_ses->queue; - router_cli_ses->queue = router_cli_ses->queue->next; - tmp->next = NULL; - char* querystr = modutil_get_SQL(tmp); - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s", - router_cli_ses->rses_client_dcb->session, - querystr); - poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp); - free(querystr); - - } - skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.", - router_cli_ses); - } - - rses_end_locked_router_action(router_cli_ses); - - return; - } - - if(router_cli_ses->queue) - { - GWBUF* tmp = router_cli_ses->queue; - router_cli_ses->queue = router_cli_ses->queue->next; - tmp->next = NULL; - char* querystr = modutil_get_SQL(tmp); - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s", - router_cli_ses->rses_client_dcb->session, - querystr); - poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp); - free(querystr); - } - - if(router_cli_ses->init & INIT_USE_DB) - { - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Reply to USE '%s' received for session %p", - router_cli_ses->connect_db, - router_cli_ses->rses_client_dcb->session); - router_cli_ses->init &= ~INIT_USE_DB; - strcpy(router_cli_ses->rses_mysql_session->db,router_cli_ses->connect_db); - ss_dassert(router_cli_ses->init == INIT_READY); - rses_end_locked_router_action(router_cli_ses); - if(writebuf) - while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf)))); - return; - } - - CHK_BACKEND_REF(bref); - scur = &bref->bref_sescmd_cur; + } /** - * Active cursor means that reply is from session command - * execution. - */ - if (sescmd_cursor_is_active(scur)) - { - if (LOG_IS_ENABLED(LOGFILE_ERROR) && - MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf)))) - { - uint8_t* buf = - (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf)); - uint8_t* replybuf = (uint8_t *)GWBUF_DATA(writebuf); - size_t len = MYSQL_GET_PACKET_LEN(buf); - size_t replylen = MYSQL_GET_PACKET_LEN(replybuf); - char* cmdstr = strndup(&((char *)buf)[5], len-4); - char* err = strndup(&((char *)replybuf)[8], 5); - char* replystr = strndup(&((char *)replybuf)[13], - replylen-4-5); - - ss_dassert(len+4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf)); - - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Failed to execute %s in %s:%d. %s %s", - cmdstr, - bref->bref_backend->backend_server->name, - bref->bref_backend->backend_server->port, - err, - replystr))); - - free(cmdstr); - free(err); - free(replystr); - } - - if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) - { - /** - * Discard all those responses that have already been sent to - * the client. Return with buffer including response that - * needs to be sent to client or NULL. - */ - writebuf = sescmd_cursor_process_replies(writebuf, bref); - } - /** - * If response will be sent to client, decrease waiter count. - * This applies to session commands only. Counter decrement - * for other type of queries is done outside this block. - */ - if (writebuf != NULL && client_dcb != NULL) - { - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - } - /** * Clear BREF_QUERY_ACTIVE flag and decrease waiter counter. * This applies for queries other than session commands. */ - else if (BREF_IS_QUERY_ACTIVE(bref)) - { - bref_clear_state(bref, BREF_QUERY_ACTIVE); - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - - if (writebuf != NULL && client_dcb != NULL) - { - unsigned char* cmd = (unsigned char*)writebuf->start; - int state = router_cli_ses->init; - /** Write reply to client DCB */ - skygw_log_write(LOGFILE_TRACE, "schemarouter: returning reply [%s] " - "state [%s] session [%p]", - PTR_IS_ERR(cmd) ? "ERR" :PTR_IS_OK(cmd) ? "OK" : "RSET", - state & INIT_UNINT ? "UNINIT" :state & INIT_MAPPING ? "MAPPING" : "READY", - router_cli_ses->rses_client_dcb->session); - SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); - } - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - /** Log to debug that router was closed */ - goto lock_failed; - } - /** There is one pending session command to be executed. */ - if (sescmd_cursor_is_active(scur)) - { + else if (BREF_IS_QUERY_ACTIVE(bref)) + { + bref_clear_state(bref, BREF_QUERY_ACTIVE); + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Backend %s:%d processed reply and starts to execute " - "active cursor.", - bref->bref_backend->backend_server->name, - bref->bref_backend->backend_server->port))); - - execute_sescmd_in_backend(bref); - } - else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ - { - int ret; - - CHK_GWBUF(bref->bref_pending_cmd); - - if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, - gwbuf_clone(bref->bref_pending_cmd))) == 1) - { - ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; - atomic_add(&inst->stats.n_queries, 1); - /** - * Add one query response waiter to backend reference - */ - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); - } - else - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Routing query \"%s\" failed.", - bref->bref_pending_cmd))); - } - gwbuf_free(bref->bref_pending_cmd); - bref->bref_pending_cmd = NULL; - } - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - -lock_failed: + if (writebuf != NULL && client_dcb != NULL) + { + unsigned char* cmd = (unsigned char*) writebuf->start; + int state = router_cli_ses->init; + /** Write reply to client DCB */ + skygw_log_write(LOGFILE_TRACE, "schemarouter: returning reply [%s] " + "state [%s] session [%p]", + PTR_IS_ERR(cmd) ? "ERR" : PTR_IS_OK(cmd) ? "OK" : "RSET", + state & INIT_UNINT ? "UNINIT" : state & INIT_MAPPING ? "MAPPING" : "READY", + router_cli_ses->rses_client_dcb->session); + SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); + } + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + /** Log to debug that router was closed */ return; + } + /** There is one pending session command to be executed. */ + if (sescmd_cursor_is_active(scur)) + { + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Backend %s:%d processed reply and starts to execute " + "active cursor.", + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port))); + + execute_sescmd_in_backend(bref); + } + else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ + { + int ret; + + CHK_GWBUF(bref->bref_pending_cmd); + + if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, + gwbuf_clone(bref->bref_pending_cmd))) == 1) + { + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *) instance; + atomic_add(&inst->stats.n_queries, 1); + /** + * Add one query response waiter to backend reference + */ + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + bref->bref_pending_cmd))); + } + gwbuf_free(bref->bref_pending_cmd); + bref->bref_pending_cmd = NULL; + } + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + + return; } /** Compare number of connections from this router in backend servers */ @@ -4647,3 +4468,197 @@ int process_show_shards(ROUTER_CLIENT_SES* rses) hashtable_iterator_free(iter); return 0; } + +/** + * + * @param dcb + * @param errnum + * @param mysqlstate + * @param errmsg + */ +void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg) +{ + GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, errnum, mysqlstate, errmsg); + if (errbuff) + { + if (dcb->func.write(dcb, errbuff) != 1) + { + skygw_log_write(LE, "Error: Failed to write error packet to client."); + } + } + else + { + skygw_log_write(LE, "Error: Memory allocation failed when creating error packet."); + } +} + +/** + * + * @param router_cli_ses + * @return + */ +bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses) +{ + char* target; + + if ((target = hashtable_fetch(router_cli_ses->dbhash, + router_cli_ses->connect_db)) == NULL) + { + /** Unknown database, hang up on the client*/ + skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Connecting to a non-existent database '%s'", + router_cli_ses->connect_db); + char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1]; + sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db); + if (router_cli_ses->rses_config.debug) + { + sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", router_cli_ses->rses_client_dcb->session->ses_id); + } + write_error_to_client(router_cli_ses->rses_client_dcb, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errmsg); + return false; + } + + /* Send a COM_INIT_DB packet to the server with the right database + * and set it as the client's active database */ + + unsigned int qlen; + GWBUF* buffer; + + qlen = strlen(router_cli_ses->connect_db); + buffer = gwbuf_alloc(qlen + 5); + if (buffer == NULL) + { + skygw_log_write_flush(LOGFILE_ERROR, "Error : Buffer allocation failed."); + return false; + } + + gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1); + gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); + *((unsigned char*) buffer->start + 3) = 0x0; + *((unsigned char*) buffer->start + 4) = 0x2; + memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen); + DCB* dcb = NULL; + + if (get_shard_dcb(&dcb, router_cli_ses, target)) + { + dcb->func.write(dcb, buffer); + skygw_log_write(LOGFILE_DEBUG, "schemarouter: USE '%s' sent to %s for session %p", + router_cli_ses->connect_db, + target, + router_cli_ses->rses_client_dcb->session); + } + else + { + skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Couldn't find target DCB for '%s'.", target); + return false; + } + return true; +} + +void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses) +{ + GWBUF* tmp = router_cli_ses->queue; + router_cli_ses->queue = router_cli_ses->queue->next; + tmp->next = NULL; +#ifdef SS_DEBUG + char* querystr = modutil_get_SQL(tmp); + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Sending queued buffer for session %p: %s", + router_cli_ses->rses_client_dcb->session, + querystr); + free(querystr); +#endif + poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route, tmp); +} + +/** + * + * @param router_cli_ses Router client session + * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error + */ +int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses, + backend_ref_t *bref, + GWBUF** wbuf) +{ + bool mapped = true; + GWBUF* writebuf = *wbuf; + backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; + + for (int i = 0; i < router_cli_ses->rses_nbackends; i++) + { + if (bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) + { + if (bref->map_queue) + { + writebuf = gwbuf_append(bref->map_queue, writebuf); + bref->map_queue = NULL; + } + int rc = parse_showdb_response(router_cli_ses, + &router_cli_ses->rses_backend_ref[i], + &writebuf); + if (rc == 1) + { + router_cli_ses->rses_backend_ref[i].bref_mapped = true; + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + else if (rc == 0) + { + bref->map_queue = writebuf; + writebuf = NULL; + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received partial SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + else + { + DCB* client_dcb = NULL; + + if ((router_cli_ses->init & INIT_FAILED) == 0) + { + skygw_log_write(LE, "Error: Duplicate databases found, closing session."); + client_dcb = router_cli_ses->rses_client_dcb; + + /** This is the first response to the database mapping which + * has duplicate database conflict. Set the initialization bitmask + * to INIT_FAILED */ + router_cli_ses->init |= INIT_FAILED; + + /** Send the client an error about duplicate databases + * if there is a queued query from the client. */ + if (router_cli_ses->queue) + { + GWBUF* error = modutil_create_mysql_err_msg(1, 0, + SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERRSTR_DUPLICATEDB, + "Error: duplicate databases found on two different shards."); + + if (error) + { + client_dcb->func.write(client_dcb, error); + } + else + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Error : Creating buffer for error message failed."))); + } + } + } + *wbuf = writebuf; + return -1; + } + } + + if (BREF_IS_IN_USE(&bkrf[i]) && + !BREF_IS_MAPPED(&bkrf[i])) + { + mapped = false; + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p", + bkrf[i].bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + } + *wbuf = writebuf; + return mapped ? 1 : 0; +}