query_classifier.cc

resolve_query_type, traverse through the list of items of thd->free_list, identify functions and reason query type according to the function type. This phase can only increase the restrictiviness level of the query.

query_classifier.h
        Added new query type QUERY_TYPE_LOCAL_READ, for functions that can be executed in Maxscale. This type is the least restrict
ive query type. It is not used currently.

testmain.c
	Added a few test cases and fixed expected return values for query type tests.

readwritesplit.c
	polish

skygw_debug.h
	Added string macro for Item types.
This commit is contained in:
vraatikka 2013-10-25 11:55:45 +03:00
parent 241a0a6175
commit f32cfe8546
5 changed files with 240 additions and 30 deletions

View File

@ -50,10 +50,13 @@
#include <errmsg.h>
#include <client_settings.h>
#include <item_func.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#define QTYPE_LESS_RESTRICTIVE_THAN_WRITE(t) (t<QUERY_TYPE_WRITE ? true : false)
static THD* get_or_create_thd_for_parsing(
MYSQL* mysql,
@ -286,7 +289,6 @@ static bool create_parse_tree(
Parser_state parser_state;
bool failp = FALSE;
const char* virtual_db = "skygw_virtual";
//ss_dfprintf(stderr, "> create_parse_tree\n");
if (parser_state.init(thd, thd->query(), thd->query_length())) {
failp = TRUE;
@ -307,11 +309,34 @@ static bool create_parse_tree(
fprintf(stderr, "parse_sql failed\n");
}
return_here:
//ss_dfprintf(stderr, "< create_parse_tree : %s\n", STRBOOL(failp));
//fflush(stderr);
return failp;
}
/**
* @node Set new query type if new is more restrictive than old.
*
* Parameters:
* @param qtype - <usage>
* <description>
*
* @param new_type - <usage>
* <description>
*
* @return
*
*
* @details The implementation relies on that enumerated values correspond
* to the restrictiviness of the value. That is, smaller value means less
* restrictive, for example, QUERY_TYPE_READ is smaller than QUERY_TYPE_WRITE.
*
*/
static skygw_query_type_t set_query_type(
skygw_query_type_t* qtype,
skygw_query_type_t new_type)
{
*qtype = MAX(*qtype, new_type);
return *qtype;
}
/**
* @node Detect query type, read-only, write, or session update
@ -334,6 +359,8 @@ static skygw_query_type_t resolve_query_type(
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
LEX* lex;
Item* item;
int ftype;
/**
* By default, if sql_log_bin, that is, recording data modifications
* to binary log, is disabled, gateway treats operations normally.
@ -343,10 +370,9 @@ static skygw_query_type_t resolve_query_type(
*/
bool force_data_modify_op_replication;
//ss_dfprintf(stderr, "> resolve_query_type\n");
ss_info_dassert(thd != NULL, ("thd is NULL\n"));
force_data_modify_op_replication = FALSE;
force_data_modify_op_replication = FALSE;
lex = thd->lex;
/** SELECT ..INTO variable|OUTFILE|DUMPFILE */
@ -404,6 +430,133 @@ static skygw_query_type_t resolve_query_type(
default:
break;
}
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(qtype)) {
/**
* These values won't change qtype more restrictive than write.
* UDFs and procedures could possibly cause session-wide write,
* but unless their content is replicated this is a limitation
* of this implementation.
* In other words : UDFs and procedures are not allowed to
* perform writes which are not replicated but nede to repeat
* in every node.
* It is not sure if such statements exist. vraa 25.10.13
*/
/**
* Search for system functions, UDFs and stored procedures.
*/
for (item=thd->free_list; item != NULL; item=item->next) {
Item::Type itype;
itype = item->type();
fprintf(stderr,
"Item %s:%s\n",
item->name,
STRITEMTYPE(itype));
if (item->type() == Item::SUBSELECT_ITEM) {
continue;
} else if (item->type() == Item::FUNC_ITEM) {
skygw_query_type_t
func_qtype = QUERY_TYPE_UNKNOWN;
/**
* Item types:
* FIELD_ITEM = 0, FUNC_ITEM,
* SUM_FUNC_ITEM, STRING_ITEM, INT_ITEM,
* REAL_ITEM, NULL_ITEM, VARBIN_ITEM,
* COPY_STR_ITEM, FIELD_AVG_ITEM,
* DEFAULT_VALUE_ITEM, PROC_ITEM,
* COND_ITEM, REF_ITEM, FIELD_STD_ITEM,
* FIELD_VARIANCE_ITEM,
* INSERT_VALUE_ITEM,
* SUBSELECT_ITEM, ROW_ITEM, CACHE_ITEM,
* TYPE_HOLDER, PARAM_ITEM,
* TRIGGER_FIELD_ITEM, DECIMAL_ITEM,
* XPATH_NODESET, XPATH_NODESET_CMP,
* VIEW_FIXER_ITEM,
* EXPR_CACHE_ITEM == 27
**/
Item_func::Functype ftype;
ftype = ((Item_func*)item)->functype();
/**
* Item_func types:
*
* UNKNOWN_FUNC = 0,EQ_FUNC, EQUAL_FUNC,
* NE_FUNC, LT_FUNC, LE_FUNC,
* GE_FUNC, GT_FUNC, FT_FUNC,
* LIKE_FUNC == 10, ISNULL_FUNC, ISNOTNULL_FUNC,
* COND_AND_FUNC, COND_OR_FUNC, XOR_FUNC,
* BETWEEN, IN_FUNC,
* MULT_EQUAL_FUNC, INTERVAL_FUNC,
* ISNOTNULLTEST_FUNC == 20,
* SP_EQUALS_FUNC, SP_DISJOINT_FUNC,
* SP_INTERSECTS_FUNC,
* SP_TOUCHES_FUNC, SP_CROSSES_FUNC,
* SP_WITHIN_FUNC, SP_CONTAINS_FUNC,
* SP_OVERLAPS_FUNC,
* SP_STARTPOINT, SP_ENDPOINT == 30,
* SP_EXTERIORRING, SP_POINTN, SP_GEOMETRYN,
* SP_INTERIORRINGN,NOT_FUNC, NOT_ALL_FUNC,
* NOW_FUNC, TRIG_COND_FUNC,
* SUSERVAR_FUNC, GUSERVAR_FUNC == 40,
* COLLATE_FUNC, EXTRACT_FUNC,
* CHAR_TYPECAST_FUNC,
* FUNC_SP, UDF_FUNC, NEG_FUNC,
* GSYSVAR_FUNC == 47
**/
switch (ftype) {
case Item_func::FUNC_SP:
/**
* An unknown (for maxscale) function / sp
* belongs to this category.
*/
func_qtype = QUERY_TYPE_WRITE;
fprintf(stderr,
"FUNC_SP, Stored procedure "
"or unknown function\n");
break;
case Item_func::UDF_FUNC:
func_qtype = QUERY_TYPE_WRITE;
fprintf(stderr,
"UDF_FUNC, User-defined "
"function\n");
break;
case Item_func::NOW_FUNC:
func_qtype = QUERY_TYPE_LOCAL_READ;
fprintf(stderr,
"NOW_FUNC, can be executed in "
"maxscale\n");
break;
case Item_func::UNKNOWN_FUNC:
func_qtype = QUERY_TYPE_READ;
/**
* Many built-in functions are of this
* type, for example, rand(), soundex(),
* repeat() .
*/
fprintf(stderr,
"UNKNOWN_FUNC, system function, "
"perhaps. RO.\n");
break;
default:
fprintf(stderr,
"Unknown function type %d\n",
ftype);
break;
} /**< switch */
/**< Set new query type */
qtype = set_query_type(&qtype, func_qtype);
}
/**
* Write goes to master and that won't change.
*/
if (qtype == QUERY_TYPE_WRITE) {
break;
}
} /**< for */
} /**< if */
return_here:
//ss_dfprintf(stderr, "< resolve_query_type : %s\n", STRQTYPE(qtype));
return qtype;

View File

@ -25,12 +25,14 @@ EXTERN_C_BLOCK_BEGIN
/**
* Query type for skygateway.
* The meaninful difference is whether master data was modified
* The meaninful difference is where operation is done and whether master data
* is modified
*/
typedef enum {
QUERY_TYPE_UNKNOWN = 7, /*!< Couln't find out or parse error */
QUERY_TYPE_WRITE, /*!< Master data will be modified */
QUERY_TYPE_LOCAL_READ, /*!< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ, /*!< No updates */
QUERY_TYPE_WRITE, /*!< Master data will be modified */
QUERY_TYPE_SESSION_WRITE /*!< Session data will be modified */
} skygw_query_type_t;

View File

@ -28,8 +28,6 @@ static char* server_groups[] = {
NULL
};
static void slcursor_add_case(
slist_cursor_t* c,
void* data)
@ -132,6 +130,47 @@ int main(int argc, char** argv)
ss_dfprintf(stderr, ">> testmain\n");
c = slist_init();
/** Test some functions */
q = "SELECT MY_UDF('Hello')";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_WRITE, false, true));
/** This could be QUERY_TYPE_LOCAL_READ */
q = "SELECT repeat('a', 1024)";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, true));
/** This could be QUERY_TYPE_LOCAL_READ */
q = "SELECT soundex('Hello')";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, true));
q = "SELECT ssoundexx('Hello')";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_WRITE, false, true));
/** This could be QUERY_TYPE_LOCAL_READ */
q = "SELECT now()";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, true));
/** This could be QUERY_TYPE_LOCAL_READ */
q = "SELECT rand()";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, true));
q = "SELECT rand(234), MY_UDF('Hello'), soundex('Hello')";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_WRITE, false, true));
/** Read-only SELECTs */
q = "SELECT user from mysql.user";
slcursor_add_case(
@ -196,24 +235,8 @@ int main(int argc, char** argv)
"select * from table3";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_SESSION_WRITE, false, true));
/** Functions */
q = "SELECT NOW()";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, false));
q = "SELECT SOUNDEX('Hello')";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, false));
q = "SELECT MY_UDF('Hello')";
slcursor_add_case(
c,
query_test_init(q, QUERY_TYPE_READ, false, true));
query_test_init(q, QUERY_TYPE_READ, false, true));
/** RENAME TABLEs */
q = "RENAME TABLE T1 to T2";
slcursor_add_case(

View File

@ -492,8 +492,8 @@ static int routeQuery(
case QUERY_TYPE_READ:
skygw_log_write(LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, routing "
"to Slave.",
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Slave.",
pthread_self(),
STRQTYPE(qtype));

View File

@ -121,7 +121,9 @@ typedef enum skygw_chk_t {
# define STRQTYPE(t) ((t) == QUERY_TYPE_WRITE ? "QUERY_TYPE_WRITE" : \
((t) == QUERY_TYPE_READ ? "QUERY_TYPE_READ" : \
((t) == QUERY_TYPE_SESSION_WRITE ? "QUERY_TYPE_SESSION_WRITE" : \
"QUERY_TYPE_UNKNOWN")))
((t) == QUERY_TYPE_UNKNOWN ? "QUERY_TYPE_UNKNWON" : \
((t) == QUERY_TYPE_LOCAL_READ ? "QUERY_TYPE_LOCAL_READ" : \
"Unknown query type")))))
#define STRLOGID(i) ((i) == LOGFILE_TRACE ? "LOGFILE_TRACE" : \
((i) == LOGFILE_MESSAGE ? "LOGFILE_MESSAGE" : \
((i) == LOGFILE_ERROR ? "LOGFILE_ERROR" : \
@ -170,6 +172,36 @@ typedef enum skygw_chk_t {
((s) == MYSQL_SESSION_CHANGE ? "MYSQL_SESSION_CHANGE" : \
"UNKNOWN MYSQL STATE"))))))))))
#define STRITEMTYPE(t) ((t) == Item::FIELD_ITEM ? "FIELD_ITEM" : \
((t) == Item::FUNC_ITEM ? "FUNC_ITEM" : \
((t) == Item::SUM_FUNC_ITEM ? "SUM_FUNC_ITEM" : \
((t) == Item::STRING_ITEM ? "STRING_ITEM" : \
((t) == Item::INT_ITEM ? "INT_ITEM" : \
((t) == Item::REAL_ITEM ? "REAL_ITEM" : \
((t) == Item::NULL_ITEM ? "NULL_ITEM" : \
((t) == Item::VARBIN_ITEM ? "VARBIN_ITEM" : \
((t) == Item::COPY_STR_ITEM ? "COPY_STR_ITEM" : \
((t) == Item::FIELD_AVG_ITEM ? "FIELD_AVG_ITEM" : \
((t) == Item::DEFAULT_VALUE_ITEM ? "DEFAULT_VALUE_ITEM" : \
((t) == Item::PROC_ITEM ? "PROC_ITEM" : \
((t) == Item::COND_ITEM ? "COND_ITEM" : \
((t) == Item::REF_ITEM ? "REF_ITEM" : \
(t) == Item::FIELD_STD_ITEM ? "FIELD_STD_ITEM" : \
((t) == Item::FIELD_VARIANCE_ITEM ? "FIELD_VARIANCE_ITEM" : \
((t) == Item::INSERT_VALUE_ITEM ? "INSERT_VALUE_ITEM": \
((t) == Item::SUBSELECT_ITEM ? "SUBSELECT_ITEM" : \
((t) == Item::ROW_ITEM ? "ROW_ITEM" : \
((t) == Item::CACHE_ITEM ? "CACHE_ITEM" : \
((t) == Item::TYPE_HOLDER ? "TYPE_HOLDER" : \
((t) == Item::PARAM_ITEM ? "PARAM_ITEM" : \
((t) == Item::TRIGGER_FIELD_ITEM ? "TRIGGER_FIELD_ITEM" : \
((t) == Item::DECIMAL_ITEM ? "DECIMAL_ITEM" : \
((t) == Item::XPATH_NODESET ? "XPATH_NODESET" : \
((t) == Item::XPATH_NODESET_CMP ? "XPATH_NODESET_CMP" : \
((t) == Item::VIEW_FIXER_ITEM ? "VIEW_FIXER_ITEM" : \
((t) == Item::EXPR_CACHE_ITEM ? "EXPR_CACHE_ITEM" : \
"Unknown item")))))))))))))))))))))))))))
#define STRDCBROLE(r) ((r) == DCB_ROLE_SERVICE_LISTENER ? "DCB_ROLE_SERVICE_LISTENER" : \
((r) == DCB_ROLE_REQUEST_HANDLER ? "DCB_ROLE_REQUEST_HANDLER" : \
"UNKNOWN DCB ROLE"))