Use a list of backends instead of an array

Changed the backend_ref_t struct into a Backend class. Replaced static
arrays with lists. Altered functions to make the code compile.

Further refactoring is needed, a part of the functions should be moved
into this new class.
This commit is contained in:
Markus Mäkelä
2017-03-27 19:41:50 +03:00
parent c4638666cf
commit d1aa2a4b8a
2 changed files with 159 additions and 201 deletions

View File

@ -20,13 +20,11 @@
#include "schemaroutersession.hh" #include "schemaroutersession.hh"
#include "schemarouterinstance.hh" #include "schemarouterinstance.hh"
bool connect_backend_servers(backend_ref_t* backend_ref, bool connect_backend_servers(BackendList& backends, MXS_SESSION* session);
int router_nservers,
MXS_SESSION* session);
bool execute_sescmd_in_backend(backend_ref_t* backend_ref); bool execute_sescmd_in_backend(Backend* backend_ref);
void bref_clear_state(backend_ref_t* bref, enum bref_state state); void bref_clear_state(Backend& bref, enum bref_state state);
void bref_set_state(backend_ref_t* bref, enum bref_state state); void bref_set_state(Backend& bref, enum bref_state state);
enum route_target get_shard_route_target(uint32_t qtype); enum route_target get_shard_route_target(uint32_t qtype);
bool change_current_db(string& dest, Shard& shard, GWBUF* buf); bool change_current_db(string& dest, Shard& shard, GWBUF* buf);
@ -34,15 +32,30 @@ bool extract_database(GWBUF* buf, char* str);
bool detect_show_shards(GWBUF* query); bool detect_show_shards(GWBUF* query);
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg);
Backend::Backend(SERVER_REF *ref):
backend(ref),
dcb(NULL),
map_queue(NULL),
mapped(false),
n_mapping_eof(0),
num_result_wait(0),
pending_cmd(NULL),
state(0)
{
}
Backend::~Backend()
{
gwbuf_free(map_queue);
gwbuf_free(pending_cmd);
}
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router): SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router):
mxs::RouterSession(session), mxs::RouterSession(session),
m_closed(false), m_closed(false),
m_client(session->client_dcb), m_client(session->client_dcb),
m_mysql_session((MYSQL_session*)session->client_dcb->data), m_mysql_session((MYSQL_session*)session->client_dcb->data),
m_backends(NULL),
m_config(&m_router->m_config), m_config(&m_router->m_config),
m_backend_count(0),
m_router(router), m_router(router),
m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)), m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)),
m_state(0), m_state(0),
@ -78,33 +91,15 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou
m_state |= INIT_USE_DB; m_state |= INIT_USE_DB;
} }
int router_nservers = m_router->m_service->n_dbref; for (SERVER_REF *ref = m_router->m_service->dbref; ref; ref = ref->next)
backend_ref_t* backend_ref = new backend_ref_t[router_nservers];
int i = 0;
for (SERVER_REF *ref = m_router->m_service->dbref; ref && i < router_nservers; ref = ref->next)
{ {
if (ref->active) if (ref->active)
{ {
backend_ref[i].state = 0; m_backends.push_back(Backend(ref));
backend_ref[i].n_mapping_eof = 0;
backend_ref[i].map_queue = NULL;
backend_ref[i].backend = ref;
backend_ref[i].pending_cmd = NULL;
i++;
} }
} }
if (i < router_nservers) if (!connect_backend_servers(m_backends, session))
{
/** Service had less than the reported number of servers */
router_nservers = i;
}
m_backends = backend_ref;
m_backend_count = router_nservers;
if (!connect_backend_servers(backend_ref, router_nservers, session))
{ {
// TODO: Figure out how to avoid this throw // TODO: Figure out how to avoid this throw
throw std::runtime_error("Failed to connect to backend servers"); throw std::runtime_error("Failed to connect to backend servers");
@ -121,12 +116,6 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou
SchemaRouterSession::~SchemaRouterSession() SchemaRouterSession::~SchemaRouterSession()
{ {
for (int i = 0; i < m_backend_count; i++)
{
gwbuf_free(m_backends[i].pending_cmd);
}
delete[] m_backends;
} }
void SchemaRouterSession::close() void SchemaRouterSession::close()
@ -140,28 +129,27 @@ void SchemaRouterSession::close()
{ {
m_closed = true; m_closed = true;
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
backend_ref_t* bref = &m_backends[i]; DCB* dcb = it->dcb;
DCB* dcb = bref->dcb;
/** Close those which had been connected */ /** Close those which had been connected */
if (BREF_IS_IN_USE(bref)) if (BREF_IS_IN_USE(it))
{ {
CHK_DCB(dcb); CHK_DCB(dcb);
/** Clean operation counter in bref and in SERVER */ /** Clean operation counter in bref and in SERVER */
while (BREF_IS_WAITING_RESULT(bref)) while (BREF_IS_WAITING_RESULT(it))
{ {
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(*it, BREF_WAITING_RESULT);
} }
bref_clear_state(bref, BREF_IN_USE); bref_clear_state(*it, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED); bref_set_state(*it, BREF_CLOSED);
/** /**
* closes protocol and dcb * closes protocol and dcb
*/ */
dcb_close(dcb); dcb_close(dcb);
/** decrease server current connection counters */ /** decrease server current connection counters */
atomic_add(&bref->backend->connections, -1); atomic_add(&it->backend->connections, -1);
} }
} }
@ -291,9 +279,9 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
if (TARGET_IS_ANY(route_target)) if (TARGET_IS_ANY(route_target))
{ {
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
SERVER *server = m_backends[i].backend->server; SERVER *server = it->backend->server;
if (SERVER_IS_RUNNING(server)) if (SERVER_IS_RUNNING(server))
{ {
route_target = TARGET_NAMED_SERVER; route_target = TARGET_NAMED_SERVER;
@ -453,7 +441,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
get_shard_dcb(&target_dcb, target->unique_name)) get_shard_dcb(&target_dcb, target->unique_name))
{ {
/** We know where to route this query */ /** We know where to route this query */
backend_ref_t *bref = get_bref_from_dcb(target_dcb); Backend *bref = get_bref_from_dcb(target_dcb);
MXS_INFO("Route query to \t%s:%d <", bref->backend->server->name, bref->backend->server->port); MXS_INFO("Route query to \t%s:%d <", bref->backend->server->name, bref->backend->server->port);
@ -467,7 +455,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
} }
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
{ {
backend_ref_t* bref; Backend* bref;
atomic_add(&m_router->m_stats.n_queries, 1); atomic_add(&m_router->m_stats.n_queries, 1);
@ -475,8 +463,8 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
* Add one query response waiter to backend reference * Add one query response waiter to backend reference
*/ */
bref = get_bref_from_dcb(target_dcb); bref = get_bref_from_dcb(target_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE); bref_set_state(*bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT); bref_set_state(*bref, BREF_WAITING_RESULT);
} }
else else
{ {
@ -487,7 +475,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
gwbuf_free(pPacket); gwbuf_free(pPacket);
return ret; return ret;
} }
void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket) void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF* pPacket)
{ {
int rc = inspect_backend_mapping_states(bref, &pPacket); int rc = inspect_backend_mapping_states(bref, &pPacket);
@ -521,7 +509,7 @@ void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPack
} }
} }
void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket) void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket)
{ {
if (bref->session_commands.size() > 0) if (bref->session_commands.size() > 0)
{ {
@ -547,20 +535,20 @@ void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket
if (*ppPacket) if (*ppPacket)
{ {
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(*bref, BREF_WAITING_RESULT);
} }
} }
else if (BREF_IS_QUERY_ACTIVE(bref)) else if (BREF_IS_QUERY_ACTIVE(bref))
{ {
bref_clear_state(bref, BREF_QUERY_ACTIVE); bref_clear_state(*bref, BREF_QUERY_ACTIVE);
/** Set response status as replied */ /** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(*bref, BREF_WAITING_RESULT);
} }
} }
void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{ {
backend_ref_t* bref = get_bref_from_dcb(pDcb); Backend* bref = get_bref_from_dcb(pDcb);
if (m_closed || bref == NULL) if (m_closed || bref == NULL)
{ {
@ -614,7 +602,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
/** There are pending session commands to be executed. */ /** There are pending session commands to be executed. */
MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
bref->backend->server->name, bref->backend->server->port); bref->backend->server->name, bref->backend->server->port);
execute_sescmd_in_backend(bref); execute_sescmd_in_backend(*bref);
} }
else if (bref->pending_cmd) /*< non-sescmd is waiting to be routed */ else if (bref->pending_cmd) /*< non-sescmd is waiting to be routed */
{ {
@ -625,8 +613,8 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
if (ret == 1) if (ret == 1)
{ {
atomic_add(&m_router->m_stats.n_queries, 1); atomic_add(&m_router->m_stats.n_queries, 1);
bref_set_state(bref, BREF_QUERY_ACTIVE); bref_set_state(*bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT); bref_set_state(*bref, BREF_WAITING_RESULT);
} }
else else
{ {
@ -753,27 +741,27 @@ retblock:
* *
* Router session must be locked. * Router session must be locked.
*/ */
bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref) bool SchemaRouterSession::execute_sescmd_in_backend(Backend& backend_ref)
{ {
if (BREF_IS_CLOSED(backend_ref)) if (BREF_IS_CLOSED(&backend_ref))
{ {
return false; return false;
} }
DCB *dcb = backend_ref->dcb; DCB *dcb = backend_ref.dcb;
CHK_DCB(dcb); CHK_DCB(dcb);
int rc = 0; int rc = 0;
/** Return if there are no pending ses commands */ /** Return if there are no pending ses commands */
if (backend_ref->session_commands.size() == 0) if (backend_ref.session_commands.size() == 0)
{ {
MXS_INFO("Cursor had no pending session commands."); MXS_INFO("Cursor had no pending session commands.");
return false; return false;
} }
SessionCommandList::iterator iter = backend_ref->session_commands.begin(); SessionCommandList::iterator iter = backend_ref.session_commands.begin();
GWBUF *buffer = iter->copy_buffer().release(); GWBUF *buffer = iter->copy_buffer().release();
switch (iter->get_command()) switch (iter->get_command())
@ -813,7 +801,6 @@ bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref)
bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
{ {
bool succp = false; bool succp = false;
backend_ref_t *backend_ref = m_backends;
MXS_INFO("Session write, routing to all servers."); MXS_INFO("Session write, routing to all servers.");
atomic_add(&m_stats.longest_sescmd, 1); atomic_add(&m_stats.longest_sescmd, 1);
@ -821,24 +808,22 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
/** Increment the session command count */ /** Increment the session command count */
++m_sent_sescmd; ++m_sent_sescmd;
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (BREF_IS_IN_USE((&backend_ref[i]))) if (BREF_IS_IN_USE(it))
{ {
GWBUF *buffer = gwbuf_clone(querybuf); GWBUF *buffer = gwbuf_clone(querybuf);
backend_ref[i].session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); it->session_commands.push_back(SessionCommand(buffer, m_sent_sescmd));
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{ {
MXS_INFO("Route query to %s\t%s:%d%s", MXS_INFO("Route query to %s\t%s:%d",
(SERVER_IS_MASTER(backend_ref[i].backend->server) ? SERVER_IS_MASTER(it->backend->server) ? "master" : "slave",
"master" : "slave"), it->backend->server->name,
backend_ref[i].backend->server->name, it->backend->server->port);
backend_ref[i].backend->server->port,
(i + 1 == m_backend_count ? " <" : ""));
} }
if (backend_ref[i].session_commands.size() == 1) if (it->session_commands.size() == 1)
{ {
/** Only one command, execute it */ /** Only one command, execute it */
switch (command) switch (command)
@ -849,11 +834,11 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
break; break;
default: default:
bref_set_state(&backend_ref[i], BREF_WAITING_RESULT); bref_set_state(*it, BREF_WAITING_RESULT);
break; break;
} }
if (execute_sescmd_in_backend(&backend_ref[i])) if (execute_sescmd_in_backend(*it))
{ {
succp = true; succp = true;
} }
@ -861,17 +846,17 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
{ {
MXS_ERROR("Failed to execute session " MXS_ERROR("Failed to execute session "
"command in %s:%d", "command in %s:%d",
backend_ref[i].backend->server->name, it->backend->server->name,
backend_ref[i].backend->server->port); it->backend->server->port);
} }
} }
else else
{ {
ss_dassert(backend_ref[i].session_commands.size() > 1); ss_dassert(it->session_commands.size() > 1);
/** The server is already executing a session command */ /** The server is already executing a session command */
MXS_INFO("Backend %s:%d already executing sescmd.", MXS_INFO("Backend %s:%d already executing sescmd.",
backend_ref[i].backend->server->name, it->backend->server->name,
backend_ref[i].backend->server->port); it->backend->server->port);
succp = true; succp = true;
} }
} }
@ -882,13 +867,13 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg) void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg)
{ {
backend_ref_t* bref = get_bref_from_dcb(dcb); Backend* bref = get_bref_from_dcb(dcb);
if (bref) if (bref)
{ {
bref_clear_state(bref, BREF_IN_USE); bref_clear_state(*bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED); bref_set_state(*bref, BREF_CLOSED);
} }
if (dcb->session->state == SESSION_STATE_ROUTER_READY) if (dcb->session->state == SESSION_STATE_ROUTER_READY)
@ -905,10 +890,9 @@ void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg)
*/ */
bool SchemaRouterSession::have_servers() bool SchemaRouterSession::have_servers()
{ {
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (BREF_IS_IN_USE(&m_backends[i]) && if (BREF_IS_IN_USE(it) && !BREF_IS_CLOSED(it))
!BREF_IS_CLOSED(&m_backends[i]))
{ {
return true; return true;
} }
@ -932,15 +916,12 @@ bool SchemaRouterSession::have_servers()
*/ */
bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg) bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg)
{ {
backend_ref_t* bref;
MXS_SESSION *ses = backend_dcb->session; MXS_SESSION *ses = backend_dcb->session;
CHK_SESSION(ses); CHK_SESSION(ses);
/** Backend* bref = get_bref_from_dcb(backend_dcb);
* If bref == NULL it has been replaced already with another one.
*/ if (bref == NULL)
if ((bref = get_bref_from_dcb(backend_dcb)) == NULL)
{ {
/** This should not happen */ /** This should not happen */
ss_dassert(false); ss_dassert(false);
@ -957,10 +938,10 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e
DCB* client_dcb; DCB* client_dcb;
client_dcb = ses->client_dcb; client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(*bref, BREF_WAITING_RESULT);
} }
bref_clear_state(bref, BREF_IN_USE); bref_clear_state(*bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED); bref_set_state(*bref, BREF_CLOSED);
return have_servers(); return have_servers();
} }
@ -973,15 +954,15 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e
* *
* @return backend reference pointer if succeed or NULL * @return backend reference pointer if succeed or NULL
*/ */
backend_ref_t* SchemaRouterSession::get_bref_from_dcb(DCB* dcb) Backend* SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
{ {
CHK_DCB(dcb); CHK_DCB(dcb);
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (m_backends[i].dcb == dcb) if (it->dcb == dcb)
{ {
return &m_backends[i]; return &(*it);
} }
} }
@ -1183,29 +1164,28 @@ void SchemaRouterSession::route_queued_query()
* @param router_cli_ses Router client session * @param router_cli_ses Router client session
* @return 1 if mapping is done, 0 if it is still ongoing and -1 on error * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error
*/ */
int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
GWBUF** wbuf) GWBUF** wbuf)
{ {
bool mapped = true; bool mapped = true;
GWBUF* writebuf = *wbuf; GWBUF* writebuf = *wbuf;
backend_ref_t* bkrf = m_backends;
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (bref->dcb == bkrf[i].dcb && !BREF_IS_MAPPED(&bkrf[i])) if (bref->dcb == it->dcb && !BREF_IS_MAPPED(it))
{ {
if (bref->map_queue) if (bref->map_queue)
{ {
writebuf = gwbuf_append(bref->map_queue, writebuf); writebuf = gwbuf_append(bref->map_queue, writebuf);
bref->map_queue = NULL; bref->map_queue = NULL;
} }
enum showdb_response rc = parse_showdb_response(&m_backends[i], enum showdb_response rc = parse_showdb_response(&(*it),
&writebuf); &writebuf);
if (rc == SHOWDB_FULL_RESPONSE) if (rc == SHOWDB_FULL_RESPONSE)
{ {
m_backends[i].mapped = true; it->mapped = true;
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
m_backends[i].backend->server->unique_name, it->backend->server->unique_name,
m_client->session); m_client->session);
} }
else if (rc == SHOWDB_PARTIAL_RESPONSE) else if (rc == SHOWDB_PARTIAL_RESPONSE)
@ -1213,7 +1193,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
bref->map_queue = writebuf; bref->map_queue = writebuf;
writebuf = NULL; writebuf = NULL;
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
m_backends[i].backend->server->unique_name, it->backend->server->unique_name,
m_client->session); m_client->session);
} }
else else
@ -1262,11 +1242,11 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
} }
} }
if (BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) if (BREF_IS_IN_USE(it) && !BREF_IS_MAPPED(it))
{ {
mapped = false; mapped = false;
MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p",
bkrf[i].backend->server->unique_name, it->backend->server->unique_name,
m_client->session); m_client->session);
} }
} }
@ -1411,7 +1391,7 @@ char* get_lenenc_str(void* data)
* @return 1 if a complete response was received, 0 if a partial response was received * @return 1 if a complete response was received, 0 if a partial response was received
* and -1 if a database was found on more than one server. * and -1 if a database was found on more than one server.
*/ */
enum showdb_response SchemaRouterSession::parse_showdb_response(backend_ref_t* bref, GWBUF** buffer) enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, GWBUF** buffer)
{ {
unsigned char* ptr; unsigned char* ptr;
SERVER* target = bref->backend->server; SERVER* target = bref->backend->server;
@ -1539,10 +1519,10 @@ int SchemaRouterSession::gen_databaselist()
int i, rval = 0; int i, rval = 0;
unsigned int len; unsigned int len;
for (i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
m_backends[i].mapped = false; it->mapped = false;
m_backends[i].n_mapping_eof = 0; it->n_mapping_eof = 0;
} }
m_state |= INIT_MAPPING; m_state |= INIT_MAPPING;
@ -1557,17 +1537,17 @@ int SchemaRouterSession::gen_databaselist()
*(data + 4) = 0x03; *(data + 4) = 0x03;
memcpy(data + 5, query, strlen(query)); memcpy(data + 5, query, strlen(query));
for (i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (BREF_IS_IN_USE(&m_backends[i]) && if (BREF_IS_IN_USE(it) &&
!BREF_IS_CLOSED(&m_backends[i]) & !BREF_IS_CLOSED(it) &
SERVER_IS_RUNNING(m_backends[i].backend->server)) SERVER_IS_RUNNING(it->backend->server))
{ {
clone = gwbuf_clone(buffer); clone = gwbuf_clone(buffer);
dcb = m_backends[i].dcb; dcb = it->dcb;
rval |= !dcb->func.write(dcb, clone); rval |= !dcb->func.write(dcb, clone);
MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d",
m_backends[i].backend->server->unique_name, it->backend->server->unique_name,
m_client->session, m_client->session,
rval); rval);
} }
@ -1666,13 +1646,13 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
} }
else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{ {
for (int i = 0; i < m_backend_count; i++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
char *srvnm = m_backends[i].backend->server->unique_name; char *srvnm = it->backend->server->unique_name;
if (strcmp(srvnm, (char*)buffer->hint->data) == 0) if (strcmp(srvnm, (char*)buffer->hint->data) == 0)
{ {
rval = m_backends[i].backend->server; rval = it->backend->server;
MXS_INFO("Routing hint found (%s)", rval->unique_name); MXS_INFO("Routing hint found (%s)", rval->unique_name);
} }
} }
@ -1712,39 +1692,28 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
*/ */
bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
{ {
backend_ref_t* backend_ref;
int i;
bool succp = false; bool succp = false;
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
if (p_dcb == NULL || name == NULL) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
goto return_succp; SERVER_REF* b = it->backend;
}
backend_ref = m_backends;
for (i = 0; i < m_backend_count; i++)
{
SERVER_REF* b = backend_ref[i].backend;
/** /**
* To become chosen: * To become chosen:
* backend must be in use, name must match, and * backend must be in use, name must match, and
* the backend state must be RUNNING * the backend state must be RUNNING
*/ */
if (BREF_IS_IN_USE((&backend_ref[i])) && if (BREF_IS_IN_USE((&(*it))) &&
(strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) && (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) &&
SERVER_IS_RUNNING(b->server)) SERVER_IS_RUNNING(b->server))
{ {
*p_dcb = backend_ref[i].dcb; *p_dcb = it->dcb;
succp = true; succp = true;
ss_dassert(backend_ref[i].dcb->state != DCB_STATE_ZOMBIE); ss_dassert(it->dcb->state != DCB_STATE_ZOMBIE);
goto return_succp; break;
} }
} }
return_succp:
return succp; return succp;
} }
@ -1843,56 +1812,46 @@ bool SchemaRouterSession::send_database_list()
return rval; return rval;
} }
void bref_clear_state(backend_ref_t* bref, enum bref_state state) void bref_clear_state(Backend& bref, enum bref_state state)
{ {
if (bref == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return;
}
if (state != BREF_WAITING_RESULT) if (state != BREF_WAITING_RESULT)
{ {
bref->state &= ~state; bref.state &= ~state;
} }
else else
{ {
/** Decrease global operation count */ /** Decrease global operation count */
int prev2 = atomic_add(&bref->backend->server->stats.n_current_ops, -1); int prev2 = atomic_add(&bref.backend->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0); ss_dassert(prev2 > 0);
if (prev2 <= 0) if (prev2 <= 0)
{ {
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__, __FUNCTION__,
bref->backend->server->name, bref.backend->server->name,
bref->backend->server->port); bref.backend->server->port);
} }
} }
} }
void bref_set_state(backend_ref_t* bref, enum bref_state state) void bref_set_state(Backend& bref, enum bref_state state)
{ {
if (bref == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return;
}
if (state != BREF_WAITING_RESULT) if (state != BREF_WAITING_RESULT)
{ {
bref->state |= state; bref.state |= state;
} }
else else
{ {
/** Increase global operation count */ /** Increase global operation count */
int prev2 = atomic_add(&bref->backend->server->stats.n_current_ops, 1); int prev2 = atomic_add(&bref.backend->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0); ss_dassert(prev2 >= 0);
if (prev2 < 0) if (prev2 < 0)
{ {
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__, __FUNCTION__,
bref->backend->server->name, bref.backend->server->name,
bref->backend->server->port); bref.backend->server->port);
} }
} }
} }
@ -1922,9 +1881,7 @@ void bref_set_state(backend_ref_t* bref, enum bref_state state)
* connections because all servers are supposed to be operational. It is, * connections because all servers are supposed to be operational. It is,
* however, possible that there are less available servers than expected. * however, possible that there are less available servers than expected.
*/ */
bool connect_backend_servers(backend_ref_t* backend_ref, bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
int router_nservers,
MXS_SESSION* session)
{ {
bool succp = false; bool succp = false;
int servers_found = 0; int servers_found = 0;
@ -1935,9 +1892,9 @@ bool connect_backend_servers(backend_ref_t* backend_ref,
{ {
MXS_INFO("Servers and connection counts:"); MXS_INFO("Servers and connection counts:");
for (int i = 0; i < router_nservers; i++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = backend_ref[i].backend; SERVER_REF* b = it->backend;
MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s",
b->connections, b->connections,
@ -1951,27 +1908,23 @@ bool connect_backend_servers(backend_ref_t* backend_ref,
* Scan server list and connect each of them. None should fail or session * Scan server list and connect each of them. None should fail or session
* can't be established. * can't be established.
*/ */
for (int i = 0; i < router_nservers; i++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = backend_ref[i].backend; SERVER_REF* b = it->backend;
if (SERVER_IS_RUNNING(b->server)) if (SERVER_IS_RUNNING(b->server))
{ {
servers_found += 1; servers_found += 1;
/** Server is already connected */ /** Server is already connected */
if (BREF_IS_IN_USE((&backend_ref[i]))) if (BREF_IS_IN_USE(it))
{ {
slaves_connected += 1; slaves_connected += 1;
} }
/** New server connection */ /** New server connection */
else else
{ {
backend_ref[i].dcb = dcb_connect(b->server, if ((it->dcb = dcb_connect(b->server, session, b->server->protocol)))
session,
b->server->protocol);
if (backend_ref[i].dcb != NULL)
{ {
servers_connected += 1; servers_connected += 1;
/** /**
@ -1982,8 +1935,8 @@ bool connect_backend_servers(backend_ref_t* backend_ref,
* table. * table.
*/ */
backend_ref[i].state = 0; it->state = 0;
bref_set_state(&backend_ref[i], BREF_IN_USE); bref_set_state(*it, BREF_IN_USE);
/** /**
* Increase backend connection counter. * Increase backend connection counter.
* Server's stats are _increased_ in * Server's stats are _increased_ in
@ -2013,11 +1966,11 @@ bool connect_backend_servers(backend_ref_t* backend_ref,
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{ {
for (int i = 0; i < router_nservers; i++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = backend_ref[i].backend; SERVER_REF* b = it->backend;
if (BREF_IS_IN_USE((&backend_ref[i]))) if (BREF_IS_IN_USE(it))
{ {
MXS_INFO("Connected %s in \t%s:%d", MXS_INFO("Connected %s in \t%s:%d",
STRSRVSTATUS(b->server), STRSRVSTATUS(b->server),

View File

@ -61,13 +61,6 @@ enum bref_state
BREF_DB_MAPPED = 0x10 BREF_DB_MAPPED = 0x10
}; };
#define BREF_IS_NOT_USED(s) ((s)->state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) ((s)->state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) ((s)->num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->state & BREF_CLOSED)
#define BREF_IS_MAPPED(s) ((s)->mapped)
#define SCHEMA_ERR_DUPLICATEDB 5000 #define SCHEMA_ERR_DUPLICATEDB 5000
#define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" #define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB"
#define SCHEMA_ERR_DBNOTFOUND 1049 #define SCHEMA_ERR_DBNOTFOUND 1049
@ -95,20 +88,33 @@ enum route_target
* *
* Owned by router client session. * Owned by router client session.
*/ */
typedef struct backend_ref_st class Backend
{ {
int n_mapping_eof; public:
GWBUF* map_queue; Backend(SERVER_REF *ref);
~Backend();
SERVER_REF* backend; /**< Backend server */ SERVER_REF* backend; /**< Backend server */
DCB* dcb; /**< Backend DCB */ DCB* dcb; /**< Backend DCB */
int state; /**< State of the backend */ GWBUF* map_queue;
bool mapped; /**< Whether the backend has been mapped */ bool mapped; /**< Whether the backend has been mapped */
int n_mapping_eof;
int num_result_wait; /**< Number of not yet received results */ int num_result_wait; /**< Number of not yet received results */
GWBUF* pending_cmd; /**< Pending commands */ GWBUF* pending_cmd; /**< Pending commands */
int state; /**< State of the backend */
SessionCommandList session_commands; /**< List of session commands that are SessionCommandList session_commands; /**< List of session commands that are
* to be executed on this backend server */ * to be executed on this backend server */
} backend_ref_t; };
typedef list<Backend> BackendList;
// TODO: Move these as member functions, currently they operate on iterators
#define BREF_IS_NOT_USED(s) ((s)->state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) ((s)->state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) ((s)->num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->state & BREF_CLOSED)
#define BREF_IS_MAPPED(s) ((s)->mapped)
class SchemaRouter; class SchemaRouter;
@ -163,8 +169,8 @@ public:
private: private:
/** Internal functions */ /** Internal functions */
SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype); SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype);
backend_ref_t* get_bref_from_dcb(DCB* dcb); Backend* get_bref_from_dcb(DCB* dcb);
bool execute_sescmd_in_backend(backend_ref_t* backend_ref); bool execute_sescmd_in_backend(Backend& backend_ref);
bool get_shard_dcb(DCB** dcb, char* name); bool get_shard_dcb(DCB** dcb, char* name);
bool handle_default_db(); bool handle_default_db();
bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg); bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg);
@ -172,14 +178,14 @@ private:
bool route_session_write(GWBUF* querybuf, uint8_t command); bool route_session_write(GWBUF* querybuf, uint8_t command);
bool send_database_list(); bool send_database_list();
int gen_databaselist(); int gen_databaselist();
int inspect_backend_mapping_states(backend_ref_t *bref, GWBUF** wbuf); int inspect_backend_mapping_states(Backend *bref, GWBUF** wbuf);
bool process_show_shards(); bool process_show_shards();
enum showdb_response parse_showdb_response(backend_ref_t* bref, GWBUF** buffer); enum showdb_response parse_showdb_response(Backend* bref, GWBUF** buffer);
void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg);
void route_queued_query(); void route_queued_query();
void synchronize_shard_map(); void synchronize_shard_map();
void handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket); void handle_mapping_reply(Backend* bref, GWBUF* pPacket);
void process_response(backend_ref_t* bref, GWBUF** ppPacket); void process_response(Backend* bref, GWBUF** ppPacket);
SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command, SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command,
enum route_target& route_target); enum route_target& route_target);
@ -187,9 +193,8 @@ private:
bool m_closed; /**< True if session closed */ bool m_closed; /**< True if session closed */
DCB* m_client; /**< The client DCB */ DCB* m_client; /**< The client DCB */
MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */ MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */
backend_ref_t* m_backends; /**< Pointer to backend reference array */ BackendList m_backends; /**< Backend references */
Config* m_config; /**< Pointer to router config */ Config* m_config; /**< Pointer to router config */
int m_backend_count; /**< Number of backends */
SchemaRouter* m_router; /**< The router instance */ SchemaRouter* m_router; /**< The router instance */
Shard m_shard; /**< Database to server mapping */ Shard m_shard; /**< Database to server mapping */
string m_connect_db; /**< Database the user was trying to connect to */ string m_connect_db; /**< Database the user was trying to connect to */