diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc index 530fe7e1f..21cca1614 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -21,9 +21,7 @@ Backend::Backend(SERVER_REF *ref): m_closed(false), m_backend(ref), m_dcb(NULL), - m_map_queue(NULL), m_mapped(false), - m_num_mapping_eof(0), m_num_result_wait(0), m_state(0) { @@ -37,8 +35,6 @@ Backend::~Backend() { close(); } - - gwbuf_free(m_map_queue); } void Backend::close() @@ -71,9 +67,9 @@ void Backend::close() } } -bool Backend::execute_sescmd() +bool Backend::execute_session_command() { - if (is_closed() || m_session_commands.size() == 0) + if (is_closed() || !session_command_count()) { return false; } @@ -82,13 +78,6 @@ bool Backend::execute_sescmd() int rc = 0; - /** Return if there are no pending ses commands */ - if (m_session_commands.size() == 0) - { - MXS_INFO("Cursor had no pending session commands."); - return false; - } - SessionCommandList::iterator iter = m_session_commands.begin(); GWBUF *buffer = iter->copy_buffer().release(); @@ -114,6 +103,23 @@ bool Backend::execute_sescmd() return rc == 1; } +void Backend::add_session_command(GWBUF* buffer, uint64_t sequence) +{ + m_session_commands.push_back(SessionCommand(buffer, sequence)); +} + +uint64_t Backend::complete_session_command() +{ + uint64_t rval = m_session_commands.front().get_position(); + m_session_commands.pop_front(); + return rval; +} + +size_t Backend::session_command_count() const +{ + return m_session_commands.size(); +} + void Backend::clear_state(enum bref_state state) { if (state != BREF_WAITING_RESULT) @@ -213,6 +219,11 @@ bool Backend::is_closed() const return m_state & BREF_CLOSED; } +void Backend::set_mapped(bool value) +{ + m_mapped = value; +} + bool Backend::is_mapped() const { return m_mapped; diff --git a/server/modules/routing/schemarouter/schemarouter.hh b/server/modules/routing/schemarouter/schemarouter.hh index fa1aa0d41..a3ed197f2 100644 --- a/server/modules/routing/schemarouter/schemarouter.hh +++ b/server/modules/routing/schemarouter/schemarouter.hh @@ -127,11 +127,36 @@ public: ~Backend(); /** - * @brief Execute the next session command + * @brief Execute the next session command in the queue * * @return True if the command was executed successfully */ - bool execute_sescmd(); + bool execute_session_command(); + + /** + * @brief Add a new session command to the tail of the command queue + * + * @param buffer Session command to add + * @param sequence Sequence identifier of this session command, returned when + * the session command is completed + */ + void add_session_command(GWBUF* buffer, uint64_t sequence); + + /** + * @brief Mark the current session command as successfully executed + * + * This should be called when the response to the command is received + * + * @return The sequence identifier for this session command + */ + uint64_t complete_session_command(); + + /** + * @brief Check if backend has session commands + * + * @return True if backend has session commands + */ + size_t session_command_count() const; /** * @brief Clear state @@ -231,6 +256,13 @@ public: */ bool is_closed() const; + /** + * @brief Set the mapping state of the backend + * + * @param value Value to set + */ + void set_mapped(bool value); + /** * @brief Check if the backend has been mapped * @@ -242,11 +274,7 @@ private: bool m_closed; /**< True if a connection has been opened and closed */ SERVER_REF* m_backend; /**< Backend server */ DCB* m_dcb; /**< Backend DCB */ - -public: - GWBUF* m_map_queue; bool m_mapped; /**< Whether the backend has been mapped */ - int m_num_mapping_eof; int m_num_result_wait; /**< Number of not yet received results */ Buffer m_pending_cmd; /**< Pending commands */ int m_state; /**< State of the backend */ diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index c8ac8bc25..6ec69e713 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -426,7 +426,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) MXS_INFO("Route query to \t%s:%d <", bref->backend()->server->name, bref->backend()->server->port); - if (bref->m_session_commands.size() > 0) + if (bref->session_command_count()) { /** Store current statement if execution of the previous * session command hasn't been completed. */ @@ -486,13 +486,14 @@ void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket) void SchemaRouterSession::process_response(SBackend& bref, GWBUF** ppPacket) { - if (bref->m_session_commands.size() > 0) + if (bref->session_command_count()) { /** We are executing a session command */ if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket))) { - if (m_replied_sescmd < m_sent_sescmd && - bref->m_session_commands.front().get_position() == m_replied_sescmd + 1) + uint64_t id = bref->complete_session_command(); + + if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1) { /** First reply to this session command, route it to the client */ ++m_replied_sescmd; @@ -504,8 +505,6 @@ void SchemaRouterSession::process_response(SBackend& bref, GWBUF** ppPacket) gwbuf_free(*ppPacket); *ppPacket = NULL; } - - bref->m_session_commands.pop_front(); } if (*ppPacket) @@ -572,7 +571,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) pPacket = NULL; } - if (bref->execute_sescmd()) + if (bref->execute_session_command()) { MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", bref->backend()->server->name, bref->backend()->server->port); @@ -732,7 +731,8 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) if ((*it)->in_use()) { GWBUF *buffer = gwbuf_clone(querybuf); - (*it)->m_session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); + + (*it)->add_session_command(buffer, m_sent_sescmd); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { @@ -742,7 +742,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) (*it)->backend()->server->port); } - if ((*it)->m_session_commands.size() == 1) + if ((*it)->session_command_count() == 1) { /** Only one command, execute it */ switch (command) @@ -757,7 +757,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) break; } - if ((*it)->execute_sescmd()) + if ((*it)->execute_session_command()) { succp = true; } @@ -771,7 +771,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) } else { - ss_dassert((*it)->m_session_commands.size() > 1); + ss_dassert((*it)->session_command_count() > 1); /** The server is already executing a session command */ MXS_INFO("Backend %s:%d already executing sescmd.", (*it)->backend()->server->name,