Merge branch 'develop' into tee_fixes

This commit is contained in:
Markus Makela
2015-03-06 18:27:41 +02:00
49 changed files with 4515 additions and 84 deletions

View File

@ -34,6 +34,10 @@ add_library(namedserverfilter SHARED namedserverfilter.c)
target_link_libraries(namedserverfilter log_manager utils)
install(TARGETS namedserverfilter DESTINATION modules)
add_library(lagfilter SHARED lagfilter.c)
target_link_libraries(lagfilter log_manager utils query_classifier)
install(TARGETS lagfilter DESTINATION modules)
add_subdirectory(hint)

View File

@ -1300,6 +1300,7 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
char *ptr,*where,*msg = NULL;
char emsg[512];
int qlen;
unsigned char* memptr = (unsigned char*)queue->start;
bool is_sql, is_real, matches;
skygw_query_op_t optype = QUERY_OP_UNDEFINED;
STRLINK* strln = NULL;
@ -1312,15 +1313,15 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
tm_now = localtime(&time_now);
matches = false;
is_sql = modutil_is_SQL(queue);
is_sql = modutil_is_SQL(queue) || modutil_is_SQL_prepare(queue);
if(is_sql){
if(!query_is_parsed(queue)){
parse_query(queue);
}
optype = query_classifier_get_operation(queue);
modutil_extract_SQL(queue, &ptr, &qlen);
is_real = skygw_is_real_query(queue);
qlen = gw_mysql_get_byte3(memptr) - 1;
}
if(rulelist->rule->on_queries == QUERY_OP_UNDEFINED || rulelist->rule->on_queries & optype){
@ -1547,18 +1548,19 @@ bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
bool is_sql, rval = false;
int qlen;
char *fullquery = NULL,*ptr;
unsigned char* memptr = (unsigned char*)queue->start;
RULELIST* rulelist;
is_sql = modutil_is_SQL(queue);
is_sql = modutil_is_SQL(queue) || modutil_is_SQL_prepare(queue);
if(is_sql){
if(!query_is_parsed(queue)){
parse_query(queue);
}
modutil_extract_SQL(queue, &ptr, &qlen);
fullquery = malloc((qlen + 1) * sizeof(char));
memcpy(fullquery,ptr,qlen);
memset(fullquery + qlen,0,1);
qlen = gw_mysql_get_byte3(memptr);
fullquery = malloc((qlen) * sizeof(char));
memcpy(fullquery,memptr + 5,qlen - 1);
memset(fullquery + qlen - 1,0,1);
}
if((rulelist = user->rules_or) == NULL)
@ -1598,21 +1600,21 @@ bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
{
bool is_sql, rval = true;
int qlen;
unsigned char* memptr = (unsigned char*)queue->start;
char *fullquery = NULL,*ptr;
RULELIST* rulelist;
is_sql = modutil_is_SQL(queue);
is_sql = modutil_is_SQL(queue) || modutil_is_SQL_prepare(queue);
if(is_sql){
if(!query_is_parsed(queue)){
parse_query(queue);
}
modutil_extract_SQL(queue, &ptr, &qlen);
fullquery = malloc((qlen + 1) * sizeof(char));
memcpy(fullquery,ptr,qlen);
memset(fullquery + qlen,0,1);
qlen = gw_mysql_get_byte3(memptr);
fullquery = malloc((qlen) * sizeof(char));
memcpy(fullquery,memptr + 5,qlen - 1);
memset(fullquery + qlen - 1,0,1);
}
if(strict_all)

View File

@ -0,0 +1,400 @@
/*
* This file is distributed as part of MaxScale by MariaDB Corporation. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2014
*/
#include <stdio.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <string.h>
#include <hint.h>
#include <query_classifier.h>
#include <regex.h>
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* @file lagfilter.c - a very simple filter designed to send queries to the
* master server after data modification has occurred. This is done to prevent
* replication lag affecting the outcome of a select query.
*
* @verbatim
*
* Two optional parameters that define the behavior after a data modifying query
* is executed:
*
* count=<number of queries> Queries to route to master after data modification.
* time=<time period> Seconds to wait before queries are routed to slaves.
*
* Date Who Description
* 03/03/2015 Markus Mäkelä Written for demonstrative purposes
* @endverbatim
*/
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_GA,
FILTER_VERSION,
"A routing hint filter that send queries to the master after data modification"
};
static char *version_str = "V1.1.0";
static FILTER *createInstance(char **options, FILTER_PARAMETER **params);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
NULL, // No Upstream requirement
routeQuery,
NULL,
diagnostic,
};
struct LAGSTATS{
int n_add_count; /*< No. of statements diverted based on count */
int n_add_time; /*< No. of statements diverted based on time */
int n_modified; /*< No. of statements not diverted */
};
/**
* Instance structure
*/
typedef struct {
char *match; /* Regular expression to match */
char *nomatch; /* Regular expression to ignore */
int time; /*< The number of seconds to wait before routing queries
* to slave servers after a data modification operation
* is done. */
int count; /*< Number of hints to add after each operation
* that modifies data. */
struct LAGSTATS stats;
regex_t re; /* Compiled regex text of match */
regex_t nore; /* Compiled regex text of ignore */
} LAG_INSTANCE;
/**
* The session structure for this filter
*/
typedef struct {
DOWNSTREAM down; /*< The downstream filter */
int hints_left; /*< Number of hints left to add to queries*/
time_t last_modification; /*< Time of the last modifying operation */
int active; /*< Is filter active */
} LAG_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialization routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
LAG_INSTANCE *my_instance;
int i,cflags = 0;
if ((my_instance = calloc(1, sizeof(LAG_INSTANCE))) != NULL)
{
my_instance->count = 0;
my_instance->time = 0;
my_instance->stats.n_add_count = 0;
my_instance->stats.n_add_time = 0;
my_instance->stats.n_modified = 0;
my_instance->match = NULL;
my_instance->nomatch = NULL;
for (i = 0; params && params[i]; i++)
{
if (!strcmp(params[i]->name, "count"))
my_instance->count = atoi(params[i]->value);
else if (!strcmp(params[i]->name, "time"))
my_instance->time = atoi(params[i]->value);
else if (!strcmp(params[i]->name, "match"))
my_instance->match = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "ignore"))
my_instance->nomatch = strdup(params[i]->value);
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"lagfilter: Unexpected parameter '%s'.\n",
params[i]->name)));
}
}
if (options)
{
for (i = 0; options[i]; i++)
{
if (!strcasecmp(options[i], "ignorecase"))
{
cflags |= REG_ICASE;
}
else if (!strcasecmp(options[i], "case"))
{
cflags &= ~REG_ICASE;
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"lagfilter: unsupported option '%s'.",
options[i])));
}
}
}
if(my_instance->match)
{
if(regcomp(&my_instance->re,my_instance->match,cflags))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"lagfilter: Failed to compile regex '%s'.",
my_instance->match)));
}
}
if(my_instance->nomatch)
{
if(regcomp(&my_instance->nore,my_instance->nomatch,cflags))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"lagfilter: Failed to compile regex '%s'.",
my_instance->nomatch)));
}
}
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance;
LAG_SESSION *my_session;
if ((my_session = malloc(sizeof(LAG_SESSION))) != NULL)
{
my_session->active = 1;
my_session->hints_left = 0;
my_session->last_modification = 0;
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
}
/**
* Free the memory associated with this filter session.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
freeSession(FILTER *instance, void *session)
{
free(session);
return;
}
/**
* Set the downstream component for this filter.
*
* @param instance The filter instance data
* @param session The session being closed
* @param downstream The downstream filter or router
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
LAG_SESSION *my_session = (LAG_SESSION *)session;
my_session->down = *downstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* If the regular expressed configured in the match parameter of the
* filter definition matches the SQL text then add the hint
* "Route to named server" with the name defined in the server parameter
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance;
LAG_SESSION *my_session = (LAG_SESSION *)session;
char *sql;
time_t now = time(NULL);
if (modutil_is_SQL(queue))
{
if (queue->next != NULL)
{
queue = gwbuf_make_contiguous(queue);
}
if(!query_is_parsed(queue))
{
parse_query(queue);
}
if(query_classifier_get_operation(queue) & (QUERY_OP_DELETE|QUERY_OP_INSERT|QUERY_OP_UPDATE))
{
sql = modutil_get_SQL(queue);
if(my_instance->nomatch == NULL||(my_instance->nomatch && regexec(&my_instance->nore,sql,0,NULL,0) != 0))
{
if(my_instance->match == NULL||
(my_instance->match && regexec(&my_instance->re,sql,0,NULL,0) == 0))
{
my_session->hints_left = my_instance->count;
my_session->last_modification = now;
my_instance->stats.n_modified++;
}
}
free(sql);
}
else if(my_session->hints_left > 0)
{
queue->hint = hint_create_route(queue->hint,
HINT_ROUTE_TO_MASTER,
NULL);
my_session->hints_left--;
my_instance->stats.n_add_count++;
}
else if(difftime(now,my_session->last_modification) < my_instance->time)
{
queue->hint = hint_create_route(queue->hint,
HINT_ROUTE_TO_MASTER,
NULL);
my_instance->stats.n_add_time++;
}
}
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance;
LAG_SESSION *my_session = (LAG_SESSION *)fsession;
dcb_printf(dcb, "Configuration:\n\tCount: %d\n",
my_instance->count);
dcb_printf(dcb, "\tTime: %d seconds\n\n",
my_instance->time);
dcb_printf(dcb, "Statistics:\n");
dcb_printf(dcb, "\tNo. of data modifications: %d\n",
my_instance->stats.n_modified);
dcb_printf(dcb, "\tNo. of hints added based on count: %d\n",
my_instance->stats.n_add_count);
dcb_printf(dcb, "\tNo. of hints added based on time: %d\n",
my_instance->stats.n_add_time);
}

View File

@ -0,0 +1,132 @@
#ifndef _MAXINFO_H
#define _MAXINFO_H
/*
* This file is distributed as part of the MariaDB Corporation MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2013-2014
*/
#include <service.h>
#include <session.h>
#include <spinlock.h>
/**
* @file maxinfo.h The MaxScale information schema provider
*
* @verbatim
* Revision History
*
* Date Who Description
* 16/02/15 Mark Riddoch Initial implementation
*
* @endverbatim
*/
struct maxinfo_session;
/**
* The INFO_INSTANCE structure. There is one instane of the maxinfo "router" for
* each service that uses the MaxScale information schema.
*/
typedef struct maxinfo_instance {
SPINLOCK lock; /*< The instance spinlock */
SERVICE *service; /*< The debug cli service */
struct maxinfo_session
*sessions; /*< Linked list of sessions within this instance */
struct maxinfo_instance
*next; /*< The next pointer for the list of instances */
} INFO_INSTANCE;
/**
* The INFO_SESSION structure. As INFO_SESSION is created for each user that logs into
* the MaxScale information schema.
*/
typedef struct maxinfo_session {
SESSION *session; /*< The MaxScale session */
DCB *dcb; /*< DCB of the client side */
GWBUF *queue; /*< Queue for building contiguous requests */
struct maxinfo_session
*next; /*< The next pointer for the list of sessions */
} INFO_SESSION;
/**
* The operators that can be in the parse tree
*/
typedef enum
{
MAXOP_SHOW,
MAXOP_SELECT,
MAXOP_TABLE,
MAXOP_COLUMNS,
MAXOP_ALL_COLUMNS,
MAXOP_LITERAL,
MAXOP_PREDICATE,
MAXOP_LIKE,
MAXOP_EQUAL
} MAXINFO_OPERATOR;
/**
* The Parse tree nodes for the MaxInfo parser
*/
typedef struct maxinfo_tree {
MAXINFO_OPERATOR op; /*< The operator */
char *value; /*< The value */
struct maxinfo_tree *left; /*< The left hand side of the operator */
struct maxinfo_tree *right; /*< The right hand side of the operator */
} MAXINFO_TREE;
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
/**
* MySQL protocol OpCodes needed for replication
*/
#define COM_QUIT 0x01
#define COM_QUERY 0x03
#define COM_STATISTICS 0x09
#define COM_PING 0x0e
/**
* Token values for the tokeniser used by the parser for maxinfo
*/
#define LT_STRING 1
#define LT_SHOW 2
#define LT_LIKE 3
#define LT_SELECT 4
#define LT_EQUAL 5
#define LT_COMMA 6
#define LT_FROM 7
#define LT_STAR 8
#define LT_VARIABLE 9
/**
* Possible parse errors
*/
typedef enum {
PARSE_NOERROR,
PARSE_MALFORMED_SHOW,
PARSE_EXPECTED_LIKE,
PARSE_SYNTAX_ERROR
} PARSE_ERROR;
extern MAXINFO_TREE *maxinfo_parse(char *, PARSE_ERROR *);
extern void maxinfo_execute(DCB *, MAXINFO_TREE *);
extern void maxinfo_send_error(DCB *, int, char *);
extern void maxinfo_send_parse_error(DCB *, char *, PARSE_ERROR);
extern void maxinfo_send_error(DCB *, int, char *);
extern RESULTSET *maxinfo_variables();
extern RESULTSET *maxinfo_status();
#endif

View File

@ -290,6 +290,7 @@ struct router_client_session {
int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;
DCB* client_dcb;
#if defined(PREP_STMT_CACHING)
HASHTABLE* rses_prep_stmt[2];
#endif

View File

@ -9,7 +9,8 @@ install(TARGETS galeramon DESTINATION modules)
add_library(ndbclustermon SHARED ndbcluster_mon.c)
target_link_libraries(ndbclustermon log_manager utils)
install(TARGETS ndbclustermon DESTINATION modules)
add_library(mmmon SHARED mm_mon.c)
target_link_libraries(mmmon log_manager utils)
install(TARGETS mmmon DESTINATION modules)
if(BUILD_MMMON)
add_library(mmmon SHARED mm_mon.c)
target_link_libraries(mmmon log_manager utils)
install(TARGETS mmmon DESTINATION modules)
endif()

View File

@ -41,6 +41,7 @@
#include <gw.h>
#include <modinfo.h>
#include <log_manager.h>
#include <resultset.h>
MODULE_INFO info = {
MODULE_API_PROTOCOL,
@ -129,10 +130,10 @@ GetModuleObject()
static int
httpd_read_event(DCB* dcb)
{
//SESSION *session = dcb->session;
//ROUTER_OBJECT *router = session->service->router;
//ROUTER *router_instance = session->service->router_instance;
//void *rsession = session->router_session;
SESSION *session = dcb->session;
ROUTER_OBJECT *router = session->service->router;
ROUTER *router_instance = session->service->router_instance;
void *rsession = session->router_session;
int numchars = 1;
char buf[HTTPD_REQUESTLINE_MAXLEN-1] = "";
@ -143,6 +144,7 @@ int cgi = 0;
size_t i, j;
int headers_read = 0;
HTTPD_session *client_data = NULL;
GWBUF *uri;
client_data = dcb->data;
@ -234,13 +236,11 @@ HTTPD_session *client_data = NULL;
/* send all the basic headers and close with \r\n */
httpd_send_headers(dcb, 1);
#if 0
/**
* ToDO: launch proper content handling based on the requested URI, later REST interface
*
*/
dcb_printf(dcb, "Welcome to HTTPD MaxScale (c) %s\n\n", version_str);
if (strcmp(url, "/show") == 0) {
if (query_string && strlen(query_string)) {
if (strcmp(query_string, "dcb") == 0)
@ -249,6 +249,21 @@ HTTPD_session *client_data = NULL;
dprintAllSessions(dcb);
}
}
if (strcmp(url, "/services") == 0) {
RESULTSET *set, *seviceGetList();
if ((set = serviceGetList()) != NULL)
{
resultset_stream_json(set, dcb);
resultset_free(set);
}
}
#endif
if ((uri = gwbuf_alloc(strlen(url) + 1)) != NULL)
{
strcpy((char *)GWBUF_DATA(uri), url);
gwbuf_set_type(uri, GWBUF_TYPE_HTTP);
SESSION_ROUTE_QUERY(session, uri);
}
/* force the client connecton close */
dcb_close(dcb);
@ -345,6 +360,9 @@ int n_connect = 0;
/* create the session data for HTTPD */
client_data = (HTTPD_session *)calloc(1, sizeof(HTTPD_session));
client->data = client_data;
client->session =
session_alloc(dcb->session->service, client);
if (poll_add_dcb(client) == -1)
{
@ -354,7 +372,6 @@ int n_connect = 0;
n_connect++;
}
}
close(so);
}
return n_connect;
@ -484,7 +501,7 @@ static void httpd_send_headers(DCB *dcb, int final)
strftime(date, sizeof(date), fmt, localtime(&httpd_current_time));
dcb_printf(dcb, "HTTP/1.1 200 OK\r\nDate: %s\r\nServer: %s\r\nConnection: close\r\nContent-Type: text/plain\r\n", date, HTTP_SERVER_STRING);
dcb_printf(dcb, "HTTP/1.1 200 OK\r\nDate: %s\r\nServer: %s\r\nConnection: close\r\nContent-Type: application/json\r\n", date, HTTP_SERVER_STRING);
/* close the headers */
if (final) {

View File

@ -21,4 +21,5 @@ add_subdirectory(readwritesplit)
if(BUILD_BINLOG)
add_subdirectory(binlog)
endif()
add_subdirectory(maxinfo)

View File

@ -0,0 +1,4 @@
add_library(maxinfo SHARED maxinfo.c maxinfo_parse.c maxinfo_error.c maxinfo_exec.c)
set_target_properties(maxinfo PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib)
target_link_libraries(maxinfo pthread log_manager)
install(TARGETS maxinfo DESTINATION modules)

View File

@ -0,0 +1,803 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2014
*/
/**
* @file maxinfo.c - A "routing module" that in fact merely gives access
* to a MaxScale information schema usign the MySQL protocol
*
* @verbatim
* Revision History
*
* Date Who Description
* 16/02/15 Mark Riddoch Initial implementation
* 27/02/15 Massimiliano Pinto Added maxinfo_add_mysql_user
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <service.h>
#include <session.h>
#include <server.h>
#include <router.h>
#include <modules.h>
#include <modinfo.h>
#include <modutil.h>
#include <monitor.h>
#include <atomic.h>
#include <spinlock.h>
#include <dcb.h>
#include <poll.h>
#include <maxinfo.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <resultset.h>
#include <version.h>
#include <resultset.h>
#include <secrets.h>
#include <users.h>
#include <dbusers.h>
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_ALPHA_RELEASE,
ROUTER_VERSION,
"The MaxScale Information Schema"
};
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
extern char *create_hex_sha1_sha1_passwd(char *passwd);
static char *version_str = "V1.0.0";
static int maxinfo_statistics(INFO_INSTANCE *, INFO_SESSION *, GWBUF *);
static int maxinfo_ping(INFO_INSTANCE *, INFO_SESSION *, GWBUF *);
static int maxinfo_execute_query(INFO_INSTANCE *, INFO_SESSION *, char *);
static int handle_url(INFO_INSTANCE *instance, INFO_SESSION *router_session, GWBUF *queue);
static int maxinfo_add_mysql_user(SERVICE *service);
/* The router entry points */
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static void freeSession(ROUTER *instance, void *router_session);
static int execute(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
static void handleError(
ROUTER *instance,
void *router_session,
GWBUF *errbuf,
DCB *backend_dcb,
error_action_t action,
bool *succp);
/** The module object definition */
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
execute,
diagnostics,
NULL,
handleError,
getCapabilities
};
static SPINLOCK instlock;
static INFO_INSTANCE *instances;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"Initialise MaxInfo router module %s.\n",
version_str)));
spinlock_init(&instlock);
instances = NULL;
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
ROUTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the router for a particular service
* within the gateway.
*
* @param service The service this router is being create for
* @param options Any array of options for the query router
*
* @return The instance data for this new instance
*/
static ROUTER *
createInstance(SERVICE *service, char **options)
{
INFO_INSTANCE *inst;
int i;
if ((inst = malloc(sizeof(INFO_INSTANCE))) == NULL)
return NULL;
inst->service = service;
spinlock_init(&inst->lock);
if (options)
{
for (i = 0; options[i]; i++)
{
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unknown option for MaxInfo '%s'\n",
options[i])));
}
}
}
/*
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
* that have been created with this module.
*/
spinlock_acquire(&instlock);
inst->next = instances;
instances = inst;
spinlock_release(&instlock);
/*
* The following add the service user to service->users via mysql_users_alloc()
* password to be used.
*/
maxinfo_add_mysql_user(service);
return (ROUTER *)inst;
}
/**
* Associate a new session with this instance of the router.
*
* @param instance The router instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(ROUTER *instance, SESSION *session)
{
INFO_INSTANCE *inst = (INFO_INSTANCE *)instance;
INFO_SESSION *client;
if ((client = (INFO_SESSION *)malloc(sizeof(INFO_SESSION))) == NULL)
{
return NULL;
}
client->session = session;
client->dcb = session->client;
client->queue = NULL;
spinlock_acquire(&inst->lock);
client->next = inst->sessions;
inst->sessions = client;
spinlock_release(&inst->lock);
session->state = SESSION_STATE_READY;
return (void *)client;
}
/**
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
*
* @param instance The router instance data
* @param router_session The session being closed
*/
static void
closeSession(ROUTER *instance, void *router_session)
{
INFO_INSTANCE *inst = (INFO_INSTANCE *)instance;
INFO_SESSION *session = (INFO_SESSION *)router_session;
spinlock_acquire(&inst->lock);
if (inst->sessions == session)
inst->sessions = session->next;
else
{
INFO_SESSION *ptr = inst->sessions;
while (ptr && ptr->next != session)
ptr = ptr->next;
if (ptr)
ptr->next = session->next;
}
spinlock_release(&inst->lock);
/**
* Router session is freed in session.c:session_close, when session who
* owns it, is freed.
*/
}
/**
* Free a maxinfo session
*
* @param router_instance The router session
* @param router_client_session The router session as returned from newSession
*/
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
{
free(router_client_session);
return;
}
/**
* Error Handler routine
*
* The routine will handle errors that occurred in backend writes.
*
* @param instance The router instance
* @param router_session The router session
* @param message The error message to reply
* @param backend_dcb The backend DCB
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
*
*/
static void handleError(
ROUTER *instance,
void *router_session,
GWBUF *errbuf,
DCB *backend_dcb,
error_action_t action,
bool *succp)
{
DCB *client_dcb;
SESSION *session = backend_dcb->session;
session_state_t sesstate;
/** Reset error handle flag from a given DCB */
if (action == ERRACT_RESET)
{
backend_dcb->dcb_errhandle_called = false;
return;
}
/** Don't handle same error twice on same DCB */
if (backend_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
*succp = true;
return;
}
else
{
backend_dcb->dcb_errhandle_called = true;
}
spinlock_acquire(&session->ses_lock);
sesstate = session->state;
client_dcb = session->client;
if (sesstate == SESSION_STATE_ROUTER_READY)
{
CHK_DCB(client_dcb);
spinlock_release(&session->ses_lock);
client_dcb->func.write(client_dcb, gwbuf_clone(errbuf));
}
else
{
spinlock_release(&session->ses_lock);
}
/** false because connection is not available anymore */
*succp = false;
}
/**
* We have data from the client, this is a SQL command, or other MySQL
* packet type.
*
* @param instance The router instance
* @param router_session The router session returned from the newSession call
* @param queue The queue of data buffers to route
* @return The number of bytes sent
*/
static int
execute(ROUTER *rinstance, void *router_session, GWBUF *queue)
{
INFO_INSTANCE *instance = (INFO_INSTANCE *)rinstance;
INFO_SESSION *session = (INFO_SESSION *)router_session;
uint8_t *data;
int length, len, residual;
char *sql;
if (GWBUF_TYPE(queue) == GWBUF_TYPE_HTTP)
{
return handle_url(instance, session, queue);
}
if (session->queue)
{
queue = gwbuf_append(session->queue, queue);
session->queue = NULL;
queue = gwbuf_make_contiguous(queue);
}
data = (uint8_t *)GWBUF_DATA(queue);
length = data[0] + (data[1] << 8) + (data[2] << 16);
if (length + 4 > GWBUF_LENGTH(queue))
{
// Incomplete packet, must be buffered
session->queue = queue;
return 1;
}
// We have a complete request in a signle buffer
if (modutil_MySQL_Query(queue, &sql, &len, &residual))
{
sql = strndup(sql, len);
return maxinfo_execute_query(instance, session, sql);
}
else
{
switch (MYSQL_COMMAND(queue))
{
case COM_PING:
return maxinfo_ping(instance, session, queue);
case COM_STATISTICS:
return maxinfo_statistics(instance, session, queue);
default:
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"maxinfo: Unexpected MySQL command 0x%x",
MYSQL_COMMAND(queue))));
}
}
return 1;
}
/**
* Display router diagnostics
*
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static void
diagnostics(ROUTER *instance, DCB *dcb)
{
return; /* Nothing to do currently */
}
/**
* Capabilities interface for the rotuer
*
* Not used for the maxinfo router
*/
static uint8_t
getCapabilities(
ROUTER* inst,
void* router_session)
{
return 0;
}
/**
* Return some basic statistics from the router in response to a COM_STATISTICS
* request.
*
* @param router The router instance
* @param session The connection that requested the statistics
* @param queue The statistics request
*
* @return non-zero on sucessful send
*/
static int
maxinfo_statistics(INFO_INSTANCE *router, INFO_SESSION *session, GWBUF *queue)
{
char result[1000], *ptr;
GWBUF *ret;
int len;
extern int MaxScaleUptime();
snprintf(result, 1000,
"Uptime: %u Threads: %u Sessions: %u ",
MaxScaleUptime(),
config_threadcount(),
serviceSessionCountAll());
if ((ret = gwbuf_alloc(4 + strlen(result))) == NULL)
return 0;
len = strlen(result);
ptr = GWBUF_DATA(ret);
*ptr++ = len & 0xff;
*ptr++ = (len & 0xff00) >> 8;
*ptr++ = (len & 0xff0000) >> 16;
*ptr++ = 1;
strncpy(ptr, result, len);
return session->dcb->func.write(session->dcb, ret);
}
/**
* Respond to a COM_PING command
*
* @param router The router instance
* @param session The connection that requested the ping
* @param queue The ping request
*/
static int
maxinfo_ping(INFO_INSTANCE *router, INFO_SESSION *session, GWBUF *queue)
{
char *ptr;
GWBUF *ret;
int len;
if ((ret = gwbuf_alloc(5)) == NULL)
return 0;
ptr = GWBUF_DATA(ret);
*ptr++ = 0x01;
*ptr++ = 0;
*ptr++ = 0;
*ptr++ = 1;
*ptr = 0; // OK
return session->dcb->func.write(session->dcb, ret);
}
/**
* Populate the version comment with the MaxScale version
*
* @param result The result set
* @param data Pointer to int which is row count
* @return The populated row
*/
static RESULT_ROW *
version_comment(RESULTSET *result, void *data)
{
int *context = (int *)data;
RESULT_ROW *row;
if (*context == 0)
{
(*context)++;
row = resultset_make_row(result);
resultset_row_set(row, 0, MAXSCALE_VERSION);
return row;
}
return NULL;
}
/**
* The hardwired select @@vercom response
*
* @param dcb The DCB of the client
*/
static void
respond_vercom(DCB *dcb)
{
RESULTSET *result;
int context = 0;
if ((result = resultset_create(version_comment, &context)) == NULL)
{
maxinfo_send_error(dcb, 0, "No resources available");
return;
}
resultset_add_column(result, "@@version_comment", 40, COL_TYPE_VARCHAR);
resultset_stream_mysql(result, dcb);
resultset_free(result);
}
/**
* Populate the version comment with the MaxScale version
*
* @param result The result set
* @param data Pointer to int which is row count
* @return The populated row
*/
static RESULT_ROW *
starttime_row(RESULTSET *result, void *data)
{
int *context = (int *)data;
RESULT_ROW *row;
extern time_t MaxScaleStarted;
struct tm tm;
static char buf[40];
if (*context == 0)
{
(*context)++;
row = resultset_make_row(result);
sprintf(buf, "%u", (unsigned int)MaxScaleStarted);
resultset_row_set(row, 0, buf);
return row;
}
return NULL;
}
/**
* The hardwired select ... as starttime response
*
* @param dcb The DCB of the client
*/
static void
respond_starttime(DCB *dcb)
{
RESULTSET *result;
int context = 0;
if ((result = resultset_create(starttime_row, &context)) == NULL)
{
maxinfo_send_error(dcb, 0, "No resources available");
return;
}
resultset_add_column(result, "starttime", 40, COL_TYPE_VARCHAR);
resultset_stream_mysql(result, dcb);
resultset_free(result);
}
/**
* Send a MySQL OK packet to the DCB
*
* @param dcb The DCB to send the OK packet to
* @return result of a write call, non-zero if write was successful
*/
static int
maxinfo_send_ok(DCB *dcb)
{
GWBUF *buf;
uint8_t *ptr;
if ((buf = gwbuf_alloc(11)) == NULL)
return 0;
ptr = GWBUF_DATA(buf);
*ptr++ = 7; // Payload length
*ptr++ = 0;
*ptr++ = 0;
*ptr++ = 1; // Seqno
*ptr++ = 0; // ok
*ptr++ = 0;
*ptr++ = 0;
*ptr++ = 2;
*ptr++ = 0;
*ptr++ = 0;
*ptr++ = 0;
return dcb->func.write(dcb, buf);
}
/**
* Execute a SQL query against the MaxScale Information Schema
*
* @param instance The instance strcture
* @param session The session pointer
* @param sql The SQL to execute
*/
static int
maxinfo_execute_query(INFO_INSTANCE *instance, INFO_SESSION *session, char *sql)
{
MAXINFO_TREE *tree;
PARSE_ERROR err;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"maxinfo: SQL statement: '%s' for 0x%x.",
sql, session->dcb)));
if (strcmp(sql, "select @@version_comment limit 1") == 0)
{
respond_vercom(session->dcb);
return 1;
}
/* Below is a kludge for MonYog, if we see
* select unix_timestamp... as starttime
* just return the starttime of MaxScale
*/
if (strncasecmp(sql, "select UNIX_TIMESTAMP",
strlen("select UNIX_TIMESTAMP")) == 0
&& strstr(sql, "as starttime") != NULL)
{
respond_starttime(session->dcb);
return 1;
}
if (strcasecmp(sql, "set names 'utf8'") == 0)
{
return maxinfo_send_ok(session->dcb);
}
if (strncasecmp(sql, "set session", 11) == 0)
{
return maxinfo_send_ok(session->dcb);
}
if (strncasecmp(sql, "set autocommit", 14) == 0)
{
return maxinfo_send_ok(session->dcb);
}
if (strncasecmp(sql, "SELECT `ENGINES`.`SUPPORT`", 26) == 0)
{
return maxinfo_send_ok(session->dcb);
}
if ((tree = maxinfo_parse(sql, &err)) == NULL)
{
maxinfo_send_parse_error(session->dcb, sql, err);
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"Failed to parse SQL statement: '%s'.",
sql)));
}
else
maxinfo_execute(session->dcb, tree);
return 1;
}
/**
* Session all result set
* @return A resultset for all sessions
*/
static RESULTSET *
maxinfoSessionsAll()
{
return sessionGetList(SESSION_LIST_ALL);
}
/**
* Client session result set
* @return A resultset for all sessions
*/
static RESULTSET *
maxinfoClientSessions()
{
return sessionGetList(SESSION_LIST_CONNECTION);
}
typedef RESULTSET *(*RESULTSETFUNC)();
/**
* Table that maps a URI to a function to call to
* to obtain the result set related to that URI
*/
static struct uri_table {
char *uri;
RESULTSETFUNC func;
} supported_uri[] = {
{ "/services", serviceGetList },
{ "/listeners", serviceGetListenerList },
{ "/modules", moduleGetList },
{ "/monitors", monitorGetList },
{ "/sessions", maxinfoSessionsAll },
{ "/clients", maxinfoClientSessions },
{ "/servers", serverGetList },
{ "/variables", maxinfo_variables },
{ "/status", maxinfo_status },
{ "/event/times", eventTimesGetList },
{ NULL, NULL }
};
/**
* We have data from the client, this is a HTTP URL
*
* @param instance The router instance
* @param session The router session returned from the newSession call
* @param queue The queue of data buffers to route
* @return The number of bytes sent
*/
static int
handle_url(INFO_INSTANCE *instance, INFO_SESSION *session, GWBUF *queue)
{
char *uri;
int i;
RESULTSET *set;
uri = (char *)GWBUF_DATA(queue);
for (i = 0; supported_uri[i].uri; i++)
{
if (strcmp(uri, supported_uri[i].uri) == 0)
{
set = (*supported_uri[i].func)();
resultset_stream_json(set, session->dcb);
resultset_free(set);
}
}
return 1;
}
/**
* Add the service user to the service->users
* via mysql_users_alloc and add_mysql_users_with_host_ipv4
* User is added for '%' and 'localhost' hosts
*
* @param service The service for this router
* @return 0 on success, 1 on failure
*/
static int
maxinfo_add_mysql_user(SERVICE *service) {
char *dpwd = NULL;
char *newpasswd = NULL;
char *service_user = NULL;
char *service_passwd = NULL;
if (serviceGetUser(service, &service_user, &service_passwd) == 0) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"maxinfo: failed to get service user details")));
return 1;
}
dpwd = decryptPassword(service->credentials.authdata);
if (!dpwd) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"maxinfo: decrypt password failed for service user %s",
service_user)));
return 1;
}
service->users = (void *)mysql_users_alloc();
newpasswd = create_hex_sha1_sha1_passwd(dpwd);
if (!newpasswd) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"maxinfo: create hex_sha1_sha1_password failed for service user %s",
service_user)));
users_free(service->users);
return 1;
}
/* add service user for % and localhost */
(void)add_mysql_users_with_host_ipv4(service->users, service->credentials.name, "%", newpasswd, "Y", "");
(void)add_mysql_users_with_host_ipv4(service->users, service->credentials.name, "localhost", newpasswd, "Y", "");
free(newpasswd);
free(dpwd);
return 0;
}

View File

@ -0,0 +1,118 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2014
*/
/**
* @file maxinfo_error.c - Handle error reporting for the maxinfo router
*
* @verbatim
* Revision History
*
* Date Who Description
* 17/02/15 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <session.h>
#include <router.h>
#include <modules.h>
#include <modinfo.h>
#include <modutil.h>
#include <atomic.h>
#include <spinlock.h>
#include <dcb.h>
#include <poll.h>
#include <maxinfo.h>
#include <skygw_utils.h>
#include <log_manager.h>
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* Process a parse error and send error report to client
*
* @param dcb The DCB to send to error
* @param sql The SQL that had the parse error
* @param err The parse error code
*/
void
maxinfo_send_parse_error(DCB *dcb, char *sql, PARSE_ERROR err)
{
char *desc = "";
char *msg;
int len;
switch (err)
{
case PARSE_NOERROR:
desc = "No error";
break;
case PARSE_MALFORMED_SHOW:
desc = "Expected show <command> [like <pattern>]";
break;
case PARSE_EXPECTED_LIKE:
desc = "Expected LIKE <pattern>";
break;
case PARSE_SYNTAX_ERROR:
desc = "Syntax error";
break;
}
len = strlen(sql) + strlen(desc) + 20;
if ((msg = (char *)malloc(len)) == NULL)
return;
sprintf(msg, "%s in query '%s'", desc, sql);
maxinfo_send_error(dcb, 1149, msg);
}
/**
* Construct an error response
*
* @param dcb The DCB to send the error packet to
* @param msg The slave server instance
*/
void
maxinfo_send_error(DCB *dcb, int errcode, char *msg)
{
GWBUF *pkt;
unsigned char *data;
int len;
len = strlen(msg) + 9;
if ((pkt = gwbuf_alloc(len + 4)) == NULL)
return;
data = GWBUF_DATA(pkt);
data[0] = len & 0xff; // Payload length
data[1] = (len >> 8) & 0xff;
data[2] = (len >> 16) & 0xff;
data[3] = 1; // Sequence id
// Payload
data[4] = 0xff; // Error indicator
data[5] = errcode & 0xff; // Error Code
data[6] = (errcode >> 8) & 0xff; // Error Code
strncpy((char *)&data[7], "#42000", 6);
strncpy((char *)&data[13], msg, strlen(msg)); // Error Message
dcb->func.write(dcb, pkt);
}

View File

@ -0,0 +1,770 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2014
*/
/**
* @file maxinfo_parse.c - Parse the limited set of SQL that the MaxScale
* information schema can use
*
* @verbatim
* Revision History
*
* Date Who Description
* 17/02/15 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <session.h>
#include <router.h>
#include <modules.h>
#include <monitor.h>
#include <version.h>
#include <modinfo.h>
#include <modutil.h>
#include <atomic.h>
#include <spinlock.h>
#include <dcb.h>
#include <poll.h>
#include <maxinfo.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <resultset.h>
#include <config.h>
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static void exec_show(DCB *dcb, MAXINFO_TREE *tree);
static void exec_select(DCB *dcb, MAXINFO_TREE *tree);
static void exec_show_variables(DCB *dcb, MAXINFO_TREE *filter);
static void exec_show_status(DCB *dcb, MAXINFO_TREE *filter);
static int maxinfo_pattern_match(char *pattern, char *str);
/**
* Execute a parse tree and return the result set or runtime error
*
* @param dcb The DCB that connects to the client
* @param tree The parse tree for the query
*/
void
maxinfo_execute(DCB *dcb, MAXINFO_TREE *tree)
{
switch (tree->op)
{
case MAXOP_SHOW:
exec_show(dcb, tree);
break;
case MAXOP_SELECT:
exec_select(dcb, tree);
break;
case MAXOP_TABLE:
case MAXOP_COLUMNS:
case MAXOP_LITERAL:
case MAXOP_PREDICATE:
case MAXOP_LIKE:
case MAXOP_EQUAL:
default:
maxinfo_send_error(dcb, 0, "Unexpected operator in parse tree");
}
}
/**
* Fetch the list of services and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_services(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = serviceGetList()) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the list of listeners and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_listeners(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = serviceGetListenerList()) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the list of sessions and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_sessions(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = sessionGetList(SESSION_LIST_ALL)) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the list of client sessions and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_clients(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = sessionGetList(SESSION_LIST_CONNECTION)) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the list of servers and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_servers(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = serverGetList()) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the list of modules and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_modules(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = moduleGetList()) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the list of monitors and stream as a result set
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_monitors(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = monitorGetList()) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* Fetch the event times data
*
* @param dcb DCB to which to stream result set
* @param tree Potential like clause (currently unused)
*/
static void
exec_show_eventTimes(DCB *dcb, MAXINFO_TREE *tree)
{
RESULTSET *set;
if ((set = eventTimesGetList()) == NULL)
return;
resultset_stream_mysql(set, dcb);
resultset_free(set);
}
/**
* The table of show commands that are supported
*/
static struct {
char *name;
void (*func)(DCB *, MAXINFO_TREE *);
} show_commands[] = {
{ "variables", exec_show_variables },
{ "status", exec_show_status },
{ "services", exec_show_services },
{ "listeners", exec_show_listeners },
{ "sessions", exec_show_sessions },
{ "clients", exec_show_clients },
{ "servers", exec_show_servers },
{ "modules", exec_show_modules },
{ "monitors", exec_show_monitors },
{ "eventTimes", exec_show_eventTimes },
{ NULL, NULL }
};
/**
* Execute a show command parse tree and return the result set or runtime error
*
* @param dcb The DCB that connects to the client
* @param tree The parse tree for the query
*/
static void
exec_show(DCB *dcb, MAXINFO_TREE *tree)
{
int i;
char errmsg[120];
for (i = 0; show_commands[i].name; i++)
{
if (strcasecmp(show_commands[i].name, tree->value) == 0)
{
(*show_commands[i].func)(dcb, tree->right);
return;
}
}
if (strlen(tree->value) > 80) // Prevent buffer overrun
tree->value[80] = 0;
sprintf(errmsg, "Unsupported show command '%s'", tree->value);
maxinfo_send_error(dcb, 0, errmsg);
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, errmsg)));
}
/**
* Return the current MaxScale version
*
* @return The version string for MaxScale
*/
static char *
getVersion()
{
return MAXSCALE_VERSION;
}
static char *versionComment = "MariaDB MaxScale";
/**
* Return the current MaxScale version
*
* @return The version string for MaxScale
*/
static char *
getVersionComment()
{
return versionComment;
}
/**
* Return the current MaxScale Home Directory
*
* @return The version string for MaxScale
*/
static char *
getMaxScaleHome()
{
return getenv("MAXSCALE_HOME");
}
/* The various methods to fetch the variables */
#define VT_STRING 1
#define VT_INT 2
extern int MaxScaleUptime();
typedef void *(*STATSFUNC)();
/**
* Variables that may be sent in a show variables
*/
static struct {
char *name;
int type;
STATSFUNC func;
} variables[] = {
{ "version", VT_STRING, (STATSFUNC)getVersion },
{ "version_comment", VT_STRING, (STATSFUNC)getVersionComment },
{ "basedir", VT_STRING, (STATSFUNC)getMaxScaleHome},
{ "MAXSCALE_VERSION", VT_STRING, (STATSFUNC)getVersion },
{ "MAXSCALE_THREADS", VT_INT, (STATSFUNC)config_threadcount },
{ "MAXSCALE_NBPOLLS", VT_INT, (STATSFUNC)config_nbpolls },
{ "MAXSCALE_POLLSLEEP", VT_INT, (STATSFUNC)config_pollsleep },
{ "MAXSCALE_UPTIME", VT_INT, (STATSFUNC)MaxScaleUptime },
{ "MAXSCALE_SESSIONS", VT_INT, (STATSFUNC)serviceSessionCountAll },
{ NULL, 0, NULL }
};
typedef struct {
int index;
char *like;
} VARCONTEXT;
/**
* Callback function to populate rows of the show variable
* command
*
* @param data The context point
* @return The next row or NULL if end of rows
*/
static RESULT_ROW *
variable_row(RESULTSET *result, void *data)
{
VARCONTEXT *context = (VARCONTEXT *)data;
RESULT_ROW *row;
char buf[80];
if (variables[context->index].name)
{
if (context->like &&
maxinfo_pattern_match(context->like,
variables[context->index].name))
{
context->index++;
return variable_row(result, data);
}
row = resultset_make_row(result);
resultset_row_set(row, 0, variables[context->index].name);
switch (variables[context->index].type)
{
case VT_STRING:
resultset_row_set(row, 1,
(char *)(*variables[context->index].func)());
break;
case VT_INT:
snprintf(buf, 80, "%ld",
(long)(*variables[context->index].func)());
resultset_row_set(row, 1, buf);
break;
}
context->index++;
return row;
}
return NULL;
}
/**
* Execute a show variables command applying an optional filter
*
* @param dcb The DCB connected to the client
* @param filter A potential like clause or NULL
*/
static void
exec_show_variables(DCB *dcb, MAXINFO_TREE *filter)
{
RESULTSET *result;
VARCONTEXT context;
if (filter)
context.like = filter->value;
else
context.like = NULL;
context.index = 0;
if ((result = resultset_create(variable_row, &context)) == NULL)
{
maxinfo_send_error(dcb, 0, "No resources available");
return;
}
resultset_add_column(result, "Variable_name", 40, COL_TYPE_VARCHAR);
resultset_add_column(result, "Value", 40, COL_TYPE_VARCHAR);
resultset_stream_mysql(result, dcb);
resultset_free(result);
}
/**
* Return the show variables output a a result set
*
* @return Variables as a result set
*/
RESULTSET *
maxinfo_variables()
{
RESULTSET *result;
static VARCONTEXT context;
context.like = NULL;
context.index = 0;
if ((result = resultset_create(variable_row, &context)) == NULL)
{
return NULL;
}
resultset_add_column(result, "Variable_name", 40, COL_TYPE_VARCHAR);
resultset_add_column(result, "Value", 40, COL_TYPE_VARCHAR);
return result;
}
/**
* Interface to dcb_count_by_usage for all dcbs
*/
static int
maxinfo_all_dcbs()
{
return dcb_count_by_usage(DCB_USAGE_ALL);
}
/**
* Interface to dcb_count_by_usage for client dcbs
*/
static int
maxinfo_client_dcbs()
{
return dcb_count_by_usage(DCB_USAGE_CLIENT);
}
/**
* Interface to dcb_count_by_usage for listener dcbs
*/
static int
maxinfo_listener_dcbs()
{
return dcb_count_by_usage(DCB_USAGE_LISTENER);
}
/**
* Interface to dcb_count_by_usage for backend dcbs
*/
static int
maxinfo_backend_dcbs()
{
return dcb_count_by_usage(DCB_USAGE_BACKEND);
}
/**
* Interface to dcb_count_by_usage for internal dcbs
*/
static int
maxinfo_internal_dcbs()
{
return dcb_count_by_usage(DCB_USAGE_INTERNAL);
}
/**
* Interface to dcb_count_by_usage for zombie dcbs
*/
static int
maxinfo_zombie_dcbs()
{
return dcb_count_by_usage(DCB_USAGE_ZOMBIE);
}
/**
* Interface to poll stats for reads
*/
static int
maxinfo_read_events()
{
return poll_get_stat(POLL_STAT_READ);
}
/**
* Interface to poll stats for writes
*/
static int
maxinfo_write_events()
{
return poll_get_stat(POLL_STAT_WRITE);
}
/**
* Interface to poll stats for errors
*/
static int
maxinfo_error_events()
{
return poll_get_stat(POLL_STAT_ERROR);
}
/**
* Interface to poll stats for hangup
*/
static int
maxinfo_hangup_events()
{
return poll_get_stat(POLL_STAT_HANGUP);
}
/**
* Interface to poll stats for accepts
*/
static int
maxinfo_accept_events()
{
return poll_get_stat(POLL_STAT_ACCEPT);
}
/**
* Interface to poll stats for event queue length
*/
static int
maxinfo_event_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_LEN);
}
/**
* Interface to poll stats for event pending queue length
*/
static int
maxinfo_event_pending_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_PENDING);
}
/**
* Interface to poll stats for max event queue length
*/
static int
maxinfo_max_event_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_MAX);
}
/**
* Interface to poll stats for max queue time
*/
static int
maxinfo_max_event_queue_time()
{
return poll_get_stat(POLL_STAT_MAX_QTIME);
}
/**
* Interface to poll stats for max event execution time
*/
static int
maxinfo_max_event_exec_time()
{
return poll_get_stat(POLL_STAT_MAX_EXECTIME);
}
/**
* Variables that may be sent in a show status
*/
static struct {
char *name;
int type;
STATSFUNC func;
} status[] = {
{ "Uptime", VT_INT, (STATSFUNC)MaxScaleUptime },
{ "Uptime_since_flush_status", VT_INT, (STATSFUNC)MaxScaleUptime },
{ "Threads_created", VT_INT, (STATSFUNC)config_threadcount },
{ "Threads_running", VT_INT, (STATSFUNC)config_threadcount },
{ "Threadpool_threads", VT_INT, (STATSFUNC)config_threadcount },
{ "Threads_connected", VT_INT, (STATSFUNC)serviceSessionCountAll },
{ "Connections", VT_INT, (STATSFUNC)maxinfo_all_dcbs },
{ "Client_connections", VT_INT, (STATSFUNC)maxinfo_client_dcbs },
{ "Backend_connections", VT_INT, (STATSFUNC)maxinfo_backend_dcbs },
{ "Listeners", VT_INT, (STATSFUNC)maxinfo_listener_dcbs },
{ "Zombie_connections", VT_INT, (STATSFUNC)maxinfo_zombie_dcbs },
{ "Internal_descriptors", VT_INT, (STATSFUNC)maxinfo_internal_dcbs },
{ "Read_events", VT_INT, (STATSFUNC)maxinfo_read_events },
{ "Write_events", VT_INT, (STATSFUNC)maxinfo_write_events },
{ "Hangup_events", VT_INT, (STATSFUNC)maxinfo_hangup_events },
{ "Error_events", VT_INT, (STATSFUNC)maxinfo_error_events },
{ "Accept_events", VT_INT, (STATSFUNC)maxinfo_accept_events },
{ "Event_queue_length", VT_INT, (STATSFUNC)maxinfo_event_queue_length },
{ "Pending_events", VT_INT, (STATSFUNC)maxinfo_event_pending_queue_length },
{ "Max_event_queue_length", VT_INT, (STATSFUNC)maxinfo_max_event_queue_length },
{ "Max_event_queue_time", VT_INT, (STATSFUNC)maxinfo_max_event_queue_time },
{ "Max_event_execution_time", VT_INT, (STATSFUNC)maxinfo_max_event_exec_time },
{ NULL, 0, NULL }
};
/**
* Callback function to populate rows of the show variable
* command
*
* @param data The context point
* @return The next row or NULL if end of rows
*/
static RESULT_ROW *
status_row(RESULTSET *result, void *data)
{
VARCONTEXT *context = (VARCONTEXT *)data;
RESULT_ROW *row;
char buf[80];
if (status[context->index].name)
{
if (context->like &&
maxinfo_pattern_match(context->like,
status[context->index].name))
{
context->index++;
return status_row(result, data);
}
row = resultset_make_row(result);
resultset_row_set(row, 0, status[context->index].name);
switch (status[context->index].type)
{
case VT_STRING:
resultset_row_set(row, 1,
(char *)(*status[context->index].func)());
break;
case VT_INT:
snprintf(buf, 80, "%ld",
(long)(*status[context->index].func)());
resultset_row_set(row, 1, buf);
break;
}
context->index++;
return row;
}
return NULL;
}
/**
* Execute a show status command applying an optional filter
*
* @param dcb The DCB connected to the client
* @param filter A potential like clause or NULL
*/
static void
exec_show_status(DCB *dcb, MAXINFO_TREE *filter)
{
RESULTSET *result;
VARCONTEXT context;
if (filter)
context.like = filter->value;
else
context.like = NULL;
context.index = 0;
if ((result = resultset_create(status_row, &context)) == NULL)
{
maxinfo_send_error(dcb, 0, "No resources available");
return;
}
resultset_add_column(result, "Variable_name", 40, COL_TYPE_VARCHAR);
resultset_add_column(result, "Value", 40, COL_TYPE_VARCHAR);
resultset_stream_mysql(result, dcb);
resultset_free(result);
}
/**
* Return the show status data as a result set
*
* @return The show status data as a result set
*/
RESULTSET *
maxinfo_status()
{
RESULTSET *result;
static VARCONTEXT context;
context.like = NULL;
context.index = 0;
if ((result = resultset_create(status_row, &context)) == NULL)
{
return NULL;
}
resultset_add_column(result, "Variable_name", 40, COL_TYPE_VARCHAR);
resultset_add_column(result, "Value", 40, COL_TYPE_VARCHAR);
return result;
}
/**
* Execute a select command parse tree and return the result set
* or runtime error
*
* @param dcb The DCB that connects to the client
* @param tree The parse tree for the query
*/
static void
exec_select(DCB *dcb, MAXINFO_TREE *tree)
{
maxinfo_send_error(dcb, 0, "Select not yet implemented");
}
/**
* Perform a "like" pattern match. Only works for leading and trailing %
*
* @param pattern Pattern to match
* @param str String to match against pattern
* @return Zero on match
*/
static int
maxinfo_pattern_match(char *pattern, char *str)
{
int anchor, len, trailing;
char *fixed;
extern char *strcasestr();
if (*pattern != '%')
{
fixed = pattern;
anchor = 1;
}
else
{
fixed = &pattern[1];
}
len = strlen(fixed);
if (fixed[len - 1] == '%')
trailing = 1;
else
trailing = 0;
if (anchor == 1 && trailing == 0) // No wildcard
return strcasecmp(pattern, str);
else if (anchor == 1)
return strncasecmp(str, pattern, len - trailing);
else
{
char *portion = malloc(len + 1);
int rval;
strncpy(portion, fixed, len - trailing);
portion[len - trailing] = 0;
rval = (strcasestr(str, portion) != NULL ? 0 : 1);
free(portion);
return rval;
}
}

View File

@ -0,0 +1,320 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2014
*/
/**
* @file maxinfo_parse.c - Parse the limited set of SQL that the MaxScale
* information schema can use
*
* @verbatim
* Revision History
*
* Date Who Description
* 16/02/15 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <session.h>
#include <router.h>
#include <modules.h>
#include <modinfo.h>
#include <modutil.h>
#include <atomic.h>
#include <spinlock.h>
#include <dcb.h>
#include <poll.h>
#include <maxinfo.h>
#include <skygw_utils.h>
#include <log_manager.h>
static MAXINFO_TREE *make_tree_node(MAXINFO_OPERATOR, char *, MAXINFO_TREE *, MAXINFO_TREE *);
static void free_tree(MAXINFO_TREE *);
static char *fetch_token(char *, int *, char **);
static MAXINFO_TREE *parse_column_list(char **sql);
static MAXINFO_TREE *parse_table_name(char **sql);
/**
* Parse a SQL subset for the maxinfo plugin and return a parse tree
*
* @param sql The SQL query
* @return Parse tree or NULL on error
*/
MAXINFO_TREE *
maxinfo_parse(char *sql, PARSE_ERROR *parse_error)
{
int token;
char *ptr, *text;
MAXINFO_TREE *tree = NULL;
MAXINFO_TREE *col, *table;
*parse_error = PARSE_NOERROR;
while ((ptr = fetch_token(sql, &token, &text)) != NULL)
{
switch (token)
{
case LT_SHOW:
free(text); // not needed
ptr = fetch_token(ptr, &token, &text);
if (ptr == NULL || token != LT_STRING)
{
// Expected show "name"
*parse_error = PARSE_MALFORMED_SHOW;
return NULL;
}
tree = make_tree_node(MAXOP_SHOW, text, NULL, NULL);
if ((ptr = fetch_token(ptr, &token, &text)) == NULL)
return tree;
else if (token == LT_LIKE)
{
if ((ptr = fetch_token(ptr, &token, &text)) != NULL)
{
tree->right = make_tree_node(MAXOP_LIKE,
text, NULL, NULL);
return tree;
}
else
{
// Expected expression
*parse_error = PARSE_EXPECTED_LIKE;
free_tree(tree);
return NULL;
}
}
// Malformed show
free_tree(tree);
*parse_error = PARSE_MALFORMED_SHOW;
return NULL;
#if 0
case LT_SELECT:
free(text); // not needed
col = parse_column_list(&ptr);
table = parse_table_name(&ptr);
return make_tree_node(MAXOP_SELECT, NULL, col, table);
#endif
default:
*parse_error = PARSE_SYNTAX_ERROR;
if (tree)
free_tree(tree);
return NULL;
}
}
*parse_error = PARSE_SYNTAX_ERROR;
if (tree)
free_tree(tree);
return NULL;
}
/**
* Parse a column list, may be a * or a valid list of string name
* seperated by a comma
*
* @param sql Pointer to pointer to column list updated to point to the table name
* @return A tree of column names
*/
static MAXINFO_TREE *
parse_column_list(char **ptr)
{
int token, lookahead;
char *text, *text2;
MAXINFO_TREE *tree = NULL;
*ptr = fetch_token(*ptr, &token, &text);
*ptr = fetch_token(*ptr, &lookahead, &text2);
switch (token)
{
case LT_STRING:
free(text2);
switch (lookahead)
{
case LT_COMMA:
return make_tree_node(MAXOP_COLUMNS, text, NULL,
parse_column_list(ptr));
case LT_FROM:
return make_tree_node(MAXOP_COLUMNS, text, NULL,
NULL);
}
break;
case LT_STAR:
free(text);
free(text2);
if (lookahead != LT_FROM)
return make_tree_node(MAXOP_ALL_COLUMNS, NULL, NULL,
NULL);
}
return NULL;
}
/**
* Parse a table name
*
* @param sql Pointer to pointer to column list updated to point to the table name
* @return A tree of table names
*/
static MAXINFO_TREE *
parse_table_name(char **ptr)
{
int token;
char *text;
MAXINFO_TREE *tree = NULL;
*ptr = fetch_token(*ptr, &token, &text);
if (token == LT_STRING)
return make_tree_node(MAXOP_TABLE, text, NULL, NULL);
return NULL;
}
/**
* Allocate and populate a parse tree node
*
* @param op The node operator
* @param value The node value
* @param left The left branch of the parse tree
* @param right The right branch of the parse tree
* @return The new parse tree node
*/
static MAXINFO_TREE *
make_tree_node(MAXINFO_OPERATOR op, char *value, MAXINFO_TREE *left, MAXINFO_TREE *right)
{
MAXINFO_TREE *node;
if ((node = (MAXINFO_TREE *)malloc(sizeof(MAXINFO_TREE))) == NULL)
return NULL;
node->op = op;
node->value = value;
node->left = left;
node->right = right;
return node;
}
/**
* Recusrsively free the storage associated with a parse tree
*
* @param tree The parse tree to free
*/
static void
free_tree(MAXINFO_TREE *tree)
{
if (tree->left)
free_tree(tree->left);
if (tree->right)
free_tree(tree->right);
if (tree->value)
free(tree->value);
free(tree);
}
/**
* The set of keywords known to the tokeniser
*/
static struct {
char *text;
int token;
} keywords[] = {
{ "show", LT_SHOW },
{ "select", LT_SELECT },
{ "from", LT_FROM },
{ "like", LT_LIKE },
{ "=", LT_EQUAL },
{ ",", LT_COMMA },
{ "*", LT_STAR },
{ NULL, 0 }
};
/**
* Limited SQL tokeniser. Understands a limited set of key words and
* quoted strings.
*
* @param sql The SQL to tokenise
* @param token The returned token
* @param text The matching text
* @return The next position to tokenise from
*/
static char *
fetch_token(char *sql, int *token, char **text)
{
char *s1, *s2, quote = '\0';
int i;
s1 = sql;
while (*s1 && isspace(*s1))
{
s1++;
}
if (quote == '\0' && (*s1 == '\'' || *s1 == '\"'))
{
quote = *s1++;
}
if (*s1 == '/' && *(s1 + 1) == '*')
{
s1 += 2;
// Skip the comment
do {
while (*s1 && *s1 != '*')
s1++;
} while (*(s1 + 1) && *(s1 + 1) != '/');
s1 += 2;
while (*s1 && isspace(*s1))
{
s1++;
}
if (quote == '\0' && (*s1 == '\'' || *s1 == '\"'))
{
quote = *s1++;
}
}
s2 = s1;
while (*s2)
{
if (quote == '\0' && (isspace(*s2)
|| *s2 == ',' || *s2 == '='))
break;
else if (quote == *s2)
{
break;
}
s2++;
}
if (*s1 == '@' && *(s1 + 1) == '@')
{
*text = strndup(s1 + 2, (s2 - s1) - 2);
*token = LT_VARIABLE;
return s2;
}
if (s1 == s2)
return NULL;
*text = strndup(s1, s2 - s1);
for (i = 0; keywords[i].text; i++)
{
if (strcasecmp(keywords[i].text, *text) == 0)
{
*token = keywords[i].token;
return s2;
}
}
*token = LT_STRING;
return s2;
}

View File

@ -790,6 +790,7 @@ static void* newSession(
#endif
client_rses->router = router;
client_rses->client_dcb = session->client;
/**
* If service config has been changed, reload config from service to
* router instance first.
@ -4355,8 +4356,10 @@ static bool route_session_write(
LOGFILE_TRACE,
"Router session exceeded session command history limit. "
"Closing router session. <")));
router_cli_ses->rses_closed = true;
gwbuf_free(querybuf);
rses_end_locked_router_action(router_cli_ses);
router_cli_ses->client_dcb->func.hangup(router_cli_ses->client_dcb);
goto return_succp;
}