diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc index 742a51c55..26db7eb7d 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -18,27 +18,62 @@ using namespace schemarouter; Backend::Backend(SERVER_REF *ref): + m_closed(false), m_backend(ref), m_dcb(NULL), m_map_queue(NULL), m_mapped(false), m_num_mapping_eof(0), m_num_result_wait(0), - m_pending_cmd(NULL), m_state(0) { } Backend::~Backend() { + ss_dassert(m_closed); + + if (!m_closed) + { + close(); + } + gwbuf_free(m_map_queue); - gwbuf_free(m_pending_cmd); } +void Backend::close() +{ + if (!m_closed) + { + m_closed = true; + + if (BREF_IS_IN_USE(this)) + { + CHK_DCB(m_dcb); + + /** Clean operation counter in bref and in SERVER */ + while (BREF_IS_WAITING_RESULT(this)) + { + clear_state(BREF_WAITING_RESULT); + } + clear_state(BREF_IN_USE); + set_state(BREF_CLOSED); + + dcb_close(m_dcb); + + /** decrease server current connection counters */ + atomic_add(&m_backend->connections, -1); + } + } + else + { + ss_dassert(false); + } +} bool Backend::execute_sescmd() { - if (BREF_IS_CLOSED(this)) + if (BREF_IS_CLOSED(this) || m_session_commands.size() == 0) { return false; } @@ -106,3 +141,54 @@ void Backend::set_state(enum bref_state state) ss_dassert(prev2 >= 0); } } + +SERVER_REF* Backend::backend() const +{ + return m_backend; +} + +bool Backend::connect(MXS_SESSION* session) +{ + bool rval = false; + + if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol))) + { + m_state = BREF_IN_USE; + atomic_add(&m_backend->connections, 1); + rval = true; + } + + return rval; +} + +DCB* Backend::dcb() const +{ + return m_dcb; +} + +bool Backend::write(GWBUF* buffer) +{ + return m_dcb->func.write(m_dcb, buffer) != 0; +} + +void Backend::store_command(GWBUF* buffer) +{ + m_pending_cmd.reset(buffer); +} + +bool Backend::write_stored_command() +{ + bool rval = false; + + if (m_pending_cmd.length()) + { + rval = write(m_pending_cmd.release()); + + if (!rval) + { + MXS_ERROR("Routing of pending query failed."); + } + } + + return rval; +} diff --git a/server/modules/routing/schemarouter/schemarouter.hh b/server/modules/routing/schemarouter/schemarouter.hh index 5d24cbe43..a2f50d5d2 100644 --- a/server/modules/routing/schemarouter/schemarouter.hh +++ b/server/modules/routing/schemarouter/schemarouter.hh @@ -130,14 +130,25 @@ public: bool execute_sescmd(); void clear_state(enum bref_state state); void set_state(enum bref_state state); + SERVER_REF* backend() const; + bool connect(MXS_SESSION*); + void close(); + DCB* dcb() const; + bool write(GWBUF* buffer); + void store_command(GWBUF* buffer); + bool write_stored_command(); +private: + bool m_closed; /**< True if a connection has been opened and closed */ SERVER_REF* m_backend; /**< Backend server */ DCB* m_dcb; /**< Backend DCB */ + +public: GWBUF* m_map_queue; bool m_mapped; /**< Whether the backend has been mapped */ int m_num_mapping_eof; int m_num_result_wait; /**< Number of not yet received results */ - GWBUF* m_pending_cmd; /**< Pending commands */ + Buffer m_pending_cmd; /**< Pending commands */ int m_state; /**< State of the backend */ SessionCommandList m_session_commands; /**< List of session commands that are * to be executed on this backend server */ diff --git a/server/modules/routing/schemarouter/schemarouterinstance.cc b/server/modules/routing/schemarouter/schemarouterinstance.cc index 5d0b5e89a..7411bb93c 100644 --- a/server/modules/routing/schemarouter/schemarouterinstance.cc +++ b/server/modules/routing/schemarouter/schemarouterinstance.cc @@ -208,7 +208,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) { - SERVER_REF* b = (*it)->m_backend; + SERVER_REF* b = (*it)->backend(); MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", b->connections, @@ -224,7 +224,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) */ for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) { - SERVER_REF* b = (*it)->m_backend; + SERVER_REF* b = (*it)->backend(); if (SERVER_IS_RUNNING(b->server)) { @@ -238,27 +238,9 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) /** New server connection */ else { - if (((*it)->m_dcb = dcb_connect(b->server, session, b->server->protocol))) + if ((*it)->connect(session)) { servers_connected += 1; - /** - * When server fails, this callback - * is called. - * !!! Todo, routine which removes - * corresponding entries from the hash - * table. - */ - - (*it)->m_state = 0; - (*it)->set_state(BREF_IN_USE); - /** - * Increase backend connection counter. - * Server's stats are _increased_ in - * dcb.c:dcb_alloc ! - * But decreased in the calling function - * of dcb_close. - */ - atomic_add(&b->connections, 1); } else { @@ -282,7 +264,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) { for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) { - SERVER_REF* b = (*it)->m_backend; + SERVER_REF* b = (*it)->backend(); if (BREF_IS_IN_USE((*it))) { diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index 635fbed73..5ba3dd120 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -97,26 +97,9 @@ void SchemaRouterSession::close() for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - DCB* dcb = (*it)->m_dcb; - /** Close those which had been connected */ - if (BREF_IS_IN_USE(*it)) - { - CHK_DCB(dcb); - - /** Clean operation counter in bref and in SERVER */ - while (BREF_IS_WAITING_RESULT(*it)) - { - (*it)->clear_state(BREF_WAITING_RESULT); - } - (*it)->clear_state(BREF_IN_USE); - (*it)->set_state(BREF_CLOSED); - /** - * closes protocol and dcb - */ - dcb_close(dcb); - /** decrease server current connection counters */ - atomic_add(&(*it)->m_backend->connections, -1); - } + /** The backends are closed here to trigger the shutdown of + * the connected DCBs */ + (*it)->close(); } spinlock_acquire(&m_router->m_lock); @@ -247,7 +230,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, { for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - SERVER *server = (*it)->m_backend->server; + SERVER *server = (*it)->backend()->server; if (SERVER_IS_RUNNING(server)) { route_target = TARGET_NAMED_SERVER; @@ -440,17 +423,17 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) if (op == QUERY_OP_LOAD) { - m_load_target = bref->m_backend->server; + m_load_target = bref->backend()->server; } - MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port); + MXS_INFO("Route query to \t%s:%d <", bref->backend()->server->name, bref->backend()->server->port); if (bref->m_session_commands.size() > 0) { /** Store current statement if execution of the previous * session command hasn't been completed. */ - ss_dassert((bref->m_pending_cmd == NULL || m_closed)); - bref->m_pending_cmd = pPacket; + bref->store_command(pPacket); + pPacket = NULL; ret = 1; } else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) @@ -552,7 +535,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) MXS_DEBUG("Reply from [%s] session [%p]" " mapping [%s] queries queued [%s]", - bref->m_backend->server->unique_name, + bref->backend()->server->unique_name, m_client->session, m_state & INIT_MAPPING ? "true" : "false", m_queue.size() == 0 ? "none" : @@ -591,29 +574,16 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) pPacket = NULL; } - if (bref->m_session_commands.size() > 0) + if (bref->execute_sescmd()) { - /** There are pending session commands to be executed. */ MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", - bref->m_backend->server->name, bref->m_backend->server->port); - bref->execute_sescmd(); + bref->backend()->server->name, bref->backend()->server->port); } - else if (bref->m_pending_cmd) /*< non-sescmd is waiting to be routed */ + else if (bref->write_stored_command()) { - CHK_GWBUF(bref->m_pending_cmd); - int ret = bref->m_dcb->func.write(bref->m_dcb, bref->m_pending_cmd); - bref->m_pending_cmd = NULL; - - if (ret == 1) - { - atomic_add(&m_router->m_stats.n_queries, 1); - bref->set_state(BREF_QUERY_ACTIVE); - bref->set_state(BREF_WAITING_RESULT); - } - else - { - MXS_ERROR("Routing of pending query failed."); - } + atomic_add(&m_router->m_stats.n_queries, 1); + bref->set_state(BREF_QUERY_ACTIVE); + bref->set_state(BREF_WAITING_RESULT); } } @@ -764,9 +734,9 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { MXS_INFO("Route query to %s\t%s:%d", - SERVER_IS_MASTER((*it)->m_backend->server) ? "master" : "slave", - (*it)->m_backend->server->name, - (*it)->m_backend->server->port); + SERVER_IS_MASTER((*it)->backend()->server) ? "master" : "slave", + (*it)->backend()->server->name, + (*it)->backend()->server->port); } if ((*it)->m_session_commands.size() == 1) @@ -792,8 +762,8 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) { MXS_ERROR("Failed to execute session " "command in %s:%d", - (*it)->m_backend->server->name, - (*it)->m_backend->server->port); + (*it)->backend()->server->name, + (*it)->backend()->server->port); } } else @@ -801,8 +771,8 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) ss_dassert((*it)->m_session_commands.size() > 1); /** The server is already executing a session command */ MXS_INFO("Backend %s:%d already executing sescmd.", - (*it)->m_backend->server->name, - (*it)->m_backend->server->port); + (*it)->backend()->server->name, + (*it)->backend()->server->port); succp = true; } } @@ -887,7 +857,7 @@ SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if ((*it)->m_dcb == dcb) + if ((*it)->dcb() == dcb) { return *it; } @@ -1101,7 +1071,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref, for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (bref->m_dcb == (*it)->m_dcb && !BREF_IS_MAPPED(*it)) + if (bref->dcb() == (*it)->dcb() && !BREF_IS_MAPPED(*it)) { if (bref->m_map_queue) { @@ -1113,7 +1083,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref, { (*it)->m_mapped = true; MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", - (*it)->m_backend->server->unique_name, + (*it)->backend()->server->unique_name, m_client->session); } else if (rc == SHOWDB_PARTIAL_RESPONSE) @@ -1121,7 +1091,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref, bref->m_map_queue = writebuf; writebuf = NULL; MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", - (*it)->m_backend->server->unique_name, + (*it)->backend()->server->unique_name, m_client->session); } else @@ -1174,7 +1144,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(SBackend& bref, { mapped = false; MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", - (*it)->m_backend->server->unique_name, m_client->session); + (*it)->backend()->server->unique_name, m_client->session); } } *wbuf = writebuf; @@ -1348,7 +1318,7 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data) enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, GWBUF** buffer) { unsigned char* ptr; - SERVER* target = bref->m_backend->server; + SERVER* target = bref->backend()->server; GWBUF* buf; bool duplicate_found = false; enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE; @@ -1429,12 +1399,12 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, { atomic_add(&bref->m_num_mapping_eof, 1); MXS_INFO("SHOW DATABASES fully received from %s.", - bref->m_backend->server->unique_name); + bref->backend()->server->unique_name); } else { MXS_INFO("SHOW DATABASES partially received from %s.", - bref->m_backend->server->unique_name); + bref->backend()->server->unique_name); } gwbuf_free(buf); @@ -1460,7 +1430,7 @@ enum showdb_response SchemaRouterSession::parse_showdb_response(SBackend& bref, * @param session Router client session * @return 1 if all writes to backends were succesful and 0 if one or more errors occurred */ -int SchemaRouterSession::gen_databaselist() +void SchemaRouterSession::gen_databaselist() { DCB* dcb; const char* query = "SHOW DATABASES"; @@ -1488,21 +1458,18 @@ int SchemaRouterSession::gen_databaselist() for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - if (BREF_IS_IN_USE(*it) && - !BREF_IS_CLOSED(*it) & - SERVER_IS_RUNNING((*it)->m_backend->server)) + if (BREF_IS_IN_USE(*it) && !BREF_IS_CLOSED(*it) & + SERVER_IS_RUNNING((*it)->backend()->server)) { clone = gwbuf_clone(buffer); - dcb = (*it)->m_dcb; - rval |= !dcb->func.write(dcb, clone); - MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", - (*it)->m_backend->server->unique_name, - m_client->session, - rval); + if (!(*it)->write(clone)) + { + MXS_ERROR("Failed to write SHOW DATABASES to '%s'", + (*it)->backend()->server->unique_name); + } } } gwbuf_free(buffer); - return !rval; } /** @@ -1597,11 +1564,11 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) { for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - char *srvnm = (*it)->m_backend->server->unique_name; + char *srvnm = (*it)->backend()->server->unique_name; if (strcmp(srvnm, (char*)buffer->hint->data) == 0) { - rval = (*it)->m_backend->server; + rval = (*it)->backend()->server; MXS_INFO("Routing hint found (%s)", rval->unique_name); } } @@ -1646,7 +1613,7 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { - SERVER_REF* b = (*it)->m_backend; + SERVER_REF* b = (*it)->backend(); /** * To become chosen: * backend must be in use, name must match, and @@ -1656,9 +1623,8 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) && SERVER_IS_RUNNING(b->server)) { - *p_dcb = (*it)->m_dcb; + *p_dcb = (*it)->dcb(); succp = true; - ss_dassert((*it)->m_dcb->state != DCB_STATE_ZOMBIE); break; } } diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 40a180853..3e051e5c6 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -132,7 +132,7 @@ private: bool have_servers(); bool route_session_write(GWBUF* querybuf, uint8_t command); bool send_database_list(); - int gen_databaselist(); + void gen_databaselist(); int inspect_backend_mapping_states(SBackend& bref, GWBUF** wbuf); bool process_show_shards(); enum showdb_response parse_showdb_response(SBackend& bref, GWBUF** buffer);