From 5e3198f8313b7bb33df386eb35986bfae1db94a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 15 Mar 2019 14:37:34 +0200 Subject: [PATCH] Replace the plain GWBUF query queue with std::deque Using a std::deque to store the queries retains the exact state of the object thus removing the need to parse the query again. It also removes the need to split the queue into individual packets which makes the code cleaner. --- .../readwritesplit/rwsplit_route_stmt.cc | 2 +- .../routing/readwritesplit/rwsplitsession.cc | 43 ++++++++++--------- .../routing/readwritesplit/rwsplitsession.hh | 5 ++- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index ea982cdc2..a915b58ee 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -339,7 +339,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf) else if (target->has_session_commands()) { // We need to wait until the session commands are executed - m_query_queue = gwbuf_append(m_query_queue, gwbuf_clone(querybuf)); + m_query_queue.push_back(gwbuf_clone(querybuf)); MXS_INFO("Queuing query until '%s' completes session command", target->name()); } else diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 625655541..baea45a21 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -34,7 +34,6 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRW , m_client(session->client_dcb) , m_sescmd_count(1) , m_expected_responses(0) - , m_query_queue(NULL) , m_router(instance) , m_sent_sescmd(0) , m_recv_sescmd(0) @@ -99,6 +98,7 @@ void close_all_connections(PRWBackends& backends) void RWSplitSession::close() { + std::for_each(m_query_queue.begin(), m_query_queue.end(), gwbuf_free); close_all_connections(m_raw_backends); m_current_query.reset(); @@ -120,6 +120,7 @@ void RWSplitSession::close() int32_t RWSplitSession::routeQuery(GWBUF* querybuf) { + mxb_assert(GWBUF_IS_CONTIGUOUS(querybuf)); int rval = 0; if (m_is_replay_active && !GWBUF_IS_REPLAYED(querybuf)) @@ -148,12 +149,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) else { // Already busy executing a query, put the query in a queue and route it later - mxb_assert(m_expected_responses > 0 || m_query_queue); + mxb_assert(m_expected_responses > 0 || !m_query_queue.empty()); MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command: %s", gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses, mxs::extract_sql(querybuf, 1024).c_str()); - m_query_queue = gwbuf_append(m_query_queue, querybuf); + m_query_queue.push_back(querybuf); querybuf = NULL; rval = 1; @@ -188,37 +189,36 @@ bool RWSplitSession::route_stored_query() /** Loop over the stored statements as long as the routeQuery call doesn't * append more data to the queue. If it appends data to the queue, we need * to wait for a response before attempting another reroute */ - while (m_query_queue) + while (!m_query_queue.empty()) { MXS_INFO(">>> Routing stored queries"); - GWBUF* query_queue = modutil_get_next_MySQL_packet(&m_query_queue); - query_queue = gwbuf_make_contiguous(query_queue); + + GWBUF* query_queue = m_query_queue.front(); + m_query_queue.pop_front(); + mxb_assert(query_queue); + mxb_assert_message(modutil_count_packets(query_queue) == 1, "Buffer must contain only one packet"); if (query_queue == NULL) { - MXS_ALERT("Queued query unexpectedly empty. Bytes queued: %d Hexdump: ", - gwbuf_length(m_query_queue)); - gwbuf_hexdump(m_query_queue, LOG_ALERT); + MXS_ALERT("Queued query unexpectedly empty, dumping query queue contents"); + for (auto& a : m_query_queue) + { + gwbuf_hexdump(a, LOG_ALERT); + } return true; } /** Store the query queue locally for the duration of the routeQuery call. * This prevents recursive calls into this function. */ - GWBUF* temp_storage = m_query_queue; - m_query_queue = NULL; + decltype(m_query_queue) temp_storage; + temp_storage.swap(m_query_queue); // TODO: Move the handling of queued queries to the client protocol // TODO: module where the command tracking is done automatically. uint8_t cmd = mxs_mysql_get_command(query_queue); mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd); - if (cmd == MXS_COM_QUERY || cmd == MXS_COM_STMT_PREPARE) - { - // The query needs to be explicitly parsed as it was processed multiple times - qc_parse(query_queue, QC_COLLECT_ALL); - } - if (!routeQuery(query_queue)) { rval = false; @@ -227,15 +227,16 @@ bool RWSplitSession::route_stored_query() MXS_INFO("<<< Stored queries routed"); - if (m_query_queue == NULL) + if (m_query_queue.empty()) { /** Query successfully routed and no responses are expected */ - m_query_queue = temp_storage; + m_query_queue.swap(temp_storage); } else { /** Routing was stopped, we need to wait for a response before retrying */ - m_query_queue = gwbuf_append(temp_storage, m_query_queue); + std::copy(m_query_queue.begin(), m_query_queue.end(), std::back_inserter(temp_storage)); + m_query_queue = std::move(temp_storage); break; } } @@ -724,7 +725,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) m_expected_responses++; } } - else if (m_expected_responses == 0 && m_query_queue + else if (m_expected_responses == 0 && !m_query_queue.empty() && (!m_is_replay_active || processed_sescmd)) { /** diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 5b2df4f54..eb58f7b2a 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -16,6 +16,7 @@ #include "trx.hh" #include +#include #include #include @@ -269,7 +270,7 @@ private: inline bool can_route_queries() const { - return m_query_queue == NULL + return m_query_queue.empty() && (m_expected_responses == 0 || m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE || m_qc.large_query()); @@ -319,7 +320,7 @@ private: uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */ int m_expected_responses; /**< Number of expected responses to the current * query */ - GWBUF* m_query_queue; /**< Queued commands waiting to be executed */ + std::deque m_query_queue; /**< Queued commands waiting to be executed */ RWSplit* m_router; /**< The router instance */ mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */ ResponseMap m_sescmd_responses; /**< Response to each session command */