diff --git a/server/core/poll.c b/server/core/poll.c index bfb1e65c6..51bba4027 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -79,6 +79,7 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ #endif static int n_waiting = 0; /*< No. of threads in epoll_wait */ static int process_pollq(int thread_id); +static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev); DCB *eventq = NULL; @@ -1110,3 +1111,68 @@ int new_samples, new_nfds; if (next_sample >= n_avg_samples) next_sample = 0; } + +/** + * Add given GWBUF to DCB's readqueue and add a pending EPOLLIN event for DCB. + * The event pretends that there is something to read for the DCB. Actually + * the incoming data is stored in the DCB's readqueue where it is read. + * + * @param dcb DCB where the event and data are added + * @param buf GWBUF including the data + * + */ +void poll_add_epollin_event_to_dcb( + DCB* dcb, + GWBUF* buf) +{ + __uint32_t ev; + + ev = EPOLLIN; + + poll_add_event_to_dcb(dcb, buf, ev); +} + + + +static void poll_add_event_to_dcb( + DCB* dcb, + GWBUF* buf, + __uint32_t ev) +{ + /** Add buf to readqueue */ + spinlock_acquire(&dcb->authlock); + dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buf); + spinlock_release(&dcb->authlock); + + spinlock_acquire(&pollqlock); + + /** Set event to DCB */ + if (DCB_POLL_BUSY(dcb)) + { + dcb->evq.pending_events |= ev; + } + else + { + dcb->evq.pending_events = ev; + /** Add DCB to eventqueue if it isn't already there */ + if (eventq) + { + dcb->evq.prev = eventq->evq.prev; + eventq->evq.prev->evq.next = dcb; + eventq->evq.prev = dcb; + dcb->evq.next = eventq; + } + else + { + eventq = dcb; + dcb->evq.prev = dcb; + dcb->evq.next = dcb; + } + pollStats.evq_length++; + if (pollStats.evq_length > pollStats.evq_max) + { + pollStats.evq_max = pollStats.evq_length; + } + } + spinlock_release(&pollqlock); +} \ No newline at end of file diff --git a/server/include/modutil.h b/server/include/modutil.h index 224e5f431..eccdb9a9b 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -41,4 +41,11 @@ extern GWBUF *modutil_replace_SQL(GWBUF *, char *); extern char *modutil_get_query(GWBUF* buf); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); +GWBUF *modutil_create_mysql_err_msg( + int packet_number, + int affected_rows, + int merrno, + const char *statemsg, + const char *msg); + #endif diff --git a/server/include/poll.h b/server/include/poll.h index 0e4fa08c9..bc8b8ccc9 100644 --- a/server/include/poll.h +++ b/server/include/poll.h @@ -42,4 +42,6 @@ extern void poll_shutdown(); extern GWBITMASK *poll_bitmask(); extern void dprintPollStats(DCB *); extern void dShowThreads(DCB *dcb); +void poll_add_epollin_event_to_dcb(DCB* dcb, GWBUF* buf); + #endif diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index a72a6fad4..35e75a279 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -71,6 +71,7 @@ static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process); extern char* create_auth_failed_msg( GWBUF* readbuf, char* hostaddr, uint8_t* sha1); extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db); +static bool sescmd_response_complete(DCB* dcb); #if defined(NOT_USED) @@ -194,7 +195,7 @@ static int gw_read_backend_event(DCB *dcb) { * If starting to auhenticate with backend server, lock dcb * to prevent overlapping processing of auth messages. */ - if (backend_protocol->protocol_auth_state == MYSQL_CONNECTED) + if (backend_protocol->protocol_auth_state == MYSQL_CONNECTED) { spinlock_acquire(&dcb->authlock); @@ -471,19 +472,26 @@ static int gw_read_backend_event(DCB *dcb) { } nbytes_read = gwbuf_length(read_buffer); - if (nbytes_read == 0) + if (nbytes_read == 0 && dcb->dcb_readqueue == NULL) { goto return_rc; } else { - ss_dassert(read_buffer != NULL); + ss_dassert(read_buffer != NULL || dcb->dcb_readqueue != NULL); } /** Packet prefix was read earlier */ if (dcb->dcb_readqueue) { - read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + if (read_buffer != NULL) + { + read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + } + else + { + read_buffer = dcb->dcb_readqueue; + } nbytes_read = gwbuf_length(read_buffer); if (nbytes_read < 5) /*< read at least command type */ @@ -494,7 +502,6 @@ static int gw_read_backend_event(DCB *dcb) { /** There is at least length and command type. */ else { - read_buffer = dcb->dcb_readqueue; dcb->dcb_readqueue = NULL; } } @@ -516,6 +523,15 @@ static int gw_read_backend_event(DCB *dcb) { MYSQL_COM_UNDEFINED) { read_buffer = process_response_data(dcb, read_buffer, nbytes_read); + /** + * Received incomplete response to session command. + * Store it to readqueue and return. + */ + if (!sescmd_response_complete(dcb)) + { + rc = 0; + goto return_rc; + } } /*< * If dcb->session->client is freed already it may be NULL. @@ -723,9 +739,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) default: { - uint8_t* ptr = GWBUF_DATA(queue); - int cmd = MYSQL_GET_COMMAND(ptr); - LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_MySQLWrite_backend] delayed write to " @@ -741,6 +754,9 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) if (GWBUF_IS_TYPE_SINGLE_STMT(queue) && GWBUF_IS_TYPE_SESCMD(queue)) { + uint8_t* ptr = GWBUF_DATA(queue); + int cmd = MYSQL_GET_COMMAND(ptr); + /** Record the command to backend's protocol */ protocol_add_srv_command(backend_protocol, cmd); } @@ -1183,8 +1199,8 @@ static int backend_write_delayqueue(DCB *dcb) * @param server The backend server pointer * @param in_session The current session data (MYSQL_session) * @param queue The GWBUF containing the COM_CHANGE_USER receveid - * @return 0 on success and 1 on failure - */ + * @return 1 on success and 0 on failure + */ static int gw_change_user( DCB *backend, SERVER *server, @@ -1266,26 +1282,35 @@ static int gw_change_user( if (auth_ret != 0) { char *password_set = NULL; char *message = NULL; + GWBUF* buf; if (auth_token_len > 0) password_set = (char *)client_sha1; else password_set = ""; - message=create_auth_fail_str(username, + /** + * Create an error message and make it look like legit reply + * from backend server. Then make it look like an incoming event + * so that thread gets new task of it, calls clientReply + * which filters out duplicate errors from same cause and forward + * reply to the client. + */ + message = create_auth_fail_str(username, backend->session->client->remote, password_set, ""); - /* send the error packet */ - modutil_send_mysql_err_packet(backend->session->client, 1, 0, 1045, "28000", message); - - free(message); - - rv = 1; + /** TODO: Add custom message indicating that retry would probably help */ + buf = modutil_create_mysql_err_msg(1, 0, 1045, "28000", message); + /** Set flags that help router to identify session commans reply */ + gwbuf_set_type(buf, GWBUF_TYPE_MYSQL); + gwbuf_set_type(buf, GWBUF_TYPE_SESCMD_RESPONSE); + gwbuf_set_type(buf, GWBUF_TYPE_RESPONSE_END); + /** Create an incoming event for backend DCB */ + poll_add_epollin_event_to_dcb(backend, buf); + rv = 0; } else { - rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol); - /* * Now copy new data into user session */ @@ -1298,29 +1323,9 @@ static int gw_change_user( return rv; } -/** - * Session Change wrapper for func.write - * The reply packet will be back routed to the right server - * in the gw_read_backend_event checking the ROUTER_CHANGE_SESSION command in dcb->command - * - * @param - * @return always 1 - */ -/* -static int gw_session(DCB *backend_dcb, void *data) { - - GWBUF *queue = NULL; - - queue = (GWBUF *) data; - backend_dcb->func.write(backend_dcb, queue); - - return 1; -} -*/ - /** - * Move packets or parts of packets from redbuf to outbuf as the packet headers + * Move packets or parts of packets from readbuf to outbuf as the packet headers * and lengths have been noticed and counted. * Session commands need to be marked so that they can be handled properly in * the router's clientReply. @@ -1452,3 +1457,28 @@ static GWBUF* process_response_data ( } return outbuf; } + + +static bool sescmd_response_complete( + DCB* dcb) +{ + int npackets_left; + size_t nbytes_left; + MySQLProtocol* p; + bool succp; + + p = DCB_PROTOCOL(dcb, MySQLProtocol); + CHK_PROTOCOL(p); + + protocol_get_response_status(p, &npackets_left, &nbytes_left); + + if (npackets_left == 0) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} \ No newline at end of file