Merge branch 'develop' into packaging

This commit is contained in:
Timofey Turenko 2014-03-27 19:16:35 +02:00
commit 5c9f25cc2d
36 changed files with 2506 additions and 653 deletions

13
README
View File

@ -25,10 +25,11 @@ as external shared objects and are referred to as routing modules.
An Google Group exists for MaxScale that can be used to discuss ideas,
issues and communicate with the MaxScale community.
[maxscale@googlegroups.com]: mailto:maxscale@googlegroups.com
Send email to [maxscale@googlegroups.com](mailto:maxscale@googlegroups.com)
or use the [forum](http://groups.google.com/forum/#!forum/maxscale) interface
Bugs can be reported in the SkySQL bugs database
[bug.skysql.com]: http://bugs.skysql.com
[bug.skysql.com](http://bugs.skysql.com)
\section Building Building MaxScale
@ -50,6 +51,12 @@ Please backup any existent my.cnf file before installing the RPMs
Install the RPM files using:
rpm -i MariaDB-5.5.34-centos6-x86_64-common.rpm MariaDB-5.5.34-centos6-x86_64-compat.rpm MariaDB-5.5.34-centos6-x86_64-devel.rpm
Note, if you wish to relocate the package to avoid an exisitng MariaDB
or MySQL installation you will need to use the --force option in addition
to the --relocate option.
rpm -i --force --relocate=/usr/=$PREFIX/usr/ MariaDB-5.5.34-centos6-x86_64-common.rpm MariaDB-5.5.34-centos6-x86_64-compat.rpm MariaDB-5.5.34-centos6-x86_64-devel.rpm
This README assumes $PREFIX = $HOME.
@ -58,7 +65,7 @@ MaxScale may be built with the embedded MariaDB library either linked
dynamically or statically.
To build with the embedded libmysqld linked dynamically from an
existent MariaDB setup
existing MariaDB source setup
set DYNLIB := Y
copy the libmysqld.so in $(HOME)/usr/lib64/dynlib

View File

@ -674,7 +674,7 @@ static int logmanager_write_log(
if (use_valist) {
vsnprintf(wp+timestamp_len, safe_str_len, str, valist);
} else {
snprintf(wp+timestamp_len, safe_str_len, str);
snprintf(wp+timestamp_len, safe_str_len, "%s", str);
}
/** write to syslog */
@ -682,11 +682,11 @@ static int logmanager_write_log(
{
switch(id) {
case LOGFILE_ERROR:
syslog(LOG_ERR, wp+timestamp_len);
syslog(LOG_ERR, "%s", wp+timestamp_len);
break;
case LOGFILE_MESSAGE:
syslog(LOG_NOTICE, wp+timestamp_len);
syslog(LOG_NOTICE, "%s", wp+timestamp_len);
break;
default:
@ -742,7 +742,7 @@ static int logmanager_write_log(
* Copy original string from block buffer to
* other logs' block buffers.
*/
snprintf(wp_c, timestamp_len+str_len, wp);
snprintf(wp_c, timestamp_len+str_len, "%s", wp);
/** remove double line feed */
if (wp_c[timestamp_len-1+str_len-2] == '\n')

View File

@ -22,6 +22,7 @@ ifdef DYNLIB
LIB := libmysqld.so.18
endif
# -O2 -g -pipe -Wformat -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -fstack-protector --param=ssp-buffer-size=4 -fPIC
CFLAGS := $(CFLAGS) -Wall
LDLIBS := $(LDLIBS) -pthread
@ -32,7 +33,7 @@ CPP_LDLIBS := -lstdc++
# Compiler flags, httpd arguments and debugger options
#
ifdef DEBUG
DEBUG_FLAGS := -DSS_DEBUG
DEBUG_FLAGS := -DSS_DEBUG -pipe -Wformat -Werror=format-security -fstack-protector --param=ssp-buffer-size=4 -fPIC
CFLAGS := $(CFLAGS) -ggdb -O0 -pthread $(DEBUG_FLAGS)
endif

View File

@ -73,6 +73,9 @@ static bool create_parse_tree(
static skygw_query_type_t resolve_query_type(
THD* thd);
static bool skygw_stmt_causes_implicit_commit(
LEX* lex,
uint mask);
/**
* @node (write brief function description here)
@ -94,13 +97,13 @@ skygw_query_type_t skygw_query_classifier_get_type(
const char* query,
unsigned long client_flags)
{
MYSQL* mysql;
char* query_str;
const char* user = "skygw";
const char* db = "skygw";
THD* thd;
MYSQL* mysql;
char* query_str;
const char* user = "skygw";
const char* db = "skygw";
THD* thd;
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
bool failp = FALSE;
bool failp = FALSE;
ss_info_dassert(query != NULL, ("query_str is NULL"));
@ -343,9 +346,9 @@ return_here:
* restrictive, for example, QUERY_TYPE_READ is smaller than QUERY_TYPE_WRITE.
*
*/
static skygw_query_type_t set_query_type(
skygw_query_type_t* qtype,
skygw_query_type_t new_type)
static u_int8_t set_query_type(
u_int8_t* qtype,
u_int8_t new_type)
{
*qtype = MAX(*qtype, new_type);
return *qtype;
@ -371,8 +374,9 @@ static skygw_query_type_t resolve_query_type(
THD* thd)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
LEX* lex;
Item* item;
u_int8_t type = QUERY_TYPE_UNKNOWN;
LEX* lex;
Item* item;
/**
* By default, if sql_log_bin, that is, recording data modifications
* to binary log, is disabled, gateway treats operations normally.
@ -389,8 +393,8 @@ static skygw_query_type_t resolve_query_type(
/** SELECT ..INTO variable|OUTFILE|DUMPFILE */
if (lex->result != NULL) {
qtype = QUERY_TYPE_SESSION_WRITE;
goto return_here;
type = QUERY_TYPE_SESSION_WRITE;
goto return_qtype;
}
/**
* 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK.
@ -402,61 +406,99 @@ static skygw_query_type_t resolve_query_type(
* CREATE SPFUNCTION, INSTALL|UNINSTALL PLUGIN
*/
if (is_log_table_write_query(lex->sql_command) ||
is_update_query(lex->sql_command))
is_update_query(lex->sql_command))
{
if (thd->variables.sql_log_bin == 0 &&
force_data_modify_op_replication)
{
qtype = QUERY_TYPE_SESSION_WRITE;
} else {
qtype = QUERY_TYPE_WRITE;
}
if (thd->variables.sql_log_bin == 0 &&
force_data_modify_op_replication)
{
type |= QUERY_TYPE_SESSION_WRITE;
} else {
type |= QUERY_TYPE_WRITE;
}
goto return_here;
goto return_qtype;
}
/**
* REVOKE ALL, ASSIGN_TO_KEYCACHE,
* PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
* SET autocommit, various other SET commands.
*/
if (sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS) {
if (skygw_stmt_causes_implicit_commit(lex, CF_AUTO_COMMIT_TRANS))
{
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
if (sql_command_flags[lex->sql_command] &
CF_IMPLICT_COMMIT_BEGIN)
{
skygw_log_write(
LOGFILE_TRACE,
"Implicit COMMIT before executing the "
"next command.");
}
else if (sql_command_flags[lex->sql_command] &
CF_IMPLICIT_COMMIT_END)
{
skygw_log_write(
LOGFILE_TRACE,
"Implicit COMMIT after executing the "
"next command.");
}
}
type |= QUERY_TYPE_COMMIT;
if (lex->option_type == OPT_GLOBAL)
{
qtype = QUERY_TYPE_GLOBAL_WRITE;
type |= QUERY_TYPE_GLOBAL_WRITE;
}
else
else if (lex->option_type == OPT_SESSION)
{
qtype = QUERY_TYPE_SESSION_WRITE;
type |= QUERY_TYPE_SESSION_WRITE;
}
goto return_here;
goto return_qtype;
}
/** Try to catch session modifications here */
switch (lex->sql_command) {
case SQLCOM_SET_OPTION:
if (lex->option_type == OPT_GLOBAL)
{
qtype = QUERY_TYPE_GLOBAL_WRITE;
break;
}
case SQLCOM_SET_OPTION:
if (lex->option_type == OPT_GLOBAL)
{
type |= QUERY_TYPE_GLOBAL_WRITE;
break;
}
/**<! fall through */
case SQLCOM_CHANGE_DB:
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case SQLCOM_CHANGE_DB:
type |= QUERY_TYPE_SESSION_WRITE;
break;
case SQLCOM_SELECT:
qtype = QUERY_TYPE_READ;
break;
case SQLCOM_SELECT:
type |= QUERY_TYPE_READ;
break;
case SQLCOM_CALL:
qtype = QUERY_TYPE_WRITE;
break;
case SQLCOM_CALL:
type |= QUERY_TYPE_WRITE;
break;
case SQLCOM_BEGIN:
type |= QUERY_TYPE_BEGIN_TRX;
goto return_qtype;
break;
default:
break;
case SQLCOM_COMMIT:
type |= QUERY_TYPE_COMMIT;
goto return_qtype;
break;
case SQLCOM_ROLLBACK:
type |= QUERY_TYPE_ROLLBACK;
goto return_qtype;
break;
default:
break;
}
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(qtype)) {
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type)) {
/**
* These values won't change qtype more restrictive than write.
* UDFs and procedures could possibly cause session-wide write,
@ -485,8 +527,7 @@ static skygw_query_type_t resolve_query_type(
if (itype == Item::SUBSELECT_ITEM) {
continue;
} else if (itype == Item::FUNC_ITEM) {
skygw_query_type_t
func_qtype = QUERY_TYPE_UNKNOWN;
int func_qtype = QUERY_TYPE_UNKNOWN;
/**
* Item types:
* FIELD_ITEM = 0, FUNC_ITEM,
@ -539,7 +580,7 @@ static skygw_query_type_t resolve_query_type(
* An unknown (for maxscale) function / sp
* belongs to this category.
*/
func_qtype = QUERY_TYPE_WRITE;
func_qtype |= QUERY_TYPE_WRITE;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
@ -549,7 +590,7 @@ static skygw_query_type_t resolve_query_type(
pthread_self())));
break;
case Item_func::UDF_FUNC:
func_qtype = QUERY_TYPE_WRITE;
func_qtype |= QUERY_TYPE_WRITE;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
@ -559,7 +600,7 @@ static skygw_query_type_t resolve_query_type(
break;
case Item_func::NOW_FUNC:
case Item_func::GSYSVAR_FUNC:
func_qtype = QUERY_TYPE_LOCAL_READ;
func_qtype |= QUERY_TYPE_LOCAL_READ;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
@ -568,7 +609,7 @@ static skygw_query_type_t resolve_query_type(
pthread_self())));
break;
case Item_func::UNKNOWN_FUNC:
func_qtype = QUERY_TYPE_READ;
func_qtype |= QUERY_TYPE_READ;
/**
* Many built-in functions are of this
* type, for example, rand(), soundex(),
@ -592,17 +633,48 @@ static skygw_query_type_t resolve_query_type(
break;
} /**< switch */
/**< Set new query type */
qtype = set_query_type(&qtype, func_qtype);
type |= set_query_type(&type, func_qtype);
}
/**
* Write is as restrictive as it gets due functions,
* so break.
*/
if (qtype == QUERY_TYPE_WRITE) {
if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) {
break;
}
} /**< for */
} /**< if */
return_here:
return_qtype:
qtype = (skygw_query_type_t)type;
return qtype;
}
static bool skygw_stmt_causes_implicit_commit(LEX* lex, uint mask)
{
bool succp;
if (!(sql_command_flags[lex->sql_command] & mask))
{
succp = false;
goto return_succp;
}
switch (lex->sql_command) {
case SQLCOM_DROP_TABLE:
succp = !(lex->drop_temporary);
break;
case SQLCOM_ALTER_TABLE:
case SQLCOM_CREATE_TABLE:
/* If CREATE TABLE of non-temporary table, do implicit commit */
succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE);
break;
case SQLCOM_SET_OPTION:
succp = lex->autocommit ? true : false;
break;
default:
break;
}
return_succp:
return succp;
}

View File

@ -29,15 +29,18 @@ EXTERN_C_BLOCK_BEGIN
* is modified
*/
typedef enum {
QUERY_TYPE_UNKNOWN = 7, /*!< Couln't find out or parse error */
QUERY_TYPE_LOCAL_READ, /*!< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ, /*!< No updates */
QUERY_TYPE_WRITE, /*!< Master data will be modified */
QUERY_TYPE_SESSION_WRITE,/*!< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE /*!< Global system variable modification */
QUERY_TYPE_UNKNOWN = 0, /*< Couln't find out or parse error */
QUERY_TYPE_LOCAL_READ = (1<<0), /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ = (1<<1), /*< No updates */
QUERY_TYPE_WRITE = (1<<2), /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE = (1<<3), /*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE = (1<<4), /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX = (1<<5), /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ROLLBACK = (1<<6), /*< ROLLBACK */
QUERY_TYPE_COMMIT = (1<<7), /*< COMMIT */
} skygw_query_type_t;
#define QUERY_IS_TYPE(mask,type) ((mask & type) == type)
skygw_query_type_t skygw_query_classifier_get_type(
const char* query_str,

41
script/make-binary-tarball.sh Executable file
View File

@ -0,0 +1,41 @@
#!/bin/sh
read -p "Enter path where MaxScale is installed:" instpath
if [ "${instpath}" = "" ]; then
echo "Error: input path is null, exit"
exit 1
fi
BINARY_PATH=${instpath}
cd ${BINARY_PATH}
BINARY_PATH=${PWD}
echo "Looking for MaxScale in [${BINARY_PATH}]"
if [ -s "${BINARY_PATH}/bin/maxscale" ]; then
if [ -x "${BINARY_PATH}/bin/maxscale" ]; then
MAXSCALE_VERSION=`strings ${BINARY_PATH}/bin/maxscale | grep "SkySQL MaxScale" | awk '{print $3}' | head -1`
echo "Found MaxScale, version: ${MAXSCALE_VERSION}"
fi
else
echo "Error: MaxScale was not found!"
exit 1
fi
MAXSCALE_BINARY_TARFILE=maxscale.preview.${MAXSCALE_VERSION}.tar
TARFILE_BASEDIR=maxscale-${MAXSCALE_VERSION}
TARFILE_BASEDIR_SUBST='s,^\.,'${TARFILE_BASEDIR}','
rm -rf ${MAXSCALE_BINARY_TARFILE}.gz
rm -rf ${MAXSCALE_BINARY_TARFILE}
TARFILE_BASEDIR_SUBST='s,^'${BINARY_PATH}','${TARFILE_BASEDIR}','
tar --absolute-names --owner=maxscale --group=maxscale --transform=${TARFILE_BASEDIR_SUBST} -cf ${MAXSCALE_BINARY_TARFILE} ${BINARY_PATH}/*
gzip ${MAXSCALE_BINARY_TARFILE}
if [ -s "${MAXSCALE_BINARY_TARFILE}.gz" ]; then
echo "File ["${MAXSCALE_BINARY_TARFILE}".gz] is ready in ["$BINARY_PATH"]"
else
echo "Error: File ["${MAXSCALE_BINARY_TARFILE}".gz] was not created in ["$BINARY_PATH"]"
fi

58
script/make-source-tarball.sh Executable file
View File

@ -0,0 +1,58 @@
#!/bin/sh
SOURCE_PATH=${PWD}/..
cd ${SOURCE_PATH}
SOURCE_PATH=${PWD}
read -p "Building source tarball from ${SOURCE_PATH} ? [y/n]" yn
case $yn in
[Yy]* )
break
;;
[Nn]* ) read -p "Enter MaxScale source tree path: " new_path
if [ "${new_path}" = "" ]; then
echo "Error: input path null, exit"
exit 1
fi
SOURCE_PATH=$new_path
cd ${SOURCE_PATH}
echo "Selected source tree is [$new_path]"
break
;;
* ) echo "Please answer yes or no!"
exit 1
;;
esac
if [ -s "./VERSION" ]; then
MAXSCALE_VERSION=`cat ./VERSION`
echo "MaxScale version:" ${MAXSCALE_VERSION}
else
echo "Error: MaxScale version file ./VERSION not found!"
exit 1
fi
MAXSCALE_SOURCE_TARFILE=maxscale.src.preview.${MAXSCALE_VERSION}.tar
TARFILE_BASEDIR=maxscale-${MAXSCALE_VERSION}
TARFILE_BASEDIR_SUBST='s,^\.,'${TARFILE_BASEDIR}','
rm -rf ${MAXSCALE_SOURCE_TARFILE}.gz
rm -rf ${MAXSCALE_SOURCE_TARFILE}
TARFILE_BASEDIR_SUBST='s,^'${SOURCE_PATH}','${TARFILE_BASEDIR}','
tar --absolute-names --owner=maxscale --group=maxscale --transform=${TARFILE_BASEDIR_SUBST} -cf ${MAXSCALE_SOURCE_TARFILE} ${SOURCE_PATH}/*
gzip ${MAXSCALE_SOURCE_TARFILE}
if [ -s "${MAXSCALE_SOURCE_TARFILE}.gz" ]; then
echo "File ["${MAXSCALE_SOURCE_TARFILE}".gz] is ready in ["$SOURCE_PATH"]"
else
echo "Error: File ["${MAXSCALE_SOURCE_TARFILE}".gz] was not created in ["$SOURCE_PATH"]"
fi

View File

@ -81,6 +81,7 @@ SHARED_BUF *sbuf;
sbuf->refcount = 1;
rval->sbuf = sbuf;
rval->next = NULL;
rval->gwbuf_type = GWBUF_TYPE_UNDEFINED;
rval->command = 0;
CHK_GWBUF(rval);
return rval;
@ -127,11 +128,93 @@ GWBUF *rval;
rval->sbuf = buf->sbuf;
rval->start = buf->start;
rval->end = buf->end;
rval->gwbuf_type = buf->gwbuf_type;
rval->next = NULL;
rval->command = buf->command;
CHK_GWBUF(rval);
return rval;
}
GWBUF *gwbuf_clone_portion(
GWBUF *buf,
size_t start_offset,
size_t length)
{
GWBUF* clonebuf;
CHK_GWBUF(buf);
ss_dassert(start_offset+length <= GWBUF_LENGTH(buf));
if ((clonebuf = (GWBUF *)malloc(sizeof(GWBUF))) == NULL)
{
return NULL;
}
atomic_add(&buf->sbuf->refcount, 1);
clonebuf->sbuf = buf->sbuf;
clonebuf->start = (void *)((char*)buf->start)+start_offset;
clonebuf->end = (void *)((char *)clonebuf->start)+length;
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
clonebuf->next = NULL;
CHK_GWBUF(clonebuf);
return clonebuf;
}
/**
* Returns pointer to GWBUF of a requested type.
* As of 10.3.14 only MySQL to plain text conversion is supported.
* Return NULL if conversion between types is not supported or due lacking
* type information.
*/
GWBUF *gwbuf_clone_transform(
GWBUF * head,
gwbuf_type_t targettype)
{
gwbuf_type_t src_type;
GWBUF* clonebuf;
CHK_GWBUF(head);
src_type = head->gwbuf_type;
if (targettype == GWBUF_TYPE_UNDEFINED ||
src_type == GWBUF_TYPE_UNDEFINED ||
src_type == GWBUF_TYPE_PLAINSQL ||
targettype == src_type)
{
clonebuf = NULL;
goto return_clonebuf;
}
switch (src_type)
{
case GWBUF_TYPE_MYSQL:
if (targettype == GWBUF_TYPE_PLAINSQL)
{
/** Crete reference to string part of buffer */
clonebuf = gwbuf_clone_portion(
head,
5,
GWBUF_LENGTH(head)-5);
ss_dassert(clonebuf != NULL);
/** Overwrite the type with new format */
clonebuf->gwbuf_type = targettype;
}
else
{
clonebuf = NULL;
}
break;
default:
clonebuf = NULL;
break;
} /*< switch (src_type) */
return_clonebuf:
return clonebuf;
}
/**
* Append a buffer onto a linked list of buffer structures.
*
@ -150,6 +233,7 @@ GWBUF *ptr = head;
if (!head)
return tail;
CHK_GWBUF(head);
CHK_GWBUF(tail);
while (ptr->next)
{
ptr = ptr->next;
@ -178,9 +262,10 @@ GWBUF *
gwbuf_consume(GWBUF *head, unsigned int length)
{
GWBUF *rval = head;
CHK_GWBUF(head);
GWBUF_CONSUME(head, length);
CHK_GWBUF(head);
if (GWBUF_EMPTY(head))
{
rval = head->next;
@ -207,3 +292,28 @@ int rval = 0;
}
return rval;
}
bool gwbuf_set_type(
GWBUF* buf,
gwbuf_type_t type)
{
bool succp;
CHK_GWBUF(buf);
switch (type) {
case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED:
buf->gwbuf_type = type;
succp = true;
break;
default:
succp = false;
break;
}
ss_dassert(succp);
return succp;
}

View File

@ -28,6 +28,7 @@
* 23/07/13 Mark Riddoch Addition on default monitor password
* 06/02/14 Massimiliano Pinto Added support for enable/disable root user in services
* 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list
* 11/03/14 Massimiliano Pinto Added Unix socket support
*
* @endverbatim
*/
@ -344,14 +345,36 @@ int error_count = 0;
char *address;
char *port;
char *protocol;
char *socket;
service = config_get_value(obj->parameters, "service");
port = config_get_value(obj->parameters, "port");
address = config_get_value(obj->parameters, "address");
protocol = config_get_value(obj->parameters, "protocol");
if (service && port && protocol)
{
socket = config_get_value(obj->parameters, "socket");
if (service && socket && protocol) {
CONFIG_CONTEXT *ptr = context;
while (ptr && strcmp(ptr->object, service) != 0)
ptr = ptr->next;
if (ptr && ptr->element)
{
serviceAddProtocol(ptr->element,
protocol,
socket,
0);
} else {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Listener '%s', "
"service '%s' not found. "
"Listener will not execute for socket %s.",
obj->object, service, socket)));
error_count++;
}
}
if (service && port && protocol) {
CONFIG_CONTEXT *ptr = context;
while (ptr && strcmp(ptr->object, service) != 0)
ptr = ptr->next;
@ -763,11 +786,35 @@ SERVER *server;
char *port;
char *protocol;
char *address;
char *socket;
service = config_get_value(obj->parameters, "service");
address = config_get_value(obj->parameters, "address");
port = config_get_value(obj->parameters, "port");
protocol = config_get_value(obj->parameters, "protocol");
socket = config_get_value(obj->parameters, "socket");
if (service && socket && protocol)
{
CONFIG_CONTEXT *ptr = context;
while (ptr && strcmp(ptr->object, service) != 0)
ptr = ptr->next;
if (ptr &&
ptr->element &&
serviceHasProtocol(ptr->element,
protocol,
0) == 0)
{
serviceAddProtocol(ptr->element,
protocol,
socket,
0);
serviceStartProtocol(ptr->element,
protocol,
0);
}
}
if (service && port && protocol)
{
@ -834,6 +881,8 @@ static char *listener_params[] =
"service",
"protocol",
"port",
"address",
"socket",
NULL
};

View File

@ -304,7 +304,7 @@ getUsers(SERVICE *service, struct users *users)
}
num_fields = mysql_num_fields(result);
users_data = (char *)malloc(nusers * (users_data_row_len * sizeof(char)) + 1);
users_data = (char *)calloc(nusers, (users_data_row_len * sizeof(char)) + 1);
if(users_data == NULL)
return -1;
@ -571,7 +571,7 @@ char *mysql_format_user_entry(void *data)
if (entry->ipv4.sin_addr.s_addr == INADDR_ANY) {
snprintf(mysql_user, mysql_user_len, "%s@%%", entry->user);
} else {
snprintf(mysql_user, MYSQL_USER_MAXLEN, entry->user);
strncpy(mysql_user, entry->user, MYSQL_USER_MAXLEN);
strcat(mysql_user, "@");
inet_ntop(AF_INET, &(entry->ipv4).sin_addr, mysql_user+strlen(mysql_user), INET_ADDRSTRLEN);
}

View File

@ -109,7 +109,6 @@ DCB *rval;
#endif
rval->dcb_role = role;
#if 1
//let's call simple_mutex_done in dcb_final_free
simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex");
simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex");
rval->dcb_write_active = false;
@ -303,12 +302,8 @@ dcb_final_free(DCB *dcb)
if (dcb->remote)
free(dcb->remote);
bitmask_free(&dcb->memdata.bitmask);
#if 1
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
#endif
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
free(dcb);
}
@ -527,6 +522,8 @@ int rc;
* Successfully connected to backend. Assign file descriptor to dcb
*/
dcb->fd = fd;
/** Copy status field to DCB */
dcb->dcb_server_status = server->status;
/*<
* backend_dcb is connected to backend server, and once backend_dcb
@ -690,10 +687,18 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb->state != DCB_STATE_LISTENING &&
dcb->state != DCB_STATE_NOPOLLING))
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Write aborted to dcb %p because "
"it is in state %s",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
return 0;
}
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq != NULL)
@ -750,7 +755,11 @@ dcb_write(DCB *dcb, GWBUF *queue)
#endif /* SS_DEBUG */
len = GWBUF_LENGTH(queue);
GW_NOINTR_CALL(
w = gw_write(dcb->fd, GWBUF_DATA(queue), len);
w = gw_write(
#if defined(SS_DEBUG)
dcb,
#endif
dcb->fd, GWBUF_DATA(queue), len);
dcb->stats.n_writes++;
);
@ -759,6 +768,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
saved_errno = errno;
errno = 0;
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
{
if (saved_errno == EPIPE) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -771,7 +782,12 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb->fd,
saved_errno,
strerror(saved_errno))));
} else if (saved_errno != EAGAIN &&
}
}
if (LOG_IS_ENABLED(LOGFILE_ERROR))
{
if (saved_errno != EPIPE &&
saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
@ -785,6 +801,7 @@ dcb_write(DCB *dcb, GWBUF *queue)
saved_errno,
strerror(saved_errno))));
}
}
break;
}
/*
@ -799,9 +816,9 @@ dcb_write(DCB *dcb, GWBUF *queue)
pthread_self(),
w,
dcb,
STRDCBSTATE(dcb->state),
STRDCBSTATE(dcb->state),
dcb->fd)));
}
} /*< while (queue != NULL) */
/*<
* What wasn't successfully written is stored to write queue
* for suspended write.
@ -819,7 +836,6 @@ dcb_write(DCB *dcb, GWBUF *queue)
saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK)
{
queue = gwbuf_consume(queue, gwbuf_length(queue));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Writing to %s socket failed due %d, %s.",
@ -862,9 +878,13 @@ int saved_errno = 0;
while (dcb->writeq != NULL)
{
len = GWBUF_LENGTH(dcb->writeq);
GW_NOINTR_CALL(w = gw_write(dcb->fd,
GWBUF_DATA(dcb->writeq),
len););
GW_NOINTR_CALL(w = gw_write(
#if defined(SS_DEBUG)
dcb,
#endif
dcb->fd,
GWBUF_DATA(dcb->writeq),
len););
saved_errno = errno;
errno = 0;
@ -1319,12 +1339,15 @@ static bool dcb_set_state_nomutex(
}
int gw_write(
#if defined(SS_DEBUG)
DCB* dcb,
#endif
int fd,
const void* buf,
size_t nbytes)
{
int w;
#if defined(SS_DEBUG)
#if defined(SS_DEBUG)
if (dcb_fake_write_errno[fd] != 0) {
ss_dassert(dcb_fake_write_ev[fd] != 0);
w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */
@ -1339,6 +1362,57 @@ int gw_write(
#else
w = write(fd, buf, nbytes);
#endif /* SS_DEBUG && SS_TEST */
#if defined(SS_DEBUG_MYSQL)
{
size_t len;
uint8_t* packet = (uint8_t *)buf;
char* str;
/** Print only MySQL packets */
if (w > 5)
{
str = (char *)&packet[5];
len = packet[0];
len += 256*packet[1];
len += 256*256*packet[2];
if (strncmp(str, "insert", 6) == 0 ||
strncmp(str, "create", 6) == 0 ||
strncmp(str, "drop", 4) == 0)
{
ss_dassert((dcb->dcb_server_status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE))==(SERVER_RUNNING|SERVER_MASTER));
}
if (strncmp(str, "set autocommit", 14) == 0 && nbytes > 17)
{
char* s = (char *)calloc(1, nbytes+1);
if (nbytes-5 > len)
{
size_t len2 = packet[4+len];
len2 += 256*packet[4+len+1];
len2 += 256*256*packet[4+len+2];
char* str2 = (char *)&packet[4+len+5];
snprintf(s, 5+len+len2, "long %s %s", (char *)str, (char *)str2);
}
else
{
snprintf(s, len, "%s", (char *)str);
}
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [gw_write] Wrote %d bytes : %s ",
pthread_self(),
w,
s)));
free(s);
}
}
}
#endif
return w;
}

View File

@ -357,6 +357,75 @@ poll_waitevents(void *arg)
dcb,
STRDCBROLE(dcb->dcb_role))));
if (ev & EPOLLOUT)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
atomic_add(
&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_write_lock);
} else {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd)));
}
}
if (ev & EPOLLIN)
{
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
if (dcb->state == DCB_STATE_LISTENING)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd)));
atomic_add(
&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
}
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
@ -386,6 +455,7 @@ poll_waitevents(void *arg)
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
@ -404,82 +474,6 @@ poll_waitevents(void *arg)
atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb);
}
if (ev & EPOLLOUT)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
#if 1
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
#endif
atomic_add(
&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
#if 1
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_write_lock);
#endif
} else {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd)));
}
}
if (ev & EPOLLIN)
{
#if 1
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
#endif
if (dcb->state == DCB_STATE_LISTENING)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd)));
atomic_add(
&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
#if 1
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
#endif
}
} /*< for */
no_op = FALSE;
}

View File

@ -41,9 +41,18 @@
*
* @endverbatim
*/
#include <skygw_debug.h>
typedef enum
{
GWBUF_TYPE_UNDEFINED = 0x0,
GWBUF_TYPE_PLAINSQL = 0x1,
GWBUF_TYPE_MYSQL = 0x2
} gwbuf_type_t;
/**
* A structure to encapsualte the data in a form that the data itself can be
* A structure to encapsulate the data in a form that the data itself can be
* shared between multiple GWBUF's without the need to make multiple copies
* but still maintain separate data pointers.
*/
@ -64,8 +73,9 @@ typedef struct gwbuf {
struct gwbuf *next; /*< Next buffer in a linked chain of buffers */
void *start; /*< Start of the valid data */
void *end; /*< First byte after the valid data */
SHARED_BUF *sbuf; /*< The shared buffer with the real data */
SHARED_BUF *sbuf; /*< The shared buffer with the real data */
int command;/*< The command type for the queue */
gwbuf_type_t gwbuf_type; /*< buffer's data type information */
} GWBUF;
/*<
@ -83,6 +93,7 @@ typedef struct gwbuf {
/*< Consume a number of bytes in the buffer */
#define GWBUF_CONSUME(b, bytes) (b)->start += bytes
#define GWBUF_TYPE(b) (b)->gwbuf_type
/*<
* Function prototypes for the API to maniplate the buffers
*/
@ -92,6 +103,7 @@ extern GWBUF *gwbuf_clone(GWBUF *buf);
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);
extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length);
extern unsigned int gwbuf_length(GWBUF *head);
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
extern bool gwbuf_set_type(GWBUF *head, gwbuf_type_t type);
#endif

View File

@ -179,7 +179,7 @@ typedef struct dcb {
SPINLOCK authlock; /**< Generic Authorization spinlock */
DCBSTATS stats; /**< DCB related statistics */
unsigned int dcb_server_status; /*< the server role indicator from SERVER */
struct dcb *next; /**< Next DCB in the chain of allocated DCB's */
struct service *service; /**< The related service */
void *data; /**< Specific client data */
@ -205,7 +205,13 @@ int fail_accept_errno;
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
DCB *dcb_get_zombies(void);
int gw_write(int fd, const void* buf, size_t nbytes);
int gw_write(
#if defined(SS_DEBUG)
DCB* dcb,
#endif
int fd,
const void* buf,
size_t nbytes);
int dcb_write(DCB *, GWBUF *);
DCB *dcb_alloc(dcb_role_t);
void dcb_free(DCB *);

View File

@ -54,6 +54,12 @@ int do_read_dcb(DCB *dcb);
int do_read_10(DCB *dcb, uint8_t *buffer);
int MySQLWrite(DCB *dcb, GWBUF *queue);
int setnonblocking(int fd);
int gw_write(int fd, const void* buf, size_t nbytes);
int gw_write(
#if defined(SS_DEBUG)
DCB* dcb,
#endif
int fd,
const void* buf,
size_t nbytes);
int gw_getsockerrno(int fd);
int parse_bindconfig(char *, unsigned short, struct sockaddr_in *);

View File

@ -35,6 +35,7 @@
#include <service.h>
#include <session.h>
#include <buffer.h>
#include <stdint.h>
/**
* The ROUTER handle points to module specific data, so the best we can do
@ -74,10 +75,13 @@ typedef struct router_object {
void (*diagnostics)(ROUTER *instance, DCB *dcb);
void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
void (*errorReply)(ROUTER* instance, void* router_session, char* message, DCB *backend_dcb, int action);
uint8_t (*getCapabilities)(ROUTER *instance, void* router_session);
} ROUTER_OBJECT;
/* Router commands */
#define ROUTER_DEFAULT 0 /**< Standard routing */
#define ROUTER_CHANGE_SESSION 1 /**< Route a change session */
typedef enum router_capability_t {
RCAP_TYPE_UNDEFINED = 0,
RCAP_TYPE_STMT_INPUT = (1 << 0),
RCAP_TYPE_PACKET_INPUT = (1 << 1)
} router_capability_t;
#endif

View File

@ -95,6 +95,11 @@ typedef struct server {
#define SERVER_IS_SLAVE(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_SLAVE))
/**
* Is the server joined Galera node? The server must be running and joined.
*/
#define SERVER_IS_JOINED(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED))
extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *);

View File

@ -53,6 +53,8 @@ typedef struct spinlock {
#define SPINLOCK_INIT { 0 }
#endif
#define SPINLOCK_IS_LOCKED(l) ((l)->lock != 0 ? true : false)
extern void spinlock_init(SPINLOCK *lock);
extern void spinlock_acquire(SPINLOCK *lock);
extern int spinlock_acquire_nowait(SPINLOCK *lock);

View File

@ -47,6 +47,7 @@
#include <sys/ioctl.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdbool.h>
@ -296,10 +297,11 @@ void gw_str_xor(
const uint8_t *input1,
const uint8_t *input2,
unsigned int len);
char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len);
int gw_hex2bin(uint8_t *out, const char *in, unsigned int len);
int gw_generate_random_str(char *output, int len);
char *gw_strend(register const char *s);
int setnonblocking(int fd);
char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len);
int gw_hex2bin(uint8_t *out, const char *in, unsigned int len);
int gw_generate_random_str(char *output, int len);
char *gw_strend(register const char *s);
int setnonblocking(int fd);
int setipaddress(struct in_addr *a, char *p);
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b);
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b);
GWBUF* gw_MySQL_get_next_stmt(GWBUF** p_readbuf);

View File

@ -53,8 +53,8 @@ typedef struct router_client_session {
bool rses_closed; /*< true when closeSession is called */
BACKEND *backend; /*< Backend used by the client session */
DCB *backend_dcb; /*< DCB Connection to the backend */
struct router_client_session
*next;
struct router_client_session *next;
int rses_capabilities; /*< input type, for example */
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif

View File

@ -24,7 +24,7 @@
* @verbatim
* Revision History
*
* bazaar..
* See GitHub https://github.com/skysql/MaxScale
*
* @endverbatim
*/
@ -41,25 +41,92 @@ typedef struct backend {
int backend_conn_count; /*< Number of connections to the server */
} BACKEND;
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
typedef enum rses_property_type_t {
RSES_PROP_TYPE_UNDEFINED=0,
RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
/**
* Session variable command
*/
typedef struct mysql_sescmd_st {
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_top;
#endif
rses_property_t* my_sescmd_prop; /*< parent property */
GWBUF* my_sescmd_buf; /*< query buffer */
unsigned char my_sescmd_packet_type;/*< packet type */
bool my_sescmd_is_replied; /*< is cmd replied to client */
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_tail;
#endif
} mysql_sescmd_t;
/**
* Property structure
*/
struct rses_property_st {
#if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_top;
#endif
ROUTER_CLIENT_SES* rses_prop_rsession; /*< parent router session */
int rses_prop_refcount;
rses_property_type_t rses_prop_type;
union rses_prop_data {
mysql_sescmd_t sescmd;
void* placeholder; /*< to be removed due new type */
} rses_prop_data;
rses_property_t* rses_prop_next; /*< next property of same type */
#if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_tail;
#endif
};
typedef struct sescmd_cursor_st {
ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */
backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */
} sescmd_cursor_t;
/**
* The client session structure used within this router.
*/
typedef struct router_client_session {
struct router_client_session {
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
skygw_chk_t rses_chk_top;
#endif
SPINLOCK rses_lock; /*< protects rses_deleted */
int rses_versno; /*< even = no active update, else odd */
bool rses_closed; /*< true when closeSession is called */
BACKEND* be_slave; /*< Slave backend used by client session */
BACKEND* be_master; /*< Master backend used by client session */
DCB* slave_dcb; /*< Slave connection */
DCB* master_dcb; /*< Master connection */
SPINLOCK rses_lock; /*< protects rses_deleted */
int rses_versno; /*< even = no active update, else odd */
bool rses_closed; /*< true when closeSession is called */
/** Properties listed by their type */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];
BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */
DCB* rses_dcb[BE_COUNT];
/*< cursor is pointer and status variable to current session command */
sescmd_cursor_t rses_cursor[BE_COUNT];
int rses_capabilities; /*< input type, for example */
struct router_client_session* next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
skygw_chk_t rses_chk_tail;
#endif
} ROUTER_CLIENT_SES;
};
/**
* The statistics for this router instance
@ -88,5 +155,4 @@ typedef struct router_instance {
struct router_instance* next; /*< Next router on the list */
} ROUTER_INSTANCE;
#endif
#endif /*< _RWSPLITROUTER_H */

View File

@ -246,7 +246,7 @@ static int gw_read_backend_event(DCB *dcb) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : backend server didn't "
"Error : Backend server didn't "
"accept authentication for user "
"%s.",
current_session->user)));
@ -283,6 +283,7 @@ static int gw_read_backend_event(DCB *dcb) {
}
if (backend_protocol->state == MYSQL_AUTH_FAILED) {
spinlock_acquire(&dcb->delayqlock);
/*<
* vraa : errorHandle
* check the delayq before the reply
@ -295,10 +296,12 @@ static int gw_read_backend_event(DCB *dcb) {
0,
"Connection to backend lost.");
// consume all the delay queue
dcb->delayq = gwbuf_consume(
while ((dcb->delayq = gwbuf_consume(
dcb->delayq,
gwbuf_length(dcb->delayq));
GWBUF_LENGTH(dcb->delayq))) != NULL);
}
spinlock_release(&dcb->delayqlock);
/* try reload users' table for next connection */
service_refresh_users(dcb->session->client->service);
@ -350,7 +353,7 @@ static int gw_read_backend_event(DCB *dcb) {
pthread_self(),
dcb->fd,
current_session->user)));
/* check the delay queue and flush the data */
if (dcb->delayq)
{
@ -519,12 +522,13 @@ static int
gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
{
MySQLProtocol *backend_protocol = dcb->protocol;
int rc;
int rc = 0;
/*<
* Don't write to backend if backend_dcb is not in poll set anymore.
*/
spinlock_acquire(&dcb->authlock);
spinlock_acquire(&dcb->dcb_initlock);
if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */
/*< Free buffer memory */
@ -538,37 +542,79 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb,
dcb->fd,
STRDCBSTATE(dcb->state))));
spinlock_release(&dcb->authlock);
return 0;
spinlock_release(&dcb->dcb_initlock);
rc = 0;
goto return_rc;
}
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok
*/
if (backend_protocol->state != MYSQL_IDLE) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] dcb %p fd %d protocol "
"state %s.",
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
return 1;
}
/*<
* Now we set the last command received, from the current queue
*/
memcpy(&dcb->command, &queue->command, sizeof(dcb->command));
spinlock_release(&dcb->authlock);
rc = dcb_write(dcb, queue);
spinlock_release(&dcb->dcb_initlock);
spinlock_acquire(&dcb->authlock);
/**
* Pick action according to state of protocol.
* If auth failed, return value is 0, write and buffered write
* return 1.
*/
switch(backend_protocol->state) {
case MYSQL_AUTH_FAILED:
{
size_t len;
char* str;
uint8_t* packet = (uint8_t *)queue->start;
uint8_t* startpoint;
len = (size_t)MYSQL_GET_PACKET_LEN(packet);
startpoint = &packet[5];
str = (char *)malloc(len+1);
snprintf(str, len+1, "%s", startpoint);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed due to "
"authentication failure.",
str)));
/** Consume query buffer */
while ((queue = gwbuf_consume(
queue,
GWBUF_LENGTH(queue))) != NULL);
free(str);
}
rc = 0;
spinlock_release(&dcb->authlock);
goto return_rc;
break;
case MYSQL_IDLE:
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] write to dcb %p "
"fd %d protocol state %s.",
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
spinlock_release(&dcb->authlock);
rc = dcb_write(dcb, queue);
goto return_rc;
break;
default:
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok
*/
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] delayed write to "
"dcb %p fd %d protocol state %s.",
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
rc = 1;
goto return_rc;
break;
}
return_rc:
return rc;
}
@ -804,12 +850,6 @@ static int backend_write_delayqueue(DCB *dcb)
localq = dcb->delayq;
dcb->delayq = NULL;
/*<
* Now we set the last command received, from the delayed queue
*/
memcpy(&dcb->command, &localq->command, sizeof(dcb->command));
spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq);
@ -851,15 +891,12 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
unsigned int auth_token_len = 0;
uint8_t *auth_token = NULL;
int rv = -1;
int len = 0;
int auth_ret = 1;
current_session = (MYSQL_session *)in_session->client->data;
backend_protocol = backend->protocol;
client_protocol = in_session->client->protocol;
queue->command = ROUTER_CHANGE_SESSION;
// now get the user, after 4 bytes header and 1 byte command
client_auth_packet += 5;
strcpy(username, (char *)client_auth_packet);
@ -910,31 +947,14 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol);
/*<
* The current queue was not handled by func.write() in gw_send_change_user_to_backend()
* We wrote a new gwbuf
* Set backend command here!
*/
memcpy(&backend->command, &queue->command, sizeof(backend->command));
/*<
* Now copy new data into user session
*/
strcpy(current_session->user, username);
strcpy(current_session->db, database);
memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1));
}
// consume all the data received from client
spinlock_acquire(&backend->writeqlock);
len = gwbuf_length(queue);
queue = gwbuf_consume(queue, len);
spinlock_release(&backend->writeqlock);
}
gwbuf_free(queue);
return rv;
}
@ -951,7 +971,6 @@ static int gw_session(DCB *backend_dcb, void *data) {
GWBUF *queue = NULL;
queue = (GWBUF *) data;
queue->command = ROUTER_CHANGE_SESSION;
backend_dcb->func.write(backend_dcb, queue);
return 1;

View File

@ -32,6 +32,7 @@
* 24/02/2014 Massimiliano Pinto Added: on failed authentication a new users' table is loaded with time and frequency limitations
* If current user is authenticated the new users' table will replace the old one
* 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string representation
* 11/03/2014 Massimiliano Pinto Added: Unix socket support
*
*/
@ -56,6 +57,11 @@ static int gw_client_hangup_event(DCB *dcb);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb);
static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue);
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* read_buf);
/*
* The "module object" for the mysqld client protocol module.
@ -72,7 +78,7 @@ static GWPROTOCOL MyObject = {
gw_MySQLListener, /* Listen */
NULL, /* Authentication */
NULL /* Session */
};
};
/**
* Implementation of the mandatory version entry point
@ -620,10 +626,11 @@ int gw_read_client_event(DCB* dcb) {
*/
{
int len = -1;
GWBUF *queue = NULL;
GWBUF *gw_buffer = NULL;
uint8_t cap = 0;
GWBUF *read_buffer = NULL;
uint8_t *ptr_buff = NULL;
int mysql_command = -1;
bool stmt_input; /*< router input type */
session = dcb->session;
@ -639,21 +646,20 @@ int gw_read_client_event(DCB* dcb) {
//////////////////////////////////////////////////////
// read and handle errors & close, or return if busy
//////////////////////////////////////////////////////
rc = gw_read_gwbuff(dcb, &gw_buffer, b);
rc = gw_read_gwbuff(dcb, &read_buffer, b);
if (rc != 0) {
goto return_rc;
}
/* Now, we are assuming in the first buffer there is
* the information form mysql command */
queue = gw_buffer;
len = GWBUF_LENGTH(queue);
ptr_buff = GWBUF_DATA(queue);
len = GWBUF_LENGTH(read_buffer);
ptr_buff = GWBUF_DATA(read_buffer);
/* get mysql commang at fifth byte */
if (ptr_buff) {
mysql_command = ptr_buff[4];
}
}
/**
* Without rsession there is no access to backend.
* COM_QUIT : close client dcb
@ -682,12 +688,43 @@ int gw_read_client_event(DCB* dcb) {
}
rc = 1;
/** Free buffer */
queue = gwbuf_consume(queue, len);
read_buffer = gwbuf_consume(read_buffer, len);
goto return_rc;
}
/** Ask what type of input the router expects */
cap = router->getCapabilities(router_instance, rsession);
if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT))
{
stmt_input = false;
}
else if (cap == RCAP_TYPE_STMT_INPUT)
{
stmt_input = true;
/** Mark buffer to as MySQL type */
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
}
else
{
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] Reading router "
"capabilities failed.",
pthread_self())));
mysql_send_custom_error(dcb,
1,
0,
"Operation failed. Router "
"session is closed.");
rc = 1;
goto return_rc;
}
/** Route COM_QUIT to backend */
if (mysql_command == '\x01') {
router->routeQuery(router_instance, rsession, queue);
router->routeQuery(router_instance, rsession, read_buffer);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] Routed COM_QUIT to "
@ -703,10 +740,25 @@ int gw_read_client_event(DCB* dcb) {
}
else
{
/** Route other commands to backend */
rc = router->routeQuery(router_instance,
if (stmt_input)
{
/**
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(router_instance,
router,
rsession,
read_buffer);
}
else
{
/** Feed whole packet to router */
rc = router->routeQuery(router_instance,
rsession,
queue);
read_buffer);
}
/** succeed */
if (rc == 1) {
rc = 0; /**< here '0' means success */
@ -807,47 +859,94 @@ int gw_MySQLListener(
{
int l_so;
struct sockaddr_in serv_addr;
struct sockaddr_un local_addr;
struct sockaddr *current_addr;
int one = 1;
int rc;
/* this gateway, as default, will bind on port 4404 for localhost only */
if (!parse_bindconfig(config_bind, 4406, &serv_addr))
return 0;
listen_dcb->fd = -1;
// socket create
if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr,
"\n* Error: can't open listening socket due "
"error %i, %s.\n\n\t",
errno,
strerror(errno));
return 0;
if (strchr(config_bind, '/')) {
char *tmp = strrchr(config_bind, ':');
if (tmp)
*tmp = '\0';
// UNIX socket create
if ((l_so = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
fprintf(stderr,
"\n* Error: can't create UNIX socket due "
"error %i, %s.\n\n\t",
errno,
strerror(errno));
return 0;
}
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sun_family = AF_UNIX;
strncpy(local_addr.sun_path, config_bind, sizeof(local_addr.sun_path) - 1);
current_addr = (struct sockaddr *) &local_addr;
} else {
/* MaxScale, as default, will bind on port 4406 */
if (!parse_bindconfig(config_bind, 4406, &serv_addr)) {
fprintf(stderr, "Error in parse_bindconfig for [%s]\n", config_bind);
return 0;
}
// TCP socket create
if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr,
"\n* Error: can't create socket due "
"error %i, %s.\n\n\t",
errno,
strerror(errno));
return 0;
}
current_addr = (struct sockaddr *) &serv_addr;
}
listen_dcb->fd = -1;
// socket options
setsockopt(l_so, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
// set NONBLOCKING mode
setnonblocking(l_so);
setnonblocking(l_so);
/* get the right socket family for bind */
switch (current_addr->sa_family) {
case AF_UNIX:
rc = unlink(config_bind);
if ( (rc == -1) && (errno!=ENOENT) ) {
fprintf(stderr, "Error unlink Unix Socket %s\n", config_bind);
}
if (bind(l_so, (struct sockaddr *) &local_addr, sizeof(local_addr)) < 0) {
fprintf(stderr,
"\n* Bind failed due error %i, %s.\n",
errno,
strerror(errno));
fprintf(stderr, "* Can't bind to %s\n\n", config_bind);
return 0;
}
break;
case AF_INET:
if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
fprintf(stderr,
"\n* Bind failed due error %i, %s.\n",
errno,
strerror(errno));
fprintf(stderr, "* Can't bind to %s\n\n", config_bind);
return 0;
}
break;
default:
fprintf(stderr, "* Socket Family %i not supported\n", current_addr->sa_family);
return 0;
}
// bind address and port
if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
fprintf(stderr,
"\n* Bind failed due error %i, %s.\n",
errno,
strerror(errno));
fprintf(stderr, "* Can't bind to %s\n\n",
config_bind);
return 0;
}
/*
fprintf(stderr,
">> GATEWAY bind is: %s:%i. FD is %i\n",
address,
port,
l_so);
*/
rc = listen(l_so, 10 * SOMAXCONN);
if (rc == 0) {
@ -863,11 +962,6 @@ int gw_MySQLListener(
strerror(eno));
return 0;
}
/*
fprintf(stderr,
">> GATEWAY listen backlog queue is %i\n",
10 * SOMAXCONN);
*/
// assign l_so to dcb
listen_dcb->fd = l_so;
@ -908,8 +1002,8 @@ int gw_MySQLAccept(DCB *listener)
DCB *client_dcb;
MySQLProtocol *protocol;
int c_sock;
struct sockaddr_in local;
socklen_t addrlen = sizeof(struct sockaddr_in);
struct sockaddr client_conn;
socklen_t client_len = sizeof(struct sockaddr_storage);
int sendbuf = GW_BACKEND_SO_SNDBUF;
socklen_t optlen = sizeof(sendbuf);
int eno = 0;
@ -932,8 +1026,8 @@ int gw_MySQLAccept(DCB *listener)
#endif /* SS_DEBUG */
// new connection from client
c_sock = accept(listener->fd,
(struct sockaddr *) &local,
&addrlen);
(struct sockaddr *) &client_conn,
&client_len);
eno = errno;
errno = 0;
#if defined(SS_DEBUG)
@ -1020,19 +1114,38 @@ int gw_MySQLAccept(DCB *listener)
setnonblocking(c_sock);
client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
if (client_dcb == NULL) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [gw_MySQLAccept] Failed to create "
"dcb object for client connection.",
pthread_self())));
rc = 1;
goto return_rc;
}
client_dcb->service = listener->session->service;
client_dcb->fd = c_sock;
/* client IPv4 in raw data*/
memcpy(&client_dcb->ipv4, &local, sizeof(struct sockaddr_in));
/* client IPv4 in string representation */
client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char));
if (client_dcb->remote != NULL) {
inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN);
// get client address
if ( client_conn.sa_family == AF_UNIX) {
// client address
client_dcb->remote = strdup("localhost_from_socket");
// set localhost IP for user authentication
(client_dcb->ipv4).sin_addr.s_addr = 0x0100007F;
} else {
/* client IPv4 in raw data*/
memcpy(&client_dcb->ipv4, (struct sockaddr_in *)&client_conn, sizeof(struct sockaddr_in));
/* client IPv4 in string representation */
client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char));
if (client_dcb->remote != NULL) {
inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN);
}
}
protocol = mysql_protocol_init(client_dcb, c_sock);
ss_dassert(protocol != NULL);
if (protocol == NULL) {
@ -1131,7 +1244,6 @@ static int gw_error_client_event(DCB *dcb) {
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
dcb_close(dcb);
@ -1189,7 +1301,8 @@ gw_client_hangup_event(DCB *dcb)
void* router_instance;
void* rsession;
int rc = 1;
#if defined(SS_DEBUG)
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol;
if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING ||
@ -1198,8 +1311,6 @@ gw_client_hangup_event(DCB *dcb)
CHK_PROTOCOL(protocol);
}
#endif
CHK_DCB(dcb);
if (dcb->state != DCB_STATE_POLLING) {
@ -1216,7 +1327,6 @@ gw_client_hangup_event(DCB *dcb)
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
@ -1224,3 +1334,99 @@ gw_client_hangup_event(DCB *dcb)
return_rc:
return rc;
}
/**
* Detect if buffer includes partial mysql packet or multiple packets.
* Store partial packet to pendingqueue. Send complete packets one by one
* to router.
*/
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* readbuf)
{
int rc = -1;
DCB* master_dcb;
GWBUF* stmtbuf;
uint8_t* payload;
static size_t len;
do
{
stmtbuf = gw_MySQL_get_next_stmt(&readbuf);
ss_dassert(stmtbuf != NULL);
CHK_GWBUF(stmtbuf);
payload = (uint8_t *)GWBUF_DATA(stmtbuf);
/**
* If message is longer than read data, suspend routing and
* add statement buffer to wait queue.
*/
rc = router->routeQuery(router_instance, rsession, stmtbuf);
}
while (readbuf != NULL);
return rc;
}
/**
* Create a character array including the query string.
* GWBUF given as input includes either one complete or partial query.
* Length of buffer is at most the query length+4 (length of packet header).
*/
#if defined(NOT_USED)
static char* gw_get_or_create_querystr (
void* data,
bool* new_allocation)
{
GWBUF* buf = (GWBUF *)data;
size_t buflen; /*< first gw buffer data length */
size_t packetlen; /*< length of mysql packet */
size_t querylen; /*< total buffer length-<length of type indicator> */
size_t nbytes_copied;
char* startpos; /*< first byte of query in gw buffer */
char* str; /*< resulting query string */
CHK_GWBUF(buf);
packetlen = MYSQL_GET_PACKET_LEN((uint8_t *)GWBUF_DATA(buf));
str = (char *)malloc(packetlen); /*< leave space for terminating null */
if (str == NULL)
{
goto return_str;
}
*new_allocation = true;
/**
* First buffer includes 4 bytes header and a type indicator byte.
*/
buflen = GWBUF_LENGTH(buf);
querylen = packetlen-1;
ss_dassert(buflen<=querylen+5); /*< 5 == header+type indicator */
startpos = (char *)GWBUF_DATA(buf)+5;
nbytes_copied = MIN(querylen, buflen-5);
memcpy(str, startpos, nbytes_copied);
memset(&str[querylen-1], 0, 1);
buf = gwbuf_consume(buf, querylen-1);
/**
* In case of multi-packet statement whole buffer consists of query
* string.
*/
while (buf != NULL)
{
buflen = GWBUF_LENGTH(buf);
memcpy(str+nbytes_copied, GWBUF_DATA(buf), buflen);
nbytes_copied += buflen;
buf = gwbuf_consume(buf, buflen);
}
ss_dassert(str[querylen-1] == 0);
return_str:
return str;
}
#endif

View File

@ -182,7 +182,7 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
conn->state = MYSQL_AUTH_SENT;
// consume all the data here
head = gwbuf_consume(head, gwbuf_length(head));
head = gwbuf_consume(head, GWBUF_LENGTH(head));
return 0;
}
@ -332,17 +332,13 @@ int gw_receive_backend_auth(
tmpbuf[4],
tmpbuf)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Invalid authentication message from "
"backend server. Authentication failed.")));
free(tmpbuf);
rc = -1;
}
/*<
* Remove data from buffer.
*/
head = gwbuf_consume(head, gwbuf_length(head));
head = gwbuf_consume(head, GWBUF_LENGTH(head));
}
else if (n == 0)
{
@ -1280,3 +1276,57 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const
return sizeof(mysql_packet_header) + mysql_payload_size;
}
/**
* Remove the first mysql statement from buffer. Return pointer to the removed
* statement or NULL if buffer is empty.
*
* Clone buf, calculate the length of included mysql stmt, and point the
* statement with cloned buffer. Move the start pointer of buf accordingly
* so that it only cover the remaining buffer.
*
*/
GWBUF* gw_MySQL_get_next_stmt(
GWBUF** p_readbuf)
{
GWBUF* stmtbuf;
size_t buflen;
size_t strlen;
uint8_t* packet;
if (*p_readbuf == NULL)
{
stmtbuf = NULL;
goto return_stmtbuf;
}
CHK_GWBUF(*p_readbuf);
if (GWBUF_EMPTY(*p_readbuf))
{
stmtbuf = NULL;
goto return_stmtbuf;
}
buflen = GWBUF_LENGTH((*p_readbuf));
packet = GWBUF_DATA((*p_readbuf));
strlen = MYSQL_GET_PACKET_LEN(packet);
if (strlen+4 == buflen)
{
stmtbuf = *p_readbuf;
*p_readbuf = NULL;
goto return_stmtbuf;
}
/** vraa :Multi-packet stmt is not supported as of 7.3.14 */
if (strlen-1 > buflen-5)
{
stmtbuf = NULL;
goto return_stmtbuf;
}
stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4);
*p_readbuf = gwbuf_consume(*p_readbuf, strlen+4);
return_stmtbuf:
return stmtbuf;
}

View File

@ -54,6 +54,7 @@ static void closeSession(ROUTER *instance, void *router_session);
static void freeSession(ROUTER *instance, void *router_session);
static int execute(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
/** The module object definition */
static ROUTER_OBJECT MyObject = {
@ -64,7 +65,8 @@ static ROUTER_OBJECT MyObject = {
execute,
diagnostics,
NULL,
NULL
NULL,
getCapabilities
};
extern int execute_cmd(CLI_SESSION *cli);
@ -273,3 +275,10 @@ diagnostics(ROUTER *instance, DCB *dcb)
{
return; /* Nothing to do currently */
}
static uint8_t getCapabilities(
ROUTER* inst,
void* router_session)
{
return 0;
}

View File

@ -64,6 +64,7 @@
* 22/10/2013 Massimiliano Pinto errorReply called from backend, for client error reply
* or take different actions such as open a new backend connection
* 20/02/2014 Massimiliano Pinto If router_options=slave, route traffic to master if no slaves available
* 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession
*
* @endverbatim
*/
@ -107,6 +108,8 @@ static void errorReply(
char *message,
DCB *backend_dcb,
int action);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
/** The module object definition */
static ROUTER_OBJECT MyObject = {
@ -117,13 +120,14 @@ static ROUTER_OBJECT MyObject = {
routeQuery,
diagnostics,
clientReply,
errorReply
errorReply,
getCapabilities
};
static bool rses_begin_router_action(
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses);
static void rses_exit_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses);
static SPINLOCK instlock;
@ -398,6 +402,8 @@ int master_host = -1;
}
}
client_rses->rses_capabilities = RCAP_TYPE_PACKET_INPUT;
/*
* We now have the server with the least connections.
* Bump the connection count for this server
@ -468,7 +474,6 @@ static void freeSession(
prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1);
ss_dassert(prev_val > 0);
atomic_add(&router_cli_ses->backend->server->stats.n_current, -1);
spinlock_acquire(&router->lock);
if (router->connections == router_cli_ses) {
@ -517,13 +522,16 @@ DCB* backend_dcb;
/**
* Lock router client session for secure read and update.
*/
if (rses_begin_router_action(router_cli_ses))
if (rses_begin_locked_router_action(router_cli_ses))
{
/* decrease server current connection counter */
atomic_add(&router_cli_ses->backend->server->stats.n_current, -1);
backend_dcb = router_cli_ses->backend_dcb;
router_cli_ses->backend_dcb = NULL;
router_cli_ses->rses_closed = true;
/** Unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
/**
* Close the backend server connection
@ -569,21 +577,21 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
/**
* Lock router client session for secure read of DCBs
*/
rses_is_closed = !(rses_begin_router_action(router_cli_ses));
rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses));
}
if (!rses_is_closed)
{
backend_dcb = router_cli_ses->backend_dcb;
/** unlock */
rses_exit_router_action(router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
}
if (rses_is_closed || backend_dcb == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error: Failed to route MySQL command %d to backend "
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Error : Failed to route MySQL command %d to backend "
"server.",
mysql_command)));
goto return_rc;
@ -687,7 +695,7 @@ static void
errorReply(
ROUTER *instance,
void *router_session,
char *message,
char *message,
DCB *backend_dcb,
int action)
{
@ -714,7 +722,7 @@ errorReply(
* @details (write detailed description here)
*
*/
static bool rses_begin_router_action(
static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
bool succp = false;
@ -749,9 +757,17 @@ return_succp:
* @details (write detailed description here)
*
*/
static void rses_exit_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
CHK_CLIENT_RSES(rses);
spinlock_release(&rses->rses_lock);
}
static uint8_t getCapabilities(
ROUTER* inst,
void* router_session)
{
return 0;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,41 @@
# cleantests - clean local and subdirectories' tests
# buildtests - build all local and subdirectories' tests
# runtests - run all local tests
# testall - clean, build and run local and subdirectories' tests
include ../../../../../build_gateway.inc
include $(ROOT_PATH)/makefile.inc
include $(ROOT_PATH)/test.inc
CC=cc
TESTLOG := $(shell pwd)/testrouters.log
RET := -1
cleantests:
- $(DEL) *.o
- $(DEL) *~
testall:
-$(MAKE) cleantests
-$(MAKE) DEBUG=Y buildtests
-$(MAKE) runtests
buildtests:
runtests:
@echo "" >> $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo $(shell date) >> $(TESTLOG)
@echo "Test MaxScale R/W Split" >> $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
./rwsplit.sh $(TESTLOG) $(THOST) $(TPORT_RW) $(TMASTER_ID) $(TUSER) $(TPWD)
@echo "" >> $(TESTLOG)
pesce:
@echo "fine"

View File

@ -0,0 +1,51 @@
#!/bin/sh
NARGS=6
TLOG=$1
THOST=$2
TPORT=$3
TMASTER_ID=$4
TUSER=$5
TPWD=$6
if [ $# != $NARGS ] ;
then
echo""
echo "Wrong number of arguments, gave "$#" but "$NARGS" is required"
echo ""
echo "Usage :"
echo " rwsplit.sh <log filename> <host> <port> <master id> <user> <password>"
echo ""
exit 1
fi
RUNCMD=mysql\ --host=$THOST\ -P$TPORT\ -u$TUSER\ -p$TPWD\ --unbuffered=true\ --disable-reconnect\ --silent
TINPUT=test_transaction_routing2.sql
TRETVAL=0
a=`$RUNCMD < ./$TINPUT`
if [ "$a" != "$TRETVAL" ]; then
echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG;
else
echo "$TINPUT PASSED">>$TLOG ;
fi
TINPUT=test_transaction_routing3.sql
TRETVAL=2
a=`$RUNCMD < ./$TINPUT`
if [ "$a" != "$TRETVAL" ]; then
echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG;
else
echo "$TINPUT PASSED">>$TLOG ;
fi
# set a var via SELECT INTO @, get data from master, returning server-id: put master server-id value in TRETVAL
TINPUT=select_for_var_set.sql
TRETVAL=$TMASTER_ID
a=`$RUNCMD < ./$TINPUT`
if [ "$a" != "$TRETVAL" ]; then
echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG;
else
echo "$TINPUT PASSED">>$TLOG ;
fi

View File

@ -0,0 +1,5 @@
-- simple read with variable from master
BEGIN;
SELECT (@@server_id) INTO @a;
SELECT @a;
COMMIT;

View File

@ -0,0 +1,19 @@
USE test;
SET autocommit = 0;
SET @a= -1;
SET @b= -2;
START TRANSACTION;
CREATE TABLE IF NOT EXISTS myCity (a int, b char(20));
INSERT INTO myCity VALUES (1, 'Milan');
INSERT INTO myCity VALUES (2, 'London');
COMMIT;
START TRANSACTION;
DELETE FROM myCity;
SET @a = (SELECT COUNT(*) FROM myCity);
ROLLBACK;
START TRANSACTION;
SET @b = (SELECT COUNT(*) FROM myCity);
START TRANSACTION;
DROP TABLE myCity;
SELECT (@a+@b) AS res;
COMMIT;

View File

@ -0,0 +1,11 @@
USE test;
SET autocommit = 0;
START TRANSACTION;
CREATE TABLE IF NOT EXISTS myCity (a int, b char(20));
INSERT INTO myCity VALUES (1, 'Milan');
INSERT INTO myCity VALUES (2, 'London');
COMMIT;
START TRANSACTION;
DELETE FROM myCity;
SELECT COUNT(*) FROM myCity; -- read transaction's modifications from master
COMMIT;

View File

@ -0,0 +1,10 @@
USE test;
SET autocommit = 0;
START TRANSACTION;
CREATE TABLE IF NOT EXISTS myCity (a int, b char(20));
INSERT INTO myCity VALUES (1, 'Milan');
INSERT INTO myCity VALUES (2, 'London');
COMMIT;
DELETE FROM myCity;
SELECT COUNT(*) FROM myCity; -- read transaction's modifications from slave
COMMIT;

View File

@ -26,6 +26,7 @@ static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
static ROUTER_OBJECT MyObject = {
@ -36,7 +37,8 @@ static ROUTER_OBJECT MyObject = {
routeQuery,
diagnostic,
NULL,
NULL
NULL,
getCapabilities
};
/**
@ -137,3 +139,10 @@ static void
diagnostic(ROUTER *instance, DCB *dcb)
{
}
static uint8_t getCapabilities(
ROUTER* inst,
void* router_session)
{
return 0;
}

35
test.inc Normal file
View File

@ -0,0 +1,35 @@
#
# This file includes parameters needed for running tests and may be included
# in makefiles in test directories if seen useful.
#
# hostname or IP address of MaxScale's host, for example:
#
# THOST := 127.0.0.1
#
THOST :=
#
# port of read connection router module, for example:
# TPORT_RCONN := 4008
#
TPORT_RCONN :=
#
# port of read/write split router module, for example:
# TPORT_RW := 4006
#
TPORT_RW :=
#
# username of MaxScale user, for example:
# TUSER := maxuser
#
TUSER :=
#
# password of MaxScale user, for example:
# TPWD := maxpwd
#
TPWD :=
#
# master's server_id, for example:
# TMASTER_ID := 2
#
TMASTER_ID :=

View File

@ -117,7 +117,9 @@ typedef enum skygw_chk_t {
CHK_NUM_DCB,
CHK_NUM_PROTOCOL,
CHK_NUM_SESSION,
CHK_NUM_ROUTER_SES
CHK_NUM_ROUTER_SES,
CHK_NUM_MY_SESCMD,
CHK_NUM_ROUTER_PROPERTY
} skygw_chk_t;
# define STRBOOL(b) ((b) ? "true" : "false")
@ -214,7 +216,11 @@ typedef enum skygw_chk_t {
((r) == DCB_ROLE_REQUEST_HANDLER ? "DCB_ROLE_REQUEST_HANDLER" : \
"UNKNOWN DCB ROLE"))
#define STRBETYPE(t) ((t) == BE_MASTER ? "BE_MASTER" : \
((t) == BE_SLAVE ? "BE_SLAVE" : \
((t) == BE_UNDEFINED ? "BE_UNDEFINED" : \
"Unknown backend tpe")))
#define CHK_MLIST(l) { \
ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \
l->mlist_chk_tail == CHK_NUM_MLIST), \
@ -428,7 +434,19 @@ typedef enum skygw_chk_t {
"Router client session has invalid check fields"); \
}
#define CHK_RSES_PROP(p) { \
ss_info_dassert((p)->rses_prop_chk_top == CHK_NUM_ROUTER_PROPERTY && \
(p)->rses_prop_chk_tail == CHK_NUM_ROUTER_PROPERTY, \
"Router property has invalid check fields"); \
}
#define CHK_MYSQL_SESCMD(s) { \
ss_info_dassert((s)->my_sescmd_chk_top == CHK_NUM_MY_SESCMD && \
(s)->my_sescmd_chk_tail == CHK_NUM_MY_SESCMD, \
"Session command has invalid check fields"); \
}
#if defined(SS_DEBUG)
bool conn_open[10240];
#endif