Replace raw GWBUF pointers with mxs::Buffer
Now that the query queue is stored in an actual container, it is only logical to use mxs::Buffer instead of GWBUF as the stored type.
This commit is contained in:
@ -24,6 +24,11 @@
|
|||||||
|
|
||||||
class SERVER;
|
class SERVER;
|
||||||
|
|
||||||
|
namespace maxscale
|
||||||
|
{
|
||||||
|
class Buffer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Buffer properties - used to store properties related to the buffer
|
* Buffer properties - used to store properties related to the buffer
|
||||||
* contents. This may be added at any point during the processing of the
|
* contents. This may be added at any point during the processing of the
|
||||||
@ -404,6 +409,7 @@ extern void dprintAllBuffers(void* pdcb);
|
|||||||
* @param log_level Log priority where the message is written
|
* @param log_level Log priority where the message is written
|
||||||
*/
|
*/
|
||||||
void gwbuf_hexdump(GWBUF* buffer, int log_level);
|
void gwbuf_hexdump(GWBUF* buffer, int log_level);
|
||||||
|
void gwbuf_hexdump(const mxs::Buffer& buffer, int log_level);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return pointer of the byte at offset from start of chained buffer
|
* Return pointer of the byte at offset from start of chained buffer
|
||||||
@ -1043,6 +1049,16 @@ public:
|
|||||||
return gwbuf_length(m_pBuffer);
|
return gwbuf_length(m_pBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the buffer is empty.
|
||||||
|
*
|
||||||
|
* @return True if the buffer is empty
|
||||||
|
*/
|
||||||
|
bool empty() const
|
||||||
|
{
|
||||||
|
return m_pBuffer == nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether the buffer is contiguous.
|
* Whether the buffer is contiguous.
|
||||||
*
|
*
|
||||||
|
@ -783,3 +783,8 @@ void gwbuf_hexdump(GWBUF* buffer, int log_level)
|
|||||||
|
|
||||||
MXS_LOG_MESSAGE(log_level, "%.*s", n, ss.str().c_str());
|
MXS_LOG_MESSAGE(log_level, "%.*s", n, ss.str().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void gwbuf_hexdump(const mxs::Buffer& buffer, int log_level)
|
||||||
|
{
|
||||||
|
return gwbuf_hexdump(const_cast<mxs::Buffer&>(buffer).get(), log_level);
|
||||||
|
}
|
||||||
|
@ -339,7 +339,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
|
|||||||
else if (target->has_session_commands())
|
else if (target->has_session_commands())
|
||||||
{
|
{
|
||||||
// We need to wait until the session commands are executed
|
// We need to wait until the session commands are executed
|
||||||
m_query_queue.push_back(gwbuf_clone(querybuf));
|
m_query_queue.emplace_back(gwbuf_clone(querybuf));
|
||||||
MXS_INFO("Queuing query until '%s' completes session command", target->name());
|
MXS_INFO("Queuing query until '%s' completes session command", target->name());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -98,7 +98,6 @@ void close_all_connections(PRWBackends& backends)
|
|||||||
|
|
||||||
void RWSplitSession::close()
|
void RWSplitSession::close()
|
||||||
{
|
{
|
||||||
std::for_each(m_query_queue.begin(), m_query_queue.end(), gwbuf_free);
|
|
||||||
close_all_connections(m_raw_backends);
|
close_all_connections(m_raw_backends);
|
||||||
m_current_query.reset();
|
m_current_query.reset();
|
||||||
|
|
||||||
@ -154,7 +153,7 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
|||||||
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses,
|
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses,
|
||||||
mxs::extract_sql(querybuf, 1024).c_str());
|
mxs::extract_sql(querybuf, 1024).c_str());
|
||||||
|
|
||||||
m_query_queue.push_back(querybuf);
|
m_query_queue.emplace_back(querybuf);
|
||||||
querybuf = NULL;
|
querybuf = NULL;
|
||||||
rval = 1;
|
rval = 1;
|
||||||
|
|
||||||
@ -193,13 +192,13 @@ bool RWSplitSession::route_stored_query()
|
|||||||
{
|
{
|
||||||
MXS_INFO(">>> Routing stored queries");
|
MXS_INFO(">>> Routing stored queries");
|
||||||
|
|
||||||
GWBUF* query_queue = m_query_queue.front();
|
auto query = std::move(m_query_queue.front());
|
||||||
m_query_queue.pop_front();
|
m_query_queue.pop_front();
|
||||||
|
|
||||||
mxb_assert(query_queue);
|
mxb_assert(!query.empty());
|
||||||
mxb_assert_message(modutil_count_packets(query_queue) == 1, "Buffer must contain only one packet");
|
mxb_assert_message(modutil_count_packets(query.get()) == 1, "Buffer must contain only one packet");
|
||||||
|
|
||||||
if (query_queue == NULL)
|
if (query.empty())
|
||||||
{
|
{
|
||||||
MXS_ALERT("Queued query unexpectedly empty, dumping query queue contents");
|
MXS_ALERT("Queued query unexpectedly empty, dumping query queue contents");
|
||||||
for (auto& a : m_query_queue)
|
for (auto& a : m_query_queue)
|
||||||
@ -216,10 +215,10 @@ bool RWSplitSession::route_stored_query()
|
|||||||
|
|
||||||
// TODO: Move the handling of queued queries to the client protocol
|
// TODO: Move the handling of queued queries to the client protocol
|
||||||
// TODO: module where the command tracking is done automatically.
|
// TODO: module where the command tracking is done automatically.
|
||||||
uint8_t cmd = mxs_mysql_get_command(query_queue);
|
uint8_t cmd = mxs_mysql_get_command(query.get());
|
||||||
mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd);
|
mysql_protocol_set_current_command(m_client, (mxs_mysql_cmd_t)cmd);
|
||||||
|
|
||||||
if (!routeQuery(query_queue))
|
if (!routeQuery(query.release()))
|
||||||
{
|
{
|
||||||
rval = false;
|
rval = false;
|
||||||
MXS_ERROR("Failed to route queued query.");
|
MXS_ERROR("Failed to route queued query.");
|
||||||
@ -235,7 +234,7 @@ bool RWSplitSession::route_stored_query()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Routing was stopped, we need to wait for a response before retrying */
|
/** Routing was stopped, we need to wait for a response before retrying */
|
||||||
std::copy(m_query_queue.begin(), m_query_queue.end(), std::back_inserter(temp_storage));
|
temp_storage.push_front(std::move(m_query_queue.front()));
|
||||||
m_query_queue = std::move(temp_storage);
|
m_query_queue = std::move(temp_storage);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -320,7 +320,7 @@ private:
|
|||||||
uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */
|
uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */
|
||||||
int m_expected_responses; /**< Number of expected responses to the current
|
int m_expected_responses; /**< Number of expected responses to the current
|
||||||
* query */
|
* query */
|
||||||
std::deque<GWBUF*> m_query_queue; /**< Queued commands waiting to be executed */
|
std::deque<mxs::Buffer> m_query_queue; /**< Queued commands waiting to be executed */
|
||||||
RWSplit* m_router; /**< The router instance */
|
RWSplit* m_router; /**< The router instance */
|
||||||
mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */
|
mxs::SessionCommandList m_sescmd_list; /**< List of executed session commands */
|
||||||
ResponseMap m_sescmd_responses; /**< Response to each session command */
|
ResponseMap m_sescmd_responses; /**< Response to each session command */
|
||||||
|
Reference in New Issue
Block a user