Merge branch 'develop' into MAX-348

This commit is contained in:
MassimilianoPinto 2015-03-02 16:39:37 +01:00
commit a384f63485
9 changed files with 89 additions and 20 deletions

View File

@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 2.6)
cmake_minimum_required(VERSION 2.8.12)
message(STATUS "CMake version: ${CMAKE_VERSION}")
include(macros.cmake)

View File

@ -798,6 +798,12 @@ When value all is used, queries reading session variables can be routed to any a
In above-mentioned case the user-defined variable would only be updated in the master where query would be routed due to `INSERT` statement.
`max_sescmd_history` sets a limit on how many session commands each session can execute before the connection is closed. The default is an unlimited number of session commands.
max_sescmd_history=1500
When a limitation is set, it effectively creates a cap on the session's memory consumption. This might be useful if connection pooling is used and the sessions use large amounts of session commands.
An example of Read/Write Split router configuration :
```

View File

@ -182,6 +182,12 @@ replace=select
# router_options=
# slave_selection_criteria=[LEAST_CURRENT_OPERATIONS|LEAST_BEHIND_MASTER]
#
# router_options=max_sescmd_history specifies a limit on the number of 'session commands'
# a single session can execute. Please refer to the configuration guide for more details - optional.
#
# router_options=
# max_sescmd_history=2500
#
# max_slave_connections specifies how many slaves a router session can
# connect to - optional.
#

View File

@ -992,7 +992,7 @@ static void usage(void)
" -f|--config=... relative|absolute pathname of MaxScale configuration file\n"
" (default: $MAXSCALE_HOME/etc/MaxScale.cnf)\n"
" -l|--log=... log to file or shared memory\n"
" -lfile or -lshm - defaults to file\n"
" -lfile or -lshm - defaults to shared memory\n"
" -v|--version print version info and exit\n"
" -?|--help show this help\n"
, progname);
@ -1054,7 +1054,7 @@ int main(int argc, char **argv)
char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */
void* log_flush_thr = NULL;
int option_index;
int logtofile = 1; /* Use shared memory or file */
int logtofile = 0; /* Use shared memory or file */
ssize_t log_flush_timeout_ms = 0;
sigset_t sigset;
sigset_t sigpipe_mask;

View File

@ -63,6 +63,7 @@
#include <mysql_client_server_protocol.h>
#include <housekeeper.h>
#define MYSQL_COM_QUIT 0x01
#define MYSQL_COM_INITDB 0x02
#define MYSQL_COM_FIELD_LIST 0x04
@ -78,6 +79,10 @@
#define PARENT 0
#define CHILD 1
#ifdef SS_DEBUG
static int debug_seq = 0;
#endif
static unsigned char required_packets[] = {
MYSQL_COM_QUIT,
MYSQL_COM_INITDB,
@ -171,6 +176,7 @@ typedef struct {
GWBUF* tee_replybuf; /* Buffer for reply */
GWBUF* tee_partials[2];
SPINLOCK tee_lock;
DCB* client_dcb;
#ifdef SS_DEBUG
long d_id;
#endif
@ -328,7 +334,6 @@ void
ModuleInit()
{
spinlock_init(&orphanLock);
//hktask_add("tee orphan cleanup",orphan_free,NULL,15);
#ifdef SS_DEBUG
spinlock_init(&debug_lock);
#endif
@ -493,6 +498,7 @@ char *remote, *userName;
{
my_session->active = 1;
my_session->residual = 0;
my_session->client_dcb = session->client;
spinlock_init(&my_session->tee_lock);
if (my_instance->source &&
(remote = session_get_remote(session)) != NULL)
@ -630,7 +636,9 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
ROUTER_OBJECT *router;
void *router_instance, *rsession;
SESSION *bsession;
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_TRACE,"Tee close: %d", atomic_add(&debug_seq,1));
#endif
if (my_session->active)
{
if ((bsession = my_session->branch_session) != NULL)
@ -654,7 +662,19 @@ SESSION *bsession;
* a side effect of closing the client DCB of the
* session.
*/
if(my_session->waiting[PARENT])
{
if(my_session->command != 0x01 &&
my_session->client_dcb &&
my_session->client_dcb->state == DCB_STATE_POLLING)
{
skygw_log_write(LOGFILE_TRACE,"Tee session closed mid-query.");
GWBUF* errbuf = modutil_create_mysql_err_msg(1,0,1,"00000","Session closed.");
my_session->client_dcb->func.write(my_session->client_dcb,errbuf);
}
}
my_session->active = 0;
}
}
@ -671,6 +691,9 @@ freeSession(FILTER *instance, void *session)
TEE_SESSION *my_session = (TEE_SESSION *)session;
SESSION* ses = my_session->branch_session;
session_state_t state;
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_TRACE,"Tee free: %d", atomic_add(&debug_seq,1));
#endif
if (ses != NULL)
{
state = ses->state;
@ -777,7 +800,11 @@ char *ptr;
int length, rval, residual = 0;
GWBUF *clone = NULL;
unsigned char command = *((unsigned char*)queue->start + 4);
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_TRACE,"Tee routeQuery: %d : %s",
atomic_add(&debug_seq,1),
((char*)queue->start + 5));
#endif
spinlock_acquire(&my_session->tee_lock);
if(!my_session->active)
@ -864,7 +891,8 @@ if(!my_session->active)
#endif
spinlock_acquire(&my_session->tee_lock);
if(my_session->branch_session == NULL ||
if(!my_session->active ||
my_session->branch_session == NULL ||
my_session->branch_session->state != SESSION_STATE_ROUTER_READY)
{
rval = 0;
@ -929,7 +957,14 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
GWBUF *complete = NULL;
unsigned char *ptr;
int min_eof = my_session->command != 0x04 ? 2 : 1;
#ifdef SS_DEBUG
ptr = (unsigned char*) reply->start;
skygw_log_write(LOGFILE_TRACE,"Tee clientReply [%s] [%s] [%s]: %d",
instance ? "parent":"child",
my_session->active ? "open" : "closed",
PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET",
atomic_add(&debug_seq,1));
#endif
spinlock_acquire(&my_session->tee_lock);
if(!my_session->active)
@ -1007,8 +1042,8 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
if(branch == PARENT)
{
ss_dassert(my_session->tee_replybuf == NULL);
my_session->tee_replybuf = complete;
//ss_dassert(my_session->tee_replybuf == NULL);
my_session->tee_replybuf = gwbuf_append(my_session->tee_replybuf,complete);
}
else
{

View File

@ -246,7 +246,8 @@ typedef struct rwsplit_config_st {
int rw_max_slave_conn_count;
select_criteria_t rw_slave_select_criteria;
int rw_max_slave_replication_lag;
target_t rw_use_sql_variables_in;
target_t rw_use_sql_variables_in;
int rw_max_sescmd_history_size;
} rwsplit_config_t;
@ -285,6 +286,7 @@ struct router_client_session {
backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */
rwsplit_config_t rses_config; /*< copied config info from router instance */
int rses_nbackends;
int rses_nsescmd; /*< Number of executed session commands */
int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;

View File

@ -435,7 +435,7 @@ int query_len;
LOGFILE_ERROR, "Unexpected query from slave server %s", query_text)));
free(query_text);
blr_slave_send_error(router, slave, "Unexpected SQL query received from slave.");
return 0;
return 1;
}
@ -485,9 +485,9 @@ int len;
if ((pkt = gwbuf_alloc(strlen(msg) + 13)) == NULL)
return;
data = GWBUF_DATA(pkt);
len = strlen(msg) + 1;
len = strlen(msg) + 9;
encode_value(&data[0], len, 24); // Payload length
data[3] = 0; // Sequence id
data[3] = 1; // Sequence id
// Payload
data[4] = 0xff; // Error indicator
data[5] = 0; // Error Code
@ -1996,7 +1996,7 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
uint8_t *ptr;
int len, seqno;
GWBUF *pkt;
int n = 0;
int n = 1;
/* preparing output result */
blr_slave_send_fieldcount(router, slave, 2);
@ -2027,7 +2027,7 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
spinlock_release(&router->lock);
return 0;
return 1;
}
ptr = GWBUF_DATA(pkt);
@ -2055,5 +2055,5 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
blr_slave_send_eof(router, slave, seqno);
return n;
return 1;
}

View File

@ -4347,6 +4347,19 @@ static bool route_session_write(
goto return_succp;
}
if(router_cli_ses->rses_config.rw_max_sescmd_history_size > 0 &&
router_cli_ses->rses_nsescmd >= router_cli_ses->rses_config.rw_max_sescmd_history_size)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Router session exceeded session command history limit. "
"Closing router session. <")));
router_cli_ses->rses_closed = true;
rses_end_locked_router_action(router_cli_ses);
goto return_succp;
}
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
@ -4418,6 +4431,9 @@ static bool route_session_write(
}
}
}
atomic_add(&router_cli_ses->rses_nsescmd,1);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
@ -4517,6 +4533,10 @@ static void rwsplit_process_router_options(
router->rwsplit_config.rw_slave_select_criteria = c;
}
}
else if(strcmp(options[i], "max_sescmd_history") == 0)
{
router->rwsplit_config.rw_max_sescmd_history_size = atoi(value);
}
}
} /*< for */
}

View File

@ -707,8 +707,8 @@ size_t snprint_timestamp(
/** Generate timestamp */
gettimeofday(&tv,NULL);
tm = *(localtime(&tv.tv_sec));
t = time(NULL);
tm = *(localtime(&t));
snprintf(p_ts,
MIN(tslen,timestamp_len),
timestamp_formatstr,