From f29e5b65de1d8f74c28e6fd314b043e4218e5fcd Mon Sep 17 00:00:00 2001 From: Niclas Antti Date: Fri, 9 Nov 2018 10:18:22 +0200 Subject: [PATCH 1/7] MXS-2057 systemd watchdog Systemd wathdog notification at a little more than 2/3 of the systemd configured time. In the service config (maxscale.service) add e.g. WatchdogSec=30s to set and enable the watchdog. For building: install libsystemd-dev. The next commit will modify cmake configuration and code to conditionally compile the new code based on existence of libsystemd-dev. --- include/maxscale/routingworker.hh | 11 +++++ maxutils/maxbase/src/CMakeLists.txt | 4 ++ server/core/gateway.cc | 8 ++++ server/core/routingworker.cc | 71 ++++++++++++++++++++++++++++- 4 files changed, 93 insertions(+), 1 deletion(-) diff --git a/include/maxscale/routingworker.hh b/include/maxscale/routingworker.hh index a05fb3743..19dfe2438 100644 --- a/include/maxscale/routingworker.hh +++ b/include/maxscale/routingworker.hh @@ -18,10 +18,12 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -426,6 +428,10 @@ public: */ static std::unique_ptr get_qc_stats_as_json(const char* zHost, int id); + /** + * To be called from the initial (parent) thread if the systemd watchdog is on. + */ + static void set_watchdog_interval(uint64_t microseconds); private: const int m_id; /*< The id of the worker. */ SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map @@ -447,9 +453,14 @@ private: void epoll_tick(); // override void delete_zombies(); + void check_systemd_watchdog(); static uint32_t epoll_instance_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events); uint32_t handle_epoll_events(uint32_t events); + + static maxbase::Duration s_watchdog_interval; /*< Duration between notifications, if any. */ + static maxbase::TimePoint s_watchdog_next_check;/*< Next time to notify systemd. */ + std::atomic m_alive; /*< Set to true in epoll_tick(), false on notification. */ }; // Data local to a routing worker diff --git a/maxutils/maxbase/src/CMakeLists.txt b/maxutils/maxbase/src/CMakeLists.txt index 0e958370e..60ca60b8b 100644 --- a/maxutils/maxbase/src/CMakeLists.txt +++ b/maxutils/maxbase/src/CMakeLists.txt @@ -15,5 +15,9 @@ add_library(maxbase STATIC average.cc ) +target_link_libraries(maxbase + systemd +) + set_target_properties(maxbase PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) add_subdirectory(test) diff --git a/server/core/gateway.cc b/server/core/gateway.cc index e0559f84e..d53c931ba 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -1768,6 +1769,13 @@ int main(int argc, char** argv) cnf->log_target = MXB_LOG_TARGET_STDOUT; } + // Systemd watchdog. Must be called in the initial thread */ + uint64_t systemd_interval; // in microseconds + if (sd_watchdog_enabled(false, &systemd_interval) > 0) + { + RoutingWorker::set_watchdog_interval(systemd_interval); + } + if (!daemon_mode) { fprintf(stderr, diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index 88a749fe0..e42c0ec7a 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -163,8 +164,14 @@ void modules_thread_finish() namespace maxscale { +// static +maxbase::Duration RoutingWorker::s_watchdog_interval {0}; +// static +maxbase::TimePoint RoutingWorker::s_watchdog_next_check; + RoutingWorker::RoutingWorker() : m_id(next_worker_id()) + , m_alive(true) { MXB_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler; MXB_POLL_DATA::owner = this; @@ -266,6 +273,12 @@ bool RoutingWorker::init() // bofore the workes have been started) will be handled by the worker // that will be running in the main thread. this_thread.current_worker_id = 0; + + if (s_watchdog_interval.count() != 0) + { + MXS_NOTICE("The systemd watchdog is Enabled. Internal timeout = %s\n", + to_string(s_watchdog_interval).c_str()); + } } return this_unit.initialized; @@ -535,6 +548,8 @@ void RoutingWorker::epoll_tick() m_state = ZPROCESSING; delete_zombies(); + + check_systemd_watchdog(); } /** @@ -964,6 +979,61 @@ RoutingWorker* RoutingWorker::pick_worker() + (mxb::atomic::add(&id_generator, 1, mxb::atomic::RELAXED) % this_unit.nWorkers); return get(id); } + +// static +void maxscale::RoutingWorker::set_watchdog_interval(uint64_t microseconds) +{ + // Do not call anything from here, assume nothing has been initialized (like logging). + + // The internal timeout is 2/3 of the systemd configured interval. + double seconds = 2.0 * microseconds / 3000000; + + s_watchdog_interval = maxbase::Duration(seconds); + s_watchdog_next_check = maxbase::Clock::now(); +} + +// A note about the below code. While the main worker is turning the "m_alive" values to false, +// it is a possibility that another RoutingWorker sees the old value of "s_watchdog_next_check" +// but its new "m_alive==false" value, marks itself alive and promptly hangs. This would cause a +// watchdog kill delay of about "s_watchdog_interval" time. +// Release-acquire would fix that, but is an unneccesary expense. +void RoutingWorker::check_systemd_watchdog() +{ + if (s_watchdog_interval.count() == 0) // not turned on + { + return; + } + + maxbase::TimePoint now = maxbase::Clock::now(); + if (now > s_watchdog_next_check) + { + if (m_id == this_unit.id_main_worker) + { + m_alive.store(true, std::memory_order_relaxed); + bool all_alive = std::all_of(this_unit.ppWorkers, this_unit.ppWorkers + this_unit.nWorkers, + [](RoutingWorker* rw) { + return rw->m_alive.load(std::memory_order_relaxed); + }); + if (all_alive) + { + s_watchdog_next_check = now + s_watchdog_interval; + MXS_NOTICE("sd_notify\n"); + sd_notify(false, "WATCHDOG=1"); + std::for_each(this_unit.ppWorkers, this_unit.ppWorkers + this_unit.nWorkers, + [](RoutingWorker* rw) { + rw->m_alive.store(false, std::memory_order_relaxed); + }); + } + } + else + { + if (m_alive.load(std::memory_order_relaxed) == false) + { + m_alive.store(true, std::memory_order_relaxed); + } + } + } +} } size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) @@ -1183,7 +1253,6 @@ public: // Success if this is called. } }; - } void mxs_rworker_watchdog() From 1108132cbdfa5b76c560fb082a71cb2c30ae8eb3 Mon Sep 17 00:00:00 2001 From: Niclas Antti Date: Fri, 9 Nov 2018 14:28:11 +0200 Subject: [PATCH 2/7] MXS-2057 Do not require systemd libraries Exclude systemd usage if the library is not installed. Only excluding what is necessary. This keeps the object size the same and still compiles most of the code. --- cmake/CheckPlatform.cmake | 6 ++++++ maxutils/maxbase/src/CMakeLists.txt | 6 +++--- server/core/gateway.cc | 4 ++++ server/core/routingworker.cc | 5 ++++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/cmake/CheckPlatform.cmake b/cmake/CheckPlatform.cmake index 199c4e4e6..574b0f04a 100644 --- a/cmake/CheckPlatform.cmake +++ b/cmake/CheckPlatform.cmake @@ -41,6 +41,12 @@ if(NOT HAVE_LIBPTHREAD) message(FATAL_ERROR "Could not find libpthread") endif() +# systemd libraries are optional +find_library(HAVE_SYSTEMD NAMES systemd) +if(HAVE_SYSTEMD) + add_definitions(-DHAVE_SYSTEMD=1) +endif() + # The XSI version of strerror_r return an int and the GNU version a char* check_cxx_source_compiles(" #define _GNU_SOURCE 1 diff --git a/maxutils/maxbase/src/CMakeLists.txt b/maxutils/maxbase/src/CMakeLists.txt index 60ca60b8b..f6186ebd5 100644 --- a/maxutils/maxbase/src/CMakeLists.txt +++ b/maxutils/maxbase/src/CMakeLists.txt @@ -15,9 +15,9 @@ add_library(maxbase STATIC average.cc ) -target_link_libraries(maxbase - systemd -) +if(HAVE_SYSTEMD) +target_link_libraries(maxbase systemd) +endif() set_target_properties(maxbase PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) add_subdirectory(test) diff --git a/server/core/gateway.cc b/server/core/gateway.cc index d53c931ba..28b052931 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -27,7 +27,9 @@ #include #include #include +#ifdef HAVE_SYSTEMD #include +#endif #include #include @@ -1769,12 +1771,14 @@ int main(int argc, char** argv) cnf->log_target = MXB_LOG_TARGET_STDOUT; } +#ifdef HAVE_SYSTEMD // Systemd watchdog. Must be called in the initial thread */ uint64_t systemd_interval; // in microseconds if (sd_watchdog_enabled(false, &systemd_interval) > 0) { RoutingWorker::set_watchdog_interval(systemd_interval); } +#endif if (!daemon_mode) { diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index e42c0ec7a..72096dbff 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -18,7 +18,9 @@ #include #include #include +#ifdef HAVE_SYSTEMD #include +#endif #include #include @@ -1017,8 +1019,9 @@ void RoutingWorker::check_systemd_watchdog() if (all_alive) { s_watchdog_next_check = now + s_watchdog_interval; - MXS_NOTICE("sd_notify\n"); +#ifdef HAVE_SYSTEMD sd_notify(false, "WATCHDOG=1"); +#endif std::for_each(this_unit.ppWorkers, this_unit.ppWorkers + this_unit.nWorkers, [](RoutingWorker* rw) { rw->m_alive.store(false, std::memory_order_relaxed); From f4dd0628dab050f669d544beafa51246ced9cb90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 11 Nov 2018 17:19:52 +0200 Subject: [PATCH 3/7] Fix COM_CHANGE_USER handling If the service doesn't require collection of complete packets, the user reauthentication done with COM_CHANGE_USER would be skipped. This caused the change_user test to fail. By temporarily switching to full packet collection mode for the duration of the COM_CHANGE_USER, we avoid duplicating the code for the streaming router types. --- .../modules/protocol/MySQL/mariadbclient/mysql_client.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc b/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc index 88c55bbd9..cd104ee6d 100644 --- a/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc +++ b/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc @@ -1019,11 +1019,13 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read) /** Ask what type of input the router/filter chain expects */ capabilities = service_get_capabilities(session->service); + MySQLProtocol* proto = static_cast(dcb->protocol); /** If the router requires statement input we need to make sure that * a complete SQL packet is read before continuing. The current command * that is tracked by the protocol module is updated in route_by_statement() */ - if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) + if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT) + || proto->current_command == MXS_COM_CHANGE_USER) { uint8_t pktlen[MYSQL_HEADER_LEN]; size_t n_copied = gwbuf_copy_data(read_buffer, 0, MYSQL_HEADER_LEN, pktlen); @@ -1062,7 +1064,6 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read) qc_set_server_version(service_get_version(session->service, SERVICE_VERSION_MIN)); spec_com_res_t res = RES_CONTINUE; - MySQLProtocol* proto = static_cast(dcb->protocol); if (!proto->changing_user) { @@ -1137,7 +1138,8 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities) /** Reset error handler when routing of the new query begins */ dcb->dcb_errhandle_called = false; - if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) + if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT) + || proto->current_command == MXS_COM_CHANGE_USER) { /** * Feed each statement completely and separately to router. From 7e54cb8132d8a3e9b278b0c9b5d18fe0e51e1db7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 9 Nov 2018 15:42:20 +0200 Subject: [PATCH 4/7] Fix crash in cat The router used the wrong capabilities and results weren't delivered as complete and contiguous packets. --- server/modules/routing/cat/cat.cc | 2 +- server/modules/routing/cat/catsession.cc | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/modules/routing/cat/cat.cc b/server/modules/routing/cat/cat.cc index 97d82c843..874df9e62 100644 --- a/server/modules/routing/cat/cat.cc +++ b/server/modules/routing/cat/cat.cc @@ -55,7 +55,7 @@ json_t* Cat::diagnostics_json() const return NULL; } -const uint64_t caps = RCAP_TYPE_STMT_OUTPUT | RCAP_TYPE_STMT_INPUT; +const uint64_t caps = RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT | RCAP_TYPE_STMT_INPUT; uint64_t Cat::getCapabilities() { diff --git a/server/modules/routing/cat/catsession.cc b/server/modules/routing/cat/catsession.cc index eb6c97185..06cc820e1 100644 --- a/server/modules/routing/cat/catsession.cc +++ b/server/modules/routing/cat/catsession.cc @@ -104,6 +104,7 @@ void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (send) { // Increment the packet sequence number and send it to the client + mxb_assert(modutil_count_packets(pPacket) > 0); GWBUF_DATA(pPacket)[3] = m_packet_num++; MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); } From b443bb7525eef6d089fb14ca025faef50b9a719d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 11 Nov 2018 21:59:39 +0200 Subject: [PATCH 5/7] Store PS session commands with internal ID Commit a9e236497963251f8b4afa07484b88ad97e73a03 changed where the PS ID for a binary protocol command is replaced with the internal form. This caused prepared statements that are also session commands to be always routed with the external ID. As the external ID is almost always the master's ID, the aforementioned bug resulted in odd side-effects and the true cause of these was only revealed when the error message sent by the slave was included in the log messages. --- .../routing/readwritesplit/rwsplit_route_stmt.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index a8802cc7e..2d09b153c 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -400,6 +400,17 @@ void RWSplitSession::continue_large_session_write(GWBUF* querybuf, uint32_t type */ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type) { + if (mxs_mysql_is_ps_command(m_qc.current_route_info().command())) + { + /** + * Replace the ID with our internal one, the backends will replace it with their own ID + * when the packet is being written. We use the internal ID when we store the command + * to remove the need for extra conversions from external to internal form when the command + * is being replayed on a server. + */ + replace_binary_ps_id(querybuf, m_qc.current_route_info().stmt_id()); + } + /** The SessionCommand takes ownership of the buffer */ uint64_t id = m_sescmd_count++; mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id)); From f2688784cf14d2474e90bcf127488760c300f0c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 11 Nov 2018 22:07:34 +0200 Subject: [PATCH 6/7] Reconnect before sync in mxs1743_rconn_bitmask The blocking of the nodes that happens before it could cause the connections to break. This also removes the need for the fixing of the replication which takes time. --- maxscale-system-test/mxs1743_rconn_bitmask.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/maxscale-system-test/mxs1743_rconn_bitmask.cpp b/maxscale-system-test/mxs1743_rconn_bitmask.cpp index f261eca33..032a9ecb5 100644 --- a/maxscale-system-test/mxs1743_rconn_bitmask.cpp +++ b/maxscale-system-test/mxs1743_rconn_bitmask.cpp @@ -44,13 +44,11 @@ int main(int argc, char** argv) test.try_query(test.repl->nodes[0], "%s", "CREATE USER 'mxs1743'@'%' IDENTIFIED BY 'mxs1743'"); test.try_query(test.repl->nodes[0], "%s", "GRANT ALL ON *.* TO 'mxs1743'@'%'"); - test.tprintf("Fix replication"); - test.set_timeout(60); - test.repl->fix_replication(); - test.set_timeout(120); test.tprintf("Syncing slaves"); + test.repl->connect(); test.repl->sync_slaves(); + test.repl->disconnect(); test.tprintf("Opening new connections to verify readconnroute works"); test.set_timeout(60); @@ -78,6 +76,7 @@ int main(int argc, char** argv) char master_connections[1024]; char slave_connections[1024]; test.set_timeout(60); + test.repl->connect(); find_field(test.repl->nodes[0], query.c_str(), "connections", master_connections); find_field(test.repl->nodes[1], query.c_str(), "connections", slave_connections); From ae0e9b359d1559657183561e2acfc0f5ed08e0cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 12 Nov 2018 08:53:27 +0200 Subject: [PATCH 7/7] Fix use of zero-weight servers The servers with a zero weight would be always used over ones that have a weight. This means that the behavior was inverted and caused the mxs2054_hybrid_cluster test to fail in 2.3. Also fixed a typo in the deprecation message. --- server/core/service.cc | 2 +- .../readwritesplit/rwsplit_select_backends.cc | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/core/service.cc b/server/core/service.cc index c1964d4b7..8900dc722 100644 --- a/server/core/service.cc +++ b/server/core/service.cc @@ -1703,7 +1703,7 @@ static void service_calculate_weights(SERVICE* service) if (*weightby && service->dbref) { // DEPRECATED in 2.3, remove in 2.4. - MXS_WARNING("Setting of server weigths (%s) has been deprecated" + MXS_WARNING("Setting of server weights (%s) has been deprecated" " and will be removed in a later version of MaxScale.", weightby); diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index 2b1b3a6d3..8403e8340 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -79,7 +79,8 @@ SRWBackendVector::iterator best_score(SRWBackendVector& sBackends, SRWBackendVector::iterator backend_cmp_router_conn(SRWBackendVector& sBackends) { static auto server_score = [](SERVER_REF* server) { - return server->server_weight ? (server->connections + 1) / server->server_weight : 0; + return server->server_weight ? (server->connections + 1) / server->server_weight : + std::numeric_limits::max(); }; return best_score(sBackends, server_score); @@ -89,7 +90,8 @@ SRWBackendVector::iterator backend_cmp_router_conn(SRWBackendVector& sBackends) SRWBackendVector::iterator backend_cmp_global_conn(SRWBackendVector& sBackends) { static auto server_score = [](SERVER_REF* server) { - return server->server_weight ? (server->server->stats.n_current + 1) / server->server_weight : 0; + return server->server_weight ? (server->server->stats.n_current + 1) / server->server_weight : + std::numeric_limits::max(); }; return best_score(sBackends, server_score); @@ -99,7 +101,8 @@ SRWBackendVector::iterator backend_cmp_global_conn(SRWBackendVector& sBackends) SRWBackendVector::iterator backend_cmp_behind_master(SRWBackendVector& sBackends) { static auto server_score = [](SERVER_REF* server) { - return server->server_weight ? server->server->rlag / server->server_weight : 0; + return server->server_weight ? server->server->rlag / server->server_weight : + std::numeric_limits::max(); }; return best_score(sBackends, server_score); @@ -109,8 +112,8 @@ SRWBackendVector::iterator backend_cmp_behind_master(SRWBackendVector& sBackends SRWBackendVector::iterator backend_cmp_current_load(SRWBackendVector& sBackends) { static auto server_score = [](SERVER_REF* server) { - return server->server_weight ? (server->server->stats.n_current_ops + 1) - / server->server_weight : 0; + return server->server_weight ? (server->server->stats.n_current_ops + 1) / server->server_weight : + std::numeric_limits::max(); }; return best_score(sBackends, server_score);