diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 69cd91d16..a5c9c4071 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -49,60 +49,6 @@ using namespace maxscale; /** Maximum number of slaves */ #define MAX_SLAVE_COUNT "255" -/* - * @brief Get the maximum replication lag for this router - * - * @param rses Router client session - * @return Replication lag from configuration or very large number - */ -int RWSplitSession::get_max_replication_lag() -{ - int conf_max_rlag; - - /** if there is no configured value, then longest possible int is used */ - if (rses_config.max_slave_replication_lag > 0) - { - conf_max_rlag = rses_config.max_slave_replication_lag; - } - else - { - conf_max_rlag = ~(1 << 31); - } - - return conf_max_rlag; -} - -/** - * @brief Find the backend reference that matches the given DCB - * - * @param dcb DCB to match - * - * @return The correct reference - */ -SRWBackend& RWSplitSession::get_backend_from_dcb(DCB *dcb) -{ - ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); - CHK_DCB(dcb); - - for (auto it = backends.begin(); it != backends.end(); it++) - { - SRWBackend& backend = *it; - - if (backend->in_use() && backend->dcb() == dcb) - { - return backend; - } - } - - /** We should always have a valid backend reference and in case we don't, - * something is terribly wrong. */ - MXS_ALERT("No reference to DCB %p found, aborting.", dcb); - raise(SIGABRT); - - // To make the compiler happy, we return a reference to a static value. - static SRWBackend this_should_not_happen; - return this_should_not_happen; -} /** * @brief Process router options @@ -241,266 +187,6 @@ static bool handle_max_slaves(Config& config, const char *str) return rval; } -/** - * @brief Handle an error reply for a client - * - * @param ses Session - * @param rses Router session - * @param backend_dcb DCB for the backend server that has failed - * @param errmsg GWBUF containing the error message - */ -void RWSplitSession::handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg) -{ - mxs_session_state_t sesstate = client_dcb->session->state; - SRWBackend& backend = get_backend_from_dcb(backend_dcb); - - backend->close(); - - if (sesstate == SESSION_STATE_ROUTER_READY) - { - CHK_DCB(client_dcb); - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); - } -} - -bool RWSplitSession::reroute_stored_statement(const SRWBackend& old, GWBUF *stored) -{ - bool success = false; - - if (!session_trx_is_active(client_dcb->session)) - { - /** - * Only try to retry the read if autocommit is enabled and we are - * outside of a transaction - */ - for (auto it = backends.begin(); it != backends.end(); it++) - { - SRWBackend& backend = *it; - - if (backend->in_use() && backend != old && - !backend->is_master() && backend->is_slave()) - { - /** Found a valid candidate; a non-master slave that's in use */ - if (backend->write(stored)) - { - MXS_INFO("Retrying failed read at '%s'.", backend->name()); - ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE); - backend->set_reply_state(REPLY_STATE_START); - expected_responses++; - success = true; - break; - } - } - } - - if (!success && current_master && current_master->in_use()) - { - /** - * Either we failed to write to the slave or no valid slave was found. - * Try to retry the read on the master. - */ - if (current_master->write(stored)) - { - MXS_INFO("Retrying failed read at '%s'.", current_master->name()); - ss_dassert(current_master->get_reply_state() == REPLY_STATE_DONE); - current_master->set_reply_state(REPLY_STATE_START); - expected_responses++; - success = true; - } - } - } - - return success; -} - -/** - * Check if there is backend reference pointing at failed DCB, and reset its - * flags. Then clear DCB's callback and finally : try to find replacement(s) - * for failed slave(s). - * - * This must be called with router lock. - * - * @param inst router instance - * @param rses router client session - * @param dcb failed DCB - * @param errmsg error message which is sent to client if it is waiting - * - * @return true if there are enough backend connections to continue, false if - * not - */ -bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg) -{ - SRWBackend& backend = get_backend_from_dcb(backend_dcb); - MXS_SESSION* ses = backend_dcb->session; - bool route_stored = false; - CHK_SESSION(ses); - - if (backend->is_waiting_result()) - { - ss_dassert(expected_responses > 0); - expected_responses--; - - /** - * A query was sent through the backend and it is waiting for a reply. - * Try to reroute the statement to a working server or send an error - * to the client. - */ - GWBUF *stored = NULL; - const SERVER *target = NULL; - if (!session_take_stmt(backend_dcb->session, &stored, &target) || - target != backend->backend()->server || - !reroute_stored_statement(backend, stored)) - { - /** - * We failed to route the stored statement or no statement was - * stored for this server. Either way we can safely free the buffer - * and decrement the expected response count. - */ - gwbuf_free(stored); - - if (!backend->have_session_commands()) - { - /** - * The backend was executing a command that requires a reply. - * Send an error to the client to let it know the query has - * failed. - */ - DCB *client_dcb = ses->client_dcb; - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); - } - - if (expected_responses == 0) - { - /** The response from this server was the last one, try to - * route all queued queries */ - route_stored = true; - } - } - } - - /** Close the current connection. This needs to be done before routing any - * of the stored queries. If we route a stored query before the connection - * is closed, it's possible that the routing logic will pick the failed - * server as the target. */ - backend->close(); - - if (route_stored) - { - route_stored_query(); - } - - int max_nslaves = router->max_slave_count(); - bool succp; - /** - * Try to get replacement slave or at least the minimum - * number of slave connections for router session. - */ - if (recv_sescmd > 0 && rses_config.disable_sescmd_history) - { - succp = router->have_enough_servers(); - } - else - { - succp = router->select_connect_backend_servers(ses, backends, - current_master, - &sescmd_list, - &expected_responses, - connection_type::SLAVE); - } - - return succp; -} - -/** - * @brief Route a stored query - * - * When multiple queries are executed in a pipeline fashion, the readwritesplit - * stores the extra queries in a queue. This queue is emptied after reading a - * reply from the backend server. - * - * @param rses Router client session - * @return True if a stored query was routed successfully - */ -bool RWSplitSession::route_stored_query() -{ - bool rval = true; - - /** Loop over the stored statements as long as the routeQuery call doesn't - * append more data to the queue. If it appends data to the queue, we need - * to wait for a response before attempting another reroute */ - while (query_queue) - { - GWBUF* query_queue = modutil_get_next_MySQL_packet(&query_queue); - query_queue = gwbuf_make_contiguous(query_queue); - - /** Store the query queue locally for the duration of the routeQuery call. - * This prevents recursive calls into this function. */ - GWBUF *temp_storage = query_queue; - query_queue = NULL; - - // TODO: Move the handling of queued queries to the client protocol - // TODO: module where the command tracking is done automatically. - uint8_t cmd = mxs_mysql_get_command(query_queue); - mysql_protocol_set_current_command(client_dcb, (mxs_mysql_cmd_t)cmd); - - if (!routeQuery(query_queue)) - { - rval = false; - MXS_ERROR("Failed to route queued query."); - } - - if (query_queue == NULL) - { - /** Query successfully routed and no responses are expected */ - query_queue = temp_storage; - } - else - { - /** Routing was stopped, we need to wait for a response before retrying */ - query_queue = gwbuf_append(temp_storage, query_queue); - break; - } - } - - return rval; -} - -void close_all_connections(SRWBackendList& backends) -{ - for (SRWBackendList::iterator it = backends.begin(); it != backends.end(); it++) - { - SRWBackend& backend = *it; - - if (backend->in_use()) - { - backend->close(); - } - } -} - -void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb) -{ - if (backend) - { - /** This is a valid DCB for a backend ref */ - if (backend->in_use() && backend->dcb() == problem_dcb) - { - MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.", - backend->name()); - ss_dassert(false); - } - } - else - { - const char *remote = problem_dcb->state == DCB_STATE_POLLING && - problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED"; - - MXS_ERROR("DCB connected to '%s' is not in use by the router " - "session, not closing it. DCB is in state '%s'", - remote, STRDCBSTATE(problem_dcb->state)); - } -} - RWSplit::RWSplit(SERVICE* service, const Config& config): mxs::Router(service), m_service(service), @@ -619,85 +305,6 @@ RWSplitSession* RWSplit::newSession(MXS_SESSION *session) return rses; } -/** - * @brief Close a router session - * - * Close a session with the router, this is the mechanism by which a router - * may perform cleanup. The instance of the router that relates to - * the relevant service is passed, along with the router session that is to - * be closed. The freeSession will be called once the session has been closed. - * - * @param instance The router instance data - * @param session The router session being closed - */ -void RWSplitSession::close() -{ - close_all_connections(backends); - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && - sescmd_list.size()) - { - std::string sescmdstr; - - for (mxs::SessionCommandList::iterator it = sescmd_list.begin(); - it != sescmd_list.end(); it++) - { - mxs::SSessionCommand& scmd = *it; - sescmdstr += scmd->to_string(); - sescmdstr += "\n"; - } - - MXS_INFO("Executed session commands:\n%s", sescmdstr.c_str()); - } -} - -int32_t RWSplitSession::routeQuery(GWBUF* querybuf) -{ - int rval = 0; - - if (query_queue == NULL && - (expected_responses == 0 || - mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH || - load_data_state == LOAD_DATA_ACTIVE || - large_query)) - { - /** Gather the information required to make routing decisions */ - RouteInfo info(this, querybuf); - - /** No active or pending queries */ - if (route_single_stmt(querybuf, info)) - { - rval = 1; - } - } - else - { - /** - * We are already processing a request from the client. Store the - * new query and wait for the previous one to complete. - */ - ss_dassert(expected_responses || query_queue); - MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command", - gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], expected_responses); - query_queue = gwbuf_append(query_queue, querybuf); - querybuf = NULL; - rval = 1; - ss_dassert(expected_responses > 0); - - if (expected_responses == 0 && !route_stored_query()) - { - rval = 0; - } - } - - if (querybuf != NULL) - { - gwbuf_free(querybuf); - } - - return rval; -} - void RWSplit::diagnostics(DCB *dcb) { RWSplit *router = this; @@ -809,315 +416,12 @@ json_t* RWSplit::diagnostics_json() const return rval; } -static void log_unexpected_response(DCB* dcb, GWBUF* buffer) -{ - if (mxs_mysql_is_err_packet(buffer)) - { - /** This should be the only valid case where the server sends a response - * without the client sending one first. MaxScale does not yet advertise - * the progress reporting flag so we don't need to handle it. */ - uint8_t* data = GWBUF_DATA(buffer); - size_t len = MYSQL_GET_PAYLOAD_LEN(data); - uint16_t errcode = MYSQL_GET_ERRCODE(data); - std::string errstr((char*)data + 7, (char*)data + 7 + len - 3); - - if (errcode == ER_CONNECTION_KILLED) - { - MXS_INFO("Connection from '%s'@'%s' to '%s' was killed", - dcb->session->client_dcb->user, - dcb->session->client_dcb->remote, - dcb->server->unique_name); - } - else - { - MXS_WARNING("Server '%s' sent an unexpected error: %hu, %s", - dcb->server->unique_name, errcode, errstr.c_str()); - } - } - else - { - MXS_ERROR("Unexpected internal state: received response 0x%02hhx from " - "server '%s' when no response was expected", - mxs_mysql_get_command(buffer), dcb->server->unique_name); - ss_dassert(false); - } -} - -/** - * @bref discard the result of wait gtid statment, the result will be an error - * packet or an error packet. - * @param buffer origin reply buffer - * @param proto MySQLProtocol - * @return reset buffer - */ -GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer) -{ - uint8_t header_and_command[MYSQL_HEADER_LEN + 1]; - uint8_t packet_len = 0; - uint8_t offset = 0; - mxs_mysql_cmd_t com; - - gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command); - /* ignore error packet */ - if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR) - { - wait_gtid_state = EXPECTING_NOTHING; - return buffer; - } - - /* this packet must be an ok packet now */ - ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK); - packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN; - wait_gtid_state = EXPECTING_REAL_RESULT; - next_seq = 1; - - return gwbuf_consume(buffer, packet_len); -} - -/** - * @bref After discarded the wait result, we need correct the seqence number of every packet - * - * @param buffer origin reply buffer - * @param proto MySQLProtocol - * - */ -void RWSplitSession::correct_packet_sequence(GWBUF *buffer) -{ - uint8_t header[3]; - uint32_t offset = 0; - uint32_t packet_len = 0; - if (wait_gtid_state == EXPECTING_REAL_RESULT) - { - while (gwbuf_copy_data(buffer, offset, 3, header) == 3) - { - packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN; - uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET); - *seq = next_seq; - next_seq++; - offset += packet_len; - } - } -} - -void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) -{ - DCB *client_dcb = backend_dcb->session->client_dcb; - - SRWBackend& backend = get_backend_from_dcb(backend_dcb); - - if (rses_config.enable_causal_read && - GWBUF_IS_REPLY_OK(writebuf) && - backend == current_master) - { - /** Save gtid position */ - char *tmp = gwbuf_get_property(writebuf, (char *)"gtid"); - if (tmp) - { - gtid_pos = std::string(tmp); - } - } - - if (wait_gtid_state == EXPECTING_WAIT_GTID_RESULT) - { - ss_dassert(rses_config.enable_causal_read); - if ((writebuf = discard_master_wait_gtid_result(writebuf)) == NULL) - { - // Nothing to route, return - return; - } - } - if (wait_gtid_state == EXPECTING_REAL_RESULT) - { - correct_packet_sequence(writebuf); - } - - if (backend->get_reply_state() == REPLY_STATE_DONE) - { - /** If we receive an unexpected response from the server, the internal - * logic cannot handle this situation. Routing the reply straight to - * the client should be the safest thing to do at this point. */ - log_unexpected_response(backend_dcb, writebuf); - MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); - return; - } - - if (session_have_stmt(backend_dcb->session)) - { - /** Statement was successfully executed, free the stored statement */ - session_clear_stmt(backend_dcb->session); - } - - if (backend->reply_is_complete(writebuf)) - { - /** Got a complete reply, acknowledge the write and decrement expected response count */ - backend->ack_write(); - expected_responses--; - ss_dassert(expected_responses >= 0); - ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE); - MXS_INFO("Reply complete, last reply from %s", backend->name()); - } - else - { - MXS_INFO("Reply not yet complete. Waiting for %d replies, got one from %s", - expected_responses, backend->name()); - } - - if (backend->have_session_commands()) - { - /** Reply to an executed session command */ - process_sescmd_response(backend, &writebuf); - } - - if (backend->have_session_commands()) - { - if (backend->execute_session_command()) - { - expected_responses++; - } - } - else if (expected_responses == 0 && query_queue) - { - route_stored_query(); - } - - if (writebuf) - { - ss_dassert(client_dcb); - /** Write reply to client DCB */ - MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); - } -} - uint64_t RWSplit::getCapabilities() { return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING; } -/** - * @brief Router error handling routine - * - * Error Handler routine to resolve backend failures. If it succeeds then - * there are enough operative backends available and connected. Otherwise it - * fails, and session is terminated. - * - * @param instance The router instance - * @param router_session The router session - * @param errmsgbuf The error message to reply - * @param backend_dcb The backend DCB - * @param action The action: ERRACT_NEW_CONNECTION or - * ERRACT_REPLY_CLIENT - * @param succp Result of action: true if router can continue - */ -void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb, - mxs_error_action_t action, bool *succp) -{ - ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); - CHK_DCB(problem_dcb); - MXS_SESSION *session = problem_dcb->session; - ss_dassert(session); - - SRWBackend& backend = get_backend_from_dcb(problem_dcb); - ss_dassert(backend->in_use()); - - switch (action) - { - case ERRACT_NEW_CONNECTION: - { - if (current_master && current_master->in_use() && current_master == backend) - { - /** The connection to the master has failed */ - SERVER *srv = current_master->server(); - bool can_continue = false; - - if (!backend->is_waiting_result()) - { - /** The failure of a master is not considered a critical - * failure as partial functionality still remains. If - * master_failure_mode is not set to fail_instantly, reads - * are allowed as long as slave servers are available - * and writes will cause an error to be returned. - * - * If we were waiting for a response from the master, we - * can't be sure whether it was executed or not. In this - * case the safest thing to do is to close the client - * connection. */ - if (rses_config.master_failure_mode != RW_FAIL_INSTANTLY) - { - can_continue = true; - } - } - else - { - // We were expecting a response but we aren't going to get one - expected_responses--; - - if (rses_config.master_failure_mode == RW_ERROR_ON_WRITE) - { - /** In error_on_write mode, the session can continue even - * if the master is lost. Send a read-only error to - * the client to let it know that the query failed. */ - can_continue = true; - send_readonly_error(client_dcb); - } - - if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged) - { - ss_dassert(backend); - MXS_ERROR("Server %s (%s) lost the master status while waiting" - " for a result. Client sessions will be closed.", - backend->name(), backend->uri()); - backend->server()->master_err_is_logged = true; - } - } - - if (session_trx_is_active(session)) - { - // We have an open transaction, we can't continue - can_continue = false; - } - - *succp = can_continue; - backend->close(); - } - else - { - if (target_node && target_node == backend && - session_trx_is_read_only(problem_dcb->session)) - { - /** - * We were locked to a single node but the node died. Currently - * this only happens with read-only transactions so the only - * thing we can do is to close the connection. - */ - *succp = false; - backend->close(mxs::Backend::CLOSE_FATAL); - } - else - { - /** Try to replace the failed connection with a new one */ - *succp = handle_error_new_connection(problem_dcb, errmsgbuf); - } - } - - check_and_log_backend_state(backend, problem_dcb); - break; - } - - case ERRACT_REPLY_CLIENT: - { - handle_error_reply_client(problem_dcb, errmsgbuf); - *succp = false; /*< no new backend servers were made available */ - break; - } - - default: - ss_dassert(!true); - *succp = false; - break; - } -} - MXS_BEGIN_DECLS /** diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 8931e5489..418a9f581 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -559,6 +559,29 @@ SRWBackend RWSplitSession::get_target_backend(backend_type_t btype, return rval; } +/** + * @brief Get the maximum replication lag for this router + * + * @param rses Router client session + * @return Replication lag from configuration or very large number + */ +int RWSplitSession::get_max_replication_lag() +{ + int conf_max_rlag; + + /** if there is no configured value, then longest possible int is used */ + if (rses_config.max_slave_replication_lag > 0) + { + conf_max_rlag = rses_config.max_slave_replication_lag; + } + else + { + conf_max_rlag = ~(1 << 31); + } + + return conf_max_rlag; +} + /** * @brief Handle hinted target query * diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 285cae01b..def04f227 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -78,6 +78,669 @@ RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session) return rses; } +void close_all_connections(SRWBackendList& backends) +{ + for (SRWBackendList::iterator it = backends.begin(); it != backends.end(); it++) + { + SRWBackend& backend = *it; + + if (backend->in_use()) + { + backend->close(); + } + } +} + +void RWSplitSession::close() +{ + close_all_connections(backends); + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && + sescmd_list.size()) + { + std::string sescmdstr; + + for (mxs::SessionCommandList::iterator it = sescmd_list.begin(); + it != sescmd_list.end(); it++) + { + mxs::SSessionCommand& scmd = *it; + sescmdstr += scmd->to_string(); + sescmdstr += "\n"; + } + + MXS_INFO("Executed session commands:\n%s", sescmdstr.c_str()); + } +} + +int32_t RWSplitSession::routeQuery(GWBUF* querybuf) +{ + int rval = 0; + + if (query_queue == NULL && + (expected_responses == 0 || + mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH || + load_data_state == LOAD_DATA_ACTIVE || + large_query)) + { + /** Gather the information required to make routing decisions */ + RouteInfo info(this, querybuf); + + /** No active or pending queries */ + if (route_single_stmt(querybuf, info)) + { + rval = 1; + } + } + else + { + /** + * We are already processing a request from the client. Store the + * new query and wait for the previous one to complete. + */ + ss_dassert(expected_responses || query_queue); + MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command", + gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], expected_responses); + query_queue = gwbuf_append(query_queue, querybuf); + querybuf = NULL; + rval = 1; + ss_dassert(expected_responses > 0); + + if (expected_responses == 0 && !route_stored_query()) + { + rval = 0; + } + } + + if (querybuf != NULL) + { + gwbuf_free(querybuf); + } + + return rval; +} + +/** + * @brief Route a stored query + * + * When multiple queries are executed in a pipeline fashion, the readwritesplit + * stores the extra queries in a queue. This queue is emptied after reading a + * reply from the backend server. + * + * @param rses Router client session + * @return True if a stored query was routed successfully + */ +bool RWSplitSession::route_stored_query() +{ + bool rval = true; + + /** Loop over the stored statements as long as the routeQuery call doesn't + * append more data to the queue. If it appends data to the queue, we need + * to wait for a response before attempting another reroute */ + while (query_queue) + { + GWBUF* query_queue = modutil_get_next_MySQL_packet(&query_queue); + query_queue = gwbuf_make_contiguous(query_queue); + + /** Store the query queue locally for the duration of the routeQuery call. + * This prevents recursive calls into this function. */ + GWBUF *temp_storage = query_queue; + query_queue = NULL; + + // TODO: Move the handling of queued queries to the client protocol + // TODO: module where the command tracking is done automatically. + uint8_t cmd = mxs_mysql_get_command(query_queue); + mysql_protocol_set_current_command(client_dcb, (mxs_mysql_cmd_t)cmd); + + if (!routeQuery(query_queue)) + { + rval = false; + MXS_ERROR("Failed to route queued query."); + } + + if (query_queue == NULL) + { + /** Query successfully routed and no responses are expected */ + query_queue = temp_storage; + } + else + { + /** Routing was stopped, we need to wait for a response before retrying */ + query_queue = gwbuf_append(temp_storage, query_queue); + break; + } + } + + return rval; +} + +bool RWSplitSession::reroute_stored_statement(const SRWBackend& old, GWBUF *stored) +{ + bool success = false; + + if (!session_trx_is_active(client_dcb->session)) + { + /** + * Only try to retry the read if autocommit is enabled and we are + * outside of a transaction + */ + for (auto it = backends.begin(); it != backends.end(); it++) + { + SRWBackend& backend = *it; + + if (backend->in_use() && backend != old && + !backend->is_master() && backend->is_slave()) + { + /** Found a valid candidate; a non-master slave that's in use */ + if (backend->write(stored)) + { + MXS_INFO("Retrying failed read at '%s'.", backend->name()); + ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE); + backend->set_reply_state(REPLY_STATE_START); + expected_responses++; + success = true; + break; + } + } + } + + if (!success && current_master && current_master->in_use()) + { + /** + * Either we failed to write to the slave or no valid slave was found. + * Try to retry the read on the master. + */ + if (current_master->write(stored)) + { + MXS_INFO("Retrying failed read at '%s'.", current_master->name()); + ss_dassert(current_master->get_reply_state() == REPLY_STATE_DONE); + current_master->set_reply_state(REPLY_STATE_START); + expected_responses++; + success = true; + } + } + } + + return success; +} + +/** + * @bref discard the result of wait gtid statment, the result will be an error + * packet or an error packet. + * @param buffer origin reply buffer + * @param proto MySQLProtocol + * @return reset buffer + */ +GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer) +{ + uint8_t header_and_command[MYSQL_HEADER_LEN + 1]; + uint8_t packet_len = 0; + uint8_t offset = 0; + mxs_mysql_cmd_t com; + + gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command); + /* ignore error packet */ + if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR) + { + wait_gtid_state = EXPECTING_NOTHING; + return buffer; + } + + /* this packet must be an ok packet now */ + ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK); + packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN; + wait_gtid_state = EXPECTING_REAL_RESULT; + next_seq = 1; + + return gwbuf_consume(buffer, packet_len); +} + +/** + * @brief Find the backend reference that matches the given DCB + * + * @param dcb DCB to match + * + * @return The correct reference + */ +SRWBackend& RWSplitSession::get_backend_from_dcb(DCB *dcb) +{ + ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); + CHK_DCB(dcb); + + for (auto it = backends.begin(); it != backends.end(); it++) + { + SRWBackend& backend = *it; + + if (backend->in_use() && backend->dcb() == dcb) + { + return backend; + } + } + + /** We should always have a valid backend reference and in case we don't, + * something is terribly wrong. */ + MXS_ALERT("No reference to DCB %p found, aborting.", dcb); + raise(SIGABRT); + + // To make the compiler happy, we return a reference to a static value. + static SRWBackend this_should_not_happen; + return this_should_not_happen; +} + +/** + * @bref After discarded the wait result, we need correct the seqence number of every packet + * + * @param buffer origin reply buffer + * @param proto MySQLProtocol + * + */ +void RWSplitSession::correct_packet_sequence(GWBUF *buffer) +{ + uint8_t header[3]; + uint32_t offset = 0; + uint32_t packet_len = 0; + if (wait_gtid_state == EXPECTING_REAL_RESULT) + { + while (gwbuf_copy_data(buffer, offset, 3, header) == 3) + { + packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN; + uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET); + *seq = next_seq; + next_seq++; + offset += packet_len; + } + } +} + +static void log_unexpected_response(DCB* dcb, GWBUF* buffer) +{ + if (mxs_mysql_is_err_packet(buffer)) + { + /** This should be the only valid case where the server sends a response + * without the client sending one first. MaxScale does not yet advertise + * the progress reporting flag so we don't need to handle it. */ + uint8_t* data = GWBUF_DATA(buffer); + size_t len = MYSQL_GET_PAYLOAD_LEN(data); + uint16_t errcode = MYSQL_GET_ERRCODE(data); + std::string errstr((char*)data + 7, (char*)data + 7 + len - 3); + + if (errcode == ER_CONNECTION_KILLED) + { + MXS_INFO("Connection from '%s'@'%s' to '%s' was killed", + dcb->session->client_dcb->user, + dcb->session->client_dcb->remote, + dcb->server->unique_name); + } + else + { + MXS_WARNING("Server '%s' sent an unexpected error: %hu, %s", + dcb->server->unique_name, errcode, errstr.c_str()); + } + } + else + { + MXS_ERROR("Unexpected internal state: received response 0x%02hhx from " + "server '%s' when no response was expected", + mxs_mysql_get_command(buffer), dcb->server->unique_name); + ss_dassert(false); + } +} + +void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) +{ + DCB *client_dcb = backend_dcb->session->client_dcb; + + SRWBackend& backend = get_backend_from_dcb(backend_dcb); + + if (rses_config.enable_causal_read && + GWBUF_IS_REPLY_OK(writebuf) && + backend == current_master) + { + /** Save gtid position */ + char *tmp = gwbuf_get_property(writebuf, (char *)"gtid"); + if (tmp) + { + gtid_pos = std::string(tmp); + } + } + + if (wait_gtid_state == EXPECTING_WAIT_GTID_RESULT) + { + ss_dassert(rses_config.enable_causal_read); + if ((writebuf = discard_master_wait_gtid_result(writebuf)) == NULL) + { + // Nothing to route, return + return; + } + } + if (wait_gtid_state == EXPECTING_REAL_RESULT) + { + correct_packet_sequence(writebuf); + } + + if (backend->get_reply_state() == REPLY_STATE_DONE) + { + /** If we receive an unexpected response from the server, the internal + * logic cannot handle this situation. Routing the reply straight to + * the client should be the safest thing to do at this point. */ + log_unexpected_response(backend_dcb, writebuf); + MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); + return; + } + + if (session_have_stmt(backend_dcb->session)) + { + /** Statement was successfully executed, free the stored statement */ + session_clear_stmt(backend_dcb->session); + } + + if (backend->reply_is_complete(writebuf)) + { + /** Got a complete reply, acknowledge the write and decrement expected response count */ + backend->ack_write(); + expected_responses--; + ss_dassert(expected_responses >= 0); + ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE); + MXS_INFO("Reply complete, last reply from %s", backend->name()); + } + else + { + MXS_INFO("Reply not yet complete. Waiting for %d replies, got one from %s", + expected_responses, backend->name()); + } + + if (backend->have_session_commands()) + { + /** Reply to an executed session command */ + process_sescmd_response(backend, &writebuf); + } + + if (backend->have_session_commands()) + { + if (backend->execute_session_command()) + { + expected_responses++; + } + } + else if (expected_responses == 0 && query_queue) + { + route_stored_query(); + } + + if (writebuf) + { + ss_dassert(client_dcb); + /** Write reply to client DCB */ + MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); + } +} + +void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb) +{ + if (backend) + { + /** This is a valid DCB for a backend ref */ + if (backend->in_use() && backend->dcb() == problem_dcb) + { + MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.", + backend->name()); + ss_dassert(false); + } + } + else + { + const char *remote = problem_dcb->state == DCB_STATE_POLLING && + problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED"; + + MXS_ERROR("DCB connected to '%s' is not in use by the router " + "session, not closing it. DCB is in state '%s'", + remote, STRDCBSTATE(problem_dcb->state)); + } +} + +/** + * @brief Router error handling routine + * + * Error Handler routine to resolve backend failures. If it succeeds then + * there are enough operative backends available and connected. Otherwise it + * fails, and session is terminated. + * + * @param instance The router instance + * @param router_session The router session + * @param errmsgbuf The error message to reply + * @param backend_dcb The backend DCB + * @param action The action: ERRACT_NEW_CONNECTION or + * ERRACT_REPLY_CLIENT + * @param succp Result of action: true if router can continue + */ +void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb, + mxs_error_action_t action, bool *succp) +{ + ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); + CHK_DCB(problem_dcb); + MXS_SESSION *session = problem_dcb->session; + ss_dassert(session); + + SRWBackend& backend = get_backend_from_dcb(problem_dcb); + ss_dassert(backend->in_use()); + + switch (action) + { + case ERRACT_NEW_CONNECTION: + { + if (current_master && current_master->in_use() && current_master == backend) + { + /** The connection to the master has failed */ + SERVER *srv = current_master->server(); + bool can_continue = false; + + if (!backend->is_waiting_result()) + { + /** The failure of a master is not considered a critical + * failure as partial functionality still remains. If + * master_failure_mode is not set to fail_instantly, reads + * are allowed as long as slave servers are available + * and writes will cause an error to be returned. + * + * If we were waiting for a response from the master, we + * can't be sure whether it was executed or not. In this + * case the safest thing to do is to close the client + * connection. */ + if (rses_config.master_failure_mode != RW_FAIL_INSTANTLY) + { + can_continue = true; + } + } + else + { + // We were expecting a response but we aren't going to get one + expected_responses--; + + if (rses_config.master_failure_mode == RW_ERROR_ON_WRITE) + { + /** In error_on_write mode, the session can continue even + * if the master is lost. Send a read-only error to + * the client to let it know that the query failed. */ + can_continue = true; + send_readonly_error(client_dcb); + } + + if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged) + { + ss_dassert(backend); + MXS_ERROR("Server %s (%s) lost the master status while waiting" + " for a result. Client sessions will be closed.", + backend->name(), backend->uri()); + backend->server()->master_err_is_logged = true; + } + } + + if (session_trx_is_active(session)) + { + // We have an open transaction, we can't continue + can_continue = false; + } + + *succp = can_continue; + backend->close(); + } + else + { + if (target_node && target_node == backend && + session_trx_is_read_only(problem_dcb->session)) + { + /** + * We were locked to a single node but the node died. Currently + * this only happens with read-only transactions so the only + * thing we can do is to close the connection. + */ + *succp = false; + backend->close(mxs::Backend::CLOSE_FATAL); + } + else + { + /** Try to replace the failed connection with a new one */ + *succp = handle_error_new_connection(problem_dcb, errmsgbuf); + } + } + + check_and_log_backend_state(backend, problem_dcb); + break; + } + + case ERRACT_REPLY_CLIENT: + { + handle_error_reply_client(problem_dcb, errmsgbuf); + *succp = false; /*< no new backend servers were made available */ + break; + } + + default: + ss_dassert(!true); + *succp = false; + break; + } +} + +/** + * Check if there is backend reference pointing at failed DCB, and reset its + * flags. Then clear DCB's callback and finally : try to find replacement(s) + * for failed slave(s). + * + * This must be called with router lock. + * + * @param inst router instance + * @param rses router client session + * @param dcb failed DCB + * @param errmsg error message which is sent to client if it is waiting + * + * @return true if there are enough backend connections to continue, false if + * not + */ +bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg) +{ + SRWBackend& backend = get_backend_from_dcb(backend_dcb); + MXS_SESSION* ses = backend_dcb->session; + bool route_stored = false; + CHK_SESSION(ses); + + if (backend->is_waiting_result()) + { + ss_dassert(expected_responses > 0); + expected_responses--; + + /** + * A query was sent through the backend and it is waiting for a reply. + * Try to reroute the statement to a working server or send an error + * to the client. + */ + GWBUF *stored = NULL; + const SERVER *target = NULL; + if (!session_take_stmt(backend_dcb->session, &stored, &target) || + target != backend->backend()->server || + !reroute_stored_statement(backend, stored)) + { + /** + * We failed to route the stored statement or no statement was + * stored for this server. Either way we can safely free the buffer + * and decrement the expected response count. + */ + gwbuf_free(stored); + + if (!backend->have_session_commands()) + { + /** + * The backend was executing a command that requires a reply. + * Send an error to the client to let it know the query has + * failed. + */ + DCB *client_dcb = ses->client_dcb; + client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + } + + if (expected_responses == 0) + { + /** The response from this server was the last one, try to + * route all queued queries */ + route_stored = true; + } + } + } + + /** Close the current connection. This needs to be done before routing any + * of the stored queries. If we route a stored query before the connection + * is closed, it's possible that the routing logic will pick the failed + * server as the target. */ + backend->close(); + + if (route_stored) + { + route_stored_query(); + } + + int max_nslaves = router->max_slave_count(); + bool succp; + /** + * Try to get replacement slave or at least the minimum + * number of slave connections for router session. + */ + if (recv_sescmd > 0 && rses_config.disable_sescmd_history) + { + succp = router->have_enough_servers(); + } + else + { + succp = router->select_connect_backend_servers(ses, backends, + current_master, + &sescmd_list, + &expected_responses, + connection_type::SLAVE); + } + + return succp; +} + +/** + * @brief Handle an error reply for a client + * + * @param ses Session + * @param rses Router session + * @param backend_dcb DCB for the backend server that has failed + * @param errmsg GWBUF containing the error message + */ +void RWSplitSession::handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg) +{ + mxs_session_state_t sesstate = client_dcb->session->state; + SRWBackend& backend = get_backend_from_dcb(backend_dcb); + + backend->close(); + + if (sesstate == SESSION_STATE_ROUTER_READY) + { + CHK_DCB(client_dcb); + client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + } +} + uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer) { uint32_t rval = 0;