diff --git a/maxscale-system-test/CMakeLists.txt b/maxscale-system-test/CMakeLists.txt index da04de531..cfce1f6e3 100644 --- a/maxscale-system-test/CMakeLists.txt +++ b/maxscale-system-test/CMakeLists.txt @@ -455,7 +455,7 @@ add_test_executable(mxs359_read_only.cpp mxs359_read_only mxs359_read_only LABEL # Test master_failure_mode=error_on_write and master replacement add_test_executable(mxs359_error_on_write.cpp mxs359_error_on_write mxs359_error_on_write LABELS readwritesplit REPL_BACKEND) -# Binary protocol prepared statement tests +# Binary protocol prepared statement tests (also tests MXS-2266) add_test_executable(binary_ps.cpp binary_ps replication LABELS readwritesplit LIGHT REPL_BACKEND) add_test_executable(binary_ps_cursor.cpp binary_ps_cursor replication LABELS readwritesplit LIGHT REPL_BACKEND) diff --git a/maxscale-system-test/binary_ps.cpp b/maxscale-system-test/binary_ps.cpp index 137f903f9..bd513263e 100644 --- a/maxscale-system-test/binary_ps.cpp +++ b/maxscale-system-test/binary_ps.cpp @@ -46,6 +46,7 @@ int main(int argc, char** argv) test.add_result(strcmp(buffer, server_id[0]), "Expected server_id '%s', got '%s'", server_id[0], buffer); mysql_stmt_close(stmt); + stmt = mysql_stmt_init(test.maxscales->conn_rwsplit[0]); // Execute read, should return a slave server ID @@ -66,5 +67,8 @@ int main(int argc, char** argv) test.maxscales->close_maxscale_connections(0); + // MXS-2266: COM_STMT_CLOSE causes a warning to be logged + test.log_excludes(0, "Closing unknown prepared statement"); + return test.global_result; } diff --git a/maxscale-system-test/mariadb_nodes.cpp b/maxscale-system-test/mariadb_nodes.cpp index 649944e60..676e6ed84 100644 --- a/maxscale-system-test/mariadb_nodes.cpp +++ b/maxscale-system-test/mariadb_nodes.cpp @@ -456,93 +456,60 @@ int Mariadb_nodes::start_replication() int Galera_nodes::start_galera() { bool old_verbose = verbose; - char str[1024]; - char sys1[1024]; int local_result = 0; local_result += stop_nodes(); - // Remove the grastate.dat file - ssh_node(0, "rm -f /var/lib/mysql/grastate.dat", true); + std::stringstream ss; + + for (int i = 0; i < N; i++) + { + ss << (i == 0 ? "" : ",") << IP_private[i]; + } + + auto gcomm = ss.str(); + + for (int i = 0; i < N; i++) + { + // Remove the grastate.dat file + ssh_node(i, "rm -f /var/lib/mysql/grastate.dat", true); + + ssh_node(i, "echo [mysqld] > cluster_address.cnf", true); + ssh_node_f(i, true, "echo wsrep_cluster_address=gcomm://%s >> cluster_address.cnf", gcomm.c_str()); + ssh_node(i, "cp cluster_address.cnf /etc/my.cnf.d/", true); + + ssh_node_f(i, + true, + "sed -i 's/###NODE-ADDRESS###/%s/' /etc/my.cnf.d/* /etc/mysql/my.cnf.d/*;" + "sed -i \"s|###GALERA-LIB-PATH###|$(ls /usr/lib*/galera/*.so)|g\" /etc/my.cnf.d/* /etc/mysql/my.cnf.d/*", + IP[i]); + } printf("Starting new Galera cluster\n"); fflush(stdout); - ssh_node(0, "echo [mysqld] > cluster_address.cnf", false); - ssh_node(0, "echo wsrep_cluster_address=gcomm:// >> cluster_address.cnf", false); - ssh_node(0, "cp cluster_address.cnf /etc/my.cnf.d/", true); + // Start the first node that also starts a new cluster + ssh_node_f(0, true, "galera_new_cluster"); - ssh_node_f(0, - true, - "sed -i 's/###NODE-ADDRESS###/%s/' /etc/my.cnf.d/* /etc/mysql/my.cnf.d/*;" - "sed -i \"s|###GALERA-LIB-PATH###|$(ls /usr/lib*/galera/*.so)|g\" /etc/my.cnf.d/* /etc/mysql/my.cnf.d/*", - IP[0]); - - - if (start_node(0, (char*) " --wsrep-cluster-address=gcomm://") != 0) + for (int i = 0; i < N; i++) { - cout << "Failed to start first node, trying to prepare it again" << endl; - cout << "---------- BEGIN LOGS ----------" << endl; - verbose = true; - ssh_node_f(0, true, "sudo journalctl -u mariadb | tail -n 50"); - cout << "----------- END LOGS -----------" << endl; - prepare_server(0); - local_result += start_node(0, (char*) " --wsrep-cluster-address=gcomm://"); + if (start_node(i, "") != 0) + { + cout << "Failed to start node" << i << endl; + cout << "---------- BEGIN LOGS ----------" << endl; + verbose = true; + ssh_node_f(0, true, "sudo journalctl -u mariadb | tail -n 50"); + cout << "----------- END LOGS -----------" << endl; + } } + char str[1024]; sprintf(str, "%s/create_user_galera.sh", test_dir); copy_to_node_legacy(str, "~/", 0); - sprintf(str, - "export galera_user=\"%s\"; export galera_password=\"%s\"; ./create_user_galera.sh %s", - user_name, - password, - socket_cmd[0]); - ssh_node(0, str, false); - - std::vector threads; - std::mutex lock; - - for (int i = 1; i < N; i++) - { - auto func = [&, i]() { - printf("Starting node %d\n", i); - fflush(stdout); - ssh_node(i, "echo [mysqld] > cluster_address.cnf", true); - sprintf(str, "echo wsrep_cluster_address=gcomm://%s >> cluster_address.cnf", IP_private[0]); - ssh_node(i, str, true); - ssh_node(i, "cp cluster_address.cnf /etc/my.cnf.d/", true); - ssh_node_f(i, - true, - "sed -i 's/###NODE-ADDRESS###/%s/' /etc/my.cnf.d/* /etc/mysql/my.cnf.d/*;" - "sed -i \"s|###GALERA-LIB-PATH###|$(ls /usr/lib*/galera/*.so)|g\" /etc/my.cnf.d/* /etc/mysql/my.cnf.d/*", - IP[i]); - - sprintf(&sys1[0], " --wsrep-cluster-address=gcomm://%s", IP_private[0]); - if (this->verbose) - { - printf("%s\n", sys1); - fflush(stdout); - } - fflush(stdout); - - if (start_node(i, sys1)) - { - std::lock_guard guard(lock); - cout << "Failed to start node " << i << endl; - cout << "---------- BEGIN LOGS ----------" << endl; - verbose = true; - ssh_node_f(i, true, "sudo journalctl -u mariadb | tail -n 50"); - cout << "----------- END LOGS -----------" << endl; - local_result++; - } - }; - threads.emplace_back(func); - } - - for (auto& a : threads) - { - a.join(); - } + ssh_node_f(0, false, "export galera_user=\"%s\"; export galera_password=\"%s\"; ./create_user_galera.sh %s", + user_name, + password, + socket_cmd[0]); local_result += robust_connect(5) ? 0 : 1; local_result += execute_query(nodes[0], "%s", create_repl_user); diff --git a/maxscale-system-test/mariadb_nodes.h b/maxscale-system-test/mariadb_nodes.h index 34460627b..5875c0a47 100644 --- a/maxscale-system-test/mariadb_nodes.h +++ b/maxscale-system-test/mariadb_nodes.h @@ -153,6 +153,14 @@ public: int connect(int i, const std::string& db = "test"); int connect(const std::string& db = "test"); + /** + * Get a Connection to a node + */ + Connection get_connection(int i, const std::string& db = "test") + { + return Connection(IP[i], port[i], user_name, password, db, ssl); + } + /** * Repeatedly try to connect with one second sleep in between attempts * diff --git a/maxscale-system-test/proxy_protocol.cpp b/maxscale-system-test/proxy_protocol.cpp index 9b991e007..a288ed6eb 100644 --- a/maxscale-system-test/proxy_protocol.cpp +++ b/maxscale-system-test/proxy_protocol.cpp @@ -167,6 +167,20 @@ int main(int argc, char *argv[]) } } + + /** + * MXS-2252: Proxy Protocol not displaying originating IP address in SHOW PROCESSLIST + * https://jira.mariadb.org/browse/MXS-2252 + */ + Connection direct = test.repl->get_connection(0); + Connection rwsplit = test.maxscales->rwsplit(0); + direct.connect(); + rwsplit.connect(); + auto d = direct.field("SELECT USER()"); + auto r = rwsplit.field("SELECT USER()"); + test.tprintf("Direct: %s Readwritesplit: %s", d.c_str(), r.c_str()); + test.expect(d == r, "Both connections should return the same user: %s != %s", d.c_str(), r.c_str()); + if (server_proxy_setting) { // Restore server settings. diff --git a/script/maxscale_generate_support_info.py b/script/maxscale_generate_support_info.py index a99ca1523..3eb73d22c 100755 --- a/script/maxscale_generate_support_info.py +++ b/script/maxscale_generate_support_info.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 # Copyright (c) 2019 MariaDB Corporation Ab # diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 11bf2ffbf..9ba5e10c0 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -65,6 +65,12 @@ using std::string; #define DCB_EH_NOTICE(s, p) #endif +#ifdef EPOLLRDHUP +constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; +#else +constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; +#endif + namespace { @@ -2364,6 +2370,7 @@ static void dcb_add_to_list(DCB* dcb) } else { + mxb_assert(this_unit.all_dcbs[id]->thread.tail->thread.next != dcb); this_unit.all_dcbs[id]->thread.tail->thread.next = dcb; this_unit.all_dcbs[id]->thread.tail = dcb; } @@ -2530,6 +2537,8 @@ void dcb_foreach_local(bool (* func)(DCB* dcb, void* data), void* data) { if (dcb->session) { + mxb_assert(dcb->thread.next != dcb); + if (!func(dcb, data)) { break; @@ -3052,13 +3061,7 @@ int poll_add_dcb(DCB* dcb) { dcb_sanity_check(dcb); - uint32_t events = 0; - -#ifdef EPOLLRDHUP - events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; -#else - events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; -#endif + uint32_t events = poll_events; /** Choose new state and worker thread ID according to the role of DCB. */ dcb_state_t new_state; @@ -3190,13 +3193,23 @@ DCB* dcb_get_current() static int upstream_throttle_callback(DCB* dcb, DCB_REASON reason, void* userdata) { DCB* client_dcb = dcb->session->client_dcb; + mxb::Worker* worker = static_cast(client_dcb->owner); + + // The fd is removed manually here due to the fact that poll_add_dcb causes the DCB to be added to the + // worker's list of DCBs but poll_remove_dcb doesn't remove it from it. This is due to the fact that the + // DCBs are only removed from the list when they are closed. if (reason == DCB_REASON_HIGH_WATER) { - poll_remove_dcb(client_dcb); + MXS_INFO("High water mark hit for '%s'@'%s', not reading data until low water mark is hit", + client_dcb->user, client_dcb->remote); + worker->remove_fd(client_dcb->fd); + client_dcb->state = DCB_STATE_NOPOLLING; } else if (reason == DCB_REASON_LOW_WATER) { - poll_add_dcb(client_dcb); + MXS_INFO("Low water mark hit for '%s'@'%s', accepting new data", client_dcb->user, client_dcb->remote); + worker->add_fd(client_dcb->fd, poll_events, (MXB_POLL_DATA*)client_dcb); + client_dcb->state = DCB_STATE_POLLING; } return 0; @@ -3208,7 +3221,13 @@ bool backend_dcb_remove_func(DCB* dcb, void* data) if (dcb->session == session && dcb->role == DCB::Role::BACKEND) { - poll_remove_dcb(dcb); + DCB* client_dcb = dcb->session->client_dcb; + MXS_INFO("High water mark hit for connection to '%s' from %s'@'%s', not reading data until low water " + "mark is hit", dcb->server->name(), client_dcb->user, client_dcb->remote); + + mxb::Worker* worker = static_cast(dcb->owner); + worker->remove_fd(dcb->fd); + dcb->state = DCB_STATE_NOPOLLING; } return true; @@ -3220,7 +3239,13 @@ bool backend_dcb_add_func(DCB* dcb, void* data) if (dcb->session == session && dcb->role == DCB::Role::BACKEND) { - poll_add_dcb(dcb); + DCB* client_dcb = dcb->session->client_dcb; + MXS_INFO("Low water mark hit for connection to '%s' from '%s'@'%s', accepting new data", + dcb->server->name(), client_dcb->user, client_dcb->remote); + + mxb::Worker* worker = static_cast(dcb->owner); + worker->add_fd(dcb->fd, poll_events, (MXB_POLL_DATA*)dcb); + dcb->state = DCB_STATE_POLLING; } return true; diff --git a/server/core/queryclassifier.cc b/server/core/queryclassifier.cc index 29cef1a25..d59c1e3a6 100644 --- a/server/core/queryclassifier.cc +++ b/server/core/queryclassifier.cc @@ -390,8 +390,18 @@ uint32_t QueryClassifier::ps_get_type(std::string id) const void QueryClassifier::ps_erase(GWBUF* buffer) { - m_ps_handles.erase(qc_mysql_extract_ps_id(buffer)); - m_sPs_manager->erase(buffer); + if (qc_mysql_is_ps_command(mxs_mysql_get_command(buffer))) + { + // Erase the type of the statement stored with the internal ID + m_sPs_manager->erase(ps_id_internal_get(buffer)); + // ... and then erase the external to internal ID mapping + m_ps_handles.erase(qc_mysql_extract_ps_id(buffer)); + } + else + { + // Not a PS command, we don't need the ID mapping + m_sPs_manager->erase(buffer); + } } bool QueryClassifier::query_type_is_read_only(uint32_t qtype) const diff --git a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc index 1f16db2ec..f92f165e1 100644 --- a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc +++ b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc @@ -1441,7 +1441,8 @@ static int backend_write_delayqueue(DCB* dcb, GWBUF* buffer) if (rc == 0) { - do_handle_error(dcb, ERRACT_NEW_CONNECTION, "Lost connection to backend server."); + do_handle_error(dcb, ERRACT_NEW_CONNECTION, + "Lost connection to backend server while writing delay queue."); } return rc; diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index e168c331f..fd9df4e98 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -417,3 +417,12 @@ mxs::PRWBackends::iterator find_best_backend(mxs::PRWBackends& backends, * The following are implemented in rwsplit_tmp_table_multi.c */ void close_all_connections(mxs::PRWBackends& backends); + +/** + * Utility function for extracting error messages from buffers + * + * @param buffer Buffer containing an error + * + * @return String representation of the error + */ +std::string extract_error(GWBUF* buffer); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 5b18fa775..4136d99f9 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -449,6 +449,7 @@ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint3 } else if (qc_query_is_type(type, QUERY_TYPE_DEALLOC_PREPARE)) { + mxb_assert(!mxs_mysql_is_ps_command(m_qc.current_route_info().command())); m_qc.ps_erase(querybuf); } diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index bf076c2de..9f85b65e4 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -29,19 +29,24 @@ using namespace maxscale; */ -static std::string extract_error(GWBUF* buffer) +std::string extract_error(GWBUF* buffer) { std::string rval; if (MYSQL_IS_ERROR_PACKET(((uint8_t*)GWBUF_DATA(buffer)))) { - size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)); + size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN; char replybuf[replylen]; gwbuf_copy_data(buffer, 0, sizeof(replybuf), (uint8_t*)replybuf); std::string err; std::string msg; - err.append(replybuf + 8, 5); - msg.append(replybuf + 13, replylen - 4 - 5); + + /** + * The payload starts with a one byte command followed by a two byte error code, a six byte state and + * a human-readable string that spans the rest of the packet. + */ + err.append(replybuf + MYSQL_HEADER_LEN + 3, 6); + msg.append(replybuf + MYSQL_HEADER_LEN + 3 + 6, replylen - MYSQL_HEADER_LEN - 3 - 6); rval = err + ": " + msg; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index c970eaab7..5e7f17f8f 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -941,7 +941,10 @@ void RWSplitSession::handleError(GWBUF* errmsgbuf, } else { - MXS_ERROR("Lost connection to the master server, closing session.%s", errmsg.c_str()); + int64_t idle = mxs_clock() - backend->dcb()->last_read; + MXS_ERROR("Lost connection to the master server, closing session.%s " + "Connection has been idle for %.1f seconds. Error caused by: %s", + errmsg.c_str(), (float)idle / 10.f, extract_error(errmsgbuf).c_str()); } }