Replace the plain GWBUF query queue with std::deque

Using a std::deque to store the queries retains the exact state of the
object thus removing the need to parse the query again. It also removes
the need to split the queue into individual packets which makes the code
cleaner.
This commit is contained in:
Markus Mäkelä
2019-03-15 14:37:34 +02:00
parent 0001babd26
commit 5e3198f831
3 changed files with 26 additions and 24 deletions

View File

@ -339,7 +339,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
else if (target->has_session_commands())
{
// We need to wait until the session commands are executed
m_query_queue = gwbuf_append(m_query_queue, gwbuf_clone(querybuf));
m_query_queue.push_back(gwbuf_clone(querybuf));
MXS_INFO("Queuing query until '%s' completes session command", target->name());
}
else

View File

@ -34,7 +34,6 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRW
, m_client(session->client_dcb)
, m_sescmd_count(1)
, m_expected_responses(0)
, m_query_queue(NULL)
, m_router(instance)
, m_sent_sescmd(0)
, m_recv_sescmd(0)
@ -99,6 +98,7 @@ void close_all_connections(PRWBackends& backends)
void RWSplitSession::close()
{
std::for_each(m_query_queue.begin(), m_query_queue.end(), gwbuf_free);
close_all_connections(m_raw_backends);
m_current_query.reset();
@ -120,6 +120,7 @@ void RWSplitSession::close()
int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
{
mxb_assert(GWBUF_IS_CONTIGUOUS(querybuf));
int rval = 0;
if (m_is_replay_active && !GWBUF_IS_REPLAYED(querybuf))
@ -148,12 +149,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
else
{
// Already busy executing a query, put the query in a queue and route it later
mxb_assert(m_expected_responses > 0 || m_query_queue);
mxb_assert(m_expected_responses > 0 || !m_query_queue.empty());
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command: %s",
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses,
mxs::extract_sql(querybuf, 1024).c_str());
m_query_queue = gwbuf_append(m_query_queue, querybuf);
m_query_queue.push_back(querybuf);
querybuf = NULL;
rval = 1;
@ -188,37 +189,36 @@ bool RWSplitSession::route_stored_query()
/** Loop over the stored statements as long as the routeQuery call doesn't
* append more data to the queue. If it appends data to the queue, we need
* to wait for a response before attempting another reroute */
while (m_query_queue)
while (!m_query_queue.empty())
{
MXS_INFO(">>> Routing stored queries");
GWBUF* query_queue = modutil_get_next_MySQL_packet(&m_query_queue);
query_queue = gwbuf_make_contiguous(query_queue);
GWBUF* query_queue = m_query_queue.front();
m_query_queue.pop_front();
mxb_assert(query_queue);
mxb_assert_message(modutil_count_packets(query_queue) == 1, "Buffer must contain only one packet");
if (query_queue == NULL)
{
MXS_ALERT("Queued query unexpectedly empty. Bytes queued: %d Hexdump: ",
gwbuf_length(m_query_queue));
gwbuf_hexdump(m_query_queue, LOG_ALERT);
MXS_ALERT("Queued query unexpectedly empty, dumping query queue contents");
for (auto& a : m_query_queue)
{
gwbuf_hexdump(a, LOG_ALERT);
}
return true;
}
/** Store the query queue locally for the duration of the routeQuery call.
* This prevents recursive calls into this function. */
GWBUF* temp_storage = m_query_queue;
m_query_queue = NULL;
decltype(m_query_queue) temp_storage;
temp_storage.swap(m_query_queue);
// TODO: Move the handling of queued queries to the client protocol
// TODO: module where the command tracking is done automatically.
uint8_t cmd = mxs_mysql_get_command(query_queue);
mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd);
if (cmd == MXS_COM_QUERY || cmd == MXS_COM_STMT_PREPARE)
{
// The query needs to be explicitly parsed as it was processed multiple times
qc_parse(query_queue, QC_COLLECT_ALL);
}
if (!routeQuery(query_queue))
{
rval = false;
@ -227,15 +227,16 @@ bool RWSplitSession::route_stored_query()
MXS_INFO("<<< Stored queries routed");
if (m_query_queue == NULL)
if (m_query_queue.empty())
{
/** Query successfully routed and no responses are expected */
m_query_queue = temp_storage;
m_query_queue.swap(temp_storage);
}
else
{
/** Routing was stopped, we need to wait for a response before retrying */
m_query_queue = gwbuf_append(temp_storage, m_query_queue);
std::copy(m_query_queue.begin(), m_query_queue.end(), std::back_inserter(temp_storage));
m_query_queue = std::move(temp_storage);
break;
}
}
@ -724,7 +725,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
m_expected_responses++;
}
}
else if (m_expected_responses == 0 && m_query_queue
else if (m_expected_responses == 0 && !m_query_queue.empty()
&& (!m_is_replay_active || processed_sescmd))
{
/**

View File

@ -16,6 +16,7 @@
#include "trx.hh"
#include <string>
#include <deque>
#include <maxscale/buffer.hh>
#include <maxscale/modutil.hh>
@ -269,7 +270,7 @@ private:
inline bool can_route_queries() const
{
return m_query_queue == NULL
return m_query_queue.empty()
&& (m_expected_responses == 0
|| m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE
|| m_qc.large_query());
@ -319,7 +320,7 @@ private:
uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */
int m_expected_responses; /**< Number of expected responses to the current
* query */
GWBUF* m_query_queue; /**< Queued commands waiting to be executed */
std::deque<GWBUF*> m_query_queue; /**< Queued commands waiting to be executed */
RWSplit* m_router; /**< The router instance */
mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */
ResponseMap m_sescmd_responses; /**< Response to each session command */