Fix to bug 600: http://bugs.mariadb.com/show_bug.cgi?id=600
readwritesplit now drops slave connections that fail a session command.
This commit is contained in:
@ -31,6 +31,7 @@
|
|||||||
|
|
||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
#include <hashtable.h>
|
#include <hashtable.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#undef PREP_STMT_CACHING
|
#undef PREP_STMT_CACHING
|
||||||
|
|
||||||
@ -54,7 +55,8 @@ typedef enum bref_state {
|
|||||||
BREF_IN_USE = 0x01,
|
BREF_IN_USE = 0x01,
|
||||||
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
|
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
|
||||||
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
|
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
|
||||||
BREF_CLOSED = 0x08
|
BREF_CLOSED = 0x08,
|
||||||
|
BREF_SESCMD_FAILED = 0x10 /*< Backend references that should be dropped */
|
||||||
} bref_state_t;
|
} bref_state_t;
|
||||||
|
|
||||||
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
|
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
|
||||||
@ -62,6 +64,7 @@ typedef enum bref_state {
|
|||||||
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
|
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
|
||||||
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
|
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
|
||||||
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
|
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
|
||||||
|
#define BREF_HAS_FAILED(s) ((s)->bref_state & BREF_SESCMD_FAILED)
|
||||||
|
|
||||||
typedef enum backend_type_t {
|
typedef enum backend_type_t {
|
||||||
BE_UNDEFINED=-1,
|
BE_UNDEFINED=-1,
|
||||||
@ -144,6 +147,9 @@ typedef struct mysql_sescmd_st {
|
|||||||
GWBUF* my_sescmd_buf; /*< query buffer */
|
GWBUF* my_sescmd_buf; /*< query buffer */
|
||||||
unsigned char my_sescmd_packet_type;/*< packet type */
|
unsigned char my_sescmd_packet_type;/*< packet type */
|
||||||
bool my_sescmd_is_replied; /*< is cmd replied to client */
|
bool my_sescmd_is_replied; /*< is cmd replied to client */
|
||||||
|
unsigned char reply_cmd; /*< The reply command. One of OK, ERR, RESULTSET or
|
||||||
|
* LOCAL_INFILE. Slave servers are compared to this
|
||||||
|
* when they return session command replies.*/
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t my_sescmd_chk_tail;
|
skygw_chk_t my_sescmd_chk_tail;
|
||||||
#endif
|
#endif
|
||||||
@ -226,6 +232,9 @@ typedef struct backend_ref_st {
|
|||||||
int bref_num_result_wait;
|
int bref_num_result_wait;
|
||||||
sescmd_cursor_t bref_sescmd_cur;
|
sescmd_cursor_t bref_sescmd_cur;
|
||||||
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
|
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
|
||||||
|
unsigned char
|
||||||
|
reply_cmd; /*< The reply the backend server sent to a session command.
|
||||||
|
* Used to detect slaves that fail to execute session command. */
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t bref_chk_tail;
|
skygw_chk_t bref_chk_tail;
|
||||||
#endif
|
#endif
|
||||||
|
@ -261,7 +261,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
|
|||||||
static bool sescmd_cursor_next(
|
static bool sescmd_cursor_next(
|
||||||
sescmd_cursor_t* scur);
|
sescmd_cursor_t* scur);
|
||||||
|
|
||||||
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref);
|
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref,bool*);
|
||||||
|
|
||||||
static void tracelog_routed_query(
|
static void tracelog_routed_query(
|
||||||
ROUTER_CLIENT_SES* rses,
|
ROUTER_CLIENT_SES* rses,
|
||||||
@ -905,6 +905,15 @@ static void* newSession(
|
|||||||
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
|
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
|
||||||
client_rses->rses_backend_ref = backend_ref;
|
client_rses->rses_backend_ref = backend_ref;
|
||||||
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
||||||
|
|
||||||
|
if(client_rses->rses_config.rw_max_slave_conn_percent)
|
||||||
|
{
|
||||||
|
int n_conn = 0;
|
||||||
|
double pct = (double)client_rses->rses_config.rw_max_slave_conn_percent / 100.0;
|
||||||
|
n_conn = MAX(floor((double)client_rses->rses_nbackends * pct),1);
|
||||||
|
client_rses->rses_config.rw_max_slave_conn_count = n_conn;
|
||||||
|
}
|
||||||
|
|
||||||
router->stats.n_sessions += 1;
|
router->stats.n_sessions += 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -2666,11 +2675,13 @@ static void clientReply (
|
|||||||
DCB* backend_dcb)
|
DCB* backend_dcb)
|
||||||
{
|
{
|
||||||
DCB* client_dcb;
|
DCB* client_dcb;
|
||||||
|
ROUTER_INSTANCE* router_inst;
|
||||||
ROUTER_CLIENT_SES* router_cli_ses;
|
ROUTER_CLIENT_SES* router_cli_ses;
|
||||||
sescmd_cursor_t* scur = NULL;
|
sescmd_cursor_t* scur = NULL;
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
|
|
||||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
|
router_inst = (ROUTER_INSTANCE*)instance;
|
||||||
CHK_CLIENT_RSES(router_cli_ses);
|
CHK_CLIENT_RSES(router_cli_ses);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -2767,7 +2778,20 @@ static void clientReply (
|
|||||||
* the client. Return with buffer including response that
|
* the client. Return with buffer including response that
|
||||||
* needs to be sent to client or NULL.
|
* needs to be sent to client or NULL.
|
||||||
*/
|
*/
|
||||||
writebuf = sescmd_cursor_process_replies(writebuf, bref);
|
bool rconn = false;
|
||||||
|
writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn);
|
||||||
|
|
||||||
|
if(rconn)
|
||||||
|
{
|
||||||
|
select_connect_backend_servers(&router_cli_ses->rses_master_ref,
|
||||||
|
router_cli_ses->rses_backend_ref,
|
||||||
|
router_cli_ses->rses_nbackends,
|
||||||
|
router_cli_ses->rses_config.rw_max_slave_conn_count,
|
||||||
|
router_cli_ses->rses_config.rw_max_slave_replication_lag,
|
||||||
|
router_cli_ses->rses_config.rw_slave_select_criteria,
|
||||||
|
router_cli_ses->rses_master_ref->bref_dcb->session,
|
||||||
|
router_cli_ses->router);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* If response will be sent to client, decrease waiter count.
|
* If response will be sent to client, decrease waiter count.
|
||||||
@ -3185,6 +3209,11 @@ static bool select_connect_backend_servers(
|
|||||||
(master_host != NULL &&
|
(master_host != NULL &&
|
||||||
(b->backend_server != master_host->backend_server)))
|
(b->backend_server != master_host->backend_server)))
|
||||||
{
|
{
|
||||||
|
if(BREF_HAS_FAILED(&backend_ref[i]))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
slaves_found += 1;
|
slaves_found += 1;
|
||||||
|
|
||||||
/** Slave is already connected */
|
/** Slave is already connected */
|
||||||
@ -3674,15 +3703,19 @@ static void mysql_sescmd_done(
|
|||||||
*/
|
*/
|
||||||
static GWBUF* sescmd_cursor_process_replies(
|
static GWBUF* sescmd_cursor_process_replies(
|
||||||
GWBUF* replybuf,
|
GWBUF* replybuf,
|
||||||
backend_ref_t* bref)
|
backend_ref_t* bref,
|
||||||
|
bool *reconnect)
|
||||||
{
|
{
|
||||||
mysql_sescmd_t* scmd;
|
mysql_sescmd_t* scmd;
|
||||||
sescmd_cursor_t* scur;
|
sescmd_cursor_t* scur;
|
||||||
|
ROUTER_CLIENT_SES* ses;
|
||||||
|
ROUTER_INSTANCE* router;
|
||||||
|
|
||||||
scur = &bref->bref_sescmd_cur;
|
scur = &bref->bref_sescmd_cur;
|
||||||
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
||||||
scmd = sescmd_cursor_get_command(scur);
|
scmd = sescmd_cursor_get_command(scur);
|
||||||
|
ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
|
||||||
|
router = ses->router;
|
||||||
CHK_GWBUF(replybuf);
|
CHK_GWBUF(replybuf);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -3691,6 +3724,7 @@ static GWBUF* sescmd_cursor_process_replies(
|
|||||||
*/
|
*/
|
||||||
while (scmd != NULL && replybuf != NULL)
|
while (scmd != NULL && replybuf != NULL)
|
||||||
{
|
{
|
||||||
|
bref->reply_cmd = *((unsigned char*)replybuf->start + 4);
|
||||||
/** Faster backend has already responded to client : discard */
|
/** Faster backend has already responded to client : discard */
|
||||||
if (scmd->my_sescmd_is_replied)
|
if (scmd->my_sescmd_is_replied)
|
||||||
{
|
{
|
||||||
@ -3709,14 +3743,68 @@ static GWBUF* sescmd_cursor_process_replies(
|
|||||||
}
|
}
|
||||||
/** Set response status received */
|
/** Set response status received */
|
||||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||||
|
|
||||||
|
if(bref->reply_cmd != scmd->reply_cmd)
|
||||||
|
{
|
||||||
|
skygw_log_write(LOGFILE_TRACE,"Backend "
|
||||||
|
"server '%s' response differs from master's response. "
|
||||||
|
"Closing connection.",
|
||||||
|
bref->bref_backend->backend_server->unique_name);
|
||||||
|
sescmd_cursor_set_active(scur,false);
|
||||||
|
bref_clear_state(bref,BREF_QUERY_ACTIVE);
|
||||||
|
bref_clear_state(bref,BREF_IN_USE);
|
||||||
|
bref_set_state(bref,BREF_CLOSED);
|
||||||
|
bref_set_state(bref,BREF_SESCMD_FAILED);
|
||||||
|
dcb_close(bref->bref_dcb);
|
||||||
|
*reconnect = true;
|
||||||
|
if(replybuf)
|
||||||
|
gwbuf_free(replybuf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
/** Response is in the buffer and it will be sent to client. */
|
/** This is a response from the master and it is the "right" one.
|
||||||
else
|
* A slave server's response will be compared to this and if
|
||||||
|
* their response differs from the master server's response, they
|
||||||
|
* are dropped from the valid list of backend servers.
|
||||||
|
* Response is in the buffer and it will be sent to client. */
|
||||||
|
else if(ses->rses_master_ref->bref_dcb == bref->bref_dcb)
|
||||||
{
|
{
|
||||||
/** Mark the rest session commands as replied */
|
/** Mark the rest session commands as replied */
|
||||||
scmd->my_sescmd_is_replied = true;
|
scmd->my_sescmd_is_replied = true;
|
||||||
|
scmd->reply_cmd = *((unsigned char*)replybuf->start + 4);
|
||||||
|
skygw_log_write(LOGFILE_DEBUG,"Master '%s' responded to a session command.",
|
||||||
|
bref->bref_backend->backend_server->unique_name);
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for(i=0;i<ses->rses_nbackends;i++)
|
||||||
|
{
|
||||||
|
if(!BREF_IS_WAITING_RESULT(&ses->rses_backend_ref[i]))
|
||||||
|
{
|
||||||
|
/** This backend has already received a response */
|
||||||
|
if(ses->rses_backend_ref[i].reply_cmd !=
|
||||||
|
scmd->reply_cmd &&
|
||||||
|
!BREF_IS_CLOSED(&ses->rses_backend_ref[i]))
|
||||||
|
{
|
||||||
|
bref_clear_state(&ses->rses_backend_ref[i],BREF_QUERY_ACTIVE);
|
||||||
|
bref_clear_state(&ses->rses_backend_ref[i],BREF_IN_USE);
|
||||||
|
bref_set_state(&ses->rses_backend_ref[i],BREF_CLOSED);
|
||||||
|
bref_set_state(bref,BREF_SESCMD_FAILED);
|
||||||
|
dcb_close(ses->rses_backend_ref[i].bref_dcb);
|
||||||
|
*reconnect = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.",
|
||||||
|
bref->bref_backend->backend_server->unique_name);
|
||||||
|
if(replybuf)
|
||||||
|
gwbuf_free(replybuf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (sescmd_cursor_next(scur))
|
if (sescmd_cursor_next(scur))
|
||||||
{
|
{
|
||||||
scmd = sescmd_cursor_get_command(scur);
|
scmd = sescmd_cursor_get_command(scur);
|
||||||
@ -4217,7 +4305,7 @@ static bool route_session_write(
|
|||||||
{
|
{
|
||||||
DCB* dcb = backend_ref[i].bref_dcb;
|
DCB* dcb = backend_ref[i].bref_dcb;
|
||||||
|
|
||||||
if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
if (LOG_IS_ENABLED(LOGFILE_TRACE) && BREF_IS_IN_USE((&backend_ref[i])))
|
||||||
{
|
{
|
||||||
LOGIF(LT, (skygw_log_write(
|
LOGIF(LT, (skygw_log_write(
|
||||||
LOGFILE_TRACE,
|
LOGFILE_TRACE,
|
||||||
|
Reference in New Issue
Block a user