Move Backend functions inside the class

The functions that handle Backend classes are now methods of the class
itself.

Prefix all member variables with `m_` to distinct them from other
variables.
This commit is contained in:
Markus Mäkelä
2017-03-27 20:32:21 +03:00
parent d1aa2a4b8a
commit 5ba9de6f42
2 changed files with 179 additions and 208 deletions

View File

@ -22,10 +22,6 @@
bool connect_backend_servers(BackendList& backends, MXS_SESSION* session); bool connect_backend_servers(BackendList& backends, MXS_SESSION* session);
bool execute_sescmd_in_backend(Backend* backend_ref);
void bref_clear_state(Backend& bref, enum bref_state state);
void bref_set_state(Backend& bref, enum bref_state state);
enum route_target get_shard_route_target(uint32_t qtype); enum route_target get_shard_route_target(uint32_t qtype);
bool change_current_db(string& dest, Shard& shard, GWBUF* buf); bool change_current_db(string& dest, Shard& shard, GWBUF* buf);
bool extract_database(GWBUF* buf, char* str); bool extract_database(GWBUF* buf, char* str);
@ -33,21 +29,93 @@ bool detect_show_shards(GWBUF* query);
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg);
Backend::Backend(SERVER_REF *ref): Backend::Backend(SERVER_REF *ref):
backend(ref), m_backend(ref),
dcb(NULL), m_dcb(NULL),
map_queue(NULL), m_map_queue(NULL),
mapped(false), m_mapped(false),
n_mapping_eof(0), m_num_mapping_eof(0),
num_result_wait(0), m_num_result_wait(0),
pending_cmd(NULL), m_pending_cmd(NULL),
state(0) m_state(0)
{ {
} }
Backend::~Backend() Backend::~Backend()
{ {
gwbuf_free(map_queue); gwbuf_free(m_map_queue);
gwbuf_free(pending_cmd); gwbuf_free(m_pending_cmd);
}
bool Backend::execute_sescmd()
{
if (BREF_IS_CLOSED(this))
{
return false;
}
CHK_DCB(m_dcb);
int rc = 0;
/** Return if there are no pending ses commands */
if (m_session_commands.size() == 0)
{
MXS_INFO("Cursor had no pending session commands.");
return false;
}
SessionCommandList::iterator iter = m_session_commands.begin();
GWBUF *buffer = iter->copy_buffer().release();
switch (iter->get_command())
{
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer);
break;
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.write(m_dcb, buffer);
break;
}
return rc == 1;
}
void Backend::clear_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
m_state &= ~state;
}
else
{
/** Decrease global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
}
}
void Backend::set_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
m_state |= state;
}
else
{
/** Increase global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
}
} }
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router): SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router):
@ -131,7 +199,7 @@ void SchemaRouterSession::close()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
DCB* dcb = it->dcb; DCB* dcb = it->m_dcb;
/** Close those which had been connected */ /** Close those which had been connected */
if (BREF_IS_IN_USE(it)) if (BREF_IS_IN_USE(it))
{ {
@ -140,16 +208,16 @@ void SchemaRouterSession::close()
/** Clean operation counter in bref and in SERVER */ /** Clean operation counter in bref and in SERVER */
while (BREF_IS_WAITING_RESULT(it)) while (BREF_IS_WAITING_RESULT(it))
{ {
bref_clear_state(*it, BREF_WAITING_RESULT); it->clear_state(BREF_WAITING_RESULT);
} }
bref_clear_state(*it, BREF_IN_USE); it->clear_state(BREF_IN_USE);
bref_set_state(*it, BREF_CLOSED); it->set_state(BREF_CLOSED);
/** /**
* closes protocol and dcb * closes protocol and dcb
*/ */
dcb_close(dcb); dcb_close(dcb);
/** decrease server current connection counters */ /** decrease server current connection counters */
atomic_add(&it->backend->connections, -1); atomic_add(&it->m_backend->connections, -1);
} }
} }
@ -281,7 +349,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
{ {
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
SERVER *server = it->backend->server; SERVER *server = it->m_backend->server;
if (SERVER_IS_RUNNING(server)) if (SERVER_IS_RUNNING(server))
{ {
route_target = TARGET_NAMED_SERVER; route_target = TARGET_NAMED_SERVER;
@ -443,14 +511,14 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
/** We know where to route this query */ /** We know where to route this query */
Backend *bref = get_bref_from_dcb(target_dcb); Backend *bref = get_bref_from_dcb(target_dcb);
MXS_INFO("Route query to \t%s:%d <", bref->backend->server->name, bref->backend->server->port); MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port);
if (bref->session_commands.size() > 0) if (bref->m_session_commands.size() > 0)
{ {
/** Store current statement if execution of the previous /** Store current statement if execution of the previous
* session command hasn't been completed. */ * session command hasn't been completed. */
ss_dassert((bref->pending_cmd == NULL || m_closed)); ss_dassert((bref->m_pending_cmd == NULL || m_closed));
bref->pending_cmd = pPacket; bref->m_pending_cmd = pPacket;
ret = 1; ret = 1;
} }
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
@ -463,8 +531,8 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
* Add one query response waiter to backend reference * Add one query response waiter to backend reference
*/ */
bref = get_bref_from_dcb(target_dcb); bref = get_bref_from_dcb(target_dcb);
bref_set_state(*bref, BREF_QUERY_ACTIVE); bref->set_state(BREF_QUERY_ACTIVE);
bref_set_state(*bref, BREF_WAITING_RESULT); bref->set_state(BREF_WAITING_RESULT);
} }
else else
{ {
@ -511,13 +579,13 @@ void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF* pPacket)
void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket) void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket)
{ {
if (bref->session_commands.size() > 0) if (bref->m_session_commands.size() > 0)
{ {
/** We are executing a session command */ /** We are executing a session command */
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket))) if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
{ {
if (m_replied_sescmd < m_sent_sescmd && if (m_replied_sescmd < m_sent_sescmd &&
bref->session_commands.front().get_position() == m_replied_sescmd + 1) bref->m_session_commands.front().get_position() == m_replied_sescmd + 1)
{ {
/** First reply to this session command, route it to the client */ /** First reply to this session command, route it to the client */
++m_replied_sescmd; ++m_replied_sescmd;
@ -530,19 +598,19 @@ void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket)
*ppPacket = NULL; *ppPacket = NULL;
} }
bref->session_commands.pop_front(); bref->m_session_commands.pop_front();
} }
if (*ppPacket) if (*ppPacket)
{ {
bref_clear_state(*bref, BREF_WAITING_RESULT); bref->clear_state(BREF_WAITING_RESULT);
} }
} }
else if (BREF_IS_QUERY_ACTIVE(bref)) else if (BREF_IS_QUERY_ACTIVE(bref))
{ {
bref_clear_state(*bref, BREF_QUERY_ACTIVE); bref->clear_state(BREF_QUERY_ACTIVE);
/** Set response status as replied */ /** Set response status as replied */
bref_clear_state(*bref, BREF_WAITING_RESULT); bref->clear_state(BREF_WAITING_RESULT);
} }
} }
@ -558,7 +626,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
MXS_DEBUG("Reply from [%s] session [%p]" MXS_DEBUG("Reply from [%s] session [%p]"
" mapping [%s] queries queued [%s]", " mapping [%s] queries queued [%s]",
bref->backend->server->unique_name, bref->m_backend->server->unique_name,
m_client->session, m_client->session,
m_state & INIT_MAPPING ? "true" : "false", m_state & INIT_MAPPING ? "true" : "false",
m_queue.size() == 0 ? "none" : m_queue.size() == 0 ? "none" :
@ -597,24 +665,24 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
pPacket = NULL; pPacket = NULL;
} }
if (bref->session_commands.size() > 0) if (bref->m_session_commands.size() > 0)
{ {
/** There are pending session commands to be executed. */ /** There are pending session commands to be executed. */
MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
bref->backend->server->name, bref->backend->server->port); bref->m_backend->server->name, bref->m_backend->server->port);
execute_sescmd_in_backend(*bref); bref->execute_sescmd();
} }
else if (bref->pending_cmd) /*< non-sescmd is waiting to be routed */ else if (bref->m_pending_cmd) /*< non-sescmd is waiting to be routed */
{ {
CHK_GWBUF(bref->pending_cmd); CHK_GWBUF(bref->m_pending_cmd);
int ret = bref->dcb->func.write(bref->dcb, bref->pending_cmd); int ret = bref->m_dcb->func.write(bref->m_dcb, bref->m_pending_cmd);
bref->pending_cmd = NULL; bref->m_pending_cmd = NULL;
if (ret == 1) if (ret == 1)
{ {
atomic_add(&m_router->m_stats.n_queries, 1); atomic_add(&m_router->m_stats.n_queries, 1);
bref_set_state(*bref, BREF_QUERY_ACTIVE); bref->set_state(BREF_QUERY_ACTIVE);
bref_set_state(*bref, BREF_WAITING_RESULT); bref->set_state(BREF_WAITING_RESULT);
} }
else else
{ {
@ -731,61 +799,6 @@ retblock:
return succp; return succp;
} }
/**
* If session command cursor is passive, sends the command to backend for
* execution.
*
* Returns true if command was sent or added successfully to the queue.
* Returns false if command sending failed or if there are no pending session
* commands.
*
* Router session must be locked.
*/
bool SchemaRouterSession::execute_sescmd_in_backend(Backend& backend_ref)
{
if (BREF_IS_CLOSED(&backend_ref))
{
return false;
}
DCB *dcb = backend_ref.dcb;
CHK_DCB(dcb);
int rc = 0;
/** Return if there are no pending ses commands */
if (backend_ref.session_commands.size() == 0)
{
MXS_INFO("Cursor had no pending session commands.");
return false;
}
SessionCommandList::iterator iter = backend_ref.session_commands.begin();
GWBUF *buffer = iter->copy_buffer().release();
switch (iter->get_command())
{
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = dcb->func.auth(dcb, NULL, dcb->session, buffer);
break;
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = dcb->func.write(dcb, buffer);
break;
}
return rc == 1;
}
/** /**
* Execute in backends used by current router session. * Execute in backends used by current router session.
* Save session variable commands to router session property * Save session variable commands to router session property
@ -813,17 +826,17 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
if (BREF_IS_IN_USE(it)) if (BREF_IS_IN_USE(it))
{ {
GWBUF *buffer = gwbuf_clone(querybuf); GWBUF *buffer = gwbuf_clone(querybuf);
it->session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); it->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd));
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{ {
MXS_INFO("Route query to %s\t%s:%d", MXS_INFO("Route query to %s\t%s:%d",
SERVER_IS_MASTER(it->backend->server) ? "master" : "slave", SERVER_IS_MASTER(it->m_backend->server) ? "master" : "slave",
it->backend->server->name, it->m_backend->server->name,
it->backend->server->port); it->m_backend->server->port);
} }
if (it->session_commands.size() == 1) if (it->m_session_commands.size() == 1)
{ {
/** Only one command, execute it */ /** Only one command, execute it */
switch (command) switch (command)
@ -834,11 +847,11 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
break; break;
default: default:
bref_set_state(*it, BREF_WAITING_RESULT); it->set_state(BREF_WAITING_RESULT);
break; break;
} }
if (execute_sescmd_in_backend(*it)) if (it->execute_sescmd())
{ {
succp = true; succp = true;
} }
@ -846,17 +859,17 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
{ {
MXS_ERROR("Failed to execute session " MXS_ERROR("Failed to execute session "
"command in %s:%d", "command in %s:%d",
it->backend->server->name, it->m_backend->server->name,
it->backend->server->port); it->m_backend->server->port);
} }
} }
else else
{ {
ss_dassert(it->session_commands.size() > 1); ss_dassert(it->m_session_commands.size() > 1);
/** The server is already executing a session command */ /** The server is already executing a session command */
MXS_INFO("Backend %s:%d already executing sescmd.", MXS_INFO("Backend %s:%d already executing sescmd.",
it->backend->server->name, it->m_backend->server->name,
it->backend->server->port); it->m_backend->server->port);
succp = true; succp = true;
} }
} }
@ -872,8 +885,8 @@ void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg)
if (bref) if (bref)
{ {
bref_clear_state(*bref, BREF_IN_USE); bref->clear_state(BREF_IN_USE);
bref_set_state(*bref, BREF_CLOSED); bref->set_state(BREF_CLOSED);
} }
if (dcb->session->state == SESSION_STATE_ROUTER_READY) if (dcb->session->state == SESSION_STATE_ROUTER_READY)
@ -938,10 +951,10 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e
DCB* client_dcb; DCB* client_dcb;
client_dcb = ses->client_dcb; client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
bref_clear_state(*bref, BREF_WAITING_RESULT); bref->clear_state(BREF_WAITING_RESULT);
} }
bref_clear_state(*bref, BREF_IN_USE); bref->clear_state(BREF_IN_USE);
bref_set_state(*bref, BREF_CLOSED); bref->set_state(BREF_CLOSED);
return have_servers(); return have_servers();
} }
@ -960,7 +973,7 @@ Backend* SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (it->dcb == dcb) if (it->m_dcb == dcb)
{ {
return &(*it); return &(*it);
} }
@ -1172,28 +1185,28 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
if (bref->dcb == it->dcb && !BREF_IS_MAPPED(it)) if (bref->m_dcb == it->m_dcb && !BREF_IS_MAPPED(it))
{ {
if (bref->map_queue) if (bref->m_map_queue)
{ {
writebuf = gwbuf_append(bref->map_queue, writebuf); writebuf = gwbuf_append(bref->m_map_queue, writebuf);
bref->map_queue = NULL; bref->m_map_queue = NULL;
} }
enum showdb_response rc = parse_showdb_response(&(*it), enum showdb_response rc = parse_showdb_response(&(*it),
&writebuf); &writebuf);
if (rc == SHOWDB_FULL_RESPONSE) if (rc == SHOWDB_FULL_RESPONSE)
{ {
it->mapped = true; it->m_mapped = true;
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
it->backend->server->unique_name, it->m_backend->server->unique_name,
m_client->session); m_client->session);
} }
else if (rc == SHOWDB_PARTIAL_RESPONSE) else if (rc == SHOWDB_PARTIAL_RESPONSE)
{ {
bref->map_queue = writebuf; bref->m_map_queue = writebuf;
writebuf = NULL; writebuf = NULL;
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
it->backend->server->unique_name, it->m_backend->server->unique_name,
m_client->session); m_client->session);
} }
else else
@ -1246,7 +1259,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref,
{ {
mapped = false; mapped = false;
MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p",
it->backend->server->unique_name, it->m_backend->server->unique_name,
m_client->session); m_client->session);
} }
} }
@ -1394,7 +1407,7 @@ char* get_lenenc_str(void* data)
enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, GWBUF** buffer) enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, GWBUF** buffer)
{ {
unsigned char* ptr; unsigned char* ptr;
SERVER* target = bref->backend->server; SERVER* target = bref->m_backend->server;
GWBUF* buf; GWBUF* buf;
bool duplicate_found = false; bool duplicate_found = false;
enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE; enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE;
@ -1422,7 +1435,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, G
return SHOWDB_FATAL_ERROR; return SHOWDB_FATAL_ERROR;
} }
if (bref->n_mapping_eof == 0) if (bref->m_num_mapping_eof == 0)
{ {
/** Skip column definitions */ /** Skip column definitions */
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
@ -1437,7 +1450,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, G
return SHOWDB_FATAL_ERROR; return SHOWDB_FATAL_ERROR;
} }
atomic_add(&bref->n_mapping_eof, 1); atomic_add(&bref->m_num_mapping_eof, 1);
/** Skip first EOF packet */ /** Skip first EOF packet */
ptr += gw_mysql_get_byte3(ptr) + 4; ptr += gw_mysql_get_byte3(ptr) + 4;
} }
@ -1476,16 +1489,16 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, G
ptr += packetlen; ptr += packetlen;
} }
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && bref->n_mapping_eof == 1) if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && bref->m_num_mapping_eof == 1)
{ {
atomic_add(&bref->n_mapping_eof, 1); atomic_add(&bref->m_num_mapping_eof, 1);
MXS_INFO("SHOW DATABASES fully received from %s.", MXS_INFO("SHOW DATABASES fully received from %s.",
bref->backend->server->unique_name); bref->m_backend->server->unique_name);
} }
else else
{ {
MXS_INFO("SHOW DATABASES partially received from %s.", MXS_INFO("SHOW DATABASES partially received from %s.",
bref->backend->server->unique_name); bref->m_backend->server->unique_name);
} }
gwbuf_free(buf); gwbuf_free(buf);
@ -1494,7 +1507,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, G
{ {
rval = SHOWDB_DUPLICATE_DATABASES; rval = SHOWDB_DUPLICATE_DATABASES;
} }
else if (bref->n_mapping_eof == 2) else if (bref->m_num_mapping_eof == 2)
{ {
rval = SHOWDB_FULL_RESPONSE; rval = SHOWDB_FULL_RESPONSE;
} }
@ -1521,8 +1534,8 @@ int SchemaRouterSession::gen_databaselist()
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
it->mapped = false; it->m_mapped = false;
it->n_mapping_eof = 0; it->m_num_mapping_eof = 0;
} }
m_state |= INIT_MAPPING; m_state |= INIT_MAPPING;
@ -1541,13 +1554,13 @@ int SchemaRouterSession::gen_databaselist()
{ {
if (BREF_IS_IN_USE(it) && if (BREF_IS_IN_USE(it) &&
!BREF_IS_CLOSED(it) & !BREF_IS_CLOSED(it) &
SERVER_IS_RUNNING(it->backend->server)) SERVER_IS_RUNNING(it->m_backend->server))
{ {
clone = gwbuf_clone(buffer); clone = gwbuf_clone(buffer);
dcb = it->dcb; dcb = it->m_dcb;
rval |= !dcb->func.write(dcb, clone); rval |= !dcb->func.write(dcb, clone);
MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d",
it->backend->server->unique_name, it->m_backend->server->unique_name,
m_client->session, m_client->session,
rval); rval);
} }
@ -1648,11 +1661,11 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
{ {
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
char *srvnm = it->backend->server->unique_name; char *srvnm = it->m_backend->server->unique_name;
if (strcmp(srvnm, (char*)buffer->hint->data) == 0) if (strcmp(srvnm, (char*)buffer->hint->data) == 0)
{ {
rval = it->backend->server; rval = it->m_backend->server;
MXS_INFO("Routing hint found (%s)", rval->unique_name); MXS_INFO("Routing hint found (%s)", rval->unique_name);
} }
} }
@ -1697,7 +1710,7 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{ {
SERVER_REF* b = it->backend; SERVER_REF* b = it->m_backend;
/** /**
* To become chosen: * To become chosen:
* backend must be in use, name must match, and * backend must be in use, name must match, and
@ -1707,9 +1720,9 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
(strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) && (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) &&
SERVER_IS_RUNNING(b->server)) SERVER_IS_RUNNING(b->server))
{ {
*p_dcb = it->dcb; *p_dcb = it->m_dcb;
succp = true; succp = true;
ss_dassert(it->dcb->state != DCB_STATE_ZOMBIE); ss_dassert(it->m_dcb->state != DCB_STATE_ZOMBIE);
break; break;
} }
} }
@ -1812,50 +1825,6 @@ bool SchemaRouterSession::send_database_list()
return rval; return rval;
} }
void bref_clear_state(Backend& bref, enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
bref.state &= ~state;
}
else
{
/** Decrease global operation count */
int prev2 = atomic_add(&bref.backend->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
if (prev2 <= 0)
{
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__,
bref.backend->server->name,
bref.backend->server->port);
}
}
}
void bref_set_state(Backend& bref, enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
bref.state |= state;
}
else
{
/** Increase global operation count */
int prev2 = atomic_add(&bref.backend->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
if (prev2 < 0)
{
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__,
bref.backend->server->name,
bref.backend->server->port);
}
}
}
/** /**
* @node Search all RUNNING backend servers and connect * @node Search all RUNNING backend servers and connect
* *
@ -1894,7 +1863,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = it->backend; SERVER_REF* b = it->m_backend;
MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s",
b->connections, b->connections,
@ -1910,7 +1879,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
*/ */
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = it->backend; SERVER_REF* b = it->m_backend;
if (SERVER_IS_RUNNING(b->server)) if (SERVER_IS_RUNNING(b->server))
{ {
@ -1924,7 +1893,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
/** New server connection */ /** New server connection */
else else
{ {
if ((it->dcb = dcb_connect(b->server, session, b->server->protocol))) if ((it->m_dcb = dcb_connect(b->server, session, b->server->protocol)))
{ {
servers_connected += 1; servers_connected += 1;
/** /**
@ -1935,8 +1904,8 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
* table. * table.
*/ */
it->state = 0; it->m_state = 0;
bref_set_state(*it, BREF_IN_USE); it->set_state(BREF_IN_USE);
/** /**
* Increase backend connection counter. * Increase backend connection counter.
* Server's stats are _increased_ in * Server's stats are _increased_ in
@ -1968,7 +1937,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
{ {
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
{ {
SERVER_REF* b = it->backend; SERVER_REF* b = it->m_backend;
if (BREF_IS_IN_USE(it)) if (BREF_IS_IN_USE(it))
{ {

View File

@ -93,28 +93,31 @@ class Backend
public: public:
Backend(SERVER_REF *ref); Backend(SERVER_REF *ref);
~Backend(); ~Backend();
bool execute_sescmd();
void clear_state(enum bref_state state);
void set_state(enum bref_state state);
SERVER_REF* backend; /**< Backend server */ SERVER_REF* m_backend; /**< Backend server */
DCB* dcb; /**< Backend DCB */ DCB* m_dcb; /**< Backend DCB */
GWBUF* map_queue; GWBUF* m_map_queue;
bool mapped; /**< Whether the backend has been mapped */ bool m_mapped; /**< Whether the backend has been mapped */
int n_mapping_eof; int m_num_mapping_eof;
int num_result_wait; /**< Number of not yet received results */ int m_num_result_wait; /**< Number of not yet received results */
GWBUF* pending_cmd; /**< Pending commands */ GWBUF* m_pending_cmd; /**< Pending commands */
int state; /**< State of the backend */ int m_state; /**< State of the backend */
SessionCommandList session_commands; /**< List of session commands that are SessionCommandList m_session_commands; /**< List of session commands that are
* to be executed on this backend server */ * to be executed on this backend server */
}; };
typedef list<Backend> BackendList; typedef list<Backend> BackendList;
// TODO: Move these as member functions, currently they operate on iterators // TODO: Move these as member functions, currently they operate on iterators
#define BREF_IS_NOT_USED(s) ((s)->state & ~BREF_IN_USE) #define BREF_IS_NOT_USED(s) ((s)->m_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) ((s)->state & BREF_IN_USE) #define BREF_IS_IN_USE(s) ((s)->m_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) ((s)->num_result_wait > 0) #define BREF_IS_WAITING_RESULT(s) ((s)->m_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->state & BREF_QUERY_ACTIVE) #define BREF_IS_QUERY_ACTIVE(s) ((s)->m_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->state & BREF_CLOSED) #define BREF_IS_CLOSED(s) ((s)->m_state & BREF_CLOSED)
#define BREF_IS_MAPPED(s) ((s)->mapped) #define BREF_IS_MAPPED(s) ((s)->m_mapped)
class SchemaRouter; class SchemaRouter;
@ -170,7 +173,6 @@ private:
/** Internal functions */ /** Internal functions */
SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype); SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype);
Backend* get_bref_from_dcb(DCB* dcb); Backend* get_bref_from_dcb(DCB* dcb);
bool execute_sescmd_in_backend(Backend& backend_ref);
bool get_shard_dcb(DCB** dcb, char* name); bool get_shard_dcb(DCB** dcb, char* name);
bool handle_default_db(); bool handle_default_db();
bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg); bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg);