From bda0fd2db0aea24b0e584765e89ea4b0b33c5833 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 26 Mar 2017 11:42:33 +0300 Subject: [PATCH] Replace session command implementation The schemarouter now uses the new session commands. It uses a standard library container to manage the execution and storage of session commands. The session command history is disabled until a more complete refactoring can be done. --- .../routing/schemarouter/schemarouter.cc | 777 +++--------------- .../routing/schemarouter/schemarouter.h | 41 +- .../routing/schemarouter/session_command.cc | 28 +- .../routing/schemarouter/session_command.hh | 23 +- 4 files changed, 149 insertions(+), 720 deletions(-) diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc index d26929e8f..5027c1e01 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -39,18 +39,7 @@ using std::string; #define SCHEMAROUTER_USERHASH_SIZE 10 /** - * @file schemarouter.c The entry points for the simple sharding - * router module. - *. - * @verbatim - * Revision History - * - * Date Who Description - * 01/12/2014 Vilho Raatikka/Markus Mäkelä Initial implementation - * 09/09/2015 Martin Brampton Modify error handler - * 25/09/2015 Martin Brampton Block callback processing when no router session in the DCB - * - * @endverbatim + * @file schemarouter.c The entry points for the simple sharding router module. */ static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION* rses, DCB* dcb); @@ -67,28 +56,12 @@ static bool get_shard_dcb(DCB** dcb, SCHEMAROUTER_SESSION* rses, char* name); -static void mysql_sescmd_done(mysql_sescmd_t* sescmd); -static mysql_sescmd_t* mysql_sescmd_init(rses_property_t* rses_prop, - GWBUF* sescmd_buf, - unsigned char packet_type, - SCHEMAROUTER_SESSION* rses); -static rses_property_t* mysql_sescmd_get_property(mysql_sescmd_t* scmd); + static rses_property_t* rses_property_init(rses_property_type_t prop_type); static void rses_property_add(SCHEMAROUTER_SESSION* rses, rses_property_t* prop); static void rses_property_done(rses_property_t* prop); -static mysql_sescmd_t* rses_property_get_sescmd(rses_property_t* prop); -static bool execute_sescmd_history(backend_ref_t* bref); static bool execute_sescmd_in_backend(backend_ref_t* backend_ref); -static void sescmd_cursor_reset(sescmd_cursor_t* scur); -static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur); -static void sescmd_cursor_set_active(sescmd_cursor_t* sescmd_cursor, - bool value); -static bool sescmd_cursor_is_active(sescmd_cursor_t* sescmd_cursor); -static GWBUF* sescmd_cursor_clone_querybuf(sescmd_cursor_t* scur); -static mysql_sescmd_t* sescmd_cursor_get_command(sescmd_cursor_t* scur); -static bool sescmd_cursor_next(sescmd_cursor_t* scur); -static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref); static void tracelog_routed_query(SCHEMAROUTER_SESSION* rses, char* funcname, backend_ref_t* bref, @@ -100,7 +73,6 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_client_ses, uint32_t qtype); static void bref_clear_state(backend_ref_t* bref, bref_state_t state); static void bref_set_state(backend_ref_t* bref, bref_state_t state); -static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref); static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data); static bool handle_error_new_connection(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* rses, @@ -1114,11 +1086,6 @@ static bool connect_backend_servers(backend_ref_t* backend_ref, if (backend_ref[i].bref_dcb != NULL) { servers_connected += 1; - /** - * Start executing session command - * history. - */ - execute_sescmd_history(&backend_ref[i]); /** * When server fails, this callback * is called. @@ -1208,10 +1175,6 @@ static void rses_property_done(rses_property_t* prop) switch (prop->rses_prop_type) { - case RSES_PROP_TYPE_SESCMD: - mysql_sescmd_done(&prop->rses_prop_data.sescmd); - break; - case RSES_PROP_TYPE_TMPTABLES: hashtable_free(prop->rses_prop_data.temp_tables); break; @@ -1260,240 +1223,6 @@ static void rses_property_add(SCHEMAROUTER_SESSION* rses, } } -/** - * Router session must be locked. - * Return session command pointer if succeed, NULL if failed. - */ -static mysql_sescmd_t* rses_property_get_sescmd(rses_property_t* prop) -{ - CHK_RSES_PROP(prop); - mysql_sescmd_t *sescmd = &prop->rses_prop_data.sescmd; - - if (sescmd != NULL) - { - CHK_MYSQL_SESCMD(sescmd); - } - return sescmd; -} - -/** - * Create session command property. - */ -static mysql_sescmd_t* mysql_sescmd_init(rses_property_t* rses_prop, - GWBUF* sescmd_buf, - unsigned char packet_type, - SCHEMAROUTER_SESSION* rses) -{ - mysql_sescmd_t* sescmd; - - CHK_RSES_PROP(rses_prop); - /** Can't call rses_property_get_sescmd with uninitialized sescmd */ - sescmd = &rses_prop->rses_prop_data.sescmd; - sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ -#if defined(SS_DEBUG) - sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; - sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; -#endif - /** Set session command buffer */ - sescmd->my_sescmd_buf = sescmd_buf; - sescmd->my_sescmd_packet_type = packet_type; - sescmd->position = atomic_add(&rses->pos_generator, 1); - return sescmd; -} - -static void mysql_sescmd_done(mysql_sescmd_t* sescmd) -{ - CHK_RSES_PROP(sescmd->my_sescmd_prop); - gwbuf_free(sescmd->my_sescmd_buf); - memset(sescmd, 0, sizeof(mysql_sescmd_t)); -} - -/** - * All cases where backend message starts at least with one response to session - * command are handled here. - * Read session commands from property list. If command is already replied, - * discard packet. Else send reply to client. In both cases move cursor forward - * until all session command replies are handled. - * - * Cases that are expected to happen and which are handled: - * s = response not yet replied to client, S = already replied response, - * q = query - * 1. q+ for example : select * from mysql.user - * 2. s+ for example : set autocommit=1 - * 3. S+ - * 4. sq+ - * 5. Sq+ - * 6. Ss+ - * 7. Ss+q+ - * 8. S+q+ - * 9. s+q+ - */ -static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, - backend_ref_t* bref) -{ - mysql_sescmd_t* scmd; - sescmd_cursor_t* scur; - - scur = &bref->bref_sescmd_cur; - scmd = sescmd_cursor_get_command(scur); - - CHK_GWBUF(replybuf); - - /** - * Walk through packets in the message and the list of session - * commands. - */ - while (scmd != NULL && replybuf != NULL) - { - scur->position = scmd->position; - /** Faster backend has already responded to client : discard */ - if (scmd->my_sescmd_is_replied) - { - bool last_packet = false; - - CHK_GWBUF(replybuf); - - while (!last_packet) - { - int buflen; - - buflen = GWBUF_LENGTH(replybuf); - last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf); - /** discard packet */ - replybuf = gwbuf_consume(replybuf, buflen); - } - /** Set response status received */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - /** Response is in the buffer and it will be sent to client. */ - else if (replybuf != NULL) - { - /** Mark the rest session commands as replied */ - scmd->my_sescmd_is_replied = true; - } - - if (sescmd_cursor_next(scur)) - { - scmd = sescmd_cursor_get_command(scur); - } - else - { - scmd = NULL; - /** All session commands are replied */ - scur->scmd_cur_active = false; - } - } - ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); - - return replybuf; -} - - - -/** - * Get the address of current session command. - * - * Router session must be locked */ -static mysql_sescmd_t* sescmd_cursor_get_command(sescmd_cursor_t* scur) -{ - mysql_sescmd_t* scmd; - - scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property); - - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - - scmd = scur->scmd_cur_cmd; - - return scmd; -} - -/** router must be locked */ -static bool sescmd_cursor_is_active(sescmd_cursor_t* sescmd_cursor) -{ - bool succp; - - succp = sescmd_cursor->scmd_cur_active; - return succp; -} - -/** router must be locked */ -static void sescmd_cursor_set_active(sescmd_cursor_t* sescmd_cursor, - bool value) -{ - /** avoid calling unnecessarily */ - ss_dassert(sescmd_cursor->scmd_cur_active != value); - sescmd_cursor->scmd_cur_active = value; -} - -/** - * Clone session command's command buffer. - * Router session must be locked - */ -static GWBUF* sescmd_cursor_clone_querybuf(sescmd_cursor_t* scur) -{ - GWBUF* buf; - ss_dassert(scur->scmd_cur_cmd != NULL); - - buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf); - - CHK_GWBUF(buf); - return buf; -} - -static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur) -{ - bool succp; - - CHK_SESCMD_CUR(scur); - - if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL) - { - succp = true; - } - else - { - succp = false; - } - - return succp; -} - -static void sescmd_cursor_reset(sescmd_cursor_t* scur) -{ - SCHEMAROUTER_SESSION* rses; - CHK_SESCMD_CUR(scur); - CHK_CLIENT_RSES(scur->scmd_cur_rses); - rses = scur->scmd_cur_rses; - - scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - - CHK_RSES_PROP((*scur->scmd_cur_ptr_property)); - scur->scmd_cur_active = false; - scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; -} - -static bool execute_sescmd_history(backend_ref_t* bref) -{ - bool succp; - sescmd_cursor_t* scur; - CHK_BACKEND_REF(bref); - - scur = &bref->bref_sescmd_cur; - CHK_SESCMD_CUR(scur); - - if (!sescmd_cursor_history_empty(scur)) - { - sescmd_cursor_reset(scur); - succp = execute_sescmd_in_backend(bref); - } - else - { - succp = true; - } - - return succp; -} - /** * If session command cursor is passive, sends the command to backend for * execution. @@ -1506,50 +1235,34 @@ static bool execute_sescmd_history(backend_ref_t* bref) */ static bool execute_sescmd_in_backend(backend_ref_t* backend_ref) { - DCB* dcb; - bool succp; - int rc = 0; - sescmd_cursor_t* scur; - if (BREF_IS_CLOSED(backend_ref)) { - succp = false; - goto return_succp; + return false; } - dcb = backend_ref->bref_dcb; + + DCB *dcb = backend_ref->bref_dcb; CHK_DCB(dcb); CHK_BACKEND_REF(backend_ref); - /** - * Get cursor pointer and copy of command buffer to cursor. - */ - scur = &backend_ref->bref_sescmd_cur; + int rc = 0; /** Return if there are no pending ses commands */ - if (sescmd_cursor_get_command(scur) == NULL) + if (backend_ref->session_commands.size() == 0) { - succp = false; MXS_INFO("Cursor had no pending session commands."); - - goto return_succp; + return false; } - if (!sescmd_cursor_is_active(scur)) - { - /** Cursor is left active when function returns. */ - sescmd_cursor_set_active(scur, true); - } + SessionCommandList::iterator iter = backend_ref->session_commands.begin(); + GWBUF *buffer = iter->copy_buffer().release(); - switch (scur->scmd_cur_cmd->my_sescmd_packet_type) + switch (iter->get_command()) { case MYSQL_COM_CHANGE_USER: /** This makes it possible to handle replies correctly */ - gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); - rc = dcb->func.auth(dcb, - NULL, - dcb->session, - sescmd_cursor_clone_querybuf(scur)); + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = dcb->func.auth(dcb, NULL, dcb->session, buffer); break; case MYSQL_COM_QUERY: @@ -1558,94 +1271,12 @@ static bool execute_sescmd_in_backend(backend_ref_t* backend_ref) * Mark session command buffer, it triggers writing * MySQL command to protocol */ - gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); - rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = dcb->func.write(dcb, buffer); break; } - if (rc == 1) - { - succp = true; - } - else - { - succp = false; - } -return_succp: - return succp; -} - - -/** - * Moves cursor to next property and copied address of its sescmd to cursor. - * Current propery must be non-null. - * If current property is the last on the list, *scur->scmd_ptr_property == NULL - * - * Router session must be locked - */ -static bool sescmd_cursor_next(sescmd_cursor_t* scur) -{ - bool succp = false; - rses_property_t* prop_curr; - rses_property_t* prop_next; - - ss_dassert(scur != NULL); - ss_dassert(*(scur->scmd_cur_ptr_property) != NULL); - - /** Illegal situation */ - if (scur == NULL || - *scur->scmd_cur_ptr_property == NULL || - scur->scmd_cur_cmd == NULL) - { - /** Log error */ - goto return_succp; - } - prop_curr = *(scur->scmd_cur_ptr_property); - - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd)); - CHK_RSES_PROP(prop_curr); - - /** Copy address of pointer to next property */ - scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next); - prop_next = *scur->scmd_cur_ptr_property; - ss_dassert(prop_next == *(scur->scmd_cur_ptr_property)); - - /** If there is a next property move forward */ - if (prop_next != NULL) - { - CHK_RSES_PROP(prop_next); - CHK_RSES_PROP((*(scur->scmd_cur_ptr_property))); - - /** Get pointer to next property's sescmd */ - scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next); - - ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop); - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop); - } - else - { - /** No more properties, can't proceed. */ - goto return_succp; - } - - if (scur->scmd_cur_cmd != NULL) - { - succp = true; - } - else - { - ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */ - } -return_succp: - return succp; -} - -static rses_property_t* mysql_sescmd_get_property(mysql_sescmd_t* scmd) -{ - CHK_MYSQL_SESCMD(scmd); - return scmd->my_sescmd_prop; + return rc == 1; } /** @@ -1667,149 +1298,21 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, uint32_t qtype) { bool succp = false; - rses_property_t* prop; - backend_ref_t* backend_ref; - int i; + backend_ref_t *backend_ref = router_cli_ses->rses_backend_ref; MXS_INFO("Session write, routing to all servers."); - - backend_ref = router_cli_ses->rses_backend_ref; - - /** - * These are one-way messages and server doesn't respond to them. - * Therefore reply processing is unnecessary and session - * command property is not needed. It is just routed to all available - * backends. - */ - if (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA || - packet_type == MYSQL_COM_QUIT || - packet_type == MYSQL_COM_STMT_CLOSE) - { - int rc; - - succp = true; - - /** Lock router session */ - if (router_cli_ses->closed) - { - return false; - } - - for (i = 0; i < router_cli_ses->rses_nbackends; i++) - { - DCB* dcb = backend_ref[i].bref_dcb; - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Route query to %s\t%s:%d%s", - (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? - "master" : "slave"), - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port, - (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); - } - - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); - if (rc != 1) - { - succp = false; - } - } - } - - gwbuf_free(querybuf); - return succp; - } - /** Lock router session */ - - if (router_cli_ses->rses_nbackends <= 0) - { - return false; - } - - if (router_cli_ses->rses_config.max_sescmd_hist > 0 && - router_cli_ses->n_sescmd >= router_cli_ses->rses_config.max_sescmd_hist) - { - MXS_ERROR("Router session exceeded session command history limit of %d. " - "Closing router session.", - router_cli_ses->rses_config.max_sescmd_hist); - gwbuf_free(querybuf); - atomic_add(&router_cli_ses->router->stats.n_hist_exceeded, 1); - poll_fake_hangup_event(router_cli_ses->rses_client_dcb); - - return succp; - } - - if (router_cli_ses->rses_config.disable_sescmd_hist) - { - rses_property_t *prop, *tmp; - backend_ref_t* bref; - bool conflict; - - prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; - while (prop) - { - conflict = false; - - for (i = 0; i < router_cli_ses->rses_nbackends; i++) - { - bref = &backend_ref[i]; - if (BREF_IS_IN_USE(bref)) - { - - if (bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position) - { - conflict = true; - break; - } - } - } - - if (conflict) - { - break; - } - - tmp = prop; - router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next; - rses_property_done(tmp); - prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; - } - } - - /** - * - * Additional reference is created to querybuf to - * prevent it from being released before properties - * are cleaned up as a part of router session clean-up. - */ - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); - mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); - - /** Add sescmd property to router client session */ - rses_property_add(router_cli_ses, prop); atomic_add(&router_cli_ses->stats.longest_sescmd, 1); atomic_add(&router_cli_ses->n_sescmd, 1); - for (i = 0; i < router_cli_ses->rses_nbackends; i++) + /** Increment the session command count */ + ++router_cli_ses->sent_sescmd; + + for (int i = 0; i < router_cli_ses->rses_nbackends; i++) { if (BREF_IS_IN_USE((&backend_ref[i]))) { GWBUF *buffer = gwbuf_clone(querybuf); - backend_ref[i].session_commands.push_back(buffer); - - for (SessionCommandList::iterator iter = backend_ref[i].session_commands.begin(); - iter != backend_ref[i].session_commands.end(); - iter++) - { - SessionCommand& scmd = *iter; - string str = scmd.to_string(); - MXS_INFO("%s: %s", backend_ref[i].bref_backend->server->unique_name, str.c_str()); - } - - sescmd_cursor_t* scur; + backend_ref[i].session_commands.push_back(SessionCommand(buffer, router_cli_ses->sent_sescmd)); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { @@ -1821,32 +1324,26 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); } - scur = backend_ref_get_sescmd_cursor(&backend_ref[i]); - - /** - * Add one waiter to backend reference. - */ - bref_set_state(get_bref_from_dcb(router_cli_ses, - backend_ref[i].bref_dcb), - BREF_WAITING_RESULT); - /** - * Start execution if cursor is not already executing. - * Otherwise, cursor will execute pending commands - * when it completes with previous commands. - */ - if (sescmd_cursor_is_active(scur)) + if (backend_ref[i].session_commands.size() == 1) { - succp = true; + /** Only one command, execute it */ + switch (packet_type) + { + /** These types of commands don't generate responses */ + case MYSQL_COM_QUIT: + case MYSQL_COM_STMT_CLOSE: + break; - MXS_INFO("Backend %s:%d already executing sescmd.", - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port); - } - else - { - succp = execute_sescmd_in_backend(&backend_ref[i]); + default: + bref_set_state(&backend_ref[i], BREF_WAITING_RESULT); + break; + } - if (!succp) + if (execute_sescmd_in_backend(&backend_ref[i])) + { + succp = true; + } + else { MXS_ERROR("Failed to execute session " "command in %s:%d", @@ -1854,10 +1351,15 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, backend_ref[i].bref_backend->server->port); } } - } - else - { - succp = false; + else + { + ss_dassert(backend_ref[i].session_commands.size() > 1); + /** The server is already executing a session command */ + MXS_INFO("Backend %s:%d already executing sescmd.", + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port); + succp = true; + } } } @@ -1942,8 +1444,9 @@ static bool handle_error_new_connection(SCHEMAROUTER* inst, */ if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL) { - succp = false; - goto return_succp; + /** This should not happen */ + ss_dassert(false); + return false; } CHK_BACKEND_REF(bref); @@ -1963,34 +1466,7 @@ static bool handle_error_new_connection(SCHEMAROUTER* inst, bref_clear_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_CLOSED); - /** - * Error handler is already called for this DCB because - * it's not polling anymore. It can be assumed that - * it succeed because rses isn't closed. - */ - if (backend_dcb->state != DCB_STATE_POLLING) - { - succp = true; - goto return_succp; - } - - /** - * Try to get replacement slave or at least the minimum - * number of slave connections for router session. - */ - succp = connect_backend_servers(rses->rses_backend_ref, - rses->rses_nbackends, - ses, inst); - - if (!have_servers(rses)) - { - MXS_ERROR("No more valid servers, closing session"); - succp = false; - goto return_succp; - } - -return_succp: - return succp; + return have_servers(rses); } /** @@ -2018,13 +1494,6 @@ static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION *rses, return NULL; } -static sescmd_cursor_t* backend_ref_get_sescmd_cursor(backend_ref_t* bref) -{ - CHK_BACKEND_REF(bref); - CHK_SESCMD_CUR((&bref->bref_sescmd_cur)); - return &bref->bref_sescmd_cur; -} - /** * Detect if a query contains a SHOW SHARDS query. * @param query Query to inspect @@ -2531,11 +2000,13 @@ MXS_BEGIN_DECLS */ static MXS_ROUTER* createInstance(SERVICE *service, char **options) { - SCHEMAROUTER* router; + SCHEMAROUTER* router = NULL; MXS_CONFIG_PARAMETER* conf; MXS_CONFIG_PARAMETER* param; - if ((router = (SCHEMAROUTER*)MXS_CALLOC(1, sizeof(SCHEMAROUTER))) == NULL) + MXS_EXCEPTION_GUARD(router = new SCHEMAROUTER); + + if (router == NULL) { return NULL; } @@ -2564,7 +2035,6 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) hashtable_add(router->ignored_dbs, (void*)"information_schema", (void*)""); hashtable_add(router->ignored_dbs, (void*)"performance_schema", (void*)""); router->service = service; - router->schemarouter_config.max_sescmd_hist = 0; router->schemarouter_config.last_refresh = time(NULL); router->stats.longest_sescmd = 0; router->stats.n_hist_exceeded = 0; @@ -2578,8 +2048,6 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) router->schemarouter_config.refresh_databases = config_get_bool(conf, "refresh_databases"); router->schemarouter_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); - router->schemarouter_config.max_sescmd_hist = config_get_integer(conf, "max_sescmd_history"); - router->schemarouter_config.disable_sescmd_hist = config_get_bool(conf, "disable_sescmd_history"); router->schemarouter_config.debug = config_get_bool(conf, "debug"); if ((config_get_param(conf, "auth_all_servers")) == NULL) @@ -2657,11 +2125,11 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) if (strcmp(options[i], "max_sescmd_history") == 0) { - router->schemarouter_config.max_sescmd_hist = atoi(value); + MXS_WARNING("Use of 'max_sescmd_history' is deprecated"); } else if (strcmp(options[i], "disable_sescmd_history") == 0) { - router->schemarouter_config.disable_sescmd_hist = config_truth_value(value); + MXS_WARNING("Use of 'disable_sescmd_history' is deprecated"); } else if (strcmp(options[i], "refresh_databases") == 0) { @@ -2683,15 +2151,9 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) } } - /** Setting a limit to the history size is not needed if it is disabled.*/ - if (router->schemarouter_config.disable_sescmd_hist && router->schemarouter_config.max_sescmd_hist > 0) - { - router->schemarouter_config.max_sescmd_hist = 0; - } - if (failure) { - MXS_FREE(router); + delete router; router = NULL; } @@ -2710,11 +2172,6 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) */ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) { - backend_ref_t* backend_ref; /*< array of backend references (DCB, BACKEND, cursor) */ - SCHEMAROUTER_SESSION* client_rses = NULL; - SCHEMAROUTER* router = (SCHEMAROUTER *)router_inst; - bool succp; - int router_nservers = 0; /*< # of servers in total */ char db[MYSQL_DATABASE_MAXLEN + 1] = ""; MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; MYSQL_session* data = (MYSQL_session*)session->client_dcb->data; @@ -2739,17 +2196,21 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess MXS_INFO("Client'%s' connecting with empty database.", data->user); } - client_rses = (SCHEMAROUTER_SESSION *)MXS_CALLOC(1, sizeof(SCHEMAROUTER_SESSION)); + SCHEMAROUTER_SESSION* client_rses = NULL; + + MXS_EXCEPTION_GUARD(client_rses = new SCHEMAROUTER_SESSION); if (client_rses == NULL) { return NULL; } + + SCHEMAROUTER* router = (SCHEMAROUTER*)router_inst; + #if defined(SS_DEBUG) client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif - client_rses->router = router; client_rses->rses_mysql_session = (MYSQL_session*)session->client_dcb->data; client_rses->rses_client_dcb = (DCB*)session->client_dcb; @@ -2772,7 +2233,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess { MXS_ERROR("Failed to allocate enough memory to create" "new shard mapping. Session will be closed."); - MXS_FREE(client_rses); + delete client_rses; return NULL; } client_rses->init = INIT_UNINT; @@ -2787,6 +2248,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t)); client_rses->n_sescmd = 0; client_rses->rses_config.last_refresh = time(NULL); + client_rses->closed = false; if (using_db) { @@ -2797,24 +2259,30 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess */ client_rses->rses_autocommit_enabled = true; client_rses->rses_transaction_active = false; + client_rses->sent_sescmd = 0; + client_rses->replied_sescmd = 0; /** * Instead of calling this, ensure that there is at least one * responding server. */ - router_nservers = router->service->n_dbref; + int router_nservers = router->service->n_dbref; + + backend_ref_t* backend_ref = NULL; /** * Create backend reference objects for this session. */ - backend_ref = (backend_ref_t *)MXS_CALLOC(router_nservers, sizeof(backend_ref_t)); + + MXS_EXCEPTION_GUARD(backend_ref = new backend_ref_t[router_nservers]); if (backend_ref == NULL) { - MXS_FREE(client_rses); + delete client_rses; return NULL; } + /** * Initialize backend references with BACKEND ptr. * Initialize session command cursors for each backend reference. @@ -2829,19 +2297,16 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess #if defined(SS_DEBUG) backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; - backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; - backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; #endif backend_ref[i].bref_state = 0; backend_ref[i].n_mapping_eof = 0; backend_ref[i].map_queue = NULL; backend_ref[i].bref_backend = ref; - /** store pointers to sescmd list to both cursors */ - backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; - backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; - backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; + backend_ref[i].bref_pending_cmd = NULL; + backend_ref[i].bref_num_result_wait = 0; + + client_rses->rses_properties[RSES_PROP_TYPE_SESCMD] = NULL; + client_rses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = NULL; i++; } } @@ -2857,12 +2322,12 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess /** * Connect to all backend servers */ - succp = connect_backend_servers(backend_ref, router_nservers, session, router); + bool succp = connect_backend_servers(backend_ref, router_nservers, session, router); if (!succp || client_rses->closed) { - MXS_FREE(client_rses->rses_backend_ref); - MXS_FREE(client_rses); + delete client_rses->rses_backend_ref; + delete client_rses; return NULL; } @@ -2981,8 +2446,8 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_ * all the memory and other resources associated * to the client session. */ - MXS_FREE(router_cli_ses->rses_backend_ref); - MXS_FREE(router_cli_ses); + delete[] router_cli_ses->rses_backend_ref; + delete router_cli_ses; return; } @@ -3016,7 +2481,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, DCB* target_dcb = NULL; SCHEMAROUTER* inst = (SCHEMAROUTER *)instance; SCHEMAROUTER_SESSION* router_cli_ses = (SCHEMAROUTER_SESSION *)router_session; - bool rses_is_closed = false; bool change_successful = false; route_target_t route_target = TARGET_UNDEFINED; bool succp = false; @@ -3397,11 +2861,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, if (succp) /*< Have DCB of the target backend */ { - backend_ref_t* bref; - sescmd_cursor_t* scur; - - bref = get_bref_from_dcb(router_cli_ses, target_dcb); - scur = &bref->bref_sescmd_cur; + backend_ref_t *bref = get_bref_from_dcb(router_cli_ses, target_dcb); MXS_INFO("Route query to \t%s:%d <", bref->bref_backend->server->name, @@ -3411,7 +2871,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, * haven't completed yet. Note that according to MySQL protocol * there can only be one such non-sescmd stmt at the time. */ - if (sescmd_cursor_is_active(scur)) + if (bref->session_commands.size() > 0) { ss_dassert((bref->bref_pending_cmd == NULL || router_cli_ses->closed)); @@ -3474,23 +2934,6 @@ static void diagnostic(MXS_ROUTER *instance, DCB *dcb) router->stats.longest_sescmd); dcb_printf(dcb, "Session command history limit exceeded: %d times\n", router->stats.n_hist_exceeded); - if (!router->schemarouter_config.disable_sescmd_hist) - { - dcb_printf(dcb, "Session command history: enabled\n"); - if (router->schemarouter_config.max_sescmd_hist == 0) - { - dcb_printf(dcb, "Session command history limit: unlimited\n"); - } - else - { - dcb_printf(dcb, "Session command history limit: %d\n", - router->schemarouter_config.max_sescmd_hist); - } - } - else - { - dcb_printf(dcb, "Session command history: disabled\n"); - } /** Session time statistics */ @@ -3634,39 +3077,13 @@ static void clientReply(MXS_ROUTER* instance, } CHK_BACKEND_REF(bref); - sescmd_cursor_t *scur = &bref->bref_sescmd_cur; + /** * Active cursor means that reply is from session command * execution. */ - if (sescmd_cursor_is_active(scur)) + if (bref->session_commands.size() > 0) { - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_ERR) && - MYSQL_IS_ERROR_PACKET(((uint8_t *) GWBUF_DATA(writebuf)))) - { - uint8_t* buf = (uint8_t *) GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf)); - uint8_t* replybuf = (uint8_t *) GWBUF_DATA(writebuf); - size_t len = MYSQL_GET_PAYLOAD_LEN(buf); - size_t replylen = MYSQL_GET_PAYLOAD_LEN(replybuf); - char* cmdstr = strndup(&((char *) buf)[5], len - 4); - char* err = strndup(&((char *) replybuf)[8], 5); - char* replystr = strndup(&((char *) replybuf)[13], - replylen - 4 - 5); - - ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf)); - - MXS_ERROR("Failed to execute %s in %s:%d. %s %s", - cmdstr, - bref->bref_backend->server->name, - bref->bref_backend->server->port, - err, - replystr); - - MXS_FREE(cmdstr); - MXS_FREE(err); - MXS_FREE(replystr); - } - if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) { /** @@ -3674,7 +3091,17 @@ static void clientReply(MXS_ROUTER* instance, * the client. Return with buffer including response that * needs to be sent to client or NULL. */ - writebuf = sescmd_cursor_process_replies(writebuf, bref); + if (router_cli_ses->replied_sescmd < router_cli_ses->sent_sescmd && + bref->session_commands.begin()->get_position() == router_cli_ses->replied_sescmd + 1) + { + ++router_cli_ses->replied_sescmd; + } + else + { + gwbuf_free(writebuf); + writebuf = NULL; + } + bref->session_commands.pop_front(); } /** * If response will be sent to client, decrease waiter count. @@ -3712,7 +3139,7 @@ static void clientReply(MXS_ROUTER* instance, } /** There is one pending session command to be executed. */ - if (sescmd_cursor_is_active(scur)) + if (bref->session_commands.size() > 0) { MXS_INFO("Backend %s:%d processed reply and starts to execute " diff --git a/server/modules/routing/schemarouter/schemarouter.h b/server/modules/routing/schemarouter/schemarouter.h index 25c7c0cab..ffbd043ba 100644 --- a/server/modules/routing/schemarouter/schemarouter.h +++ b/server/modules/routing/schemarouter/schemarouter.h @@ -146,25 +146,6 @@ typedef enum rses_property_type_t strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \ LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA)))) -/** - * Session variable command - */ -typedef struct mysql_sescmd_st -{ -#if defined(SS_DEBUG) - skygw_chk_t my_sescmd_chk_top; -#endif - rses_property_t* my_sescmd_prop; /*< Parent property */ - GWBUF* my_sescmd_buf; /*< Query buffer */ - unsigned char my_sescmd_packet_type;/*< Packet type */ - bool my_sescmd_is_replied; /*< Is cmd replied to client */ - int position; /*< Position of this command */ -#if defined(SS_DEBUG) - skygw_chk_t my_sescmd_chk_tail; -#endif -} mysql_sescmd_t; - - /** * Property structure */ @@ -178,7 +159,6 @@ struct rses_property_st rses_property_type_t rses_prop_type; /*< Property type */ union rses_prop_data { - mysql_sescmd_t sescmd; /*< Session commands */ HASHTABLE* temp_tables; /*< Hashtable of table names */ } rses_prop_data; rses_property_t* rses_prop_next; /*< Next property of same type */ @@ -187,21 +167,6 @@ struct rses_property_st #endif }; -typedef struct sescmd_cursor_st -{ -#if defined(SS_DEBUG) - skygw_chk_t scmd_cur_chk_top; -#endif - SCHEMAROUTER_SESSION* scmd_cur_rses; /*< pointer to owning router session */ - rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ - mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ - bool scmd_cur_active; /*< true if command is being executed */ - int position; /*< Position of this cursor */ -#if defined(SS_DEBUG) - skygw_chk_t scmd_cur_chk_tail; -#endif -} sescmd_cursor_t; - /** * Internal structure used to define the set of backend servers we are routing * connections to. This provides the storage for routing module specific data @@ -253,7 +218,6 @@ typedef struct backend_ref_st bool bref_mapped; /*< Whether the backend has been mapped */ bool last_sescmd_replied; int bref_num_result_wait; /*< Number of not yet received results */ - sescmd_cursor_t bref_sescmd_cur; /*< Session command cursor */ GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ SessionCommandList session_commands; /**< List of session commands that are @@ -271,8 +235,6 @@ typedef struct schemarouter_config_st int rw_max_slave_conn_percent; int rw_max_slave_conn_count; mxs_target_t rw_use_sql_variables_in; - int max_sescmd_hist; - bool disable_sescmd_hist; time_t last_refresh; /*< Last time the database list was refreshed */ double refresh_min_interval; /*< Minimum required interval between refreshes of databases */ bool refresh_databases; /*< Are databases refreshed when they are not found in the hashtable */ @@ -329,6 +291,9 @@ struct schemarouter_session ROUTER_STATS stats; /*< Statistics for this router */ int n_sescmd; int pos_generator; + + uint64_t sent_sescmd; /**< The latest session command being executed */ + uint64_t replied_sescmd; /**< The last session command reply that was sent to the client */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif diff --git a/server/modules/routing/schemarouter/session_command.cc b/server/modules/routing/schemarouter/session_command.cc index 409a88edd..a0e49b57e 100644 --- a/server/modules/routing/schemarouter/session_command.cc +++ b/server/modules/routing/schemarouter/session_command.cc @@ -13,6 +13,7 @@ #include "session_command.hh" #include +#include void SessionCommand::mark_reply_received() { @@ -24,15 +25,31 @@ bool SessionCommand::is_reply_received() const return m_replySent; } +uint8_t SessionCommand::get_command() const +{ + return m_command; +} + +uint64_t SessionCommand::get_position() const +{ + return m_pos; +} + Buffer SessionCommand::copy_buffer() const { return m_buffer; } -SessionCommand::SessionCommand(GWBUF *buffer): +SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id): m_buffer(buffer), + m_command(0), + m_pos(id), m_replySent(false) { + if (buffer) + { + gwbuf_copy_data(buffer, MYSQL_HEADER_LEN, 1, &m_command); + } } SessionCommand::~SessionCommand() @@ -42,15 +59,18 @@ SessionCommand::~SessionCommand() std::string SessionCommand::to_string() { std::string str; - - GWBUF **buf = &m_buffer; char *sql; int sql_len; - if (modutil_extract_SQL(*buf, &sql, &sql_len)) + /** TODO: Create C++ versions of modutil functions */ + GWBUF *buf = m_buffer.release(); + + if (modutil_extract_SQL(buf, &sql, &sql_len)) { str.append(sql, sql_len); } + m_buffer.reset(buf); + return str; } diff --git a/server/modules/routing/schemarouter/session_command.hh b/server/modules/routing/schemarouter/session_command.hh index 569e5ffc6..f0aefe5c8 100644 --- a/server/modules/routing/schemarouter/session_command.hh +++ b/server/modules/routing/schemarouter/session_command.hh @@ -37,6 +37,20 @@ public: */ bool is_reply_received() const; + /** + * @brief Get the command type of the session command + * + * @return The type of the command + */ + uint8_t get_command() const; + + /** + * @brief Get the position of this session command + * + * @return The position of the session command + */ + uint64_t get_position() const; + /** * @brief Creates a copy of the internal buffer * @return A copy of the internal buffer @@ -48,8 +62,9 @@ public: * * @param buffer The buffer containing the command. Note that the ownership * of @c buffer is transferred to this object. + * @param id A unique position identifier used to track replies */ - SessionCommand(GWBUF *buffer); + SessionCommand(GWBUF *buffer, uint64_t id); ~SessionCommand(); @@ -61,8 +76,10 @@ public: std::string to_string(); private: - Buffer m_buffer; /**< The buffer containing the command */ - bool m_replySent; /**< Whether the session command reply has been sent */ + Buffer m_buffer; /**< The buffer containing the command */ + uint8_t m_command; /**< The command being executed */ + uint64_t m_pos; /**< Unique position identifier */ + bool m_replySent; /**< Whether the session command reply has been sent */ SessionCommand(); SessionCommand& operator = (const SessionCommand& command);