diff --git a/include/maxscale/session.h b/include/maxscale/session.h index 535c92629..a60876384 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -628,6 +628,26 @@ void session_set_retain_last_statements(uint32_t n); */ void session_retain_statement(MXS_SESSION* session, GWBUF* buffer); +/** + * @brief Book a server response for the statement currently being handled. + * + * @param session The session. + * @param server The server having returned a response. + * @param final_response True if this was the final server to respond, + * false otherwise. + */ +void session_book_server_response(MXS_SESSION* session, struct server* server, bool final_response); + +/** + * @brief Reset the server bookkeeping for the current statement. + * + * To be called, e.g., after a transaction is rolled back (possibly with + * results having been reported) and before it is replayed. + * + * @param session The session. + */ +void session_reset_server_bookkeeping(MXS_SESSION* session); + /** * @brief Dump the last statements, if statements have been retained. * diff --git a/server/core/internal/session.hh b/server/core/internal/session.hh index dff4e7dbd..90c850e0f 100644 --- a/server/core/internal/session.hh +++ b/server/core/internal/session.hh @@ -61,24 +61,42 @@ public: class Session : public MXS_SESSION { public: - class StatementInfo + class QueryInfo { public: - StatementInfo(const std::shared_ptr& sStatement); + QueryInfo(const std::shared_ptr& sQuery); json_t* as_json() const; - const std::shared_ptr& statement() const + bool complete() const { - return m_sStatement; + return m_complete; } + const std::shared_ptr& query() const + { + return m_sQuery; + } + + void book_server_response(SERVER* pServer, bool final_response); + void book_as_complete(); + void reset_server_bookkeeping(); + + struct ServerInfo + { + SERVER* pServer; + timespec processed; + }; + private: - std::shared_ptr m_sStatement; - timespec m_received; + std::shared_ptr m_sQuery; /*< The packet, COM_QUERY *or* something else. */ + timespec m_received; /*< When was it received. */ + timespec m_completed; /*< When was it completed. */ + std::vector m_server_infos; /*< When different servers responded. */ + bool m_complete = false; /*< Is this information complete? */ }; - typedef std::deque SessionStmtQueue; + typedef std::deque QueryInfos; using FilterList = std::vector; @@ -99,8 +117,11 @@ public: bool remove_variable(const char* name, void** context); void retain_statement(GWBUF* pBuffer); void dump_statements() const; + void book_server_response(SERVER* pServer, bool final_response); + void book_last_as_complete(); + void reset_server_bookkeeping(); - json_t* statements_as_json() const; + json_t* queries_as_json() const; void link_backend_dcb(DCB* dcb) { @@ -122,8 +143,9 @@ public: private: FilterList m_filters; SessionVarsByName m_variables; - SessionStmtQueue m_last_statements;/*< The N last statements by the client */ - DCBSet m_dcb_set; /*< Set of associated backend DCBs */ + QueryInfos m_last_queries; /*< The N last queries by the client */ + int m_current_query = -1; /*< The index of the current query */ + DCBSet m_dcb_set; /*< Set of associated backend DCBs */ }; } diff --git a/server/core/session.cc b/server/core/session.cc index 3e5a15cac..03c232fc5 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -897,8 +897,8 @@ json_t* session_json_data(const Session* session, const char* host) json_object_set_new(attr, "connections", dcb_arr); - json_t* statements = session->statements_as_json(); - json_object_set_new(attr, "statements", statements); + json_t* queries = session->queries_as_json(); + json_object_set_new(attr, "queries", queries); json_object_set_new(data, CN_ATTRIBUTES, attr); json_object_set_new(data, CN_LINKS, mxs_json_self_link(host, CN_SESSIONS, ss.str().c_str())); @@ -1029,6 +1029,11 @@ static void session_deliver_response(MXS_SESSION* session) session->response.up.session = NULL; session->response.up.clientReply = NULL; session->response.buffer = NULL; + + // If some filter short-circuits the routing, then there will + // be no response from a server and we need to ensure that + // subsequent book-keeping targets the right statement. + static_cast(session)->book_last_as_complete(); } mxb_assert(!session->response.up.instance); @@ -1052,10 +1057,19 @@ session_dump_statements_t session_get_dump_statements() return dump_statements; } -void session_retain_statement(MXS_SESSION* session, GWBUF* pBuffer) +void session_retain_statement(MXS_SESSION* pSession, GWBUF* pBuffer) { - Session* pSession = static_cast(session); - pSession->retain_statement(pBuffer); + static_cast(pSession)->retain_statement(pBuffer); +} + +void session_book_server_response(MXS_SESSION* pSession, SERVER* pServer, bool final_response) +{ + static_cast(pSession)->book_server_response(pServer, final_response); +} + +void session_reset_server_bookkeeping(MXS_SESSION* pSession) +{ + static_cast(pSession)->reset_server_bookkeeping(); } void session_dump_statements(MXS_SESSION* session) @@ -1193,21 +1207,50 @@ Session::~Session() namespace { -bool get_stmt(GWBUF* pBuffer, char** ppStmt, int* pLen) +bool get_cmd_and_stmt(GWBUF* pBuffer, const char** ppCmd, char** ppStmt, int* pLen) { - mxb_assert(modutil_is_SQL(pBuffer)); + *ppCmd = nullptr; + *ppStmt = nullptr; + *pLen = 0; - if (GWBUF_IS_CONTIGUOUS(pBuffer)) + bool deallocate = false; + int len = gwbuf_length(pBuffer); + + if (len > MYSQL_HEADER_LEN) { - modutil_extract_SQL(pBuffer, ppStmt, pLen); - } - else - { - *ppStmt = modutil_get_SQL(pBuffer); - *pLen = strlen(*ppStmt); + uint8_t header[MYSQL_HEADER_LEN + 1]; + uint8_t* pHeader = NULL; + + if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN) + { + pHeader = GWBUF_DATA(pBuffer); + } + else + { + gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header); + pHeader = header; + } + + int cmd = MYSQL_GET_COMMAND(pHeader); + + *ppCmd = STRPACKETTYPE(cmd); + + if (cmd == MXS_COM_QUERY) + { + if (GWBUF_IS_CONTIGUOUS(pBuffer)) + { + modutil_extract_SQL(pBuffer, ppStmt, pLen); + } + else + { + *ppStmt = modutil_get_SQL(pBuffer); + *pLen = strlen(*ppStmt); + deallocate = true; + } + } } - return !GWBUF_IS_CONTIGUOUS(pBuffer); + return deallocate; } } @@ -1216,7 +1259,7 @@ void Session::dump_statements() const { if (retain_last_statements) { - int n = m_last_statements.size(); + int n = m_last_queries.size(); uint64_t id = session_get_current_id(); @@ -1228,30 +1271,34 @@ void Session::dump_statements() const ses_id); } - for (auto i = m_last_statements.rbegin(); i != m_last_statements.rend(); ++i) + for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i) { - const StatementInfo& info = *i; - const std::shared_ptr& sBuffer = info.statement(); + const QueryInfo& info = *i; + GWBUF* pBuffer = info.query().get(); + const char* pCmd; char* pStmt; int len; - bool deallocate = get_stmt(sBuffer.get(), &pStmt, &len); + bool deallocate = get_cmd_and_stmt(pBuffer, &pCmd, &pStmt, &len); - if (id != 0) + if (pStmt) { - MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt); - } - else - { - // We are in a context where we do not have a current session, so we need to - // log the session id ourselves. + if (id != 0) + { + MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt); + } + else + { + // We are in a context where we do not have a current session, so we need to + // log the session id ourselves. - MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt); - } + MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt); + } - if (deallocate) - { - MXS_FREE(pStmt); + if (deallocate) + { + MXS_FREE(pStmt); + } } --n; @@ -1259,18 +1306,18 @@ void Session::dump_statements() const } } -json_t* Session::statements_as_json() const +json_t* Session::queries_as_json() const { - json_t* pStatements = json_array(); + json_t* pQueries = json_array(); - for (auto i = m_last_statements.rbegin(); i != m_last_statements.rend(); ++i) + for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i) { - const StatementInfo& info = *i; + const QueryInfo& info = *i; - json_array_append_new(pStatements, info.as_json()); + json_array_append_new(pQueries, info.as_json()); } - return pStatements; + return pQueries; } bool Session::setup_filters(Service* service) @@ -1421,74 +1468,214 @@ void Session::retain_statement(GWBUF* pBuffer) { if (retain_last_statements) { - size_t len = gwbuf_length(pBuffer); + mxb_assert(m_last_queries.size() <= retain_last_statements); - if (len > MYSQL_HEADER_LEN) + std::shared_ptr sBuffer(gwbuf_clone(pBuffer)); + + m_last_queries.push_front(QueryInfo(sBuffer)); + + if (m_last_queries.size() > retain_last_statements) { - uint8_t header[MYSQL_HEADER_LEN + 1]; - uint8_t* pHeader = NULL; + m_last_queries.pop_back(); + } - if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN) - { - pHeader = GWBUF_DATA(pBuffer); - } - else - { - gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header); - pHeader = header; - } - - if (MYSQL_GET_COMMAND(pHeader) == MXS_COM_QUERY) - { - mxb_assert(m_last_statements.size() <= retain_last_statements); - - if (m_last_statements.size() == retain_last_statements) - { - m_last_statements.pop_back(); - } - - std::shared_ptr sBuffer(gwbuf_clone(pBuffer)); - - m_last_statements.push_front(StatementInfo(sBuffer)); - } + if (m_last_queries.size() == 1) + { + mxb_assert(m_current_query == -1); + m_current_query = 0; + } + else + { + // If requests are streamed, without the response being waited for, + // then this may cause the index to grow past the length of the array. + // That's ok and is dealt with in book_server_response() and friends. + ++m_current_query; + mxb_assert(m_current_query >= 0); } } } -Session::StatementInfo::StatementInfo(const std::shared_ptr& sStatement) - : m_sStatement(sStatement) +void Session::book_server_response(SERVER* pServer, bool final_response) { - clock_gettime(CLOCK_REALTIME, &m_received); + if (retain_last_statements && !m_last_queries.empty()) + { + mxb_assert(m_current_query >= 0); + // If enough queries have been sent by the client, without it waiting + // for the responses, then at this point it may be so that the query + // object has been popped from the size limited queue. That's apparent + // by the index pointing past the end of the queue. In that case + // we simply ignore the result. + if (m_current_query < static_cast(m_last_queries.size())) + { + auto i = m_last_queries.begin() + m_current_query; + QueryInfo& info = *i; + + mxb_assert(!info.complete()); + + info.book_server_response(pServer, final_response); + } + + if (final_response) + { + // In case what is described in the comment above has occurred, + // this will eventually take the index back into the queue. + --m_current_query; + mxb_assert(m_current_query >= -1); + } + } } -json_t* Session::StatementInfo::as_json() const +void Session::book_last_as_complete() { - json_t* pInfo = json_object(); + if (retain_last_statements && !m_last_queries.empty()) + { + mxb_assert(m_current_query >= 0); + // See comment in book_server_response(). + if (m_current_query < static_cast(m_last_queries.size())) + { + auto i = m_last_queries.begin() + m_current_query; + QueryInfo& info = *i; + info.book_as_complete(); + } + } +} + +void Session::reset_server_bookkeeping() +{ + if (retain_last_statements && !m_last_queries.empty()) + { + mxb_assert(m_current_query >= 0); + // See comment in book_server_response(). + if (m_current_query < static_cast(m_last_queries.size())) + { + auto i = m_last_queries.begin() + m_current_query; + QueryInfo& info = *i; + info.reset_server_bookkeeping(); + } + } +} + +Session::QueryInfo::QueryInfo(const std::shared_ptr& sQuery) + : m_sQuery(sQuery) +{ + clock_gettime(CLOCK_REALTIME_COARSE, &m_received); + m_completed.tv_sec = 0; + m_completed.tv_nsec = 0; +} + +namespace +{ + +static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123"; +static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1; + +void timespec_to_iso(char* zIso, const timespec& ts) +{ + tm tm; + gmtime_r(&ts.tv_sec, &tm); + + size_t i = strftime(zIso, ISO_TIME_LEN + 1, "%G-%m-%dT%H:%M:%S", &tm); + mxb_assert(i == 19); + long int ms = ts.tv_nsec / 1000000; + i = sprintf(zIso + i, ".%03ld", ts.tv_nsec / 1000000); + mxb_assert(i == 4); +} + +} + +json_t* Session::QueryInfo::as_json() const +{ + json_t* pQuery = json_object(); + + const char* pCmd; char* pStmt; int len; - bool deallocate = get_stmt(m_sStatement.get(), &pStmt, &len); + bool deallocate = get_cmd_and_stmt(m_sQuery.get(), &pCmd, &pStmt, &len); - json_object_set_new(pInfo, "statement", json_stringn(pStmt, len)); - - if (deallocate) + if (pCmd) { - MXS_FREE(pStmt); + json_object_set_new(pQuery, "command", json_string(pCmd)); } - tm tm; - gmtime_r(&m_received.tv_sec, &tm); + if (pStmt) + { + json_object_set_new(pQuery, "statement", json_stringn(pStmt, len)); - static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123456789"; - static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1; + if (deallocate) + { + MXS_FREE(pStmt); + } + } char iso_time[ISO_TIME_LEN + 1]; - size_t i = strftime(iso_time, sizeof(iso_time), "%G-%m-%dT%H:%M:%S", &tm); - mxb_assert(i == 19); - i = sprintf(iso_time + i, ".%09ld", m_received.tv_nsec); - mxb_assert(i == 10); - json_object_set_new(pInfo, "received", json_stringn(iso_time, ISO_TIME_LEN)); + timespec_to_iso(iso_time, m_received); + json_object_set_new(pQuery, "received", json_stringn(iso_time, ISO_TIME_LEN)); - return pInfo; + if (m_complete) + { + timespec_to_iso(iso_time, m_completed); + json_object_set_new(pQuery, "completed", json_stringn(iso_time, ISO_TIME_LEN)); + } + + json_t* pResponses = json_array(); + + for (auto& info : m_server_infos) + { + json_t* pResponse = json_object(); + + // Calculate and report in milliseconds. + long int received = m_received.tv_sec * 1000 + m_received.tv_nsec / 1000000; + long int processed = info.processed.tv_sec * 1000 + info.processed.tv_nsec / 1000000; + mxb_assert(processed >= received); + + long int duration = processed - received; + + json_object_set_new(pResponse, "server", json_string(info.pServer->name)); + json_object_set_new(pResponse, "duration", json_integer(duration)); + + json_array_append_new(pResponses, pResponse); + } + + json_object_set_new(pQuery, "responses", pResponses); + + return pQuery; +} + +void Session::QueryInfo::book_server_response(SERVER* pServer, bool final_response) +{ + // If the information has been completed, no more information may be provided. + mxb_assert(!m_complete); + // A particular server may be reported only exactly once. + mxb_assert(find_if(m_server_infos.begin(), m_server_infos.end(), [pServer](const ServerInfo& info) { + return info.pServer == pServer; + }) == m_server_infos.end()); + + timespec now; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + m_server_infos.push_back(ServerInfo {pServer, now}); + + m_complete = final_response; + + if (m_complete) + { + m_completed = now; + } +} + +void Session::QueryInfo::book_as_complete() +{ + timespec now; + clock_gettime(CLOCK_REALTIME_COARSE, &m_completed); + m_complete = true; +} + +void Session::QueryInfo::reset_server_bookkeeping() +{ + m_server_infos.clear(); + m_completed.tv_sec = 0; + m_completed.tv_nsec = 0; + m_complete = false; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index dd0c33d79..2f1c6ad9b 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -585,6 +585,9 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) { /** Got a complete reply, decrement expected response count */ m_expected_responses--; + + session_book_server_response(m_pSession, backend->backend()->server, m_expected_responses == 0); + mxb_assert(m_expected_responses >= 0); mxb_assert(backend->get_reply_state() == REPLY_STATE_DONE); MXS_INFO("Reply complete, last reply from %s", backend->name()); @@ -639,6 +642,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) m_otrx_state = OTRX_INACTIVE; start_trx_replay(); gwbuf_free(writebuf); + session_reset_server_bookkeeping(m_pSession); return; } }