Merge branch 'develop' into MAX-324

This commit is contained in:
Markus Makela
2015-02-20 10:18:45 +02:00
2 changed files with 107 additions and 10 deletions

View File

@ -261,7 +261,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
static bool sescmd_cursor_next(
sescmd_cursor_t* scur);
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref);
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref,bool*);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
@ -905,6 +905,15 @@ static void* newSession(
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
client_rses->rses_backend_ref = backend_ref;
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
if(client_rses->rses_config.rw_max_slave_conn_percent)
{
int n_conn = 0;
double pct = (double)client_rses->rses_config.rw_max_slave_conn_percent / 100.0;
n_conn = MAX(floor((double)client_rses->rses_nbackends * pct),1);
client_rses->rses_config.rw_max_slave_conn_count = n_conn;
}
router->stats.n_sessions += 1;
/**
@ -2666,11 +2675,13 @@ static void clientReply (
DCB* backend_dcb)
{
DCB* client_dcb;
ROUTER_INSTANCE* router_inst;
ROUTER_CLIENT_SES* router_cli_ses;
sescmd_cursor_t* scur = NULL;
backend_ref_t* bref;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
router_inst = (ROUTER_INSTANCE*)instance;
CHK_CLIENT_RSES(router_cli_ses);
/**
@ -2767,7 +2778,20 @@ static void clientReply (
* the client. Return with buffer including response that
* needs to be sent to client or NULL.
*/
writebuf = sescmd_cursor_process_replies(writebuf, bref);
bool rconn = false;
writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn);
if(rconn)
{
select_connect_backend_servers(&router_cli_ses->rses_master_ref,
router_cli_ses->rses_backend_ref,
router_cli_ses->rses_nbackends,
router_cli_ses->rses_config.rw_max_slave_conn_count,
router_cli_ses->rses_config.rw_max_slave_replication_lag,
router_cli_ses->rses_config.rw_slave_select_criteria,
router_cli_ses->rses_master_ref->bref_dcb->session,
router_cli_ses->router);
}
}
/**
* If response will be sent to client, decrease waiter count.
@ -3185,6 +3209,11 @@ static bool select_connect_backend_servers(
(master_host != NULL &&
(b->backend_server != master_host->backend_server)))
{
if(BREF_HAS_FAILED(&backend_ref[i]))
{
continue;
}
slaves_found += 1;
/** Slave is already connected */
@ -3672,15 +3701,19 @@ static void mysql_sescmd_done(
*/
static GWBUF* sescmd_cursor_process_replies(
GWBUF* replybuf,
backend_ref_t* bref)
backend_ref_t* bref,
bool *reconnect)
{
mysql_sescmd_t* scmd;
sescmd_cursor_t* scur;
ROUTER_CLIENT_SES* ses;
ROUTER_INSTANCE* router;
scur = &bref->bref_sescmd_cur;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
router = ses->router;
CHK_GWBUF(replybuf);
/**
@ -3689,6 +3722,7 @@ static GWBUF* sescmd_cursor_process_replies(
*/
while (scmd != NULL && replybuf != NULL)
{
bref->reply_cmd = *((unsigned char*)replybuf->start + 4);
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
@ -3707,14 +3741,68 @@ static GWBUF* sescmd_cursor_process_replies(
}
/** Set response status received */
bref_clear_state(bref, BREF_WAITING_RESULT);
if(bref->reply_cmd != scmd->reply_cmd)
{
skygw_log_write(LOGFILE_TRACE,"Backend "
"server '%s' response differs from master's response. "
"Closing connection.",
bref->bref_backend->backend_server->unique_name);
sescmd_cursor_set_active(scur,false);
bref_clear_state(bref,BREF_QUERY_ACTIVE);
bref_clear_state(bref,BREF_IN_USE);
bref_set_state(bref,BREF_CLOSED);
bref_set_state(bref,BREF_SESCMD_FAILED);
dcb_close(bref->bref_dcb);
*reconnect = true;
if(replybuf)
gwbuf_free(replybuf);
}
}
/** Response is in the buffer and it will be sent to client. */
else
/** This is a response from the master and it is the "right" one.
* A slave server's response will be compared to this and if
* their response differs from the master server's response, they
* are dropped from the valid list of backend servers.
* Response is in the buffer and it will be sent to client. */
else if(ses->rses_master_ref->bref_dcb == bref->bref_dcb)
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
scmd->reply_cmd = *((unsigned char*)replybuf->start + 4);
skygw_log_write(LOGFILE_DEBUG,"Master '%s' responded to a session command.",
bref->bref_backend->backend_server->unique_name);
int i;
for(i=0;i<ses->rses_nbackends;i++)
{
if(!BREF_IS_WAITING_RESULT(&ses->rses_backend_ref[i]))
{
/** This backend has already received a response */
if(ses->rses_backend_ref[i].reply_cmd !=
scmd->reply_cmd &&
!BREF_IS_CLOSED(&ses->rses_backend_ref[i]))
{
bref_clear_state(&ses->rses_backend_ref[i],BREF_QUERY_ACTIVE);
bref_clear_state(&ses->rses_backend_ref[i],BREF_IN_USE);
bref_set_state(&ses->rses_backend_ref[i],BREF_CLOSED);
bref_set_state(bref,BREF_SESCMD_FAILED);
dcb_close(ses->rses_backend_ref[i].bref_dcb);
*reconnect = true;
}
}
}
}
else
{
skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.",
bref->bref_backend->backend_server->unique_name);
if(replybuf)
gwbuf_free(replybuf);
return NULL;
}
if (sescmd_cursor_next(scur))
{
scmd = sescmd_cursor_get_command(scur);
@ -4215,7 +4303,7 @@ static bool route_session_write(
{
DCB* dcb = backend_ref[i].bref_dcb;
if (LOG_IS_ENABLED(LOGFILE_TRACE))
if (LOG_IS_ENABLED(LOGFILE_TRACE) && BREF_IS_IN_USE((&backend_ref[i])))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,