From 66fa4fbc7d433fc2076a60ae509731ed2188fece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 28 Mar 2017 16:22:41 +0300 Subject: [PATCH] Use shared pointers to Backend classes The schemarouter now uses shared pointers. This removes the need to copy the class. Following changes move the member variables inside the Backend class. --- .../routing/schemarouter/CMakeLists.txt | 2 +- .../routing/schemarouter/schemarouter.cc | 108 +++++ .../routing/schemarouter/schemarouter.hh | 60 +++ .../schemarouter/schemarouterinstance.cc | 147 ++++++- .../schemarouter/schemarouterinstance.hh | 3 +- .../schemarouter/schemaroutersession.cc | 415 ++++-------------- .../schemarouter/schemaroutersession.hh | 70 +-- 7 files changed, 410 insertions(+), 395 deletions(-) create mode 100644 server/modules/routing/schemarouter/schemarouter.cc diff --git a/server/modules/routing/schemarouter/CMakeLists.txt b/server/modules/routing/schemarouter/CMakeLists.txt index 6bc3fd575..ef81f9e57 100644 --- a/server/modules/routing/schemarouter/CMakeLists.txt +++ b/server/modules/routing/schemarouter/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(schemarouter SHARED schemarouterinstance.cc schemaroutersession.cc shard_map.cc session_command.cc) +add_library(schemarouter SHARED schemarouter.cc schemarouterinstance.cc schemaroutersession.cc shard_map.cc session_command.cc) target_link_libraries(schemarouter maxscale-common) add_dependencies(schemarouter pcre2) set_target_properties(schemarouter PROPERTIES VERSION "1.0.0") diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc new file mode 100644 index 000000000..742a51c55 --- /dev/null +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "schemarouter.hh" + +#include + +using namespace schemarouter; + +Backend::Backend(SERVER_REF *ref): + m_backend(ref), + m_dcb(NULL), + m_map_queue(NULL), + m_mapped(false), + m_num_mapping_eof(0), + m_num_result_wait(0), + m_pending_cmd(NULL), + m_state(0) +{ +} + +Backend::~Backend() +{ + gwbuf_free(m_map_queue); + gwbuf_free(m_pending_cmd); +} + + +bool Backend::execute_sescmd() +{ + if (BREF_IS_CLOSED(this)) + { + return false; + } + + CHK_DCB(m_dcb); + + int rc = 0; + + /** Return if there are no pending ses commands */ + if (m_session_commands.size() == 0) + { + MXS_INFO("Cursor had no pending session commands."); + return false; + } + + SessionCommandList::iterator iter = m_session_commands.begin(); + GWBUF *buffer = iter->copy_buffer().release(); + + switch (iter->get_command()) + { + case MYSQL_COM_CHANGE_USER: + /** This makes it possible to handle replies correctly */ + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer); + break; + + case MYSQL_COM_QUERY: + default: + /** + * Mark session command buffer, it triggers writing + * MySQL command to protocol + */ + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = m_dcb->func.write(m_dcb, buffer); + break; + } + + return rc == 1; +} + +void Backend::clear_state(enum bref_state state) +{ + if (state != BREF_WAITING_RESULT) + { + m_state &= ~state; + } + else + { + /** Decrease global operation count */ + ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1); + ss_dassert(prev2 > 0); + } +} + +void Backend::set_state(enum bref_state state) +{ + if (state != BREF_WAITING_RESULT) + { + m_state |= state; + } + else + { + /** Increase global operation count */ + ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1); + ss_dassert(prev2 >= 0); + } +} diff --git a/server/modules/routing/schemarouter/schemarouter.hh b/server/modules/routing/schemarouter/schemarouter.hh index 23667757d..5d24cbe43 100644 --- a/server/modules/routing/schemarouter/schemarouter.hh +++ b/server/modules/routing/schemarouter/schemarouter.hh @@ -21,13 +21,43 @@ #include #include +#include #include #include +#include +#include #include +#include +#include "session_command.hh" + +using std::list; using std::set; using std::string; +using std::shared_ptr; + +using maxscale::Buffer; + +/** + * The state of the backend server reference + */ +enum bref_state +{ + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /**< for session commands only */ + BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ + BREF_CLOSED = 0x08, + BREF_DB_MAPPED = 0x10 +}; + +// TODO: Move these as member functions, currently they operate on iterators +#define BREF_IS_NOT_USED(s) ((s)->m_state & ~BREF_IN_USE) +#define BREF_IS_IN_USE(s) ((s)->m_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) ((s)->m_num_result_wait > 0) +#define BREF_IS_QUERY_ACTIVE(s) ((s)->m_state & BREF_QUERY_ACTIVE) +#define BREF_IS_CLOSED(s) ((s)->m_state & BREF_CLOSED) +#define BREF_IS_MAPPED(s) ((s)->m_mapped) namespace schemarouter { @@ -86,4 +116,34 @@ struct Stats { } }; + +/** + * Reference to BACKEND. + * + * Owned by router client session. + */ +class Backend +{ +public: + Backend(SERVER_REF *ref); + ~Backend(); + bool execute_sescmd(); + void clear_state(enum bref_state state); + void set_state(enum bref_state state); + + SERVER_REF* m_backend; /**< Backend server */ + DCB* m_dcb; /**< Backend DCB */ + GWBUF* m_map_queue; + bool m_mapped; /**< Whether the backend has been mapped */ + int m_num_mapping_eof; + int m_num_result_wait; /**< Number of not yet received results */ + GWBUF* m_pending_cmd; /**< Pending commands */ + int m_state; /**< State of the backend */ + SessionCommandList m_session_commands; /**< List of session commands that are + * to be executed on this backend server */ +}; + +typedef shared_ptr SBackend; +typedef list BackendList; + } diff --git a/server/modules/routing/schemarouter/schemarouterinstance.cc b/server/modules/routing/schemarouter/schemarouterinstance.cc index 3d5531521..5d0b5e89a 100644 --- a/server/modules/routing/schemarouter/schemarouterinstance.cc +++ b/server/modules/routing/schemarouter/schemarouterinstance.cc @@ -170,9 +170,154 @@ SchemaRouter* SchemaRouter::create(SERVICE* pService, char** pzOptions) return success ? new SchemaRouter(pService, config) : NULL; } +/** + * @node Search all RUNNING backend servers and connect + * + * Parameters: + * @param backend_ref - in, use, out + * Pointer to backend server reference object array. + * NULL is not allowed. + * + * @param router_nservers - in, use + * Number of backend server pointers pointed to by b. + * + * @param session - in, use + * MaxScale session pointer used when connection to backend is established. + * + * @param router - in, use + * Pointer to router instance. Used when server states are qualified. + * + * @return true, if at least one master and one slave was found. + * + * + * @details It is assumed that there is only one available server. + * There will be exactly as many backend references than there are + * connections because all servers are supposed to be operational. It is, + * however, possible that there are less available servers than expected. + */ +bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) +{ + bool succp = false; + int servers_found = 0; + int servers_connected = 0; + int slaves_connected = 0; + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + MXS_INFO("Servers and connection counts:"); + + for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) + { + SERVER_REF* b = (*it)->m_backend; + + MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", + b->connections, + b->server->stats.n_current, + b->server->name, + b->server->port, + STRSRVSTATUS(b->server)); + } + } + /** + * Scan server list and connect each of them. None should fail or session + * can't be established. + */ + for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) + { + SERVER_REF* b = (*it)->m_backend; + + if (SERVER_IS_RUNNING(b->server)) + { + servers_found += 1; + + /** Server is already connected */ + if (BREF_IS_IN_USE((*it))) + { + slaves_connected += 1; + } + /** New server connection */ + else + { + if (((*it)->m_dcb = dcb_connect(b->server, session, b->server->protocol))) + { + servers_connected += 1; + /** + * When server fails, this callback + * is called. + * !!! Todo, routine which removes + * corresponding entries from the hash + * table. + */ + + (*it)->m_state = 0; + (*it)->set_state(BREF_IN_USE); + /** + * Increase backend connection counter. + * Server's stats are _increased_ in + * dcb.c:dcb_alloc ! + * But decreased in the calling function + * of dcb_close. + */ + atomic_add(&b->connections, 1); + } + else + { + succp = false; + MXS_ERROR("Unable to establish " + "connection with slave %s:%d", + b->server->name, + b->server->port); + /* handle connect error */ + break; + } + } + } + } + + if (servers_connected > 0) + { + succp = true; + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) + { + SERVER_REF* b = (*it)->m_backend; + + if (BREF_IS_IN_USE((*it))) + { + MXS_INFO("Connected %s in \t%s:%d", + STRSRVSTATUS(b->server), + b->server->name, + b->server->port); + } + } + } + } + + return succp; +} + SchemaRouterSession* SchemaRouter::newSession(MXS_SESSION* pSession) { - return new SchemaRouterSession(pSession, this); + BackendList backends; + + for (SERVER_REF *ref = m_service->dbref; ref; ref = ref->next) + { + if (ref->active) + { + backends.push_back(SBackend(new Backend(ref))); + } + } + + SchemaRouterSession* rval = NULL; + + if (connect_backend_servers(backends, pSession)) + { + rval = new SchemaRouterSession(pSession, this, backends); + } + + return rval; } void SchemaRouter::diagnostics(DCB* dcb) diff --git a/server/modules/routing/schemarouter/schemarouterinstance.hh b/server/modules/routing/schemarouter/schemarouterinstance.hh index e2ce29f77..ffc066774 100644 --- a/server/modules/routing/schemarouter/schemarouterinstance.hh +++ b/server/modules/routing/schemarouter/schemarouterinstance.hh @@ -24,8 +24,7 @@ using std::string; using std::set; -using schemarouter::Config; -using schemarouter::Stats; +using namespace schemarouter; class SchemaRouterSession; diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index aa52890f7..635fbed73 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -28,101 +28,12 @@ bool extract_database(GWBUF* buf, char* str); bool detect_show_shards(GWBUF* query); void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); -Backend::Backend(SERVER_REF *ref): - m_backend(ref), - m_dcb(NULL), - m_map_queue(NULL), - m_mapped(false), - m_num_mapping_eof(0), - m_num_result_wait(0), - m_pending_cmd(NULL), - m_state(0) -{ -} - -Backend::~Backend() -{ - gwbuf_free(m_map_queue); - gwbuf_free(m_pending_cmd); -} - - -bool Backend::execute_sescmd() -{ - if (BREF_IS_CLOSED(this)) - { - return false; - } - - CHK_DCB(m_dcb); - - int rc = 0; - - /** Return if there are no pending ses commands */ - if (m_session_commands.size() == 0) - { - MXS_INFO("Cursor had no pending session commands."); - return false; - } - - SessionCommandList::iterator iter = m_session_commands.begin(); - GWBUF *buffer = iter->copy_buffer().release(); - - switch (iter->get_command()) - { - case MYSQL_COM_CHANGE_USER: - /** This makes it possible to handle replies correctly */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); - rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer); - break; - - case MYSQL_COM_QUERY: - default: - /** - * Mark session command buffer, it triggers writing - * MySQL command to protocol - */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); - rc = m_dcb->func.write(m_dcb, buffer); - break; - } - - return rc == 1; -} - -void Backend::clear_state(enum bref_state state) -{ - if (state != BREF_WAITING_RESULT) - { - m_state &= ~state; - } - else - { - /** Decrease global operation count */ - ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1); - ss_dassert(prev2 > 0); - } -} - -void Backend::set_state(enum bref_state state) -{ - if (state != BREF_WAITING_RESULT) - { - m_state |= state; - } - else - { - /** Increase global operation count */ - ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1); - ss_dassert(prev2 >= 0); - } -} - -SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router): +SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends): mxs::RouterSession(session), m_closed(false), m_client(session->client_dcb), m_mysql_session((MYSQL_session*)session->client_dcb->data), + m_backends(backends), m_config(&router->m_config), m_router(router), m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)), @@ -160,20 +71,6 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou m_state |= INIT_USE_DB; } - for (SERVER_REF *ref = m_router->m_service->dbref; ref; ref = ref->next) - { - if (ref->active) - { - m_backends.push_back(Backend(ref)); - } - } - - if (!connect_backend_servers(m_backends, session)) - { - // TODO: Figure out how to avoid this throw - throw std::runtime_error("Failed to connect to backend servers"); - } - if (db[0]) { /* Store the database the client is connecting to */ @@ -200,25 +97,25 @@ void SchemaRouterSession::close() for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - DCB* dcb = it->m_dcb; + DCB* dcb = (*it)->m_dcb; /** Close those which had been connected */ - if (BREF_IS_IN_USE(it)) + if (BREF_IS_IN_USE(*it)) { CHK_DCB(dcb); /** Clean operation counter in bref and in SERVER */ - while (BREF_IS_WAITING_RESULT(it)) + while (BREF_IS_WAITING_RESULT(*it)) { - it->clear_state(BREF_WAITING_RESULT); + (*it)->clear_state(BREF_WAITING_RESULT); } - it->clear_state(BREF_IN_USE); - it->set_state(BREF_CLOSED); + (*it)->clear_state(BREF_IN_USE); + (*it)->set_state(BREF_CLOSED); /** * closes protocol and dcb */ dcb_close(dcb); /** decrease server current connection counters */ - atomic_add(&it->m_backend->connections, -1); + atomic_add(&(*it)->m_backend->connections, -1); } } @@ -350,7 +247,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, { for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - SERVER *server = it->m_backend->server; + SERVER *server = (*it)->m_backend->server; if (SERVER_IS_RUNNING(server)) { route_target = TARGET_NAMED_SERVER; @@ -435,7 +332,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) if (m_load_target) { /** A load data local infile is active */ - target = m_load_target->m_backend->server; + target = m_load_target; route_target = TARGET_NAMED_SERVER; if (is_empty_packet(pPacket)) @@ -539,11 +436,11 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) get_shard_dcb(&target_dcb, target->unique_name)) { /** We know where to route this query */ - Backend *bref = get_bref_from_dcb(target_dcb); + SBackend bref = get_bref_from_dcb(target_dcb); if (op == QUERY_OP_LOAD) { - m_load_target = bref; + m_load_target = bref->m_backend->server; } MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port); @@ -558,16 +455,10 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) } else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) { - Backend* bref; - - atomic_add(&m_router->m_stats.n_queries, 1); - - /** - * Add one query response waiter to backend reference - */ - bref = get_bref_from_dcb(target_dcb); + /** Add one query response waiter to backend reference */ bref->set_state(BREF_QUERY_ACTIVE); bref->set_state(BREF_WAITING_RESULT); + atomic_add(&m_router->m_stats.n_queries, 1); } else { @@ -578,7 +469,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) gwbuf_free(pPacket); return ret; } -void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF** pPacket) +void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket) { int rc = inspect_backend_mapping_states(bref, pPacket); @@ -612,7 +503,7 @@ void SchemaRouterSession::handle_mapping_reply(Backend* bref, GWBUF** pPacket) } } -void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket) +void SchemaRouterSession::process_response(SBackend& bref, GWBUF** ppPacket) { if (bref->m_session_commands.size() > 0) { @@ -651,9 +542,9 @@ void SchemaRouterSession::process_response(Backend* bref, GWBUF** ppPacket) void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { - Backend* bref = get_bref_from_dcb(pDcb); + SBackend bref = get_bref_from_dcb(pDcb); - if (m_closed || bref == NULL) + if (m_closed || bref.get() == NULL) // The bref should always be valid { gwbuf_free(pPacket); return; @@ -741,14 +632,21 @@ void SchemaRouterSession::handleError(GWBUF* pMessage, CHK_SESSION(session); + SBackend bref = get_bref_from_dcb(pProblem); + + if (bref.get() == NULL) // Should never happen + { + return; + } + switch (action) { case ERRACT_NEW_CONNECTION: - *pSuccess = handle_error_new_connection(pProblem, pMessage); + *pSuccess = handle_error_new_connection(bref, pMessage); break; case ERRACT_REPLY_CLIENT: - handle_error_reply_client(pProblem, pMessage); + handle_error_reply_client(bref, pMessage); *pSuccess = false; /*< no new backend servers were made available */ break; @@ -858,20 +756,20 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE(it)) + if (BREF_IS_IN_USE(*it)) { GWBUF *buffer = gwbuf_clone(querybuf); - it->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); + (*it)->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { MXS_INFO("Route query to %s\t%s:%d", - SERVER_IS_MASTER(it->m_backend->server) ? "master" : "slave", - it->m_backend->server->name, - it->m_backend->server->port); + SERVER_IS_MASTER((*it)->m_backend->server) ? "master" : "slave", + (*it)->m_backend->server->name, + (*it)->m_backend->server->port); } - if (it->m_session_commands.size() == 1) + if ((*it)->m_session_commands.size() == 1) { /** Only one command, execute it */ switch (command) @@ -882,11 +780,11 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) break; default: - it->set_state(BREF_WAITING_RESULT); + (*it)->set_state(BREF_WAITING_RESULT); break; } - if (it->execute_sescmd()) + if ((*it)->execute_sescmd()) { succp = true; } @@ -894,17 +792,17 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) { MXS_ERROR("Failed to execute session " "command in %s:%d", - it->m_backend->server->name, - it->m_backend->server->port); + (*it)->m_backend->server->name, + (*it)->m_backend->server->port); } } else { - ss_dassert(it->m_session_commands.size() > 1); + ss_dassert((*it)->m_session_commands.size() > 1); /** The server is already executing a session command */ MXS_INFO("Backend %s:%d already executing sescmd.", - it->m_backend->server->name, - it->m_backend->server->port); + (*it)->m_backend->server->name, + (*it)->m_backend->server->port); succp = true; } } @@ -913,20 +811,14 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) return succp; } -void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg) +void SchemaRouterSession::handle_error_reply_client(SBackend& bref, GWBUF* errmsg) { - Backend* bref = get_bref_from_dcb(dcb); + bref->clear_state(BREF_IN_USE); + bref->set_state(BREF_CLOSED); - if (bref) + if (m_client->session->state == SESSION_STATE_ROUTER_READY) { - - bref->clear_state(BREF_IN_USE); - bref->set_state(BREF_CLOSED); - } - - if (dcb->session->state == SESSION_STATE_ROUTER_READY) - { - dcb->session->client_dcb->func.write(dcb->session->client_dcb, gwbuf_clone(errmsg)); + m_client->func.write(m_client, gwbuf_clone(errmsg)); } } @@ -940,7 +832,7 @@ bool SchemaRouterSession::have_servers() { for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE(it) && !BREF_IS_CLOSED(it)) + if (BREF_IS_IN_USE(*it) && !BREF_IS_CLOSED(*it)) { return true; } @@ -962,32 +854,19 @@ bool SchemaRouterSession::have_servers() * * @return true if there are enough backend connections to continue, false if not */ -bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg) +bool SchemaRouterSession::handle_error_new_connection(SBackend& bref, GWBUF* errmsg) { - MXS_SESSION *ses = backend_dcb->session; - CHK_SESSION(ses); - - Backend* bref = get_bref_from_dcb(backend_dcb); - - if (bref == NULL) - { - /** This should not happen */ - ss_dassert(false); - return false; - } - - /** - * If query was sent through the bref and it is waiting for reply from - * the backend server it is necessary to send an error to the client - * because it is waiting for reply. - */ if (BREF_IS_WAITING_RESULT(bref)) { - DCB* client_dcb; - client_dcb = ses->client_dcb; - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + /** + * If query was sent through the bref and it is waiting for reply from + * the backend server it is necessary to send an error to the client + * because it is waiting for reply. + */ + m_client->func.write(m_client, gwbuf_clone(errmsg)); bref->clear_state(BREF_WAITING_RESULT); } + bref->clear_state(BREF_IN_USE); bref->set_state(BREF_CLOSED); @@ -1002,19 +881,21 @@ bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* e * * @return backend reference pointer if succeed or NULL */ -Backend* SchemaRouterSession::get_bref_from_dcb(DCB* dcb) +SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb) { CHK_DCB(dcb); for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (it->m_dcb == dcb) + if ((*it)->m_dcb == dcb) { - return &(*it); + return *it; } } - return NULL; + // This should not happen + ss_dassert(false); + return SBackend(NULL); } /** @@ -1212,7 +1093,7 @@ void SchemaRouterSession::route_queued_query() * @param router_cli_ses Router client session * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error */ -int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref, +int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref, GWBUF** wbuf) { bool mapped = true; @@ -1220,20 +1101,19 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref, for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (bref->m_dcb == it->m_dcb && !BREF_IS_MAPPED(it)) + if (bref->m_dcb == (*it)->m_dcb && !BREF_IS_MAPPED(*it)) { if (bref->m_map_queue) { writebuf = gwbuf_append(bref->m_map_queue, writebuf); bref->m_map_queue = NULL; } - enum showdb_response rc = parse_showdb_response(&(*it), - &writebuf); + enum showdb_response rc = parse_showdb_response(*it, &writebuf); if (rc == SHOWDB_FULL_RESPONSE) { - it->m_mapped = true; + (*it)->m_mapped = true; MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", - it->m_backend->server->unique_name, + (*it)->m_backend->server->unique_name, m_client->session); } else if (rc == SHOWDB_PARTIAL_RESPONSE) @@ -1241,7 +1121,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref, bref->m_map_queue = writebuf; writebuf = NULL; MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", - it->m_backend->server->unique_name, + (*it)->m_backend->server->unique_name, m_client->session); } else @@ -1290,12 +1170,11 @@ int SchemaRouterSession::inspect_backend_mapping_states(Backend *bref, } } - if (BREF_IS_IN_USE(it) && !BREF_IS_MAPPED(it)) + if (BREF_IS_IN_USE(*it) && !BREF_IS_MAPPED(*it)) { mapped = false; MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", - it->m_backend->server->unique_name, - m_client->session); + (*it)->m_backend->server->unique_name, m_client->session); } } *wbuf = writebuf; @@ -1466,7 +1345,7 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data) * @return 1 if a complete response was received, 0 if a partial response was received * and -1 if a database was found on more than one server. */ -enum showdb_response SchemaRouterSession::parse_showdb_response(Backend* bref, GWBUF** buffer) +enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, GWBUF** buffer) { unsigned char* ptr; SERVER* target = bref->m_backend->server; @@ -1591,8 +1470,8 @@ int SchemaRouterSession::gen_databaselist() for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - it->m_mapped = false; - it->m_num_mapping_eof = 0; + (*it)->m_mapped = false; + (*it)->m_num_mapping_eof = 0; } m_state |= INIT_MAPPING; @@ -1609,15 +1488,15 @@ int SchemaRouterSession::gen_databaselist() for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE(it) && - !BREF_IS_CLOSED(it) & - SERVER_IS_RUNNING(it->m_backend->server)) + if (BREF_IS_IN_USE(*it) && + !BREF_IS_CLOSED(*it) & + SERVER_IS_RUNNING((*it)->m_backend->server)) { clone = gwbuf_clone(buffer); - dcb = it->m_dcb; + dcb = (*it)->m_dcb; rval |= !dcb->func.write(dcb, clone); MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", - it->m_backend->server->unique_name, + (*it)->m_backend->server->unique_name, m_client->session, rval); } @@ -1718,11 +1597,11 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) { for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - char *srvnm = it->m_backend->server->unique_name; + char *srvnm = (*it)->m_backend->server->unique_name; if (strcmp(srvnm, (char*)buffer->hint->data) == 0) { - rval = it->m_backend->server; + rval = (*it)->m_backend->server; MXS_INFO("Routing hint found (%s)", rval->unique_name); } } @@ -1767,19 +1646,19 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - SERVER_REF* b = it->m_backend; + SERVER_REF* b = (*it)->m_backend; /** * To become chosen: * backend must be in use, name must match, and * the backend state must be RUNNING */ - if (BREF_IS_IN_USE((&(*it))) && + if (BREF_IS_IN_USE((*it)) && (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) && SERVER_IS_RUNNING(b->server)) { - *p_dcb = it->m_dcb; + *p_dcb = (*it)->m_dcb; succp = true; - ss_dassert(it->m_dcb->state != DCB_STATE_ZOMBIE); + ss_dassert((*it)->m_dcb->state != DCB_STATE_ZOMBIE); break; } } @@ -1881,131 +1760,3 @@ bool SchemaRouterSession::send_database_list() return rval; } - -/** - * @node Search all RUNNING backend servers and connect - * - * Parameters: - * @param backend_ref - in, use, out - * Pointer to backend server reference object array. - * NULL is not allowed. - * - * @param router_nservers - in, use - * Number of backend server pointers pointed to by b. - * - * @param session - in, use - * MaxScale session pointer used when connection to backend is established. - * - * @param router - in, use - * Pointer to router instance. Used when server states are qualified. - * - * @return true, if at least one master and one slave was found. - * - * - * @details It is assumed that there is only one available server. - * There will be exactly as many backend references than there are - * connections because all servers are supposed to be operational. It is, - * however, possible that there are less available servers than expected. - */ -bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) -{ - bool succp = false; - int servers_found = 0; - int servers_connected = 0; - int slaves_connected = 0; - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Servers and connection counts:"); - - for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) - { - SERVER_REF* b = it->m_backend; - - MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", - b->connections, - b->server->stats.n_current, - b->server->name, - b->server->port, - STRSRVSTATUS(b->server)); - } - } - /** - * Scan server list and connect each of them. None should fail or session - * can't be established. - */ - for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) - { - SERVER_REF* b = it->m_backend; - - if (SERVER_IS_RUNNING(b->server)) - { - servers_found += 1; - - /** Server is already connected */ - if (BREF_IS_IN_USE(it)) - { - slaves_connected += 1; - } - /** New server connection */ - else - { - if ((it->m_dcb = dcb_connect(b->server, session, b->server->protocol))) - { - servers_connected += 1; - /** - * When server fails, this callback - * is called. - * !!! Todo, routine which removes - * corresponding entries from the hash - * table. - */ - - it->m_state = 0; - it->set_state(BREF_IN_USE); - /** - * Increase backend connection counter. - * Server's stats are _increased_ in - * dcb.c:dcb_alloc ! - * But decreased in the calling function - * of dcb_close. - */ - atomic_add(&b->connections, 1); - } - else - { - succp = false; - MXS_ERROR("Unable to establish " - "connection with slave %s:%d", - b->server->name, - b->server->port); - /* handle connect error */ - break; - } - } - } - } - - if (servers_connected > 0) - { - succp = true; - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) - { - SERVER_REF* b = it->m_backend; - - if (BREF_IS_IN_USE(it)) - { - MXS_INFO("Connected %s in \t%s:%d", - STRSRVSTATUS(b->server), - b->server->name, - b->server->port); - } - } - } - } - - return succp; -} diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 2832d6096..40a180853 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -25,8 +25,8 @@ using std::string; using std::list; -using schemarouter::Config; -using schemarouter::Stats; + +using namespace schemarouter; /** * Bitmask values for the router session's initialization. These values are used @@ -49,18 +49,6 @@ enum showdb_response SHOWDB_FATAL_ERROR }; -/** - * The state of the backend server reference - */ -enum bref_state -{ - BREF_IN_USE = 0x01, - BREF_WAITING_RESULT = 0x02, /**< for session commands only */ - BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ - BREF_CLOSED = 0x08, - BREF_DB_MAPPED = 0x10 -}; - #define SCHEMA_ERR_DUPLICATEDB 5000 #define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" #define SCHEMA_ERR_DBNOTFOUND 1049 @@ -83,42 +71,6 @@ enum route_target #define TARGET_IS_ALL(t) (t == TARGET_ALL) #define TARGET_IS_ANY(t) (t == TARGET_ANY) -/** - * Reference to BACKEND. - * - * Owned by router client session. - */ -class Backend -{ -public: - Backend(SERVER_REF *ref); - ~Backend(); - bool execute_sescmd(); - void clear_state(enum bref_state state); - void set_state(enum bref_state state); - - SERVER_REF* m_backend; /**< Backend server */ - DCB* m_dcb; /**< Backend DCB */ - GWBUF* m_map_queue; - bool m_mapped; /**< Whether the backend has been mapped */ - int m_num_mapping_eof; - int m_num_result_wait; /**< Number of not yet received results */ - GWBUF* m_pending_cmd; /**< Pending commands */ - int m_state; /**< State of the backend */ - SessionCommandList m_session_commands; /**< List of session commands that are - * to be executed on this backend server */ -}; - -typedef list BackendList; - -// TODO: Move these as member functions, currently they operate on iterators -#define BREF_IS_NOT_USED(s) ((s)->m_state & ~BREF_IN_USE) -#define BREF_IS_IN_USE(s) ((s)->m_state & BREF_IN_USE) -#define BREF_IS_WAITING_RESULT(s) ((s)->m_num_result_wait > 0) -#define BREF_IS_QUERY_ACTIVE(s) ((s)->m_state & BREF_QUERY_ACTIVE) -#define BREF_IS_CLOSED(s) ((s)->m_state & BREF_CLOSED) -#define BREF_IS_MAPPED(s) ((s)->m_mapped) - class SchemaRouter; /** @@ -128,7 +80,7 @@ class SchemaRouterSession: public mxs::RouterSession { public: - SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router); + SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends); /** * The RouterSession instance will be deleted when a client session @@ -172,22 +124,22 @@ public: private: /** Internal functions */ SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype); - Backend* get_bref_from_dcb(DCB* dcb); + SBackend get_bref_from_dcb(DCB* dcb); bool get_shard_dcb(DCB** dcb, char* name); bool handle_default_db(); - bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg); + bool handle_error_new_connection(SBackend& bref, GWBUF* errmsg); + void handle_error_reply_client(SBackend& bref, GWBUF* errmsg); bool have_servers(); bool route_session_write(GWBUF* querybuf, uint8_t command); bool send_database_list(); int gen_databaselist(); - int inspect_backend_mapping_states(Backend *bref, GWBUF** wbuf); + int inspect_backend_mapping_states(SBackend& bref, GWBUF** wbuf); bool process_show_shards(); - enum showdb_response parse_showdb_response(Backend* bref, GWBUF** buffer); - void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); + enum showdb_response parse_showdb_response(SBackend& bref, GWBUF** buffer); void route_queued_query(); void synchronize_shard_map(); - void handle_mapping_reply(Backend* bref, GWBUF** pPacket); - void process_response(Backend* bref, GWBUF** ppPacket); + void handle_mapping_reply(SBackend& bref, GWBUF** pPacket); + void process_response(SBackend& bref, GWBUF** ppPacket); SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command, enum route_target& route_target); bool ignore_duplicate_database(const char* data); @@ -207,5 +159,5 @@ private: Stats m_stats; /**< Statistics for this router */ uint64_t m_sent_sescmd; /**< The latest session command being executed */ uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */ - Backend* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */ + SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */ };