MXS-1780 Collect server response information

As the router is the only one that knows what backends a particular
statement has been sent to, it is the responsibility of the router
to keep the session bookkeeping up to date. If it doesn't we will
know what statements a session has received (provided at least some
component in the routing chain has RCAP_TYPE_STMT_INPUT capability),
but not how long their processing took. Currently only readwritesplit
does that.

All queries are stored and not just COM_QUERY as that makes the
overall bookkeeping simpler; at clientReply() time we do not need to
know whether or not to bookkeep information, we can just do it.

When session information is queried for, we report as much information
we have available.
This commit is contained in:
Johan Wikman 2018-11-07 09:45:13 +02:00
parent c78c5a615d
commit c899f00541
4 changed files with 328 additions and 95 deletions

View File

@ -628,6 +628,26 @@ void session_set_retain_last_statements(uint32_t n);
*/
void session_retain_statement(MXS_SESSION* session, GWBUF* buffer);
/**
* @brief Book a server response for the statement currently being handled.
*
* @param session The session.
* @param server The server having returned a response.
* @param final_response True if this was the final server to respond,
* false otherwise.
*/
void session_book_server_response(MXS_SESSION* session, struct server* server, bool final_response);
/**
* @brief Reset the server bookkeeping for the current statement.
*
* To be called, e.g., after a transaction is rolled back (possibly with
* results having been reported) and before it is replayed.
*
* @param session The session.
*/
void session_reset_server_bookkeeping(MXS_SESSION* session);
/**
* @brief Dump the last statements, if statements have been retained.
*

View File

@ -61,24 +61,42 @@ public:
class Session : public MXS_SESSION
{
public:
class StatementInfo
class QueryInfo
{
public:
StatementInfo(const std::shared_ptr<GWBUF>& sStatement);
QueryInfo(const std::shared_ptr<GWBUF>& sQuery);
json_t* as_json() const;
const std::shared_ptr<GWBUF>& statement() const
bool complete() const
{
return m_sStatement;
return m_complete;
}
const std::shared_ptr<GWBUF>& query() const
{
return m_sQuery;
}
void book_server_response(SERVER* pServer, bool final_response);
void book_as_complete();
void reset_server_bookkeeping();
struct ServerInfo
{
SERVER* pServer;
timespec processed;
};
private:
std::shared_ptr<GWBUF> m_sStatement;
timespec m_received;
std::shared_ptr<GWBUF> m_sQuery; /*< The packet, COM_QUERY *or* something else. */
timespec m_received; /*< When was it received. */
timespec m_completed; /*< When was it completed. */
std::vector<ServerInfo> m_server_infos; /*< When different servers responded. */
bool m_complete = false; /*< Is this information complete? */
};
typedef std::deque<StatementInfo> SessionStmtQueue;
typedef std::deque<QueryInfo> QueryInfos;
using FilterList = std::vector<SessionFilter>;
@ -99,8 +117,11 @@ public:
bool remove_variable(const char* name, void** context);
void retain_statement(GWBUF* pBuffer);
void dump_statements() const;
void book_server_response(SERVER* pServer, bool final_response);
void book_last_as_complete();
void reset_server_bookkeeping();
json_t* statements_as_json() const;
json_t* queries_as_json() const;
void link_backend_dcb(DCB* dcb)
{
@ -122,8 +143,9 @@ public:
private:
FilterList m_filters;
SessionVarsByName m_variables;
SessionStmtQueue m_last_statements;/*< The N last statements by the client */
DCBSet m_dcb_set; /*< Set of associated backend DCBs */
QueryInfos m_last_queries; /*< The N last queries by the client */
int m_current_query = -1; /*< The index of the current query */
DCBSet m_dcb_set; /*< Set of associated backend DCBs */
};
}

View File

@ -897,8 +897,8 @@ json_t* session_json_data(const Session* session, const char* host)
json_object_set_new(attr, "connections", dcb_arr);
json_t* statements = session->statements_as_json();
json_object_set_new(attr, "statements", statements);
json_t* queries = session->queries_as_json();
json_object_set_new(attr, "queries", queries);
json_object_set_new(data, CN_ATTRIBUTES, attr);
json_object_set_new(data, CN_LINKS, mxs_json_self_link(host, CN_SESSIONS, ss.str().c_str()));
@ -1029,6 +1029,11 @@ static void session_deliver_response(MXS_SESSION* session)
session->response.up.session = NULL;
session->response.up.clientReply = NULL;
session->response.buffer = NULL;
// If some filter short-circuits the routing, then there will
// be no response from a server and we need to ensure that
// subsequent book-keeping targets the right statement.
static_cast<Session*>(session)->book_last_as_complete();
}
mxb_assert(!session->response.up.instance);
@ -1052,10 +1057,19 @@ session_dump_statements_t session_get_dump_statements()
return dump_statements;
}
void session_retain_statement(MXS_SESSION* session, GWBUF* pBuffer)
void session_retain_statement(MXS_SESSION* pSession, GWBUF* pBuffer)
{
Session* pSession = static_cast<Session*>(session);
pSession->retain_statement(pBuffer);
static_cast<Session*>(pSession)->retain_statement(pBuffer);
}
void session_book_server_response(MXS_SESSION* pSession, SERVER* pServer, bool final_response)
{
static_cast<Session*>(pSession)->book_server_response(pServer, final_response);
}
void session_reset_server_bookkeeping(MXS_SESSION* pSession)
{
static_cast<Session*>(pSession)->reset_server_bookkeeping();
}
void session_dump_statements(MXS_SESSION* session)
@ -1193,21 +1207,50 @@ Session::~Session()
namespace
{
bool get_stmt(GWBUF* pBuffer, char** ppStmt, int* pLen)
bool get_cmd_and_stmt(GWBUF* pBuffer, const char** ppCmd, char** ppStmt, int* pLen)
{
mxb_assert(modutil_is_SQL(pBuffer));
*ppCmd = nullptr;
*ppStmt = nullptr;
*pLen = 0;
if (GWBUF_IS_CONTIGUOUS(pBuffer))
bool deallocate = false;
int len = gwbuf_length(pBuffer);
if (len > MYSQL_HEADER_LEN)
{
modutil_extract_SQL(pBuffer, ppStmt, pLen);
}
else
{
*ppStmt = modutil_get_SQL(pBuffer);
*pLen = strlen(*ppStmt);
uint8_t header[MYSQL_HEADER_LEN + 1];
uint8_t* pHeader = NULL;
if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN)
{
pHeader = GWBUF_DATA(pBuffer);
}
else
{
gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header);
pHeader = header;
}
int cmd = MYSQL_GET_COMMAND(pHeader);
*ppCmd = STRPACKETTYPE(cmd);
if (cmd == MXS_COM_QUERY)
{
if (GWBUF_IS_CONTIGUOUS(pBuffer))
{
modutil_extract_SQL(pBuffer, ppStmt, pLen);
}
else
{
*ppStmt = modutil_get_SQL(pBuffer);
*pLen = strlen(*ppStmt);
deallocate = true;
}
}
}
return !GWBUF_IS_CONTIGUOUS(pBuffer);
return deallocate;
}
}
@ -1216,7 +1259,7 @@ void Session::dump_statements() const
{
if (retain_last_statements)
{
int n = m_last_statements.size();
int n = m_last_queries.size();
uint64_t id = session_get_current_id();
@ -1228,30 +1271,34 @@ void Session::dump_statements() const
ses_id);
}
for (auto i = m_last_statements.rbegin(); i != m_last_statements.rend(); ++i)
for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i)
{
const StatementInfo& info = *i;
const std::shared_ptr<GWBUF>& sBuffer = info.statement();
const QueryInfo& info = *i;
GWBUF* pBuffer = info.query().get();
const char* pCmd;
char* pStmt;
int len;
bool deallocate = get_stmt(sBuffer.get(), &pStmt, &len);
bool deallocate = get_cmd_and_stmt(pBuffer, &pCmd, &pStmt, &len);
if (id != 0)
if (pStmt)
{
MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt);
}
else
{
// We are in a context where we do not have a current session, so we need to
// log the session id ourselves.
if (id != 0)
{
MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt);
}
else
{
// We are in a context where we do not have a current session, so we need to
// log the session id ourselves.
MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt);
}
MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt);
}
if (deallocate)
{
MXS_FREE(pStmt);
if (deallocate)
{
MXS_FREE(pStmt);
}
}
--n;
@ -1259,18 +1306,18 @@ void Session::dump_statements() const
}
}
json_t* Session::statements_as_json() const
json_t* Session::queries_as_json() const
{
json_t* pStatements = json_array();
json_t* pQueries = json_array();
for (auto i = m_last_statements.rbegin(); i != m_last_statements.rend(); ++i)
for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i)
{
const StatementInfo& info = *i;
const QueryInfo& info = *i;
json_array_append_new(pStatements, info.as_json());
json_array_append_new(pQueries, info.as_json());
}
return pStatements;
return pQueries;
}
bool Session::setup_filters(Service* service)
@ -1421,74 +1468,214 @@ void Session::retain_statement(GWBUF* pBuffer)
{
if (retain_last_statements)
{
size_t len = gwbuf_length(pBuffer);
mxb_assert(m_last_queries.size() <= retain_last_statements);
if (len > MYSQL_HEADER_LEN)
std::shared_ptr<GWBUF> sBuffer(gwbuf_clone(pBuffer));
m_last_queries.push_front(QueryInfo(sBuffer));
if (m_last_queries.size() > retain_last_statements)
{
uint8_t header[MYSQL_HEADER_LEN + 1];
uint8_t* pHeader = NULL;
m_last_queries.pop_back();
}
if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN)
{
pHeader = GWBUF_DATA(pBuffer);
}
else
{
gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header);
pHeader = header;
}
if (MYSQL_GET_COMMAND(pHeader) == MXS_COM_QUERY)
{
mxb_assert(m_last_statements.size() <= retain_last_statements);
if (m_last_statements.size() == retain_last_statements)
{
m_last_statements.pop_back();
}
std::shared_ptr<GWBUF> sBuffer(gwbuf_clone(pBuffer));
m_last_statements.push_front(StatementInfo(sBuffer));
}
if (m_last_queries.size() == 1)
{
mxb_assert(m_current_query == -1);
m_current_query = 0;
}
else
{
// If requests are streamed, without the response being waited for,
// then this may cause the index to grow past the length of the array.
// That's ok and is dealt with in book_server_response() and friends.
++m_current_query;
mxb_assert(m_current_query >= 0);
}
}
}
Session::StatementInfo::StatementInfo(const std::shared_ptr<GWBUF>& sStatement)
: m_sStatement(sStatement)
void Session::book_server_response(SERVER* pServer, bool final_response)
{
clock_gettime(CLOCK_REALTIME, &m_received);
if (retain_last_statements && !m_last_queries.empty())
{
mxb_assert(m_current_query >= 0);
// If enough queries have been sent by the client, without it waiting
// for the responses, then at this point it may be so that the query
// object has been popped from the size limited queue. That's apparent
// by the index pointing past the end of the queue. In that case
// we simply ignore the result.
if (m_current_query < static_cast<int>(m_last_queries.size()))
{
auto i = m_last_queries.begin() + m_current_query;
QueryInfo& info = *i;
mxb_assert(!info.complete());
info.book_server_response(pServer, final_response);
}
if (final_response)
{
// In case what is described in the comment above has occurred,
// this will eventually take the index back into the queue.
--m_current_query;
mxb_assert(m_current_query >= -1);
}
}
}
json_t* Session::StatementInfo::as_json() const
void Session::book_last_as_complete()
{
json_t* pInfo = json_object();
if (retain_last_statements && !m_last_queries.empty())
{
mxb_assert(m_current_query >= 0);
// See comment in book_server_response().
if (m_current_query < static_cast<int>(m_last_queries.size()))
{
auto i = m_last_queries.begin() + m_current_query;
QueryInfo& info = *i;
info.book_as_complete();
}
}
}
void Session::reset_server_bookkeeping()
{
if (retain_last_statements && !m_last_queries.empty())
{
mxb_assert(m_current_query >= 0);
// See comment in book_server_response().
if (m_current_query < static_cast<int>(m_last_queries.size()))
{
auto i = m_last_queries.begin() + m_current_query;
QueryInfo& info = *i;
info.reset_server_bookkeeping();
}
}
}
Session::QueryInfo::QueryInfo(const std::shared_ptr<GWBUF>& sQuery)
: m_sQuery(sQuery)
{
clock_gettime(CLOCK_REALTIME_COARSE, &m_received);
m_completed.tv_sec = 0;
m_completed.tv_nsec = 0;
}
namespace
{
static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123";
static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1;
void timespec_to_iso(char* zIso, const timespec& ts)
{
tm tm;
gmtime_r(&ts.tv_sec, &tm);
size_t i = strftime(zIso, ISO_TIME_LEN + 1, "%G-%m-%dT%H:%M:%S", &tm);
mxb_assert(i == 19);
long int ms = ts.tv_nsec / 1000000;
i = sprintf(zIso + i, ".%03ld", ts.tv_nsec / 1000000);
mxb_assert(i == 4);
}
}
json_t* Session::QueryInfo::as_json() const
{
json_t* pQuery = json_object();
const char* pCmd;
char* pStmt;
int len;
bool deallocate = get_stmt(m_sStatement.get(), &pStmt, &len);
bool deallocate = get_cmd_and_stmt(m_sQuery.get(), &pCmd, &pStmt, &len);
json_object_set_new(pInfo, "statement", json_stringn(pStmt, len));
if (deallocate)
if (pCmd)
{
MXS_FREE(pStmt);
json_object_set_new(pQuery, "command", json_string(pCmd));
}
tm tm;
gmtime_r(&m_received.tv_sec, &tm);
if (pStmt)
{
json_object_set_new(pQuery, "statement", json_stringn(pStmt, len));
static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123456789";
static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1;
if (deallocate)
{
MXS_FREE(pStmt);
}
}
char iso_time[ISO_TIME_LEN + 1];
size_t i = strftime(iso_time, sizeof(iso_time), "%G-%m-%dT%H:%M:%S", &tm);
mxb_assert(i == 19);
i = sprintf(iso_time + i, ".%09ld", m_received.tv_nsec);
mxb_assert(i == 10);
json_object_set_new(pInfo, "received", json_stringn(iso_time, ISO_TIME_LEN));
timespec_to_iso(iso_time, m_received);
json_object_set_new(pQuery, "received", json_stringn(iso_time, ISO_TIME_LEN));
return pInfo;
if (m_complete)
{
timespec_to_iso(iso_time, m_completed);
json_object_set_new(pQuery, "completed", json_stringn(iso_time, ISO_TIME_LEN));
}
json_t* pResponses = json_array();
for (auto& info : m_server_infos)
{
json_t* pResponse = json_object();
// Calculate and report in milliseconds.
long int received = m_received.tv_sec * 1000 + m_received.tv_nsec / 1000000;
long int processed = info.processed.tv_sec * 1000 + info.processed.tv_nsec / 1000000;
mxb_assert(processed >= received);
long int duration = processed - received;
json_object_set_new(pResponse, "server", json_string(info.pServer->name));
json_object_set_new(pResponse, "duration", json_integer(duration));
json_array_append_new(pResponses, pResponse);
}
json_object_set_new(pQuery, "responses", pResponses);
return pQuery;
}
void Session::QueryInfo::book_server_response(SERVER* pServer, bool final_response)
{
// If the information has been completed, no more information may be provided.
mxb_assert(!m_complete);
// A particular server may be reported only exactly once.
mxb_assert(find_if(m_server_infos.begin(), m_server_infos.end(), [pServer](const ServerInfo& info) {
return info.pServer == pServer;
}) == m_server_infos.end());
timespec now;
clock_gettime(CLOCK_REALTIME_COARSE, &now);
m_server_infos.push_back(ServerInfo {pServer, now});
m_complete = final_response;
if (m_complete)
{
m_completed = now;
}
}
void Session::QueryInfo::book_as_complete()
{
timespec now;
clock_gettime(CLOCK_REALTIME_COARSE, &m_completed);
m_complete = true;
}
void Session::QueryInfo::reset_server_bookkeeping()
{
m_server_infos.clear();
m_completed.tv_sec = 0;
m_completed.tv_nsec = 0;
m_complete = false;
}

View File

@ -585,6 +585,9 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
{
/** Got a complete reply, decrement expected response count */
m_expected_responses--;
session_book_server_response(m_pSession, backend->backend()->server, m_expected_responses == 0);
mxb_assert(m_expected_responses >= 0);
mxb_assert(backend->get_reply_state() == REPLY_STATE_DONE);
MXS_INFO("Reply complete, last reply from %s", backend->name());
@ -639,6 +642,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
m_otrx_state = OTRX_INACTIVE;
start_trx_replay();
gwbuf_free(writebuf);
session_reset_server_bookkeeping(m_pSession);
return;
}
}