Merge branch 'Z3' into log_manager_test

This commit is contained in:
Markus Makela
2014-09-09 10:50:50 +03:00
16 changed files with 640 additions and 272 deletions

View File

@ -406,9 +406,9 @@ return_here:
* restrictive, for example, QUERY_TYPE_READ is smaller than QUERY_TYPE_WRITE. * restrictive, for example, QUERY_TYPE_READ is smaller than QUERY_TYPE_WRITE.
* *
*/ */
static u_int16_t set_query_type( static u_int32_t set_query_type(
u_int16_t* qtype, u_int32_t* qtype,
u_int16_t new_type) u_int32_t new_type)
{ {
*qtype = MAX(*qtype, new_type); *qtype = MAX(*qtype, new_type);
return *qtype; return *qtype;
@ -434,7 +434,7 @@ static skygw_query_type_t resolve_query_type(
THD* thd) THD* thd)
{ {
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
u_int16_t type = QUERY_TYPE_UNKNOWN; u_int32_t type = QUERY_TYPE_UNKNOWN;
int set_autocommit_stmt = -1; /*< -1 no, 0 disable, 1 enable */ int set_autocommit_stmt = -1; /*< -1 no, 0 disable, 1 enable */
LEX* lex; LEX* lex;
Item* item; Item* item;
@ -501,24 +501,48 @@ static skygw_query_type_t resolve_query_type(
type |= QUERY_TYPE_DISABLE_AUTOCOMMIT; type |= QUERY_TYPE_DISABLE_AUTOCOMMIT;
type |= QUERY_TYPE_BEGIN_TRX; type |= QUERY_TYPE_BEGIN_TRX;
} }
if (lex->option_type == OPT_GLOBAL)
{
/**
* SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html
*/
if (lex->sql_command == SQLCOM_SHOW_VARIABLES)
{
type |= QUERY_TYPE_GSYSVAR_READ;
}
/**
* SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html
*/
else if (lex->sql_command == SQLCOM_SET_OPTION)
{
type |= QUERY_TYPE_GSYSVAR_WRITE;
}
/** /**
* REVOKE ALL, ASSIGN_TO_KEYCACHE, * REVOKE ALL, ASSIGN_TO_KEYCACHE,
* PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER * PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
*/ */
if (lex->option_type == OPT_GLOBAL) else
{ {
type |= QUERY_TYPE_GLOBAL_WRITE; type |= QUERY_TYPE_GSYSVAR_WRITE;
}
goto return_qtype; goto return_qtype;
} }
else if (lex->option_type == OPT_SESSION) else if (lex->option_type == OPT_SESSION)
{ {
/** SHOW commands are all reads to one backend */ /**
* SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html
*/
if (lex->sql_command == SQLCOM_SHOW_VARIABLES) if (lex->sql_command == SQLCOM_SHOW_VARIABLES)
{ {
type |= QUERY_TYPE_SESSION_READ; type |= QUERY_TYPE_SYSVAR_READ;
} }
else /**
* SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html
*/
else if (lex->sql_command == SQLCOM_SET_OPTION)
{ {
/** Either user- or system variable write */
type |= QUERY_TYPE_SESSION_WRITE; type |= QUERY_TYPE_SESSION_WRITE;
} }
goto return_qtype; goto return_qtype;
@ -538,31 +562,26 @@ static skygw_query_type_t resolve_query_type(
if (thd->variables.sql_log_bin == 0 && if (thd->variables.sql_log_bin == 0 &&
force_data_modify_op_replication) force_data_modify_op_replication)
{ {
/** Not replicated */
type |= QUERY_TYPE_SESSION_WRITE; type |= QUERY_TYPE_SESSION_WRITE;
} }
else else
{ {
type |= QUERY_TYPE_WRITE; /** Written to binlog, that is, replicated except tmp tables */
type |= QUERY_TYPE_WRITE; /*< to master */
if (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE && if (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE &&
lex->sql_command == SQLCOM_CREATE_TABLE) lex->sql_command == SQLCOM_CREATE_TABLE)
{ {
type |= QUERY_TYPE_CREATE_TMP_TABLE; type |= QUERY_TYPE_CREATE_TMP_TABLE; /*< remember in router */
} }
} }
goto return_qtype; goto return_qtype;
} }
/** Try to catch session modifications here */ /** Try to catch session modifications here */
switch (lex->sql_command) { switch (lex->sql_command) {
case SQLCOM_SET_OPTION: /*< SET commands. */ /** fallthrough */
if (lex->option_type == OPT_GLOBAL)
{
type |= QUERY_TYPE_GLOBAL_WRITE;
break;
}
/**<! fall through */
case SQLCOM_CHANGE_DB: case SQLCOM_CHANGE_DB:
case SQLCOM_DEALLOCATE_PREPARE: case SQLCOM_DEALLOCATE_PREPARE:
type |= QUERY_TYPE_SESSION_WRITE; type |= QUERY_TYPE_SESSION_WRITE;
@ -599,15 +618,23 @@ static skygw_query_type_t resolve_query_type(
default: default:
break; break;
} }
#if defined(UPDATE_VAR_SUPPORT)
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type)) { if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type))
#endif
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))
{
/** /**
* These values won't change qtype more restrictive than write. * These values won't change qtype more restrictive than write.
* UDFs and procedures could possibly cause session-wide write, * UDFs and procedures could possibly cause session-wide write,
* but unless their content is replicated this is a limitation * but unless their content is replicated this is a limitation
* of this implementation. * of this implementation.
* In other words : UDFs and procedures are not allowed to * In other words : UDFs and procedures are not allowed to
* perform writes which are not replicated but nede to repeat * perform writes which are not replicated but need to repeat
* in every node. * in every node.
* It is not sure if such statements exist. vraa 25.10.13 * It is not sure if such statements exist. vraa 25.10.13
*/ */
@ -628,7 +655,9 @@ static skygw_query_type_t resolve_query_type(
if (itype == Item::SUBSELECT_ITEM) { if (itype == Item::SUBSELECT_ITEM) {
continue; continue;
} else if (itype == Item::FUNC_ITEM) { }
else if (itype == Item::FUNC_ITEM)
{
int func_qtype = QUERY_TYPE_UNKNOWN; int func_qtype = QUERY_TYPE_UNKNOWN;
/** /**
* Item types: * Item types:
@ -710,23 +739,39 @@ static skygw_query_type_t resolve_query_type(
break; break;
/** System session variable */ /** System session variable */
case Item_func::GSYSVAR_FUNC: case Item_func::GSYSVAR_FUNC:
/** User-defined variable read */ func_qtype |= QUERY_TYPE_SYSVAR_READ;
case Item_func::GUSERVAR_FUNC:
/** User-defined variable modification */
case Item_func::SUSERVAR_FUNC:
func_qtype |= QUERY_TYPE_SESSION_READ;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [resolve_query_type] " "%lu [resolve_query_type] "
"functype SUSERVAR_FUNC, could be " "functype GSYSVAR_FUNC, system "
"executed in MaxScale.", "variable read.",
pthread_self())));
break;
/** User-defined variable read */
case Item_func::GUSERVAR_FUNC:
func_qtype |= QUERY_TYPE_USERVAR_READ;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
"functype GUSERVAR_FUNC, user "
"variable read.",
pthread_self())));
break;
/** User-defined variable modification */
case Item_func::SUSERVAR_FUNC:
func_qtype |= QUERY_TYPE_SESSION_WRITE;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
"functype SUSERVAR_FUNC, user "
"variable write.",
pthread_self()))); pthread_self())));
break; break;
case Item_func::UNKNOWN_FUNC: case Item_func::UNKNOWN_FUNC:
if (item->name != NULL && if (item->name != NULL &&
strcmp(item->name, "last_insert_id()") == 0) strcmp(item->name, "last_insert_id()") == 0)
{ {
func_qtype |= QUERY_TYPE_SESSION_READ; func_qtype |= QUERY_TYPE_MASTER_READ;
} }
else else
{ {
@ -757,6 +802,7 @@ static skygw_query_type_t resolve_query_type(
/**< Set new query type */ /**< Set new query type */
type |= set_query_type(&type, func_qtype); type |= set_query_type(&type, func_qtype);
} }
#if defined(UPDATE_VAR_SUPPORT)
/** /**
* Write is as restrictive as it gets due functions, * Write is as restrictive as it gets due functions,
* so break. * so break.
@ -764,6 +810,7 @@ static skygw_query_type_t resolve_query_type(
if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) { if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) {
break; break;
} }
#endif
} /**< for */ } /**< for */
} /**< if */ } /**< if */
return_qtype: return_qtype:

View File

@ -31,23 +31,30 @@ EXTERN_C_BLOCK_BEGIN
* is modified * is modified
*/ */
typedef enum { typedef enum {
QUERY_TYPE_UNKNOWN = 0x0000, /*< Initial value, can't be tested bitwisely */ QUERY_TYPE_UNKNOWN = 0x000000, /*< Initial value, can't be tested bitwisely */
QUERY_TYPE_LOCAL_READ = 0x0001, /*< Read non-database data, execute in MaxScale */ QUERY_TYPE_LOCAL_READ = 0x000001, /*< Read non-database data, execute in MaxScale:any */
QUERY_TYPE_READ = 0x0002, /*< No updates */ QUERY_TYPE_READ = 0x000002, /*< Read database data:any */
QUERY_TYPE_WRITE = 0x0004, /*< Master data will be modified */ QUERY_TYPE_WRITE = 0x000004, /*< Master data will be modified:master */
QUERY_TYPE_SESSION_WRITE = 0x0008, /*< Session data will be modified */ QUERY_TYPE_MASTER_READ = 0x000008, /*< Read from the master:master */
QUERY_TYPE_GLOBAL_WRITE = 0x0010, /*< Global system variable modification */ QUERY_TYPE_SESSION_WRITE = 0x000010, /*< Session data will be modified:master or all */
QUERY_TYPE_BEGIN_TRX = 0x0020, /*< BEGIN or START TRANSACTION */ /** Not implemented yet */
QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x0040, /*< SET autocommit=1 */ // QUERY_TYPE_USERVAR_WRITE = 0x000020, /*< Write a user variable:master or all */
QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x0080, /*< SET autocommit=0 */ QUERY_TYPE_USERVAR_READ = 0x000040, /*< Read a user variable:master or any */
QUERY_TYPE_ROLLBACK = 0x0100, /*< ROLLBACK */ QUERY_TYPE_SYSVAR_READ = 0x000080, /*< Read a system variable:master or any */
QUERY_TYPE_COMMIT = 0x0200, /*< COMMIT */ /** Not implemented yet */
QUERY_TYPE_PREPARE_NAMED_STMT = 0x0400, /*< Prepared stmt with name from user */ // QUERY_TYPE_SYSVAR_WRITE = 0x000100, /*< Write a system variable:master or all */
QUERY_TYPE_PREPARE_STMT = 0x0800, /*< Prepared stmt with id provided by server */ QUERY_TYPE_GSYSVAR_READ = 0x000200, /*< Read global system variable:master or any */
QUERY_TYPE_EXEC_STMT = 0x1000, /*< Execute prepared statement */ QUERY_TYPE_GSYSVAR_WRITE = 0x000400, /*< Write global system variable:master or all */
QUERY_TYPE_SESSION_READ = 0x2000, /*< Read session data (from master 31.8.14) */ QUERY_TYPE_BEGIN_TRX = 0x000800, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_CREATE_TMP_TABLE = 0x4000, /*< Create temporary table */ QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x001000, /*< SET autocommit=1 */
QUERY_TYPE_READ_TMP_TABLE = 0x8000 /*< Read temporary table */ QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x002000, /*< SET autocommit=0 */
QUERY_TYPE_ROLLBACK = 0x004000, /*< ROLLBACK */
QUERY_TYPE_COMMIT = 0x008000, /*< COMMIT */
QUERY_TYPE_PREPARE_NAMED_STMT = 0x010000, /*< Prepared stmt with name from user:all */
QUERY_TYPE_PREPARE_STMT = 0x020000, /*< Prepared stmt with id provided by server:all */
QUERY_TYPE_EXEC_STMT = 0x040000, /*< Execute prepared statement:master or any */
QUERY_TYPE_CREATE_TMP_TABLE = 0x080000, /*< Create temporary table:master (could be all) */
QUERY_TYPE_READ_TMP_TABLE = 0x100000 /*< Read temporary table:master (could be any) */
} skygw_query_type_t; } skygw_query_type_t;

View File

@ -52,6 +52,8 @@ passwd=maxpwd
# version_string=<specific string for server handshake, # version_string=<specific string for server handshake,
# default is the MariaDB embedded library version> # default is the MariaDB embedded library version>
# #
# read_ses_variables_from_slaves=<Y|N> Default is Yes
# write_ses_variables_to_all=<Y|(N)> Default is No
# router_options=<option[=value]>,<option[=value]>,... # router_options=<option[=value]>,<option[=value]>,...
# where value=[master|slave|synced] # where value=[master|slave|synced]
# #
@ -70,6 +72,8 @@ router=readwritesplit
servers=server1,server2,server3 servers=server1,server2,server3
user=maxuser user=maxuser
passwd=maxpwd passwd=maxpwd
read_ses_variables_from_slaves=No
write_ses_variables_to_all=Yes
max_slave_connections=50% max_slave_connections=50%
max_slave_replication_lag=30 max_slave_replication_lag=30
router_options=slave_selection_criteria=LEAST_BEHIND_MASTER router_options=slave_selection_criteria=LEAST_BEHIND_MASTER

View File

@ -41,6 +41,7 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <ctype.h>
#include <ini.h> #include <ini.h>
#include <config.h> #include <config.h>
#include <service.h> #include <service.h>
@ -247,18 +248,28 @@ int error_count = 0;
{ {
char* max_slave_conn_str; char* max_slave_conn_str;
char* max_slave_rlag_str; char* max_slave_rlag_str;
char *user;
char *auth;
char *enable_root_user;
char *weightby;
char *version_string;
bool is_rwsplit = false;
obj->element = service_alloc(obj->object, router); obj->element = service_alloc(obj->object, router);
char *user = user = config_get_value(obj->parameters, "user");
config_get_value(obj->parameters, "user"); auth = config_get_value(obj->parameters, "passwd");
char *auth = enable_root_user = config_get_value(
config_get_value(obj->parameters, "passwd"); obj->parameters,
char *enable_root_user = "enable_root_user");
config_get_value(obj->parameters, "enable_root_user"); weightby = config_get_value(obj->parameters, "weightby");
char *weightby =
config_get_value(obj->parameters, "weightby");
char *version_string = config_get_value(obj->parameters, "version_string"); version_string = config_get_value(obj->parameters,
"version_string");
/** flag for rwsplit-specific parameters */
if (strncmp(router, "readwritesplit", strlen("readwritesplit")+1) == 0)
{
is_rwsplit = true;
}
if (obj->element == NULL) /*< if module load failed */ if (obj->element == NULL) /*< if module load failed */
{ {
@ -374,7 +385,71 @@ int error_count = 0;
param->value))); param->value)));
} }
} }
/** Parameters for rwsplit router only */
if (is_rwsplit)
{
CONFIG_PARAMETER* param;
char* write_sesvars_to_all;
char* read_sesvars_from_slaves;
bool succp;
write_sesvars_to_all =
config_get_value(obj->parameters,
"write_ses_variables_to_all");
if (write_sesvars_to_all != NULL)
{
param = config_get_param(
obj->parameters,
"write_ses_variables_to_all");
succp = service_set_param_value(obj->element,
param,
write_sesvars_to_all,
COUNT_NONE,
BOOL_TYPE);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is <true/false> for write session "
"variables to all backends.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
} }
}
read_sesvars_from_slaves =
config_get_value(
obj->parameters,
"read_ses_variables_from_slaves");
if (read_sesvars_from_slaves != NULL)
{
param = config_get_param(
obj->parameters,
"read_ses_variables_from_slaves");
succp = service_set_param_value(obj->element,
param,
read_sesvars_from_slaves,
COUNT_NONE,
BOOL_TYPE);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is <true/false> for write session "
"variables to all backends.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
} /*< if (rw_split) */
} /*< if (router) */
else else
{ {
obj->element = NULL; obj->element = NULL;
@ -813,12 +888,15 @@ config_param_type_t config_get_paramtype(
return param->qfd_param_type; return param->qfd_param_type;
} }
int config_get_valint( bool config_get_valint(
int* val,
CONFIG_PARAMETER* param, CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */ const char* name, /*< if NULL examine current param only */
config_param_type_t ptype) config_param_type_t ptype)
{ {
int val = -1; /*< -1 indicates failure */ bool succp = false;;
ss_dassert(ptype == COUNT_TYPE || ptype == PERCENT_TYPE);
while (param) while (param)
{ {
@ -826,29 +904,57 @@ int config_get_valint(
{ {
switch (ptype) { switch (ptype) {
case COUNT_TYPE: case COUNT_TYPE:
val = param->qfd.valcount; *val = param->qfd.valcount;
goto return_val; succp = true;
goto return_succp;
case PERCENT_TYPE: case PERCENT_TYPE:
val = param->qfd.valpercent; *val = param->qfd.valpercent;
goto return_val; succp =true;
goto return_succp;
case BOOL_TYPE:
val = param->qfd.valbool;
goto return_val;
default: default:
goto return_val; goto return_succp;
} }
} }
else if (name == NULL)
{
goto return_val;
}
param = param->next; param = param->next;
} }
return_val: return_succp:
return val; return succp;
}
bool config_get_valbool(
bool* val,
CONFIG_PARAMETER* param,
const char* name,
config_param_type_t ptype)
{
bool succp;
ss_dassert(ptype == BOOL_TYPE);
if (ptype != BOOL_TYPE)
{
succp = false;
goto return_succp;
}
while (param)
{
if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN))
{
*val = param->qfd.valbool;
succp = true;
goto return_succp;
}
param = param->next;
}
succp = false;
return_succp:
return succp;
} }
@ -1320,6 +1426,8 @@ static char *service_params[] =
"enable_root_user", "enable_root_user",
"max_slave_connections", "max_slave_connections",
"max_slave_replication_lag", "max_slave_replication_lag",
"write_ses_variables_to_all", /*< rwsplit only */
"read_ses_variables_from_slaves", /*< rwsplit only */
"version_string", "version_string",
"filters", "filters",
NULL NULL

View File

@ -52,12 +52,26 @@
#include <poll.h> #include <poll.h>
#include <skygw_utils.h> #include <skygw_utils.h>
#include <log_manager.h> #include <log_manager.h>
#include <../../../mariadb-5.5.30/include/ft_global.h>
extern int lm_enabled_logfiles_bitmask; extern int lm_enabled_logfiles_bitmask;
/** To be used with configuration type checks */
typedef struct typelib_st {
int tl_nelems;
const char* tl_name;
const char** tl_p_elems;
} typelib_t;
static const char* bool_strings[11]= {"FALSE", "TRUE", "OFF", "ON", "N", "Y", "0", "1", "NO", "YES", 0};
typelib_t bool_type = {array_nelems(bool_strings)-1, "bool_type", bool_strings};
static SPINLOCK service_spin = SPINLOCK_INIT; static SPINLOCK service_spin = SPINLOCK_INIT;
static SERVICE *allServices = NULL; static SERVICE *allServices = NULL;
static int find_type(typelib_t* tl, const char* needle, int maxlen);
static void service_add_qualified_param( static void service_add_qualified_param(
SERVICE* svc, SERVICE* svc,
CONFIG_PARAMETER* param); CONFIG_PARAMETER* param);
@ -1009,8 +1023,11 @@ bool service_set_param_value (
{ {
char* p; char* p;
int valint; int valint;
bool valbool;
bool succp = true; bool succp = true;
if (PARAM_IS_TYPE(type,PERCENT_TYPE) ||PARAM_IS_TYPE(type,COUNT_TYPE))
{
/** /**
* Find out whether the value is numeric and ends with '%' or '\0' * Find out whether the value is numeric and ends with '%' or '\0'
*/ */
@ -1068,13 +1085,77 @@ bool service_set_param_value (
/** Log error */ /** Log error */
} }
} }
}
else if (type == BOOL_TYPE)
{
unsigned int rc;
rc = find_type(&bool_type, valstr, strlen(valstr)+1);
if (rc > 0)
{
succp = true;
if (rc%2 == 1)
{
valbool = false;
}
else if (rc%2 == 0)
{
valbool = true;
}
/** add param to config */
config_set_qualified_param(param,
(void *)&valbool,
BOOL_TYPE);
}
else
{
succp = false;
}
}
if (succp) if (succp)
{ {
service_add_qualified_param(service, param); /*< add param to svc */ service_add_qualified_param(service, param); /*< add param to svc */
} }
return succp; return succp;
} }
/*
* Function to find a string in typelib_t
* (similar to find_type() of mysys/typelib.c)
*
* SYNOPSIS
* find_type()
* lib typelib_t
* find String to find
* length Length of string to find
* part_match Allow part matching of value
*
* RETURN
* 0 error
* > 0 position in TYPELIB->type_names +1
*/
static int find_type(
typelib_t* tl,
const char* needle,
int maxlen)
{
int i;
if (tl == NULL || needle == NULL || maxlen <= 0)
{
return -1;
}
for (i=0; i<tl->tl_nelems; i++)
{
if (strncasecmp(tl->tl_p_elems[i], needle, maxlen) == 0)
{
return i+1;
}
}
return 0;
}
/** /**
* Add qualified config parameter to SERVICE struct. * Add qualified config parameter to SERVICE struct.

View File

@ -13,7 +13,7 @@ TESTLOG := $(shell pwd)/testcore.log
LOGPATH := $(ROOT_PATH)/log_manager LOGPATH := $(ROOT_PATH)/log_manager
UTILSPATH := $(ROOT_PATH)/utils UTILSPATH := $(ROOT_PATH)/utils
LDFLAGS=-rdynamic -L$(LOGPATH) \ LDFLAGS=-rdynamic -L$(LOGPATH) -L$(EMBEDDED_LIB) \
-Wl,-rpath,$(DEST)/lib \ -Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) \ -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) \
-Wl,-rpath,$(EMBEDDED_LIB) -Wl,-rpath,$(EMBEDDED_LIB)

View File

@ -99,7 +99,14 @@ bool config_set_qualified_param(
config_param_type_t type); config_param_type_t type);
int config_get_valint( bool config_get_valint(
int* val,
CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */
config_param_type_t ptype);
bool config_get_valbool(
bool* val,
CONFIG_PARAMETER* param, CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */ const char* name, /*< if NULL examine current param only */
config_param_type_t ptype); config_param_type_t ptype);

View File

@ -135,7 +135,7 @@ typedef struct service {
struct service *next; /**< The next service in the linked list */ struct service *next; /**< The next service in the linked list */
} SERVICE; } SERVICE;
typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t; typedef enum count_spec_t {COUNT_NONE=0, COUNT_ATLEAST, COUNT_EXACT, COUNT_ATMOST} count_spec_t;
#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */ #define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */
#define SERVICE_STATE_STARTED 2 /**< The service has been started */ #define SERVICE_STATE_STARTED 2 /**< The service has been started */

View File

@ -30,7 +30,7 @@ CFLAGS=-c -fPIC -I/usr/include -I../include -I../../include -I$(LOGPATH) -I$(QCL
include ../../../makefile.inc include ../../../makefile.inc
LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -Wl,-rpath,$(DEST)/lib \ LDFLAGS=-shared -L$(LOGPATH) -L$(EMBEDDED_LIB) -L$(QCLASSPATH) -Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH)
TESTSRCS=testfilter.c TESTSRCS=testfilter.c
@ -48,7 +48,7 @@ MQOBJ=$(MQSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS) SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS)
OBJ=$(SRCS:.c=.o) OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libtee.so MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libhintfilter.so libtee.so
ifndef BUILD_RABBITMQ ifndef BUILD_RABBITMQ
BUILD_RABBITMQ=Y BUILD_RABBITMQ=Y
@ -81,7 +81,7 @@ libtee.so: $(TEEOBJ)
$(CC) $(LDFLAGS) $(TEEOBJ) $(LIBS) -o $@ $(CC) $(LDFLAGS) $(TEEOBJ) $(LIBS) -o $@
libhintfilter.so: libhintfilter.so:
# (cd hint; touch depend.mk ; make; cp $@ ..) (cd hint; touch depend.mk ; make; cp $@ ..)
.c.o: .c.o:
$(CC) $(CFLAGS) $< -o $@ $(CC) $(CFLAGS) $< -o $@
@ -92,12 +92,12 @@ clean:
tags: tags:
ctags $(SRCS) $(HDRS) ctags $(SRCS) $(HDRS)
# (cd hint; touch depend.mk; make tags) (cd hint; touch depend.mk; make tags)
depend: depend:
@rm -f depend.mk @rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk cc -M $(CFLAGS) $(SRCS) > depend.mk
# (cd hint; touch depend.mk; make depend) (cd hint; touch depend.mk; make depend)
install: $(MODULES) install: $(MODULES)
install -D $(MODULES) $(DEST)/modules install -D $(MODULES) $(DEST)/modules

View File

@ -231,18 +231,9 @@ HINT_MODE mode = HM_EXECUTE;
goto retblock; goto retblock;
} }
/** This is not MaxScale hint because it doesn't start with 'maxscale' */
if (tok->token != TOK_MAXSCALE) if (tok->token != TOK_MAXSCALE)
{ {
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Error : Invalid hint string '%s'. Hint should start "
"with keyword 'maxscale'. Hint ignored.",
token_get_keyword(tok))));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Invalid hint string '%s'. Hint should start "
"with keyword 'maxscale'. Hint ignored.",
token_get_keyword(tok))));
token_free(tok); token_free(tok);
goto retblock; goto retblock;
} }

View File

@ -118,6 +118,8 @@ typedef enum select_criteria {
/** default values for rwsplit configuration parameters */ /** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1 #define CONFIG_MAX_SLAVE_CONN 1
#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */ #define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */
#define CONFIG_READ_SESVARS_FROM_SLAVES false
#define CONFIG_WRITE_SESVARS_TO_ALL true
#define GET_SELECT_CRITERIA(s) \ #define GET_SELECT_CRITERIA(s) \
(strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \ (strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \
@ -232,6 +234,11 @@ typedef struct rwsplit_config_st {
int rw_max_slave_conn_count; int rw_max_slave_conn_count;
select_criteria_t rw_slave_select_criteria; select_criteria_t rw_slave_select_criteria;
int rw_max_slave_replication_lag; int rw_max_slave_replication_lag;
/** Route user- & system variable writes to all backends */
bool rw_write_sesvars_to_all;
/** Route queries including user- & system variables to slaves */
bool rw_read_sesvars_from_slaves;
} rwsplit_config_t; } rwsplit_config_t;

View File

@ -100,10 +100,11 @@ static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static route_target_t get_route_target ( static route_target_t get_route_target (
skygw_query_type_t qtype, skygw_query_type_t qtype,
bool read_sesvars_from_slaves,
bool write_sesvars_to_all,
bool trx_active, bool trx_active,
HINT* hint); HINT* hint);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
#if defined(NOT_USED) #if defined(NOT_USED)
@ -366,6 +367,7 @@ static void refreshInstance(
{ {
CONFIG_PARAMETER* param; CONFIG_PARAMETER* param;
bool refresh_single; bool refresh_single;
config_param_type_t paramtype;
if (singleparam != NULL) if (singleparam != NULL)
{ {
@ -377,36 +379,88 @@ static void refreshInstance(
param = router->service->svc_config_param; param = router->service->svc_config_param;
refresh_single = false; refresh_single = false;
} }
paramtype = config_get_paramtype(param);
while (param != NULL) while (param != NULL)
{ {
config_param_type_t paramtype;
paramtype = config_get_paramtype(param);
if (paramtype == COUNT_TYPE) if (paramtype == COUNT_TYPE)
{ {
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0) if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
{ {
int val;
bool succp;
router->rwsplit_config.rw_max_slave_conn_percent = 0; router->rwsplit_config.rw_max_slave_conn_percent = 0;
router->rwsplit_config.rw_max_slave_conn_count =
config_get_valint(param, NULL, paramtype); succp = config_get_valint(&val, param, NULL, paramtype);
if (succp)
{
router->rwsplit_config.rw_max_slave_conn_count = val;
}
} }
else if (strncmp(param->name, else if (strncmp(param->name,
"max_slave_replication_lag", "max_slave_replication_lag",
MAX_PARAM_LEN) == 0) MAX_PARAM_LEN) == 0)
{ {
router->rwsplit_config.rw_max_slave_replication_lag = int val;
config_get_valint(param, NULL, paramtype); bool succp;
succp = config_get_valint(&val, param, NULL, paramtype);
if (succp)
{
router->rwsplit_config.rw_max_slave_replication_lag = val;
}
} }
} }
else if (paramtype == PERCENT_TYPE) else if (paramtype == PERCENT_TYPE)
{ {
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0) if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
{ {
int val;
bool succp;
router->rwsplit_config.rw_max_slave_conn_count = 0; router->rwsplit_config.rw_max_slave_conn_count = 0;
router->rwsplit_config.rw_max_slave_conn_percent =
config_get_valint(param, NULL, paramtype); succp = config_get_valint(&val, param, NULL, paramtype);
if (succp)
{
router->rwsplit_config.rw_max_slave_conn_percent = val;
}
}
}
if (paramtype == BOOL_TYPE)
{
if (strncmp(param->name,
"read_ses_variables_from_slaves",
MAX_PARAM_LEN) == 0)
{
bool val;
bool succp;
succp = config_get_valbool(&val, param, NULL, paramtype);
if (succp)
{
router->rwsplit_config.rw_read_sesvars_from_slaves = val;
}
}
else if (strncmp(param->name,
"write_ses_variables_to_all",
MAX_PARAM_LEN) == 0)
{
bool val;
bool succp;
succp = config_get_valbool(&val, param, NULL, paramtype);
if (succp)
{
router->rwsplit_config.rw_write_sesvars_to_all = val;
}
} }
} }
@ -459,6 +513,7 @@ static void refreshInstance(
} }
} }
#endif /*< NOT_USED */ #endif /*< NOT_USED */
} }
/** /**
@ -637,6 +692,21 @@ createInstance(SERVICE *service, char **options)
refreshInstance(router, param); refreshInstance(router, param);
} }
router->rwsplit_version = service->svc_config_version; router->rwsplit_version = service->svc_config_version;
/** Set default values */
router->rwsplit_config.rw_read_sesvars_from_slaves = CONFIG_READ_SESVARS_FROM_SLAVES;
router->rwsplit_config.rw_write_sesvars_to_all = CONFIG_WRITE_SESVARS_TO_ALL;
param = config_get_param(service->svc_config_param, "read_ses_variables_from_slaves");
if (param != NULL)
{
refreshInstance(router, param);
}
param = config_get_param(service->svc_config_param, "write_ses_variables_to_all");
if (param != NULL)
{
refreshInstance(router, param);
}
/** /**
* We have completed the creation of the router data, so now * We have completed the creation of the router data, so now
* insert this router into the linked list of routers * insert this router into the linked list of routers
@ -1115,33 +1185,54 @@ return_succp:
static route_target_t get_route_target ( static route_target_t get_route_target (
skygw_query_type_t qtype, skygw_query_type_t qtype,
bool trx_active, bool trx_active,
bool read_ses_variables_from_slaves,
bool write_ses_variables_to_all,
HINT* hint) HINT* hint)
{ {
route_target_t target; route_target_t target;
/**
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) || * These queries are not affected by hints
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) || */
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT)) if (!trx_active &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
/** Configured to allow writing variables to all nodes */
(write_ses_variables_to_all &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE)))))
{ {
/** hints don't affect on routing */ /** hints don't affect on routing */
target = TARGET_ALL; target = TARGET_ALL;
} }
/** /**
* Read-only statements to slave or to master can be re-routed after * Hints may affect on routing of the following queries
* the hints
*/ */
else if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || else if (!trx_active &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_READ)) && (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || /*< any SELECT */
!trx_active) QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ)|| /*< read user var */
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) || /*< read sys var */
QUERY_IS_TYPE(qtype, QUERY_TYPE_EXEC_STMT) || /*< prepared stmt exec */
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))) /*< read global sys var */
{ {
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)) /** First set expected targets before evaluating hints */
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
/** Configured to allow reading variables from slaves */
(read_ses_variables_from_slaves &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))))
{ {
target = TARGET_SLAVE; target = TARGET_SLAVE;
} }
else else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_EXEC_STMT) ||
/** Configured not to allow reading variables from slaves */
(!read_ses_variables_from_slaves &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ))))
{ {
target = TARGET_MASTER; target = TARGET_MASTER;
} }
/** process routing hints */ /** process routing hints */
while (hint != NULL) while (hint != NULL)
{ {
@ -1210,11 +1301,32 @@ static route_target_t get_route_target (
else else
{ {
/** hints don't affect on routing */ /** hints don't affect on routing */
ss_dassert(trx_active ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_WRITE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_MASTER_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) &&
!write_ses_variables_to_all) ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) &&
!write_ses_variables_to_all) ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ) &&
!write_ses_variables_to_all) ||
(QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE) &&
!write_ses_variables_to_all) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_ROLLBACK) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_COMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_EXEC_STMT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_READ_TMP_TABLE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN)));
target = TARGET_MASTER; target = TARGET_MASTER;
} }
return target; return target;
} }
/** /**
* Check if the query is a DROP TABLE... query and * Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable. * if it targets a temporary table, remove it from the hashtable.
@ -1552,7 +1664,6 @@ static int routeQuery(
master_dcb = router_cli_ses->rses_master_ref->bref_dcb; master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb); CHK_DCB(master_dcb);
switch(packet_type) { switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
@ -1597,11 +1708,9 @@ static int routeQuery(
break; break;
} /**< switch by packet type */ } /**< switch by packet type */
/** /**
* Check if the query has anything to do with temporary tables. * Check if the query has anything to do with temporary tables.
*/ */
qtype = is_read_tmp_table(instance,router_session,querybuf,qtype); qtype = is_read_tmp_table(instance,router_session,querybuf,qtype);
check_create_tmp_table(instance,router_session,querybuf,qtype); check_create_tmp_table(instance,router_session,querybuf,qtype);
check_drop_tmp_table(instance,router_session,querybuf,qtype); check_drop_tmp_table(instance,router_session,querybuf,qtype);
@ -1643,7 +1752,6 @@ static int routeQuery(
router_cli_ses->rses_transaction_active = false; router_cli_ses->rses_transaction_active = false;
} }
/** /**
* Find out where to route the query. Result may not be clear; it is * Find out where to route the query. Result may not be clear; it is
* possible to have a hint for routing to a named server which can * possible to have a hint for routing to a named server which can
@ -1663,6 +1771,8 @@ static int routeQuery(
*/ */
route_target = get_route_target(qtype, route_target = get_route_target(qtype,
router_cli_ses->rses_transaction_active, router_cli_ses->rses_transaction_active,
router_cli_ses->rses_config.rw_read_sesvars_from_slaves,
router_cli_ses->rses_config.rw_write_sesvars_to_all,
querybuf->hint); querybuf->hint);
if (TARGET_IS_ALL(route_target)) if (TARGET_IS_ALL(route_target))

View File

@ -51,15 +51,19 @@ type=service
router=readwritesplit router=readwritesplit
servers=server1,server2,server3,server4 servers=server1,server2,server3,server4
max_slave_connections=90% max_slave_connections=90%
write_ses_variables_to_all=Yes
read_ses_variables_from_slaves=Yes
user=maxuser user=maxuser
passwd=maxpwd passwd=maxpwd
filters=Hint
[RW Split Hint Router] [RW Split Hint Router]
type=service type=service
router=readwritesplit router=readwritesplit
servers=server1,server2,server3,server4 servers=server1,server2,server3,server4
max_slave_connections=90% max_slave_connections=90%
write_ses_variables_to_all=Yes
read_ses_variables_from_slaves=Yes
user=maxuser user=maxuser
passwd=maxpwd passwd=maxpwd
filters=Hint filters=Hint

View File

@ -45,5 +45,6 @@
#endif #endif
#define MAX_ERROR_MSG PATH_MAX #define MAX_ERROR_MSG PATH_MAX
#define array_nelems(a) ((uint)(sizeof(a)/sizeof(a[0])))
#endif /* SKYGW_TYPES_H */ #endif /* SKYGW_TYPES_H */

View File

@ -83,6 +83,7 @@ typedef enum { THR_INIT, THR_RUNNING, THR_STOPPED, THR_DONE } skygw_thr_state_t;
typedef enum { MES_RC_FAIL, MES_RC_SUCCESS, MES_RC_TIMEOUT } skygw_mes_rc_t; typedef enum { MES_RC_FAIL, MES_RC_SUCCESS, MES_RC_TIMEOUT } skygw_mes_rc_t;
EXTERN_C_BLOCK_BEGIN EXTERN_C_BLOCK_BEGIN
slist_cursor_t* slist_init(void); slist_cursor_t* slist_init(void);
void slist_done(slist_cursor_t* c); void slist_done(slist_cursor_t* c);