diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index ef4d4b9c2..d0062eec0 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -49,19 +49,7 @@ * The functions that implement the router module API */ -static MXS_ROUTER *createInstance(SERVICE *service, char **options); -static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session); -static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session); -static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session); static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue); -static void diagnostics(MXS_ROUTER *instance, DCB *dcb); -static json_t* diagnostics_json(const MXS_ROUTER *instance); -static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, - DCB *backend_dcb); -static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, - GWBUF *errmsgbuf, DCB *backend_dcb, - mxs_error_action_t action, bool *succp); -static uint64_t getCapabilities(MXS_ROUTER* instance); static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, char **options); @@ -72,7 +60,6 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, DCB *backend_dcb, GWBUF *errmsg); static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, int router_nsrv, ROUTER_INSTANCE *router); -static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* n_backend); /** * Enum values for router parameters @@ -105,149 +92,6 @@ static const MXS_ENUM_VALUE master_failure_mode_values[] = * Internal functions */ -/* - * @brief Clear one or more bits in the backend reference state - * - * The router session holds details of the backend servers that are - * involved in the routing for this particular service. Each backend - * server has a state bit string, and this function (along with - * bref_set_state) is used to manage the state. - * - * @param bref The backend reference to be modified - * @param state A bit string where the 1 bits indicate bits that should - * be turned off in the bref state. - */ -void bref_clear_state(backend_ref_t *bref, bref_state_t state) -{ - ss_dassert(bref); - - if ((state & BREF_WAITING_RESULT) && (bref->bref_state & BREF_WAITING_RESULT)) - { - int prev1; - int prev2; - - /** Decrease waiter count */ - prev1 = atomic_add(&bref->bref_num_result_wait, -1); - - if (prev1 <= 0) - { - atomic_add(&bref->bref_num_result_wait, 1); - } - else - { - /** Decrease global operation count */ - prev2 = atomic_add(&bref->ref->server->stats.n_current_ops, -1); - ss_dassert(prev2 > 0); - if (prev2 <= 0) - { - MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", - __FUNCTION__, bref->ref->server->name, - bref->ref->server->port); - } - } - } - - bref->bref_state &= ~state; -} - -/* - * @brief Set one or more bits in the backend reference state - * - * The router session holds details of the backend servers that are - * involved in the routing for this particular service. Each backend - * server has a state bit string, and this function (along with - * bref_clear_state) is used to manage the state. - * - * @param bref The backend reference to be modified - * @param state A bit string where the 1 bits indicate bits that should - * be turned on in the bref state. - */ -void bref_set_state(backend_ref_t *bref, bref_state_t state) -{ - ss_dassert(bref); - - if ((state & BREF_WAITING_RESULT) && (bref->bref_state & BREF_WAITING_RESULT) == 0) - { - int prev1; - int prev2; - - /** Increase waiter count */ - prev1 = atomic_add(&bref->bref_num_result_wait, 1); - ss_dassert(prev1 >= 0); - if (prev1 < 0) - { - MXS_ERROR("[%s] Error: negative number of connections waiting for " - "results in backend %s:%u", __FUNCTION__, - bref->ref->server->name, bref->ref->server->port); - } - /** Increase global operation count */ - prev2 = atomic_add(&bref->ref->server->stats.n_current_ops, 1); - ss_dassert(prev2 >= 0); - if (prev2 < 0) - { - MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", - __FUNCTION__, bref->ref->server->name, bref->ref->server->port); - } - } - - bref->bref_state |= state; -} - -/** - * @brief Create a generic router session property structure. - * - * @param prop_type Property type - * - * @return property structure of requested type, or NULL if failed - */ -rses_property_t *rses_property_init(rses_property_type_t prop_type) -{ - rses_property_t* prop = new (std::nothrow) rses_property_t; - - if (prop == NULL) - { - MXS_OOM(); - return NULL; - } - - prop->rses_prop_type = prop_type; - prop->rses_prop_next = NULL; - prop->rses_prop_refcount = 1; - prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY; - prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY; - - return prop; -} - -/** - * @brief Free resources belonging to a property - * - * Property is freed at the end of router client session. - * - * @param prop The property whose resources are to be released - */ -void rses_property_done(rses_property_t *prop) -{ - ss_dassert(prop); - CHK_RSES_PROP(prop); - - switch (prop->rses_prop_type) - { - case RSES_PROP_TYPE_SESCMD: - mysql_sescmd_done(&prop->rses_prop_data.sescmd); - break; - - default: - MXS_DEBUG("%lu [rses_property_done] Unknown property type %d " - "in property %p", pthread_self(), prop->rses_prop_type, prop); - - ss_dassert(false); - break; - } - - delete prop; -} - /** * @brief Get count of backend servers that are slaves. * @@ -304,6 +148,14 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses) return conf_max_rlag; } +namespace +{ + +/** This will never get used but it should catch faults in the code */ +static SRWBackend no_backend; + +} + /** * @brief Find a back end reference that matches the given DCB * @@ -315,23 +167,26 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses) * * @return backend reference pointer if succeed or NULL */ -backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb) +SRWBackend& get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb) { ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); CHK_DCB(dcb); CHK_CLIENT_RSES(rses); - for (int i = 0; i < rses->rses_nbackends; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - if (rses->rses_backend_ref[i].bref_dcb == dcb) + SRWBackend& backend = *it; + + if (backend->dcb() == dcb) { - return &rses->rses_backend_ref[i]; + return backend; } } /** We should always have a valid backend reference */ ss_dassert(false); - return NULL; + return no_backend; } /** @@ -478,30 +333,13 @@ static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str) static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses, DCB *backend_dcb, GWBUF *errmsg) { - mxs_session_state_t sesstate; - DCB *client_dcb; - backend_ref_t *bref; - sesstate = ses->state; - client_dcb = ses->client_dcb; + mxs_session_state_t sesstate = ses->state; + DCB *client_dcb = ses->client_dcb; - if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL) - { - CHK_BACKEND_REF(bref); + SRWBackend& bref = get_bref_from_dcb(rses, backend_dcb); - if (BREF_IS_IN_USE(bref)) - { - close_failed_bref(bref, false); - RW_CHK_DCB(bref, backend_dcb); - dcb_close(backend_dcb); - RW_CLOSE_BREF(bref); - } - } - else - { - // All dcbs should be associated with a backend reference. - ss_dassert(!true); - } + bref->close(); if (sesstate == SESSION_STATE_ROUTER_READY) { @@ -510,7 +348,7 @@ static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses, } } -static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old, GWBUF *stored) +static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, SRWBackend& old, GWBUF *stored) { bool success = false; @@ -520,21 +358,22 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old * Only try to retry the read if autocommit is enabled and we are * outside of a transaction */ - for (int i = 0; i < rses->rses_nbackends; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - backend_ref_t *bref = &rses->rses_backend_ref[i]; + SRWBackend& bref = *it; - if (BREF_IS_IN_USE(bref) && bref != old && - !SERVER_IS_MASTER(bref->ref->server) && - SERVER_IS_SLAVE(bref->ref->server)) + if (bref->in_use() && bref != old && + !SERVER_IS_MASTER(bref->server()) && + SERVER_IS_SLAVE(bref->server())) { /** Found a valid candidate; a non-master slave that's in use */ - if (bref->bref_dcb->func.write(bref->bref_dcb, stored)) + if (bref->write(stored)) { - MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name); - ss_dassert(bref->reply_state == REPLY_STATE_DONE); + MXS_INFO("Retrying failed read at '%s'.", bref->server()->unique_name); + ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE); LOG_RS(bref, REPLY_STATE_START); - bref->reply_state = REPLY_STATE_START; + bref->set_reply_state(REPLY_STATE_START); rses->expected_responses++; success = true; break; @@ -542,21 +381,18 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old } } - if (!success && rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref) && - rses->current_master && rses->current_master->in_use()) + if (!success && rses->current_master && rses->current_master->in_use()) { /** * Either we failed to write to the slave or no valid slave was found. * Try to retry the read on the master. */ - backend_ref_t *bref = rses->rses_master_ref; - - if (bref->bref_dcb->func.write(bref->bref_dcb, stored)) + if (rses->current_master->write(stored)) { - MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name); - LOG_RS(bref, REPLY_STATE_START); - ss_dassert(bref->reply_state == REPLY_STATE_DONE); - bref->reply_state = REPLY_STATE_START; + MXS_INFO("Retrying failed read at '%s'.", rses->current_master->server()->unique_name); + LOG_RS(rses->current_master, REPLY_STATE_START); + ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE); + rses->current_master->set_reply_state(REPLY_STATE_START); rses->expected_responses++; success = true; } @@ -585,30 +421,13 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES **rses, DCB *backend_dcb, GWBUF *errmsg) { - ROUTER_CLIENT_SES *myrses; - MXS_SESSION *ses; - int max_nslaves; - int max_slave_rlag; - backend_ref_t *bref; - bool succp; + ROUTER_CLIENT_SES *myrses = *rses; + SRWBackend& bref = get_bref_from_dcb(myrses, backend_dcb); - myrses = *rses; - - ses = backend_dcb->session; + MXS_SESSION* ses = backend_dcb->session; CHK_SESSION(ses); - /** - * If bref == NULL it has been replaced already with another one. - * - * NOTE: This can never happen. - */ - if ((bref = get_bref_from_dcb(myrses, backend_dcb)) == NULL) - { - return true; - } - CHK_BACKEND_REF(bref); - - if (BREF_IS_WAITING_RESULT(bref)) + if (bref->is_waiting_result()) { /** * A query was sent through the backend and it is waiting for a reply. @@ -619,7 +438,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, const SERVER *target; if (!session_take_stmt(backend_dcb->session, &stored, &target) || - target != bref->ref->server || + target != bref->backend()->server || !reroute_stored_statement(*rses, bref, stored)) { /** @@ -630,7 +449,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, gwbuf_free(stored); myrses->expected_responses--; - if (!sescmd_cursor_is_active(&bref->bref_sescmd_cur)) + if (bref->session_command_count()) { /** * The backend was executing a command that requires a reply. @@ -652,12 +471,11 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, } } - RW_CHK_DCB(bref, backend_dcb); - dcb_close(backend_dcb); - RW_CLOSE_BREF(bref); - close_failed_bref(bref, false); - max_nslaves = rses_get_max_slavecount(myrses, myrses->rses_nbackends); - max_slave_rlag = rses_get_max_replication_lag(myrses); + /** Close the current connection */ + bref->close(); + + int max_nslaves = rses_get_max_slavecount(myrses, myrses->rses_nbackends); + bool succp; /** * Try to get replacement slave or at least the minimum * number of slave connections for router session. @@ -668,10 +486,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, } else { - succp = select_connect_backend_servers(&myrses->rses_master_ref, - myrses->rses_backend_ref, - myrses->rses_nbackends, - max_nslaves, max_slave_rlag, + succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves, myrses->rses_config.slave_selection_criteria, ses, inst, myrses, true); } @@ -737,94 +552,6 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, return succp; } -/** - * @brief Create backend server references - * - * This creates a new set of backend references for the client session. Currently - * this is only used on startup but it could be used to dynamically change the - * set of used servers. - * - * @param rses Client router session - * @param dest Destination where the array of backens is stored - * @param n_backend Number of items in the array - * @return True on success, false on error - */ -static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* n_backend) -{ - backend_ref_t* backend_ref = new (std::nothrow) backend_ref_t[*n_backend]; - - if (backend_ref == NULL) - { - return false; - } - - int i = 0; - - for (SERVER_REF *sref = rses->router->service->dbref; sref && i < *n_backend; sref = sref->next) - { - if (sref->active) - { - rses->backends.push_back(SRWBackend(new RWBackend(sref))); - - backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; - backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; - backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; - backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; - backend_ref[i].closed_at = 0; - backend_ref[i].bref_state = 0; - backend_ref[i].ref = sref; - backend_ref[i].reply_state = REPLY_STATE_DONE; - backend_ref[i].reply_cmd = 0; - backend_ref[i].bref_dcb = NULL; - backend_ref[i].bref_num_result_wait = 0; - /** store pointers to sescmd list to both cursors */ - backend_ref[i].bref_sescmd_cur.scmd_cur_rses = rses; - backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; - backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = - &rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; - i++; - } - } - - if (i < *n_backend) - { - MXS_INFO("The service reported %d servers but only took %d into use.", *n_backend, i); - *n_backend = i; - } - - *dest = backend_ref; - return true; -} - -/** - * @brief Mark a backend reference as failed - * - * @param bref Backend reference to close - * @param fatal Whether the failure was fatal - */ -void close_failed_bref(backend_ref_t *bref, bool fatal) -{ - if (BREF_IS_WAITING_RESULT(bref)) - { - bref_clear_state(bref, BREF_WAITING_RESULT); - } - - bref_clear_state(bref, BREF_QUERY_ACTIVE); - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - - if (fatal) - { - bref_set_state(bref, BREF_FATAL_FAILURE); - } - - if (sescmd_cursor_is_active(&bref->bref_sescmd_cur)) - { - sescmd_cursor_set_active(&bref->bref_sescmd_cur, false); - } -} - bool route_stored_query(ROUTER_CLIENT_SES *rses) { bool rval = true; @@ -871,36 +598,36 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses) * * @return True if the complete response has been received */ -bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer) +bool reply_is_complete(SRWBackend& bref, GWBUF *buffer) { - mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session); + mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->dcb()->session); - if (bref->reply_state == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer)) + if (bref->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer)) { if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer)) { /** Not a result set, we have the complete response */ LOG_RS(bref, REPLY_STATE_DONE); - bref->reply_state = REPLY_STATE_DONE; + bref->set_reply_state(REPLY_STATE_DONE); } } else { bool more = false; - int old_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0; + int old_eof = bref->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; int n_eof = modutil_count_signal_packets(buffer, old_eof, &more); if (n_eof == 0) { /** Waiting for the EOF packet after the column definitions */ LOG_RS(bref, REPLY_STATE_RSET_COLDEF); - bref->reply_state = REPLY_STATE_RSET_COLDEF; + bref->set_reply_state(REPLY_STATE_RSET_COLDEF); } else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST) { /** Waiting for the EOF packet after the rows */ LOG_RS(bref, REPLY_STATE_RSET_ROWS); - bref->reply_state = REPLY_STATE_RSET_ROWS; + bref->set_reply_state(REPLY_STATE_RSET_ROWS); } else { @@ -908,32 +635,18 @@ bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer) * a COM_FIELD_LIST command */ ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST)); LOG_RS(bref, REPLY_STATE_DONE); - bref->reply_state = REPLY_STATE_DONE; + bref->set_reply_state(REPLY_STATE_DONE); if (more) { /** The server will send more resultsets */ LOG_RS(bref, REPLY_STATE_START); - bref->reply_state = REPLY_STATE_START; + bref->set_reply_state(REPLY_STATE_START); } } } - return bref->reply_state == REPLY_STATE_DONE; -} - -SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref) -{ - for (SRWBackendList::iterator it = rses->backends.begin(); - it != rses->backends.end(); it++) - { - if ((*it)->backend() == bref->ref) - { - return *it; - } - } - - return SRWBackend(); + return bref->get_reply_state() == REPLY_STATE_DONE; } void close_all_connections(ROUTER_CLIENT_SES* rses) @@ -941,14 +654,11 @@ void close_all_connections(ROUTER_CLIENT_SES* rses) for (SRWBackendList::iterator it = rses->backends.begin(); it != rses->backends.end(); it++) { - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - SRWBackend& bref = *it; + SRWBackend& bref = *it; - if (bref->in_use()) - { - bref->close(); - } + if (bref->in_use()) + { + bref->close(); } } } @@ -1059,36 +769,38 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; client_rses->rses_closed = false; - client_rses->rses_properties[RSES_PROP_TYPE_SESCMD] = NULL; client_rses->router = router; client_rses->client_dcb = session->client_dcb; client_rses->have_tmp_tables = false; - client_rses->forced_node = NULL; client_rses->expected_responses = 0; client_rses->query_queue = NULL; client_rses->load_data_state = LOAD_DATA_INACTIVE; + client_rses->sent_sescmd = 0; + client_rses->recv_sescmd = 0; memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config)); int router_nservers = router->service->n_dbref; const int min_nservers = 1; /*< hard-coded for now */ - backend_ref_t *backend_ref; - if (!have_enough_servers(client_rses, min_nservers, router_nservers, router) || - !create_backends(client_rses, &backend_ref, &router_nservers)) + if (!have_enough_servers(client_rses, min_nservers, router_nservers, router)) { delete client_rses; return NULL; } - int max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); - int max_slave_rlag = rses_get_max_replication_lag(client_rses); + for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next) + { + if (sref->active) + { + client_rses->backends.push_back(SRWBackend(new RWBackend(sref))); + } + } + + int max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); - client_rses->rses_backend_ref = backend_ref; client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ - backend_ref_t *master_ref = NULL; /*< pointer to selected master */ - if (!select_connect_backend_servers(&master_ref, backend_ref, router_nservers, - max_nslaves, max_slave_rlag, + if (!select_connect_backend_servers(router_nservers, max_nslaves, client_rses->rses_config.slave_selection_criteria, session, router, client_rses, false)) { @@ -1097,15 +809,10 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess * in the strict mode. If sessions without master are allowed, only * slaves must be found. */ - delete[] client_rses->rses_backend_ref; delete client_rses; return NULL; } - /** Copy backend pointers to router session. */ - client_rses->rses_master_ref = master_ref; - client_rses->current_master = get_backend_from_bref(client_rses, master_ref); - if (client_rses->rses_config.rw_max_slave_conn_percent) { int n_conn = 0; @@ -1143,62 +850,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio * of every API function to quickly stop the processing of closed sessions. */ router_cli_ses->rses_closed = true; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - backend_ref_t *bref = &router_cli_ses->rses_backend_ref[i]; - - if (BREF_IS_IN_USE(bref)) - { - /** This backend is in use and it needs to be closed */ - DCB *dcb = bref->bref_dcb; - CHK_DCB(dcb); - ss_dassert(dcb->session->state == SESSION_STATE_STOPPING); - - if (BREF_IS_WAITING_RESULT(bref)) - { - /** This backend was executing a query when the session was closed */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - - RW_CHK_DCB(bref, dcb); - - /** MXS-956: This will prevent closed DCBs from being closed twice. - * It should not happen but for currently unknown reasons, a DCB - * gets closed twice; first in handleError and a second time here. */ - if (dcb && dcb->state == DCB_STATE_POLLING) - { - dcb_close(dcb); - } - - RW_CLOSE_BREF(bref); - - /** decrease server current connection counters */ - atomic_add(&bref->ref->connections, -1); - } - else - { - ss_dassert(!BREF_IS_WAITING_RESULT(bref)); - - /** This should never be true unless a backend reference is taken - * out of use before clearing the BREF_WAITING_RESULT state */ - if (BREF_IS_WAITING_RESULT(bref)) - { - MXS_WARNING("A closed backend was expecting a result, this should not be possible. " - "Decrementing active operation counter for this backend."); - bref_clear_state(bref, BREF_WAITING_RESULT); - } - } - } - - for (SRWBackendList::iterator it = router_cli_ses->backends.begin(); - it != router_cli_ses->backends.end(); it++) - { - SRWBackend& backend = *it; - backend->close(); - } + close_all_connections(router_cli_ses); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && router_cli_ses->sescmd_list.size()) @@ -1231,25 +883,6 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_client_session) { ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; - - /** - * For each property type, walk through the list, finalize properties - * and free the allocated memory. - */ - for (int i = RSES_PROP_TYPE_FIRST; i < RSES_PROP_TYPE_COUNT; i++) - { - rses_property_t *p = router_cli_ses->rses_properties[i]; - rses_property_t *q = p; - - while (p != NULL) - { - q = p->rses_prop_next; - rses_property_done(p); - p = q; - } - } - - delete[] router_cli_ses->rses_backend_ref; delete router_cli_ses; } @@ -1481,20 +1114,18 @@ static void clientReply(MXS_ROUTER *instance, * and */ - backend_ref_t *bref = get_bref_from_dcb(router_cli_ses, backend_dcb); - CHK_BACKEND_REF(bref); - sescmd_cursor_t *scur = &bref->bref_sescmd_cur; + SRWBackend& bref = get_bref_from_dcb(router_cli_ses, backend_dcb); /** Statement was successfully executed, free the stored statement */ session_clear_stmt(backend_dcb->session); - ss_dassert(bref->reply_state != REPLY_STATE_DONE); + ss_dassert(bref->get_reply_state() != REPLY_STATE_DONE); if (reply_is_complete(bref, writebuf)) { /** Got a complete reply, decrement expected response count */ router_cli_ses->expected_responses--; ss_dassert(router_cli_ses->expected_responses >= 0); - ss_dassert(bref->reply_state == REPLY_STATE_DONE); + ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE); } else { @@ -1505,9 +1136,9 @@ static void clientReply(MXS_ROUTER *instance, * Active cursor means that reply is from session command * execution. */ - if (sescmd_cursor_is_active(scur)) + if (bref->session_command_count()) { - check_session_command_reply(writebuf, scur, bref); + check_session_command_reply(writebuf, bref); if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) { @@ -1517,49 +1148,33 @@ static void clientReply(MXS_ROUTER *instance, * needs to be sent to client or NULL. */ bool rconn = false; - writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn); + process_sescmd_response(router_cli_ses, bref, &writebuf, &rconn); if (rconn && !router_inst->rwsplit_config.disable_sescmd_history) { select_connect_backend_servers( - &router_cli_ses->rses_master_ref, router_cli_ses->rses_backend_ref, router_cli_ses->rses_nbackends, router_cli_ses->rses_config.max_slave_connections, - router_cli_ses->rses_config.max_slave_replication_lag, router_cli_ses->rses_config.slave_selection_criteria, - router_cli_ses->rses_master_ref->bref_dcb->session, + router_cli_ses->client_dcb->session, router_cli_ses->router, router_cli_ses, true); } } - /** - * If response will be sent to client, decrease waiter count. - * This applies to session commands only. Counter decrement - * for other type of queries is done outside this block. - */ + } - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - /** - * Clear BREF_QUERY_ACTIVE flag and decrease waiter counter. - * This applies for queries other than session commands. - */ - else if (BREF_IS_QUERY_ACTIVE(bref)) - { - bref_clear_state(bref, BREF_QUERY_ACTIVE); - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } + /** Complete the write */ + bref->ack_write(); bool queue_routed = false; if (router_cli_ses->expected_responses == 0) { - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) + for (SRWBackendList::iterator it = router_cli_ses->backends.begin(); + it != router_cli_ses->backends.end(); it++) { - ss_dassert(router_cli_ses->rses_backend_ref[i].reply_state == REPLY_STATE_DONE); + ss_dassert((*it)->get_reply_state() == REPLY_STATE_DONE); } queue_routed = router_cli_ses->query_queue != NULL; @@ -1569,18 +1184,19 @@ static void clientReply(MXS_ROUTER *instance, { ss_dassert(router_cli_ses->expected_responses > 0); } - if (writebuf != NULL && client_dcb != NULL) + + if (writebuf && client_dcb) { /** Write reply to client DCB */ MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); } - /** There is one pending session command to be executed. */ - else if (!queue_routed && sescmd_cursor_is_active(scur)) + /** Check pending session commands */ + else if (!queue_routed && bref->session_command_count()) { MXS_INFO("Backend [%s]:%d processed reply and starts to execute active cursor.", - bref->ref->server->name, bref->ref->server->port); + bref->server()->name, bref->server()->port); - if (execute_sescmd_in_backend(bref)) + if (bref->execute_session_command()) { router_cli_ses->expected_responses++; } @@ -1649,7 +1265,7 @@ static void handleError(MXS_ROUTER *instance, MXS_SESSION *session = problem_dcb->session; ss_dassert(session); - backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb); + SRWBackend& bref = get_bref_from_dcb(rses, problem_dcb); switch (action) { @@ -1659,13 +1275,13 @@ static void handleError(MXS_ROUTER *instance, * If master has lost its Master status error can't be * handled so that session could continue. */ - if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb) + if (rses->current_master && rses->current_master->dcb() == problem_dcb) { - SERVER *srv = rses->rses_master_ref->ref->server; + SERVER *srv = rses->current_master->server(); bool can_continue = false; if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY && - (bref == NULL || !BREF_IS_WAITING_RESULT(bref))) + (!bref || !bref->is_waiting_result())) { /** The failure of a master is not considered a critical * failure as partial functionality still remains. Reads @@ -1688,13 +1304,9 @@ static void handleError(MXS_ROUTER *instance, *succp = can_continue; - if (bref != NULL) + if (bref) { - CHK_BACKEND_REF(bref); - RW_CHK_DCB(bref, problem_dcb); - dcb_close(problem_dcb); - RW_CLOSE_BREF(bref); - close_failed_bref(bref, true); + bref->close(mxs::Backend::CLOSE_FATAL); } else { @@ -1704,19 +1316,18 @@ static void handleError(MXS_ROUTER *instance, } else if (bref) { - /** Check whether problem_dcb is same as dcb of rses->forced_node + /** Check whether problem_dcb is same as dcb of rses->target_node * and within READ ONLY transaction: - * if true reset rses->forced_node and close session + * if true reset rses->target_node and close session */ - if (rses->forced_node && - (rses->forced_node->bref_dcb == problem_dcb && + if (rses->target_node && + (rses->target_node->dcb() == problem_dcb && session_trx_is_read_only(problem_dcb->session))) { MXS_ERROR("forced_node SLAVE %s in opened READ ONLY transaction has failed:" " closing session", problem_dcb->server->unique_name); - rses->forced_node = NULL; rses->target_node.reset(); *succp = false; break; @@ -1731,11 +1342,11 @@ static void handleError(MXS_ROUTER *instance, if (bref) { /** This is a valid DCB for a backend ref */ - if (BREF_IS_IN_USE(bref) && bref->bref_dcb == problem_dcb) + if (bref->in_use() && bref->dcb() == problem_dcb) { ss_dassert(false); MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.", - bref->ref->server->unique_name); + bref->server()->unique_name); } } else diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 2c3a1ed48..5b42a70a6 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -30,15 +30,6 @@ #include #include -enum bref_state_t -{ - BREF_IN_USE = 0x01, - BREF_WAITING_RESULT = 0x02, /**< for session commands only */ - BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ - BREF_CLOSED = 0x08, - BREF_FATAL_FAILURE = 0x10 /**< Backend references that should be dropped */ -}; - enum backend_type_t { BE_UNDEFINED = -1, @@ -58,15 +49,6 @@ enum route_target_t TARGET_RLAG_MAX = 0x10 }; -enum rses_property_type_t -{ - RSES_PROP_TYPE_UNDEFINED = -1, - RSES_PROP_TYPE_SESCMD = 0, - RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD, - RSES_PROP_TYPE_LAST = RSES_PROP_TYPE_SESCMD, - RSES_PROP_TYPE_COUNT = RSES_PROP_TYPE_LAST + 1 -}; - /** * This criteria is used when backends are chosen for a router session's use. * Backend servers are sorted to ascending order according to the criteria @@ -133,59 +115,12 @@ enum ld_state (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); /** Reply state change debug logging */ -#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->ref->server->name, \ - (a)->ref->server->port, rstostr((a)->reply_state), rstostr(b)); +#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->server()->name, \ + (a)->server()->port, rstostr((a)->get_reply_state()), rstostr(b)); -struct rses_property_t; struct ROUTER_INSTANCE; struct ROUTER_CLIENT_SES; -/** - * Session variable command - */ -struct mysql_sescmd_t -{ - skygw_chk_t my_sescmd_chk_top; - struct rses_property_t* my_sescmd_prop; /**< parent property */ - GWBUF* my_sescmd_buf; /**< query buffer */ - unsigned char my_sescmd_packet_type; /**< packet type */ - bool my_sescmd_is_replied; /**< is cmd replied to client */ - unsigned char reply_cmd; /**< The reply command. One of OK, ERR, RESULTSET or - * LOCAL_INFILE. Slave servers are compared to this - * when they return session command replies.*/ - uint64_t position; /**< Position of this command */ - skygw_chk_t my_sescmd_chk_tail; -}; - -/** - * Property structure - */ -struct rses_property_t -{ - skygw_chk_t rses_prop_chk_top; - ROUTER_CLIENT_SES* rses_prop_rsession; /**< parent router session */ - int rses_prop_refcount; - rses_property_type_t rses_prop_type; - - struct rses_prop_data // TODO: Remove the properties and integrate them into the session object - { - mysql_sescmd_t sescmd; - } rses_prop_data; - rses_property_t* rses_prop_next; /**< next property of same type */ - skygw_chk_t rses_prop_chk_tail; -}; - -struct sescmd_cursor_t -{ - skygw_chk_t scmd_cur_chk_top; - ROUTER_CLIENT_SES* scmd_cur_rses; /**< pointer to owning router session */ - rses_property_t** scmd_cur_ptr_property; /**< address of pointer to owner property */ - mysql_sescmd_t* scmd_cur_cmd; /**< pointer to current session command */ - bool scmd_cur_active; /**< true if command is being executed */ - uint64_t position; /**< Position of this cursor */ - skygw_chk_t scmd_cur_chk_tail; -}; - /** Enum for tracking client reply state */ enum reply_state_t { @@ -258,7 +193,6 @@ struct ROUTER_CLIENT_SES { skygw_chk_t rses_chk_top; bool rses_closed; /**< true when closeSession is called */ - rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /**< Properties listed by their type */ SRWBackendList backends; /**< List of backend servers */ SRWBackend current_master; /**< Current master server */ SRWBackend target_node; /**< The currently locked target node */ diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index b071a21c7..028af9ce2 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -36,26 +36,22 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf); void closed_session_reply(GWBUF *querybuf); void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb); -void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref); -bool execute_sescmd_in_backend(backend_ref_t *backend_ref); +void check_session_command_reply(GWBUF *writebuf, SRWBackend bref); +bool execute_sescmd_in_backend(SRWBackend& backend_ref); bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf, int packet_type, uint32_t qtype); uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet); void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype); bool is_packet_a_one_way_message(int packet_type); -sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref); bool is_packet_a_query(int packet_type); bool send_readonly_error(DCB *dcb); /* * The following are implemented in readwritesplit.c */ -void bref_clear_state(backend_ref_t *bref, bref_state_t state); -void bref_set_state(backend_ref_t *bref, bref_state_t state); int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data); -backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb); -void rses_property_done(rses_property_t *prop); +SRWBackend& get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb); int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses, int router_nservers); int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses); @@ -65,57 +61,32 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses); bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf); -int rwsplit_hashkeyfun(const void *key); -int rwsplit_hashcmpfun(const void *v1, const void *v2); -void *rwsplit_hstrdup(const void *fval); -void rwsplit_hfree(void *fval); -bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype, - char *name, int max_rlag); +bool get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, + char *name, int max_rlag, SRWBackend& target); route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint32_t qtype, HINT *hint); -rses_property_t *rses_property_init(rses_property_type_t prop_type); -int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop); void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t packet_type, uint32_t *qtype); bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, - route_target_t route_target, DCB **target_dcb); + route_target_t route_target, SRWBackend& target); bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - DCB **target_dcb); + SRWBackend& target); bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - DCB **target_dcb); + SRWBackend& target); bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, DCB *target_dcb, bool store); + GWBUF *querybuf, SRWBackend& target, bool store); bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf, ROUTER_INSTANCE *inst, int packet_type, uint32_t qtype); -/* - * The following are implemented in rwsplit_session_cmd.c -*/ -mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop); -mysql_sescmd_t *mysql_sescmd_init(rses_property_t *rses_prop, - GWBUF *sescmd_buf, - unsigned char packet_type, - ROUTER_CLIENT_SES *rses); -void mysql_sescmd_done(mysql_sescmd_t *sescmd); -mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur); -bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor); -void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor, - bool value); -bool execute_sescmd_history(backend_ref_t *bref); -GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur); -GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf, - backend_ref_t *bref, - bool *reconnect); - +void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& bref, + GWBUF** ppPacket, bool* reconnect); /* * The following are implemented in rwsplit_select_backends.c */ -bool select_connect_backend_servers(backend_ref_t **p_master_ref, - backend_ref_t *backend_ref, - int router_nservers, int max_nslaves, - int max_slave_rlag, +bool select_connect_backend_servers(int router_nservers, + int max_nslaves, select_criteria_t select_criteria, MXS_SESSION *session, ROUTER_INSTANCE *router, @@ -133,5 +104,5 @@ void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf, uint32_t type); bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type); uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet); -void close_failed_bref(backend_ref_t *bref, bool fatal); -SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref); + +void close_all_connections(ROUTER_CLIENT_SES* rses);