Use shared pointers to Backend classes

The schemarouter now uses shared pointers. This removes the need to copy
the class.

Following changes move the member variables inside the Backend class.
This commit is contained in:
Markus Mäkelä
2017-03-28 16:22:41 +03:00
parent f9e5275605
commit 66fa4fbc7d
7 changed files with 410 additions and 395 deletions

View File

@ -28,101 +28,12 @@ bool extract_database(GWBUF* buf, char* str);
bool detect_show_shards(GWBUF* query);
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg);
Backend::Backend(SERVER_REF *ref):
m_backend(ref),
m_dcb(NULL),
m_map_queue(NULL),
m_mapped(false),
m_num_mapping_eof(0),
m_num_result_wait(0),
m_pending_cmd(NULL),
m_state(0)
{
}
Backend::~Backend()
{
gwbuf_free(m_map_queue);
gwbuf_free(m_pending_cmd);
}
bool Backend::execute_sescmd()
{
if (BREF_IS_CLOSED(this))
{
return false;
}
CHK_DCB(m_dcb);
int rc = 0;
/** Return if there are no pending ses commands */
if (m_session_commands.size() == 0)
{
MXS_INFO("Cursor had no pending session commands.");
return false;
}
SessionCommandList::iterator iter = m_session_commands.begin();
GWBUF *buffer = iter->copy_buffer().release();
switch (iter->get_command())
{
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer);
break;
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.write(m_dcb, buffer);
break;
}
return rc == 1;
}
void Backend::clear_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
m_state &= ~state;
}
else
{
/** Decrease global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
}
}
void Backend::set_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
m_state |= state;
}
else
{
/** Increase global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
}
}
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router):
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends):
mxs::RouterSession(session),
m_closed(false),
m_client(session->client_dcb),
m_mysql_session((MYSQL_session*)session->client_dcb->data),
m_backends(backends),
m_config(&router->m_config),
m_router(router),
m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)),
@ -160,20 +71,6 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou
m_state |= INIT_USE_DB;
}
for (SERVER_REF *ref = m_router->m_service->dbref; ref; ref = ref->next)
{
if (ref->active)
{
m_backends.push_back(Backend(ref));
}
}
if (!connect_backend_servers(m_backends, session))
{
// TODO: Figure out how to avoid this throw
throw std::runtime_error("Failed to connect to backend servers");
}
if (db[0])
{
/* Store the database the client is connecting to */
@ -200,25 +97,25 @@ void SchemaRouterSession::close()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
DCB* dcb = it->m_dcb;
DCB* dcb = (*it)->m_dcb;
/** Close those which had been connected */
if (BREF_IS_IN_USE(it))
if (BREF_IS_IN_USE(*it))
{
CHK_DCB(dcb);
/** Clean operation counter in bref and in SERVER */
while (BREF_IS_WAITING_RESULT(it))
while (BREF_IS_WAITING_RESULT(*it))
{
it->clear_state(BREF_WAITING_RESULT);
(*it)->clear_state(BREF_WAITING_RESULT);
}
it->clear_state(BREF_IN_USE);
it->set_state(BREF_CLOSED);
(*it)->clear_state(BREF_IN_USE);
(*it)->set_state(BREF_CLOSED);
/**
* closes protocol and dcb
*/
dcb_close(dcb);
/** decrease server current connection counters */
atomic_add(&it->m_backend->connections, -1);
atomic_add(&(*it)->m_backend->connections, -1);
}
}
@ -350,7 +247,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
SERVER *server = it->m_backend->server;
SERVER *server = (*it)->m_backend->server;
if (SERVER_IS_RUNNING(server))
{
route_target = TARGET_NAMED_SERVER;
@ -435,7 +332,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
if (m_load_target)
{
/** A load data local infile is active */
target = m_load_target->m_backend->server;
target = m_load_target;
route_target = TARGET_NAMED_SERVER;
if (is_empty_packet(pPacket))
@ -539,11 +436,11 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
get_shard_dcb(&target_dcb, target->unique_name))
{
/** We know where to route this query */
Backend *bref = get_bref_from_dcb(target_dcb);
SBackend bref = get_bref_from_dcb(target_dcb);
if (op == QUERY_OP_LOAD)
{
m_load_target = bref;
m_load_target = bref->m_backend->server;
}
MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port);
@ -558,16 +455,10 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
}
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
{
Backend* bref;
atomic_add(&m_router->m_stats.n_queries, 1);
/**
* Add one query response waiter to backend reference
*/
bref = get_bref_from_dcb(target_dcb);
/** Add one query response waiter to backend reference */
bref->set_state(BREF_QUERY_ACTIVE);
bref->set_state(BREF_WAITING_RESULT);
atomic_add(&m_router->m_stats.n_queries, 1);
}
else
{
@ -578,7 +469,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
gwbuf_free(pPacket);
return ret;
}
void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF** pPacket)
void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket)
{
int rc = inspect_backend_mapping_states(bref, pPacket);
@ -612,7 +503,7 @@ void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF** pPacket)
}
}
void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket)
void SchemaRouterSession::process_response(SBackend& bref, GWBUF** ppPacket)
{
if (bref->m_session_commands.size() > 0)
{
@ -651,9 +542,9 @@ void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket)
void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
Backend* bref = get_bref_from_dcb(pDcb);
SBackend bref = get_bref_from_dcb(pDcb);
if (m_closed || bref == NULL)
if (m_closed || bref.get() == NULL) // The bref should always be valid
{
gwbuf_free(pPacket);
return;
@ -741,14 +632,21 @@ void SchemaRouterSession::handleError(GWBUF* pMessage,
CHK_SESSION(session);
SBackend bref = get_bref_from_dcb(pProblem);
if (bref.get() == NULL) // Should never happen
{
return;
}
switch (action)
{
case ERRACT_NEW_CONNECTION:
*pSuccess = handle_error_new_connection(pProblem, pMessage);
*pSuccess = handle_error_new_connection(bref, pMessage);
break;
case ERRACT_REPLY_CLIENT:
handle_error_reply_client(pProblem, pMessage);
handle_error_reply_client(bref, pMessage);
*pSuccess = false; /*< no new backend servers were made available */
break;
@ -858,20 +756,20 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if (BREF_IS_IN_USE(it))
if (BREF_IS_IN_USE(*it))
{
GWBUF *buffer = gwbuf_clone(querybuf);
it->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd));
(*it)->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd));
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
MXS_INFO("Route query to %s\t%s:%d",
SERVER_IS_MASTER(it->m_backend->server) ? "master" : "slave",
it->m_backend->server->name,
it->m_backend->server->port);
SERVER_IS_MASTER((*it)->m_backend->server) ? "master" : "slave",
(*it)->m_backend->server->name,
(*it)->m_backend->server->port);
}
if (it->m_session_commands.size() == 1)
if ((*it)->m_session_commands.size() == 1)
{
/** Only one command, execute it */
switch (command)
@ -882,11 +780,11 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
break;
default:
it->set_state(BREF_WAITING_RESULT);
(*it)->set_state(BREF_WAITING_RESULT);
break;
}
if (it->execute_sescmd())
if ((*it)->execute_sescmd())
{
succp = true;
}
@ -894,17 +792,17 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
{
MXS_ERROR("Failed to execute session "
"command in %s:%d",
it->m_backend->server->name,
it->m_backend->server->port);
(*it)->m_backend->server->name,
(*it)->m_backend->server->port);
}
}
else
{
ss_dassert(it->m_session_commands.size() > 1);
ss_dassert((*it)->m_session_commands.size() > 1);
/** The server is already executing a session command */
MXS_INFO("Backend %s:%d already executing sescmd.",
it->m_backend->server->name,
it->m_backend->server->port);
(*it)->m_backend->server->name,
(*it)->m_backend->server->port);
succp = true;
}
}
@ -913,20 +811,14 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
return succp;
}
void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg)
void SchemaRouterSession::handle_error_reply_client(SBackend& bref, GWBUF* errmsg)
{
Backend* bref = get_bref_from_dcb(dcb);
bref->clear_state(BREF_IN_USE);
bref->set_state(BREF_CLOSED);
if (bref)
if (m_client->session->state == SESSION_STATE_ROUTER_READY)
{
bref->clear_state(BREF_IN_USE);
bref->set_state(BREF_CLOSED);
}
if (dcb->session->state == SESSION_STATE_ROUTER_READY)
{
dcb->session->client_dcb->func.write(dcb->session->client_dcb, gwbuf_clone(errmsg));
m_client->func.write(m_client, gwbuf_clone(errmsg));
}
}
@ -940,7 +832,7 @@ bool SchemaRouterSession::have_servers()
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if (BREF_IS_IN_USE(it) && !BREF_IS_CLOSED(it))
if (BREF_IS_IN_USE(*it) && !BREF_IS_CLOSED(*it))
{
return true;
}
@ -962,32 +854,19 @@ bool SchemaRouterSession::have_servers()
*
* @return true if there are enough backend connections to continue, false if not
*/
bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg)
bool SchemaRouterSession::handle_error_new_connection(SBackend& bref, GWBUF* errmsg)
{
MXS_SESSION *ses = backend_dcb->session;
CHK_SESSION(ses);
Backend* bref = get_bref_from_dcb(backend_dcb);
if (bref == NULL)
{
/** This should not happen */
ss_dassert(false);
return false;
}
/**
* If query was sent through the bref and it is waiting for reply from
* the backend server it is necessary to send an error to the client
* because it is waiting for reply.
*/
if (BREF_IS_WAITING_RESULT(bref))
{
DCB* client_dcb;
client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
/**
* If query was sent through the bref and it is waiting for reply from
* the backend server it is necessary to send an error to the client
* because it is waiting for reply.
*/
m_client->func.write(m_client, gwbuf_clone(errmsg));
bref->clear_state(BREF_WAITING_RESULT);
}
bref->clear_state(BREF_IN_USE);
bref->set_state(BREF_CLOSED);
@ -1002,19 +881,21 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e
*
* @return backend reference pointer if succeed or NULL
*/
Backend* SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
{
CHK_DCB(dcb);
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if (it->m_dcb == dcb)
if ((*it)->m_dcb == dcb)
{
return &(*it);
return *it;
}
}
return NULL;
// This should not happen
ss_dassert(false);
return SBackend(NULL);
}
/**
@ -1212,7 +1093,7 @@ void SchemaRouterSession::route_queued_query()
* @param router_cli_ses Router client session
* @return 1 if mapping is done, 0 if it is still ongoing and -1 on error
*/
int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref,
GWBUF** wbuf)
{
bool mapped = true;
@ -1220,20 +1101,19 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if (bref->m_dcb == it->m_dcb && !BREF_IS_MAPPED(it))
if (bref->m_dcb == (*it)->m_dcb && !BREF_IS_MAPPED(*it))
{
if (bref->m_map_queue)
{
writebuf = gwbuf_append(bref->m_map_queue, writebuf);
bref->m_map_queue = NULL;
}
enum showdb_response rc = parse_showdb_response(&(*it),
&writebuf);
enum showdb_response rc = parse_showdb_response(*it, &writebuf);
if (rc == SHOWDB_FULL_RESPONSE)
{
it->m_mapped = true;
(*it)->m_mapped = true;
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
it->m_backend->server->unique_name,
(*it)->m_backend->server->unique_name,
m_client->session);
}
else if (rc == SHOWDB_PARTIAL_RESPONSE)
@ -1241,7 +1121,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
bref->m_map_queue = writebuf;
writebuf = NULL;
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
it->m_backend->server->unique_name,
(*it)->m_backend->server->unique_name,
m_client->session);
}
else
@ -1290,12 +1170,11 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
}
}
if (BREF_IS_IN_USE(it) && !BREF_IS_MAPPED(it))
if (BREF_IS_IN_USE(*it) && !BREF_IS_MAPPED(*it))
{
mapped = false;
MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p",
it->m_backend->server->unique_name,
m_client->session);
(*it)->m_backend->server->unique_name, m_client->session);
}
}
*wbuf = writebuf;
@ -1466,7 +1345,7 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data)
* @return 1 if a complete response was received, 0 if a partial response was received
* and -1 if a database was found on more than one server.
*/
enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, GWBUF** buffer)
enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, GWBUF** buffer)
{
unsigned char* ptr;
SERVER* target = bref->m_backend->server;
@ -1591,8 +1470,8 @@ int SchemaRouterSession::gen_databaselist()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
it->m_mapped = false;
it->m_num_mapping_eof = 0;
(*it)->m_mapped = false;
(*it)->m_num_mapping_eof = 0;
}
m_state |= INIT_MAPPING;
@ -1609,15 +1488,15 @@ int SchemaRouterSession::gen_databaselist()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if (BREF_IS_IN_USE(it) &&
!BREF_IS_CLOSED(it) &
SERVER_IS_RUNNING(it->m_backend->server))
if (BREF_IS_IN_USE(*it) &&
!BREF_IS_CLOSED(*it) &
SERVER_IS_RUNNING((*it)->m_backend->server))
{
clone = gwbuf_clone(buffer);
dcb = it->m_dcb;
dcb = (*it)->m_dcb;
rval |= !dcb->func.write(dcb, clone);
MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d",
it->m_backend->server->unique_name,
(*it)->m_backend->server->unique_name,
m_client->session,
rval);
}
@ -1718,11 +1597,11 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
char *srvnm = it->m_backend->server->unique_name;
char *srvnm = (*it)->m_backend->server->unique_name;
if (strcmp(srvnm, (char*)buffer->hint->data) == 0)
{
rval = it->m_backend->server;
rval = (*it)->m_backend->server;
MXS_INFO("Routing hint found (%s)", rval->unique_name);
}
}
@ -1767,19 +1646,19 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
SERVER_REF* b = it->m_backend;
SERVER_REF* b = (*it)->m_backend;
/**
* To become chosen:
* backend must be in use, name must match, and
* the backend state must be RUNNING
*/
if (BREF_IS_IN_USE((&(*it))) &&
if (BREF_IS_IN_USE((*it)) &&
(strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) &&
SERVER_IS_RUNNING(b->server))
{
*p_dcb = it->m_dcb;
*p_dcb = (*it)->m_dcb;
succp = true;
ss_dassert(it->m_dcb->state != DCB_STATE_ZOMBIE);
ss_dassert((*it)->m_dcb->state != DCB_STATE_ZOMBIE);
break;
}
}
@ -1881,131 +1760,3 @@ bool SchemaRouterSession::send_database_list()
return rval;
}
/**
* @node Search all RUNNING backend servers and connect
*
* Parameters:
* @param backend_ref - in, use, out
* Pointer to backend server reference object array.
* NULL is not allowed.
*
* @param router_nservers - in, use
* Number of backend server pointers pointed to by b.
*
* @param session - in, use
* MaxScale session pointer used when connection to backend is established.
*
* @param router - in, use
* Pointer to router instance. Used when server states are qualified.
*
* @return true, if at least one master and one slave was found.
*
*
* @details It is assumed that there is only one available server.
* There will be exactly as many backend references than there are
* connections because all servers are supposed to be operational. It is,
* however, possible that there are less available servers than expected.
*/
bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
{
bool succp = false;
int servers_found = 0;
int servers_connected = 0;
int slaves_connected = 0;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
MXS_INFO("Servers and connection counts:");
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{
SERVER_REF* b = it->m_backend;
MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s",
b->connections,
b->server->stats.n_current,
b->server->name,
b->server->port,
STRSRVSTATUS(b->server));
}
}
/**
* Scan server list and connect each of them. None should fail or session
* can't be established.
*/
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{
SERVER_REF* b = it->m_backend;
if (SERVER_IS_RUNNING(b->server))
{
servers_found += 1;
/** Server is already connected */
if (BREF_IS_IN_USE(it))
{
slaves_connected += 1;
}
/** New server connection */
else
{
if ((it->m_dcb = dcb_connect(b->server, session, b->server->protocol)))
{
servers_connected += 1;
/**
* When server fails, this callback
* is called.
* !!! Todo, routine which removes
* corresponding entries from the hash
* table.
*/
it->m_state = 0;
it->set_state(BREF_IN_USE);
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
* dcb.c:dcb_alloc !
* But decreased in the calling function
* of dcb_close.
*/
atomic_add(&b->connections, 1);
}
else
{
succp = false;
MXS_ERROR("Unable to establish "
"connection with slave %s:%d",
b->server->name,
b->server->port);
/* handle connect error */
break;
}
}
}
}
if (servers_connected > 0)
{
succp = true;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{
SERVER_REF* b = it->m_backend;
if (BREF_IS_IN_USE(it))
{
MXS_INFO("Connected %s in \t%s:%d",
STRSRVSTATUS(b->server),
b->server->name,
b->server->port);
}
}
}
}
return succp;
}