Merge branch 'develop' into MAX-324

Conflicts:
	query_classifier/query_classifier.h
This commit is contained in:
Markus Makela
2015-01-13 06:34:22 +02:00
19 changed files with 504 additions and 315 deletions

View File

@ -1,5 +1,6 @@
cmake_minimum_required(VERSION 2.6)
message(STATUS "CMake version: ${CMAKE_VERSION}")
include(macros.cmake)
enable_testing()
@ -13,10 +14,14 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/")
project(MaxScale)
#Disabled for now pending evaluation
#include(CheckPlatform.cmake)
check_deps()
check_dirs()
find_package(Valgrind)
find_package(MySQLClient)
find_package(MySQLConfig)
set(CMAKE_INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib:${CMAKE_INSTALL_PREFIX}/modules)
@ -120,6 +125,10 @@ message(STATUS "Installing MaxScale to: ${CMAKE_INSTALL_PREFIX}/")
install(FILES server/MaxScale_template.cnf DESTINATION etc)
install(FILES ${ERRMSG} DESTINATION mysql)
install(FILES ${DOCS} DESTINATION Documentation)
install(FILES ${CMAKE_SOURCE_DIR}/COPYRIGHT DESTINATION ${CMAKE_INSTALL_PREFIX}/)
install(FILES ${CMAKE_SOURCE_DIR}/README DESTINATION ${CMAKE_INSTALL_PREFIX}/)
install(FILES ${CMAKE_SOURCE_DIR}/LICENSE DESTINATION ${CMAKE_INSTALL_PREFIX}/)
install(FILES ${CMAKE_SOURCE_DIR}/SETUP DESTINATION ${CMAKE_INSTALL_PREFIX}/)
install(DIRECTORY DESTINATION log)
if(${CMAKE_VERSION} VERSION_LESS 2.8.12)

45
CheckPlatform.cmake Normal file
View File

@ -0,0 +1,45 @@
#Checks for all the C system headers found in all the files
include(CheckFunctionExists)
include(CheckIncludeFiles)
check_include_files(arpa/inet.h HAVE_ARPA_INET)
check_include_files(crypt.h HAVE_CRYPT)
check_include_files(ctype.h HAVE_CTYPE)
check_include_files(dirent.h HAVE_DIRENT)
check_include_files(dlfcn.h HAVE_DLFCN)
check_include_files(errno.h HAVE_ERRNO)
check_include_files(execinfo.h HAVE_EXECINFO)
check_include_files(fcntl.h HAVE_FCNTL)
check_include_files(ftw.h HAVE_FTW)
check_include_files(getopt.h HAVE_GETOPT)
check_include_files(ini.h HAVE_INI)
check_include_files(math.h HAVE_MATH)
check_include_files(memlog.h HAVE_MEMLOG)
check_include_files(netdb.h HAVE_NETDB)
check_include_files(netinet/in.h HAVE_NETINET_IN)
check_include_files(openssl/aes.h HAVE_OPENSSL_AES)
check_include_files(openssl/sha.h HAVE_OPENSSL_SHA)
check_include_files(pthread.h HAVE_PTHREAD)
check_include_files(pwd.h HAVE_PWD)
check_include_files(rdtsc.h HAVE_RDTSC)
check_include_files(regex.h HAVE_REGEX)
check_include_files(signal.h HAVE_SIGNAL)
check_include_files(stdarg.h HAVE_STDARG)
check_include_files(stdbool.h HAVE_STDBOOL)
check_include_files(stdint.h HAVE_STDINT)
check_include_files(stdio.h HAVE_STDIO)
check_include_files(stdlib.h HAVE_STDLIB)
check_include_files(string.h HAVE_STRING)
check_include_files(strings.h HAVE_STRINGS)
check_include_files(sys/epoll.h HAVE_SYS_EPOLL)
check_include_files(sys/ioctl.h HAVE_SYS_IOCTL)
check_include_files(syslog.h HAVE_SYSLOG)
check_include_files(sys/param.h HAVE_SYS_PARAM)
check_include_files(sys/socket.h HAVE_SYS_SOCKET)
check_include_files(sys/stat.h HAVE_SYS_STAT)
check_include_files(sys/time.h HAVE_SYS_TIME)
check_include_files(sys/types.h HAVE_SYS_TYPES)
check_include_files(sys/un.h HAVE_SYS_UN)
check_include_files(time.h HAVE_TIME)
check_include_files(unistd.h HAVE_UNISTD)

Binary file not shown.

13
FindMySQLConfig.cmake Normal file
View File

@ -0,0 +1,13 @@
# This CMake file tries to find the the MySQL configuration tool
# The following variables are set:
# MYSQLCONFIG_FOUND - System has MySQL and the tool was found
# MYSQLCONFIG_EXECUTABLE - The MySQL configuration tool executable
find_program(MYSQLCONFIG_EXECUTABLE mysql_config)
if(MYSQLCONFIG_EXECUTABLE MATCHES "MYSQLCONFIG_EXECUTABLE-NOTFOUND")
message(FATAL_ERROR "Cannot find mysql_config.")
set(MYSQLCONFIG_FOUND FALSE CACHE INTERNAL "")
unset(MYSQLCONFIG_EXECUTABLE)
else()
message(STATUS "mysql_config found: ${MYSQLCONFIG_EXECUTABLE}")
set(MYSQLCONFIG_FOUND TRUE CACHE INTERNAL "")
endif()

2
debian/rules vendored
View File

@ -3,7 +3,7 @@
$(MAKE) ROOT_PATH=$(shell pwd) HOME="" clean
$(MAKE) ROOT_PATH=$(shell pwd) HOME="" depend
$(MAKE) ROOT_PATH=$(shell pwd) HOME=""
$(MAKE) DEST="$(shell pwd)/binaries" ROOT_PATH=$(shell pwd) HOME="" ERRMSG="/usr/share/mysql/english" EMBEDDED_LIB="/usr/lib/x86_64-linux-gnu/" install
$(MAKE) DEST="$(shell pwd)/binaries" ROOT_PATH=$(shell pwd) HOME="" ERRMSG="/usr/share/mysql/english" EMBEDDED_LIB="$(mysql_config --variable=pkglibdir)" install
dh $@
override_dh_usrlocal:
override_dh_auto_clean:

View File

@ -684,7 +684,11 @@ static int logmanager_write_log(
/** Length of session id */
int sesid_str_len;
/** 2 braces, 2 spaces and terminating char */
/**
* 2 braces, 2 spaces and terminating char
* If session id is stored to tls_log_info structure, allocate
* room for session id too.
*/
if (id == LOGFILE_TRACE && tls_log_info.li_sesid != 0)
{
sesid_str_len = 2+2+get_decimal_len(tls_log_info.li_sesid)+1;
@ -747,7 +751,6 @@ static int logmanager_write_log(
wp += strlen(wp);
}
#endif
/**
* Write timestamp with at most <timestamp_len> characters
* to wp.
@ -1848,10 +1851,9 @@ static char* fname_conf_get_suffix(
*
*
* Parameters:
* @param lm - <usage>
* <description>
* @param lm Log manager pointer
*
* @return
* @return succp true if succeed, otherwise false.
*
*
* @details If logfile is supposed to be located to shared memory
@ -1871,6 +1873,7 @@ static bool logfiles_init(
bool store_shmem;
bool write_syslog;
/** Open syslog immediately. Print pid of loggind process. */
if (syslog_id_str != NULL)
{
openlog(syslog_ident_str, LOG_PID | LOG_NDELAY, LOG_USER);
@ -2483,21 +2486,13 @@ static bool file_is_symlink(
* link name. Create block buffer for logfile.
*
* Parameters:
* @param logfile - <usage>
* <description>
*
* @param logfile_id - <usage>
* <description>
*
* @param logmanager - <usage>
* <description>
*
* @param store_shmem - <usage>
* <description>
* @param logfile log file
* @param logfile_id identifier for log file
* @param logmanager log manager pointer
* @param store_shmem flag to indicate whether log is physically written to shmem
* @param write_syslog flag to indicate whether log is also written to syslog
*
* @return true if succeed, false otherwise
*
*
*/
static bool logfile_init(
logfile_t* logfile,
@ -3084,7 +3079,7 @@ static int find_last_seqno(
for (i=0, p=parts; p->sp_string != NULL; i++, p=p->sp_next)
{
if (snstr != NULL && i == seqnoidx)
if (snstr != NULL && i == seqnoidx && strnlen(snstr,NAME_MAX) < NAME_MAX)
{
strncat(filename, snstr, NAME_MAX - 1); /*< add sequence number */
}

View File

@ -113,6 +113,7 @@ void skygw_logmanager_exit(void);
void skygw_log_done(void);
int skygw_log_write(logfile_id_t id, const char* format, ...);
int skygw_log_flush(logfile_id_t id);
void skygw_log_sync_all(void);
int skygw_log_rotate(logfile_id_t id);
int skygw_log_write_flush(logfile_id_t id, const char* format, ...);
int skygw_log_enable(logfile_id_t id);
@ -121,8 +122,6 @@ void skygw_log_sync_all(void);
EXTERN_C_BLOCK_END
void writebuf_clear(void* data);
const char* get_trace_prefix_default(void);
const char* get_trace_suffix_default(void);
const char* get_msg_prefix_default(void);

View File

@ -9,9 +9,9 @@ macro(set_maxscale_version)
#MaxScale version number
set(MAXSCALE_VERSION_MAJOR "1")
set(MAXSCALE_VERSION_MINOR "0")
set(MAXSCALE_VERSION_PATCH "3")
set(MAXSCALE_VERSION_PATCH "5")
set(MAXSCALE_VERSION_NUMERIC "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}")
set(MAXSCALE_VERSION "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}-rc")
set(MAXSCALE_VERSION "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}-unstable")
endmacro()
@ -21,7 +21,7 @@ macro(set_variables)
set(INSTALL_DIR "/usr/local/skysql/maxscale/" CACHE PATH "MaxScale installation directory.")
# Build type
set(BUILD_TYPE "None" CACHE STRING "Build type, possible values are:None, Debug, Optimized.")
set(BUILD_TYPE "None" CACHE STRING "Build type, possible values are:None, Debug, DebugSymbols, Optimized.")
# hostname or IP address of MaxScale's host
set(TEST_HOST "127.0.0.1" CACHE STRING "hostname or IP address of MaxScale's host")
@ -271,7 +271,7 @@ endmacro()
function(subdirs VAR DIRPATH)
if(${CMAKE_VERSION} VERSION_LESS 2.12 )
if(${CMAKE_VERSION} VERSION_LESS 2.8.12 )
set(COMP_VAR PATH)
else()
set(COMP_VAR DIRECTORY)

View File

@ -67,28 +67,18 @@ extern __thread log_info_t tls_log_info;
#define QTYPE_LESS_RESTRICTIVE_THAN_WRITE(t) (t<QUERY_TYPE_WRITE ? true : false)
static THD* get_or_create_thd_for_parsing(
MYSQL* mysql,
char* query_str);
static unsigned long set_client_flags(
MYSQL* mysql);
static bool create_parse_tree(
THD* thd);
static skygw_query_type_t resolve_query_type(
THD* thd);
static THD* get_or_create_thd_for_parsing(MYSQL* mysql, char* query_str);
static unsigned long set_client_flags(MYSQL* mysql);
static bool create_parse_tree(THD* thd);
static skygw_query_type_t resolve_query_type(THD* thd);
static bool skygw_stmt_causes_implicit_commit(
LEX* lex,
int* autocommit_stmt);
static int is_autocommit_stmt(
LEX* lex);
static int is_autocommit_stmt(LEX* lex);
static void parsing_info_set_plain_str(void* ptr, char* str);
static void* skygw_get_affected_tables(void* lexptr);
static void parsing_info_set_plain_str(void* ptr,
char* str);
/**
* Calls parser for the query includede in the buffer. Creates and adds parsing
@ -107,6 +97,11 @@ skygw_query_type_t query_classifier_get_type(
ss_info_dassert(querybuf != NULL, ("querybuf is NULL"));
if (querybuf == NULL)
{
succp = false;
goto retblock;
}
/** Create parsing info for the query and store it to buffer */
succp = query_is_parsed(querybuf);
@ -133,6 +128,7 @@ skygw_query_type_t query_classifier_get_type(
}
}
}
retblock:
return qtype;
}
@ -158,7 +154,7 @@ bool parse_query (
/** Do not parse without releasing previous parse info first */
ss_dassert(!query_is_parsed(querybuf));
if (query_is_parsed(querybuf))
if (querybuf == NULL || query_is_parsed(querybuf))
{
return false;
}
@ -225,7 +221,7 @@ bool query_is_parsed(
GWBUF* buf)
{
CHK_GWBUF(buf);
return GWBUF_IS_PARSED(buf);
return (buf != NULL && GWBUF_IS_PARSED(buf));
}
@ -957,13 +953,25 @@ return_rc:
return rc;
}
#if defined(NOT_USED)
char* skygw_query_classifier_get_stmtname(
MYSQL* mysql)
GWBUF* buf)
{
return ((THD *)(mysql->thd))->lex->prepared_stmt_name.str;
MYSQL* mysql;
if (buf == NULL ||
buf->gwbuf_bufobj == NULL ||
buf->gwbuf_bufobj->bo_data == NULL ||
(mysql = (MYSQL *)((parsing_info_t *)buf->gwbuf_bufobj->bo_data)->pi_handle) == NULL ||
mysql->thd == NULL ||
(THD *)(mysql->thd))->lex == NULL ||
(THD *)(mysql->thd))->lex->prepared_stmt_name == NULL)
{
return NULL;
}
return ((THD *)(mysql->thd))->lex->prepared_stmt_name.str;
}
#endif
/**
* Get the parse tree from parsed querybuf.
@ -979,7 +987,7 @@ LEX* get_lex(GWBUF* querybuf)
MYSQL* mysql;
THD* thd;
if (!GWBUF_IS_PARSED(querybuf))
if (querybuf == NULL || !GWBUF_IS_PARSED(querybuf))
{
return NULL;
}
@ -994,11 +1002,9 @@ LEX* get_lex(GWBUF* querybuf)
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL)
{
ss_dassert(mysql != NULL &&
thd != NULL);
ss_dassert(mysql != NULL && thd != NULL);
return NULL;
}
return thd->lex;
}
@ -1009,7 +1015,7 @@ LEX* get_lex(GWBUF* querybuf)
* @param thd Pointer to a valid THD
* @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error
*/
void* skygw_get_affected_tables(void* lexptr)
static void* skygw_get_affected_tables(void* lexptr)
{
LEX* lex = (LEX*)lexptr;
@ -1027,8 +1033,8 @@ void* skygw_get_affected_tables(void* lexptr)
/**
* Reads the parsetree and lists all the affected tables and views in the query.
* In the case of an error, the size of the table is set to zero and no memory is allocated.
* The caller must free the allocated memory.
* In the case of an error, the size of the table is set to zero and no memory
* is allocated. The caller must free the allocated memory.
*
* @param querybuf GWBUF where the table names are extracted from
* @param tblsize Pointer where the number of tables is written
@ -1043,7 +1049,9 @@ char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames)
char **tables = NULL,
**tmp = NULL;
if( (lex = get_lex(querybuf)) == NULL ||
if(querybuf == NULL ||
tblsize == NULL ||
(lex = get_lex(querybuf)) == NULL ||
lex->current_select == NULL)
{
goto retblock;
@ -1051,39 +1059,45 @@ char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames)
lex->current_select = lex->all_selects_list;
while(lex->current_select){
while(lex->current_select)
{
tbl = (TABLE_LIST*)skygw_get_affected_tables(lex);
while (tbl)
{
if(i >= currtblsz){
if (i >= currtblsz)
{
tmp = (char**)malloc(sizeof(char*)*(currtblsz*2+1));
if(tmp){
if(currtblsz > 0){
if(tmp)
{
if(currtblsz > 0)
{
int x;
for(x = 0;x<currtblsz;x++){
for(x = 0; x<currtblsz; x++)
{
tmp[x] = tables[x];
}
free(tables);
}
tables = tmp;
currtblsz = currtblsz*2 + 1;
}
}
}
if(tmp != NULL){
if(tmp != NULL)
{
char *catnm = NULL;
if(fullnames)
{
if(tbl->db && strcmp(tbl->db,"skygw_virtual") != 0)
if (tbl->db &&
strcmp(tbl->db,"skygw_virtual") != 0)
{
catnm = (char*)calloc(strlen(tbl->db) + strlen(tbl->table_name) + 2,sizeof(char));
catnm = (char*)calloc(strlen(tbl->db) +
strlen(tbl->table_name) +
2,
sizeof(char));
strcpy(catnm,tbl->db);
strcat(catnm,".");
strcat(catnm,tbl->table_name);
@ -1098,13 +1112,11 @@ char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames)
{
tables[i++] = strdup(tbl->table_name);
}
tbl=tbl->next_local;
}
}
} /*< while (tbl) */
lex->current_select = lex->current_select->next_select_in_list();
}
} /*< while(lex->current_select) */
retblock:
*tblsize = i;
return tables;
@ -1119,35 +1131,49 @@ char* skygw_get_created_table_name(GWBUF* querybuf)
{
LEX* lex;
if((lex = get_lex(querybuf)) == NULL)
if(querybuf == NULL || (lex = get_lex(querybuf)) == NULL)
{
return NULL;
}
if (lex->create_last_non_select_table &&
lex->create_last_non_select_table->table_name){
lex->create_last_non_select_table->table_name)
{
char* name = strdup(lex->create_last_non_select_table->table_name);
return name;
}else{
}
else
{
return NULL;
}
}
/**
* Checks whether the query is a "real" query ie. SELECT,UPDATE,INSERT,DELETE or any variation of these.
* Queries that affect the underlying database are not considered as real queries and the queries that target
* specific row or variable data are regarded as the real queries.
* Checks whether the query is a "real" query ie. SELECT,UPDATE,INSERT,DELETE or
* any variation of these. Queries that affect the underlying database are not
* considered as real queries and the queries that target specific row or
* variable data are regarded as the real queries.
*
* @param GWBUF to analyze
*
* @return true if the query is a real query, otherwise false
*/
bool skygw_is_real_query(GWBUF* querybuf)
{
LEX* lex = get_lex(querybuf);
if(lex){
bool succp;
LEX* lex;
if (querybuf == NULL ||
(lex = get_lex(querybuf)) == NULL)
{
succp = false;
goto retblock;
}
switch(lex->sql_command) {
case SQLCOM_SELECT:
return lex->all_selects_list->table_list.elements > 0;
succp = lex->all_selects_list->table_list.elements > 0;
goto retblock;
break;
case SQLCOM_UPDATE:
case SQLCOM_INSERT:
case SQLCOM_INSERT_SELECT:
@ -1157,12 +1183,16 @@ bool skygw_is_real_query(GWBUF* querybuf)
case SQLCOM_REPLACE_SELECT:
case SQLCOM_PREPARE:
case SQLCOM_EXECUTE:
return true;
succp = true;
goto retblock;
break;
default:
return false;
succp = false;
goto retblock;
break;
}
}
return false;
retblock:
return succp;
}
@ -1175,8 +1205,9 @@ bool is_drop_table_query(GWBUF* querybuf)
{
LEX* lex;
return (lex = get_lex(querybuf)) != NULL &&
lex->sql_command == SQLCOM_DROP_TABLE;
return (querybuf != NULL &&
(lex = get_lex(querybuf)) != NULL &&
lex->sql_command == SQLCOM_DROP_TABLE);
}
/**
@ -1202,7 +1233,8 @@ char* skygw_get_canonical(
Item* item;
char* querystr;
if (!GWBUF_IS_PARSED(querybuf))
if (querybuf == NULL ||
!GWBUF_IS_PARSED(querybuf))
{
querystr = NULL;
goto retblock;
@ -1339,7 +1371,11 @@ retblock:
void parsing_info_done(
void* ptr)
{
parsing_info_t* pi = (parsing_info_t *)ptr;
parsing_info_t* pi;
if (ptr)
{
pi = (parsing_info_t *)ptr;
if (pi->pi_handle != NULL)
{
@ -1359,6 +1395,7 @@ void parsing_info_done(
}
free(pi);
}
}
/**
* Add plain text query string to parsing info.

View File

@ -83,13 +83,13 @@ typedef struct parsing_info_st {
skygw_query_type_t query_classifier_get_type(GWBUF* querybuf);
/** Free THD context and close MYSQL */
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
#if defined(NOT_USED)
char* skygw_query_classifier_get_stmtname(GWBUF* buf);
#endif
char* skygw_get_created_table_name(GWBUF* querybuf);
bool is_drop_table_query(GWBUF* querybuf);
bool skygw_is_real_query(GWBUF* querybuf);
void* skygw_get_affected_tables(void* lexptr);
char** skygw_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames);
char** skygw_get_database_names(GWBUF* querybuf,int* size);
char* skygw_get_canonical(GWBUF* querybuf);
bool parse_query (GWBUF* querybuf);
parsing_info_t* parsing_info_init(void (*donefun)(void *));

View File

@ -217,6 +217,7 @@ static void sigterm_handler (int i) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"MaxScale received signal SIGTERM. Exiting.")));
skygw_log_sync_all();
shutdown_server();
}
@ -228,6 +229,7 @@ sigint_handler (int i)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"MaxScale received signal SIGINT. Shutting down.")));
skygw_log_sync_all();
shutdown_server();
fprintf(stderr, "\n\nShutting down MaxScale\n\n");
}
@ -270,6 +272,8 @@ sigfatal_handler (int i)
}
}
skygw_log_sync_all();
/* re-raise signal to enforce core dump */
fprintf(stderr, "\n\nWriting core dump\n");
signal_set(i, SIG_DFL);
@ -1527,7 +1531,7 @@ int main(int argc, char **argv)
free(log_context);
}
/*<
/**
* Init Log Manager for MaxScale.
* If $MAXSCALE_HOME is set then write the logs into $MAXSCALE_HOME/log.
* The skygw_logmanager_init expects to take arguments as passed to main
@ -1538,22 +1542,27 @@ int main(int argc, char **argv)
char buf[1024];
char *argv[8];
bool succp;
/** Set log directory under $MAXSCALE_HOME/log */
sprintf(buf, "%s/log", home_dir);
if(mkdir(buf, 0777) != 0){
if(errno != EEXIST){
if(mkdir(buf, 0777) != 0)
{
if(errno != EEXIST)
{
fprintf(stderr,
"Error: Cannot create log directory: %s\n",buf);
"Error: Cannot create log directory: %s\n",
buf);
goto return_main;
}
}
argv[0] = "MaxScale";
argv[1] = "-j";
argv[2] = buf;
if (logtofile)
{
argv[3] = "-l"; /*< write to syslog */
/** Logs that should be syslogged */
argv[4] = "LOGFILE_MESSAGE,LOGFILE_ERROR"
"LOGFILE_DEBUG,LOGFILE_TRACE";
argv[5] = NULL;
@ -1562,9 +1571,9 @@ int main(int argc, char **argv)
else
{
argv[3] = "-s"; /*< store to shared memory */
argv[4] = "LOGFILE_DEBUG,LOGFILE_TRACE"; /*< ..these logs to shm */
argv[4] = "LOGFILE_DEBUG,LOGFILE_TRACE"; /*< to shm */
argv[5] = "-l"; /*< write to syslog */
argv[6] = "LOGFILE_MESSAGE,LOGFILE_ERROR"; /*< ..these logs to syslog */
argv[6] = "LOGFILE_MESSAGE,LOGFILE_ERROR"; /*< to syslog */
argv[7] = NULL;
succp = skygw_logmanager_init(7, argv);
}
@ -1575,8 +1584,7 @@ int main(int argc, char **argv)
goto return_main;
}
}
/*<
/**
* Resolve the full pathname for configuration file and check for
* read accessibility.
*/

View File

@ -675,6 +675,14 @@ poll_set_maxwait(unsigned int maxwait)
* back of the queue so that other DCB's will have a share of the threads to
* execute events for them.
*
* Including session id to log entries depends on this function. Assumption is
* that when maxscale thread starts processing of an event it processes one
* and only one session until it returns from this function. Session id is
* read to thread's local storage in macro LOGIF_MAYBE(...) and reset back
* to zero just before returning in LOGIF(...) macro.
* Thread local storage (tls_log_info_t) follows thread and is accessed every
* time log is written to particular log.
*
* @param thread_id The thread ID of the calling thread
* @return 0 if no DCB's have been processed
*/
@ -798,7 +806,7 @@ unsigned long qtime;
simple_mutex_unlock(&dcb->dcb_write_lock);
#else
atomic_add(&pollStats.n_write, 1);
/** Read session id to thread's local storage */
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
dcb,
&tls_log_info.li_sesid,
@ -852,6 +860,7 @@ unsigned long qtime;
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
/** Read session id to thread's local storage */
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
dcb,
&tls_log_info.li_sesid,
@ -891,6 +900,7 @@ unsigned long qtime;
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
/** Read session id to thread's local storage */
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
dcb,
&tls_log_info.li_sesid,
@ -919,6 +929,7 @@ unsigned long qtime;
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
/** Read session id to thread's local storage */
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
dcb,
&tls_log_info.li_sesid,
@ -951,6 +962,7 @@ unsigned long qtime;
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
/** Read session id to thread's local storage */
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
dcb,
&tls_log_info.li_sesid,
@ -1016,6 +1028,7 @@ unsigned long qtime;
}
}
dcb->evq.processing = 0;
/** Reset session id from thread's local storage */
LOGIF(LT, tls_log_info.li_sesid = 0);
spinlock_release(&pollqlock);

View File

@ -283,6 +283,9 @@ char *home, buf[1024];
result += test4();
result += test5();
/* Add the default user back so other tests can use it */
admin_add_user("admin", "skysql");
exit(result);
}

View File

@ -60,6 +60,8 @@
#include <dcb.h>
#include <sys/time.h>
#include <poll.h>
#include <mysql_client_server_protocol.h>
#include <housekeeper.h>
#define MYSQL_COM_QUIT 0x01
#define MYSQL_COM_INITDB 0x02
@ -73,6 +75,11 @@
#define REPLY_TIMEOUT_SECOND 5
#define REPLY_TIMEOUT_MILLISECOND 1
#define PARENT 0
#define CHILD 1
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
#define PTR_IS_EOF(b) (b[4] == 0xfe)
static unsigned char required_packets[] = {
MYSQL_COM_QUIT,
@ -153,10 +160,9 @@ typedef struct {
FILTER_DEF* dummy_filterdef;
int active; /* filter is active? */
int waiting; /* if the client is waiting for a reply */
int replies; /* Number of queries received */
int min_replies; /* Minimum number of replies to receive
* before forwarding the packet to the client*/
bool waiting[2]; /* if the client is waiting for a reply */
int eof[2];
int replies[2]; /* Number of queries received */
DCB *branch_dcb; /* Client DCB for "branch" service */
SESSION *branch_session;/* The branch service session */
int n_duped; /* Number of duplicated queries */
@ -202,6 +208,83 @@ static int hcfn(
return strcmp(i1,i2);
}
static void
orphan_free(void* data)
{
spinlock_acquire(&orphanLock);
orphan_session_t *ptr = allOrphans, *finished = NULL, *tmp = NULL;
#ifdef SS_DEBUG
int o_stopping = 0, o_ready = 0, o_freed = 0;
#endif
while(ptr)
{
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
{
if(ptr == allOrphans)
{
tmp = ptr;
allOrphans = ptr->next;
}
else
{
tmp = allOrphans;
while(tmp && tmp->next != ptr)
tmp = tmp->next;
if(tmp)
{
tmp->next = ptr->next;
tmp = ptr;
}
}
}
#ifdef SS_DEBUG
else if(ptr->session->state == SESSION_STATE_STOPPING)
{
o_stopping++;
}
else if(ptr->session->state == SESSION_STATE_ROUTER_READY)
{
o_ready++;
}
#endif
ptr = ptr->next;
if(tmp)
{
tmp->next = finished;
finished = tmp;
tmp = NULL;
}
}
spinlock_release(&orphanLock);
#ifdef SS_DEBUG
if(o_stopping + o_ready > 0)
skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans in "
"SESSION_STATE_STOPPING, %d orphans in "
"SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready);
#endif
while(finished)
{
o_freed++;
tmp = finished;
finished = finished->next;
tmp->session->service->router->freeSession(
tmp->session->service->router_instance,
tmp->session->router_session);
tmp->session->state = SESSION_STATE_FREE;
free(tmp->session);
free(tmp);
}
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans freed.", o_freed);
#endif
}
/**
* Implementation of the mandatory version entry point
*
@ -221,6 +304,7 @@ void
ModuleInit()
{
spinlock_init(&orphanLock);
hktask_add("tee orphan cleanup",orphan_free,NULL,15);
}
/**
@ -490,7 +574,6 @@ char *remote, *userName;
}
ses->tail = *dummy_upstream;
my_session->min_replies = 2;
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy;
@ -605,78 +688,6 @@ SESSION* ses = my_session->branch_session;
}
free(session);
spinlock_acquire(&orphanLock);
orphan_session_t *ptr = allOrphans, *finished = NULL,*tmp = NULL;
#ifdef SS_DEBUG
int o_stopping = 0, o_ready = 0,o_freed = 0;
#endif
while(ptr)
{
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
{
if(ptr == allOrphans)
{
tmp = ptr;
allOrphans = ptr->next;
}
else
{
tmp = allOrphans;
while(tmp && tmp->next != ptr)
tmp = tmp->next;
if(tmp)
{
tmp->next = ptr->next;
tmp = ptr;
}
}
}
#ifdef SS_DEBUG
else if(ptr->session->state == SESSION_STATE_STOPPING)
{
o_stopping++;
}
else if(ptr->session->state == SESSION_STATE_ROUTER_READY)
{
o_ready++;
}
#endif
ptr = ptr->next;
if(tmp)
{
tmp->next = finished;
finished = tmp;
tmp = NULL;
}
}
spinlock_release(&orphanLock);
#ifdef SS_DEBUG
if(o_stopping + o_ready > 0)
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans in "
"SESSION_STATE_STOPPING, %d orphans in "
"SESSION_STATE_ROUTER_READY. ",o_stopping,o_ready);
#endif
while(finished)
{
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans freed.",++o_freed);
#endif
tmp = finished;
finished = finished->next;
tmp->session->service->router->freeSession(
tmp->session->service->router_instance,
tmp->session->router_session);
tmp->session->state = SESSION_STATE_FREE;
free(tmp->session);
free(tmp);
}
return;
}
/**
@ -761,8 +772,6 @@ GWBUF *clone = NULL;
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
char *dummy;
length = modutil_MySQL_query_len(queue, &residual);
clone = gwbuf_clone_all(queue);
my_session->residual = residual;
@ -776,7 +785,12 @@ GWBUF *clone = NULL;
}
/* Pass the query downstream */
my_session->replies = 0;
ss_dassert(my_session->tee_replybuf == NULL);
memset(my_session->replies,0,2*sizeof(int));
memset(my_session->eof,0,2*sizeof(int));
memset(my_session->waiting,0,2*sizeof(bool));
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
queue);
@ -809,9 +823,42 @@ GWBUF *clone = NULL;
}
my_session->n_rejected++;
}
return rval;
}
/**
* Scans the GWBUF for EOF packets. If two packets for this session have been found
* from either the parent or the child branch, mark the response set from that branch as over.
* @param session The Tee filter session
* @param branch Parent or child branch
* @param reply Buffer to scan
*/
void
scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply)
{
unsigned char* ptr = (unsigned char*) reply->start;
unsigned char* end = (unsigned char*) reply->end;
int pktlen = 0;
while(ptr < end)
{
pktlen = gw_mysql_get_byte3(ptr) + 4;
if(PTR_IS_EOF(ptr))
{
session->eof[branch]++;
if(session->eof[branch] == 2)
{
session->waiting[branch] = false;
session->eof[branch] = 0;
return;
}
}
ptr += pktlen;
}
}
/**
* The clientReply entry point. This is passed the response buffer
@ -826,17 +873,33 @@ GWBUF *clone = NULL;
static int
clientReply (FILTER* instance, void *session, GWBUF *reply)
{
int rc;
int rc, branch;
TEE_SESSION *my_session = (TEE_SESSION *) session;
spinlock_acquire(&my_session->tee_lock);
ss_dassert(my_session->active);
my_session->replies++;
if (my_session->tee_replybuf == NULL &&
instance != NULL)
branch = instance == NULL ? CHILD : PARENT;
unsigned char *ptr = (unsigned char*)reply->start;
if(my_session->replies[branch] == 0)
{
if(PTR_IS_RESULTSET(ptr))
{
my_session->waiting[branch] = true;
my_session->eof[branch] = 0;
}
}
if(my_session->waiting[branch])
{
scan_resultset(my_session,branch,reply);
}
if(branch == PARENT)
{
ss_dassert(my_session->tee_replybuf == NULL)
my_session->tee_replybuf = reply;
}
else
@ -844,15 +907,17 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
gwbuf_free(reply);
}
if((my_session->branch_session == NULL ||
my_session->replies >= my_session->min_replies) &&
my_session->tee_replybuf != NULL)
my_session->replies[branch]++;
if(my_session->tee_replybuf != NULL &&
(my_session->branch_session == NULL ||
my_session->waiting[PARENT] ||
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT])))
{
rc = my_session->up.clientReply (
my_session->up.instance,
my_session->up.session,
my_session->tee_replybuf);
my_session->replies = 0;
my_session->tee_replybuf = NULL;
}
else

View File

@ -15,6 +15,7 @@ router=readwritesplit
servers=server1,server2,server3,server4
user=maxuser
passwd=maxpwd
max_slave_connections=100%
[DBShard Router]
type=service
@ -29,6 +30,7 @@ router=readwritesplit
servers=server1,server2,server3,server4
user=maxuser
passwd=maxpwd
max_slave_connections=100%
filters=Hint