From 0967a09b5cead742717eeab2b51e53e76d64a01f Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Tue, 9 Dec 2014 14:26:33 +0200 Subject: [PATCH 01/16] Added severity keyword to error log messages --- server/core/service.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/core/service.c b/server/core/service.c index 3b08f4274..fc2d128fc 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -765,7 +765,7 @@ int n = 0; if ((flist = (FILTER_DEF **)malloc(sizeof(FILTER_DEF *))) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Out of memory adding filters to service.\n"))); + "Error : Out of memory adding filters to service.\n"))); return; } ptr = strtok_r(filters, "|", &brkt); @@ -776,14 +776,14 @@ int n = 0; (n + 1) * sizeof(FILTER_DEF *))) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Out of memory adding filters to service.\n"))); + "Error : Out of memory adding filters to service.\n"))); return; } if ((flist[n-1] = filter_find(trim(ptr))) == NULL) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Unable to find filter '%s' for service '%s'\n", + "Warning : Unable to find filter '%s' for service '%s'\n", trim(ptr), service->name ))); n--; From 2a623c6ec3343cf60c5b34dd4bc5799716568c0b Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Wed, 10 Dec 2014 18:05:58 +0200 Subject: [PATCH 02/16] Fixed a typo in config.c --- server/core/config.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/core/config.c b/server/core/config.c index 61d7a6b98..a82827257 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -1814,7 +1814,7 @@ config_truth_value(char *str) { return 1; } - if (strcasecmp(str, "flase") == 0 || strcasecmp(str, "off") == 0) + if (strcasecmp(str, "false") == 0 || strcasecmp(str, "off") == 0) { return 0; } From c75ba90f6faf1dbd7e867e143b3c208793f36dd1 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Thu, 11 Dec 2014 09:43:59 +0200 Subject: [PATCH 03/16] Fixes to Coverity defect 84478 and to 72759 which has reappeared. --- server/core/gateway.c | 2 +- server/core/secrets.c | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/core/gateway.c b/server/core/gateway.c index 45f493adc..ee2290e28 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -572,7 +572,7 @@ return_succp: static bool resolve_maxscale_homedir( char** p_home_dir) { - bool succp; + bool succp = false; char* tmp; char* tmp2; char* log_context = NULL; diff --git a/server/core/secrets.c b/server/core/secrets.c index 57325a5ff..32fe59467 100644 --- a/server/core/secrets.c +++ b/server/core/secrets.c @@ -252,6 +252,7 @@ MAXKEYS key; "Error : failed opening /dev/random. Error %d, %s.", errno, strerror(errno)))); + close(fd); return 1; } @@ -260,6 +261,7 @@ MAXKEYS key; LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : failed to read /dev/random."))); + close(fd); close(randfd); return 1; } From 63def8d002c6a39980d7b075721ea358425d7da7 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Thu, 11 Dec 2014 15:28:41 +0200 Subject: [PATCH 04/16] Fix to bug #644, http://bugs.mariadb.com/show_bug.cgi?id=644 Initialized the lock variable in gwbuf_clone --- server/core/buffer.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/core/buffer.c b/server/core/buffer.c index 8f4989a7e..991e78b1b 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -179,7 +179,7 @@ gwbuf_clone(GWBUF *buf) { GWBUF *rval; - if ((rval = (GWBUF *)malloc(sizeof(GWBUF))) == NULL) + if ((rval = (GWBUF *)calloc(1,sizeof(GWBUF))) == NULL) { ss_dassert(rval != NULL); LOGIF(LE, (skygw_log_write_flush( @@ -194,11 +194,8 @@ GWBUF *rval; rval->start = buf->start; rval->end = buf->end; rval->gwbuf_type = buf->gwbuf_type; - rval->properties = NULL; - rval->hint = NULL; rval->gwbuf_info = buf->gwbuf_info; rval->gwbuf_bufobj = buf->gwbuf_bufobj; - rval->next = NULL; rval->tail = rval; CHK_GWBUF(rval); return rval; From 573cf6040c8e33dff0c578cbae9857369afb71a0 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Thu, 11 Dec 2014 13:45:21 +0000 Subject: [PATCH 05/16] Check for the duplciate service being the same as the service that is using the tee filter. I.e. trap simple recursive definitions. --- server/modules/filter/tee.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 809c8c441..ec1807ec7 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -280,6 +280,13 @@ TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; TEE_SESSION *my_session; char *remote, *userName; + if (strcmp(my_instance->service->name, session->service->name) == 0) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "%s: Recursive use of tee filter in service.", + session->service->name))); + return NULL; + } if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL) { my_session->active = 1; From 89c9bb8ade7cd52f85da5ad6be1ced42c2d539b8 Mon Sep 17 00:00:00 2001 From: Timofey Turenko Date: Thu, 11 Dec 2014 22:22:52 +0200 Subject: [PATCH 06/16] change version from 'beta' to 'RC' --- macros.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros.cmake b/macros.cmake index 09e0e8146..41e5bd049 100644 --- a/macros.cmake +++ b/macros.cmake @@ -11,7 +11,7 @@ macro(set_maxscale_version) set(MAXSCALE_VERSION_MINOR "0") set(MAXSCALE_VERSION_PATCH "2") set(MAXSCALE_VERSION_NUMERIC "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}") - set(MAXSCALE_VERSION "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}-beta") + set(MAXSCALE_VERSION "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}-RC") endmacro() From 680f7ef2c157e8788ccbc9cbcbb22b3d3235b30a Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 12 Dec 2014 09:45:52 +0000 Subject: [PATCH 07/16] Add fix for duplicating COM_QUIT, COM_INIT_DB and COM_CHANGE_USER packets --- server/modules/filter/tee.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index ec1807ec7..672f8747b 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -41,6 +41,7 @@ * Date Who Description * 20/06/2014 Mark Riddoch Initial implementation * 24/06/2014 Mark Riddoch Addition of support for multi-packet queries + * 12/12/2014 Mark Riddoch Add support for otehr packet types * * @endverbatim */ @@ -58,6 +59,10 @@ #include #include +#define MYSQL_COM_QUIT 0x01 +#define MYSQL_COM_INITDB 0x02 +#define MYSQL_COM_CHANGE_USER 0x11 + /** Defined in log_manager.cc */ extern int lm_enabled_logfiles_bitmask; extern size_t log_ses_count[]; @@ -128,6 +133,7 @@ typedef struct { int residual; /* Any outstanding SQL text */ } TEE_SESSION; +static int packet_is_required(GWBUF *queue); /** * Implementation of the mandatory version entry point * @@ -429,6 +435,10 @@ GWBUF *clone = NULL; } free(ptr); } + else if (packet_is_required(queue)) + { + clone = gwbuf_clone(queue); + } /* Pass the query downstream */ rval = my_session->down.routeQuery(my_session->down.instance, @@ -484,3 +494,25 @@ TEE_SESSION *my_session = (TEE_SESSION *)fsession; my_session->n_rejected); } } + +/** + * Determine if the packet is a command that must be sent to the branch + * to maintain the session consistancy. These are COM_INIT_DB, + * COM_CHANGE_USER and COM_QUIT packets. + * + * @param queue The buffer to check + * @return non-zero if the packet should be sent to the branch + */ +static int +packet_is_required(GWBUF *queue) +{ +uint8_t *ptr; + + ptr = GWBUF_DATA(queue); + if (GWBUF_LENGTH(queue) > 4 && + (ptr[4] == MYSQL_COM_QUIT || ptr[4] == MYSQL_COM_INITDB + || ptr[4] == MYSQL_COM_CHANGE_USER)) + return 1; + else + return 0; +} From 1fc063b437d3fce2c3e62f93af36700c680a6ec8 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 12 Dec 2014 10:02:24 +0000 Subject: [PATCH 08/16] Added pepared statements and field lsit support to the command types to duplciate --- server/modules/filter/tee.c | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 672f8747b..ea3027012 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -59,9 +59,28 @@ #include #include -#define MYSQL_COM_QUIT 0x01 -#define MYSQL_COM_INITDB 0x02 -#define MYSQL_COM_CHANGE_USER 0x11 +#define MYSQL_COM_QUIT 0x01 +#define MYSQL_COM_INITDB 0x02 +#define MYSQL_COM_FIELD_LIST 0x04 +#define MYSQL_COM_CHANGE_USER 0x11 +#define MYSQL_COM_STMT_PREPARE 0x16 +#define MYSQL_COM_STMT_EXECUTE 0x17 +#define MYSQL_COM_STMT_SEND_LONG_DATA 0x18 +#define MYSQL_COM_STMT_CLOSE 0x19 +#define MYSQL_COM_STMT_RESET 0x1a + + +static unsigned char required_packets[] = { + MYSQL_COM_QUIT, + MYSQL_COM_INITDB, + MYSQL_COM_FIELD_LIST, + MYSQL_COM_CHANGE_USER, + MYSQL_COM_STMT_PREPARE, + MYSQL_COM_STMT_EXECUTE, + MYSQL_COM_STMT_SEND_LONG_DATA, + MYSQL_COM_STMT_CLOSE, + MYSQL_COM_STMT_RESET, + 0 }; /** Defined in log_manager.cc */ extern int lm_enabled_logfiles_bitmask; @@ -507,12 +526,12 @@ static int packet_is_required(GWBUF *queue) { uint8_t *ptr; +int i; ptr = GWBUF_DATA(queue); - if (GWBUF_LENGTH(queue) > 4 && - (ptr[4] == MYSQL_COM_QUIT || ptr[4] == MYSQL_COM_INITDB - || ptr[4] == MYSQL_COM_CHANGE_USER)) - return 1; - else - return 0; + if (GWBUF_LENGTH(queue) > 4) + for (i = 0; required_packets[i]; i++) + if (ptr[4] == required_packets[i]) + return 1; + return 0; } From d607d3ec7c9937967b29d1ca5a6b22202e65ff4c Mon Sep 17 00:00:00 2001 From: Timofey Turenko Date: Fri, 12 Dec 2014 13:07:51 +0200 Subject: [PATCH 09/16] change capital -RC to lowcase -rc in the package name --- macros.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros.cmake b/macros.cmake index 41e5bd049..de842ea5c 100644 --- a/macros.cmake +++ b/macros.cmake @@ -11,7 +11,7 @@ macro(set_maxscale_version) set(MAXSCALE_VERSION_MINOR "0") set(MAXSCALE_VERSION_PATCH "2") set(MAXSCALE_VERSION_NUMERIC "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}") - set(MAXSCALE_VERSION "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}-RC") + set(MAXSCALE_VERSION "${MAXSCALE_VERSION_MAJOR}.${MAXSCALE_VERSION_MINOR}.${MAXSCALE_VERSION_PATCH}-rc") endmacro() From e55c70b329745c9486a4b6cb70f4d201d4447965 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Fri, 12 Dec 2014 14:36:05 +0200 Subject: [PATCH 10/16] Session status wasn't updated in tee filter's closeSession --- server/modules/filter/tee.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 809c8c441..cd4b9bba1 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -325,9 +325,18 @@ SESSION *bsession; { if ((bsession = my_session->branch_session) != NULL) { + CHK_SESSION(bsession); + spinlock_acquire(&bsession->ses_lock); + + if (bsession->state != SESSION_STATE_STOPPING) + { + bsession->state = SESSION_STATE_STOPPING; + } router = bsession->service->router; router_instance = bsession->service->router_instance; rsession = bsession->router_session; + spinlock_release(&bsession->ses_lock); + /** Close router session and all its connections */ router->closeSession(router_instance, rsession); } From 4c9307bbf23656f2ed2553d5cfe8ec2c844554c0 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Fri, 12 Dec 2014 17:15:11 +0200 Subject: [PATCH 11/16] Updated MaxScale_test.cnf --- server/test/MaxScale_test.cnf | 97 ++++++----------------------------- 1 file changed, 17 insertions(+), 80 deletions(-) diff --git a/server/test/MaxScale_test.cnf b/server/test/MaxScale_test.cnf index 7c2e8972e..60587ff92 100644 --- a/server/test/MaxScale_test.cnf +++ b/server/test/MaxScale_test.cnf @@ -1,27 +1,5 @@ -# -# Example MaxScale.cnf configuration file -# -# -# -# Number of server threads -# Valid options are: -# threads= - [maxscale] -threads=1 - -# Define a monitor that can be used to determine the state and role of -# the servers. -# -# Valid options are: -# -# module= -# servers=,,... -# user = -# passwd= -# monitor_interval= +threads=4 [MySQL Monitor] type=monitor @@ -29,41 +7,19 @@ module=mysqlmon servers=server1,server2,server3,server4 user=maxuser passwd=maxpwd - -# A series of service definition -# -# Valid options are: -# -# router= -# servers=,,... -# user= -# passwd= -# enable_root_user=<0 or 1, default is 0> -# version_string= -# -# Valid router modules currently are: -# readwritesplit, readconnroute and debugcli - +monitor_interval=10000 [RW Split Router] type=service router=readwritesplit servers=server1,server2,server3,server4 -max_slave_connections=90% -write_ses_variables_to_all=Yes -read_ses_variables_from_slaves=Yes user=maxuser passwd=maxpwd -filters=Hint [RW Split Hint Router] type=service router=readwritesplit servers=server1,server2,server3,server4 -max_slave_connections=90% -write_ses_variables_to_all=Yes -read_ses_variables_from_slaves=Yes user=maxuser passwd=maxpwd filters=Hint @@ -77,31 +33,23 @@ servers=server1 user=maxuser passwd=maxpwd - -[HTTPD Router] -type=service -router=testroute -servers=server1,server2,server3 +[Hint] +type=filter +module=hintfilter [Debug Interface] type=service router=debugcli +[CLI] +type=service +router=cli -[Hint] -type=filter -module=hintfilter - - -# Listener definitions for the services -# -# Valid options are: -# -# service= -# protocol= -# port= -# address=
-# socket= +[Read Connection Listener] +type=listener +service=Read Connection Router +protocol=MySQLClient +port=4008 [RW Split Listener] type=listener @@ -115,28 +63,17 @@ service=RW Split Hint Router protocol=MySQLClient port=4009 -[Read Connection Listener] -type=listener -service=Read Connection Router -protocol=MySQLClient -port=4008 -#socket=/tmp/readconn.sock - [Debug Listener] type=listener service=Debug Interface protocol=telnetd port=4442 -#address=127.0.0.1 -[HTTPD Listener] +[CLI Listener] type=listener -service=HTTPD Router -protocol=HTTPD -port=6444 - -# Definition of the servers - +service=CLI +protocol=maxscaled +port=6603 [server1] type=server address=127.0.0.1 From c31f971999053bbfb9a91f966c086c7a9b83ded6 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Sat, 13 Dec 2014 01:55:40 +0200 Subject: [PATCH 12/16] Fix candidate for #645, http://bugs.skysql.com/show_bug.cgi?id=645 and #648, http://bugs.skysql.com/show_bug.cgi?id=648 If readwritesplit.c:routeQuery gets a GWBUF whose type is UNDEFINED, then each MySQL packet is extracted from input buffer and passed to new function, route_single_stmt. Each extracted packet is stored in separate GWBUF and added types GWBUF_TYPE_MYSQL and GWBUG_TYPE_SINGLE_STMT which makes it possible to execute session commands and process reply packets properly. Code nedes still cleaning but this is for testing atm. --- server/core/modutil.c | 69 +++ server/include/modutil.h | 1 + .../routing/readwritesplit/readwritesplit.c | 551 +++++++++++++++++- 3 files changed, 599 insertions(+), 22 deletions(-) diff --git a/server/core/modutil.c b/server/core/modutil.c index 21978a740..ff031c8cf 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -390,3 +390,72 @@ int modutil_send_mysql_err_packet ( return dcb->func.write(dcb, buf); } +/** + * Buffer contains at least one of the following: + * complete [complete] [partial] mysql packet + * + * return pointer to gwbuf containing a complete packet or + * NULL if no complete packet was found. + */ +GWBUF* modutil_get_next_MySQL_packet( + GWBUF** p_readbuf) +{ + GWBUF* packetbuf; + GWBUF* readbuf; + size_t buflen; + size_t packetlen; + size_t totalbuflen; + uint8_t* data; + size_t nbytes_copied = 0; + uint8_t* target; + + readbuf = *p_readbuf; + + if (readbuf == NULL) + { + packetbuf = NULL; + goto return_packetbuf; + } + CHK_GWBUF(readbuf); + + if (GWBUF_EMPTY(readbuf)) + { + packetbuf = NULL; + goto return_packetbuf; + } + totalbuflen = gwbuf_length(readbuf); + data = (uint8_t *)GWBUF_DATA((readbuf)); + packetlen = MYSQL_GET_PACKET_LEN(data)+4; + + /** packet is incomplete */ + if (packetlen > totalbuflen) + { + packetbuf = NULL; + goto return_packetbuf; + } + + packetbuf = gwbuf_alloc(packetlen); + target = GWBUF_DATA(packetbuf); + packetbuf->gwbuf_type = readbuf->gwbuf_type; /*< Copy the type too */ + /** + * Copy first MySQL packet to packetbuf and leave posible other + * packets to read buffer. + */ + while (nbytes_copied < packetlen && totalbuflen > 0) + { + uint8_t* src = GWBUF_DATA((*p_readbuf)); + size_t bytestocopy; + + buflen = GWBUF_LENGTH((*p_readbuf)); + bytestocopy = MIN(buflen,packetlen-nbytes_copied); + + memcpy(target+nbytes_copied, src, bytestocopy); + *p_readbuf = gwbuf_consume((*p_readbuf), bytestocopy); + totalbuflen = gwbuf_length((*p_readbuf)); + nbytes_copied += bytestocopy; + } + ss_dassert(buflen == 0 || nbytes_copied == packetlen); + +return_packetbuf: + return packetbuf; +} diff --git a/server/include/modutil.h b/server/include/modutil.h index a0daf60a9..762757617 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -41,6 +41,7 @@ extern char *modutil_get_SQL(GWBUF *); extern GWBUF *modutil_replace_SQL(GWBUF *, char *); extern char *modutil_get_query(GWBUF* buf); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); +GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf); GWBUF *modutil_create_mysql_err_msg( int packet_number, diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 0a1b4add3..804a460fa 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -111,6 +111,21 @@ static backend_ref_t* check_candidate_bref( backend_ref_t* new_bref, select_criteria_t sc); +static skygw_query_type_t is_read_tmp_table( + ROUTER_CLIENT_SES* router_cli_ses, + GWBUF* querybuf, + skygw_query_type_t type); + +static void check_create_tmp_table( + ROUTER_CLIENT_SES* router_cli_ses, + GWBUF* querybuf, + skygw_query_type_t type); + +static bool route_single_stmt( + ROUTER_INSTANCE* inst, + ROUTER_CLIENT_SES* rses, + GWBUF* querybuf); + static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -1505,14 +1520,12 @@ static route_target_t get_route_target ( /** * Check if the query is a DROP TABLE... query and * if it targets a temporary table, remove it from the hashtable. - * @param instance Router instance - * @param router_session Router client session + * @param router_cli_ses Router client session * @param querybuf GWBUF containing the query * @param type The type of the query resolved so far */ void check_drop_tmp_table( - ROUTER* instance, - void* router_session, + ROUTER_CLIENT_SES* router_cli_ses, GWBUF* querybuf, skygw_query_type_t type) { @@ -1522,7 +1535,6 @@ void check_drop_tmp_table( char *hkey,*dbname; MYSQL_session* data; - ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; DCB* master_dcb = NULL; rses_property_t* rses_prop_tmp; @@ -1567,16 +1579,14 @@ void check_drop_tmp_table( /** * Check if the query targets a temporary table. - * @param instance Router instance - * @param router_session Router client session + * @param router_cli_ses Router client session * @param querybuf GWBUF containing the query * @param type The type of the query resolved so far * @return The type of the query */ -skygw_query_type_t is_read_tmp_table( - ROUTER* instance, - void* router_session, - GWBUF* querybuf, +static skygw_query_type_t is_read_tmp_table( + ROUTER_CLIENT_SES* router_cli_ses, + GWBUF* querybuf, skygw_query_type_t type) { @@ -1586,7 +1596,6 @@ skygw_query_type_t is_read_tmp_table( char *hkey,*dbname; MYSQL_session* data; - ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; DCB* master_dcb = NULL; skygw_query_type_t qtype = type; rses_property_t* rses_prop_tmp; @@ -1656,14 +1665,12 @@ skygw_query_type_t is_read_tmp_table( * the database and table name, create a hashvalue and * add it to the router client session's property. If property * doesn't exist then create it first. - * @param instance Router instance - * @param router_session Router client session + * @param router_cli_ses Router client session * @param querybuf GWBUF containing the query * @param type The type of the query resolved so far */ -void check_create_tmp_table( - ROUTER* instance, - void* router_session, +static void check_create_tmp_table( + ROUTER_CLIENT_SES* router_cli_ses, GWBUF* querybuf, skygw_query_type_t type) { @@ -1673,7 +1680,6 @@ void check_create_tmp_table( char *hkey,*dbname; MYSQL_session* data; - ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; DCB* master_dcb = NULL; rses_property_t* rses_prop_tmp; HASHTABLE* h; @@ -1822,8 +1828,37 @@ static int routeQuery( { rses_is_closed = true; } +#if 1 + if (GWBUF_IS_TYPE_UNDEFINED(querybuf)) + { + GWBUF* tmpbuf = querybuf; + do + { + if ((querybuf = modutil_get_next_MySQL_packet(&tmpbuf)) == NULL) + { + ret = 1; + goto retblock; + } + /** Mark buffer to as MySQL type */ + gwbuf_set_type(querybuf, GWBUF_TYPE_MYSQL); + gwbuf_set_type(querybuf, GWBUF_TYPE_SINGLE_STMT); + succp = route_single_stmt(inst, router_cli_ses, querybuf); + } + while (tmpbuf != NULL && succp); + } + else + { + succp = route_single_stmt(inst, router_cli_ses, querybuf); + } + + if (succp) + { + ret = 1; + } + goto retblock; +#else ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); - + packet = GWBUF_DATA(querybuf); packet_type = packet[4]; @@ -1925,9 +1960,9 @@ static int routeQuery( /** * Check if the query has anything to do with temporary tables. */ - qtype = is_read_tmp_table(instance,router_session,querybuf,qtype); - check_create_tmp_table(instance,router_session,querybuf,qtype); - check_drop_tmp_table(instance,router_session,querybuf,qtype); + qtype = is_read_tmp_table(inst,router_cli_ses,querybuf,qtype); + check_create_tmp_table(inst,router_cli_ses,querybuf,qtype); + check_drop_tmp_table(inst,router_cli_ses,querybuf,qtype); /** * If autocommit is disabled or transaction is explicitly started @@ -2263,6 +2298,7 @@ static int routeQuery( } } rses_end_locked_router_action(router_cli_ses); +#endif retblock: #if defined(SS_DEBUG2) { @@ -2284,6 +2320,477 @@ retblock: return ret; } +static bool route_single_stmt( + ROUTER_INSTANCE* inst, + ROUTER_CLIENT_SES* rses, + GWBUF* querybuf) +{ + skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; + mysql_server_cmd_t packet_type; + uint8_t* packet; + int ret = 0; + DCB* master_dcb = NULL; + DCB* target_dcb = NULL; + bool rses_is_closed = false; + route_target_t route_target; + bool succp; + int rlag_max = MAX_RLAG_UNDEFINED; + backend_type_t btype; /*< target backend type */ + + + ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); + packet = GWBUF_DATA(querybuf); + packet_type = packet[4]; + + if (rses_is_closed) + { + /** + * MYSQL_COM_QUIT may have sent by client and as a part of backend + * closing procedure. + */ + if (packet_type != MYSQL_COM_QUIT) + { + char* query_str = modutil_get_query(querybuf); + + LOGIF(LE, + (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Can't route %s:%s:\"%s\" to " + "backend server. Router is closed.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (query_str == NULL ? "(empty)" : query_str)))); + free(query_str); + } + succp = false; + goto retblock; + } + + /** + * Read stored master DCB pointer. If master is not set, routing must + * be aborted + */ + if ((master_dcb = rses->rses_master_ref->bref_dcb) == NULL) + { + char* query_str = modutil_get_query(querybuf); + CHK_DCB(master_dcb); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Can't route %s:%s:\"%s\" to " + "backend server. Session doesn't have a Master " + "node", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (query_str == NULL ? "(empty)" : query_str)))); + free(query_str); + succp = false; + goto retblock; + } + + /** If buffer is not contiguous, make it such */ + if (querybuf->next != NULL) + { + querybuf = gwbuf_make_contiguous(querybuf); + } + + switch(packet_type) { + case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ + case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ + case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */ + case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ + case MYSQL_COM_PING: /*< 0e all servers are pinged */ + case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */ + case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ + case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */ + case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ + qtype = QUERY_TYPE_SESSION_WRITE; + break; + + case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ + case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ + qtype = QUERY_TYPE_WRITE; + break; + + case MYSQL_COM_QUERY: + qtype = query_classifier_get_type(querybuf); + break; + + case MYSQL_COM_STMT_PREPARE: + qtype = query_classifier_get_type(querybuf); + qtype |= QUERY_TYPE_PREPARE_STMT; + break; + + case MYSQL_COM_STMT_EXECUTE: + /** Parsing is not needed for this type of packet */ + qtype = QUERY_TYPE_EXEC_STMT; + break; + + case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ + case MYSQL_COM_STATISTICS: /**< 9 ? */ + case MYSQL_COM_PROCESS_INFO: /**< 0a ? */ + case MYSQL_COM_CONNECT: /**< 0b ? */ + case MYSQL_COM_PROCESS_KILL: /**< 0c ? */ + case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */ + case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */ + case MYSQL_COM_DAEMON: /**< 1d ? */ + default: + break; + } /**< switch by packet type */ + + /** + * Check if the query has anything to do with temporary tables. + */ + qtype = is_read_tmp_table(rses, querybuf, qtype); + check_create_tmp_table(rses, querybuf, qtype); + check_drop_tmp_table(rses, querybuf,qtype); + + /** + * If autocommit is disabled or transaction is explicitly started + * transaction becomes active and master gets all statements until + * transaction is committed and autocommit is enabled again. + */ + if (rses->rses_autocommit_enabled && + QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) + { + rses->rses_autocommit_enabled = false; + + if (!rses->rses_transaction_active) + { + rses->rses_transaction_active = true; + } + } + else if (!rses->rses_transaction_active && + QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX)) + { + rses->rses_transaction_active = true; + } + /** + * Explicit COMMIT and ROLLBACK, implicit COMMIT. + */ + if (rses->rses_autocommit_enabled && + rses->rses_transaction_active && + (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) || + QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK))) + { + rses->rses_transaction_active = false; + } + else if (!rses->rses_autocommit_enabled && + QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT)) + { + rses->rses_autocommit_enabled = true; + rses->rses_transaction_active = false; + } + + if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + uint8_t* packet = GWBUF_DATA(querybuf); + unsigned char ptype = packet[4]; + size_t len = MIN(GWBUF_LENGTH(querybuf), + MYSQL_GET_PACKET_LEN((unsigned char *)querybuf->start)-1); + char* data = (char*)&packet[5]; + char* contentstr = strndup(data, len); + char* qtypestr = skygw_get_qtype_str(qtype); + + skygw_log_write( + LOGFILE_TRACE, + "> Autocommit: %s, trx is %s, cmd: %s, type: %s, " + "stmt: %s%s %s", + (rses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"), + (rses->rses_transaction_active ? "[open]" : "[not open]"), + STRPACKETTYPE(ptype), + (qtypestr==NULL ? "N/A" : qtypestr), + contentstr, + (querybuf->hint == NULL ? "" : ", Hint:"), + (querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type))); + + free(contentstr); + free(qtypestr); + } + /** + * Find out where to route the query. Result may not be clear; it is + * possible to have a hint for routing to a named server which can + * be either slave or master. + * If query would otherwise be routed to slave then the hint determines + * actual target server if it exists. + * + * route_target is a bitfield and may include : + * TARGET_ALL + * - route to all connected backend servers + * TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX] + * - route primarily according to hints, then to slave and if those + * failed, eventually to master + * TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX] + * - route primarily according to the hints and if they failed, + * eventually to master + */ + route_target = get_route_target(qtype, + rses->rses_transaction_active, + rses->rses_config.rw_use_sql_variables_in, + querybuf->hint); + + if (TARGET_IS_ALL(route_target)) + { + /** + * It is not sure if the session command in question requires + * response. Statement is examined in route_session_write. + * Router locking is done inside the function. + */ + succp = route_session_write( + rses, + gwbuf_clone(querybuf), + inst, + packet_type, + qtype); + + if (succp) + { + atomic_add(&inst->stats.n_all, 1); + } + goto retblock; + } + + /** Lock router session */ + if (!rses_begin_locked_router_action(rses)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Route query aborted! Routing session is closed <"))); + succp = false; + goto retblock; + } + /** + * There is a hint which either names the target backend or + * hint which sets maximum allowed replication lag for the + * backend. + */ + if (TARGET_IS_NAMED_SERVER(route_target) || + TARGET_IS_RLAG_MAX(route_target)) + { + HINT* hint; + char* named_server = NULL; + + hint = querybuf->hint; + + while (hint != NULL) + { + if (hint->type == HINT_ROUTE_TO_NAMED_SERVER) + { + /** + * Set the name of searched + * backend server. + */ + named_server = hint->data; + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Hint: route to server " + "'%s'", + named_server))); + } + else if (hint->type == HINT_PARAMETER && + (strncasecmp((char *)hint->data, + "max_slave_replication_lag", + strlen("max_slave_replication_lag")) == 0)) + { + int val = (int) strtol((char *)hint->value, + (char **)NULL, 10); + + if (val != 0 || errno == 0) + { + /** + * Set max. acceptable + * replication lag + * value for backend srv + */ + rlag_max = val; + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Hint: " + "max_slave_replication_lag=%d", + rlag_max))); + } + } + hint = hint->next; + } /*< while */ + + if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */ + { + rlag_max = rses_get_max_replication_lag(rses); + } + btype = BE_UNDEFINED; /*< target may be master or slave */ + /** + * Search backend server by name or replication lag. + * If it fails, then try to find valid slave or master. + */ + succp = get_dcb(&target_dcb, rses, btype, named_server,rlag_max); + + if (!succp) + { + if (TARGET_IS_NAMED_SERVER(route_target)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Was supposed to route to named server " + "%s but couldn't find the server in a " + "suitable state.", + named_server))); + } + else if (TARGET_IS_RLAG_MAX(route_target)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Was supposed to route to server with " + "replication lag at most %d but couldn't " + "find such a slave.", + rlag_max))); + } + } + } + else if (TARGET_IS_SLAVE(route_target)) + { + btype = BE_SLAVE; + + if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */ + { + rlag_max = rses_get_max_replication_lag(rses); + } + /** + * Search suitable backend server, get DCB in target_dcb + */ + succp = get_dcb(&target_dcb, rses, BE_SLAVE, NULL,rlag_max); + + if (succp) + { +#if defined(SS_EXTRA_DEBUG) + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Found DCB for slave."))); +#endif + ss_dassert(get_bref_from_dcb(rses, target_dcb) != + rses->rses_master_ref); + ss_dassert(get_root_master_bref(rses) == + rses->rses_master_ref); + atomic_add(&inst->stats.n_slave, 1); + } + else + { + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Was supposed to route to slave" + "but finding suitable one " + "failed."))); + } + } + else if (TARGET_IS_MASTER(route_target)) + { + DCB* curr_master_dcb = NULL; + + succp = get_dcb(&curr_master_dcb, + rses, + BE_MASTER, + NULL, + MAX_RLAG_UNDEFINED); + + if (succp && master_dcb == curr_master_dcb) + { + atomic_add(&inst->stats.n_master, 1); + target_dcb = master_dcb; + } + else + { + if (succp && master_dcb != curr_master_dcb) + { + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Was supposed to " + "route to master " + "but master has " + "changed."))); + } + else + { + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Was supposed to " + "route to master " + "but couldn't find " + "master in a " + "suitable state."))); + } + /** + * Master has changed. Return with error indicator. + */ + rses_end_locked_router_action(rses); + succp = false; + goto retblock; + } + } + + if (succp) /*< Have DCB of the target backend */ + { + backend_ref_t* bref; + sescmd_cursor_t* scur; + + bref = get_bref_from_dcb(rses, target_dcb); + scur = &bref->bref_sescmd_cur; + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Route query to %s \t%s:%d <", + (SERVER_IS_MASTER(bref->bref_backend->backend_server) ? + "master" : "slave"), + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port))); + /** + * Store current stmt if execution of previous session command + * haven't completed yet. Note that according to MySQL protocol + * there can only be one such non-sescmd stmt at the time. + */ + if (sescmd_cursor_is_active(scur)) + { + ss_dassert(bref->bref_pending_cmd == NULL); + bref->bref_pending_cmd = gwbuf_clone(querybuf); + + rses_end_locked_router_action(rses); + goto retblock; + } + + if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) + { + backend_ref_t* bref; + + atomic_add(&inst->stats.n_queries, 1); + /** + * Add one query response waiter to backend reference + */ + bref = get_bref_from_dcb(rses, target_dcb); + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query failed."))); + succp = false; + } + } + rses_end_locked_router_action(rses); + +retblock: +#if defined(SS_DEBUG2) + { + char* canonical_query_str; + + canonical_query_str = skygw_get_canonical(querybuf); + + if (canonical_query_str != NULL) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Canonical version: %s", + canonical_query_str))); + free(canonical_query_str); + } + } +#endif + return succp; +} /** From a4968f25214dc0cff028507c13e18c109b49663e Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Sun, 14 Dec 2014 11:26:15 +0200 Subject: [PATCH 13/16] Complete fix candidate for #645, http://bugs.skysql.com/show_bug.cgi?id=645 and #648, http://bugs.skysql.com/show_bug.cgi?id=648 tee.c:closeSession removed unnecessary dcb_free, router/service closes all backend DCBs and the client DCB, and client DCB is the one that was tried to free in closeSession. readwritesplit.c:routeQuery now handles untyped and typed GWBUFs. Untyped means that read buffer may consist of incomplete and multiple MySQL packets. Typed buffer always consists of a single MySQL packet (which can be split to many buffers inside GWBUF). Fixed Coverity cases #84840 and #84841 --- server/core/dcb.c | 184 +++--- server/core/poll.c | 31 +- server/include/dcb.h | 3 + server/modules/filter/tee.c | 2 +- server/modules/include/readwritesplit.h | 4 + .../routing/readwritesplit/readwritesplit.c | 599 +++--------------- 6 files changed, 229 insertions(+), 594 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 8066bcef5..f66bc5c06 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -190,7 +190,7 @@ DCB *rval; rval->readcheck = 0; rval->polloutbusy = 0; rval->writecheck = 0; - rval->fd = -1; + rval->fd = DCBFD_CLOSED; rval->evq.next = NULL; rval->evq.prev = NULL; @@ -235,8 +235,10 @@ DCB *rval; void dcb_free(DCB *dcb) { - if (dcb->fd == -1) + if (dcb->fd == DCBFD_CLOSED) + { dcb_final_free(dcb); + } else { LOGIF(LE, (skygw_log_write_flush( @@ -308,7 +310,7 @@ DCB *clone; return NULL; } - clone->fd = -1; + clone->fd = DCBFD_CLONED;; clone->flags |= DCBF_CLONE; clone->state = orig->state; clone->data = orig->data; @@ -551,40 +553,42 @@ bool succp = false; DCB* dcb_next = NULL; int rc = 0; - /*< - * Close file descriptor and move to clean-up phase. - */ - rc = close(dcb->fd); + if (dcb->fd > 0) + { + /*< + * Close file descriptor and move to clean-up phase. + */ + rc = close(dcb->fd); - if (rc < 0) { - int eno = errno; - errno = 0; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Failed to close " - "socket %d on dcb %p due error %d, %s.", - dcb->fd, - dcb, - eno, - strerror(eno)))); - } -#if defined(SS_DEBUG) - else { - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [dcb_process_zombies] Closed socket " - "%d on dcb %p.", - pthread_self(), - dcb->fd, - dcb))); -#endif /* SS_DEBUG */ + if (rc < 0) + { + int eno = errno; + errno = 0; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to close " + "socket %d on dcb %p due error %d, %s.", + dcb->fd, + dcb, + eno, + strerror(eno)))); + } + else + { + dcb->fd = DCBFD_CLOSED; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [dcb_process_zombies] Closed socket " + "%d on dcb %p.", + pthread_self(), + dcb->fd, + dcb))); #if defined(FAKE_CODE) - conn_open[dcb->fd] = false; + conn_open[dcb->fd] = false; #endif /* FAKE_CODE */ -#if defined(SS_DEBUG) - ss_debug(dcb->fd = -1;) - } -#endif /* SS_DEBUG */ + } + } LOGIF_MAYBE(LT, (dcb_get_ses_log_info( dcb, &tls_log_info.li_sesid, @@ -657,7 +661,7 @@ int rc; } fd = dcb->func.connect(dcb, server, session); - if (fd == -1) { + if (fd == DCBFD_CLOSED) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [dcb_connect] Failed to connect to server %s:%d, " @@ -683,7 +687,7 @@ int rc; session->client, session->client->fd))); } - ss_dassert(dcb->fd == -1); /*< must be uninitialized at this point */ + ss_dassert(dcb->fd == DCBFD_CLOSED); /*< must be uninitialized at this point */ /*< * Successfully connected to backend. Assign file descriptor to dcb */ @@ -704,7 +708,7 @@ int rc; */ rc = poll_add_dcb(dcb); - if (rc == -1) { + if (rc == DCBFD_CLOSED) { dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_final_free(dcb); return NULL; @@ -736,11 +740,22 @@ int dcb_read( GWBUF *buffer = NULL; int b; int rc; - int n ; + int n; int nread = 0; CHK_DCB(dcb); - while (true) + + if (dcb->fd <= 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Read failed, dcb is %s.", + dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable"))); + n = 0; + goto return_n; + } + + while (true) { int bufsize; @@ -864,6 +879,14 @@ int below_water; below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; ss_dassert(queue != NULL); + if (dcb->fd <= 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write failed, dcb is %s.", + dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable"))); + return 0; + } /** * SESSION_STATE_STOPPING means that one of the backends is closing * the router session. Some backends may have not completed @@ -1209,46 +1232,42 @@ dcb_close(DCB *dcb) */ if (dcb->state == DCB_STATE_POLLING) { - if (dcb->fd != -1) - { - rc = poll_remove_dcb(dcb); + rc = poll_remove_dcb(dcb); - if (rc == 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [dcb_close] Removed dcb %p in state %s from " - "poll set.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state)))); - } else { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "Error : Removing DCB fd == %d in state %s from " - "poll set failed.", - dcb->fd, - STRDCBSTATE(dcb->state)))); - } - - if (rc == 0) - { - /** - * close protocol and router session - */ - if (dcb->func.close != NULL) - { - dcb->func.close(dcb); - } - dcb_call_callback(dcb, DCB_REASON_CLOSE); - - - if (dcb->state == DCB_STATE_NOPOLLING) - { - dcb_add_to_zombieslist(dcb); - } - } + if (rc == 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_close] Removed dcb %p in state %s from " + "poll set.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)))); + } else { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Removing DCB fd == %d in state %s from " + "poll set failed.", + dcb->fd, + STRDCBSTATE(dcb->state)))); } - + + if (rc == 0) + { + /** + * close protocol and router session + */ + if (dcb->func.close != NULL) + { + dcb->func.close(dcb); + } + dcb_call_callback(dcb, DCB_REASON_CLOSE); + + + if (dcb->state == DCB_STATE_NOPOLLING) + { + dcb_add_to_zombieslist(dcb); + } + } ss_dassert(dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); } @@ -1764,7 +1783,8 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes) int w; int fd = dcb->fd; #if defined(FAKE_CODE) - if (dcb_fake_write_errno[fd] != 0) { + if (fd > 0 && dcb_fake_write_errno[fd] != 0) + { ss_dassert(dcb_fake_write_ev[fd] != 0); w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */ @@ -1772,11 +1792,15 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes) w = -1; errno = dcb_fake_write_errno[fd]; } - } else { + } else if (fd > 0) + { w = write(fd, buf, nbytes); } #else - w = write(fd, buf, nbytes); + if (fd > 0) + { + w = write(fd, buf, nbytes); + } #endif /* FAKE_CODE */ #if defined(SS_DEBUG_MYSQL) diff --git a/server/core/poll.c b/server/core/poll.c index 53a09c25e..9ecd4367e 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -341,19 +341,26 @@ poll_remove_dcb(DCB *dcb) /*< * Set state to NOPOLLING and remove dcb from poll set. */ - if (dcb_set_state(dcb, new_state, &old_state)) { - rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); + if (dcb_set_state(dcb, new_state, &old_state)) + { + /** + * Only positive fds can be removed from epoll set. + */ + if (dcb->fd > 0) + { + rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); - if (rc != 0) { - int eno = errno; - errno = 0; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : epoll_ctl failed due %d, %s.", - eno, - strerror(eno)))); - } - ss_dassert(rc == 0); /*< trap in debug */ + if (rc != 0) { + int eno = errno; + errno = 0; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : epoll_ctl failed due %d, %s.", + eno, + strerror(eno)))); + } + ss_dassert(rc == 0); /*< trap in debug */ + } } /*< * This call was redundant, but the end result is correct. diff --git a/server/include/dcb.h b/server/include/dcb.h index 57c4b0358..c0bb80d73 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -129,6 +129,9 @@ typedef struct { */ #define GWPROTOCOL_VERSION {1, 0, 0} +#define DCBFD_CLOSED -1 +#define DCBFD_CLONED -2 + /** * The statitics gathered on a descriptor control block */ diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index d7c3650fd..1a94ae0f7 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -372,11 +372,11 @@ SESSION *bsession; /** Close router session and all its connections */ router->closeSession(router_instance, rsession); } - dcb_free(my_session->branch_dcb); /* No need to free the session, this is done as * a side effect of closing the client DCB of the * session. */ + my_session->active = 0; } } diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 05bcfe2b1..dfd4f522f 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -323,4 +323,8 @@ typedef struct router_instance { #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); +#define RSES_SESSION(r) (r->rses_backend_ref->bref_dcb->session) + +#define RSES_CLIENT_DCB(r) (RSES_SESSION(r)->client) + #endif /*< _RWSPLITROUTER_H */ diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 804a460fa..3d53867d8 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1807,502 +1807,108 @@ static int routeQuery( void* router_session, GWBUF* querybuf) { - skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - mysql_server_cmd_t packet_type; - uint8_t* packet; int ret = 0; - DCB* master_dcb = NULL; - DCB* target_dcb = NULL; - ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - bool rses_is_closed = false; - route_target_t route_target; bool succp = false; - int rlag_max = MAX_RLAG_UNDEFINED; - backend_type_t btype; /*< target backend type */ CHK_CLIENT_RSES(router_cli_ses); - /** Dirty read for quick check if router is closed. */ - if (router_cli_ses->rses_closed) - { - rses_is_closed = true; - } -#if 1 - if (GWBUF_IS_TYPE_UNDEFINED(querybuf)) + /** + * Untyped GWBUF means that it can consist of incomplete and/or multiple + * MySQL packets. + * Read and route found MySQL packets one by one and store potential + * incomplete packet to DCB's dcb_readqueue. + */ + if (GWBUF_IS_TYPE_UNDEFINED(querybuf)) { GWBUF* tmpbuf = querybuf; do { + /** + * Try to read complete MySQL packet from tmpbuf. + * Append leftover to client's read queue. + */ if ((querybuf = modutil_get_next_MySQL_packet(&tmpbuf)) == NULL) { - ret = 1; + if (GWBUF_LENGTH(tmpbuf) > 0) + { + DCB* dcb = RSES_CLIENT_DCB(router_cli_ses); + + dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, tmpbuf); + } + succp = true; goto retblock; } /** Mark buffer to as MySQL type */ gwbuf_set_type(querybuf, GWBUF_TYPE_MYSQL); gwbuf_set_type(querybuf, GWBUF_TYPE_SINGLE_STMT); - succp = route_single_stmt(inst, router_cli_ses, querybuf); + + /** + * If router is closed, discard the packet + */ + if (router_cli_ses->rses_closed) + { + uint8_t* packet; + mysql_server_cmd_t packet_type; + + packet = GWBUF_DATA(querybuf); + packet_type = packet[4]; + + if (packet_type != MYSQL_COM_QUIT) + { + char* query_str = modutil_get_query(querybuf); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Can't route %s:\"%s\" to " + "backend server. Router is closed.", + STRPACKETTYPE(packet_type), + (query_str == NULL ? "(empty)" : query_str)))); + free(query_str); + } + } + else + { + succp = route_single_stmt(inst, router_cli_ses, querybuf); + } + } + while (tmpbuf != NULL); + } + /** + * If router is closed, discard the packet + */ + else if (router_cli_ses->rses_closed) + { + uint8_t* packet; + mysql_server_cmd_t packet_type; + + packet = GWBUF_DATA(querybuf); + packet_type = packet[4]; + + if (packet_type != MYSQL_COM_QUIT) + { + char* query_str = modutil_get_query(querybuf); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Can't route %s:\"%s\" to " + "backend server. Router is closed.", + STRPACKETTYPE(packet_type), + (query_str == NULL ? "(empty)" : query_str)))); + free(query_str); } - while (tmpbuf != NULL && succp); } else { succp = route_single_stmt(inst, router_cli_ses, querybuf); } - if (succp) - { - ret = 1; - } - goto retblock; -#else - ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); - - packet = GWBUF_DATA(querybuf); - packet_type = packet[4]; - - if (rses_is_closed) - { - /** - * MYSQL_COM_QUIT may have sent by client and as a part of backend - * closing procedure. - */ - if (packet_type != MYSQL_COM_QUIT) - { - char* query_str = modutil_get_query(querybuf); - - LOGIF(LE, - (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Can't route %s:%s:\"%s\" to " - "backend server. Router is closed.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (query_str == NULL ? "(empty)" : query_str)))); - free(query_str); - } - ret = 0; - goto retblock; - } - - /** - * Read stored master DCB pointer. If master is not set, routing must - * be aborted - */ - if ((master_dcb = router_cli_ses->rses_master_ref->bref_dcb) == NULL) - { - char* query_str = modutil_get_query(querybuf); - CHK_DCB(master_dcb); - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Can't route %s:%s:\"%s\" to " - "backend server. Session doesn't have a Master " - "node", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (query_str == NULL ? "(empty)" : query_str)))); - free(query_str); - ret = 0; - goto retblock; - } - - /** If buffer is not contiguous, make it such */ - if (querybuf->next != NULL) - { - querybuf = gwbuf_make_contiguous(querybuf); - } - - switch(packet_type) { - case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ - case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ - case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */ - case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ - case MYSQL_COM_PING: /*< 0e all servers are pinged */ - case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */ - case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ - case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */ - case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ - qtype = QUERY_TYPE_SESSION_WRITE; - break; - - case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ - case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ - qtype = QUERY_TYPE_WRITE; - break; - - case MYSQL_COM_QUERY: - qtype = query_classifier_get_type(querybuf); - break; - - case MYSQL_COM_STMT_PREPARE: - qtype = query_classifier_get_type(querybuf); - qtype |= QUERY_TYPE_PREPARE_STMT; - break; - - case MYSQL_COM_STMT_EXECUTE: - /** Parsing is not needed for this type of packet */ - qtype = QUERY_TYPE_EXEC_STMT; - break; - - case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ - case MYSQL_COM_STATISTICS: /**< 9 ? */ - case MYSQL_COM_PROCESS_INFO: /**< 0a ? */ - case MYSQL_COM_CONNECT: /**< 0b ? */ - case MYSQL_COM_PROCESS_KILL: /**< 0c ? */ - case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */ - case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */ - case MYSQL_COM_DAEMON: /**< 1d ? */ - default: - break; - } /**< switch by packet type */ - - /** - * Check if the query has anything to do with temporary tables. - */ - qtype = is_read_tmp_table(inst,router_cli_ses,querybuf,qtype); - check_create_tmp_table(inst,router_cli_ses,querybuf,qtype); - check_drop_tmp_table(inst,router_cli_ses,querybuf,qtype); - - /** - * If autocommit is disabled or transaction is explicitly started - * transaction becomes active and master gets all statements until - * transaction is committed and autocommit is enabled again. - */ - if (router_cli_ses->rses_autocommit_enabled && - QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) - { - router_cli_ses->rses_autocommit_enabled = false; - - if (!router_cli_ses->rses_transaction_active) - { - router_cli_ses->rses_transaction_active = true; - } - } - else if (!router_cli_ses->rses_transaction_active && - QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX)) - { - router_cli_ses->rses_transaction_active = true; - } - /** - * Explicit COMMIT and ROLLBACK, implicit COMMIT. - */ - if (router_cli_ses->rses_autocommit_enabled && - router_cli_ses->rses_transaction_active && - (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) || - QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK))) - { - router_cli_ses->rses_transaction_active = false; - } - else if (!router_cli_ses->rses_autocommit_enabled && - QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT)) - { - router_cli_ses->rses_autocommit_enabled = true; - router_cli_ses->rses_transaction_active = false; - } - - if (LOG_IS_ENABLED(LOGFILE_TRACE)) - { - uint8_t* packet = GWBUF_DATA(querybuf); - unsigned char ptype = packet[4]; - size_t len = MIN(GWBUF_LENGTH(querybuf), - MYSQL_GET_PACKET_LEN((unsigned char *)querybuf->start)-1); - char* data = (char*)&packet[5]; - char* contentstr = strndup(data, len); - char* qtypestr = skygw_get_qtype_str(qtype); - - skygw_log_write( - LOGFILE_TRACE, - "> Autocommit: %s, trx is %s, cmd: %s, type: %s, " - "stmt: %s%s %s", - (router_cli_ses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"), - (router_cli_ses->rses_transaction_active ? "[open]" : "[not open]"), - STRPACKETTYPE(ptype), - (qtypestr==NULL ? "N/A" : qtypestr), - contentstr, - (querybuf->hint == NULL ? "" : ", Hint:"), - (querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type))); - - free(contentstr); - free(qtypestr); - } - /** - * Find out where to route the query. Result may not be clear; it is - * possible to have a hint for routing to a named server which can - * be either slave or master. - * If query would otherwise be routed to slave then the hint determines - * actual target server if it exists. - * - * route_target is a bitfield and may include : - * TARGET_ALL - * - route to all connected backend servers - * TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX] - * - route primarily according to hints, then to slave and if those - * failed, eventually to master - * TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX] - * - route primarily according to the hints and if they failed, - * eventually to master - */ - route_target = get_route_target(qtype, - router_cli_ses->rses_transaction_active, - router_cli_ses->rses_config.rw_use_sql_variables_in, - querybuf->hint); - - if (TARGET_IS_ALL(route_target)) - { - /** - * It is not sure if the session command in question requires - * response. Statement is examined in route_session_write. - * Router locking is done inside the function. - */ - succp = route_session_write(router_cli_ses, - gwbuf_clone(querybuf), - inst, - packet_type, - qtype); - - if (succp) - { - atomic_add(&inst->stats.n_all, 1); - ret = 1; - } - goto retblock; - } - - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Route query aborted! Routing session is closed <"))); - ret = 0; - goto retblock; - } - /** - * There is a hint which either names the target backend or - * hint which sets maximum allowed replication lag for the - * backend. - */ - if (TARGET_IS_NAMED_SERVER(route_target) || - TARGET_IS_RLAG_MAX(route_target)) - { - HINT* hint; - char* named_server = NULL; - - hint = querybuf->hint; - - while (hint != NULL) - { - if (hint->type == HINT_ROUTE_TO_NAMED_SERVER) - { - /** - * Set the name of searched - * backend server. - */ - named_server = hint->data; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Hint: route to server " - "'%s'", - named_server))); - } - else if (hint->type == HINT_PARAMETER && - (strncasecmp((char *)hint->data, - "max_slave_replication_lag", - strlen("max_slave_replication_lag")) == 0)) - { - int val = (int) strtol((char *)hint->value, - (char **)NULL, 10); - - if (val != 0 || errno == 0) - { - /** - * Set max. acceptable - * replication lag - * value for backend srv - */ - rlag_max = val; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Hint: " - "max_slave_replication_lag=%d", - rlag_max))); - } - } - hint = hint->next; - } /*< while */ - - if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */ - { - rlag_max = rses_get_max_replication_lag(router_cli_ses); - } - btype = BE_UNDEFINED; /*< target may be master or slave */ - /** - * Search backend server by name or replication lag. - * If it fails, then try to find valid slave or master. - */ - succp = get_dcb(&target_dcb, - router_cli_ses, - btype, - named_server, - rlag_max); - if (!succp) - { - if (TARGET_IS_NAMED_SERVER(route_target)) - { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Was supposed to route to named server " - "%s but couldn't find the server in a " - "suitable state.", - named_server))); - } - else if (TARGET_IS_RLAG_MAX(route_target)) - { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Was supposed to route to server with " - "replication lag at most %d but couldn't " - "find such a slave.", - rlag_max))); - } - } - } - else if (TARGET_IS_SLAVE(route_target)) - { - btype = BE_SLAVE; - - if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */ - { - rlag_max = rses_get_max_replication_lag(router_cli_ses); - } - /** - * Search suitable backend server, get DCB in target_dcb - */ - succp = get_dcb(&target_dcb, - router_cli_ses, - BE_SLAVE, - NULL, - rlag_max); - if (succp) - { -#if defined(SS_EXTRA_DEBUG) - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Found DCB for slave."))); -#endif - ss_dassert(get_bref_from_dcb(router_cli_ses, target_dcb) != - router_cli_ses->rses_master_ref); - ss_dassert(get_root_master_bref(router_cli_ses) == - router_cli_ses->rses_master_ref); - atomic_add(&inst->stats.n_slave, 1); - } - else - { - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Was supposed to route to slave" - "but finding suitable one " - "failed."))); - } - } - else if (TARGET_IS_MASTER(route_target)) - { - DCB* curr_master_dcb = NULL; - - succp = get_dcb(&curr_master_dcb, - router_cli_ses, - BE_MASTER, - NULL, - MAX_RLAG_UNDEFINED); - - if (succp && master_dcb == curr_master_dcb) - { - atomic_add(&inst->stats.n_master, 1); - target_dcb = master_dcb; - } - else - { - if (succp && master_dcb != curr_master_dcb) - { - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Was supposed to " - "route to master " - "but master has " - "changed."))); - } - else - { - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Was supposed to " - "route to master " - "but couldn't find " - "master in a " - "suitable state."))); - } - /** - * Master has changed. Return with error indicator. - */ - rses_end_locked_router_action(router_cli_ses); - succp = false; - ret = 0; - goto retblock; - } - } - - if (succp) /*< Have DCB of the target backend */ - { - backend_ref_t* bref; - sescmd_cursor_t* scur; - - bref = get_bref_from_dcb(router_cli_ses, target_dcb); - scur = &bref->bref_sescmd_cur; - - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Route query to %s \t%s:%d <", - (SERVER_IS_MASTER(bref->bref_backend->backend_server) ? - "master" : "slave"), - bref->bref_backend->backend_server->name, - bref->bref_backend->backend_server->port))); - /** - * Store current stmt if execution of previous session command - * haven't completed yet. Note that according to MySQL protocol - * there can only be one such non-sescmd stmt at the time. - */ - if (sescmd_cursor_is_active(scur)) - { - ss_dassert(bref->bref_pending_cmd == NULL); - bref->bref_pending_cmd = gwbuf_clone(querybuf); - - rses_end_locked_router_action(router_cli_ses); - ret = 1; - goto retblock; - } - - if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) - { - backend_ref_t* bref; - - atomic_add(&inst->stats.n_queries, 1); - /** - * Add one query response waiter to backend reference - */ - bref = get_bref_from_dcb(router_cli_ses, target_dcb); - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); - } - else - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Routing query failed."))); - } - } - rses_end_locked_router_action(router_cli_ses); -#endif retblock: #if defined(SS_DEBUG2) - { - char* canonical_query_str; + if (querybuf != NULL) + { + char* canonical_query_str; canonical_query_str = skygw_get_canonical(querybuf); @@ -2316,10 +1922,14 @@ retblock: } } #endif - gwbuf_free(querybuf); + if (querybuf != NULL) gwbuf_free(querybuf); + if (succp) ret = 1; + return ret; } + + static bool route_single_stmt( ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* rses, @@ -2331,9 +1941,8 @@ static bool route_single_stmt( int ret = 0; DCB* master_dcb = NULL; DCB* target_dcb = NULL; - bool rses_is_closed = false; route_target_t route_target; - bool succp; + bool succp = false; int rlag_max = MAX_RLAG_UNDEFINED; backend_type_t btype; /*< target backend type */ @@ -2341,31 +1950,7 @@ static bool route_single_stmt( ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); packet = GWBUF_DATA(querybuf); packet_type = packet[4]; - - if (rses_is_closed) - { - /** - * MYSQL_COM_QUIT may have sent by client and as a part of backend - * closing procedure. - */ - if (packet_type != MYSQL_COM_QUIT) - { - char* query_str = modutil_get_query(querybuf); - - LOGIF(LE, - (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Can't route %s:%s:\"%s\" to " - "backend server. Router is closed.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (query_str == NULL ? "(empty)" : query_str)))); - free(query_str); - } - succp = false; - goto retblock; - } - + /** * Read stored master DCB pointer. If master is not set, routing must * be aborted @@ -2552,9 +2137,19 @@ static bool route_single_stmt( /** Lock router session */ if (!rses_begin_locked_router_action(rses)) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Route query aborted! Routing session is closed <"))); + if (packet_type != MYSQL_COM_QUIT) + { + char* query_str = modutil_get_query(querybuf); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Can't route %s:%s:\"%s\" to " + "backend server. Router is closed.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (query_str == NULL ? "(empty)" : query_str)))); + free(query_str); + } succp = false; goto retblock; } @@ -2729,6 +2324,8 @@ static bool route_single_stmt( bref = get_bref_from_dcb(rses, target_dcb); scur = &bref->bref_sescmd_cur; + ss_dassert(target_dcb != NULL); + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Route query to %s \t%s:%d <", From 04a92e40df8d0959bbaf227d93343d81d683783b Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Mon, 15 Dec 2014 06:12:40 +0200 Subject: [PATCH 14/16] Fixes to Coverity defects 84879 84878 72752 72742 --- query_classifier/query_classifier.cc | 6 +++--- server/core/dcb.c | 3 ++- server/core/modutil.c | 6 +++--- server/modules/routing/readwritesplit/readwritesplit.c | 3 ++- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 91342a224..788f6e5c6 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -151,7 +151,7 @@ bool parse_query ( THD* thd; uint8_t* data; size_t len; - char* query_str; + char* query_str = NULL; parsing_info_t* pi; CHK_GWBUF(querybuf); @@ -173,9 +173,9 @@ bool parse_query ( /** Extract query and copy it to different buffer */ data = (uint8_t*)GWBUF_DATA(querybuf); len = MYSQL_GET_PACKET_LEN(data)-1; /*< distract 1 for packet type byte */ - query_str = (char *)malloc(len+1); - if (query_str == NULL) + + if (len < 1 || (query_str = (char *)malloc(len+1)) == NULL) { /** Free parsing info data */ parsing_info_done(pi); diff --git a/server/core/dcb.c b/server/core/dcb.c index f66bc5c06..f8332de22 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1776,11 +1776,12 @@ static bool dcb_set_state_nomutex( * @param dcb The DCB to write buffer * @param buf Buffer to write * @param nbytes Number of bytes to write + * @return Number of written bytes */ int gw_write(DCB *dcb, const void *buf, size_t nbytes) { - int w; + int w = 0; int fd = dcb->fd; #if defined(FAKE_CODE) if (fd > 0 && dcb_fake_write_errno[fd] != 0) diff --git a/server/core/modutil.c b/server/core/modutil.c index ff031c8cf..5824de9e2 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -234,7 +234,7 @@ modutil_get_query(GWBUF *buf) uint8_t* packet; mysql_server_cmd_t packet_type; size_t len; - char* query_str; + char* query_str = NULL; packet = GWBUF_DATA(buf); packet_type = packet[4]; @@ -252,7 +252,7 @@ modutil_get_query(GWBUF *buf) case MYSQL_COM_QUERY: len = MYSQL_GET_PACKET_LEN(packet)-1; /*< distract 1 for packet type byte */ - if ((query_str = (char *)malloc(len+1)) == NULL) + if (len < 1 || (query_str = (char *)malloc(len+1)) == NULL) { goto retblock; } @@ -262,7 +262,7 @@ modutil_get_query(GWBUF *buf) default: len = strlen(STRPACKETTYPE(packet_type))+1; - if ((query_str = (char *)malloc(len+1)) == NULL) + if (len < 1 || (query_str = (char *)malloc(len+1)) == NULL) { goto retblock; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 3d53867d8..8668afbd4 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -3807,7 +3807,8 @@ static bool execute_sescmd_in_backend( tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf; qlen = MYSQL_GET_PACKET_LEN((unsigned char*)tmpbuf->start); memset(data->db,0,MYSQL_DATABASE_MAXLEN+1); - strncpy(data->db,tmpbuf->start+5,qlen - 1); + if(qlen > 0) + strncpy(data->db,tmpbuf->start+5,qlen - 1); } /** Fallthrough */ case MYSQL_COM_QUERY: From 4dd6f1b9c08a0d2e9f445348794f5e10fe41717c Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 15 Dec 2014 11:35:07 +0200 Subject: [PATCH 15/16] Added comment to assertion regarding to bref->bref_pending_cmd which is a one-slot buffer for database query. If sessoin command cursor is active when query is routed, instead of routing the query to backend it is stored to bref_pending_cmd to wait until previous (session) command is completed. The assertion traps if there is a command already in the bref_pending_cmd. Situation is wrong because client shouldn't send new query before the previous (the pending one) is executed in backend and the reply is sent to the client. --- server/modules/routing/readwritesplit/readwritesplit.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 3d53867d8..838c406d2 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -2335,8 +2335,14 @@ static bool route_single_stmt( bref->bref_backend->backend_server->port))); /** * Store current stmt if execution of previous session command - * haven't completed yet. Note that according to MySQL protocol + * haven't completed yet. + * + * !!! Note that according to MySQL protocol * there can only be one such non-sescmd stmt at the time. + * + * If the assertion below traps, pending queries are treated + * somehow wrong, or client is sending more queries before + * previous is received. */ if (sescmd_cursor_is_active(scur)) { @@ -2516,7 +2522,6 @@ char *weightby; } } - } /** From 82407ceaf48a3205a67025b2b07bdec13c420098 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 15 Dec 2014 15:03:47 +0200 Subject: [PATCH 16/16] Fix to #650, http://bugs.mariadb.com/show_bug.cgi?id=650 Added checks. --- server/modules/filter/tee.c | 65 +++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 1a94ae0f7..c8d2bc9c5 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -308,31 +308,76 @@ char *remote, *userName; if (strcmp(my_instance->service->name, session->service->name) == 0) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "%s: Recursive use of tee filter in service.", - session->service->name))); - return NULL; + "Error : %s: Recursive use of tee filter in service.", + session->service->name))); + my_session = NULL; + goto retblock; } + if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL) { my_session->active = 1; my_session->residual = 0; - if (my_instance->source - && (remote = session_get_remote(session)) != NULL) + + if (my_instance->source && + (remote = session_get_remote(session)) != NULL) { if (strcmp(remote, my_instance->source)) + { my_session->active = 0; + + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Tee filter is not active."))); + } } userName = session_getUser(session); - if (my_instance->userName && userName && strcmp(userName, - my_instance->userName)) + + if (my_instance->userName && + userName && + strcmp(userName, my_instance->userName)) + { my_session->active = 0; + + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Tee filter is not active."))); + } + if (my_session->active) { - my_session->branch_dcb = dcb_clone(session->client); - my_session->branch_session = session_alloc(my_instance->service, my_session->branch_dcb); + DCB* dcb; + SESSION* ses; + + if ((dcb = dcb_clone(session->client)) == NULL) + { + freeSession(my_instance, (void *)my_session); + my_session = NULL; + + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Creating client DCB for Tee " + "filter failed. Terminating session."))); + + goto retblock; + } + if ((ses = session_alloc(my_instance->service, dcb)) == NULL) + { + dcb_close(dcb); + freeSession(my_instance, (void *)my_session); + my_session = NULL; + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Creating client session for Tee " + "filter failed. Terminating session."))); + + goto retblock; + } + my_session->branch_session = ses; + my_session->branch_dcb = dcb; } } - +retblock: return my_session; }