Make connections and command queues internal to Backends

The SERVER_REF and DCB members of the Backend class are now
private. Access to the stored SERVER_REF is provided with the backend()
function. No accompanying setter function is provided as the backend
server should not change during the lifetime of the session.

The creation of the internal DCB is hidden behind the connect()
function. It simplifies the process of connecting to a server by removing
the need to manually do the bookkeeping of the server reference connection
counts. Access to the DCB is provided by the dcb() function.

The closing of the backend is done with the close() function which
contains the code that was previously in closeSession. If the backend
isn't closed when the destructor is called, it will be done
automatically. This should prevent connection leakage.

The pending command queues and the methods used to write them are now also
internal to the backends. They are simple wrappers around dcb->func.write
and the interfaces provided by the Buffer class. The mapping command queue
is still public. It needs to be combined with the generic command queue.
This commit is contained in:
Markus Mäkelä
2017-03-28 21:04:29 +03:00
parent 66fa4fbc7d
commit 6e218adc1d
5 changed files with 148 additions and 103 deletions

View File

@ -18,27 +18,62 @@
using namespace schemarouter; using namespace schemarouter;
Backend::Backend(SERVER_REF *ref): Backend::Backend(SERVER_REF *ref):
m_closed(false),
m_backend(ref), m_backend(ref),
m_dcb(NULL), m_dcb(NULL),
m_map_queue(NULL), m_map_queue(NULL),
m_mapped(false), m_mapped(false),
m_num_mapping_eof(0), m_num_mapping_eof(0),
m_num_result_wait(0), m_num_result_wait(0),
m_pending_cmd(NULL),
m_state(0) m_state(0)
{ {
} }
Backend::~Backend() Backend::~Backend()
{ {
ss_dassert(m_closed);
if (!m_closed)
{
close();
}
gwbuf_free(m_map_queue); gwbuf_free(m_map_queue);
gwbuf_free(m_pending_cmd);
} }
void Backend::close()
{
if (!m_closed)
{
m_closed = true;
if (BREF_IS_IN_USE(this))
{
CHK_DCB(m_dcb);
/** Clean operation counter in bref and in SERVER */
while (BREF_IS_WAITING_RESULT(this))
{
clear_state(BREF_WAITING_RESULT);
}
clear_state(BREF_IN_USE);
set_state(BREF_CLOSED);
dcb_close(m_dcb);
/** decrease server current connection counters */
atomic_add(&m_backend->connections, -1);
}
}
else
{
ss_dassert(false);
}
}
bool Backend::execute_sescmd() bool Backend::execute_sescmd()
{ {
if (BREF_IS_CLOSED(this)) if (BREF_IS_CLOSED(this) || m_session_commands.size() == 0)
{ {
return false; return false;
} }
@ -106,3 +141,54 @@ void Backend::set_state(enum bref_state state)
ss_dassert(prev2 >= 0); ss_dassert(prev2 >= 0);
} }
} }
SERVER_REF* Backend::backend() const
{
return m_backend;
}
bool Backend::connect(MXS_SESSION* session)
{
bool rval = false;
if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol)))
{
m_state = BREF_IN_USE;
atomic_add(&m_backend->connections, 1);
rval = true;
}
return rval;
}
DCB* Backend::dcb() const
{
return m_dcb;
}
bool Backend::write(GWBUF* buffer)
{
return m_dcb->func.write(m_dcb, buffer) != 0;
}
void Backend::store_command(GWBUF* buffer)
{
m_pending_cmd.reset(buffer);
}
bool Backend::write_stored_command()
{
bool rval = false;
if (m_pending_cmd.length())
{
rval = write(m_pending_cmd.release());
if (!rval)
{
MXS_ERROR("Routing of pending query failed.");
}
}
return rval;
}

View File

@ -130,14 +130,25 @@ public:
bool execute_sescmd(); bool execute_sescmd();
void clear_state(enum bref_state state); void clear_state(enum bref_state state);
void set_state(enum bref_state state); void set_state(enum bref_state state);
SERVER_REF* backend() const;
bool connect(MXS_SESSION*);
void close();
DCB* dcb() const;
bool write(GWBUF* buffer);
void store_command(GWBUF* buffer);
bool write_stored_command();
private:
bool m_closed; /**< True if a connection has been opened and closed */
SERVER_REF* m_backend; /**< Backend server */ SERVER_REF* m_backend; /**< Backend server */
DCB* m_dcb; /**< Backend DCB */ DCB* m_dcb; /**< Backend DCB */
public:
GWBUF* m_map_queue; GWBUF* m_map_queue;
bool m_mapped; /**< Whether the backend has been mapped */ bool m_mapped; /**< Whether the backend has been mapped */
int m_num_mapping_eof; int m_num_mapping_eof;
int m_num_result_wait; /**< Number of not yet received results */ int m_num_result_wait; /**< Number of not yet received results */
GWBUF* m_pending_cmd; /**< Pending commands */ Buffer m_pending_cmd; /**< Pending commands */
int m_state; /**< State of the backend */ int m_state; /**< State of the backend */
SessionCommandList m_session_commands; /**< List of session commands that are SessionCommandList m_session_commands; /**< List of session commands that are
* to be executed on this backend server */ * to be executed on this backend server */

View File

@ -208,7 +208,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = (*it)->m_backend; SERVER_REF* b = (*it)->backend();
MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s",
b->connections, b->connections,
@ -224,7 +224,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
*/ */
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = (*it)->m_backend; SERVER_REF* b = (*it)->backend();
if (SERVER_IS_RUNNING(b->server)) if (SERVER_IS_RUNNING(b->server))
{ {
@ -238,27 +238,9 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
/** New server connection */ /** New server connection */
else else
{ {
if (((*it)->m_dcb = dcb_connect(b->server, session, b->server->protocol))) if ((*it)->connect(session))
{ {
servers_connected += 1; servers_connected += 1;
/**
* When server fails, this callback
* is called.
* !!! Todo, routine which removes
* corresponding entries from the hash
* table.
*/
(*it)->m_state = 0;
(*it)->set_state(BREF_IN_USE);
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
* dcb.c:dcb_alloc !
* But decreased in the calling function
* of dcb_close.
*/
atomic_add(&b->connections, 1);
} }
else else
{ {
@ -282,7 +264,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
{ {
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = (*it)->m_backend; SERVER_REF* b = (*it)->backend();
if (BREF_IS_IN_USE((*it))) if (BREF_IS_IN_USE((*it)))
{ {

View File

@ -97,26 +97,9 @@ void SchemaRouterSession::close()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
DCB* dcb = (*it)->m_dcb; /** The backends are closed here to trigger the shutdown of
/** Close those which had been connected */ * the connected DCBs */
if (BREF_IS_IN_USE(*it)) (*it)->close();
{
CHK_DCB(dcb);
/** Clean operation counter in bref and in SERVER */
while (BREF_IS_WAITING_RESULT(*it))
{
(*it)->clear_state(BREF_WAITING_RESULT);
}
(*it)->clear_state(BREF_IN_USE);
(*it)->set_state(BREF_CLOSED);
/**
* closes protocol and dcb
*/
dcb_close(dcb);
/** decrease server current connection counters */
atomic_add(&(*it)->m_backend->connections, -1);
}
} }
spinlock_acquire(&m_router->m_lock); spinlock_acquire(&m_router->m_lock);
@ -247,7 +230,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
{ {
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
SERVER *server = (*it)->m_backend->server; SERVER *server = (*it)->backend()->server;
if (SERVER_IS_RUNNING(server)) if (SERVER_IS_RUNNING(server))
{ {
route_target = TARGET_NAMED_SERVER; route_target = TARGET_NAMED_SERVER;
@ -440,17 +423,17 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
if (op == QUERY_OP_LOAD) if (op == QUERY_OP_LOAD)
{ {
m_load_target = bref->m_backend->server; m_load_target = bref->backend()->server;
} }
MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port); MXS_INFO("Route query to \t%s:%d <", bref->backend()->server->name, bref->backend()->server->port);
if (bref->m_session_commands.size() > 0) if (bref->m_session_commands.size() > 0)
{ {
/** Store current statement if execution of the previous /** Store current statement if execution of the previous
* session command hasn't been completed. */ * session command hasn't been completed. */
ss_dassert((bref->m_pending_cmd == NULL || m_closed)); bref->store_command(pPacket);
bref->m_pending_cmd = pPacket; pPacket = NULL;
ret = 1; ret = 1;
} }
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
@ -552,7 +535,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
MXS_DEBUG("Reply from [%s] session [%p]" MXS_DEBUG("Reply from [%s] session [%p]"
" mapping [%s] queries queued [%s]", " mapping [%s] queries queued [%s]",
bref->m_backend->server->unique_name, bref->backend()->server->unique_name,
m_client->session, m_client->session,
m_state & INIT_MAPPING ? "true" : "false", m_state & INIT_MAPPING ? "true" : "false",
m_queue.size() == 0 ? "none" : m_queue.size() == 0 ? "none" :
@ -591,30 +574,17 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
pPacket = NULL; pPacket = NULL;
} }
if (bref->m_session_commands.size() > 0) if (bref->execute_sescmd())
{ {
/** There are pending session commands to be executed. */
MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
bref->m_backend->server->name, bref->m_backend->server->port); bref->backend()->server->name, bref->backend()->server->port);
bref->execute_sescmd();
} }
else if (bref->m_pending_cmd) /*< non-sescmd is waiting to be routed */ else if (bref->write_stored_command())
{
CHK_GWBUF(bref->m_pending_cmd);
int ret = bref->m_dcb->func.write(bref->m_dcb, bref->m_pending_cmd);
bref->m_pending_cmd = NULL;
if (ret == 1)
{ {
atomic_add(&m_router->m_stats.n_queries, 1); atomic_add(&m_router->m_stats.n_queries, 1);
bref->set_state(BREF_QUERY_ACTIVE); bref->set_state(BREF_QUERY_ACTIVE);
bref->set_state(BREF_WAITING_RESULT); bref->set_state(BREF_WAITING_RESULT);
} }
else
{
MXS_ERROR("Routing of pending query failed.");
}
}
} }
gwbuf_free(pPacket); gwbuf_free(pPacket);
@ -764,9 +734,9 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{ {
MXS_INFO("Route query to %s\t%s:%d", MXS_INFO("Route query to %s\t%s:%d",
SERVER_IS_MASTER((*it)->m_backend->server) ? "master" : "slave", SERVER_IS_MASTER((*it)->backend()->server) ? "master" : "slave",
(*it)->m_backend->server->name, (*it)->backend()->server->name,
(*it)->m_backend->server->port); (*it)->backend()->server->port);
} }
if ((*it)->m_session_commands.size() == 1) if ((*it)->m_session_commands.size() == 1)
@ -792,8 +762,8 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
{ {
MXS_ERROR("Failed to execute session " MXS_ERROR("Failed to execute session "
"command in %s:%d", "command in %s:%d",
(*it)->m_backend->server->name, (*it)->backend()->server->name,
(*it)->m_backend->server->port); (*it)->backend()->server->port);
} }
} }
else else
@ -801,8 +771,8 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
ss_dassert((*it)->m_session_commands.size() > 1); ss_dassert((*it)->m_session_commands.size() > 1);
/** The server is already executing a session command */ /** The server is already executing a session command */
MXS_INFO("Backend %s:%d already executing sescmd.", MXS_INFO("Backend %s:%d already executing sescmd.",
(*it)->m_backend->server->name, (*it)->backend()->server->name,
(*it)->m_backend->server->port); (*it)->backend()->server->port);
succp = true; succp = true;
} }
} }
@ -887,7 +857,7 @@ SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if ((*it)->m_dcb == dcb) if ((*it)->dcb() == dcb)
{ {
return *it; return *it;
} }
@ -1101,7 +1071,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref,
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (bref->m_dcb == (*it)->m_dcb && !BREF_IS_MAPPED(*it)) if (bref->dcb() == (*it)->dcb() && !BREF_IS_MAPPED(*it))
{ {
if (bref->m_map_queue) if (bref->m_map_queue)
{ {
@ -1113,7 +1083,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref,
{ {
(*it)->m_mapped = true; (*it)->m_mapped = true;
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
(*it)->m_backend->server->unique_name, (*it)->backend()->server->unique_name,
m_client->session); m_client->session);
} }
else if (rc == SHOWDB_PARTIAL_RESPONSE) else if (rc == SHOWDB_PARTIAL_RESPONSE)
@ -1121,7 +1091,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref,
bref->m_map_queue = writebuf; bref->m_map_queue = writebuf;
writebuf = NULL; writebuf = NULL;
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
(*it)->m_backend->server->unique_name, (*it)->backend()->server->unique_name,
m_client->session); m_client->session);
} }
else else
@ -1174,7 +1144,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref,
{ {
mapped = false; mapped = false;
MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p",
(*it)->m_backend->server->unique_name, m_client->session); (*it)->backend()->server->unique_name, m_client->session);
} }
} }
*wbuf = writebuf; *wbuf = writebuf;
@ -1348,7 +1318,7 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data)
enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, GWBUF** buffer) enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, GWBUF** buffer)
{ {
unsigned char* ptr; unsigned char* ptr;
SERVER* target = bref->m_backend->server; SERVER* target = bref->backend()->server;
GWBUF* buf; GWBUF* buf;
bool duplicate_found = false; bool duplicate_found = false;
enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE; enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE;
@ -1429,12 +1399,12 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
{ {
atomic_add(&bref->m_num_mapping_eof, 1); atomic_add(&bref->m_num_mapping_eof, 1);
MXS_INFO("SHOW DATABASES fully received from %s.", MXS_INFO("SHOW DATABASES fully received from %s.",
bref->m_backend->server->unique_name); bref->backend()->server->unique_name);
} }
else else
{ {
MXS_INFO("SHOW DATABASES partially received from %s.", MXS_INFO("SHOW DATABASES partially received from %s.",
bref->m_backend->server->unique_name); bref->backend()->server->unique_name);
} }
gwbuf_free(buf); gwbuf_free(buf);
@ -1460,7 +1430,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref,
* @param session Router client session * @param session Router client session
* @return 1 if all writes to backends were succesful and 0 if one or more errors occurred * @return 1 if all writes to backends were succesful and 0 if one or more errors occurred
*/ */
int SchemaRouterSession::gen_databaselist() void SchemaRouterSession::gen_databaselist()
{ {
DCB* dcb; DCB* dcb;
const char* query = "SHOW DATABASES"; const char* query = "SHOW DATABASES";
@ -1488,21 +1458,18 @@ int SchemaRouterSession::gen_databaselist()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (BREF_IS_IN_USE(*it) && if (BREF_IS_IN_USE(*it) && !BREF_IS_CLOSED(*it) &
!BREF_IS_CLOSED(*it) & SERVER_IS_RUNNING((*it)->backend()->server))
SERVER_IS_RUNNING((*it)->m_backend->server))
{ {
clone = gwbuf_clone(buffer); clone = gwbuf_clone(buffer);
dcb = (*it)->m_dcb; if (!(*it)->write(clone))
rval |= !dcb->func.write(dcb, clone); {
MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", MXS_ERROR("Failed to write SHOW DATABASES to '%s'",
(*it)->m_backend->server->unique_name, (*it)->backend()->server->unique_name);
m_client->session, }
rval);
} }
} }
gwbuf_free(buffer); gwbuf_free(buffer);
return !rval;
} }
/** /**
@ -1597,11 +1564,11 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
{ {
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
char *srvnm = (*it)->m_backend->server->unique_name; char *srvnm = (*it)->backend()->server->unique_name;
if (strcmp(srvnm, (char*)buffer->hint->data) == 0) if (strcmp(srvnm, (char*)buffer->hint->data) == 0)
{ {
rval = (*it)->m_backend->server; rval = (*it)->backend()->server;
MXS_INFO("Routing hint found (%s)", rval->unique_name); MXS_INFO("Routing hint found (%s)", rval->unique_name);
} }
} }
@ -1646,7 +1613,7 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
SERVER_REF* b = (*it)->m_backend; SERVER_REF* b = (*it)->backend();
/** /**
* To become chosen: * To become chosen:
* backend must be in use, name must match, and * backend must be in use, name must match, and
@ -1656,9 +1623,8 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
(strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) && (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) &&
SERVER_IS_RUNNING(b->server)) SERVER_IS_RUNNING(b->server))
{ {
*p_dcb = (*it)->m_dcb; *p_dcb = (*it)->dcb();
succp = true; succp = true;
ss_dassert((*it)->m_dcb->state != DCB_STATE_ZOMBIE);
break; break;
} }
} }

View File

@ -132,7 +132,7 @@ private:
bool have_servers(); bool have_servers();
bool route_session_write(GWBUF* querybuf, uint8_t command); bool route_session_write(GWBUF* querybuf, uint8_t command);
bool send_database_list(); bool send_database_list();
int gen_databaselist(); void gen_databaselist();
int inspect_backend_mapping_states(SBackend& bref, GWBUF** wbuf); int inspect_backend_mapping_states(SBackend& bref, GWBUF** wbuf);
bool process_show_shards(); bool process_show_shards();
enum showdb_response parse_showdb_response(SBackend& bref, GWBUF** buffer); enum showdb_response parse_showdb_response(SBackend& bref, GWBUF** buffer);