Multiple slave connections for read/write split router.

Merge branch 'MAX-95' into develop
This commit is contained in:
VilhoRaatikka 2014-04-29 18:23:08 +03:00
commit bb7d583155
19 changed files with 1251 additions and 643 deletions

View File

@ -253,6 +253,7 @@ static int logmanager_write_log(
va_list valist);
static blockbuf_t* blockbuf_init(logfile_id_t id);
static void blockbuf_node_done(void* bb_data);
static char* blockbuf_get_writepos(
#if 0
int** refcount,
@ -996,8 +997,13 @@ static char* blockbuf_get_writepos(
simple_mutex_unlock(&bb->bb_mutex);
return pos;
}
static void blockbuf_node_done(
void* bb_data)
{
blockbuf_t* bb = (blockbuf_t *)bb_data;
simple_mutex_done(&bb->bb_mutex);
}
static blockbuf_t* blockbuf_init(
@ -2059,7 +2065,7 @@ static bool logfile_init(
if (mlist_init(&logfile->lf_blockbuf_list,
NULL,
strdup("logfile block buffer list"),
NULL,
blockbuf_node_done,
MAXNBLOCKBUFS) == NULL)
{
ss_dfprintf(stderr,

View File

@ -706,6 +706,10 @@ static bool skygw_stmt_causes_implicit_commit(
{
succp = true;
}
else
{
succp =false;
}
break;
default:
succp = true;

View File

@ -195,6 +195,8 @@ int error_count = 0;
"router");
if (router)
{
char* max_slave_conn_str;
obj->element = service_alloc(obj->object, router);
char *user =
config_get_value(obj->parameters, "user");
@ -203,6 +205,22 @@ int error_count = 0;
char *enable_root_user =
config_get_value(obj->parameters, "enable_root_user");
if (obj->element == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Reading configuration "
"for router service '%s' failed. "
"Router %s is not loaded.",
obj->object,
obj->object)));
obj = obj->next;
continue; /*< process next obj */
}
max_slave_conn_str =
config_get_value(obj->parameters,
"max_slave_connections");
if (enable_root_user)
serviceEnableRootUser(obj->element, atoi(enable_root_user));
@ -222,6 +240,35 @@ int error_count = 0;
"corresponding password.",
obj->object)));
}
if (max_slave_conn_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(obj->parameters,
"max_slave_connections");
succp = service_set_slave_conn_limit(
obj->element,
param,
max_slave_conn_str,
COUNT_ATMOST);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is either <int> for slave connection "
"count or\n\t<int>%% for specifying the "
"maximum percentage of available the "
"slaves that will be connected.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
}
else
{
@ -515,6 +562,89 @@ config_get_value(CONFIG_PARAMETER *params, const char *name)
return NULL;
}
CONFIG_PARAMETER* config_get_param(
CONFIG_PARAMETER* params,
const char* name)
{
while (params)
{
if (!strcmp(params->name, name))
return params;
params = params->next;
}
return NULL;
}
config_param_type_t config_get_paramtype(
CONFIG_PARAMETER* param)
{
return param->qfd_param_type;
}
int config_get_valint(
CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */
config_param_type_t ptype)
{
int val = -1; /*< -1 indicates failure */
while (param)
{
if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN))
{
switch (ptype) {
case COUNT_TYPE:
val = param->qfd.valcount;
goto return_val;
case PERCENT_TYPE:
val = param->qfd.valpercent;
goto return_val;
case BOOL_TYPE:
val = param->qfd.valbool;
goto return_val;
default:
goto return_val;
}
}
else if (name == NULL)
{
goto return_val;
}
param = param->next;
}
return_val:
return val;
}
CONFIG_PARAMETER* config_clone_param(
CONFIG_PARAMETER* param)
{
CONFIG_PARAMETER* p2;
p2 = (CONFIG_PARAMETER*) malloc(sizeof(CONFIG_PARAMETER));
if (p2 == NULL)
{
goto return_p2;
}
memcpy(p2, param, sizeof(CONFIG_PARAMETER));
p2->name = strndup(param->name, MAX_PARAM_LEN);
p2->value = strndup(param->value, MAX_PARAM_LEN);
if (param->qfd_param_type == STRING_TYPE)
{
p2->qfd.valstr = strndup(param->qfd.valstr, MAX_PARAM_LEN);
}
return_p2:
return p2;
}
/**
* Free a config tree
*
@ -861,6 +991,7 @@ static char *service_params[] =
"user",
"passwd",
"enable_root_user",
"max_slave_connections",
NULL
};
@ -950,3 +1081,47 @@ int i;
obj = obj->next;
}
}
/**
* Set qualified parameter value to CONFIG_PARAMETER struct.
*/
bool config_set_qualified_param(
CONFIG_PARAMETER* param,
void* val,
config_param_type_t type)
{
bool succp;
switch (type) {
case STRING_TYPE:
param->qfd.valstr = strndup((const char *)val, MAX_PARAM_LEN);
succp = true;
break;
case COUNT_TYPE:
param->qfd.valcount = *(int *)val;
succp = true;
break;
case PERCENT_TYPE:
param->qfd.valpercent = *(int *)val;
succp = true;
break;
case BOOL_TYPE:
param->qfd.valbool = *(bool *)val;
succp = true;
break;
default:
succp = false;
break;
}
if (succp)
{
param->qfd_param_type = type;
}
return succp;
}

View File

@ -136,7 +136,7 @@ static void usage(void);
static char* get_expanded_pathname(
char** abs_path,
char* input_path,
char* fname);
const char* fname);
static void print_log_n_stderr(
bool do_log,
bool do_stderr,
@ -725,7 +725,7 @@ static bool file_is_writable(
static char* get_expanded_pathname(
char** output_path,
char* relative_path,
char* fname)
const char* fname)
{
char* cnf_file_buf = NULL;
char* expanded_path;
@ -1228,7 +1228,7 @@ int main(int argc, char **argv)
datadir)));
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Configuration file : %s",
"Configuration file : %s",
cnf_file_path)));
/*< Update the server options */

View File

@ -53,6 +53,17 @@ static void register_module(const char *module,
void *modobj);
static void unregister_module(const char *module);
char* get_maxscale_home(void)
{
char* home = getenv("MAXSCALE_HOME");
if (home == NULL)
{
home = "/usr/local/skysql/MaxScale";
}
return home;
}
/**
* Load the dynamic library related to a gateway module. The routine
* will look for library files in the current directory,
@ -82,10 +93,10 @@ MODULES *mod;
sprintf(fname, "./lib%s.so", module);
if (access(fname, F_OK) == -1)
{
if ((home = getenv("MAXSCALE_HOME")) == NULL)
home = "/usr/local/skysql/MaxScale";
home = get_maxscale_home ();
sprintf(fname, "%s/modules/lib%s.so", home, module);
if (access(fname, F_OK) == -1)
if (access(fname, F_OK) == -1)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -100,7 +111,7 @@ MODULES *mod;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to load library for module: "
"%s, %s.",
"%s\n\t\t\t %s.",
module,
dlerror())));
return NULL;
@ -111,7 +122,7 @@ MODULES *mod;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Version interface not supported by "
"module: %s, %s.",
"module: %s\n\t\t\t %s.",
module,
dlerror())));
dlclose(dlhandle);
@ -134,7 +145,7 @@ MODULES *mod;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Expected entry point interface missing "
"from module: %s, %s.",
"from module: %s\n\t\t\t %s.",
module,
dlerror())));
dlclose(dlhandle);

View File

@ -34,6 +34,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <session.h>
#include <service.h>
#include <server.h>
@ -52,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask;
static SPINLOCK service_spin = SPINLOCK_INIT;
static SERVICE *allServices = NULL;
static void service_add_qualified_param(
SERVICE* svc,
CONFIG_PARAMETER* param);
/**
* Allocate a new service for the gateway to support
*
@ -70,6 +77,20 @@ SERVICE *service;
return NULL;
if ((service->router = load_module(router, MODULE_ROUTER)) == NULL)
{
char* home = get_maxscale_home();
char* ldpath = getenv("LD_LIBRARY_PATH");
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to load %s module \"%s\".\n\t\t\t"
" Ensure that lib%s.so exists in one of the "
"following directories :\n\t\t\t "
"- %s/modules\n\t\t\t - %s",
MODULE_ROUTER,
router,
router,
home,
ldpath)));
free(service);
return NULL;
}
@ -84,6 +105,7 @@ SERVICE *service;
service->enable_root = 0;
service->routerOptions = NULL;
service->databases = NULL;
service->svc_config_param = NULL;
spinlock_init(&service->spin);
spinlock_init(&service->users_table_spin);
memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE));
@ -752,3 +774,97 @@ int service_refresh_users(SERVICE *service) {
else
return 1;
}
bool service_set_slave_conn_limit (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec)
{
char* p;
int valint;
bool percent = false;
bool succp;
/**
* Find out whether the value is numeric and ends with '%' or '\0'
*/
p = valstr;
while(isdigit(*p)) p++;
errno = 0;
if (p == valstr || (*p != '%' && *p != '\0'))
{
succp = false;
}
else if (*p == '%')
{
if (*(p+1) == '\0')
{
*p = '\0';
valint = (int) strtol(valstr, (char **)NULL, 10);
if (valint == 0 && errno != 0)
{
succp = false;
}
else
{
succp = true;
config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE);
}
}
else
{
succp = false;
}
}
else if (*p == '\0')
{
valint = (int) strtol(valstr, (char **)NULL, 10);
if (valint == 0 && errno != 0)
{
succp = false;
}
else
{
succp = true;
config_set_qualified_param(param, (void *)&valint, COUNT_TYPE);
}
}
if (succp)
{
service_add_qualified_param(service, param); /*< add param to svc */
}
return succp;
}
/**
* Add qualified config parameter to SERVICE struct.
*/
static void service_add_qualified_param(
SERVICE* svc,
CONFIG_PARAMETER* param)
{
CONFIG_PARAMETER** p;
spinlock_acquire(&svc->spin);
p = &svc->svc_config_param;
if ((*p) != NULL)
{
while ((*p)->next != NULL) *p = (*p)->next;
(*p)->next = config_clone_param(param);
}
else
{
(*p) = config_clone_param(param);
}
(*p)->next = NULL;
spinlock_release(&svc->spin);
}

View File

@ -17,6 +17,7 @@
*
* Copyright SkySQL Ab 2013
*/
#include <skygw_utils.h>
/**
* @file config.h The configuration handling elements
@ -30,12 +31,32 @@
* @endverbatim
*/
/**
* Maximum length for configuration parameter value.
*/
enum {MAX_PARAM_LEN=256};
typedef enum {
UNDEFINED_TYPE=0,
STRING_TYPE,
COUNT_TYPE,
PERCENT_TYPE,
BOOL_TYPE
} config_param_type_t;
/**
* The config parameter
*/
typedef struct config_parameter {
char *name; /**< The name of the parameter */
char *value; /**< The value of the parameter */
char *value; /**< The value of the parameter */
union { /*< qualified parameter value by type */
char* valstr; /*< terminated char* array */
int valcount; /*< int */
int valpercent; /*< int */
bool valbool; /*< bool */
} qfd;
config_param_type_t qfd_param_type;
struct config_parameter *next; /**< Next pointer in the linked list */
} CONFIG_PARAMETER;
@ -57,7 +78,22 @@ typedef struct {
int n_threads; /**< Number of polling threads */
} GATEWAY_CONF;
extern int config_load(char *);
extern int config_reload();
extern int config_threadcount();
extern int config_load(char *);
extern int config_reload();
extern int config_threadcount();
CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name);
config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param);
CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param);
bool config_set_qualified_param(
CONFIG_PARAMETER* param,
void* val,
config_param_type_t type);
int config_get_valint(
CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */
config_param_type_t ptype);
#endif

View File

@ -1,126 +0,0 @@
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*
*/
/*
* MYSQL mysql protocol header file
* Revision History
*
* Date Who Description
* 10/06/13 Massimiliano Pinto Initial implementation
*
*/
#include <dcb.h>
/* Protocol packing macros. */
#define gw_mysql_set_byte2(__buffer, __int) do { \
(__buffer)[0]= (uint8_t)((__int) & 0xFF); \
(__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); } while (0)
#define gw_mysql_set_byte3(__buffer, __int) do { \
(__buffer)[0]= (uint8_t)((__int) & 0xFF); \
(__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); \
(__buffer)[2]= (uint8_t)(((__int) >> 16) & 0xFF); } while (0)
#define gw_mysql_set_byte4(__buffer, __int) do { \
(__buffer)[0]= (uint8_t)((__int) & 0xFF); \
(__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); \
(__buffer)[2]= (uint8_t)(((__int) >> 16) & 0xFF); \
(__buffer)[3]= (uint8_t)(((__int) >> 24) & 0xFF); } while (0)
/* Protocol unpacking macros. */
#define gw_mysql_get_byte2(__buffer) \
(uint16_t)((__buffer)[0] | \
((__buffer)[1] << 8))
#define gw_mysql_get_byte3(__buffer) \
(uint32_t)((__buffer)[0] | \
((__buffer)[1] << 8) | \
((__buffer)[2] << 16))
#define gw_mysql_get_byte4(__buffer) \
(uint32_t)((__buffer)[0] | \
((__buffer)[1] << 8) | \
((__buffer)[2] << 16) | \
((__buffer)[3] << 24))
#define gw_mysql_get_byte8(__buffer) \
((uint64_t)(__buffer)[0] | \
((uint64_t)(__buffer)[1] << 8) | \
((uint64_t)(__buffer)[2] << 16) | \
((uint64_t)(__buffer)[3] << 24) | \
((uint64_t)(__buffer)[4] << 32) | \
((uint64_t)(__buffer)[5] << 40) | \
((uint64_t)(__buffer)[6] << 48) | \
((uint64_t)(__buffer)[7] << 56))
typedef enum
{
GW_MYSQL_CAPABILITIES_NONE= 0,
GW_MYSQL_CAPABILITIES_LONG_PASSWORD= (1 << 0),
GW_MYSQL_CAPABILITIES_FOUND_ROWS= (1 << 1),
GW_MYSQL_CAPABILITIES_LONG_FLAG= (1 << 2),
GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB= (1 << 3),
GW_MYSQL_CAPABILITIES_NO_SCHEMA= (1 << 4),
GW_MYSQL_CAPABILITIES_COMPRESS= (1 << 5),
GW_MYSQL_CAPABILITIES_ODBC= (1 << 6),
GW_MYSQL_CAPABILITIES_LOCAL_FILES= (1 << 7),
GW_MYSQL_CAPABILITIES_IGNORE_SPACE= (1 << 8),
GW_MYSQL_CAPABILITIES_PROTOCOL_41= (1 << 9),
GW_MYSQL_CAPABILITIES_INTERACTIVE= (1 << 10),
GW_MYSQL_CAPABILITIES_SSL= (1 << 11),
GW_MYSQL_CAPABILITIES_IGNORE_SIGPIPE= (1 << 12),
GW_MYSQL_CAPABILITIES_TRANSACTIONS= (1 << 13),
GW_MYSQL_CAPABILITIES_RESERVED= (1 << 14),
GW_MYSQL_CAPABILITIES_SECURE_CONNECTION= (1 << 15),
GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS= (1 << 16),
GW_MYSQL_CAPABILITIES_MULTI_RESULTS= (1 << 17),
GW_MYSQL_CAPABILITIES_PS_MULTI_RESULTS= (1 << 18),
GW_MYSQL_CAPABILITIES_PLUGIN_AUTH= (1 << 19),
GW_MYSQL_CAPABILITIES_SSL_VERIFY_SERVER_CERT= (1 << 30),
GW_MYSQL_CAPABILITIES_REMEMBER_OPTIONS= (1 << 31),
GW_MYSQL_CAPABILITIES_CLIENT= (GW_MYSQL_CAPABILITIES_LONG_PASSWORD |
GW_MYSQL_CAPABILITIES_FOUND_ROWS |
GW_MYSQL_CAPABILITIES_LONG_FLAG |
GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB |
GW_MYSQL_CAPABILITIES_LOCAL_FILES |
GW_MYSQL_CAPABILITIES_PLUGIN_AUTH |
GW_MYSQL_CAPABILITIES_TRANSACTIONS |
GW_MYSQL_CAPABILITIES_PROTOCOL_41 |
GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS |
GW_MYSQL_CAPABILITIES_MULTI_RESULTS |
GW_MYSQL_CAPABILITIES_PS_MULTI_RESULTS |
GW_MYSQL_CAPABILITIES_SECURE_CONNECTION),
GW_MYSQL_CAPABILITIES_CLIENT_COMPRESS= (GW_MYSQL_CAPABILITIES_LONG_PASSWORD |
GW_MYSQL_CAPABILITIES_FOUND_ROWS |
GW_MYSQL_CAPABILITIES_LONG_FLAG |
GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB |
GW_MYSQL_CAPABILITIES_LOCAL_FILES |
GW_MYSQL_CAPABILITIES_PLUGIN_AUTH |
GW_MYSQL_CAPABILITIES_TRANSACTIONS |
GW_MYSQL_CAPABILITIES_PROTOCOL_41 |
GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS |
GW_MYSQL_CAPABILITIES_MULTI_RESULTS |
GW_MYSQL_CAPABILITIES_PS_MULTI_RESULTS |
GW_MYSQL_CAPABILITIES_COMPRESS
),
} gw_mysql_capabilities_t;
#define SMALL_CHUNK 1024
#define MAX_CHUNK SMALL_CHUNK * 8 * 4
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
extern int mysql_send_ok(DCB *, int, int, const char *);
extern int MySQLSendHandshake(DCB *);

View File

@ -55,4 +55,6 @@ extern void *load_module(const char *module, const char *type);
extern void unload_module(const char *module);
extern void printModules();
extern void dprintAllModules(DCB *);
char* get_maxscale_home(void);
#endif

View File

@ -22,6 +22,7 @@
#include <spinlock.h>
#include <dcb.h>
#include <server.h>
#include "config.h"
/**
* @file service.h
@ -114,6 +115,7 @@ typedef struct service {
SERVICE_STATS stats; /**< The service statistics */
struct users *users; /**< The user data for this service */
int enable_root; /**< Allow root user access */
CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */
SPINLOCK
users_table_spin; /**< The spinlock for users data refresh */
SERVICE_REFRESH_RATE
@ -121,6 +123,8 @@ typedef struct service {
struct service *next; /**< The next service in the linked list */
} SERVICE;
typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t;
#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */
#define SERVICE_STATE_STARTED 2 /**< The service has been started */
@ -146,4 +150,11 @@ extern int service_refresh_users(SERVICE *);
extern void printService(SERVICE *);
extern void printAllServices();
extern void dprintAllServices(DCB *);
bool service_set_slave_conn_limit (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec);
#endif

View File

@ -31,15 +31,13 @@
#include <dcb.h>
/**
* Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers.
*/
typedef struct backend {
SERVER* backend_server; /*< The server itself */
int backend_conn_count; /*< Number of connections to the server */
} BACKEND;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
@ -52,14 +50,6 @@ typedef enum rses_property_type_t {
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
/**
* Session variable command
*/
@ -98,13 +88,62 @@ struct rses_property_st {
};
typedef struct sescmd_cursor_st {
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_top;
#endif
ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */
backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_tail;
#endif
} sescmd_cursor_t;
/**
* Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers.
*
* Owned by router_instance, referenced by each routing session.
*/
typedef struct backend_st {
#if defined(SS_DEBUG)
skygw_chk_t be_chk_top;
#endif
SERVER* backend_server; /*< The server itself */
int backend_conn_count; /*< Number of connections to the server */
bool be_valid; /*< valid when belongs to the router's configuration */
#if defined(SS_DEBUG)
skygw_chk_t be_chk_tail;
#endif
} BACKEND;
/**
* Reference to BACKEND.
*
* Owned by router client session.
*/
typedef struct backend_ref_st {
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_top;
#endif
BACKEND* bref_backend;
DCB* bref_dcb;
sescmd_cursor_t bref_sescmd_cur;
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif
} backend_ref_t;
typedef struct rwsplit_config_st {
int rw_max_slave_conn_percent;
int rw_max_slave_conn_count;
} rwsplit_config_t;
/**
* The client session structure used within this router.
*/
@ -113,15 +152,17 @@ struct router_client_session {
skygw_chk_t rses_chk_top;
#endif
SPINLOCK rses_lock; /*< protects rses_deleted */
int rses_versno; /*< even = no active update, else odd */
int rses_versno; /*< even = no active update, else odd. not used 4/14 */
bool rses_closed; /*< true when closeSession is called */
/** Properties listed by their type */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];
BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */
DCB* rses_dcb[BE_COUNT];
/*< cursor is pointer and status variable to current session command */
sescmd_cursor_t rses_cursor[BE_COUNT];
backend_ref_t* rses_master_ref;
backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */
rwsplit_config_t rses_config; /*< copied config info from router instance */
int rses_nbackends;
int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;
struct router_client_session* next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
@ -149,10 +190,15 @@ typedef struct router_instance {
SPINLOCK lock; /*< Lock for the instance data */
BACKEND** servers; /*< Backend servers */
BACKEND* master; /*< NULL or pointer */
rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */
unsigned int bitmask; /*< Bitmask to apply to server->status */
unsigned int bitvalue; /*< Required value of server->status */
ROUTER_STATS stats; /*< Statistics for this router */
struct router_instance* next; /*< Next router on the list */
} ROUTER_INSTANCE;
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : \
(SERVER_IS_JOINED((b)->backend_server) ? BE_JOINED : BE_UNDEFINED)));
#endif /*< _RWSPLITROUTER_H */

View File

@ -682,7 +682,7 @@ int gw_read_client_event(DCB* dcb) {
dcb,
1,
0,
"Query routing failed. Connection to "
"Can't route query. Connection to "
"backend lost");
protocol->state = MYSQL_IDLE;
}

View File

@ -252,10 +252,12 @@ int i, n;
}
else
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Warning : Unsupported router "
"option %s for readconnroute.",
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : Unsupported router "
"option \'%s\' for readconnroute. "
"Expected router options are "
"[slave|master|synced]",
options[i])));
}
}

File diff suppressed because it is too large Load Diff

View File

@ -169,6 +169,8 @@ else
echo "$TINPUT PASSED">>$TLOG ;
fi
# Disable autocommit in the first session and then test in new session that
# it is again enabled.
TINPUT=test_autocommit_disabled2.sql
TRETVAL=1
a=`$RUNCMD < ./$TINPUT`
@ -178,3 +180,14 @@ else
echo "$TINPUT PASSED">>$TLOG ;
fi
TINPUT=set_autocommit_disabled.sql
`$RUNCMD < ./$TINPUT`
TINPUT=test_after_autocommit_disabled.sql
TRETVAL=$TMASTER_ID
a=`$RUNCMD < ./$TINPUT`
if [ "$a" == "$TRETVAL" ]; then
echo "$TINPUT FAILED, return value $a when it was not accetable">>$TLOG;
else
echo "$TINPUT PASSED">>$TLOG ;
fi

View File

@ -0,0 +1,7 @@
use test;
drop table if exists t1;
create table t1 (id integer);
set autocommit=0; -- open transaction
begin;
insert into t1 values(1); -- write to master
commit;

View File

@ -0,0 +1,2 @@
use test;
select @@server_id;

View File

@ -0,0 +1,9 @@
use test;
drop table if exists t1;
create table t1 (id integer);
set autocommit=0; -- open transaction
begin;
insert into t1 values(1); -- write to master
commit;
select count(*) from t1; -- read from master since autocommit is disabled
drop table t1;

View File

@ -119,7 +119,10 @@ typedef enum skygw_chk_t {
CHK_NUM_SESSION,
CHK_NUM_ROUTER_SES,
CHK_NUM_MY_SESCMD,
CHK_NUM_ROUTER_PROPERTY
CHK_NUM_ROUTER_PROPERTY,
CHK_NUM_SESCMD_CUR,
CHK_NUM_BACKEND,
CHK_NUM_BACKEND_REF
} skygw_chk_t;
# define STRBOOL(b) ((b) ? "true" : "false")
@ -446,7 +449,25 @@ typedef enum skygw_chk_t {
"Session command has invalid check fields"); \
}
#define CHK_SESCMD_CUR(c) { \
ss_info_dassert((c)->scmd_cur_chk_top == CHK_NUM_SESCMD_CUR && \
(c)->scmd_cur_chk_tail == CHK_NUM_SESCMD_CUR, \
"Session command cursor has invalid check fields"); \
}
#define CHK_BACKEND(b) { \
ss_info_dassert((b)->be_chk_top == CHK_NUM_BACKEND && \
(b)->be_chk_tail == CHK_NUM_BACKEND, \
"BACKEND has invalid check fields"); \
}
#define CHK_BACKEND_REF(r) { \
ss_info_dassert((r)->bref_chk_top == CHK_NUM_BACKEND_REF && \
(r)->bref_chk_tail == CHK_NUM_BACKEND_REF, \
"Backend reference has invalid check fields"); \
}
#if defined(SS_DEBUG)
bool conn_open[10240];
#endif