Backport query queue changes to 2.3
Backported the changes that convert the query queue in readwritesplit into a proper queue. This changes combines both 5e3198f8313b7bb33df386eb35986bfae1db94a3 and 6042a53cb31046b1100743723567906c5d8208e2 into one commit.
This commit is contained in:
parent
556c83f83a
commit
6421af1bb4
@ -320,7 +320,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
|
||||
else if (target->has_session_commands())
|
||||
{
|
||||
// We need to wait until the session commands are executed
|
||||
m_query_queue = gwbuf_append(m_query_queue, gwbuf_clone(querybuf));
|
||||
m_query_queue.emplace_back(gwbuf_clone(querybuf));
|
||||
MXS_INFO("Queuing query until '%s' completes session command", target->name());
|
||||
}
|
||||
else
|
||||
|
@ -34,7 +34,6 @@ RWSplitSession::RWSplitSession(RWSplit* instance,
|
||||
, m_client(session->client_dcb)
|
||||
, m_sescmd_count(1)
|
||||
, m_expected_responses(0)
|
||||
, m_query_queue(NULL)
|
||||
, m_router(instance)
|
||||
, m_sent_sescmd(0)
|
||||
, m_recv_sescmd(0)
|
||||
@ -108,7 +107,6 @@ void close_all_connections(SRWBackendList& backends)
|
||||
|
||||
void RWSplitSession::close()
|
||||
{
|
||||
gwbuf_free(m_query_queue);
|
||||
close_all_connections(m_backends);
|
||||
m_current_query.reset();
|
||||
|
||||
@ -138,11 +136,11 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
||||
{
|
||||
MXS_INFO("New query received while transaction replay is active: %s",
|
||||
mxs::extract_sql(querybuf).c_str());
|
||||
m_query_queue = gwbuf_append(m_query_queue, querybuf);
|
||||
m_query_queue.emplace_back(querybuf);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if ((m_query_queue == NULL || GWBUF_IS_REPLAYED(querybuf))
|
||||
if ((m_query_queue.empty() || GWBUF_IS_REPLAYED(querybuf))
|
||||
&& (m_expected_responses == 0
|
||||
|| m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE
|
||||
|| m_qc.large_query()))
|
||||
@ -181,12 +179,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
||||
* We are already processing a request from the client. Store the
|
||||
* new query and wait for the previous one to complete.
|
||||
*/
|
||||
mxb_assert(m_expected_responses > 0 || m_query_queue);
|
||||
mxb_assert(m_expected_responses > 0 || !m_query_queue.empty());
|
||||
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
|
||||
gwbuf_length(querybuf),
|
||||
GWBUF_DATA(querybuf)[4],
|
||||
m_expected_responses);
|
||||
m_query_queue = gwbuf_append(m_query_queue, querybuf);
|
||||
m_query_queue.emplace_back(querybuf);
|
||||
querybuf = NULL;
|
||||
rval = 1;
|
||||
|
||||
@ -221,38 +219,23 @@ bool RWSplitSession::route_stored_query()
|
||||
/** Loop over the stored statements as long as the routeQuery call doesn't
|
||||
* append more data to the queue. If it appends data to the queue, we need
|
||||
* to wait for a response before attempting another reroute */
|
||||
while (m_query_queue)
|
||||
while (!m_query_queue.empty())
|
||||
{
|
||||
MXS_INFO(">>> Routing stored queries");
|
||||
GWBUF* query_queue = modutil_get_next_MySQL_packet(&m_query_queue);
|
||||
query_queue = gwbuf_make_contiguous(query_queue);
|
||||
mxb_assert(query_queue);
|
||||
|
||||
if (query_queue == NULL)
|
||||
{
|
||||
MXS_ALERT("Queued query unexpectedly empty. Bytes queued: %d Hexdump: ",
|
||||
gwbuf_length(m_query_queue));
|
||||
gwbuf_hexdump(m_query_queue, LOG_ALERT);
|
||||
return true;
|
||||
}
|
||||
auto query = std::move(m_query_queue.front());
|
||||
m_query_queue.pop_front();
|
||||
|
||||
/** Store the query queue locally for the duration of the routeQuery call.
|
||||
* This prevents recursive calls into this function. */
|
||||
GWBUF* temp_storage = m_query_queue;
|
||||
m_query_queue = NULL;
|
||||
decltype(m_query_queue) temp_storage;
|
||||
temp_storage.swap(m_query_queue);
|
||||
|
||||
// TODO: Move the handling of queued queries to the client protocol
|
||||
// TODO: module where the command tracking is done automatically.
|
||||
uint8_t cmd = mxs_mysql_get_command(query_queue);
|
||||
uint8_t cmd = mxs_mysql_get_command(query.get());
|
||||
mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd);
|
||||
|
||||
if (cmd == MXS_COM_QUERY || cmd == MXS_COM_STMT_PREPARE)
|
||||
{
|
||||
// The query needs to be explicitly parsed as it was processed multiple times
|
||||
qc_parse(query_queue, QC_COLLECT_ALL);
|
||||
}
|
||||
|
||||
if (!routeQuery(query_queue))
|
||||
if (!routeQuery(query.release()))
|
||||
{
|
||||
rval = false;
|
||||
MXS_ERROR("Failed to route queued query.");
|
||||
@ -260,17 +243,21 @@ bool RWSplitSession::route_stored_query()
|
||||
|
||||
MXS_INFO("<<< Stored queries routed");
|
||||
|
||||
if (m_query_queue == NULL)
|
||||
if (m_query_queue.empty())
|
||||
{
|
||||
/** Query successfully routed and no responses are expected */
|
||||
m_query_queue = temp_storage;
|
||||
m_query_queue.swap(temp_storage);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Routing was stopped, we need to wait for a response before retrying.
|
||||
* temp_storage holds the tail end of the queue and query_queue contains the query we attempted
|
||||
* to route. Append temp_storage to query_queue to keep the order of the queries correct. */
|
||||
m_query_queue = gwbuf_append(m_query_queue, temp_storage);
|
||||
/**
|
||||
* Routing was stopped, we need to wait for a response before retrying.
|
||||
* temp_storage holds the tail end of the queue and m_query_queue contains the query we attempted
|
||||
* to route.
|
||||
*/
|
||||
mxb_assert(m_query_queue.size() == 1);
|
||||
temp_storage.push_front(std::move(m_query_queue.front()));
|
||||
m_query_queue = std::move(temp_storage);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -468,7 +455,7 @@ void RWSplitSession::trx_replay_next_stmt()
|
||||
MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str());
|
||||
retry_query(m_interrupted_query.release(), 0);
|
||||
}
|
||||
else if (m_query_queue)
|
||||
else if (!m_query_queue.empty())
|
||||
{
|
||||
route_stored_query();
|
||||
}
|
||||
@ -754,7 +741,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
|
||||
m_expected_responses++;
|
||||
}
|
||||
}
|
||||
else if (m_expected_responses == 0 && m_query_queue
|
||||
else if (m_expected_responses == 0 && !m_query_queue.empty()
|
||||
&& (!m_is_replay_active || processed_sescmd))
|
||||
{
|
||||
/**
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "trx.hh"
|
||||
|
||||
#include <string>
|
||||
#include <deque>
|
||||
|
||||
#include <maxscale/buffer.hh>
|
||||
#include <maxscale/modutil.h>
|
||||
@ -140,18 +141,17 @@ public:
|
||||
int m_nbackends; /**< Number of backend servers (obsolete) */
|
||||
DCB* m_client; /**< The client DCB */
|
||||
uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */
|
||||
int m_expected_responses; /**< Number of expected responses to the current
|
||||
* query */
|
||||
GWBUF* m_query_queue; /**< Queued commands waiting to be executed */
|
||||
int m_expected_responses; /**< Number of expected responses to the current query */
|
||||
|
||||
std::deque<mxs::Buffer> m_query_queue; /**< Queued commands waiting to be executed */
|
||||
RWSplit* m_router; /**< The router instance */
|
||||
mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */
|
||||
ResponseMap m_sescmd_responses; /**< Response to each session command */
|
||||
SlaveResponseList m_slave_responses; /**< Slaves that replied before the master */
|
||||
uint64_t m_sent_sescmd; /**< ID of the last sent session command*/
|
||||
uint64_t m_recv_sescmd; /**< ID of the most recently completed session
|
||||
* command */
|
||||
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to
|
||||
* Backends */
|
||||
uint64_t m_recv_sescmd; /**< ID of the most recently completed session command */
|
||||
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
||||
|
||||
std::string m_gtid_pos; /**< Gtid position for causal read */
|
||||
wait_gtid_state m_wait_gtid; /**< State of MASTER_GTID_WAIT reply */
|
||||
uint32_t m_next_seq; /**< Next packet's sequence number */
|
||||
|
Loading…
x
Reference in New Issue
Block a user