diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index a9418d9f3..91edc010c 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -320,7 +320,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf) else if (target->has_session_commands()) { // We need to wait until the session commands are executed - m_query_queue = gwbuf_append(m_query_queue, gwbuf_clone(querybuf)); + m_query_queue.emplace_back(gwbuf_clone(querybuf)); MXS_INFO("Queuing query until '%s' completes session command", target->name()); } else diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 02a9ff990..0fa0b9536 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -34,7 +34,6 @@ RWSplitSession::RWSplitSession(RWSplit* instance, , m_client(session->client_dcb) , m_sescmd_count(1) , m_expected_responses(0) - , m_query_queue(NULL) , m_router(instance) , m_sent_sescmd(0) , m_recv_sescmd(0) @@ -108,7 +107,6 @@ void close_all_connections(SRWBackendList& backends) void RWSplitSession::close() { - gwbuf_free(m_query_queue); close_all_connections(m_backends); m_current_query.reset(); @@ -138,11 +136,11 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) { MXS_INFO("New query received while transaction replay is active: %s", mxs::extract_sql(querybuf).c_str()); - m_query_queue = gwbuf_append(m_query_queue, querybuf); + m_query_queue.emplace_back(querybuf); return 1; } - if ((m_query_queue == NULL || GWBUF_IS_REPLAYED(querybuf)) + if ((m_query_queue.empty() || GWBUF_IS_REPLAYED(querybuf)) && (m_expected_responses == 0 || m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE || m_qc.large_query())) @@ -181,12 +179,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) * We are already processing a request from the client. Store the * new query and wait for the previous one to complete. */ - mxb_assert(m_expected_responses > 0 || m_query_queue); + mxb_assert(m_expected_responses > 0 || !m_query_queue.empty()); MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command", gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses); - m_query_queue = gwbuf_append(m_query_queue, querybuf); + m_query_queue.emplace_back(querybuf); querybuf = NULL; rval = 1; @@ -221,38 +219,23 @@ bool RWSplitSession::route_stored_query() /** Loop over the stored statements as long as the routeQuery call doesn't * append more data to the queue. If it appends data to the queue, we need * to wait for a response before attempting another reroute */ - while (m_query_queue) + while (!m_query_queue.empty()) { MXS_INFO(">>> Routing stored queries"); - GWBUF* query_queue = modutil_get_next_MySQL_packet(&m_query_queue); - query_queue = gwbuf_make_contiguous(query_queue); - mxb_assert(query_queue); - - if (query_queue == NULL) - { - MXS_ALERT("Queued query unexpectedly empty. Bytes queued: %d Hexdump: ", - gwbuf_length(m_query_queue)); - gwbuf_hexdump(m_query_queue, LOG_ALERT); - return true; - } + auto query = std::move(m_query_queue.front()); + m_query_queue.pop_front(); /** Store the query queue locally for the duration of the routeQuery call. * This prevents recursive calls into this function. */ - GWBUF* temp_storage = m_query_queue; - m_query_queue = NULL; + decltype(m_query_queue) temp_storage; + temp_storage.swap(m_query_queue); // TODO: Move the handling of queued queries to the client protocol // TODO: module where the command tracking is done automatically. - uint8_t cmd = mxs_mysql_get_command(query_queue); + uint8_t cmd = mxs_mysql_get_command(query.get()); mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd); - if (cmd == MXS_COM_QUERY || cmd == MXS_COM_STMT_PREPARE) - { - // The query needs to be explicitly parsed as it was processed multiple times - qc_parse(query_queue, QC_COLLECT_ALL); - } - - if (!routeQuery(query_queue)) + if (!routeQuery(query.release())) { rval = false; MXS_ERROR("Failed to route queued query."); @@ -260,17 +243,21 @@ bool RWSplitSession::route_stored_query() MXS_INFO("<<< Stored queries routed"); - if (m_query_queue == NULL) + if (m_query_queue.empty()) { /** Query successfully routed and no responses are expected */ - m_query_queue = temp_storage; + m_query_queue.swap(temp_storage); } else { - /** Routing was stopped, we need to wait for a response before retrying. - * temp_storage holds the tail end of the queue and query_queue contains the query we attempted - * to route. Append temp_storage to query_queue to keep the order of the queries correct. */ - m_query_queue = gwbuf_append(m_query_queue, temp_storage); + /** + * Routing was stopped, we need to wait for a response before retrying. + * temp_storage holds the tail end of the queue and m_query_queue contains the query we attempted + * to route. + */ + mxb_assert(m_query_queue.size() == 1); + temp_storage.push_front(std::move(m_query_queue.front())); + m_query_queue = std::move(temp_storage); break; } } @@ -468,7 +455,7 @@ void RWSplitSession::trx_replay_next_stmt() MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str()); retry_query(m_interrupted_query.release(), 0); } - else if (m_query_queue) + else if (!m_query_queue.empty()) { route_stored_query(); } @@ -754,7 +741,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) m_expected_responses++; } } - else if (m_expected_responses == 0 && m_query_queue + else if (m_expected_responses == 0 && !m_query_queue.empty() && (!m_is_replay_active || processed_sescmd)) { /** diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 8472b9deb..85ef72782 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -16,6 +16,7 @@ #include "trx.hh" #include +#include #include #include @@ -140,18 +141,17 @@ public: int m_nbackends; /**< Number of backend servers (obsolete) */ DCB* m_client; /**< The client DCB */ uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */ - int m_expected_responses; /**< Number of expected responses to the current - * query */ - GWBUF* m_query_queue; /**< Queued commands waiting to be executed */ + int m_expected_responses; /**< Number of expected responses to the current query */ + + std::deque m_query_queue; /**< Queued commands waiting to be executed */ RWSplit* m_router; /**< The router instance */ mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */ ResponseMap m_sescmd_responses; /**< Response to each session command */ SlaveResponseList m_slave_responses; /**< Slaves that replied before the master */ uint64_t m_sent_sescmd; /**< ID of the last sent session command*/ - uint64_t m_recv_sescmd; /**< ID of the most recently completed session - * command */ - ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to - * Backends */ + uint64_t m_recv_sescmd; /**< ID of the most recently completed session command */ + ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ + std::string m_gtid_pos; /**< Gtid position for causal read */ wait_gtid_state m_wait_gtid; /**< State of MASTER_GTID_WAIT reply */ uint32_t m_next_seq; /**< Next packet's sequence number */