From c899f00541ecc8428451e70745e63ab93b246233 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Wed, 7 Nov 2018 09:45:13 +0200 Subject: [PATCH] MXS-1780 Collect server response information As the router is the only one that knows what backends a particular statement has been sent to, it is the responsibility of the router to keep the session bookkeeping up to date. If it doesn't we will know what statements a session has received (provided at least some component in the routing chain has RCAP_TYPE_STMT_INPUT capability), but not how long their processing took. Currently only readwritesplit does that. All queries are stored and not just COM_QUERY as that makes the overall bookkeeping simpler; at clientReply() time we do not need to know whether or not to bookkeep information, we can just do it. When session information is queried for, we report as much information we have available. --- include/maxscale/session.h | 20 + server/core/internal/session.hh | 42 ++- server/core/session.cc | 357 +++++++++++++----- .../routing/readwritesplit/rwsplitsession.cc | 4 + 4 files changed, 328 insertions(+), 95 deletions(-) diff --git a/include/maxscale/session.h b/include/maxscale/session.h index 535c92629..a60876384 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -628,6 +628,26 @@ void session_set_retain_last_statements(uint32_t n); */ void session_retain_statement(MXS_SESSION* session, GWBUF* buffer); +/** + * @brief Book a server response for the statement currently being handled. + * + * @param session The session. + * @param server The server having returned a response. + * @param final_response True if this was the final server to respond, + * false otherwise. + */ +void session_book_server_response(MXS_SESSION* session, struct server* server, bool final_response); + +/** + * @brief Reset the server bookkeeping for the current statement. + * + * To be called, e.g., after a transaction is rolled back (possibly with + * results having been reported) and before it is replayed. + * + * @param session The session. + */ +void session_reset_server_bookkeeping(MXS_SESSION* session); + /** * @brief Dump the last statements, if statements have been retained. * diff --git a/server/core/internal/session.hh b/server/core/internal/session.hh index dff4e7dbd..90c850e0f 100644 --- a/server/core/internal/session.hh +++ b/server/core/internal/session.hh @@ -61,24 +61,42 @@ public: class Session : public MXS_SESSION { public: - class StatementInfo + class QueryInfo { public: - StatementInfo(const std::shared_ptr& sStatement); + QueryInfo(const std::shared_ptr& sQuery); json_t* as_json() const; - const std::shared_ptr& statement() const + bool complete() const { - return m_sStatement; + return m_complete; } + const std::shared_ptr& query() const + { + return m_sQuery; + } + + void book_server_response(SERVER* pServer, bool final_response); + void book_as_complete(); + void reset_server_bookkeeping(); + + struct ServerInfo + { + SERVER* pServer; + timespec processed; + }; + private: - std::shared_ptr m_sStatement; - timespec m_received; + std::shared_ptr m_sQuery; /*< The packet, COM_QUERY *or* something else. */ + timespec m_received; /*< When was it received. */ + timespec m_completed; /*< When was it completed. */ + std::vector m_server_infos; /*< When different servers responded. */ + bool m_complete = false; /*< Is this information complete? */ }; - typedef std::deque SessionStmtQueue; + typedef std::deque QueryInfos; using FilterList = std::vector; @@ -99,8 +117,11 @@ public: bool remove_variable(const char* name, void** context); void retain_statement(GWBUF* pBuffer); void dump_statements() const; + void book_server_response(SERVER* pServer, bool final_response); + void book_last_as_complete(); + void reset_server_bookkeeping(); - json_t* statements_as_json() const; + json_t* queries_as_json() const; void link_backend_dcb(DCB* dcb) { @@ -122,8 +143,9 @@ public: private: FilterList m_filters; SessionVarsByName m_variables; - SessionStmtQueue m_last_statements;/*< The N last statements by the client */ - DCBSet m_dcb_set; /*< Set of associated backend DCBs */ + QueryInfos m_last_queries; /*< The N last queries by the client */ + int m_current_query = -1; /*< The index of the current query */ + DCBSet m_dcb_set; /*< Set of associated backend DCBs */ }; } diff --git a/server/core/session.cc b/server/core/session.cc index 3e5a15cac..03c232fc5 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -897,8 +897,8 @@ json_t* session_json_data(const Session* session, const char* host) json_object_set_new(attr, "connections", dcb_arr); - json_t* statements = session->statements_as_json(); - json_object_set_new(attr, "statements", statements); + json_t* queries = session->queries_as_json(); + json_object_set_new(attr, "queries", queries); json_object_set_new(data, CN_ATTRIBUTES, attr); json_object_set_new(data, CN_LINKS, mxs_json_self_link(host, CN_SESSIONS, ss.str().c_str())); @@ -1029,6 +1029,11 @@ static void session_deliver_response(MXS_SESSION* session) session->response.up.session = NULL; session->response.up.clientReply = NULL; session->response.buffer = NULL; + + // If some filter short-circuits the routing, then there will + // be no response from a server and we need to ensure that + // subsequent book-keeping targets the right statement. + static_cast(session)->book_last_as_complete(); } mxb_assert(!session->response.up.instance); @@ -1052,10 +1057,19 @@ session_dump_statements_t session_get_dump_statements() return dump_statements; } -void session_retain_statement(MXS_SESSION* session, GWBUF* pBuffer) +void session_retain_statement(MXS_SESSION* pSession, GWBUF* pBuffer) { - Session* pSession = static_cast(session); - pSession->retain_statement(pBuffer); + static_cast(pSession)->retain_statement(pBuffer); +} + +void session_book_server_response(MXS_SESSION* pSession, SERVER* pServer, bool final_response) +{ + static_cast(pSession)->book_server_response(pServer, final_response); +} + +void session_reset_server_bookkeeping(MXS_SESSION* pSession) +{ + static_cast(pSession)->reset_server_bookkeeping(); } void session_dump_statements(MXS_SESSION* session) @@ -1193,21 +1207,50 @@ Session::~Session() namespace { -bool get_stmt(GWBUF* pBuffer, char** ppStmt, int* pLen) +bool get_cmd_and_stmt(GWBUF* pBuffer, const char** ppCmd, char** ppStmt, int* pLen) { - mxb_assert(modutil_is_SQL(pBuffer)); + *ppCmd = nullptr; + *ppStmt = nullptr; + *pLen = 0; - if (GWBUF_IS_CONTIGUOUS(pBuffer)) + bool deallocate = false; + int len = gwbuf_length(pBuffer); + + if (len > MYSQL_HEADER_LEN) { - modutil_extract_SQL(pBuffer, ppStmt, pLen); - } - else - { - *ppStmt = modutil_get_SQL(pBuffer); - *pLen = strlen(*ppStmt); + uint8_t header[MYSQL_HEADER_LEN + 1]; + uint8_t* pHeader = NULL; + + if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN) + { + pHeader = GWBUF_DATA(pBuffer); + } + else + { + gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header); + pHeader = header; + } + + int cmd = MYSQL_GET_COMMAND(pHeader); + + *ppCmd = STRPACKETTYPE(cmd); + + if (cmd == MXS_COM_QUERY) + { + if (GWBUF_IS_CONTIGUOUS(pBuffer)) + { + modutil_extract_SQL(pBuffer, ppStmt, pLen); + } + else + { + *ppStmt = modutil_get_SQL(pBuffer); + *pLen = strlen(*ppStmt); + deallocate = true; + } + } } - return !GWBUF_IS_CONTIGUOUS(pBuffer); + return deallocate; } } @@ -1216,7 +1259,7 @@ void Session::dump_statements() const { if (retain_last_statements) { - int n = m_last_statements.size(); + int n = m_last_queries.size(); uint64_t id = session_get_current_id(); @@ -1228,30 +1271,34 @@ void Session::dump_statements() const ses_id); } - for (auto i = m_last_statements.rbegin(); i != m_last_statements.rend(); ++i) + for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i) { - const StatementInfo& info = *i; - const std::shared_ptr& sBuffer = info.statement(); + const QueryInfo& info = *i; + GWBUF* pBuffer = info.query().get(); + const char* pCmd; char* pStmt; int len; - bool deallocate = get_stmt(sBuffer.get(), &pStmt, &len); + bool deallocate = get_cmd_and_stmt(pBuffer, &pCmd, &pStmt, &len); - if (id != 0) + if (pStmt) { - MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt); - } - else - { - // We are in a context where we do not have a current session, so we need to - // log the session id ourselves. + if (id != 0) + { + MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt); + } + else + { + // We are in a context where we do not have a current session, so we need to + // log the session id ourselves. - MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt); - } + MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt); + } - if (deallocate) - { - MXS_FREE(pStmt); + if (deallocate) + { + MXS_FREE(pStmt); + } } --n; @@ -1259,18 +1306,18 @@ void Session::dump_statements() const } } -json_t* Session::statements_as_json() const +json_t* Session::queries_as_json() const { - json_t* pStatements = json_array(); + json_t* pQueries = json_array(); - for (auto i = m_last_statements.rbegin(); i != m_last_statements.rend(); ++i) + for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i) { - const StatementInfo& info = *i; + const QueryInfo& info = *i; - json_array_append_new(pStatements, info.as_json()); + json_array_append_new(pQueries, info.as_json()); } - return pStatements; + return pQueries; } bool Session::setup_filters(Service* service) @@ -1421,74 +1468,214 @@ void Session::retain_statement(GWBUF* pBuffer) { if (retain_last_statements) { - size_t len = gwbuf_length(pBuffer); + mxb_assert(m_last_queries.size() <= retain_last_statements); - if (len > MYSQL_HEADER_LEN) + std::shared_ptr sBuffer(gwbuf_clone(pBuffer)); + + m_last_queries.push_front(QueryInfo(sBuffer)); + + if (m_last_queries.size() > retain_last_statements) { - uint8_t header[MYSQL_HEADER_LEN + 1]; - uint8_t* pHeader = NULL; + m_last_queries.pop_back(); + } - if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN) - { - pHeader = GWBUF_DATA(pBuffer); - } - else - { - gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header); - pHeader = header; - } - - if (MYSQL_GET_COMMAND(pHeader) == MXS_COM_QUERY) - { - mxb_assert(m_last_statements.size() <= retain_last_statements); - - if (m_last_statements.size() == retain_last_statements) - { - m_last_statements.pop_back(); - } - - std::shared_ptr sBuffer(gwbuf_clone(pBuffer)); - - m_last_statements.push_front(StatementInfo(sBuffer)); - } + if (m_last_queries.size() == 1) + { + mxb_assert(m_current_query == -1); + m_current_query = 0; + } + else + { + // If requests are streamed, without the response being waited for, + // then this may cause the index to grow past the length of the array. + // That's ok and is dealt with in book_server_response() and friends. + ++m_current_query; + mxb_assert(m_current_query >= 0); } } } -Session::StatementInfo::StatementInfo(const std::shared_ptr& sStatement) - : m_sStatement(sStatement) +void Session::book_server_response(SERVER* pServer, bool final_response) { - clock_gettime(CLOCK_REALTIME, &m_received); + if (retain_last_statements && !m_last_queries.empty()) + { + mxb_assert(m_current_query >= 0); + // If enough queries have been sent by the client, without it waiting + // for the responses, then at this point it may be so that the query + // object has been popped from the size limited queue. That's apparent + // by the index pointing past the end of the queue. In that case + // we simply ignore the result. + if (m_current_query < static_cast(m_last_queries.size())) + { + auto i = m_last_queries.begin() + m_current_query; + QueryInfo& info = *i; + + mxb_assert(!info.complete()); + + info.book_server_response(pServer, final_response); + } + + if (final_response) + { + // In case what is described in the comment above has occurred, + // this will eventually take the index back into the queue. + --m_current_query; + mxb_assert(m_current_query >= -1); + } + } } -json_t* Session::StatementInfo::as_json() const +void Session::book_last_as_complete() { - json_t* pInfo = json_object(); + if (retain_last_statements && !m_last_queries.empty()) + { + mxb_assert(m_current_query >= 0); + // See comment in book_server_response(). + if (m_current_query < static_cast(m_last_queries.size())) + { + auto i = m_last_queries.begin() + m_current_query; + QueryInfo& info = *i; + info.book_as_complete(); + } + } +} + +void Session::reset_server_bookkeeping() +{ + if (retain_last_statements && !m_last_queries.empty()) + { + mxb_assert(m_current_query >= 0); + // See comment in book_server_response(). + if (m_current_query < static_cast(m_last_queries.size())) + { + auto i = m_last_queries.begin() + m_current_query; + QueryInfo& info = *i; + info.reset_server_bookkeeping(); + } + } +} + +Session::QueryInfo::QueryInfo(const std::shared_ptr& sQuery) + : m_sQuery(sQuery) +{ + clock_gettime(CLOCK_REALTIME_COARSE, &m_received); + m_completed.tv_sec = 0; + m_completed.tv_nsec = 0; +} + +namespace +{ + +static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123"; +static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1; + +void timespec_to_iso(char* zIso, const timespec& ts) +{ + tm tm; + gmtime_r(&ts.tv_sec, &tm); + + size_t i = strftime(zIso, ISO_TIME_LEN + 1, "%G-%m-%dT%H:%M:%S", &tm); + mxb_assert(i == 19); + long int ms = ts.tv_nsec / 1000000; + i = sprintf(zIso + i, ".%03ld", ts.tv_nsec / 1000000); + mxb_assert(i == 4); +} + +} + +json_t* Session::QueryInfo::as_json() const +{ + json_t* pQuery = json_object(); + + const char* pCmd; char* pStmt; int len; - bool deallocate = get_stmt(m_sStatement.get(), &pStmt, &len); + bool deallocate = get_cmd_and_stmt(m_sQuery.get(), &pCmd, &pStmt, &len); - json_object_set_new(pInfo, "statement", json_stringn(pStmt, len)); - - if (deallocate) + if (pCmd) { - MXS_FREE(pStmt); + json_object_set_new(pQuery, "command", json_string(pCmd)); } - tm tm; - gmtime_r(&m_received.tv_sec, &tm); + if (pStmt) + { + json_object_set_new(pQuery, "statement", json_stringn(pStmt, len)); - static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123456789"; - static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1; + if (deallocate) + { + MXS_FREE(pStmt); + } + } char iso_time[ISO_TIME_LEN + 1]; - size_t i = strftime(iso_time, sizeof(iso_time), "%G-%m-%dT%H:%M:%S", &tm); - mxb_assert(i == 19); - i = sprintf(iso_time + i, ".%09ld", m_received.tv_nsec); - mxb_assert(i == 10); - json_object_set_new(pInfo, "received", json_stringn(iso_time, ISO_TIME_LEN)); + timespec_to_iso(iso_time, m_received); + json_object_set_new(pQuery, "received", json_stringn(iso_time, ISO_TIME_LEN)); - return pInfo; + if (m_complete) + { + timespec_to_iso(iso_time, m_completed); + json_object_set_new(pQuery, "completed", json_stringn(iso_time, ISO_TIME_LEN)); + } + + json_t* pResponses = json_array(); + + for (auto& info : m_server_infos) + { + json_t* pResponse = json_object(); + + // Calculate and report in milliseconds. + long int received = m_received.tv_sec * 1000 + m_received.tv_nsec / 1000000; + long int processed = info.processed.tv_sec * 1000 + info.processed.tv_nsec / 1000000; + mxb_assert(processed >= received); + + long int duration = processed - received; + + json_object_set_new(pResponse, "server", json_string(info.pServer->name)); + json_object_set_new(pResponse, "duration", json_integer(duration)); + + json_array_append_new(pResponses, pResponse); + } + + json_object_set_new(pQuery, "responses", pResponses); + + return pQuery; +} + +void Session::QueryInfo::book_server_response(SERVER* pServer, bool final_response) +{ + // If the information has been completed, no more information may be provided. + mxb_assert(!m_complete); + // A particular server may be reported only exactly once. + mxb_assert(find_if(m_server_infos.begin(), m_server_infos.end(), [pServer](const ServerInfo& info) { + return info.pServer == pServer; + }) == m_server_infos.end()); + + timespec now; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + m_server_infos.push_back(ServerInfo {pServer, now}); + + m_complete = final_response; + + if (m_complete) + { + m_completed = now; + } +} + +void Session::QueryInfo::book_as_complete() +{ + timespec now; + clock_gettime(CLOCK_REALTIME_COARSE, &m_completed); + m_complete = true; +} + +void Session::QueryInfo::reset_server_bookkeeping() +{ + m_server_infos.clear(); + m_completed.tv_sec = 0; + m_completed.tv_nsec = 0; + m_complete = false; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index dd0c33d79..2f1c6ad9b 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -585,6 +585,9 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) { /** Got a complete reply, decrement expected response count */ m_expected_responses--; + + session_book_server_response(m_pSession, backend->backend()->server, m_expected_responses == 0); + mxb_assert(m_expected_responses >= 0); mxb_assert(backend->get_reply_state() == REPLY_STATE_DONE); MXS_INFO("Reply complete, last reply from %s", backend->name()); @@ -639,6 +642,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) m_otrx_state = OTRX_INACTIVE; start_trx_replay(); gwbuf_free(writebuf); + session_reset_server_bookkeeping(m_pSession); return; } }