diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index b3131f7ac..ab8702a7d 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -20,13 +20,11 @@ #include "schemaroutersession.hh" #include "schemarouterinstance.hh" -bool connect_backend_servers(backend_ref_t* backend_ref, - int router_nservers, - MXS_SESSION* session); +bool connect_backend_servers(BackendList& backends, MXS_SESSION* session); -bool execute_sescmd_in_backend(backend_ref_t* backend_ref); -void bref_clear_state(backend_ref_t* bref, enum bref_state state); -void bref_set_state(backend_ref_t* bref, enum bref_state state); +bool execute_sescmd_in_backend(Backend* backend_ref); +void bref_clear_state(Backend& bref, enum bref_state state); +void bref_set_state(Backend& bref, enum bref_state state); enum route_target get_shard_route_target(uint32_t qtype); bool change_current_db(string& dest, Shard& shard, GWBUF* buf); @@ -34,15 +32,30 @@ bool extract_database(GWBUF* buf, char* str); bool detect_show_shards(GWBUF* query); void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); +Backend::Backend(SERVER_REF *ref): + backend(ref), + dcb(NULL), + map_queue(NULL), + mapped(false), + n_mapping_eof(0), + num_result_wait(0), + pending_cmd(NULL), + state(0) +{ +} + +Backend::~Backend() +{ + gwbuf_free(map_queue); + gwbuf_free(pending_cmd); +} SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router): mxs::RouterSession(session), m_closed(false), m_client(session->client_dcb), m_mysql_session((MYSQL_session*)session->client_dcb->data), - m_backends(NULL), m_config(&m_router->m_config), - m_backend_count(0), m_router(router), m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)), m_state(0), @@ -78,33 +91,15 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou m_state |= INIT_USE_DB; } - int router_nservers = m_router->m_service->n_dbref; - backend_ref_t* backend_ref = new backend_ref_t[router_nservers]; - int i = 0; - - for (SERVER_REF *ref = m_router->m_service->dbref; ref && i < router_nservers; ref = ref->next) + for (SERVER_REF *ref = m_router->m_service->dbref; ref; ref = ref->next) { if (ref->active) { - backend_ref[i].state = 0; - backend_ref[i].n_mapping_eof = 0; - backend_ref[i].map_queue = NULL; - backend_ref[i].backend = ref; - backend_ref[i].pending_cmd = NULL; - i++; + m_backends.push_back(Backend(ref)); } } - if (i < router_nservers) - { - /** Service had less than the reported number of servers */ - router_nservers = i; - } - - m_backends = backend_ref; - m_backend_count = router_nservers; - - if (!connect_backend_servers(backend_ref, router_nservers, session)) + if (!connect_backend_servers(m_backends, session)) { // TODO: Figure out how to avoid this throw throw std::runtime_error("Failed to connect to backend servers"); @@ -121,12 +116,6 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou SchemaRouterSession::~SchemaRouterSession() { - for (int i = 0; i < m_backend_count; i++) - { - gwbuf_free(m_backends[i].pending_cmd); - } - - delete[] m_backends; } void SchemaRouterSession::close() @@ -140,28 +129,27 @@ void SchemaRouterSession::close() { m_closed = true; - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - backend_ref_t* bref = &m_backends[i]; - DCB* dcb = bref->dcb; + DCB* dcb = it->dcb; /** Close those which had been connected */ - if (BREF_IS_IN_USE(bref)) + if (BREF_IS_IN_USE(it)) { CHK_DCB(dcb); /** Clean operation counter in bref and in SERVER */ - while (BREF_IS_WAITING_RESULT(bref)) + while (BREF_IS_WAITING_RESULT(it)) { - bref_clear_state(bref, BREF_WAITING_RESULT); + bref_clear_state(*it, BREF_WAITING_RESULT); } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); + bref_clear_state(*it, BREF_IN_USE); + bref_set_state(*it, BREF_CLOSED); /** * closes protocol and dcb */ dcb_close(dcb); /** decrease server current connection counters */ - atomic_add(&bref->backend->connections, -1); + atomic_add(&it->backend->connections, -1); } } @@ -291,9 +279,9 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, if (TARGET_IS_ANY(route_target)) { - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - SERVER *server = m_backends[i].backend->server; + SERVER *server = it->backend->server; if (SERVER_IS_RUNNING(server)) { route_target = TARGET_NAMED_SERVER; @@ -453,7 +441,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) get_shard_dcb(&target_dcb, target->unique_name)) { /** We know where to route this query */ - backend_ref_t *bref = get_bref_from_dcb(target_dcb); + Backend *bref = get_bref_from_dcb(target_dcb); MXS_INFO("Route query to \t%s:%d <", bref->backend->server->name, bref->backend->server->port); @@ -467,7 +455,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) } else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) { - backend_ref_t* bref; + Backend* bref; atomic_add(&m_router->m_stats.n_queries, 1); @@ -475,8 +463,8 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) * Add one query response waiter to backend reference */ bref = get_bref_from_dcb(target_dcb); - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); + bref_set_state(*bref, BREF_QUERY_ACTIVE); + bref_set_state(*bref, BREF_WAITING_RESULT); } else { @@ -487,7 +475,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) gwbuf_free(pPacket); return ret; } -void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket) +void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF* pPacket) { int rc = inspect_backend_mapping_states(bref, &pPacket); @@ -521,7 +509,7 @@ void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPack } } -void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket) +void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket) { if (bref->session_commands.size() > 0) { @@ -547,20 +535,20 @@ void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket if (*ppPacket) { - bref_clear_state(bref, BREF_WAITING_RESULT); + bref_clear_state(*bref, BREF_WAITING_RESULT); } } else if (BREF_IS_QUERY_ACTIVE(bref)) { - bref_clear_state(bref, BREF_QUERY_ACTIVE); + bref_clear_state(*bref, BREF_QUERY_ACTIVE); /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); + bref_clear_state(*bref, BREF_WAITING_RESULT); } } void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { - backend_ref_t* bref = get_bref_from_dcb(pDcb); + Backend* bref = get_bref_from_dcb(pDcb); if (m_closed || bref == NULL) { @@ -614,7 +602,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) /** There are pending session commands to be executed. */ MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", bref->backend->server->name, bref->backend->server->port); - execute_sescmd_in_backend(bref); + execute_sescmd_in_backend(*bref); } else if (bref->pending_cmd) /*< non-sescmd is waiting to be routed */ { @@ -625,8 +613,8 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (ret == 1) { atomic_add(&m_router->m_stats.n_queries, 1); - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); + bref_set_state(*bref, BREF_QUERY_ACTIVE); + bref_set_state(*bref, BREF_WAITING_RESULT); } else { @@ -753,27 +741,27 @@ retblock: * * Router session must be locked. */ -bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref) +bool SchemaRouterSession::execute_sescmd_in_backend(Backend& backend_ref) { - if (BREF_IS_CLOSED(backend_ref)) + if (BREF_IS_CLOSED(&backend_ref)) { return false; } - DCB *dcb = backend_ref->dcb; + DCB *dcb = backend_ref.dcb; CHK_DCB(dcb); int rc = 0; /** Return if there are no pending ses commands */ - if (backend_ref->session_commands.size() == 0) + if (backend_ref.session_commands.size() == 0) { MXS_INFO("Cursor had no pending session commands."); return false; } - SessionCommandList::iterator iter = backend_ref->session_commands.begin(); + SessionCommandList::iterator iter = backend_ref.session_commands.begin(); GWBUF *buffer = iter->copy_buffer().release(); switch (iter->get_command()) @@ -813,7 +801,6 @@ bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref) bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) { bool succp = false; - backend_ref_t *backend_ref = m_backends; MXS_INFO("Session write, routing to all servers."); atomic_add(&m_stats.longest_sescmd, 1); @@ -821,24 +808,22 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) /** Increment the session command count */ ++m_sent_sescmd; - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE((&backend_ref[i]))) + if (BREF_IS_IN_USE(it)) { GWBUF *buffer = gwbuf_clone(querybuf); - backend_ref[i].session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); + it->session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { - MXS_INFO("Route query to %s\t%s:%d%s", - (SERVER_IS_MASTER(backend_ref[i].backend->server) ? - "master" : "slave"), - backend_ref[i].backend->server->name, - backend_ref[i].backend->server->port, - (i + 1 == m_backend_count ? " <" : "")); + MXS_INFO("Route query to %s\t%s:%d", + SERVER_IS_MASTER(it->backend->server) ? "master" : "slave", + it->backend->server->name, + it->backend->server->port); } - if (backend_ref[i].session_commands.size() == 1) + if (it->session_commands.size() == 1) { /** Only one command, execute it */ switch (command) @@ -849,11 +834,11 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) break; default: - bref_set_state(&backend_ref[i], BREF_WAITING_RESULT); + bref_set_state(*it, BREF_WAITING_RESULT); break; } - if (execute_sescmd_in_backend(&backend_ref[i])) + if (execute_sescmd_in_backend(*it)) { succp = true; } @@ -861,17 +846,17 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) { MXS_ERROR("Failed to execute session " "command in %s:%d", - backend_ref[i].backend->server->name, - backend_ref[i].backend->server->port); + it->backend->server->name, + it->backend->server->port); } } else { - ss_dassert(backend_ref[i].session_commands.size() > 1); + ss_dassert(it->session_commands.size() > 1); /** The server is already executing a session command */ MXS_INFO("Backend %s:%d already executing sescmd.", - backend_ref[i].backend->server->name, - backend_ref[i].backend->server->port); + it->backend->server->name, + it->backend->server->port); succp = true; } } @@ -882,13 +867,13 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg) { - backend_ref_t* bref = get_bref_from_dcb(dcb); + Backend* bref = get_bref_from_dcb(dcb); if (bref) { - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); + bref_clear_state(*bref, BREF_IN_USE); + bref_set_state(*bref, BREF_CLOSED); } if (dcb->session->state == SESSION_STATE_ROUTER_READY) @@ -905,10 +890,9 @@ void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg) */ bool SchemaRouterSession::have_servers() { - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE(&m_backends[i]) && - !BREF_IS_CLOSED(&m_backends[i])) + if (BREF_IS_IN_USE(it) && !BREF_IS_CLOSED(it)) { return true; } @@ -932,15 +916,12 @@ bool SchemaRouterSession::have_servers() */ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg) { - backend_ref_t* bref; - MXS_SESSION *ses = backend_dcb->session; CHK_SESSION(ses); - /** - * If bref == NULL it has been replaced already with another one. - */ - if ((bref = get_bref_from_dcb(backend_dcb)) == NULL) + Backend* bref = get_bref_from_dcb(backend_dcb); + + if (bref == NULL) { /** This should not happen */ ss_dassert(false); @@ -957,10 +938,10 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e DCB* client_dcb; client_dcb = ses->client_dcb; client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); - bref_clear_state(bref, BREF_WAITING_RESULT); + bref_clear_state(*bref, BREF_WAITING_RESULT); } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); + bref_clear_state(*bref, BREF_IN_USE); + bref_set_state(*bref, BREF_CLOSED); return have_servers(); } @@ -973,15 +954,15 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e * * @return backend reference pointer if succeed or NULL */ -backend_ref_t* SchemaRouterSession::get_bref_from_dcb(DCB* dcb) +Backend* SchemaRouterSession::get_bref_from_dcb(DCB* dcb) { CHK_DCB(dcb); - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (m_backends[i].dcb == dcb) + if (it->dcb == dcb) { - return &m_backends[i]; + return &(*it); } } @@ -1183,29 +1164,28 @@ void SchemaRouterSession::route_queued_query() * @param router_cli_ses Router client session * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error */ -int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, +int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref, GWBUF** wbuf) { bool mapped = true; GWBUF* writebuf = *wbuf; - backend_ref_t* bkrf = m_backends; - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (bref->dcb == bkrf[i].dcb && !BREF_IS_MAPPED(&bkrf[i])) + if (bref->dcb == it->dcb && !BREF_IS_MAPPED(it)) { if (bref->map_queue) { writebuf = gwbuf_append(bref->map_queue, writebuf); bref->map_queue = NULL; } - enum showdb_response rc = parse_showdb_response(&m_backends[i], - &writebuf); + enum showdb_response rc = parse_showdb_response(&(*it), + &writebuf); if (rc == SHOWDB_FULL_RESPONSE) { - m_backends[i].mapped = true; + it->mapped = true; MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", - m_backends[i].backend->server->unique_name, + it->backend->server->unique_name, m_client->session); } else if (rc == SHOWDB_PARTIAL_RESPONSE) @@ -1213,7 +1193,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, bref->map_queue = writebuf; writebuf = NULL; MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", - m_backends[i].backend->server->unique_name, + it->backend->server->unique_name, m_client->session); } else @@ -1262,11 +1242,11 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, } } - if (BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) + if (BREF_IS_IN_USE(it) && !BREF_IS_MAPPED(it)) { mapped = false; MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", - bkrf[i].backend->server->unique_name, + it->backend->server->unique_name, m_client->session); } } @@ -1411,7 +1391,7 @@ char* get_lenenc_str(void* data) * @return 1 if a complete response was received, 0 if a partial response was received * and -1 if a database was found on more than one server. */ -enum showdb_response SchemaRouterSession::parse_showdb_response(backend_ref_t* bref, GWBUF** buffer) +enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, GWBUF** buffer) { unsigned char* ptr; SERVER* target = bref->backend->server; @@ -1539,10 +1519,10 @@ int SchemaRouterSession::gen_databaselist() int i, rval = 0; unsigned int len; - for (i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - m_backends[i].mapped = false; - m_backends[i].n_mapping_eof = 0; + it->mapped = false; + it->n_mapping_eof = 0; } m_state |= INIT_MAPPING; @@ -1557,17 +1537,17 @@ int SchemaRouterSession::gen_databaselist() *(data + 4) = 0x03; memcpy(data + 5, query, strlen(query)); - for (i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE(&m_backends[i]) && - !BREF_IS_CLOSED(&m_backends[i]) & - SERVER_IS_RUNNING(m_backends[i].backend->server)) + if (BREF_IS_IN_USE(it) && + !BREF_IS_CLOSED(it) & + SERVER_IS_RUNNING(it->backend->server)) { clone = gwbuf_clone(buffer); - dcb = m_backends[i].dcb; + dcb = it->dcb; rval |= !dcb->func.write(dcb, clone); MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", - m_backends[i].backend->server->unique_name, + it->backend->server->unique_name, m_client->session, rval); } @@ -1666,13 +1646,13 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) } else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { - for (int i = 0; i < m_backend_count; i++) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - char *srvnm = m_backends[i].backend->server->unique_name; + char *srvnm = it->backend->server->unique_name; if (strcmp(srvnm, (char*)buffer->hint->data) == 0) { - rval = m_backends[i].backend->server; + rval = it->backend->server; MXS_INFO("Routing hint found (%s)", rval->unique_name); } } @@ -1712,39 +1692,28 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) */ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) { - backend_ref_t* backend_ref; - int i; bool succp = false; - - ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); - if (p_dcb == NULL || name == NULL) + for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - goto return_succp; - } - backend_ref = m_backends; - - for (i = 0; i < m_backend_count; i++) - { - SERVER_REF* b = backend_ref[i].backend; + SERVER_REF* b = it->backend; /** * To become chosen: * backend must be in use, name must match, and * the backend state must be RUNNING */ - if (BREF_IS_IN_USE((&backend_ref[i])) && + if (BREF_IS_IN_USE((&(*it))) && (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) && SERVER_IS_RUNNING(b->server)) { - *p_dcb = backend_ref[i].dcb; + *p_dcb = it->dcb; succp = true; - ss_dassert(backend_ref[i].dcb->state != DCB_STATE_ZOMBIE); - goto return_succp; + ss_dassert(it->dcb->state != DCB_STATE_ZOMBIE); + break; } } -return_succp: return succp; } @@ -1843,56 +1812,46 @@ bool SchemaRouterSession::send_database_list() return rval; } -void bref_clear_state(backend_ref_t* bref, enum bref_state state) +void bref_clear_state(Backend& bref, enum bref_state state) { - if (bref == NULL) - { - MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__); - return; - } if (state != BREF_WAITING_RESULT) { - bref->state &= ~state; + bref.state &= ~state; } else { /** Decrease global operation count */ - int prev2 = atomic_add(&bref->backend->server->stats.n_current_ops, -1); + int prev2 = atomic_add(&bref.backend->server->stats.n_current_ops, -1); ss_dassert(prev2 > 0); if (prev2 <= 0) { MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", __FUNCTION__, - bref->backend->server->name, - bref->backend->server->port); + bref.backend->server->name, + bref.backend->server->port); } } } -void bref_set_state(backend_ref_t* bref, enum bref_state state) +void bref_set_state(Backend& bref, enum bref_state state) { - if (bref == NULL) - { - MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__); - return; - } if (state != BREF_WAITING_RESULT) { - bref->state |= state; + bref.state |= state; } else { /** Increase global operation count */ - int prev2 = atomic_add(&bref->backend->server->stats.n_current_ops, 1); + int prev2 = atomic_add(&bref.backend->server->stats.n_current_ops, 1); ss_dassert(prev2 >= 0); if (prev2 < 0) { MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", __FUNCTION__, - bref->backend->server->name, - bref->backend->server->port); + bref.backend->server->name, + bref.backend->server->port); } } } @@ -1922,9 +1881,7 @@ void bref_set_state(backend_ref_t* bref, enum bref_state state) * connections because all servers are supposed to be operational. It is, * however, possible that there are less available servers than expected. */ -bool connect_backend_servers(backend_ref_t* backend_ref, - int router_nservers, - MXS_SESSION* session) +bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) { bool succp = false; int servers_found = 0; @@ -1935,9 +1892,9 @@ bool connect_backend_servers(backend_ref_t* backend_ref, { MXS_INFO("Servers and connection counts:"); - for (int i = 0; i < router_nservers; i++) + for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) { - SERVER_REF* b = backend_ref[i].backend; + SERVER_REF* b = it->backend; MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", b->connections, @@ -1951,27 +1908,23 @@ bool connect_backend_servers(backend_ref_t* backend_ref, * Scan server list and connect each of them. None should fail or session * can't be established. */ - for (int i = 0; i < router_nservers; i++) + for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) { - SERVER_REF* b = backend_ref[i].backend; + SERVER_REF* b = it->backend; if (SERVER_IS_RUNNING(b->server)) { servers_found += 1; /** Server is already connected */ - if (BREF_IS_IN_USE((&backend_ref[i]))) + if (BREF_IS_IN_USE(it)) { slaves_connected += 1; } /** New server connection */ else { - backend_ref[i].dcb = dcb_connect(b->server, - session, - b->server->protocol); - - if (backend_ref[i].dcb != NULL) + if ((it->dcb = dcb_connect(b->server, session, b->server->protocol))) { servers_connected += 1; /** @@ -1982,8 +1935,8 @@ bool connect_backend_servers(backend_ref_t* backend_ref, * table. */ - backend_ref[i].state = 0; - bref_set_state(&backend_ref[i], BREF_IN_USE); + it->state = 0; + bref_set_state(*it, BREF_IN_USE); /** * Increase backend connection counter. * Server's stats are _increased_ in @@ -2013,11 +1966,11 @@ bool connect_backend_servers(backend_ref_t* backend_ref, if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { - for (int i = 0; i < router_nservers; i++) + for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) { - SERVER_REF* b = backend_ref[i].backend; + SERVER_REF* b = it->backend; - if (BREF_IS_IN_USE((&backend_ref[i]))) + if (BREF_IS_IN_USE(it)) { MXS_INFO("Connected %s in \t%s:%d", STRSRVSTATUS(b->server), diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 0a5fecea7..3ab7210fb 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -61,13 +61,6 @@ enum bref_state BREF_DB_MAPPED = 0x10 }; -#define BREF_IS_NOT_USED(s) ((s)->state & ~BREF_IN_USE) -#define BREF_IS_IN_USE(s) ((s)->state & BREF_IN_USE) -#define BREF_IS_WAITING_RESULT(s) ((s)->num_result_wait > 0) -#define BREF_IS_QUERY_ACTIVE(s) ((s)->state & BREF_QUERY_ACTIVE) -#define BREF_IS_CLOSED(s) ((s)->state & BREF_CLOSED) -#define BREF_IS_MAPPED(s) ((s)->mapped) - #define SCHEMA_ERR_DUPLICATEDB 5000 #define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" #define SCHEMA_ERR_DBNOTFOUND 1049 @@ -95,20 +88,33 @@ enum route_target * * Owned by router client session. */ -typedef struct backend_ref_st +class Backend { - int n_mapping_eof; - GWBUF* map_queue; +public: + Backend(SERVER_REF *ref); + ~Backend(); + SERVER_REF* backend; /**< Backend server */ DCB* dcb; /**< Backend DCB */ - int state; /**< State of the backend */ + GWBUF* map_queue; bool mapped; /**< Whether the backend has been mapped */ + int n_mapping_eof; int num_result_wait; /**< Number of not yet received results */ GWBUF* pending_cmd; /**< Pending commands */ - + int state; /**< State of the backend */ SessionCommandList session_commands; /**< List of session commands that are * to be executed on this backend server */ -} backend_ref_t; +}; + +typedef list BackendList; + +// TODO: Move these as member functions, currently they operate on iterators +#define BREF_IS_NOT_USED(s) ((s)->state & ~BREF_IN_USE) +#define BREF_IS_IN_USE(s) ((s)->state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) ((s)->num_result_wait > 0) +#define BREF_IS_QUERY_ACTIVE(s) ((s)->state & BREF_QUERY_ACTIVE) +#define BREF_IS_CLOSED(s) ((s)->state & BREF_CLOSED) +#define BREF_IS_MAPPED(s) ((s)->mapped) class SchemaRouter; @@ -163,8 +169,8 @@ public: private: /** Internal functions */ SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype); - backend_ref_t* get_bref_from_dcb(DCB* dcb); - bool execute_sescmd_in_backend(backend_ref_t* backend_ref); + Backend* get_bref_from_dcb(DCB* dcb); + bool execute_sescmd_in_backend(Backend& backend_ref); bool get_shard_dcb(DCB** dcb, char* name); bool handle_default_db(); bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg); @@ -172,14 +178,14 @@ private: bool route_session_write(GWBUF* querybuf, uint8_t command); bool send_database_list(); int gen_databaselist(); - int inspect_backend_mapping_states(backend_ref_t *bref, GWBUF** wbuf); + int inspect_backend_mapping_states(Backend *bref, GWBUF** wbuf); bool process_show_shards(); - enum showdb_response parse_showdb_response(backend_ref_t* bref, GWBUF** buffer); + enum showdb_response parse_showdb_response(Backend* bref, GWBUF** buffer); void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); void route_queued_query(); void synchronize_shard_map(); - void handle_mapping_reply(backend_ref_t* bref, GWBUF* pPacket); - void process_response(backend_ref_t* bref, GWBUF** ppPacket); + void handle_mapping_reply(Backend* bref, GWBUF* pPacket); + void process_response(Backend* bref, GWBUF** ppPacket); SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command, enum route_target& route_target); @@ -187,9 +193,8 @@ private: bool m_closed; /**< True if session closed */ DCB* m_client; /**< The client DCB */ MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */ - backend_ref_t* m_backends; /**< Pointer to backend reference array */ + BackendList m_backends; /**< Backend references */ Config* m_config; /**< Pointer to router config */ - int m_backend_count; /**< Number of backends */ SchemaRouter* m_router; /**< The router instance */ Shard m_shard; /**< Database to server mapping */ string m_connect_db; /**< Database the user was trying to connect to */