diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index ea982cdc2..a915b58ee 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -339,7 +339,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf) else if (target->has_session_commands()) { // We need to wait until the session commands are executed - m_query_queue = gwbuf_append(m_query_queue, gwbuf_clone(querybuf)); + m_query_queue.push_back(gwbuf_clone(querybuf)); MXS_INFO("Queuing query until '%s' completes session command", target->name()); } else diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 625655541..baea45a21 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -34,7 +34,6 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRW , m_client(session->client_dcb) , m_sescmd_count(1) , m_expected_responses(0) - , m_query_queue(NULL) , m_router(instance) , m_sent_sescmd(0) , m_recv_sescmd(0) @@ -99,6 +98,7 @@ void close_all_connections(PRWBackends& backends) void RWSplitSession::close() { + std::for_each(m_query_queue.begin(), m_query_queue.end(), gwbuf_free); close_all_connections(m_raw_backends); m_current_query.reset(); @@ -120,6 +120,7 @@ void RWSplitSession::close() int32_t RWSplitSession::routeQuery(GWBUF* querybuf) { + mxb_assert(GWBUF_IS_CONTIGUOUS(querybuf)); int rval = 0; if (m_is_replay_active && !GWBUF_IS_REPLAYED(querybuf)) @@ -148,12 +149,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) else { // Already busy executing a query, put the query in a queue and route it later - mxb_assert(m_expected_responses > 0 || m_query_queue); + mxb_assert(m_expected_responses > 0 || !m_query_queue.empty()); MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command: %s", gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses, mxs::extract_sql(querybuf, 1024).c_str()); - m_query_queue = gwbuf_append(m_query_queue, querybuf); + m_query_queue.push_back(querybuf); querybuf = NULL; rval = 1; @@ -188,37 +189,36 @@ bool RWSplitSession::route_stored_query() /** Loop over the stored statements as long as the routeQuery call doesn't * append more data to the queue. If it appends data to the queue, we need * to wait for a response before attempting another reroute */ - while (m_query_queue) + while (!m_query_queue.empty()) { MXS_INFO(">>> Routing stored queries"); - GWBUF* query_queue = modutil_get_next_MySQL_packet(&m_query_queue); - query_queue = gwbuf_make_contiguous(query_queue); + + GWBUF* query_queue = m_query_queue.front(); + m_query_queue.pop_front(); + mxb_assert(query_queue); + mxb_assert_message(modutil_count_packets(query_queue) == 1, "Buffer must contain only one packet"); if (query_queue == NULL) { - MXS_ALERT("Queued query unexpectedly empty. Bytes queued: %d Hexdump: ", - gwbuf_length(m_query_queue)); - gwbuf_hexdump(m_query_queue, LOG_ALERT); + MXS_ALERT("Queued query unexpectedly empty, dumping query queue contents"); + for (auto& a : m_query_queue) + { + gwbuf_hexdump(a, LOG_ALERT); + } return true; } /** Store the query queue locally for the duration of the routeQuery call. * This prevents recursive calls into this function. */ - GWBUF* temp_storage = m_query_queue; - m_query_queue = NULL; + decltype(m_query_queue) temp_storage; + temp_storage.swap(m_query_queue); // TODO: Move the handling of queued queries to the client protocol // TODO: module where the command tracking is done automatically. uint8_t cmd = mxs_mysql_get_command(query_queue); mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd); - if (cmd == MXS_COM_QUERY || cmd == MXS_COM_STMT_PREPARE) - { - // The query needs to be explicitly parsed as it was processed multiple times - qc_parse(query_queue, QC_COLLECT_ALL); - } - if (!routeQuery(query_queue)) { rval = false; @@ -227,15 +227,16 @@ bool RWSplitSession::route_stored_query() MXS_INFO("<<< Stored queries routed"); - if (m_query_queue == NULL) + if (m_query_queue.empty()) { /** Query successfully routed and no responses are expected */ - m_query_queue = temp_storage; + m_query_queue.swap(temp_storage); } else { /** Routing was stopped, we need to wait for a response before retrying */ - m_query_queue = gwbuf_append(temp_storage, m_query_queue); + std::copy(m_query_queue.begin(), m_query_queue.end(), std::back_inserter(temp_storage)); + m_query_queue = std::move(temp_storage); break; } } @@ -724,7 +725,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) m_expected_responses++; } } - else if (m_expected_responses == 0 && m_query_queue + else if (m_expected_responses == 0 && !m_query_queue.empty() && (!m_is_replay_active || processed_sescmd)) { /** diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 5b2df4f54..eb58f7b2a 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -16,6 +16,7 @@ #include "trx.hh" #include +#include #include #include @@ -269,7 +270,7 @@ private: inline bool can_route_queries() const { - return m_query_queue == NULL + return m_query_queue.empty() && (m_expected_responses == 0 || m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE || m_qc.large_query()); @@ -319,7 +320,7 @@ private: uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */ int m_expected_responses; /**< Number of expected responses to the current * query */ - GWBUF* m_query_queue; /**< Queued commands waiting to be executed */ + std::deque m_query_queue; /**< Queued commands waiting to be executed */ RWSplit* m_router; /**< The router instance */ mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */ ResponseMap m_sescmd_responses; /**< Response to each session command */