MXS-2363 Implement /v1/maxscale/query_classifier/cache
That URL will now return information about the statements in the query classifier cache. The information is collected using the same map in a serial manner from all routing workers (that each have their own cache). Since all caches will contains the same statements, collecting the information in a serial manner means that the overall memory consumption will be lower than what it would be if the information was collected in parallel.
This commit is contained in:
parent
64a0327ada
commit
5f5d2ef183
@ -12,6 +12,7 @@
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <maxscale/ccdefs.hh>
|
||||
#include <maxbase/jansson.h>
|
||||
#include <maxscale/buffer.hh>
|
||||
@ -172,6 +173,17 @@ struct QC_STMT_INFO
|
||||
{
|
||||
};
|
||||
|
||||
/**
|
||||
* QC_STMT_RESULT contains limited information about a particular
|
||||
* statement.
|
||||
*/
|
||||
struct QC_STMT_RESULT
|
||||
{
|
||||
qc_parse_result_t status;
|
||||
uint32_t type_mask;
|
||||
qc_query_op_t op;
|
||||
};
|
||||
|
||||
/**
|
||||
* QUERY_CLASSIFIER defines the object a query classifier plugin must
|
||||
* implement and return.
|
||||
@ -439,6 +451,15 @@ struct QUERY_CLASSIFIER
|
||||
* @param info The info to be closed.
|
||||
*/
|
||||
void (* qc_info_close)(QC_STMT_INFO* info);
|
||||
|
||||
/**
|
||||
* Get result from info.
|
||||
*
|
||||
* @param The info whose result should be returned.
|
||||
*
|
||||
* @return The result of the provided info.
|
||||
*/
|
||||
QC_STMT_RESULT (*qc_get_result_from_info)(const QC_STMT_INFO* info);
|
||||
};
|
||||
|
||||
/**
|
||||
@ -949,3 +970,23 @@ json_t* qc_get_cache_stats_as_json();
|
||||
* @return The corresponding string.
|
||||
*/
|
||||
const char* qc_result_to_string(qc_parse_result_t result);
|
||||
|
||||
/**
|
||||
* Public interface to query classifier cache state.
|
||||
*/
|
||||
struct QC_CACHE_ENTRY
|
||||
{
|
||||
int64_t hits;
|
||||
QC_STMT_RESULT result;
|
||||
};
|
||||
|
||||
/**
|
||||
* Obtain query classifier cache information for the @b calling thread.
|
||||
*
|
||||
* @param state Map where information is added.
|
||||
*
|
||||
* @note Calling with a non-empty @c state means that a cumulative result
|
||||
* will be obtained, that is, the hits of a particular key will
|
||||
* be added the hits of that key if it already is in the map.
|
||||
*/
|
||||
void qc_get_cache_state(std::map<std::string, QC_CACHE_ENTRY>& state);
|
||||
|
@ -3561,6 +3561,7 @@ extern "C"
|
||||
qc_mysql_set_sql_mode,
|
||||
nullptr, // qc_info_dup not supported.
|
||||
nullptr, // qc_info_close not supported.
|
||||
nullptr, // qc_get_result_from_info not supported
|
||||
};
|
||||
|
||||
static MXS_MODULE info =
|
||||
|
@ -256,6 +256,18 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
QC_STMT_RESULT get_result() const
|
||||
{
|
||||
QC_STMT_RESULT result =
|
||||
{
|
||||
m_status,
|
||||
m_type_mask,
|
||||
m_operation
|
||||
};
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static QcSqliteInfo* create(uint32_t collect)
|
||||
{
|
||||
QcSqliteInfo* pInfo = new(std::nothrow) QcSqliteInfo(collect);
|
||||
@ -4496,27 +4508,29 @@ void maxscaleUse(Parse* pParse, Token* pToken)
|
||||
/**
|
||||
* API
|
||||
*/
|
||||
static int32_t qc_sqlite_setup(qc_sql_mode_t sql_mode, const char* args);
|
||||
static int32_t qc_sqlite_process_init(void);
|
||||
static void qc_sqlite_process_end(void);
|
||||
static int32_t qc_sqlite_thread_init(void);
|
||||
static void qc_sqlite_thread_end(void);
|
||||
static int32_t qc_sqlite_parse(GWBUF* query, uint32_t collect, int32_t* result);
|
||||
static int32_t qc_sqlite_get_type_mask(GWBUF* query, uint32_t* typemask);
|
||||
static int32_t qc_sqlite_get_operation(GWBUF* query, int32_t* op);
|
||||
static int32_t qc_sqlite_get_created_table_name(GWBUF* query, char** name);
|
||||
static int32_t qc_sqlite_is_drop_table_query(GWBUF* query, int32_t* is_drop_table);
|
||||
static int32_t qc_sqlite_get_table_names(GWBUF* query, int32_t fullnames, char*** names, int* tblsize);
|
||||
static int32_t qc_sqlite_get_canonical(GWBUF* query, char** canonical);
|
||||
static int32_t qc_sqlite_query_has_clause(GWBUF* query, int32_t* has_clause);
|
||||
static int32_t qc_sqlite_get_database_names(GWBUF* query, char*** names, int* sizep);
|
||||
static int32_t qc_sqlite_get_preparable_stmt(GWBUF* stmt, GWBUF** preparable_stmt);
|
||||
static void qc_sqlite_set_server_version(uint64_t version);
|
||||
static void qc_sqlite_get_server_version(uint64_t* version);
|
||||
static int32_t qc_sqlite_get_sql_mode(qc_sql_mode_t* sql_mode);
|
||||
static int32_t qc_sqlite_set_sql_mode(qc_sql_mode_t sql_mode);
|
||||
static QC_STMT_INFO* qc_sqlite_info_dup(QC_STMT_INFO* info);
|
||||
static void qc_sqlite_info_close(QC_STMT_INFO* info);
|
||||
static int32_t qc_sqlite_setup(qc_sql_mode_t sql_mode, const char* args);
|
||||
static int32_t qc_sqlite_process_init(void);
|
||||
static void qc_sqlite_process_end(void);
|
||||
static int32_t qc_sqlite_thread_init(void);
|
||||
static void qc_sqlite_thread_end(void);
|
||||
static int32_t qc_sqlite_parse(GWBUF* query, uint32_t collect, int32_t* result);
|
||||
static int32_t qc_sqlite_get_type_mask(GWBUF* query, uint32_t* typemask);
|
||||
static int32_t qc_sqlite_get_operation(GWBUF* query, int32_t* op);
|
||||
static int32_t qc_sqlite_get_created_table_name(GWBUF* query, char** name);
|
||||
static int32_t qc_sqlite_is_drop_table_query(GWBUF* query, int32_t* is_drop_table);
|
||||
static int32_t qc_sqlite_get_table_names(GWBUF* query, int32_t fullnames, char*** names, int* tblsize);
|
||||
static int32_t qc_sqlite_get_canonical(GWBUF* query, char** canonical);
|
||||
static int32_t qc_sqlite_query_has_clause(GWBUF* query, int32_t* has_clause);
|
||||
static int32_t qc_sqlite_get_database_names(GWBUF* query, char*** names, int* sizep);
|
||||
static int32_t qc_sqlite_get_preparable_stmt(GWBUF* stmt, GWBUF** preparable_stmt);
|
||||
static void qc_sqlite_set_server_version(uint64_t version);
|
||||
static void qc_sqlite_get_server_version(uint64_t* version);
|
||||
static int32_t qc_sqlite_get_sql_mode(qc_sql_mode_t* sql_mode);
|
||||
static int32_t qc_sqlite_set_sql_mode(qc_sql_mode_t sql_mode);
|
||||
static QC_STMT_INFO* qc_sqlite_info_dup(QC_STMT_INFO* info);
|
||||
static void qc_sqlite_info_close(QC_STMT_INFO* info);
|
||||
static QC_STMT_RESULT qc_sqlite_get_result_from_info(const QC_STMT_INFO* pInfo);
|
||||
|
||||
|
||||
static bool get_key_and_value(char* arg, const char** pkey, const char** pvalue)
|
||||
{
|
||||
@ -5199,6 +5213,11 @@ void qc_sqlite_info_close(QC_STMT_INFO* info)
|
||||
static_cast<QcSqliteInfo*>(info)->dec_ref();
|
||||
}
|
||||
|
||||
QC_STMT_RESULT qc_sqlite_get_result_from_info(const QC_STMT_INFO* pInfo)
|
||||
{
|
||||
return static_cast<const QcSqliteInfo*>(pInfo)->get_result();
|
||||
}
|
||||
|
||||
/**
|
||||
* EXPORTS
|
||||
*/
|
||||
@ -5234,6 +5253,7 @@ extern "C"
|
||||
qc_sqlite_set_sql_mode,
|
||||
qc_sqlite_info_dup,
|
||||
qc_sqlite_info_close,
|
||||
qc_sqlite_get_result_from_info,
|
||||
};
|
||||
|
||||
static MXS_MODULE info =
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include <maxscale/json_api.hh>
|
||||
#include <maxscale/modutil.hh>
|
||||
#include <maxscale/pcre2.h>
|
||||
#include <maxscale/routingworker.hh>
|
||||
#include <maxscale/utils.h>
|
||||
#include <maxscale/jansson.hh>
|
||||
#include <maxscale/buffer.hh>
|
||||
@ -205,6 +206,40 @@ public:
|
||||
*pStats = m_stats;
|
||||
}
|
||||
|
||||
void get_state(std::map<std::string, QC_CACHE_ENTRY>& state) const
|
||||
{
|
||||
for (const auto& info : m_infos)
|
||||
{
|
||||
const std::string& stmt = info.first;
|
||||
const Entry& entry = info.second;
|
||||
|
||||
auto it = state.find(stmt);
|
||||
|
||||
if (it == state.end())
|
||||
{
|
||||
QC_CACHE_ENTRY e {};
|
||||
|
||||
e.hits = entry.hits;
|
||||
e.result = this_unit.classifier->qc_get_result_from_info(entry.pInfo);
|
||||
|
||||
state.insert(std::make_pair(stmt, e));
|
||||
}
|
||||
else
|
||||
{
|
||||
QC_CACHE_ENTRY& e = it->second;
|
||||
|
||||
e.hits += entry.hits;
|
||||
#if defined (SS_DEBUG)
|
||||
QC_STMT_RESULT result = this_unit.classifier->qc_get_result_from_info(entry.pInfo);
|
||||
|
||||
mxb_assert(e.result.status == result.status);
|
||||
mxb_assert(e.result.type_mask == result.type_mask);
|
||||
mxb_assert(e.result.op == result.op);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
struct Entry
|
||||
{
|
||||
@ -1507,8 +1542,52 @@ std::unique_ptr<json_t> qc_classify_as_json(const char* zHost, const std::string
|
||||
|
||||
std::unique_ptr<json_t> qc_cache_as_json(const char* zHost)
|
||||
{
|
||||
std::map<std::string, QC_CACHE_ENTRY> state;
|
||||
|
||||
// Assuming the classification cache of all workers will roughly be similar
|
||||
// (which will be the case unless something is broken), collecting the
|
||||
// information serially from all routing workers will consume 1/N of the
|
||||
// memory that would be consumed if the information were collected in
|
||||
// parallel and then coalesced here.
|
||||
|
||||
mxs::RoutingWorker::execute_serially([&state]() {
|
||||
qc_get_cache_state(state);
|
||||
});
|
||||
|
||||
json_t* pArray = json_array();
|
||||
|
||||
for (const auto& p : state)
|
||||
{
|
||||
const auto& stmt = p.first;
|
||||
const auto& entry = p.second;
|
||||
|
||||
json_t* pStmt = json_string(stmt.c_str());
|
||||
json_t* pHits = json_integer(entry.hits);
|
||||
json_t* pClassification = json_object();
|
||||
|
||||
json_object_set_new(pClassification,
|
||||
CN_PARSE_RESULT, json_string(qc_result_to_string(entry.result.status)));
|
||||
|
||||
char* zType_mask = qc_typemask_to_string(entry.result.type_mask);
|
||||
json_object_set_new(pClassification, CN_TYPE_MASK, json_string(zType_mask));
|
||||
MXS_FREE(zType_mask);
|
||||
json_object_set_new(pClassification,
|
||||
CN_OPERATION,
|
||||
json_string(qc_op_to_string(entry.result.op)));
|
||||
|
||||
json_t* pObject = json_object();
|
||||
|
||||
json_object_set_new(pObject, CN_STATEMENT, pStmt);
|
||||
json_object_set_new(pObject, CN_HITS, pHits);
|
||||
json_object_set_new(pObject, CN_CLASSIFICATION, pClassification);
|
||||
|
||||
json_array_append(pArray, pObject);
|
||||
}
|
||||
|
||||
json_t* pAttributes = json_object();
|
||||
|
||||
json_object_set_new(pAttributes, CN_STATEMENTS, pArray);
|
||||
|
||||
json_t* pSelf = json_object();
|
||||
json_object_set_new(pSelf, CN_ID, json_string(CN_CACHE));
|
||||
json_object_set_new(pSelf, CN_TYPE, json_string(CN_CACHE));
|
||||
@ -1516,3 +1595,13 @@ std::unique_ptr<json_t> qc_cache_as_json(const char* zHost)
|
||||
|
||||
return std::unique_ptr<json_t>(mxs_json_resource(zHost, MXS_JSON_API_QC_CACHE, pSelf));
|
||||
}
|
||||
|
||||
void qc_get_cache_state(std::map<std::string, QC_CACHE_ENTRY>& state)
|
||||
{
|
||||
QCInfoCache* pCache = this_thread.pInfo_cache;
|
||||
|
||||
if (pCache)
|
||||
{
|
||||
pCache->get_state(state);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user