From 15ff1fd26a2cbd4d662d381b5368b6a1cf4b55bd Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Thu, 12 Jun 2014 23:22:51 +0300 Subject: [PATCH] Fixed many error handling issues regarding timing and multiple threads. Added flags to those backend references which have sent something to the backend that causes the backend to send results or a reply back. Didn't add removal of the flag since there's currently no way to tell whether a response from the backend contains anything other than session command replies - which aren't counted when BREF_WAITING_RESULT is set and cleared. --- server/core/buffer.c | 2 +- server/core/session.c | 27 ++ server/include/buffer.h | 9 +- server/modules/include/readwritesplit.h | 12 +- server/modules/monitor/mysql_mon.c | 2 +- server/modules/protocol/mysql_backend.c | 86 +++++- server/modules/protocol/mysql_common.c | 158 ++++++++-- .../routing/readwritesplit/readwritesplit.c | 292 ++++++++++++------ 8 files changed, 439 insertions(+), 149 deletions(-) diff --git a/server/core/buffer.c b/server/core/buffer.c index 11fb5b556..db6da8bec 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -308,7 +308,7 @@ bool gwbuf_set_type( case GWBUF_TYPE_MYSQL: case GWBUF_TYPE_PLAINSQL: case GWBUF_TYPE_UNDEFINED: - buf->gwbuf_type = type; + buf->gwbuf_type |= type; succp = true; break; default: diff --git a/server/core/session.c b/server/core/session.c index cb392258c..d02498fd4 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -642,3 +642,30 @@ int i; return 1; } + +bool session_route_query ( + SESSION* ses, + GWBUF* buf) +{ + bool succp; + + if (ses->head.routeQuery == NULL || + ses->head.instance == NULL || + ses->head.session == NULL) + { + succp = false; + goto return_succp; + } + + if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1) + { + succp = true; + } + else + { + succp = false; + } +return_succp: + return succp; +} + \ No newline at end of file diff --git a/server/include/buffer.h 
b/server/include/buffer.h index 9651031b2..66c56322b 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -46,11 +46,14 @@ typedef enum { - GWBUF_TYPE_UNDEFINED = 0x0, - GWBUF_TYPE_PLAINSQL = 0x1, - GWBUF_TYPE_MYSQL = 0x2 + GWBUF_TYPE_UNDEFINED = 0x00, + GWBUF_TYPE_PLAINSQL = 0x01, + GWBUF_TYPE_MYSQL = 0x02 } gwbuf_type_t; +#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) +#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) + /** * A structure to encapsulate the data in a form that the data itself can be * shared between multiple GWBUF's without the need to make multiple copies diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index ed47d309d..6c35d1804 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -32,11 +32,17 @@ #include typedef enum bref_state { - BREF_NOT_USED, - BREF_IN_USE, - BREF_CLOSED + BREF_NOT_USED = 0x00, + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /*< for anything that responds */ + BREF_CLOSED = 0x08 } bref_state_t; +#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED) +#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT) +#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) + typedef enum backend_type_t { BE_UNDEFINED=-1, BE_MASTER, diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index f39412684..665a3336f 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -339,7 +339,7 @@ static int conn_err_count; return; /** Store prevous status */ - database->mon_prev_status = database->server->status; + database->mon_prev_status = database->server->status; if (database->con == NULL || mysql_ping(database->con) != 0) { diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index b77671143..932886a6e 
100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -197,6 +197,14 @@ static int gw_read_backend_event(DCB *dcb) { if (gw_read_backend_handshake(backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_read_backend_handshake, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + backend_protocol->owner_dcb->fd))); + } else { /* handshake decoded, send the auth credentials */ if (gw_send_authentication_to_backend( @@ -206,6 +214,13 @@ static int gw_read_backend_event(DCB *dcb) { backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_send_authentication_to_backend " + "fd %d, state = MYSQL_AUTH_FAILED.", + pthread_self(), + backend_protocol->owner_dcb->fd))); } else { backend_protocol->state = MYSQL_AUTH_RECV; } @@ -253,13 +268,21 @@ static int gw_read_backend_event(DCB *dcb) { switch (receive_rc) { case -1: backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_receive_backend_authentication " + "fd %d, state = MYSQL_AUTH_FAILED.", + backend_protocol->owner_dcb->fd, + pthread_self()))); + LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Backend server didn't " "accept authentication for user " "%s.", - current_session->user))); + current_session->user))); break; case 1: backend_protocol->state = MYSQL_IDLE; @@ -318,7 +341,11 @@ static int gw_read_backend_event(DCB *dcb) { } spinlock_release(&dcb->delayqlock); - + /** Whole session is being closed so return. 
*/ + if (session->state == SESSION_STATE_STOPPING) + { + goto return_rc; + } /* try reload users' table for next connection */ service_refresh_users(dcb->session->client->service); @@ -341,11 +368,6 @@ static int gw_read_backend_event(DCB *dcb) { } usleep(1); } - - if (session->state == SESSION_STATE_STOPPING) - { - goto return_rc; - } spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); @@ -427,7 +449,7 @@ static int gw_read_backend_event(DCB *dcb) { */ router->handleError(router_instance, rsession, - "Read from backend failed.", + "Read from backend failed", dcb, ERRACT_NEW_CONNECTION, &succp); @@ -583,7 +605,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) */ spinlock_acquire(&dcb->dcb_initlock); - if (dcb->state != DCB_STATE_POLLING) { + if (dcb->state != DCB_STATE_POLLING) + { /*< vraa : errorHandle */ /*< Free buffer memory */ gwbuf_consume(queue, GWBUF_LENGTH(queue)); @@ -629,11 +652,11 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) queue, GWBUF_LENGTH(queue))) != NULL); free(str); - } rc = 0; spinlock_release(&dcb->authlock); goto return_rc; break; + } case MYSQL_IDLE: LOGIF(LD, (skygw_log_write( @@ -677,7 +700,8 @@ return_rc: * Backend Error Handling for EPOLLER * */ -static int gw_error_backend_event(DCB *dcb) { +static int gw_error_backend_event(DCB *dcb) +{ SESSION *session; void *rsession; ROUTER_OBJECT *router; @@ -692,7 +716,17 @@ static int gw_error_backend_event(DCB *dcb) { router_instance = session->service->router_instance; #if defined(ERRHANDLE2) - router->handleError(); + router->handleError(router_instance, + rsession, + "Connection to backend server failed", + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + if (!succp) + { + dcb_close(dcb); + } #else if (dcb->state != DCB_STATE_POLLING) { /*< vraa : errorHandle */ @@ -866,7 +900,33 @@ static int gw_backend_close(DCB *dcb) { #if defined(ERRHANDLE) - mysql_send_com_quit(dcb, 1); + DCB* client_dcb; + SESSION* session; + GWBUF* 
quitbuf; + bool succp; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + + quitbuf = mysql_create_com_quit(NULL, 0); + + /** Send COM_QUIT to the backend being closed */ + mysql_send_com_quit(dcb, 0, quitbuf); + + if (session != NULL && + (session->state == SESSION_STATE_ROUTER_READY || + session->state == SESSION_STATE_READY)) + { + client_dcb = session->client; + + if (client_dcb != NULL && + client_dcb->state == DCB_STATE_POLLING) + { + /** Close client DCB */ + dcb_close(client_dcb); + } + } #else dcb_close(dcb); #endif diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index a620c2d8b..4ab8ad3ea 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -126,7 +126,9 @@ void gw_mysql_close(MySQLProtocol **ptr) { * @param conn MySQL protocol structure * @return 0 on success, 1 on failure */ -int gw_read_backend_handshake(MySQLProtocol *conn) { +int gw_read_backend_handshake( + MySQLProtocol *conn) +{ GWBUF *head = NULL; DCB *dcb = conn->owner_dcb; int n = -1; @@ -135,12 +137,14 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { int success = 0; int packet_len = 0; - if ((n = dcb_read(dcb, &head)) != -1) { - if (head) { + if ((n = dcb_read(dcb, &head)) != -1) + { + if (head) + { payload = GWBUF_DATA(head); h_len = gwbuf_length(head); - - /* + + /** * The mysql packets content starts at byte fifth * just return with less bytes */ @@ -148,10 +152,45 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { if (h_len <= 4) { /* log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "dcb_read, fd %d, " + "state = MYSQL_AUTH_FAILED.", + dcb->fd, + pthread_self()))); + return 1; } - //get mysql packet size, 3 bytes + if (payload[4] == 0xff) + { + size_t len = MYSQL_GET_PACKET_LEN(payload); + uint16_t errcode = MYSQL_GET_ERRCODE(payload); + char* bufstr = 
strndup(&((char *)payload)[7], len-3); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_receive_backend_auth] Invalid " + "authentication message from backend dcb %p " + "fd %d, ptr[4] = %p, error code %d, msg %s.", + pthread_self(), + dcb, + dcb->fd, + payload[4], + errcode, + bufstr))); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Invalid authentication message " + "from backend. Error code: %d, Msg : %s", + errcode, + bufstr))); + + free(bufstr); + } + //get mysql packet size, 3 bytes packet_len = gw_mysql_get_byte3(payload); if (h_len < (packet_len + 4)) { @@ -160,6 +199,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * packet. Log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "gw_mysql_get_byte3, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + dcb->fd, + pthread_self()))); + return 1; } @@ -176,6 +224,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "gw_decode_mysql_server_handshake, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + conn->owner_dcb->fd, + pthread_self()))); + return 1; } @@ -202,7 +259,10 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * @return 0 on success, < 0 on failure * */ -int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { +int gw_decode_mysql_server_handshake( + MySQLProtocol *conn, + uint8_t *payload) +{ uint8_t *server_version_end = NULL; uint16_t mysql_server_capabilities_one = 0; uint16_t mysql_server_capabilities_two = 0; @@ -216,8 +276,8 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { protocol_version = payload[0]; - if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) { - /* log error for this */ + if (protocol_version != 
GW_MYSQL_PROTOCOL_VERSION) + { return -1; } @@ -257,19 +317,23 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { payload+=2; // get scramble len - if (payload[0] > 0) { + if (payload[0] > 0) + { scramble_len = payload[0] -1; ss_dassert(scramble_len > GW_SCRAMBLE_LENGTH_323); ss_dassert(scramble_len <= GW_MYSQL_SCRAMBLE_SIZE); - if ( (scramble_len < GW_SCRAMBLE_LENGTH_323) || scramble_len > GW_MYSQL_SCRAMBLE_SIZE) { + if ((scramble_len < GW_SCRAMBLE_LENGTH_323) || + scramble_len > GW_MYSQL_SCRAMBLE_SIZE) + { /* log this */ - return -2; + return -2; } - } else { + } + else + { scramble_len = GW_MYSQL_SCRAMBLE_SIZE; } - // skip 10 zero bytes payload += 11; @@ -322,8 +386,8 @@ int gw_receive_backend_auth( else if (ptr[4] == 0xff) { size_t len = MYSQL_GET_PACKET_LEN(ptr); - char* err = strndup(&ptr[8], 5); - char* bufstr = strndup(&ptr[13], len-4-5); + char* err = strndup(&((char *)ptr)[8], 5); + char* bufstr = strndup(&((char *)ptr)[13], len-4-5); LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -737,31 +801,28 @@ gw_mysql_protocol_state2string (int state) { } } -int mysql_send_com_quit( - DCB* dcb, - int packet_number) +GWBUF* mysql_create_com_quit( + GWBUF* bufparam, + int packet_number) { - uint8_t *data; - GWBUF *buf; - int nbytes = 0; - - CHK_DCB(dcb); - ss_dassert(packet_number <= 255); + uint8_t* data; + GWBUF* buf; - if (dcb == NULL || - (dcb->state != DCB_STATE_NOPOLLING && - dcb->state != DCB_STATE_ZOMBIE)) + if (bufparam == NULL) { - return 0; - } - - buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE); - ss_dassert(buf != NULL); + buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE); + } + else + { + buf = bufparam; + } if (buf == NULL) { return 0; } + ss_dassert(GWBUF_LENGTH(buf) == COM_QUIT_PACKET_SIZE); + data = GWBUF_DATA(buf); *data++ = 0x1; @@ -770,6 +831,37 @@ int mysql_send_com_quit( *data++ = packet_number; *data = 0x1; + return buf; +} + +int mysql_send_com_quit( + DCB* dcb, + int packet_number, + GWBUF* bufparam) +{ + GWBUF *buf; + int nbytes 
= 0; + + CHK_DCB(dcb); + ss_dassert(packet_number <= 255); + + if (dcb == NULL || dcb->state == DCB_STATE_ZOMBIE) + { + return 0; + } + if (bufparam == NULL) + { + buf = mysql_create_com_quit(NULL, packet_number); + } + else + { + buf = bufparam; + } + + if (buf == NULL) + { + return 0; + } nbytes = dcb->func.write(dcb, buf); return nbytes; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 3e521e5cf..951e2f343 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -93,6 +93,7 @@ static void handleError( static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static int router_get_servercount(ROUTER_INSTANCE* router); static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers); +static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -200,7 +201,8 @@ static bool sescmd_cursor_next( static GWBUF* sescmd_cursor_process_replies( DCB* client_dcb, GWBUF* replybuf, - sescmd_cursor_t* scur); + sescmd_cursor_t* scur, + bool* has_query); static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, @@ -219,6 +221,10 @@ static void refreshInstance( ROUTER_INSTANCE* router, CONFIG_PARAMETER* param); +static void bref_clear_state(backend_ref_t* bref, bref_state_t state); +static void bref_set_state(backend_ref_t* bref, bref_state_t state); + + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -453,6 +459,7 @@ static void* newSession( * router instance first. 
*/ spinlock_acquire(&router->lock); + if (router->service->svc_config_version > router->rwsplit_version) { CONFIG_PARAMETER* param = router->service->svc_config_param; @@ -556,7 +563,8 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; #endif - backend_ref[i].bref_state = BREF_NOT_USED; + backend_ref[i].bref_state = 0; + bref_set_state(&backend_ref[i], BREF_NOT_USED); backend_ref[i].bref_backend = router->servers[i]; /** store pointers to sescmd list to both cursors */ backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; @@ -633,7 +641,15 @@ static void closeSession( { ROUTER_CLIENT_SES* router_cli_ses; backend_ref_t* backend_ref; - + + /** + * router session can be NULL if newSession failed and it is discarding + * its connections and DCB's. + */ + if (router_session == NULL) + { + return; + } router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -667,15 +683,15 @@ static void closeSession( DCB* dcb = backend_ref[i].bref_dcb; /** Close those which had been connected */ - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { CHK_DCB(dcb); - backend_ref[i].bref_state = BREF_NOT_USED; - + bref_clear_state(&backend_ref[i], BREF_IN_USE); + bref_set_state(&backend_ref[i], BREF_CLOSED); #if defined(ERRHANDLE) /** - * closes protocol and dcb - */ + * closes protocol and dcb + */ dcb_close(dcb); #else dcb->func.close(dcb); @@ -705,12 +721,10 @@ static void freeSession( for (i=0; irses_nbackends; i++) { - if (backend_ref[i].bref_state != BREF_IN_USE) + if (!BREF_IS_IN_USE((&backend_ref[i]))) { continue; } - ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0); - atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } spinlock_acquire(&router->lock); @@ -783,8 +797,7 @@ static bool get_dcb( for (i=0; irses_nbackends; i++) { BACKEND* b = 
backend_ref[i].bref_backend; - - if (backend_ref[i].bref_state == BREF_IN_USE && + if (BREF_IS_IN_USE((&backend_ref[i])) && SERVER_IS_SLAVE(b->backend_server) && (smallest_nconn == -1 || b->backend_conn_count < smallest_nconn)) @@ -799,7 +812,7 @@ static bool get_dcb( { backend_ref = rses->rses_master_ref; - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { *p_dcb = backend_ref->bref_dcb; succp = true; @@ -825,7 +838,7 @@ static bool get_dcb( { BACKEND* b = backend_ref[i].bref_backend; - if (backend_ref[i].bref_state == BREF_IN_USE && + if (BREF_IS_IN_USE((&backend_ref[i])) && (SERVER_IS_MASTER(b->backend_server))) { *p_dcb = backend_ref[i].bref_dcb; @@ -853,7 +866,7 @@ return_succp: * @param session The session associated with the client * @param queue Gateway buffer queue with the packets received * - * @return The number of queries forwarded + * @return if succeed 1, otherwise 0 */ static int routeQuery( ROUTER* instance, @@ -886,17 +899,24 @@ static int routeQuery( if (rses_is_closed) { - LOGIF(LE, - (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Failed to route %s:%s:\"%s\" to " - "backend server. %s.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to " - "route to")))); + /** + * COM_QUIT may have sent by client and as a part of backend + * closing procedure. + */ + if (packet_type != COM_QUIT) + { + LOGIF(LE, + (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to route %s:%s:\"%s\" to " + "backend server. %s.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? 
"Router was closed" : + "Router has no backend servers where to " + "route to")))); + } goto return_ret; } inst->stats.n_queries++; @@ -991,6 +1011,10 @@ static int routeQuery( */ if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) { + /** + * It is not sure if the session command in question requires + * response. Statement must be examined in route_session_write. + */ bool succp = route_session_write( router_cli_ses, querybuf, @@ -1031,6 +1055,12 @@ static int routeQuery( if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_slave, 1); + /** + * This backend_ref waits resultset, flag it. + */ + bref_set_state(get_bref_from_dcb(router_cli_ses, + slave_dcb), + BREF_WAITING_RESULT); } else { @@ -1048,7 +1078,7 @@ static int routeQuery( else { bool succp = true; - + if (LOG_IS_ENABLED(LOGFILE_TRACE)) { if (router_cli_ses->rses_transaction_active) /*< all to master */ @@ -1081,6 +1111,12 @@ static int routeQuery( if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_master, 1); + /** + * This backend_ref waits reply to write stmt, + * flag it. 
+ */ + bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), + BREF_WAITING_RESULT); } } rses_end_locked_router_action(router_cli_ses); @@ -1298,10 +1334,17 @@ static void clientReply( */ if (sescmd_cursor_is_active(scur)) { + bool has_query; writebuf = sescmd_cursor_process_replies(client_dcb, writebuf, - scur); + scur, + &has_query); + if (has_query) + { + bref_clear_state(backend_ref, BREF_WAITING_RESULT); + } } + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1309,9 +1352,6 @@ static void clientReply( { /** Write reply to client DCB */ client_dcb->func.write(client_dcb, writebuf); - /** - * Log reply but use identifier for query - */ } lock_failed: @@ -1349,6 +1389,20 @@ int bref_cmp_behind_master( return 1; } +static void bref_clear_state( + backend_ref_t* bref, + bref_state_t state) +{ + bref->bref_state &= ~state; +} + +static void bref_set_state( + backend_ref_t* bref, + bref_state_t state) +{ + bref->bref_state |= state; +} + /** * @node Search suitable backend servers from those of router instance. * @@ -1409,7 +1463,7 @@ static bool select_connect_backend_servers( /** Master is already chosen and connected. 
This is slave failure case */ if (*p_master_ref != NULL && - (*p_master_ref)->bref_state == BREF_IN_USE) + BREF_IS_IN_USE((*p_master_ref))) { master_found = true; master_connected = true; @@ -1440,6 +1494,7 @@ static bool select_connect_backend_servers( #if defined(EXTRA_DEBUGGING) LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); + for (i=0; ibackend_server->name, - b->backend_server->port, - STRSRVSTATUS(b->backend_server)))); - /* handle connect error */ - } } /*< for */ /** @@ -1670,7 +1719,7 @@ static bool select_connect_backend_servers( { BACKEND* b = backend_ref[i].bref_backend; - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { backend_type_t btype = BACKEND_TYPE(b); @@ -1690,7 +1739,9 @@ static bool select_connect_backend_servers( * Failure cases */ else - { + { + succp = false; + if (!master_found) { LOGIF(LE, (skygw_log_write( @@ -1702,7 +1753,7 @@ static bool select_connect_backend_servers( LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "* Error : Couldn't find suitable %s from %d " + "Error : Couldn't find suitable %s from %d " "candidates.", (is_synced_master ? "Galera node" : "Master"), router_nservers))); @@ -1726,7 +1777,7 @@ static bool select_connect_backend_servers( LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "* Error : Couldn't connect to any %s although " + "Error : Couldn't connect to any %s although " "there exists at least one %s node in the " "cluster.", (is_synced_master ? 
"Galera node" : "Master"), @@ -1750,18 +1801,22 @@ static bool select_connect_backend_servers( LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "*Error : Couldn't establish required amount of " + "Error : Couldn't establish required amount of " "slave connections for router session."))); } /** Clean up connections */ for (i=0; ibackend_conn_count > 0); + /** disconnect opened connections */ - backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb); + dcb_close(backend_ref[i].bref_dcb); + bref_clear_state(&backend_ref[i], BREF_IN_USE); + bref_set_state(&backend_ref[i], BREF_NOT_USED); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } } @@ -1955,7 +2010,8 @@ static void mysql_sescmd_done( static GWBUF* sescmd_cursor_process_replies( DCB* client_dcb, GWBUF* replybuf, - sescmd_cursor_t* scur) + sescmd_cursor_t* scur, + bool* has_query) { const size_t headerlen = 4; /*< mysql packet header */ uint8_t* packet; @@ -1971,7 +2027,7 @@ static GWBUF* sescmd_cursor_process_replies( /** * Walk through packets in the message and the list of session - *commands. + * commands. */ while (scmd != NULL && replybuf != NULL) { @@ -1985,32 +2041,11 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); -/* - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_cursor_process_replies] cmd %p " - "is already replied. Discarded %d bytes from " - "the %s replybuffer.", - pthread_self(), - scmd, - packetlen+headerlen, - STRBETYPE(scur->scmd_cur_be_type)))); - */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; - /* - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_cursor_process_replies] Marked " - "cmd %p to as replied. 
Left message to %s's " - "buffer for reply.", - pthread_self(), - scmd, - STRBETYPE(scur->scmd_cur_be_type)))); - */ } if (sescmd_cursor_next(scur)) @@ -2023,7 +2058,9 @@ static GWBUF* sescmd_cursor_process_replies( /** All session commands are replied */ scur->scmd_cur_active = false; } - } + } + /** vraa:this is set but only because there's not yet way to find out */ + *has_query = false; ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); return replybuf; @@ -2106,7 +2143,7 @@ static bool execute_sescmd_in_backend( int rc = 0; sescmd_cursor_t* scur; - if (backend_ref->bref_state == BREF_CLOSED) + if (BREF_IS_CLOSED(backend_ref)) { goto return_succp; } @@ -2160,9 +2197,16 @@ static bool execute_sescmd_in_backend( "%lu [execute_sescmd_in_backend] Routed %s cmd %p.", pthread_self(), STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type), - scur->scmd_cur_cmd))); - - if (rc != 1) + scur->scmd_cur_cmd))); + + if (rc == 1) + { + /** + * All but COM_QUIT cause backend to send reply. flag backend_ref. + */ + bref_set_state(backend_ref, BREF_WAITING_RESULT); + } + else { succp = false; } @@ -2381,7 +2425,7 @@ static bool route_session_write( { DCB* dcb = backend_ref[i].bref_dcb; - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); @@ -2412,11 +2456,11 @@ static bool route_session_write( } /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - + for (i=0; irses_nbackends; i++) { - if (backend_ref[i].bref_state == BREF_IN_USE) - { + if (BREF_IS_IN_USE((&backend_ref[i]))) + { succp = execute_sescmd_in_backend(&backend_ref[i]); if (!succp) @@ -2499,7 +2543,10 @@ static void rwsplit_process_options( * @param message The error message to reply * @param backend_dcb The backend DCB * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION - * + * @param succp Result of action. 
+ * + * Even if succp == true connecting to new slave may have failed. succp is to + * tell whether router has enough master/slave connections to continue work. */ static void handleError ( ROUTER *instance, @@ -2511,7 +2558,7 @@ static void handleError ( { DCB* client_dcb = NULL; SESSION* session = backend_dcb->session; - ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)instance; + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; client_dcb = session->client; @@ -2523,9 +2570,12 @@ static void handleError ( int router_nservers; int max_nslaves; - router_nservers = router_get_servercount(router); + router_nservers = router_get_servercount(inst); max_nslaves = rses_get_max_slavecount(rses, router_nservers); - + /** + * Try to get replacement slave or at least the minimum + * number of slave connections for router session. + */ *succp = select_connect_backend_servers( &rses->rses_master_ref, rses->rses_backend_ref, @@ -2533,9 +2583,39 @@ static void handleError ( max_nslaves, rses->rses_config.rw_slave_select_criteria, session, - router); + inst); ss_dassert(*succp); + /** Too few or no slaves at all */ + if (!succp) + { + if (session->state == SESSION_STATE_ROUTER_READY) + { + ROUTER* rsession; + ROUTER_OBJECT* router; + + router = session->service->router; + + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + + rsession = session->router_session; + /*< + * rsession should never be NULL here. 
+ */ + ss_dassert(rsession != NULL); + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_error_backend_event] " + "Call closeSession for backend " + "session.", + pthread_self()))); + + router->closeSession(instance, rsession); + } + + } } break; @@ -2598,10 +2678,10 @@ static void print_error_packet( } static int router_get_servercount( - ROUTER_INSTANCE* router) + ROUTER_INSTANCE* inst) { int router_nservers = 0; - BACKEND** b = router->servers; + BACKEND** b = inst->servers; /** count servers */ while (*(b++) != NULL) router_nservers++; @@ -2634,4 +2714,26 @@ static int rses_get_max_slavecount( max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); return max_nslaves; +} + +static backend_ref_t* get_bref_from_dcb( + ROUTER_CLIENT_SES* rses, + DCB* dcb) +{ + backend_ref_t* bref; + + CHK_DCB(dcb); + CHK_CLIENT_RSES(rses); + + bref = rses->rses_backend_ref; + + while (bref != NULL) + { + if (bref->bref_dcb == dcb) + { + break; + } + bref++; + } + return bref; } \ No newline at end of file