MXS-2555 Fix a few TODOs and other cleanup
This commit is contained in:
@ -17,11 +17,6 @@
|
|||||||
#include <maxscale/modutil.hh>
|
#include <maxscale/modutil.hh>
|
||||||
#include <maxsql/mysql_plus.hh>
|
#include <maxsql/mysql_plus.hh>
|
||||||
|
|
||||||
// TODO, missing error handling. I did not add overly many asserts, which make reading code harder.
|
|
||||||
// But please note any that may be missing.
|
|
||||||
|
|
||||||
// TODO, for m_qc.target_is_all(), check that responses from all routers match.
|
|
||||||
|
|
||||||
// COPY-PASTED error-extraction functions from rwsplit. TODO move to lib.
|
// COPY-PASTED error-extraction functions from rwsplit. TODO move to lib.
|
||||||
inline void extract_error_state(uint8_t* pBuffer, uint8_t** ppState, uint16_t* pnState)
|
inline void extract_error_state(uint8_t* pBuffer, uint8_t** ppState, uint16_t* pnState)
|
||||||
{
|
{
|
||||||
@ -74,6 +69,54 @@ std::string extract_error(GWBUF* buffer)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO Another thing to move to maxutils
|
||||||
|
static std::array<const char*, 7> size_suffix {"B", "KB", "MB", "GB", "TB", "PB", "EB"};
|
||||||
|
constexpr size_t KiloByte {1024};
|
||||||
|
|
||||||
|
/** return a pair {double_value, suffix} for humanizing a size.
|
||||||
|
* Example: pretty_size_split(2000) => {1.953125, "KB"}
|
||||||
|
*/
|
||||||
|
std::pair<double, const char*> pretty_size_split(size_t sz)
|
||||||
|
{
|
||||||
|
double dsize = sz;
|
||||||
|
size_t i {0};
|
||||||
|
|
||||||
|
for (; i < size_suffix.size() && dsize >= KiloByte; ++i, dsize /= KiloByte)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
return {dsize, size_suffix[i]};
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Pretty string from a size_t, e.g pretty_size(2000) => "1.95KB"
|
||||||
|
*/
|
||||||
|
std::string pretty_size(size_t sz, const char* separator = "")
|
||||||
|
{
|
||||||
|
char buf[64];
|
||||||
|
double dsize;
|
||||||
|
const char* suffix;
|
||||||
|
|
||||||
|
std::tie(dsize, suffix) = pretty_size_split(sz);
|
||||||
|
|
||||||
|
// format with two decimals
|
||||||
|
auto len = std::sprintf(buf, "%.2f", dsize);
|
||||||
|
|
||||||
|
// remove trailing 0-decimals
|
||||||
|
char* ptr = buf + len - 1;
|
||||||
|
for (; *ptr == '0'; --ptr)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
if (*ptr != '.')
|
||||||
|
{
|
||||||
|
++ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// append suffix
|
||||||
|
sprintf(ptr, "%s%s", separator, suffix);
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
SmartRouterSession::SmartRouterSession(SmartRouter* pRouter,
|
SmartRouterSession::SmartRouterSession(SmartRouter* pRouter,
|
||||||
MXS_SESSION* pSession,
|
MXS_SESSION* pSession,
|
||||||
Clusters clusters)
|
Clusters clusters)
|
||||||
@ -158,9 +201,11 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
|
|||||||
{
|
{
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
|
|
||||||
|
MXS_SDEBUG("routeQuery() buffer size " << pretty_size(gwbuf_length(pBuf)));
|
||||||
|
|
||||||
if (expecting_request_packets())
|
if (expecting_request_packets())
|
||||||
{
|
{
|
||||||
write_split_packets(pBuf);
|
ret = write_split_packets(pBuf);
|
||||||
if (all_clusters_are_idle())
|
if (all_clusters_are_idle())
|
||||||
{
|
{
|
||||||
m_mode = Mode::Idle;
|
m_mode = Mode::Idle;
|
||||||
@ -169,16 +214,15 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
|
|||||||
else if (m_mode != Mode::Idle)
|
else if (m_mode != Mode::Idle)
|
||||||
{
|
{
|
||||||
auto is_busy = !all_clusters_are_idle();
|
auto is_busy = !all_clusters_are_idle();
|
||||||
// TODO add more detail, operator<< to PacketRouter.
|
|
||||||
MXS_SERROR("routeQuery() in wrong state. clusters busy = " << std::boolalpha << is_busy);
|
MXS_SERROR("routeQuery() in wrong state. clusters busy = " << std::boolalpha << is_busy);
|
||||||
mxb_assert(false);
|
mxb_assert(false);
|
||||||
ret = false;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto route_info = m_qc.update_route_info(mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED, pBuf);
|
auto route_info = m_qc.update_route_info(mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED, pBuf);
|
||||||
|
std::string canonical = maxscale::get_canonical(pBuf);
|
||||||
|
|
||||||
m_measurement = {maxbase::Clock::now(), maxscale::get_canonical(pBuf)};
|
m_measurement = {maxbase::Clock::now(), canonical};
|
||||||
|
|
||||||
if (m_qc.target_is_all(route_info.target()))
|
if (m_qc.target_is_all(route_info.target()))
|
||||||
{
|
{
|
||||||
@ -192,13 +236,12 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::string canonical = maxscale::get_canonical(pBuf);
|
|
||||||
auto perf = m_router.perf_find(canonical);
|
auto perf = m_router.perf_find(canonical);
|
||||||
|
|
||||||
if (perf.is_valid())
|
if (perf.is_valid())
|
||||||
{
|
{
|
||||||
MXS_SDEBUG("Route to " << perf.host() << " based on performance, canonical = "
|
MXS_SDEBUG("Smart route to " << perf.host()
|
||||||
<< show_some(canonical));
|
<< ", canonical = " << show_some(canonical));
|
||||||
ret = write_to_host(perf.host(), pBuf);
|
ret = write_to_host(perf.host(), pBuf);
|
||||||
}
|
}
|
||||||
else if (modutil_is_SQL(pBuf))
|
else if (modutil_is_SQL(pBuf))
|
||||||
@ -219,7 +262,7 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
|
|||||||
|
|
||||||
void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
||||||
{
|
{
|
||||||
mxb_assert(GWBUF_IS_CONTIGUOUS(pPacket)); // TODO, do non-contiguous for slightly better speed?
|
mxb_assert(GWBUF_IS_CONTIGUOUS(pPacket));
|
||||||
|
|
||||||
auto it = std::find_if(begin(m_clusters), end(m_clusters),
|
auto it = std::find_if(begin(m_clusters), end(m_clusters),
|
||||||
[pDcb](const Cluster& cluster) {
|
[pDcb](const Cluster& cluster) {
|
||||||
@ -251,7 +294,7 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
|
|
||||||
// marker1: If a connection is lost down the pipeline, we first get an ErrorPacket, then a call to
|
// marker1: If a connection is lost down the pipeline, we first get an ErrorPacket, then a call to
|
||||||
// handleError(). If we only rely on the handleError() the client receiving the ErrorPacket
|
// handleError(). If we only rely on the handleError() the client receiving the ErrorPacket
|
||||||
// can retry using this connection/session, causing a an error (or assert) in routeQuery().
|
// could retry using this connection/session, causing an error (or assert) in routeQuery().
|
||||||
// This will change once we implement direct function calls to the Clusters (which really
|
// This will change once we implement direct function calls to the Clusters (which really
|
||||||
// are routers).
|
// are routers).
|
||||||
if (cluster.tracker.state() == maxsql::PacketTracker::State::ErrorPacket)
|
if (cluster.tracker.state() == maxsql::PacketTracker::State::ErrorPacket)
|
||||||
@ -260,9 +303,9 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
switch (err_code)
|
switch (err_code)
|
||||||
{
|
{
|
||||||
case ER_CONNECTION_KILLED: // there might be more error codes needing to be caught here
|
case ER_CONNECTION_KILLED: // there might be more error codes needing to be caught here
|
||||||
MXS_SERROR("clientReply(): Lost connection to "
|
MXS_SERROR("clientReply(): Lost connection to " << cluster.host
|
||||||
<< cluster.host << " Error code=" << err_code << " "
|
<< " Error code=" << err_code
|
||||||
<< extract_error(pPacket));
|
<< ' ' << extract_error(pPacket));
|
||||||
poll_fake_hangup_event(m_pClient_dcb);
|
poll_fake_hangup_event(m_pClient_dcb);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -270,8 +313,9 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
|
|
||||||
if (cluster.tracker.state() == maxsql::PacketTracker::State::Error)
|
if (cluster.tracker.state() == maxsql::PacketTracker::State::Error)
|
||||||
{
|
{
|
||||||
// TODO add more info
|
MXS_SERROR("ProtocolTracker from state " << tracker_state_before
|
||||||
MXS_SERROR("ProtocolTracker error in state " << tracker_state_before);
|
<< " to state " << cluster.tracker.state()
|
||||||
|
<< ". Disconnect.");
|
||||||
poll_fake_hangup_event(m_pClient_dcb);
|
poll_fake_hangup_event(m_pClient_dcb);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -281,8 +325,9 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
if (first_response_packet)
|
if (first_response_packet)
|
||||||
{
|
{
|
||||||
maxbase::Duration query_dur = maxbase::Clock::now() - m_measurement.start;
|
maxbase::Duration query_dur = maxbase::Clock::now() - m_measurement.start;
|
||||||
MXS_SDEBUG("Host " << cluster.host << " will be responding to the client. "
|
MXS_SDEBUG("Host " << cluster.host
|
||||||
<< "First packet received in time " << query_dur);
|
<< " will be responding to the client."
|
||||||
|
<< " First packet received in time " << query_dur);
|
||||||
cluster.is_replying_to_client = true;
|
cluster.is_replying_to_client = true;
|
||||||
will_reply = true; // tentatively, the packet might have to be delayed
|
will_reply = true; // tentatively, the packet might have to be delayed
|
||||||
|
|
||||||
@ -399,13 +444,18 @@ bool SmartRouterSession::write_to_host(const maxbase::Host& host, GWBUF* pBuf)
|
|||||||
|
|
||||||
bool SmartRouterSession::write_to_all(GWBUF* pBuf, Mode mode)
|
bool SmartRouterSession::write_to_all(GWBUF* pBuf, Mode mode)
|
||||||
{
|
{
|
||||||
|
bool success = true;
|
||||||
|
|
||||||
for (auto it = begin(m_clusters); it != end(m_clusters); ++it)
|
for (auto it = begin(m_clusters); it != end(m_clusters); ++it)
|
||||||
{
|
{
|
||||||
auto& cluster = *it;
|
auto& cluster = *it;
|
||||||
cluster.tracker = maxsql::PacketTracker(pBuf);
|
cluster.tracker = maxsql::PacketTracker(pBuf);
|
||||||
cluster.is_replying_to_client = false;
|
cluster.is_replying_to_client = false;
|
||||||
auto pBuf_send = (std::next(it) == end(m_clusters)) ? pBuf : gwbuf_clone(pBuf);
|
auto pBuf_send = (std::next(it) == end(m_clusters)) ? pBuf : gwbuf_clone(pBuf);
|
||||||
cluster.pDcb->func.write(cluster.pDcb, pBuf_send);
|
if (!cluster.pDcb->func.write(cluster.pDcb, pBuf_send))
|
||||||
|
{
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (expecting_response_packets())
|
if (expecting_response_packets())
|
||||||
@ -413,7 +463,7 @@ bool SmartRouterSession::write_to_all(GWBUF* pBuf, Mode mode)
|
|||||||
m_mode = mode;
|
m_mode = mode;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true; // TODO. What could possibly go wrong?
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
|
bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
|
||||||
@ -428,14 +478,21 @@ bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool success = true;
|
||||||
|
|
||||||
for (auto it = begin(active); it != end(active); ++it)
|
for (auto it = begin(active); it != end(active); ++it)
|
||||||
{
|
{
|
||||||
auto& cluster = **it;
|
auto& cluster = **it;
|
||||||
|
|
||||||
auto pBuf_send = (std::next(it) == end(active)) ? pBuf : gwbuf_clone(pBuf);
|
auto pBuf_send = (std::next(it) == end(active)) ? pBuf : gwbuf_clone(pBuf);
|
||||||
cluster.pDcb->func.write(cluster.pDcb, pBuf_send);
|
if (!cluster.pDcb->func.write(cluster.pDcb, pBuf_send))
|
||||||
|
{
|
||||||
|
success = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true; // TODO. What could possibly go wrong?
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SmartRouterSession::kill_all_others(const Cluster& cluster)
|
void SmartRouterSession::kill_all_others(const Cluster& cluster)
|
||||||
@ -452,7 +509,7 @@ void SmartRouterSession::handleError(GWBUF* pPacket,
|
|||||||
mxs_error_action_t action,
|
mxs_error_action_t action,
|
||||||
bool* pSuccess)
|
bool* pSuccess)
|
||||||
{
|
{
|
||||||
// One of the clusters closed the connection, in terms of SmartRouter this is a hopeless situation.
|
// One of the clusters closed the connection. In terms of SmartRouter this is a hopeless situation.
|
||||||
// Close the shop, and let the client retry. Also see marker1.
|
// Close the shop, and let the client retry. Also see marker1.
|
||||||
auto it = std::find_if(begin(m_clusters), end(m_clusters),
|
auto it = std::find_if(begin(m_clusters), end(m_clusters),
|
||||||
[pProblem](const Cluster& cluster) {
|
[pProblem](const Cluster& cluster) {
|
||||||
@ -461,7 +518,6 @@ void SmartRouterSession::handleError(GWBUF* pPacket,
|
|||||||
|
|
||||||
mxb_assert(it != end(m_clusters));
|
mxb_assert(it != end(m_clusters));
|
||||||
Cluster& cluster = *it;
|
Cluster& cluster = *it;
|
||||||
// TODO: Will the session close gracefully, or is some more checking needed here.
|
|
||||||
|
|
||||||
auto err_code = mxs_mysql_get_mysql_errno(pPacket);
|
auto err_code = mxs_mysql_get_mysql_errno(pPacket);
|
||||||
MXS_SERROR("handleError(): Lost connection to " << cluster.host << " Error code=" << err_code << " "
|
MXS_SERROR("handleError(): Lost connection to " << cluster.host << " Error code=" << err_code << " "
|
||||||
@ -478,7 +534,6 @@ void SmartRouterSession::handleError(GWBUF* pPacket,
|
|||||||
pClient->func.write(pClient, pCopy);
|
pClient->func.write(pClient, pCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// This will lead to the rest of the connections to be closed.
|
// This will lead to the rest of the connections to be closed.
|
||||||
*pSuccess = false;
|
*pSuccess = false;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user