diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 19939c9e7..929b66b0b 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,6 +31,7 @@ #include #include +#include #undef PREP_STMT_CACHING @@ -54,7 +55,8 @@ typedef enum bref_state { BREF_IN_USE = 0x01, BREF_WAITING_RESULT = 0x02, /*< for session commands only */ BREF_QUERY_ACTIVE = 0x04, /*< for other queries */ - BREF_CLOSED = 0x08 + BREF_CLOSED = 0x08, + BREF_SESCMD_FAILED = 0x10 /*< Backend references that should be dropped */ } bref_state_t; #define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE) @@ -62,6 +64,7 @@ typedef enum bref_state { #define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0) #define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE) #define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) +#define BREF_HAS_FAILED(s) ((s)->bref_state & BREF_SESCMD_FAILED) typedef enum backend_type_t { BE_UNDEFINED=-1, @@ -144,6 +147,9 @@ typedef struct mysql_sescmd_st { GWBUF* my_sescmd_buf; /*< query buffer */ unsigned char my_sescmd_packet_type;/*< packet type */ bool my_sescmd_is_replied; /*< is cmd replied to client */ + unsigned char reply_cmd; /*< The reply command. One of OK, ERR, RESULTSET or + * LOCAL_INFILE. Slave servers are compared to this + * when they return session command replies.*/ #if defined(SS_DEBUG) skygw_chk_t my_sescmd_chk_tail; #endif @@ -226,6 +232,9 @@ typedef struct backend_ref_st { int bref_num_result_wait; sescmd_cursor_t bref_sescmd_cur; GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ + unsigned char + reply_cmd; /*< The reply the backend server sent to a session command. + * Used to detect slaves that fail to execute session command. */ #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; #endif diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index e90887d92..eed1fcf65 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -261,7 +261,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command( static bool sescmd_cursor_next( sescmd_cursor_t* scur); -static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref); +static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref,bool*); static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, @@ -905,6 +905,15 @@ static void* newSession( client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; client_rses->rses_backend_ref = backend_ref; client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ + + if(client_rses->rses_config.rw_max_slave_conn_percent) + { + int n_conn = 0; + double pct = (double)client_rses->rses_config.rw_max_slave_conn_percent / 100.0; + n_conn = MAX(floor((double)client_rses->rses_nbackends * pct),1); + client_rses->rses_config.rw_max_slave_conn_count = n_conn; + } + router->stats.n_sessions += 1; /** @@ -2666,11 +2675,13 @@ static void clientReply ( DCB* backend_dcb) { DCB* client_dcb; + ROUTER_INSTANCE* router_inst; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; backend_ref_t* bref; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; + router_inst = (ROUTER_INSTANCE*)instance; CHK_CLIENT_RSES(router_cli_ses); /** @@ -2767,7 +2778,20 @@ static void clientReply ( * the client. Return with buffer including response that * needs to be sent to client or NULL. */ - writebuf = sescmd_cursor_process_replies(writebuf, bref); + bool rconn = false; + writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn); + + if(rconn) + { + select_connect_backend_servers(&router_cli_ses->rses_master_ref, + router_cli_ses->rses_backend_ref, + router_cli_ses->rses_nbackends, + router_cli_ses->rses_config.rw_max_slave_conn_count, + router_cli_ses->rses_config.rw_max_slave_replication_lag, + router_cli_ses->rses_config.rw_slave_select_criteria, + router_cli_ses->rses_master_ref->bref_dcb->session, + router_cli_ses->router); + } } /** * If response will be sent to client, decrease waiter count. @@ -3185,6 +3209,11 @@ static bool select_connect_backend_servers( (master_host != NULL && (b->backend_server != master_host->backend_server))) { + if(BREF_HAS_FAILED(&backend_ref[i])) + { + continue; + } + slaves_found += 1; /** Slave is already connected */ @@ -3674,15 +3703,19 @@ static void mysql_sescmd_done( */ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, - backend_ref_t* bref) + backend_ref_t* bref, + bool *reconnect) { mysql_sescmd_t* scmd; sescmd_cursor_t* scur; - + ROUTER_CLIENT_SES* ses; + ROUTER_INSTANCE* router; + scur = &bref->bref_sescmd_cur; ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock))); scmd = sescmd_cursor_get_command(scur); - + ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession; + router = ses->router; CHK_GWBUF(replybuf); /** @@ -3691,6 +3724,7 @@ static GWBUF* sescmd_cursor_process_replies( */ while (scmd != NULL && replybuf != NULL) { + bref->reply_cmd = *((unsigned char*)replybuf->start + 4); /** Faster backend has already responded to client : discard */ if (scmd->my_sescmd_is_replied) { @@ -3709,14 +3743,68 @@ static GWBUF* sescmd_cursor_process_replies( } /** Set response status received */ bref_clear_state(bref, BREF_WAITING_RESULT); + + if(bref->reply_cmd != scmd->reply_cmd) + { + skygw_log_write(LOGFILE_TRACE,"Backend " + "server '%s' response differs from master's response. " + "Closing connection.", + bref->bref_backend->backend_server->unique_name); + sescmd_cursor_set_active(scur,false); + bref_clear_state(bref,BREF_QUERY_ACTIVE); + bref_clear_state(bref,BREF_IN_USE); + bref_set_state(bref,BREF_CLOSED); + bref_set_state(bref,BREF_SESCMD_FAILED); + dcb_close(bref->bref_dcb); + *reconnect = true; + if(replybuf) + gwbuf_free(replybuf); + } } - /** Response is in the buffer and it will be sent to client. */ - else + /** This is a response from the master and it is the "right" one. + * A slave server's response will be compared to this and if + * their response differs from the master server's response, they + * are dropped from the valid list of backend servers. + * Response is in the buffer and it will be sent to client. */ + else if(ses->rses_master_ref->bref_dcb == bref->bref_dcb) { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; + scmd->reply_cmd = *((unsigned char*)replybuf->start + 4); + skygw_log_write(LOGFILE_DEBUG,"Master '%s' responded to a session command.", + bref->bref_backend->backend_server->unique_name); + int i; + + for(i=0;irses_nbackends;i++) + { + if(!BREF_IS_WAITING_RESULT(&ses->rses_backend_ref[i])) + { + /** This backend has already received a response */ + if(ses->rses_backend_ref[i].reply_cmd != + scmd->reply_cmd && + !BREF_IS_CLOSED(&ses->rses_backend_ref[i])) + { + bref_clear_state(&ses->rses_backend_ref[i],BREF_QUERY_ACTIVE); + bref_clear_state(&ses->rses_backend_ref[i],BREF_IN_USE); + bref_set_state(&ses->rses_backend_ref[i],BREF_CLOSED); + bref_set_state(bref,BREF_SESCMD_FAILED); + dcb_close(ses->rses_backend_ref[i].bref_dcb); + *reconnect = true; + } + } + } + } - + else + { + skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.", + bref->bref_backend->backend_server->unique_name); + if(replybuf) + gwbuf_free(replybuf); + return NULL; + } + + if (sescmd_cursor_next(scur)) { scmd = sescmd_cursor_get_command(scur); @@ -4217,7 +4305,7 @@ static bool route_session_write( { DCB* dcb = backend_ref[i].bref_dcb; - if (LOG_IS_ENABLED(LOGFILE_TRACE)) + if (LOG_IS_ENABLED(LOGFILE_TRACE) && BREF_IS_IN_USE((&backend_ref[i]))) { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE,