diff --git a/server/modules/include/schemarouter.h b/server/modules/include/schemarouter.h index cf0c80ca8..805bbf544 100644 --- a/server/modules/include/schemarouter.h +++ b/server/modules/include/schemarouter.h @@ -215,6 +215,8 @@ typedef struct backend_ref_st { #if defined(SS_DEBUG) skygw_chk_t bref_chk_top; #endif + int n_mapping_eof; + GWBUF* map_queue; BACKEND* bref_backend; /*< Backend server */ DCB* bref_dcb; /*< Backend DCB */ bref_state_t bref_state; /*< State of the backend */ diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index b9ecb126d..44a12f0a0 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -311,32 +311,52 @@ char* get_lenenc_str(void* data, int* len) * @param buf GWBUF containing the result set * @return True if the buffer contained a result set with a single column. All other responses return false. */ -bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) +bool parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer) { - bool rval = false; unsigned char* ptr; - int more = 0; - if(PTR_IS_RESULTSET(((unsigned char*)buf->start)) && - modutil_count_signal_packets(buf,0,0,&more) == 2) + char* target = bref->bref_backend->backend_server->unique_name; + GWBUF* buf; + + if(buffer == NULL || *buffer == NULL) + return false; + + buf = modutil_get_complete_packets(buffer); + + if(buf == NULL) + return false; + + ptr = (unsigned char*)buf->start; + + if(PTR_IS_ERR(ptr)) + { + skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES returned an error."); + gwbuf_free(buf); + return true; + } + + if(bref->n_mapping_eof == 0) { - ptr = (unsigned char*)buf->start; - - if(ptr[4] != 1) - { - /** Something else came back, discard and return with an error*/ - return false; - } - /** Skip column definitions */ - while(!PTR_IS_EOF(ptr)) + while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr)) { ptr += gw_mysql_get_byte3(ptr) + 4; } + if(ptr >= (unsigned char*)buf->end) + { + skygw_log_write(LOGFILE_TRACE,"schemarouter: Malformed packet for SHOW DATABASES."); + *buffer = gwbuf_append(buf,*buffer); + return false; + } + + atomic_add(&bref->n_mapping_eof,1); /** Skip first EOF packet */ ptr += gw_mysql_get_byte3(ptr) + 4; - - while(!PTR_IS_EOF(ptr)) + } + + if(bref->n_mapping_eof == 1) + { + while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr)) { int payloadlen = gw_mysql_get_byte3(ptr); int packetlen = payloadlen + 4; @@ -353,11 +373,20 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) } ptr += packetlen; } - rval = true; - } - - return rval; + if(ptr < (unsigned char*)buf->end && PTR_IS_EOF(ptr) && + bref->n_mapping_eof == 1) + { + atomic_add(&bref->n_mapping_eof,1); + skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES fully received from %s.", + bref->bref_backend->backend_server->unique_name); + } + else + { + skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES partially received from %s.", + bref->bref_backend->backend_server->unique_name); + } + return bref->n_mapping_eof == 2; } /** @@ -372,14 +401,14 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf) int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) { DCB* dcb; - const char* query = "SHOW DATABASES;"; + const char* query = "SHOW DATABASES"; GWBUF *buffer,*clone; int i,rval = 0; unsigned int len; session->init |= INIT_MAPPING; session->init &= ~INIT_UNINT; - len = strlen(query); + len = strlen(query) + 1; buffer = gwbuf_alloc(len + 4); *((unsigned char*)buffer->start) = len; *((unsigned char*)buffer->start + 1) = len>>8; @@ -391,7 +420,8 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) for(i = 0;irses_nbackends;i++) { if(BREF_IS_IN_USE(&session->rses_backend_ref[i]) && - !BREF_IS_CLOSED(&session->rses_backend_ref[i])) + !BREF_IS_CLOSED(&session->rses_backend_ref[i]) && + SERVER_IS_RUNNING(session->rses_backend_ref[i].bref_backend->backend_server)) { clone = gwbuf_clone(buffer); dcb = session->rses_backend_ref[i].bref_dcb; @@ -905,6 +935,8 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; #endif backend_ref[i].bref_state = 0; + backend_ref[i].n_mapping_eof = 0; + backend_ref[i].map_queue = NULL; backend_ref[i].bref_backend = router->servers[i]; /** store pointers to sescmd list to both cursors */ backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; @@ -1734,7 +1766,7 @@ static int routeQuery( { char* querystr = modutil_get_SQL(querybuf); - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Storing query for session %p: %s", + skygw_log_write(LOGFILE_DEBUG|LOGFILE_TRACE,"schemarouter: Storing query for session %p: %s", router_cli_ses->rses_client_dcb->session, querystr); free(querystr); @@ -2300,19 +2332,37 @@ static void clientReply ( bool mapped = true, logged = false; int i; backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; + GWBUF* tmpbuf = writebuf; for(i = 0; i < router_cli_ses->rses_nbackends; i++) { if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) { - router_cli_ses->rses_backend_ref[i].bref_mapped = true; - parse_showdb_response(router_cli_ses, - router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, - writebuf); - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p", + + + if(bref->map_queue) + { + tmpbuf = gwbuf_append(bref->map_queue,tmpbuf); + } + + if(parse_showdb_response(router_cli_ses, + &router_cli_ses->rses_backend_ref[i], + &tmpbuf)) + { + router_cli_ses->rses_backend_ref[i].bref_mapped = true; + skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p", router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, router_cli_ses->rses_client_dcb->session); + } + else + { + bref->map_queue = tmpbuf; + writebuf = NULL; + skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received partial SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session); + } } if(BREF_IS_IN_USE(&bkrf[i]) && @@ -2329,7 +2379,7 @@ static void clientReply ( } } - while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf)))); + while(tmpbuf && (tmpbuf = gwbuf_consume(tmpbuf,gwbuf_length(tmpbuf)))); if(mapped) { @@ -3885,7 +3935,7 @@ static bool handle_error_new_connection( { SESSION* ses; int router_nservers,i; - + unsigned char cmd = *((unsigned char*)errmsg->start + 4); backend_ref_t* bref; bool succp; @@ -3960,10 +4010,11 @@ static bool handle_error_new_connection( } rses->init |= INIT_MAPPING; - + for(i = 0;irses_nbackends;i++) { - bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED); + bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED); + rses->rses_backend_ref[i].n_mapping_eof = 0; } HASHITERATOR* iter = hashtable_iterator(rses->dbhash);