Fixed schemarouter not handling show databases queries spanning multiple buffers.
This commit is contained in:
@ -215,6 +215,8 @@ typedef struct backend_ref_st {
|
|||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t bref_chk_top;
|
skygw_chk_t bref_chk_top;
|
||||||
#endif
|
#endif
|
||||||
|
int n_mapping_eof;
|
||||||
|
GWBUF* map_queue;
|
||||||
BACKEND* bref_backend; /*< Backend server */
|
BACKEND* bref_backend; /*< Backend server */
|
||||||
DCB* bref_dcb; /*< Backend DCB */
|
DCB* bref_dcb; /*< Backend DCB */
|
||||||
bref_state_t bref_state; /*< State of the backend */
|
bref_state_t bref_state; /*< State of the backend */
|
||||||
|
@ -311,32 +311,52 @@ char* get_lenenc_str(void* data, int* len)
|
|||||||
* @param buf GWBUF containing the result set
|
* @param buf GWBUF containing the result set
|
||||||
* @return True if the buffer contained a result set with a single column. All other responses return false.
|
* @return True if the buffer contained a result set with a single column. All other responses return false.
|
||||||
*/
|
*/
|
||||||
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
|
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer)
|
||||||
{
|
{
|
||||||
bool rval = false;
|
|
||||||
unsigned char* ptr;
|
unsigned char* ptr;
|
||||||
int more = 0;
|
char* target = bref->bref_backend->backend_server->unique_name;
|
||||||
if(PTR_IS_RESULTSET(((unsigned char*)buf->start)) &&
|
GWBUF* buf;
|
||||||
modutil_count_signal_packets(buf,0,0,&more) == 2)
|
|
||||||
|
if(buffer == NULL || *buffer == NULL)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
buf = modutil_get_complete_packets(buffer);
|
||||||
|
|
||||||
|
if(buf == NULL)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
ptr = (unsigned char*)buf->start;
|
||||||
|
|
||||||
|
if(PTR_IS_ERR(ptr))
|
||||||
|
{
|
||||||
|
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES returned an error.");
|
||||||
|
gwbuf_free(buf);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(bref->n_mapping_eof == 0)
|
||||||
{
|
{
|
||||||
ptr = (unsigned char*)buf->start;
|
|
||||||
|
|
||||||
if(ptr[4] != 1)
|
|
||||||
{
|
|
||||||
/** Something else came back, discard and return with an error*/
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Skip column definitions */
|
/** Skip column definitions */
|
||||||
while(!PTR_IS_EOF(ptr))
|
while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr))
|
||||||
{
|
{
|
||||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(ptr >= (unsigned char*)buf->end)
|
||||||
|
{
|
||||||
|
skygw_log_write(LOGFILE_TRACE,"schemarouter: Malformed packet for SHOW DATABASES.");
|
||||||
|
*buffer = gwbuf_append(buf,*buffer);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_add(&bref->n_mapping_eof,1);
|
||||||
/** Skip first EOF packet */
|
/** Skip first EOF packet */
|
||||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||||
|
}
|
||||||
while(!PTR_IS_EOF(ptr))
|
|
||||||
|
if(bref->n_mapping_eof == 1)
|
||||||
|
{
|
||||||
|
while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr))
|
||||||
{
|
{
|
||||||
int payloadlen = gw_mysql_get_byte3(ptr);
|
int payloadlen = gw_mysql_get_byte3(ptr);
|
||||||
int packetlen = payloadlen + 4;
|
int packetlen = payloadlen + 4;
|
||||||
@ -353,11 +373,20 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
|
|||||||
}
|
}
|
||||||
ptr += packetlen;
|
ptr += packetlen;
|
||||||
}
|
}
|
||||||
rval = true;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
if(ptr < (unsigned char*)buf->end && PTR_IS_EOF(ptr) &&
|
||||||
return rval;
|
bref->n_mapping_eof == 1)
|
||||||
|
{
|
||||||
|
atomic_add(&bref->n_mapping_eof,1);
|
||||||
|
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES fully received from %s.",
|
||||||
|
bref->bref_backend->backend_server->unique_name);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES partially received from %s.",
|
||||||
|
bref->bref_backend->backend_server->unique_name);
|
||||||
|
}
|
||||||
|
return bref->n_mapping_eof == 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -372,14 +401,14 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
|
|||||||
int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
||||||
{
|
{
|
||||||
DCB* dcb;
|
DCB* dcb;
|
||||||
const char* query = "SHOW DATABASES;";
|
const char* query = "SHOW DATABASES";
|
||||||
GWBUF *buffer,*clone;
|
GWBUF *buffer,*clone;
|
||||||
int i,rval = 0;
|
int i,rval = 0;
|
||||||
unsigned int len;
|
unsigned int len;
|
||||||
|
|
||||||
session->init |= INIT_MAPPING;
|
session->init |= INIT_MAPPING;
|
||||||
session->init &= ~INIT_UNINT;
|
session->init &= ~INIT_UNINT;
|
||||||
len = strlen(query);
|
len = strlen(query) + 1;
|
||||||
buffer = gwbuf_alloc(len + 4);
|
buffer = gwbuf_alloc(len + 4);
|
||||||
*((unsigned char*)buffer->start) = len;
|
*((unsigned char*)buffer->start) = len;
|
||||||
*((unsigned char*)buffer->start + 1) = len>>8;
|
*((unsigned char*)buffer->start + 1) = len>>8;
|
||||||
@ -391,7 +420,8 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
|||||||
for(i = 0;i<session->rses_nbackends;i++)
|
for(i = 0;i<session->rses_nbackends;i++)
|
||||||
{
|
{
|
||||||
if(BREF_IS_IN_USE(&session->rses_backend_ref[i]) &&
|
if(BREF_IS_IN_USE(&session->rses_backend_ref[i]) &&
|
||||||
!BREF_IS_CLOSED(&session->rses_backend_ref[i]))
|
!BREF_IS_CLOSED(&session->rses_backend_ref[i]) &&
|
||||||
|
SERVER_IS_RUNNING(session->rses_backend_ref[i].bref_backend->backend_server))
|
||||||
{
|
{
|
||||||
clone = gwbuf_clone(buffer);
|
clone = gwbuf_clone(buffer);
|
||||||
dcb = session->rses_backend_ref[i].bref_dcb;
|
dcb = session->rses_backend_ref[i].bref_dcb;
|
||||||
@ -905,6 +935,8 @@ static void* newSession(
|
|||||||
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
|
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
|
||||||
#endif
|
#endif
|
||||||
backend_ref[i].bref_state = 0;
|
backend_ref[i].bref_state = 0;
|
||||||
|
backend_ref[i].n_mapping_eof = 0;
|
||||||
|
backend_ref[i].map_queue = NULL;
|
||||||
backend_ref[i].bref_backend = router->servers[i];
|
backend_ref[i].bref_backend = router->servers[i];
|
||||||
/** store pointers to sescmd list to both cursors */
|
/** store pointers to sescmd list to both cursors */
|
||||||
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
|
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
|
||||||
@ -1734,7 +1766,7 @@ static int routeQuery(
|
|||||||
{
|
{
|
||||||
|
|
||||||
char* querystr = modutil_get_SQL(querybuf);
|
char* querystr = modutil_get_SQL(querybuf);
|
||||||
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Storing query for session %p: %s",
|
skygw_log_write(LOGFILE_DEBUG|LOGFILE_TRACE,"schemarouter: Storing query for session %p: %s",
|
||||||
router_cli_ses->rses_client_dcb->session,
|
router_cli_ses->rses_client_dcb->session,
|
||||||
querystr);
|
querystr);
|
||||||
free(querystr);
|
free(querystr);
|
||||||
@ -2300,19 +2332,37 @@ static void clientReply (
|
|||||||
bool mapped = true, logged = false;
|
bool mapped = true, logged = false;
|
||||||
int i;
|
int i;
|
||||||
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
|
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
|
||||||
|
GWBUF* tmpbuf = writebuf;
|
||||||
|
|
||||||
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
|
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||||
{
|
{
|
||||||
|
|
||||||
if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
|
if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
|
||||||
{
|
{
|
||||||
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
|
|
||||||
parse_showdb_response(router_cli_ses,
|
|
||||||
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
if(bref->map_queue)
|
||||||
writebuf);
|
{
|
||||||
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p",
|
tmpbuf = gwbuf_append(bref->map_queue,tmpbuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(parse_showdb_response(router_cli_ses,
|
||||||
|
&router_cli_ses->rses_backend_ref[i],
|
||||||
|
&tmpbuf))
|
||||||
|
{
|
||||||
|
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
|
||||||
|
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p",
|
||||||
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
||||||
router_cli_ses->rses_client_dcb->session);
|
router_cli_ses->rses_client_dcb->session);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bref->map_queue = tmpbuf;
|
||||||
|
writebuf = NULL;
|
||||||
|
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received partial SHOW DATABASES reply from %s for session %p",
|
||||||
|
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
|
||||||
|
router_cli_ses->rses_client_dcb->session);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(BREF_IS_IN_USE(&bkrf[i]) &&
|
if(BREF_IS_IN_USE(&bkrf[i]) &&
|
||||||
@ -2329,7 +2379,7 @@ static void clientReply (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while((writebuf = gwbuf_consume(writebuf,gwbuf_length(writebuf))));
|
while(tmpbuf && (tmpbuf = gwbuf_consume(tmpbuf,gwbuf_length(tmpbuf))));
|
||||||
|
|
||||||
if(mapped)
|
if(mapped)
|
||||||
{
|
{
|
||||||
@ -3885,7 +3935,7 @@ static bool handle_error_new_connection(
|
|||||||
{
|
{
|
||||||
SESSION* ses;
|
SESSION* ses;
|
||||||
int router_nservers,i;
|
int router_nservers,i;
|
||||||
|
unsigned char cmd = *((unsigned char*)errmsg->start + 4);
|
||||||
|
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
bool succp;
|
bool succp;
|
||||||
@ -3960,10 +4010,11 @@ static bool handle_error_new_connection(
|
|||||||
}
|
}
|
||||||
|
|
||||||
rses->init |= INIT_MAPPING;
|
rses->init |= INIT_MAPPING;
|
||||||
|
|
||||||
for(i = 0;i<rses->rses_nbackends;i++)
|
for(i = 0;i<rses->rses_nbackends;i++)
|
||||||
{
|
{
|
||||||
bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED);
|
bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED);
|
||||||
|
rses->rses_backend_ref[i].n_mapping_eof = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
HASHITERATOR* iter = hashtable_iterator(rses->dbhash);
|
HASHITERATOR* iter = hashtable_iterator(rses->dbhash);
|
||||||
|
Reference in New Issue
Block a user