From 6ba1c277be3e598d3be3d3b8dcdae0491ad0b0cf Mon Sep 17 00:00:00 2001 From: Niclas Antti Date: Wed, 19 Jun 2019 00:29:47 +0300 Subject: [PATCH] MXS-2555 Fix a few TODOs and other cleanup --- .../routing/smartrouter/smartsession.cc | 111 +++++++++++++----- 1 file changed, 83 insertions(+), 28 deletions(-) diff --git a/server/modules/routing/smartrouter/smartsession.cc b/server/modules/routing/smartrouter/smartsession.cc index 44d76cab5..037dda1d1 100644 --- a/server/modules/routing/smartrouter/smartsession.cc +++ b/server/modules/routing/smartrouter/smartsession.cc @@ -17,11 +17,6 @@ #include #include -// TODO, missing error handling. I did not add overly many asserts, which make reading code harder. -// But please note any that may be missing. - -// TODO, for m_qc.target_is_all(), check that responses from all routers match. - // COPY-PASTED error-extraction functions from rwsplit. TODO move to lib. inline void extract_error_state(uint8_t* pBuffer, uint8_t** ppState, uint16_t* pnState) { @@ -74,6 +69,54 @@ std::string extract_error(GWBUF* buffer) return rval; } +// TODO Another thing to move to maxutils +static std::array size_suffix {"B", "KB", "MB", "GB", "TB", "PB", "EB"}; +constexpr size_t KiloByte {1024}; + +/** return a pair {double_value, suffix} for humanizing a size. + * Example: pretty_size_split(2000) => {1.953125, "KB"} + */ +std::pair pretty_size_split(size_t sz) +{ + double dsize = sz; + size_t i {0}; + + for (; i < size_suffix.size() && dsize >= KiloByte; ++i, dsize /= KiloByte) + { + } + + return {dsize, size_suffix[i]}; +} + +/** Pretty string from a size_t, e.g pretty_size(2000) => "1.95KB" + */ +std::string pretty_size(size_t sz, const char* separator = "") +{ + char buf[64]; + double dsize; + const char* suffix; + + std::tie(dsize, suffix) = pretty_size_split(sz); + + // format with two decimals + auto len = std::sprintf(buf, "%.2f", dsize); + + // remove trailing 0-decimals + char* ptr = buf + len - 1; + for (; *ptr == '0'; --ptr) + { + } + if (*ptr != '.') + { + ++ptr; + } + + // append suffix + sprintf(ptr, "%s%s", separator, suffix); + + return buf; +} + SmartRouterSession::SmartRouterSession(SmartRouter* pRouter, MXS_SESSION* pSession, Clusters clusters) @@ -158,9 +201,11 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf) { bool ret = false; + MXS_SDEBUG("routeQuery() buffer size " << pretty_size(gwbuf_length(pBuf))); + if (expecting_request_packets()) { - write_split_packets(pBuf); + ret = write_split_packets(pBuf); if (all_clusters_are_idle()) { m_mode = Mode::Idle; @@ -169,16 +214,15 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf) else if (m_mode != Mode::Idle) { auto is_busy = !all_clusters_are_idle(); - // TODO add more detail, operator<< to PacketRouter. MXS_SERROR("routeQuery() in wrong state. clusters busy = " << std::boolalpha << is_busy); mxb_assert(false); - ret = false; } else { auto route_info = m_qc.update_route_info(mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED, pBuf); + std::string canonical = maxscale::get_canonical(pBuf); - m_measurement = {maxbase::Clock::now(), maxscale::get_canonical(pBuf)}; + m_measurement = {maxbase::Clock::now(), canonical}; if (m_qc.target_is_all(route_info.target())) { @@ -192,13 +236,12 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf) } else { - std::string canonical = maxscale::get_canonical(pBuf); auto perf = m_router.perf_find(canonical); if (perf.is_valid()) { - MXS_SDEBUG("Route to " << perf.host() << " based on performance, canonical = " - << show_some(canonical)); + MXS_SDEBUG("Smart route to " << perf.host() + << ", canonical = " << show_some(canonical)); ret = write_to_host(perf.host(), pBuf); } else if (modutil_is_SQL(pBuf)) @@ -219,7 +262,7 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf) void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { - mxb_assert(GWBUF_IS_CONTIGUOUS(pPacket)); // TODO, do non-contiguous for slightly better speed? + mxb_assert(GWBUF_IS_CONTIGUOUS(pPacket)); auto it = std::find_if(begin(m_clusters), end(m_clusters), [pDcb](const Cluster& cluster) { @@ -251,7 +294,7 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) // marker1: If a connection is lost down the pipeline, we first get an ErrorPacket, then a call to // handleError(). If we only rely on the handleError() the client receiving the ErrorPacket - // can retry using this connection/session, causing a an error (or assert) in routeQuery(). + // could retry using this connection/session, causing an error (or assert) in routeQuery(). // This will change once we implement direct function calls to the Clusters (which really // are routers). if (cluster.tracker.state() == maxsql::PacketTracker::State::ErrorPacket) @@ -260,9 +303,9 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) switch (err_code) { case ER_CONNECTION_KILLED: // there might be more error codes needing to be caught here - MXS_SERROR("clientReply(): Lost connection to " - << cluster.host << " Error code=" << err_code << " " - << extract_error(pPacket)); + MXS_SERROR("clientReply(): Lost connection to " << cluster.host + << " Error code=" << err_code + << ' ' << extract_error(pPacket)); poll_fake_hangup_event(m_pClient_dcb); return; } @@ -270,8 +313,9 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (cluster.tracker.state() == maxsql::PacketTracker::State::Error) { - // TODO add more info - MXS_SERROR("ProtocolTracker error in state " << tracker_state_before); + MXS_SERROR("ProtocolTracker from state " << tracker_state_before + << " to state " << cluster.tracker.state() + << ". Disconnect."); poll_fake_hangup_event(m_pClient_dcb); return; } @@ -281,8 +325,9 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (first_response_packet) { maxbase::Duration query_dur = maxbase::Clock::now() - m_measurement.start; - MXS_SDEBUG("Host " << cluster.host << " will be responding to the client. " - << "First packet received in time " << query_dur); + MXS_SDEBUG("Host " << cluster.host + << " will be responding to the client." + << " First packet received in time " << query_dur); cluster.is_replying_to_client = true; will_reply = true; // tentatively, the packet might have to be delayed @@ -399,13 +444,18 @@ bool SmartRouterSession::write_to_host(const maxbase::Host& host, GWBUF* pBuf) bool SmartRouterSession::write_to_all(GWBUF* pBuf, Mode mode) { + bool success = true; + for (auto it = begin(m_clusters); it != end(m_clusters); ++it) { auto& cluster = *it; cluster.tracker = maxsql::PacketTracker(pBuf); cluster.is_replying_to_client = false; auto pBuf_send = (std::next(it) == end(m_clusters)) ? pBuf : gwbuf_clone(pBuf); - cluster.pDcb->func.write(cluster.pDcb, pBuf_send); + if (!cluster.pDcb->func.write(cluster.pDcb, pBuf_send)) + { + success = false; + } } if (expecting_response_packets()) @@ -413,7 +463,7 @@ bool SmartRouterSession::write_to_all(GWBUF* pBuf, Mode mode) m_mode = mode; } - return true; // TODO. What could possibly go wrong? + return success; } bool SmartRouterSession::write_split_packets(GWBUF* pBuf) @@ -428,14 +478,21 @@ bool SmartRouterSession::write_split_packets(GWBUF* pBuf) } } + bool success = true; + for (auto it = begin(active); it != end(active); ++it) { auto& cluster = **it; + auto pBuf_send = (std::next(it) == end(active)) ? pBuf : gwbuf_clone(pBuf); - cluster.pDcb->func.write(cluster.pDcb, pBuf_send); + if (!cluster.pDcb->func.write(cluster.pDcb, pBuf_send)) + { + success = false; + break; + } } - return true; // TODO. What could possibly go wrong? + return success; } void SmartRouterSession::kill_all_others(const Cluster& cluster) @@ -452,7 +509,7 @@ void SmartRouterSession::handleError(GWBUF* pPacket, mxs_error_action_t action, bool* pSuccess) { - // One of the clusters closed the connection, in terms of SmartRouter this is a hopeless situation. + // One of the clusters closed the connection. In terms of SmartRouter this is a hopeless situation. // Close the shop, and let the client retry. Also see marker1. auto it = std::find_if(begin(m_clusters), end(m_clusters), [pProblem](const Cluster& cluster) { @@ -461,7 +518,6 @@ void SmartRouterSession::handleError(GWBUF* pPacket, mxb_assert(it != end(m_clusters)); Cluster& cluster = *it; - // TODO: Will the session close gracefully, or is some more checking needed here. auto err_code = mxs_mysql_get_mysql_errno(pPacket); MXS_SERROR("handleError(): Lost connection to " << cluster.host << " Error code=" << err_code << " " @@ -478,7 +534,6 @@ void SmartRouterSession::handleError(GWBUF* pPacket, pClient->func.write(pClient, pCopy); } - // This will lead to the rest of the connections to be closed. *pSuccess = false; }