Clean up routeQuery and clientReply

Moved parts of the functionality into subfunctions. Reordered code to
remove redundant logic.
This commit is contained in:
Markus Mäkelä
2017-03-27 12:01:27 +03:00
parent 39903e40b7
commit a10aa85736
2 changed files with 318 additions and 461 deletions

View File

@ -68,7 +68,6 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou
this->m_router = router; this->m_router = router;
this->m_client = (DCB*)session->client_dcb; this->m_client = (DCB*)session->client_dcb;
this->m_queue = NULL;
this->m_closed = false; this->m_closed = false;
this->m_sent_sescmd = 0; this->m_sent_sescmd = 0;
this->m_replied_sescmd = 0; this->m_replied_sescmd = 0;
@ -192,8 +191,6 @@ void SchemaRouterSession::close()
} }
} }
gwbuf_free(this->m_queue);
spinlock_acquire(&m_router.m_lock); spinlock_acquire(&m_router.m_lock);
if (m_router.m_stats.longest_sescmd < this->m_stats.longest_sescmd) if (m_router.m_stats.longest_sescmd < this->m_stats.longest_sescmd)
{ {
@ -217,93 +214,12 @@ void SchemaRouterSession::close()
} }
} }
int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) static void inspect_query(GWBUF* pPacket, uint32_t* type, qc_query_op_t* op, uint8_t* command)
{ {
uint32_t qtype = QUERY_TYPE_UNKNOWN; uint8_t* data = GWBUF_DATA(pPacket);
uint8_t packet_type; *command = data[4];
uint8_t* packet;
int ret = 0;
DCB* target_dcb = NULL;
bool change_successful = false;
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char db[MYSQL_DATABASE_MAXLEN + 1];
char errbuf[26 + MYSQL_DATABASE_MAXLEN];
SERVER* target = NULL; switch (*command)
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket));
if (this->m_closed)
{
return 0;
}
if (this->m_shard.empty())
{
/* Generate database list */
gen_databaselist();
}
/**
* If the databases are still being mapped or if the client connected
* with a default database but no database mapping was performed we need
* to store the query. Once the databases have been mapped and/or the
* default database is taken into use we can send the query forward.
*/
if (this->m_state & (INIT_MAPPING | INIT_USE_DB))
{
int init_rval = 1;
char* querystr = modutil_get_SQL(pPacket);
MXS_INFO("Storing query for session %p: %s",
this->m_client->session,
querystr);
MXS_FREE(querystr);
pPacket = gwbuf_make_contiguous(pPacket);
GWBUF* ptr = this->m_queue;
while (ptr && ptr->next)
{
ptr = ptr->next;
}
if (ptr == NULL)
{
this->m_queue = pPacket;
}
else
{
ptr->next = pPacket;
}
if (this->m_state == (INIT_READY | INIT_USE_DB))
{
/**
* This state is possible if a client connects with a default database
* and the shard map was found from the router cache
*/
if (!handle_default_db())
{
init_rval = 0;
}
}
return init_rval;
}
packet = GWBUF_DATA(pPacket);
packet_type = packet[4];
qc_query_op_t op = QUERY_OP_UNDEFINED;
if (detect_show_shards(pPacket))
{
process_show_shards();
gwbuf_free(pPacket);
return 1;
}
switch (packet_type)
{ {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
@ -314,27 +230,27 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */ case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
qtype = QUERY_TYPE_SESSION_WRITE; *type = QUERY_TYPE_SESSION_WRITE;
break; break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
qtype = QUERY_TYPE_WRITE; *type = QUERY_TYPE_WRITE;
break; break;
case MYSQL_COM_QUERY: case MYSQL_COM_QUERY:
qtype = qc_get_type_mask(pPacket); *type = qc_get_type_mask(pPacket);
op = qc_get_operation(pPacket); *op = qc_get_operation(pPacket);
break; break;
case MYSQL_COM_STMT_PREPARE: case MYSQL_COM_STMT_PREPARE:
qtype = qc_get_type_mask(pPacket); *type = qc_get_type_mask(pPacket);
qtype |= QUERY_TYPE_PREPARE_STMT; *type |= QUERY_TYPE_PREPARE_STMT;
break; break;
case MYSQL_COM_STMT_EXECUTE: case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */ /** Parsing is not needed for this type of packet */
qtype = QUERY_TYPE_EXEC_STMT; *type = QUERY_TYPE_EXEC_STMT;
break; break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
@ -353,167 +269,50 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
{ {
char *sql; char *sql;
int sql_len; int sql_len;
char* qtypestr = qc_typemask_to_string(qtype); char* qtypestr = qc_typemask_to_string(*type);
modutil_extract_SQL(pPacket, &sql, &sql_len); modutil_extract_SQL(pPacket, &sql, &sql_len);
MXS_INFO("> Command: %s, stmt: %.*s %s%s", MXS_INFO("> Command: %s, stmt: %.*s %s%s",
STRPACKETTYPE(packet_type), sql_len, sql, STRPACKETTYPE(*command), sql_len, sql,
(pPacket->hint == NULL ? "" : ", Hint:"), (pPacket->hint == NULL ? "" : ", Hint:"),
(pPacket->hint == NULL ? "" : STRHINTTYPE(pPacket->hint->type))); (pPacket->hint == NULL ? "" : STRHINTTYPE(pPacket->hint->type)));
MXS_FREE(qtypestr); MXS_FREE(qtypestr);
} }
/** }
* Find out whether the query should be routed to single server or to
* all of them.
*/
if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
uint32_t type,
uint8_t command,
route_target_t& route_target)
{
SERVER* target = NULL;
if (route_target != TARGET_NAMED_SERVER)
{ {
change_successful = change_current_db(this->m_current_db, /** We either don't know or don't care where this query should go */
this->m_shard, target = get_shard_target(pPacket, type);
pPacket);
if (!change_successful)
{
extract_database(pPacket, db);
snprintf(errbuf, 25 + MYSQL_DATABASE_MAXLEN, "Unknown database: %s", db);
if (this->m_config.debug) if (target && SERVER_IS_RUNNING(target))
{
sprintf(errbuf + strlen(errbuf),
" ([%lu]: DB change failed)",
this->m_client->session->ses_id);
}
write_error_to_client(this->m_client,
SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND,
errbuf);
MXS_ERROR("Changing database failed.");
gwbuf_free(pPacket);
return 1;
}
}
/** Create the response to the SHOW DATABASES from the mapped databases */
if (qc_query_is_type(qtype, QUERY_TYPE_SHOW_DATABASES))
{
if (send_database_list())
{
ret = 1;
}
gwbuf_free(pPacket);
return ret;
}
route_target = get_shard_route_target(qtype);
if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB)
{
route_target = TARGET_UNDEFINED;
target = this->m_shard.get_location(this->m_current_db);
if (target)
{
MXS_INFO("INIT_DB for database '%s' on server '%s'",
this->m_current_db.c_str(), target->unique_name);
route_target = TARGET_NAMED_SERVER;
}
else
{
MXS_INFO("INIT_DB with unknown database");
}
}
else if (route_target != TARGET_ALL)
{
/** If no database is found in the query and there is no active database
* or hints in the query we route the query to the first available
* server. This isn't ideal for monitoring server status but works if
* we just want the server to send an error back. */
target = get_shard_target(pPacket, qtype);
if (target)
{
if (SERVER_IS_RUNNING(target))
{ {
route_target = TARGET_NAMED_SERVER; route_target = TARGET_NAMED_SERVER;
} }
else
{
MXS_INFO("Backend server '%s' is not in a viable state", target->unique_name);
/**
* Shard is not a viable target right now so we check
* for an alternate backend with the database. If this is not found
* the target is undefined and an error will be returned to the client.
*/
}
}
} }
if (TARGET_IS_UNDEFINED(route_target)) if (TARGET_IS_UNDEFINED(route_target))
{ {
target = get_shard_target(pPacket, qtype); /** We don't know where to send this. Route it to either the server with
* the current default database or to the first available server. */
target = get_shard_target(pPacket, type);
if ((target == NULL && if ((target == NULL && command != MYSQL_COM_INIT_DB && this->m_current_db.length() == 0) ||
packet_type != MYSQL_COM_INIT_DB && command == MYSQL_COM_FIELD_LIST ||
this->m_current_db.length() == 0) || this->m_current_db.length() == 0)
packet_type == MYSQL_COM_FIELD_LIST ||
(this->m_current_db.length() == 0))
{ {
/** /** No current database and no databases in query or the database is
* No current database and no databases in query or * ignored, route to first available backend. */
* the database is ignored, route to first available backend.
*/
route_target = TARGET_ANY; route_target = TARGET_ANY;
MXS_INFO("Routing query to first available backend.");
} }
else
{
if (!change_successful)
{
/**
* Bad shard status. The changing of the database
* was not successful and the error message was already sent.
*/
ret = 1;
}
else
{
MXS_ERROR("Error : Router internal failure (schemarouter)");
/** Something else went wrong, terminate connection */
ret = 0;
}
gwbuf_free(pPacket);
return ret;
}
}
if (TARGET_IS_ALL(route_target))
{
/**
* It is not sure if the session command in question requires
* response. Statement is examined in route_session_write.
* Router locking is done inside the function.
*/
succp = route_session_write(pPacket, packet_type);
if (succp)
{
atomic_add(&m_router.m_stats.n_sescmd, 1);
atomic_add(&m_router.m_stats.n_queries, 1);
ret = 1;
}
gwbuf_free(pPacket);
return ret;
} }
if (TARGET_IS_ANY(route_target)) if (TARGET_IS_ANY(route_target))
@ -533,54 +332,166 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
{ {
/**No valid backends alive*/ /**No valid backends alive*/
MXS_ERROR("Failed to route query, no backends are available."); MXS_ERROR("Failed to route query, no backends are available.");
gwbuf_free(pPacket); }
}
return target;
}
int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
{
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket));
if (this->m_closed)
{
return 0; return 0;
} }
if (this->m_shard.empty())
{
/* Generate database list */
gen_databaselist();
} }
int ret = 0;
/** /**
* Query is routed to one of the backends * If the databases are still being mapped or if the client connected
* with a default database but no database mapping was performed we need
* to store the query. Once the databases have been mapped and/or the
* default database is taken into use we can send the query forward.
*/ */
if (TARGET_IS_NAMED_SERVER(route_target) && target) if (this->m_state & (INIT_MAPPING | INIT_USE_DB))
{
this->m_queue.push_back(pPacket);
if (this->m_state == (INIT_READY | INIT_USE_DB))
{ {
/** /**
* Search backend server by name or replication lag. * This state is possible if a client connects with a default database
* If it fails, then try to find valid slave or master. * and the shard map was found from the router cache
*/ */
if (handle_default_db())
succp = get_shard_dcb(&target_dcb, target->unique_name);
if (!succp)
{ {
MXS_INFO("Was supposed to route to named server " ret = 1;
"%s but couldn't find the server in a " }
"suitable state.", target->unique_name);
} }
return ret;
} }
if (succp) /*< Have DCB of the target backend */ uint8_t command = 0;
{ SERVER* target = NULL;
backend_ref_t *bref = get_bref_from_dcb(target_dcb); uint32_t type = QUERY_TYPE_UNKNOWN;
qc_query_op_t op = QUERY_OP_UNDEFINED;
route_target_t route_target = TARGET_UNDEFINED;
MXS_INFO("Route query to \t%s:%d <", inspect_query(pPacket, &type, &op, &command);
bref->backend->server->name,
bref->backend->server->port); /** Create the response to the SHOW DATABASES from the mapped databases */
/** if (qc_query_is_type(type, QUERY_TYPE_SHOW_DATABASES))
* Store current stmt if execution of previous session command
* haven't completed yet. Note that according to MySQL protocol
* there can only be one such non-sescmd stmt at the time.
*/
if (bref->session_commands.size() > 0)
{ {
ss_dassert((bref->pending_cmd == NULL || if (send_database_list())
this->m_closed)); {
bref->pending_cmd = pPacket; ret = 1;
}
gwbuf_free(pPacket);
return ret;
}
else if (detect_show_shards(pPacket))
{
if (process_show_shards())
{
ret = 1;
}
gwbuf_free(pPacket);
return ret;
}
/** The default database changes must be routed to a specific server */
if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB)
{
if (!change_current_db(m_current_db, m_shard, pPacket))
{
char db[MYSQL_DATABASE_MAXLEN + 1];
extract_database(pPacket, db);
gwbuf_free(pPacket);
char errbuf[128 + MYSQL_DATABASE_MAXLEN];
snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db);
if (this->m_config.debug)
{
sprintf(errbuf + strlen(errbuf),
" ([%lu]: DB change failed)",
this->m_client->session->ses_id);
}
write_error_to_client(this->m_client,
SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND,
errbuf);
return 1; return 1;
} }
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) route_target = TARGET_UNDEFINED;
target = this->m_shard.get_location(this->m_current_db);
if (target)
{
MXS_INFO("INIT_DB for database '%s' on server '%s'",
this->m_current_db.c_str(), target->unique_name);
route_target = TARGET_NAMED_SERVER;
}
else
{
MXS_INFO("INIT_DB with unknown database");
}
}
else
{
route_target = get_shard_route_target(type);
}
/**
* Find a suitable server that matches the requirements of @c route_target
*/
if (TARGET_IS_ALL(route_target))
{
/** Session commands, route to all servers */
if (route_session_write(pPacket, command))
{
atomic_add(&m_router.m_stats.n_sescmd, 1);
atomic_add(&m_router.m_stats.n_queries, 1);
ret = 1;
}
}
else
{
target = resolve_query_target(pPacket, type, command, route_target);
}
DCB* target_dcb = NULL;
if (TARGET_IS_NAMED_SERVER(route_target) && target &&
get_shard_dcb(&target_dcb, target->unique_name))
{
/** We know where to route this query */
backend_ref_t *bref = get_bref_from_dcb(target_dcb);
MXS_INFO("Route query to \t%s:%d <", bref->backend->server->name, bref->backend->server->port);
if (bref->session_commands.size() > 0)
{
/** Store current statement if execution of the previous
* session command hasn't been completed. */
ss_dassert((bref->pending_cmd == NULL || this->m_closed));
bref->pending_cmd = pPacket;
ret = 1;
}
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
{ {
backend_ref_t* bref; backend_ref_t* bref;
@ -600,75 +511,30 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
} }
gwbuf_free(pPacket); gwbuf_free(pPacket);
return ret; return ret;
} }
void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket)
void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{ {
backend_ref_t* bref; int rc = inspect_backend_mapping_states(bref, &pPacket);
GWBUF* writebuf = pPacket;
/**
* Lock router client session for secure read of router session members.
* Note that this could be done without lock by using version #
*/
if (this->m_closed)
{
gwbuf_free(pPacket);
return;
}
/** Holding lock ensures that router session remains open */
ss_dassert(pDcb->session != NULL);
DCB *client_dcb = pDcb->session->client_dcb;
bref = get_bref_from_dcb(pDcb);
if (bref == NULL)
{
gwbuf_free(writebuf);
return;
}
MXS_DEBUG("Reply from [%s] session [%p]"
" mapping [%s] queries queued [%s]",
bref->backend->server->unique_name,
this->m_client->session,
this->m_state & INIT_MAPPING ? "true" : "false",
this->m_queue == NULL ? "none" :
this->m_queue->next ? "multiple" : "one");
if (this->m_state & INIT_MAPPING)
{
int rc = inspect_backend_mapping_states(bref, &writebuf);
gwbuf_free(writebuf);
writebuf = NULL;
if (rc == 1) if (rc == 1)
{ {
synchronize_shard_map(); synchronize_shard_map();
/*
* Check if the session is reconnecting with a database name
* that is not in the hashtable. If the database is not found
* then close the session.
*/
this->m_state &= ~INIT_MAPPING; this->m_state &= ~INIT_MAPPING;
/* Check if the session is reconnecting with a database name
* that is not in the hashtable. If the database is not found
* then close the session. */
if (this->m_state & INIT_USE_DB) if (this->m_state & INIT_USE_DB)
{ {
bool success = handle_default_db(); if (!handle_default_db())
if (!success)
{ {
dcb_close(this->m_client); rc = -1;
} }
return;
} }
if (this->m_queue) if (this->m_queue.size() && rc != -1)
{ {
ss_dassert(this->m_state == INIT_READY); ss_dassert(this->m_state == INIT_READY);
route_queued_query(); route_queued_query();
@ -677,143 +543,125 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
if (rc == -1) if (rc == -1)
{ {
dcb_close(this->m_client); poll_fake_hangup_event(this->m_client);
}
return;
} }
}
if (this->m_state & INIT_USE_DB) void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket)
{ {
MXS_DEBUG("Reply to USE '%s' received for session %p",
this->m_connect_db.c_str(),
this->m_client->session);
this->m_state &= ~INIT_USE_DB;
this->m_current_db = this->m_connect_db;
ss_dassert(this->m_state == INIT_READY);
if (this->m_queue)
{
route_queued_query();
}
gwbuf_free(writebuf);
return;
}
if (this->m_queue)
{
ss_dassert(this->m_state == INIT_READY);
route_queued_query();
return;
}
/**
* Active cursor means that reply is from session command
* execution.
*/
if (bref->session_commands.size() > 0) if (bref->session_commands.size() > 0)
{ {
if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) /** We are executing a session command */
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
{ {
/**
* Discard all those responses that have already been sent to
* the client. Return with buffer including response that
* needs to be sent to client or NULL.
*/
if (this->m_replied_sescmd < this->m_sent_sescmd && if (this->m_replied_sescmd < this->m_sent_sescmd &&
bref->session_commands.front().get_position() == this->m_replied_sescmd + 1) bref->session_commands.front().get_position() == this->m_replied_sescmd + 1)
{ {
/** First reply to this session command, route it to the client */
++this->m_replied_sescmd; ++this->m_replied_sescmd;
} }
else else
{ {
/** The reply to this session command has already been sent /** The reply to this session command has already been sent to
* to the client. */ * the client, discard it */
gwbuf_free(writebuf); gwbuf_free(*ppPacket);
writebuf = NULL; *ppPacket = NULL;
} }
bref->session_commands.pop_front(); bref->session_commands.pop_front();
} }
/**
* If response will be sent to client, decrease waiter count. if (*ppPacket)
* This applies to session commands only. Counter decrement
* for other type of queries is done outside this block.
*/
if (writebuf != NULL && client_dcb != NULL)
{ {
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(bref, BREF_WAITING_RESULT);
} }
} }
/**
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
* This applies for queries other than session commands.
*/
else if (BREF_IS_QUERY_ACTIVE(bref)) else if (BREF_IS_QUERY_ACTIVE(bref))
{ {
bref_clear_state(bref, BREF_QUERY_ACTIVE); bref_clear_state(bref, BREF_QUERY_ACTIVE);
/** Set response status as replied */ /** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(bref, BREF_WAITING_RESULT);
} }
}
if (writebuf != NULL && client_dcb != NULL) void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
backend_ref_t* bref = get_bref_from_dcb(pDcb);
if (this->m_closed || bref == NULL)
{ {
unsigned char* cmd = (unsigned char*) writebuf->start; gwbuf_free(pPacket);
int state = this->m_state; return;
/** Write reply to client DCB */ }
MXS_INFO("returning reply [%s] "
"state [%s] session [%p]", MXS_DEBUG("Reply from [%s] session [%p]"
PTR_IS_ERR(cmd) ? "ERR" : PTR_IS_OK(cmd) ? "OK" : "RSET", " mapping [%s] queries queued [%s]",
state & INIT_UNINT ? "UNINIT" : state & INIT_MAPPING ? "MAPPING" : "READY", bref->backend->server->unique_name,
this->m_client->session); this->m_client->session,
MXS_SESSION_ROUTE_REPLY(pDcb->session, writebuf); this->m_state & INIT_MAPPING ? "true" : "false",
this->m_queue.size() == 0 ? "none" :
this->m_queue.size() > 0 ? "multiple" : "one");
if (this->m_state & INIT_MAPPING)
{
handle_mapping_reply(bref, pPacket);
}
else if (this->m_state & INIT_USE_DB)
{
MXS_DEBUG("Reply to USE '%s' received for session %p",
this->m_connect_db.c_str(), this->m_client->session);
this->m_state &= ~INIT_USE_DB;
this->m_current_db = this->m_connect_db;
ss_dassert(this->m_state == INIT_READY);
if (this->m_queue.size())
{
route_queued_query();
}
}
else if (this->m_queue.size())
{
ss_dassert(this->m_state == INIT_READY);
route_queued_query();
}
else
{
process_response(bref, &pPacket);
if (pPacket)
{
MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
pPacket = NULL;
} }
/** There is one pending session command to be executed. */
if (bref->session_commands.size() > 0) if (bref->session_commands.size() > 0)
{ {
/** There are pending session commands to be executed. */
MXS_INFO("Backend %s:%d processed reply and starts to execute " MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
"active cursor.", bref->backend->server->name, bref->backend->server->port);
bref->backend->server->name,
bref->backend->server->port);
execute_sescmd_in_backend(bref); execute_sescmd_in_backend(bref);
} }
else if (bref->pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ else if (bref->pending_cmd) /*< non-sescmd is waiting to be routed */
{ {
int ret;
CHK_GWBUF(bref->pending_cmd); CHK_GWBUF(bref->pending_cmd);
int ret = bref->dcb->func.write(bref->dcb, bref->pending_cmd);
bref->pending_cmd = NULL;
if ((ret = bref->dcb->func.write(bref->dcb, gwbuf_clone(bref->pending_cmd))) == 1) if (ret == 1)
{ {
atomic_add(&this->m_router.m_stats.n_queries, 1); atomic_add(&this->m_router.m_stats.n_queries, 1);
/**
* Add one query response waiter to backend reference
*/
bref_set_state(bref, BREF_QUERY_ACTIVE); bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT); bref_set_state(bref, BREF_WAITING_RESULT);
} }
else else
{ {
char* sql = modutil_get_SQL(bref->pending_cmd); MXS_ERROR("Routing of pending query failed.");
}
}
}
if (sql) gwbuf_free(pPacket);
{
MXS_ERROR("Routing query \"%s\" failed.", sql);
MXS_FREE(sql);
}
else
{
MXS_ERROR("Routing query failed.");
}
}
gwbuf_free(bref->pending_cmd);
bref->pending_cmd = NULL;
}
} }
void SchemaRouterSession::handleError(GWBUF* pMessage, void SchemaRouterSession::handleError(GWBUF* pMessage,
@ -1230,9 +1078,9 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data)
* @param rses Router client session * @param rses Router client session
* @return 0 on success, -1 on error * @return 0 on success, -1 on error
*/ */
int SchemaRouterSession::process_show_shards() bool SchemaRouterSession::process_show_shards()
{ {
int rval = -1; bool rval = false;
ServerMap pContent; ServerMap pContent;
this->m_shard.get_content(pContent); this->m_shard.get_content(pContent);
@ -1244,7 +1092,7 @@ int SchemaRouterSession::process_show_shards()
resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
resultset_stream_mysql(rset, this->m_client); resultset_stream_mysql(rset, this->m_client);
resultset_free(rset); resultset_free(rset);
rval = 0; rval = true;
} }
return rval; return rval;
@ -1342,9 +1190,9 @@ bool SchemaRouterSession::handle_default_db()
void SchemaRouterSession::route_queued_query() void SchemaRouterSession::route_queued_query()
{ {
GWBUF* tmp = this->m_queue; GWBUF* tmp = this->m_queue.front().release();
this->m_queue = this->m_queue->next; this->m_queue.pop_front();
tmp->next = NULL;
#ifdef SS_DEBUG #ifdef SS_DEBUG
char* querystr = modutil_get_SQL(tmp); char* querystr = modutil_get_SQL(tmp);
MXS_DEBUG("Sending queued buffer for session %p: %s", MXS_DEBUG("Sending queued buffer for session %p: %s",
@ -1352,6 +1200,7 @@ void SchemaRouterSession::route_queued_query()
querystr); querystr);
MXS_FREE(querystr); MXS_FREE(querystr);
#endif #endif
poll_add_epollin_event_to_dcb(this->m_client, tmp); poll_add_epollin_event_to_dcb(this->m_client, tmp);
} }
@ -1416,7 +1265,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
/** Send the client an error about duplicate databases /** Send the client an error about duplicate databases
* if there is a queued query from the client. */ * if there is a queued query from the client. */
if (this->m_queue) if (this->m_queue.size())
{ {
GWBUF* error = modutil_create_mysql_err_msg(1, 0, GWBUF* error = modutil_create_mysql_err_msg(1, 0,
SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERR_DUPLICATEDB,

View File

@ -16,6 +16,7 @@
#include "schemarouter.hh" #include "schemarouter.hh"
#include <string> #include <string>
#include <list>
#include <maxscale/protocol/mysql.h> #include <maxscale/protocol/mysql.h>
#include <maxscale/router.hh> #include <maxscale/router.hh>
@ -23,6 +24,9 @@
#include "shard_map.hh" #include "shard_map.hh"
#include "session_command.hh" #include "session_command.hh"
using std::string;
using std::list;
/** /**
* Bitmask values for the router session's initialization. These values are used * Bitmask values for the router session's initialization. These values are used
* to prevent responses from internal commands being forwarded to the client. * to prevent responses from internal commands being forwarded to the client.
@ -167,7 +171,7 @@ private:
string m_connect_db; /**< Database the user was trying to connect to */ string m_connect_db; /**< Database the user was trying to connect to */
string m_current_db; /**< Current active database */ string m_current_db; /**< Current active database */
int m_state; /**< Initialization state bitmask */ int m_state; /**< Initialization state bitmask */
GWBUF* m_queue; /**< Query that was received before the session was ready */ list<Buffer> m_queue; /**< Query that was received before the session was ready */
ROUTER_STATS m_stats; /**< Statistics for this router */ ROUTER_STATS m_stats; /**< Statistics for this router */
uint64_t m_sent_sescmd; /**< The latest session command being executed */ uint64_t m_sent_sescmd; /**< The latest session command being executed */
uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */ uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */
@ -184,9 +188,13 @@ private:
bool send_database_list(); bool send_database_list();
int gen_databaselist(); int gen_databaselist();
int inspect_backend_mapping_states(backend_ref_t *bref, GWBUF** wbuf); int inspect_backend_mapping_states(backend_ref_t *bref, GWBUF** wbuf);
int process_show_shards(); bool process_show_shards();
showdb_response_t parse_showdb_response(backend_ref_t* bref, GWBUF** buffer); showdb_response_t parse_showdb_response(backend_ref_t* bref, GWBUF** buffer);
void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg);
void route_queued_query(); void route_queued_query();
void synchronize_shard_map(); void synchronize_shard_map();
void handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket);
void process_response(backend_ref_t* bref, GWBUF** ppPacket);
SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command,
route_target_t& route_target);
}; };