diff --git a/server/core/buffer.c b/server/core/buffer.c index 11fb5b556..db6da8bec 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -308,7 +308,7 @@ bool gwbuf_set_type( case GWBUF_TYPE_MYSQL: case GWBUF_TYPE_PLAINSQL: case GWBUF_TYPE_UNDEFINED: - buf->gwbuf_type = type; + buf->gwbuf_type |= type; succp = true; break; default: diff --git a/server/core/session.c b/server/core/session.c index cb392258c..d02498fd4 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -642,3 +642,30 @@ int i; return 1; } + +bool session_route_query ( + SESSION* ses, + GWBUF* buf) +{ + bool succp; + + if (ses->head.routeQuery == NULL || + ses->head.instance == NULL || + ses->head.session == NULL) + { + succp = false; + goto return_succp; + } + + if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1) + { + succp = true; + } + else + { + succp = false; + } +return_succp: + return succp; +} + \ No newline at end of file diff --git a/server/include/buffer.h b/server/include/buffer.h index 9651031b2..66c56322b 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -46,11 +46,14 @@ typedef enum { - GWBUF_TYPE_UNDEFINED = 0x0, - GWBUF_TYPE_PLAINSQL = 0x1, - GWBUF_TYPE_MYSQL = 0x2 + GWBUF_TYPE_UNDEFINED = 0x00, + GWBUF_TYPE_PLAINSQL = 0x01, + GWBUF_TYPE_MYSQL = 0x02 } gwbuf_type_t; +#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) +#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) + /** * A structure to encapsulate the data in a form that the data itself can be * shared between multiple GWBUF's without the need to make multiple copies diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index ed47d309d..6c35d1804 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -32,11 +32,17 @@ #include typedef enum bref_state { - BREF_NOT_USED, - BREF_IN_USE, - BREF_CLOSED + BREF_NOT_USED = 0x00, + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /*< for anything that responds */ + BREF_CLOSED = 0x08 } bref_state_t; +#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED) +#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT) +#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) + typedef enum backend_type_t { BE_UNDEFINED=-1, BE_MASTER, diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index f39412684..665a3336f 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -339,7 +339,7 @@ static int conn_err_count; return; /** Store prevous status */ - database->mon_prev_status = database->server->status; + database->mon_prev_status = database->server->status; if (database->con == NULL || mysql_ping(database->con) != 0) { diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index b77671143..932886a6e 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -197,6 +197,14 @@ static int gw_read_backend_event(DCB *dcb) { if (gw_read_backend_handshake(backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_read_backend_handshake, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + backend_protocol->owner_dcb->fd))); + } else { /* handshake decoded, send the auth credentials */ if (gw_send_authentication_to_backend( @@ -206,6 +214,13 @@ static int gw_read_backend_event(DCB *dcb) { backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_send_authentication_to_backend " + "fd %d, state = MYSQL_AUTH_FAILED.", + pthread_self(), + backend_protocol->owner_dcb->fd))); } else { backend_protocol->state = MYSQL_AUTH_RECV; } @@ -253,13 +268,21 @@ static int gw_read_backend_event(DCB *dcb) { switch (receive_rc) { case -1: backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_receive_backend_authentication " + "fd %d, state = MYSQL_AUTH_FAILED.", + backend_protocol->owner_dcb->fd, + pthread_self()))); + LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Backend server didn't " "accept authentication for user " "%s.", - current_session->user))); + current_session->user))); break; case 1: backend_protocol->state = MYSQL_IDLE; @@ -318,7 +341,11 @@ static int gw_read_backend_event(DCB *dcb) { } spinlock_release(&dcb->delayqlock); - + /** Whole session is being closed so return. */ + if (session->state == SESSION_STATE_STOPPING) + { + goto return_rc; + } /* try reload users' table for next connection */ service_refresh_users(dcb->session->client->service); @@ -341,11 +368,6 @@ static int gw_read_backend_event(DCB *dcb) { } usleep(1); } - - if (session->state == SESSION_STATE_STOPPING) - { - goto return_rc; - } spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); @@ -427,7 +449,7 @@ static int gw_read_backend_event(DCB *dcb) { */ router->handleError(router_instance, rsession, - "Read from backend failed.", + "Read from backend failed", dcb, ERRACT_NEW_CONNECTION, &succp); @@ -583,7 +605,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) */ spinlock_acquire(&dcb->dcb_initlock); - if (dcb->state != DCB_STATE_POLLING) { + if (dcb->state != DCB_STATE_POLLING) + { /*< vraa : errorHandle */ /*< Free buffer memory */ gwbuf_consume(queue, GWBUF_LENGTH(queue)); @@ -629,11 +652,11 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) queue, GWBUF_LENGTH(queue))) != NULL); free(str); - } rc = 0; spinlock_release(&dcb->authlock); goto return_rc; break; + } case MYSQL_IDLE: LOGIF(LD, (skygw_log_write( @@ -677,7 +700,8 @@ return_rc: * Backend Error Handling for EPOLLER * */ -static int gw_error_backend_event(DCB *dcb) { +static int gw_error_backend_event(DCB *dcb) +{ SESSION *session; void *rsession; ROUTER_OBJECT *router; @@ -692,7 +716,17 @@ static int gw_error_backend_event(DCB *dcb) { router_instance = session->service->router_instance; #if defined(ERRHANDLE2) - router->handleError(); + router->handleError(router_instance, + rsession, + "Connection to backend server failed", + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + if (!succp) + { + dcb_close(dcb); + } #else if (dcb->state != DCB_STATE_POLLING) { /*< vraa : errorHandle */ @@ -866,7 +900,33 @@ static int gw_backend_close(DCB *dcb) { #if defined(ERRHANDLE) - mysql_send_com_quit(dcb, 1); + DCB* client_dcb; + SESSION* session; + GWBUF* quitbuf; + bool succp; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + + quitbuf = mysql_create_com_quit(NULL, 0); + + /** Send COM_QUIT to the backend being closed */ + mysql_send_com_quit(dcb, 0, quitbuf); + + if (session != NULL && + (session->state == SESSION_STATE_ROUTER_READY || + session->state == SESSION_STATE_READY)) + { + client_dcb = session->client; + + if (client_dcb != NULL && + client_dcb->state == DCB_STATE_POLLING) + { + /** Close client DCB */ + dcb_close(client_dcb); + } + } #else dcb_close(dcb); #endif diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index a620c2d8b..4ab8ad3ea 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -126,7 +126,9 @@ void gw_mysql_close(MySQLProtocol **ptr) { * @param conn MySQL protocol structure * @return 0 on success, 1 on failure */ -int gw_read_backend_handshake(MySQLProtocol *conn) { +int gw_read_backend_handshake( + MySQLProtocol *conn) +{ GWBUF *head = NULL; DCB *dcb = conn->owner_dcb; int n = -1; @@ -135,12 +137,14 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { int success = 0; int packet_len = 0; - if ((n = dcb_read(dcb, &head)) != -1) { - if (head) { + if ((n = dcb_read(dcb, &head)) != -1) + { + if (head) + { payload = GWBUF_DATA(head); h_len = gwbuf_length(head); - - /* + + /** * The mysql packets content starts at byte fifth * just return with less bytes */ @@ -148,10 +152,45 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { if (h_len <= 4) { /* log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "dcb_read, fd %d, " + "state = MYSQL_AUTH_FAILED.", + dcb->fd, + pthread_self()))); + return 1; } - //get mysql packet size, 3 bytes + if (payload[4] == 0xff) + { + size_t len = MYSQL_GET_PACKET_LEN(payload); + uint16_t errcode = MYSQL_GET_ERRCODE(payload); + char* bufstr = strndup(&((char *)payload)[7], len-3); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_receive_backend_auth] Invalid " + "authentication message from backend dcb %p " + "fd %d, ptr[4] = %p, error code %d, msg %s.", + pthread_self(), + dcb, + dcb->fd, + payload[4], + errcode, + bufstr))); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Invalid authentication message " + "from backend. Error code: %d, Msg : %s", + errcode, + bufstr))); + + free(bufstr); + } + //get mysql packet size, 3 bytes packet_len = gw_mysql_get_byte3(payload); if (h_len < (packet_len + 4)) { @@ -160,6 +199,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * packet. Log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "gw_mysql_get_byte3, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + dcb->fd, + pthread_self()))); + return 1; } @@ -176,6 +224,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "gw_decode_mysql_server_handshake, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + conn->owner_dcb->fd, + pthread_self()))); + return 1; } @@ -202,7 +259,10 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * @return 0 on success, < 0 on failure * */ -int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { +int gw_decode_mysql_server_handshake( + MySQLProtocol *conn, + uint8_t *payload) +{ uint8_t *server_version_end = NULL; uint16_t mysql_server_capabilities_one = 0; uint16_t mysql_server_capabilities_two = 0; @@ -216,8 +276,8 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { protocol_version = payload[0]; - if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) { - /* log error for this */ + if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) + { return -1; } @@ -257,19 +317,23 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { payload+=2; // get scramble len - if (payload[0] > 0) { + if (payload[0] > 0) + { scramble_len = payload[0] -1; ss_dassert(scramble_len > GW_SCRAMBLE_LENGTH_323); ss_dassert(scramble_len <= GW_MYSQL_SCRAMBLE_SIZE); - if ( (scramble_len < GW_SCRAMBLE_LENGTH_323) || scramble_len > GW_MYSQL_SCRAMBLE_SIZE) { + if ((scramble_len < GW_SCRAMBLE_LENGTH_323) || + scramble_len > GW_MYSQL_SCRAMBLE_SIZE) + { /* log this */ - return -2; + return -2; } - } else { + } + else + { scramble_len = GW_MYSQL_SCRAMBLE_SIZE; } - // skip 10 zero bytes payload += 11; @@ -322,8 +386,8 @@ int gw_receive_backend_auth( else if (ptr[4] == 0xff) { size_t len = MYSQL_GET_PACKET_LEN(ptr); - char* err = strndup(&ptr[8], 5); - char* bufstr = strndup(&ptr[13], len-4-5); + char* err = strndup(&((char *)ptr)[8], 5); + char* bufstr = strndup(&((char *)ptr)[13], len-4-5); LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -737,31 +801,28 @@ gw_mysql_protocol_state2string (int state) { } } -int mysql_send_com_quit( - DCB* dcb, - int packet_number) +GWBUF* mysql_create_com_quit( + GWBUF* bufparam, + int packet_number) { - uint8_t *data; - GWBUF *buf; - int nbytes = 0; - - CHK_DCB(dcb); - ss_dassert(packet_number <= 255); + uint8_t* data; + GWBUF* buf; - if (dcb == NULL || - (dcb->state != DCB_STATE_NOPOLLING && - dcb->state != DCB_STATE_ZOMBIE)) + if (bufparam == NULL) { - return 0; - } - - buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE); - ss_dassert(buf != NULL); + buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE); + } + else + { + buf = bufparam; + } if (buf == NULL) { return 0; } + ss_dassert(GWBUF_LENGTH(buf) == COM_QUIT_PACKET_SIZE); + data = GWBUF_DATA(buf); *data++ = 0x1; @@ -770,6 +831,37 @@ int mysql_send_com_quit( *data++ = packet_number; *data = 0x1; + return buf; +} + +int mysql_send_com_quit( + DCB* dcb, + int packet_number, + GWBUF* bufparam) +{ + GWBUF *buf; + int nbytes = 0; + + CHK_DCB(dcb); + ss_dassert(packet_number <= 255); + + if (dcb == NULL || dcb->state == DCB_STATE_ZOMBIE) + { + return 0; + } + if (bufparam == NULL) + { + buf = mysql_create_com_quit(NULL, packet_number); + } + else + { + buf = bufparam; + } + + if (buf == NULL) + { + return 0; + } nbytes = dcb->func.write(dcb, buf); return nbytes; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 3e521e5cf..951e2f343 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -93,6 +93,7 @@ static void handleError( static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static int router_get_servercount(ROUTER_INSTANCE* router); static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers); +static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -200,7 +201,8 @@ static bool sescmd_cursor_next( static GWBUF* sescmd_cursor_process_replies( DCB* client_dcb, GWBUF* replybuf, - sescmd_cursor_t* scur); + sescmd_cursor_t* scur, + bool* has_query); static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, @@ -219,6 +221,10 @@ static void refreshInstance( ROUTER_INSTANCE* router, CONFIG_PARAMETER* param); +static void bref_clear_state(backend_ref_t* bref, bref_state_t state); +static void bref_set_state(backend_ref_t* bref, bref_state_t state); + + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -453,6 +459,7 @@ static void* newSession( * router instance first. */ spinlock_acquire(&router->lock); + if (router->service->svc_config_version > router->rwsplit_version) { CONFIG_PARAMETER* param = router->service->svc_config_param; @@ -556,7 +563,8 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; #endif - backend_ref[i].bref_state = BREF_NOT_USED; + backend_ref[i].bref_state = 0; + bref_set_state(&backend_ref[i], BREF_NOT_USED); backend_ref[i].bref_backend = router->servers[i]; /** store pointers to sescmd list to both cursors */ backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; @@ -633,7 +641,15 @@ static void closeSession( { ROUTER_CLIENT_SES* router_cli_ses; backend_ref_t* backend_ref; - + + /** + * router session can be NULL if newSession failed and it is discarding + * its connections and DCB's. + */ + if (router_session == NULL) + { + return; + } router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -667,15 +683,15 @@ static void closeSession( DCB* dcb = backend_ref[i].bref_dcb; /** Close those which had been connected */ - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { CHK_DCB(dcb); - backend_ref[i].bref_state = BREF_NOT_USED; - + bref_clear_state(&backend_ref[i], BREF_IN_USE); + bref_set_state(&backend_ref[i], BREF_CLOSED); #if defined(ERRHANDLE) /** - * closes protocol and dcb - */ + * closes protocol and dcb + */ dcb_close(dcb); #else dcb->func.close(dcb); @@ -705,12 +721,10 @@ static void freeSession( for (i=0; irses_nbackends; i++) { - if (backend_ref[i].bref_state != BREF_IN_USE) + if (!BREF_IS_IN_USE((&backend_ref[i]))) { continue; } - ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0); - atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } spinlock_acquire(&router->lock); @@ -783,8 +797,7 @@ static bool get_dcb( for (i=0; irses_nbackends; i++) { BACKEND* b = backend_ref[i].bref_backend; - - if (backend_ref[i].bref_state == BREF_IN_USE && + if (BREF_IS_IN_USE((&backend_ref[i])) && SERVER_IS_SLAVE(b->backend_server) && (smallest_nconn == -1 || b->backend_conn_count < smallest_nconn)) @@ -799,7 +812,7 @@ static bool get_dcb( { backend_ref = rses->rses_master_ref; - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { *p_dcb = backend_ref->bref_dcb; succp = true; @@ -825,7 +838,7 @@ static bool get_dcb( { BACKEND* b = backend_ref[i].bref_backend; - if (backend_ref[i].bref_state == BREF_IN_USE && + if (BREF_IS_IN_USE((&backend_ref[i])) && (SERVER_IS_MASTER(b->backend_server))) { *p_dcb = backend_ref[i].bref_dcb; @@ -853,7 +866,7 @@ return_succp: * @param session The session associated with the client * @param queue Gateway buffer queue with the packets received * - * @return The number of queries forwarded + * @return if succeed 1, otherwise 0 */ static int routeQuery( ROUTER* instance, @@ -886,17 +899,24 @@ static int routeQuery( if (rses_is_closed) { - LOGIF(LE, - (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Failed to route %s:%s:\"%s\" to " - "backend server. %s.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to " - "route to")))); + /** + * COM_QUIT may have sent by client and as a part of backend + * closing procedure. + */ + if (packet_type != COM_QUIT) + { + LOGIF(LE, + (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to route %s:%s:\"%s\" to " + "backend server. %s.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? "Router was closed" : + "Router has no backend servers where to " + "route to")))); + } goto return_ret; } inst->stats.n_queries++; @@ -991,6 +1011,10 @@ static int routeQuery( */ if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) { + /** + * It is not sure if the session command in question requires + * response. Statement must be examined in route_session_write. + */ bool succp = route_session_write( router_cli_ses, querybuf, @@ -1031,6 +1055,12 @@ static int routeQuery( if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_slave, 1); + /** + * This backend_ref waits resultset, flag it. + */ + bref_set_state(get_bref_from_dcb(router_cli_ses, + slave_dcb), + BREF_WAITING_RESULT); } else { @@ -1048,7 +1078,7 @@ static int routeQuery( else { bool succp = true; - + if (LOG_IS_ENABLED(LOGFILE_TRACE)) { if (router_cli_ses->rses_transaction_active) /*< all to master */ @@ -1081,6 +1111,12 @@ static int routeQuery( if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_master, 1); + /** + * This backend_ref waits reply to write stmt, + * flag it. + */ + bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), + BREF_WAITING_RESULT); } } rses_end_locked_router_action(router_cli_ses); @@ -1298,10 +1334,17 @@ static void clientReply( */ if (sescmd_cursor_is_active(scur)) { + bool has_query; writebuf = sescmd_cursor_process_replies(client_dcb, writebuf, - scur); + scur, + &has_query); + if (has_query) + { + bref_clear_state(backend_ref, BREF_WAITING_RESULT); + } } + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1309,9 +1352,6 @@ static void clientReply( { /** Write reply to client DCB */ client_dcb->func.write(client_dcb, writebuf); - /** - * Log reply but use identifier for query - */ } lock_failed: @@ -1349,6 +1389,20 @@ int bref_cmp_behind_master( return 1; } +static void bref_clear_state( + backend_ref_t* bref, + bref_state_t state) +{ + bref->bref_state &= ~state; +} + +static void bref_set_state( + backend_ref_t* bref, + bref_state_t state) +{ + bref->bref_state |= state; +} + /** * @node Search suitable backend servers from those of router instance. * @@ -1409,7 +1463,7 @@ static bool select_connect_backend_servers( /** Master is already chosen and connected. This is slave failure case */ if (*p_master_ref != NULL && - (*p_master_ref)->bref_state == BREF_IN_USE) + BREF_IS_IN_USE((*p_master_ref))) { master_found = true; master_connected = true; @@ -1440,6 +1494,7 @@ static bool select_connect_backend_servers( #if defined(EXTRA_DEBUGGING) LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); + for (i=0; ibackend_server->name, - b->backend_server->port, - STRSRVSTATUS(b->backend_server)))); - /* handle connect error */ - } } /*< for */ /** @@ -1670,7 +1719,7 @@ static bool select_connect_backend_servers( { BACKEND* b = backend_ref[i].bref_backend; - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { backend_type_t btype = BACKEND_TYPE(b); @@ -1690,7 +1739,9 @@ static bool select_connect_backend_servers( * Failure cases */ else - { + { + succp = false; + if (!master_found) { LOGIF(LE, (skygw_log_write( @@ -1702,7 +1753,7 @@ static bool select_connect_backend_servers( LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "* Error : Couldn't find suitable %s from %d " + "Error : Couldn't find suitable %s from %d " "candidates.", (is_synced_master ? "Galera node" : "Master"), router_nservers))); @@ -1726,7 +1777,7 @@ static bool select_connect_backend_servers( LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "* Error : Couldn't connect to any %s although " + "Error : Couldn't connect to any %s although " "there exists at least one %s node in the " "cluster.", (is_synced_master ? "Galera node" : "Master"), @@ -1750,18 +1801,22 @@ static bool select_connect_backend_servers( LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "*Error : Couldn't establish required amount of " + "Error : Couldn't establish required amount of " "slave connections for router session."))); } /** Clean up connections */ for (i=0; ibackend_conn_count > 0); + /** disconnect opened connections */ - backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb); + dcb_close(backend_ref[i].bref_dcb); + bref_clear_state(&backend_ref[i], BREF_IN_USE); + bref_set_state(&backend_ref[i], BREF_NOT_USED); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } } @@ -1955,7 +2010,8 @@ static void mysql_sescmd_done( static GWBUF* sescmd_cursor_process_replies( DCB* client_dcb, GWBUF* replybuf, - sescmd_cursor_t* scur) + sescmd_cursor_t* scur, + bool* has_query) { const size_t headerlen = 4; /*< mysql packet header */ uint8_t* packet; @@ -1971,7 +2027,7 @@ static GWBUF* sescmd_cursor_process_replies( /** * Walk through packets in the message and the list of session - *commands. + * commands. */ while (scmd != NULL && replybuf != NULL) { @@ -1985,32 +2041,11 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); -/* - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_cursor_process_replies] cmd %p " - "is already replied. Discarded %d bytes from " - "the %s replybuffer.", - pthread_self(), - scmd, - packetlen+headerlen, - STRBETYPE(scur->scmd_cur_be_type)))); - */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; - /* - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_cursor_process_replies] Marked " - "cmd %p to as replied. Left message to %s's " - "buffer for reply.", - pthread_self(), - scmd, - STRBETYPE(scur->scmd_cur_be_type)))); - */ } if (sescmd_cursor_next(scur)) @@ -2023,7 +2058,9 @@ static GWBUF* sescmd_cursor_process_replies( /** All session commands are replied */ scur->scmd_cur_active = false; } - } + } + /** vraa:this is set but only because there's not yet way to find out */ + *has_query = false; ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); return replybuf; @@ -2106,7 +2143,7 @@ static bool execute_sescmd_in_backend( int rc = 0; sescmd_cursor_t* scur; - if (backend_ref->bref_state == BREF_CLOSED) + if (BREF_IS_CLOSED(backend_ref)) { goto return_succp; } @@ -2160,9 +2197,16 @@ static bool execute_sescmd_in_backend( "%lu [execute_sescmd_in_backend] Routed %s cmd %p.", pthread_self(), STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type), - scur->scmd_cur_cmd))); - - if (rc != 1) + scur->scmd_cur_cmd))); + + if (rc == 1) + { + /** + * All but COM_QUIT cause backend to send reply. flag backend_ref. + */ + bref_set_state(backend_ref, BREF_WAITING_RESULT); + } + else { succp = false; } @@ -2381,7 +2425,7 @@ static bool route_session_write( { DCB* dcb = backend_ref[i].bref_dcb; - if (backend_ref[i].bref_state == BREF_IN_USE) + if (BREF_IS_IN_USE((&backend_ref[i]))) { rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); @@ -2412,11 +2456,11 @@ static bool route_session_write( } /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - + for (i=0; irses_nbackends; i++) { - if (backend_ref[i].bref_state == BREF_IN_USE) - { + if (BREF_IS_IN_USE((&backend_ref[i]))) + { succp = execute_sescmd_in_backend(&backend_ref[i]); if (!succp) @@ -2499,7 +2543,10 @@ static void rwsplit_process_options( * @param message The error message to reply * @param backend_dcb The backend DCB * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION - * + * @param succp Result of action. + * + * Even if succp == true connecting to new slave may have failed. succp is to + * tell whether router has enough master/slave connections to continue work. */ static void handleError ( ROUTER *instance, @@ -2511,7 +2558,7 @@ static void handleError ( { DCB* client_dcb = NULL; SESSION* session = backend_dcb->session; - ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)instance; + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; client_dcb = session->client; @@ -2523,9 +2570,12 @@ static void handleError ( int router_nservers; int max_nslaves; - router_nservers = router_get_servercount(router); + router_nservers = router_get_servercount(inst); max_nslaves = rses_get_max_slavecount(rses, router_nservers); - + /** + * Try to get replacement slave or at least the minimum + * number of slave connections for router session. + */ *succp = select_connect_backend_servers( &rses->rses_master_ref, rses->rses_backend_ref, @@ -2533,9 +2583,39 @@ static void handleError ( max_nslaves, rses->rses_config.rw_slave_select_criteria, session, - router); + inst); ss_dassert(*succp); + /** Too few or no slaves at all */ + if (!succp) + { + if (session->state == SESSION_STATE_ROUTER_READY) + { + ROUTER* rsession; + ROUTER_OBJECT* router; + + router = session->service->router; + + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + + rsession = session->router_session; + /*< + * rsession should never be NULL here. + */ + ss_dassert(rsession != NULL); + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_error_backend_event] " + "Call closeSession for backend " + "session.", + pthread_self()))); + + router->closeSession(instance, rsession); + } + + } } break; @@ -2598,10 +2678,10 @@ static void print_error_packet( } static int router_get_servercount( - ROUTER_INSTANCE* router) + ROUTER_INSTANCE* inst) { int router_nservers = 0; - BACKEND** b = router->servers; + BACKEND** b = inst->servers; /** count servers */ while (*(b++) != NULL) router_nservers++; @@ -2634,4 +2714,26 @@ static int rses_get_max_slavecount( max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); return max_nslaves; +} + +static backend_ref_t* get_bref_from_dcb( + ROUTER_CLIENT_SES* rses, + DCB* dcb) +{ + backend_ref_t* bref; + + CHK_DCB(dcb); + CHK_CLIENT_RSES(rses); + + bref = rses->rses_backend_ref; + + while (bref != NULL) + { + if (bref->bref_dcb == dcb) + { + break; + } + bref++; + } + return bref; } \ No newline at end of file