Use collectable resultset type for database mapping

When the databases are mapped, it is desirable to get the complete
response in one contiguous buffer. This removes the need to manually
process the partial packets in the router code.
This commit is contained in:
Markus Mäkelä
2017-04-03 19:29:14 +03:00
parent 2310381465
commit c37c5abfeb

View File

@ -1032,29 +1032,18 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref,
{ {
if (bref->dcb() == (*it)->dcb() && !(*it)->is_mapped()) if (bref->dcb() == (*it)->dcb() && !(*it)->is_mapped())
{ {
if (bref->m_map_queue)
{
writebuf = gwbuf_append(bref->m_map_queue, writebuf);
bref->m_map_queue = NULL;
}
enum showdb_response rc = parse_showdb_response(*it, &writebuf); enum showdb_response rc = parse_showdb_response(*it, &writebuf);
if (rc == SHOWDB_FULL_RESPONSE) if (rc == SHOWDB_FULL_RESPONSE)
{ {
(*it)->m_mapped = true; (*it)->set_mapped(true);
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
(*it)->backend()->server->unique_name, (*it)->backend()->server->unique_name,
m_client->session); m_client->session);
} }
else if (rc == SHOWDB_PARTIAL_RESPONSE)
{
bref->m_map_queue = writebuf;
writebuf = NULL;
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
(*it)->backend()->server->unique_name,
m_client->session);
}
else else
{ {
ss_dassert(rc != SHOWDB_PARTIAL_RESPONSE);
DCB* client_dcb = NULL; DCB* client_dcb = NULL;
if ((m_state & INIT_FAILED) == 0) if ((m_state & INIT_FAILED) == 0)
@ -1294,6 +1283,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
{ {
return SHOWDB_PARTIAL_RESPONSE; return SHOWDB_PARTIAL_RESPONSE;
} }
int n_eof = 0;
ptr = (unsigned char*) buf->start; ptr = (unsigned char*) buf->start;
@ -1304,7 +1294,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
return SHOWDB_FATAL_ERROR; return SHOWDB_FATAL_ERROR;
} }
if (bref->m_num_mapping_eof == 0) if (n_eof == 0)
{ {
/** Skip column definitions */ /** Skip column definitions */
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
@ -1319,7 +1309,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
return SHOWDB_FATAL_ERROR; return SHOWDB_FATAL_ERROR;
} }
atomic_add(&bref->m_num_mapping_eof, 1); n_eof++;
/** Skip first EOF packet */ /** Skip first EOF packet */
ptr += gw_mysql_get_byte3(ptr) + 4; ptr += gw_mysql_get_byte3(ptr) + 4;
} }
@ -1353,9 +1343,9 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
ptr += packetlen; ptr += packetlen;
} }
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && bref->m_num_mapping_eof == 1) if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && n_eof == 1)
{ {
atomic_add(&bref->m_num_mapping_eof, 1); n_eof++;
MXS_INFO("SHOW DATABASES fully received from %s.", MXS_INFO("SHOW DATABASES fully received from %s.",
bref->backend()->server->unique_name); bref->backend()->server->unique_name);
} }
@ -1371,7 +1361,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
{ {
rval = SHOWDB_DUPLICATE_DATABASES; rval = SHOWDB_DUPLICATE_DATABASES;
} }
else if (bref->m_num_mapping_eof == 2) else if (n_eof == 2)
{ {
rval = SHOWDB_FULL_RESPONSE; rval = SHOWDB_FULL_RESPONSE;
} }
@ -1390,36 +1380,26 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
*/ */
void SchemaRouterSession::gen_databaselist() void SchemaRouterSession::gen_databaselist()
{ {
DCB* dcb;
const char* query = "SHOW DATABASES";
GWBUF *buffer, *clone;
int i, rval = 0;
unsigned int len;
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
(*it)->m_mapped = false; (*it)->set_mapped(false);
(*it)->m_num_mapping_eof = 0;
} }
m_state |= INIT_MAPPING; m_state |= INIT_MAPPING;
m_state &= ~INIT_UNINT; m_state &= ~INIT_UNINT;
len = strlen(query) + 1;
buffer = gwbuf_alloc(len + 4); GWBUF *buffer = modutil_create_query("SHOW DATABASES");
uint8_t *data = GWBUF_DATA(buffer); gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
*(data) = len;
*(data + 1) = len >> 8;
*(data + 2) = len >> 16;
*(data + 3) = 0x0;
*(data + 4) = 0x03;
memcpy(data + 5, query, strlen(query));
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if ((*it)->in_use() && !(*it)->is_closed() & if ((*it)->in_use() && !(*it)->is_closed() &
SERVER_IS_RUNNING((*it)->backend()->server)) SERVER_IS_RUNNING((*it)->backend()->server))
{ {
clone = gwbuf_clone(buffer); GWBUF* clone = gwbuf_clone(buffer);
MXS_ABORT_IF_NULL(clone);
if (!(*it)->write(clone)) if (!(*it)->write(clone))
{ {
MXS_ERROR("Failed to write SHOW DATABASES to '%s'", MXS_ERROR("Failed to write SHOW DATABASES to '%s'",