diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 42de6a8fa..ecd0a4508 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -141,10 +141,10 @@ static mysql_sescmd_t* sescmd_cursor_get_command( static bool sescmd_cursor_next( sescmd_cursor_t* scur); -static bool sescmd_reply_to_client( - DCB* client_dcb, - mysql_sescmd_t* scmd, - GWBUF* writebuf); +static GWBUF* sescmd_cursor_process_replies( + DCB* client_dcb, + GWBUF* replybuf, + sescmd_cursor_t* scur); static bool cont_exec_sescmd_in_backend( ROUTER_CLIENT_SES* rses, @@ -1020,37 +1020,34 @@ static void clientReply( "reply_by_statement", backend_dcb, gwbuf_clone(writebuf))); - /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } + scur = rses_get_sescmd_cursor(router_cli_ses, be_type); /** * Active cursor means that reply is from session command - * execution. + * execution. Majority of the time there are no session commands + * being executed. */ if (sescmd_cursor_is_active(scur)) { - mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); - sescmd_reply_to_client(client_dcb, scmd, writebuf); - /** When sescmd list is empty set cursor passive. */ - if (!sescmd_cursor_next(scur)) - { - sescmd_cursor_set_active(scur, false); - } - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); + writebuf = sescmd_cursor_process_replies(client_dcb, + writebuf, + scur); + } - else if (client_dcb != NULL) + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + + if (writebuf != NULL && client_dcb != NULL) { /** Write reply to client DCB */ client_dcb->func.write(client_dcb, writebuf); - ss_dassert(!sescmd_cursor_is_active(scur)); - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); + LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [clientReply:rwsplit] client dcb %p, " @@ -1059,6 +1056,7 @@ static void clientReply( client_dcb, backend_dcb))); } + lock_failed: return; } @@ -1307,7 +1305,10 @@ static void rses_property_add( } } -/** Router session must be locked */ +/** + * Router session must be locked. + * Return session command pointer if succeed, NULL if failed. + */ static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop) { @@ -1377,66 +1378,89 @@ static void mysql_sescmd_done( memset(sescmd, 0, sizeof(mysql_sescmd_t)); } - - -/** - * Write session command reply from backend to client if command haven't yet - * been replied. - * Return true if succeed, false if command was already replied. - * - * Router session must be locked */ -static bool sescmd_reply_to_client( - DCB* client_dcb, - mysql_sescmd_t* scmd, - GWBUF* writebuf) +/** + * All cases where backend message starts at least with one response to session + * command are handled here. + * Read session commands from property list. If command is already replied, + * discard packet. Else send reply to client. In both cases move cursor forward + * until all session command replies are handled. + */ +static GWBUF* sescmd_cursor_process_replies( + DCB* client_dcb, + GWBUF* replybuf, + sescmd_cursor_t* scur) { - bool succp = false; + const size_t headerlen = 4; /*< mysql packet header */ + uint8_t* packet; + size_t packetlen; + mysql_sescmd_t* scmd; - CHK_DCB(client_dcb); - CHK_MYSQL_SESCMD(scmd); - CHK_GWBUF(writebuf); - ss_dassert(SPINLOCK_IS_LOCKED( - &scmd->my_sescmd_prop->rses_prop_rsession->rses_lock)); - /** - * This depends on MySQL protoocl and doesn't work with others. - * TODO: write a function to MySQL protocol module and add - * a new protocol function 'discard n first messages'. + ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock))); + + scmd = sescmd_cursor_get_command(scur); + + CHK_DCB(client_dcb); + CHK_GWBUF(replybuf); + + /** + * Walk through packets in the message and the list of session + *commands. */ - if (scmd->my_sescmd_is_replied) - { - size_t len; - uint8_t* packet; - /** - * Skip reply message because it is duplicate of alredy - * replied message. - */ - packet = (uint8_t*)writebuf->start; - len = packet[0]; - len += packet[1]*256; - len += packet[2]*256*256; - writebuf = gwbuf_consume(writebuf, len+4); - - if (writebuf == NULL || GWBUF_EMPTY(writebuf)) + while (scmd != NULL && replybuf != NULL) + { + if (scmd->my_sescmd_is_replied) { - succp = true; - goto return_succp; + /** + * Discard heading packets if their related command is + * already replied. + */ + CHK_GWBUF(replybuf); + packet = (uint8_t *)GWBUF_DATA(replybuf); + packetlen = packet[0]+packet[1]*256+packet[2]*256*256; + replybuf = gwbuf_consume(replybuf, packetlen+headerlen); + + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [sescmd_cursor_process_replies] cmd %p " + "is already replied. Discarded %d bytes from " + "the %s replybuffer.", + pthread_self(), + scmd, + packetlen+headerlen, + STRBETYPE(scur->scmd_cur_be_type)))); } - } - - client_dcb->func.write(client_dcb, writebuf); - scmd->my_sescmd_is_replied = true; - succp = true; - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_reply_to_client] Replied cmd %p to client dcb %p.", - pthread_self(), - scmd, - client_dcb))); - -return_succp: - return succp; + else + { + /** Mark the rest session commands as replied */ + scmd->my_sescmd_is_replied = true; + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [sescmd_cursor_process_replies] Marked " + "cmd %p to as replied. Left message to %s's " + "buffer for reply.", + pthread_self(), + scmd, + STRBETYPE(scur->scmd_cur_be_type)))); + } + + if (sescmd_cursor_next(scur)) + { + scmd = sescmd_cursor_get_command(scur); + } + else + { + scmd = NULL; + /** All session commands are replied */ + scur->scmd_cur_active = false; + } + } + ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); + + return replybuf; } + + /** * Get the address of current session command. * @@ -1445,9 +1469,8 @@ static mysql_sescmd_t* sescmd_cursor_get_command( sescmd_cursor_t* scur) { mysql_sescmd_t* scmd; - - ss_dassert(SPINLOCK_IS_LOCKED(&scur->scmd_cur_rses->rses_lock)); + ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock))); scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property); CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); @@ -1524,7 +1547,7 @@ static bool execute_sescmd_in_backend( bool succp = true; int rc = 0; sescmd_cursor_t* scur; - + dcb = rses->rses_dcb[be_type]; CHK_DCB(dcb); @@ -1535,14 +1558,15 @@ static bool execute_sescmd_in_backend( * Get cursor pointer and copy of command buffer to cursor. */ scur = rses_get_sescmd_cursor(rses, be_type); - - /** Return if there are no pending ses commands */ + + /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) { succp = false; goto return_succp; } - if (!sescmd_cursor_is_active(scur)) + + if (!sescmd_cursor_is_active(scur)) { /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); @@ -1551,6 +1575,7 @@ static bool execute_sescmd_in_backend( "execute_sescmd_in_backend", dcb, sescmd_cursor_clone_querybuf(scur))); + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { case COM_CHANGE_USER: rc = dcb->func.auth( @@ -1584,57 +1609,10 @@ return_succp: return succp; } -/** - * Execute session commands when cursor is already active. - * - * Router session must be locked - * - * Return true if there was pending sescmd and sending command to - * backend server succeed. Otherwise false. - */ -static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) -{ - DCB* dcb; - bool succp = true; - int rc = 0; - sescmd_cursor_t* scur; - - dcb = rses->rses_dcb[be_type]; - - CHK_DCB(dcb); - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); - - scur = rses_get_sescmd_cursor(rses, be_type); - ss_dassert(sescmd_cursor_is_active(scur)); - - /** Return if there are no pending ses commands */ - if (scur->scmd_cur_cmd == NULL) - { - succp = false; - goto return_succp; - } - - LOGIF(LT, tracelog_routed_query(rses, - "cont_exec_sescmd_in_backend", - dcb, - sescmd_cursor_clone_querybuf(scur))); - - rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); - - if (rc != 1) - { - succp = false; - } -return_succp: - return succp; -} - /** * Moves cursor to next property and copied address of its sescmd to cursor. * Current propery must be non-null. + * If current property is the last on the list, *scur->scmd_ptr_property == NULL * * Router session must be locked */ @@ -1695,7 +1673,7 @@ static bool sescmd_cursor_next( } else { - /** Log error, sescmd shouldn't be NULL */ + ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */ } return_succp: return succp;