Merge remote-tracking branch 'origin/develop' into MXS-105
This commit is contained in:
@ -164,6 +164,24 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* binlog files need an initial 4 magic bytes at the start. blr_file_add_magic()
|
||||
* adds them.
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param fd file descriptor to the open binlog file
|
||||
* @return Nothing
|
||||
*/
|
||||
static void
|
||||
blr_file_add_magic(ROUTER_INSTANCE *router, int fd)
|
||||
{
|
||||
unsigned char magic[] = BINLOG_MAGIC;
|
||||
|
||||
write(fd, magic, 4);
|
||||
router->binlog_position = 4; /* Initial position after the magic number */
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new binlog file for the router to use.
|
||||
*
|
||||
@ -176,7 +194,6 @@ blr_file_create(ROUTER_INSTANCE *router, char *file)
|
||||
{
|
||||
char path[1024];
|
||||
int fd;
|
||||
unsigned char magic[] = BINLOG_MAGIC;
|
||||
|
||||
strcpy(path, router->binlogdir);
|
||||
strcat(path, "/");
|
||||
@ -184,7 +201,7 @@ unsigned char magic[] = BINLOG_MAGIC;
|
||||
|
||||
if ((fd = open(path, O_RDWR|O_CREAT, 0666)) != -1)
|
||||
{
|
||||
write(fd, magic, 4);
|
||||
blr_file_add_magic(router,fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -197,7 +214,7 @@ unsigned char magic[] = BINLOG_MAGIC;
|
||||
close(router->binlog_fd);
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
strncpy(router->binlog_name, file,BINLOG_FNAMELEN);
|
||||
router->binlog_position = 4; /* Initial position after the magic number */
|
||||
blr_file_add_magic(router, fd);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
router->binlog_fd = fd;
|
||||
return 1;
|
||||
@ -232,6 +249,18 @@ int fd;
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
strncpy(router->binlog_name, file,BINLOG_FNAMELEN);
|
||||
router->binlog_position = lseek(fd, 0L, SEEK_END);
|
||||
if (router->binlog_position < 4) {
|
||||
if (router->binlog_position == 0) {
|
||||
blr_file_add_magic(router, fd);
|
||||
} else {
|
||||
/* If for any reason the file's length is between 1 and 3 bytes
|
||||
* then report an error. */
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: binlog file %s has an invalid length %d.",
|
||||
router->service->name, path, router->binlog_position)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
spinlock_release(&router->binlog_lock);
|
||||
router->binlog_fd = fd;
|
||||
}
|
||||
|
@ -628,7 +628,7 @@ createInstance(SERVICE *service, char **options)
|
||||
* If server weighting has been defined calculate the percentage
|
||||
* of load that will be sent to each server. This is only used for
|
||||
* calculating the least connections, either globally or within a
|
||||
* service, or the numebr of current operations on a server.
|
||||
* service, or the number of current operations on a server.
|
||||
*/
|
||||
if ((weightby = serviceGetWeightingParameter(service)) != NULL)
|
||||
{
|
||||
@ -698,6 +698,13 @@ createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
rwsplit_process_router_options(router, options);
|
||||
}
|
||||
|
||||
/** These options cancel each other out */
|
||||
if(router->rwsplit_config.disable_sescmd_hist && router->rwsplit_config.rw_max_sescmd_history_size > 0)
|
||||
{
|
||||
router->rwsplit_config.rw_max_sescmd_history_size = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set default value for max_slave_connections and for slave selection
|
||||
* criteria. If parameter is set in config file max_slave_connections
|
||||
@ -807,7 +814,7 @@ static void* newSession(
|
||||
rwsplit_process_router_options(router, router->service->routerOptions);
|
||||
}
|
||||
/** Copy config struct from router instance */
|
||||
client_rses->rses_config = router->rwsplit_config;
|
||||
memcpy(&client_rses->rses_config,&router->rwsplit_config,sizeof(rwsplit_config_t));
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
/**
|
||||
@ -2782,16 +2789,16 @@ static void clientReply (
|
||||
bool rconn = false;
|
||||
writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn);
|
||||
|
||||
if(rconn)
|
||||
if(rconn && !router_inst->rwsplit_config.disable_slave_recovery)
|
||||
{
|
||||
select_connect_backend_servers(&router_cli_ses->rses_master_ref,
|
||||
router_cli_ses->rses_backend_ref,
|
||||
router_cli_ses->rses_nbackends,
|
||||
router_cli_ses->rses_config.rw_max_slave_conn_count,
|
||||
router_cli_ses->rses_config.rw_max_slave_replication_lag,
|
||||
router_cli_ses->rses_config.rw_slave_select_criteria,
|
||||
router_cli_ses->rses_master_ref->bref_dcb->session,
|
||||
router_cli_ses->router);
|
||||
select_connect_backend_servers(&router_cli_ses->rses_master_ref,
|
||||
router_cli_ses->rses_backend_ref,
|
||||
router_cli_ses->rses_nbackends,
|
||||
router_cli_ses->rses_config.rw_max_slave_conn_count,
|
||||
router_cli_ses->rses_config.rw_max_slave_replication_lag,
|
||||
router_cli_ses->rses_config.rw_slave_select_criteria,
|
||||
router_cli_ses->rses_master_ref->bref_dcb->session,
|
||||
router_cli_ses->router);
|
||||
}
|
||||
}
|
||||
/**
|
||||
@ -3667,7 +3674,8 @@ static mysql_sescmd_t* mysql_sescmd_init (
|
||||
/** Set session command buffer */
|
||||
sescmd->my_sescmd_buf = sescmd_buf;
|
||||
sescmd->my_sescmd_packet_type = packet_type;
|
||||
|
||||
sescmd->position = atomic_add(&rses->pos_generator,1);
|
||||
|
||||
return sescmd;
|
||||
}
|
||||
|
||||
@ -3724,6 +3732,7 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
while (scmd != NULL && replybuf != NULL)
|
||||
{
|
||||
bref->reply_cmd = *((unsigned char*)replybuf->start + 4);
|
||||
scur->position = scmd->position;
|
||||
/** Faster backend has already responded to client : discard */
|
||||
if (scmd->my_sescmd_is_replied)
|
||||
{
|
||||
@ -4361,6 +4370,43 @@ static bool route_session_write(
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
if(router_cli_ses->rses_config.disable_sescmd_hist)
|
||||
{
|
||||
rses_property_t *prop, *tmp;
|
||||
backend_ref_t* bref;
|
||||
bool conflict;
|
||||
|
||||
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
while(prop)
|
||||
{
|
||||
conflict = false;
|
||||
|
||||
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
|
||||
{
|
||||
bref = &backend_ref[i];
|
||||
if(BREF_IS_IN_USE(bref))
|
||||
{
|
||||
|
||||
if(bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position)
|
||||
{
|
||||
conflict = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(conflict)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
tmp = prop;
|
||||
router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next;
|
||||
rses_property_done(tmp);
|
||||
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Additional reference is created to querybuf to
|
||||
* prevent it from being released before properties
|
||||
@ -4538,6 +4584,14 @@ static void rwsplit_process_router_options(
|
||||
{
|
||||
router->rwsplit_config.rw_max_sescmd_history_size = atoi(value);
|
||||
}
|
||||
else if(strcmp(options[i],"disable_sescmd_history") == 0)
|
||||
{
|
||||
router->rwsplit_config.disable_sescmd_hist = config_truth_value(value);
|
||||
}
|
||||
else if(strcmp(options[i],"disable_slave_recovery") == 0)
|
||||
{
|
||||
router->rwsplit_config.disable_slave_recovery = config_truth_value(value);
|
||||
}
|
||||
}
|
||||
} /*< for */
|
||||
}
|
||||
@ -4780,6 +4834,12 @@ static bool handle_error_new_connection(
|
||||
* Try to get replacement slave or at least the minimum
|
||||
* number of slave connections for router session.
|
||||
*/
|
||||
if(inst->rwsplit_config.disable_slave_recovery)
|
||||
{
|
||||
succp = have_enough_servers(&rses,1,router_nservers,inst) ? true : false;
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = select_connect_backend_servers(
|
||||
&rses->rses_master_ref,
|
||||
rses->rses_backend_ref,
|
||||
@ -4789,6 +4849,7 @@ static bool handle_error_new_connection(
|
||||
rses->rses_config.rw_slave_select_criteria,
|
||||
ses,
|
||||
inst);
|
||||
}
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
|
@ -716,6 +716,13 @@ createInstance(SERVICE *service, char **options)
|
||||
return NULL;
|
||||
}
|
||||
router->service = service;
|
||||
router->schemarouter_config.max_sescmd_hist = 0;
|
||||
router->stats.longest_sescmd = 0;
|
||||
router->stats.n_hist_exceeded = 0;
|
||||
router->stats.n_queries = 0;
|
||||
router->stats.n_sescmd = 0;
|
||||
router->stats.ses_longest = 0;
|
||||
router->stats.ses_shortest = (double)((unsigned long)(~0));
|
||||
spinlock_init(&router->lock);
|
||||
|
||||
/** Calculate number of servers */
|
||||
@ -730,6 +737,45 @@ createInstance(SERVICE *service, char **options)
|
||||
service->users_from_all = true;
|
||||
}
|
||||
|
||||
bool failure = false;
|
||||
|
||||
for(i=0;options && options[i];i++)
|
||||
{
|
||||
char* value;
|
||||
if((value = strchr(options[i],'=')) == NULL)
|
||||
{
|
||||
skygw_log_write(LOGFILE_ERROR,"Error: Unknown router options for Schemarouter: %s",options[i]);
|
||||
failure = true;
|
||||
break;
|
||||
}
|
||||
*value = '\0';
|
||||
value++;
|
||||
if(strcmp(options[i],"max_sescmd_history") == 0)
|
||||
{
|
||||
router->schemarouter_config.max_sescmd_hist = atoi(value);
|
||||
}
|
||||
else if(strcmp(options[i],"disable_sescmd_history") == 0)
|
||||
{
|
||||
router->schemarouter_config.disable_sescmd_hist = config_truth_value(value);
|
||||
}
|
||||
else
|
||||
{
|
||||
skygw_log_write(LOGFILE_ERROR,"Error: Unknown router options for Schemarouter: %s",options[i]);
|
||||
failure = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/** Setting a limit to the history size is not needed if it is disabled.*/
|
||||
if(router->schemarouter_config.disable_sescmd_hist && router->schemarouter_config.max_sescmd_hist > 0)
|
||||
router->schemarouter_config.max_sescmd_hist = 0;
|
||||
|
||||
if(failure)
|
||||
{
|
||||
free(router);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while (server != NULL)
|
||||
{
|
||||
nservers++;
|
||||
@ -759,6 +805,7 @@ createInstance(SERVICE *service, char **options)
|
||||
router->servers[nservers]->backend_conn_count = 0;
|
||||
router->servers[nservers]->weight = 1;
|
||||
router->servers[nservers]->be_valid = false;
|
||||
router->servers[nservers]->stats.queries = 0;
|
||||
if(server->server->monuser == NULL && service->credentials.name != NULL)
|
||||
{
|
||||
router->servers[nservers]->backend_server->monuser =
|
||||
@ -887,7 +934,8 @@ static void* newSession(
|
||||
client_rses->dcb_reply->func.read = internalReply;
|
||||
client_rses->dcb_reply->state = DCB_STATE_POLLING;
|
||||
client_rses->dcb_reply->session = session;
|
||||
|
||||
memcpy(&client_rses->rses_config,&router->schemarouter_config,sizeof(schemarouter_config_t));
|
||||
client_rses->n_sescmd = 0;
|
||||
client_rses->dcb_route = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
|
||||
client_rses->dcb_route->func.read = internalRoute;
|
||||
client_rses->dcb_route->state = DCB_STATE_POLLING;
|
||||
@ -988,7 +1036,6 @@ static void* newSession(
|
||||
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
|
||||
client_rses->rses_backend_ref = backend_ref;
|
||||
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
||||
router->stats.n_sessions += 1;
|
||||
|
||||
if (!(succp = rses_begin_locked_router_action(client_rses)))
|
||||
{
|
||||
@ -1008,7 +1055,8 @@ static void* newSession(
|
||||
|
||||
|
||||
rses_end_locked_router_action(client_rses);
|
||||
|
||||
|
||||
atomic_add(&router->stats.sessions, 1);
|
||||
|
||||
/**
|
||||
* Version is bigger than zero once initialized.
|
||||
@ -1047,6 +1095,7 @@ static void closeSession(
|
||||
void* router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES* router_cli_ses;
|
||||
ROUTER_INSTANCE* inst;
|
||||
backend_ref_t* backend_ref;
|
||||
|
||||
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
||||
@ -1063,7 +1112,8 @@ static void closeSession(
|
||||
}
|
||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
|
||||
inst = router_cli_ses->router;
|
||||
backend_ref = router_cli_ses->rses_backend_ref;
|
||||
/**
|
||||
* Lock router client session for secure read and update.
|
||||
@ -1118,9 +1168,24 @@ static void closeSession(
|
||||
router_cli_ses->dcb_route->session = NULL;
|
||||
dcb_close(router_cli_ses->dcb_reply);
|
||||
dcb_close(router_cli_ses->dcb_route);
|
||||
|
||||
|
||||
/** Unlock */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
spinlock_acquire(&inst->lock);
|
||||
if(inst->stats.longest_sescmd < router_cli_ses->stats.longest_sescmd)
|
||||
inst->stats.longest_sescmd = router_cli_ses->stats.longest_sescmd;
|
||||
double ses_time = difftime(time(NULL),router_cli_ses->rses_client_dcb->session->stats.connect);
|
||||
if(inst->stats.ses_longest < ses_time)
|
||||
inst->stats.ses_longest = ses_time;
|
||||
if(inst->stats.ses_shortest > ses_time)
|
||||
inst->stats.ses_shortest = ses_time;
|
||||
|
||||
inst->stats.ses_average =
|
||||
(ses_time + ((inst->stats.sessions - 1) * inst->stats.ses_average)) /
|
||||
(inst->stats.sessions);
|
||||
|
||||
spinlock_release(&inst->lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2069,7 +2134,8 @@ static int routeQuery(
|
||||
|
||||
if (succp)
|
||||
{
|
||||
atomic_add(&inst->stats.n_all, 1);
|
||||
atomic_add(&inst->stats.n_sescmd, 1);
|
||||
atomic_add(&inst->stats.n_queries, 1);
|
||||
ret = 1;
|
||||
}
|
||||
goto retblock;
|
||||
@ -2270,6 +2336,9 @@ diagnostic(ROUTER *instance, DCB *dcb)
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
int i = 0;
|
||||
|
||||
double sescmd_pct = router->stats.n_sescmd != 0 ?
|
||||
100.0*((double)router->stats.n_sescmd / (double)router->stats.n_queries) :
|
||||
0.0;
|
||||
|
||||
dcb_printf(dcb,"\33[1;4m%-16s%-16s%-16s\33[0m\n","Server","Queries","State");
|
||||
for(i=0;router->servers[i];i++)
|
||||
@ -2282,6 +2351,45 @@ diagnostic(ROUTER *instance, DCB *dcb)
|
||||
"\33[30;41mDOWN\33[0m");
|
||||
}
|
||||
|
||||
/** Session command statistics */
|
||||
dcb_printf(dcb,"\n\33[1;4mSession Commands\33[0m\n");
|
||||
dcb_printf(dcb,"Total number of queries: %d\n",
|
||||
router->stats.n_queries);
|
||||
dcb_printf(dcb,"Percentage of session commands: %.2f\n",
|
||||
sescmd_pct);
|
||||
dcb_printf(dcb,"Longest chain of stored session commands: %d\n",
|
||||
router->stats.longest_sescmd);
|
||||
dcb_printf(dcb,"Session command history limit exceeded: %d times\n",
|
||||
router->stats.n_hist_exceeded);
|
||||
if(!router->schemarouter_config.disable_sescmd_hist)
|
||||
{
|
||||
dcb_printf(dcb,"Session command history: enabled\n");
|
||||
if(router->schemarouter_config.max_sescmd_hist == 0)
|
||||
{
|
||||
dcb_printf(dcb,"Session command history limit: unlimited\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb,"Session command history limit: %d\n",
|
||||
router->schemarouter_config.max_sescmd_hist);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
dcb_printf(dcb,"Session command history: disabled\n");
|
||||
}
|
||||
|
||||
/** Session time statistics */
|
||||
|
||||
if(router->stats.sessions > 0)
|
||||
{
|
||||
dcb_printf(dcb,"\n\33[1;4mSession Time Statistics\33[0m\n");
|
||||
dcb_printf(dcb,"Longest session: %.2lf seconds\n",router->stats.ses_longest);
|
||||
dcb_printf(dcb,"Shortest session: %.2lf seconds\n",router->stats.ses_shortest);
|
||||
dcb_printf(dcb,"Average session length: %.2lf seconds\n",router->stats.ses_average);
|
||||
}
|
||||
dcb_printf(dcb,"\n");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3126,7 +3234,7 @@ static mysql_sescmd_t* mysql_sescmd_init (
|
||||
/** Set session command buffer */
|
||||
sescmd->my_sescmd_buf = sescmd_buf;
|
||||
sescmd->my_sescmd_packet_type = packet_type;
|
||||
|
||||
sescmd->position = atomic_add(&rses->pos_generator,1);
|
||||
return sescmd;
|
||||
}
|
||||
|
||||
@ -3178,6 +3286,7 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
*/
|
||||
while (scmd != NULL && replybuf != NULL)
|
||||
{
|
||||
scur->position = scmd->position;
|
||||
/** Faster backend has already responded to client : discard */
|
||||
if (scmd->my_sescmd_is_replied)
|
||||
{
|
||||
@ -3729,17 +3838,74 @@ static bool route_session_write(
|
||||
succp = false;
|
||||
goto return_succp;
|
||||
}
|
||||
/**
|
||||
|
||||
if(router_cli_ses->rses_config.max_sescmd_hist > 0 &&
|
||||
router_cli_ses->n_sescmd >= router_cli_ses->rses_config.max_sescmd_hist)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR|LOGFILE_TRACE,
|
||||
"Router session exceeded session command history limit of %d. "
|
||||
"Closing router session.",
|
||||
router_cli_ses->rses_config.max_sescmd_hist)));
|
||||
gwbuf_free(querybuf);
|
||||
atomic_add(&router_cli_ses->router->stats.n_hist_exceeded,1);
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
router_cli_ses->rses_client_dcb->func.hangup(router_cli_ses->rses_client_dcb);
|
||||
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
if(router_cli_ses->rses_config.disable_sescmd_hist)
|
||||
{
|
||||
rses_property_t *prop, *tmp;
|
||||
backend_ref_t* bref;
|
||||
bool conflict;
|
||||
|
||||
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
while(prop)
|
||||
{
|
||||
conflict = false;
|
||||
|
||||
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
|
||||
{
|
||||
bref = &backend_ref[i];
|
||||
if(BREF_IS_IN_USE(bref))
|
||||
{
|
||||
|
||||
if(bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position)
|
||||
{
|
||||
conflict = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(conflict)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
tmp = prop;
|
||||
router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next;
|
||||
rses_property_done(tmp);
|
||||
prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Additional reference is created to querybuf to
|
||||
* prevent it from being released before properties
|
||||
* are cleaned up as a part of router sessionclean-up.
|
||||
* are cleaned up as a part of router session clean-up.
|
||||
*/
|
||||
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
|
||||
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
|
||||
|
||||
/** Add sescmd property to router client session */
|
||||
rses_property_add(router_cli_ses, prop);
|
||||
|
||||
atomic_add(&router_cli_ses->stats.longest_sescmd,1);
|
||||
atomic_add(&router_cli_ses->n_sescmd,1);
|
||||
|
||||
for (i=0; i<router_cli_ses->rses_nbackends; i++)
|
||||
{
|
||||
if (BREF_IS_IN_USE((&backend_ref[i])))
|
||||
@ -3794,6 +3960,10 @@ static bool route_session_write(
|
||||
backend_ref[i].bref_backend->backend_server->name,
|
||||
backend_ref[i].bref_backend->backend_server->port)));
|
||||
}
|
||||
else
|
||||
{
|
||||
atomic_add(&backend_ref[i].bref_backend->stats.queries,1);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
Reference in New Issue
Block a user