Merge branch '2.3' into develop

This commit is contained in:
Markus Mäkelä
2018-11-12 12:58:40 +02:00
12 changed files with 136 additions and 15 deletions

View File

@ -41,6 +41,12 @@ if(NOT HAVE_LIBPTHREAD)
message(FATAL_ERROR "Could not find libpthread") message(FATAL_ERROR "Could not find libpthread")
endif() endif()
# systemd libraries are optional
find_library(HAVE_SYSTEMD NAMES systemd)
if(HAVE_SYSTEMD)
add_definitions(-DHAVE_SYSTEMD=1)
endif()
# The XSI version of strerror_r return an int and the GNU version a char* # The XSI version of strerror_r return an int and the GNU version a char*
check_cxx_source_compiles(" check_cxx_source_compiles("
#define _GNU_SOURCE 1 #define _GNU_SOURCE 1

View File

@ -18,10 +18,12 @@
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <type_traits> #include <type_traits>
#include <atomic>
#include <maxbase/atomic.hh> #include <maxbase/atomic.hh>
#include <maxbase/semaphore.hh> #include <maxbase/semaphore.hh>
#include <maxbase/worker.hh> #include <maxbase/worker.hh>
#include <maxbase/stopwatch.hh>
#include <maxscale/poll.h> #include <maxscale/poll.h>
#include <maxscale/query_classifier.h> #include <maxscale/query_classifier.h>
#include <maxscale/routingworker.h> #include <maxscale/routingworker.h>
@ -426,6 +428,10 @@ public:
*/ */
static std::unique_ptr<json_t> get_qc_stats_as_json(const char* zHost, int id); static std::unique_ptr<json_t> get_qc_stats_as_json(const char* zHost, int id);
/**
* To be called from the initial (parent) thread if the systemd watchdog is on.
*/
static void set_watchdog_interval(uint64_t microseconds);
private: private:
const int m_id; /*< The id of the worker. */ const int m_id; /*< The id of the worker. */
SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map
@ -447,9 +453,14 @@ private:
void epoll_tick(); // override void epoll_tick(); // override
void delete_zombies(); void delete_zombies();
void check_systemd_watchdog();
static uint32_t epoll_instance_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events); static uint32_t epoll_instance_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events);
uint32_t handle_epoll_events(uint32_t events); uint32_t handle_epoll_events(uint32_t events);
static maxbase::Duration s_watchdog_interval; /*< Duration between notifications, if any. */
static maxbase::TimePoint s_watchdog_next_check;/*< Next time to notify systemd. */
std::atomic<bool> m_alive; /*< Set to true in epoll_tick(), false on notification. */
}; };
// Data local to a routing worker // Data local to a routing worker

View File

@ -44,13 +44,11 @@ int main(int argc, char** argv)
test.try_query(test.repl->nodes[0], "%s", "CREATE USER 'mxs1743'@'%' IDENTIFIED BY 'mxs1743'"); test.try_query(test.repl->nodes[0], "%s", "CREATE USER 'mxs1743'@'%' IDENTIFIED BY 'mxs1743'");
test.try_query(test.repl->nodes[0], "%s", "GRANT ALL ON *.* TO 'mxs1743'@'%'"); test.try_query(test.repl->nodes[0], "%s", "GRANT ALL ON *.* TO 'mxs1743'@'%'");
test.tprintf("Fix replication");
test.set_timeout(60);
test.repl->fix_replication();
test.set_timeout(120); test.set_timeout(120);
test.tprintf("Syncing slaves"); test.tprintf("Syncing slaves");
test.repl->connect();
test.repl->sync_slaves(); test.repl->sync_slaves();
test.repl->disconnect();
test.tprintf("Opening new connections to verify readconnroute works"); test.tprintf("Opening new connections to verify readconnroute works");
test.set_timeout(60); test.set_timeout(60);
@ -78,6 +76,7 @@ int main(int argc, char** argv)
char master_connections[1024]; char master_connections[1024];
char slave_connections[1024]; char slave_connections[1024];
test.set_timeout(60); test.set_timeout(60);
test.repl->connect();
find_field(test.repl->nodes[0], query.c_str(), "connections", master_connections); find_field(test.repl->nodes[0], query.c_str(), "connections", master_connections);
find_field(test.repl->nodes[1], query.c_str(), "connections", slave_connections); find_field(test.repl->nodes[1], query.c_str(), "connections", slave_connections);

View File

@ -15,5 +15,9 @@ add_library(maxbase STATIC
average.cc average.cc
) )
if(HAVE_SYSTEMD)
target_link_libraries(maxbase systemd)
endif()
set_target_properties(maxbase PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) set_target_properties(maxbase PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
add_subdirectory(test) add_subdirectory(test)

View File

@ -27,6 +27,9 @@
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include <getopt.h> #include <getopt.h>
#ifdef HAVE_SYSTEMD
#include <systemd/sd-daemon.h>
#endif
#include <set> #include <set>
#include <map> #include <map>
@ -1768,6 +1771,15 @@ int main(int argc, char** argv)
cnf->log_target = MXB_LOG_TARGET_STDOUT; cnf->log_target = MXB_LOG_TARGET_STDOUT;
} }
#ifdef HAVE_SYSTEMD
// Systemd watchdog. Must be called in the initial thread */
uint64_t systemd_interval; // in microseconds
if (sd_watchdog_enabled(false, &systemd_interval) > 0)
{
RoutingWorker::set_watchdog_interval(systemd_interval);
}
#endif
if (!daemon_mode) if (!daemon_mode)
{ {
fprintf(stderr, fprintf(stderr,

View File

@ -18,6 +18,9 @@
#include <signal.h> #include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#ifdef HAVE_SYSTEMD
#include <systemd/sd-daemon.h>
#endif
#include <vector> #include <vector>
#include <sstream> #include <sstream>
@ -163,8 +166,14 @@ void modules_thread_finish()
namespace maxscale namespace maxscale
{ {
// static
maxbase::Duration RoutingWorker::s_watchdog_interval {0};
// static
maxbase::TimePoint RoutingWorker::s_watchdog_next_check;
RoutingWorker::RoutingWorker() RoutingWorker::RoutingWorker()
: m_id(next_worker_id()) : m_id(next_worker_id())
, m_alive(true)
{ {
MXB_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler; MXB_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler;
MXB_POLL_DATA::owner = this; MXB_POLL_DATA::owner = this;
@ -266,6 +275,12 @@ bool RoutingWorker::init()
// bofore the workes have been started) will be handled by the worker // bofore the workes have been started) will be handled by the worker
// that will be running in the main thread. // that will be running in the main thread.
this_thread.current_worker_id = 0; this_thread.current_worker_id = 0;
if (s_watchdog_interval.count() != 0)
{
MXS_NOTICE("The systemd watchdog is Enabled. Internal timeout = %s\n",
to_string(s_watchdog_interval).c_str());
}
} }
return this_unit.initialized; return this_unit.initialized;
@ -535,6 +550,8 @@ void RoutingWorker::epoll_tick()
m_state = ZPROCESSING; m_state = ZPROCESSING;
delete_zombies(); delete_zombies();
check_systemd_watchdog();
} }
/** /**
@ -964,6 +981,62 @@ RoutingWorker* RoutingWorker::pick_worker()
+ (mxb::atomic::add(&id_generator, 1, mxb::atomic::RELAXED) % this_unit.nWorkers); + (mxb::atomic::add(&id_generator, 1, mxb::atomic::RELAXED) % this_unit.nWorkers);
return get(id); return get(id);
} }
// static
void maxscale::RoutingWorker::set_watchdog_interval(uint64_t microseconds)
{
    // Runs in the initial (parent) thread before initialization has happened,
    // so nothing (not even logging) may be called from here.

    // Notify systemd well before its deadline: the internal check interval is
    // 2/3 of the watchdog interval that systemd was configured with.
    const double interval_seconds = 2.0 * microseconds / 3000000;

    s_watchdog_interval = maxbase::Duration(interval_seconds);
    s_watchdog_next_check = maxbase::Clock::now();
}
// A note about the below code. While the main worker is turning the "m_alive" values to false,
// it is a possibility that another RoutingWorker sees the old value of "s_watchdog_next_check"
// but its new "m_alive==false" value, marks itself alive and promptly hangs. This would cause a
// watchdog kill delay of about "s_watchdog_interval" time.
// Release-acquire would fix that, but is an unneccesary expense.
void RoutingWorker::check_systemd_watchdog()
{
if (s_watchdog_interval.count() == 0) // not turned on
{
return;
}
maxbase::TimePoint now = maxbase::Clock::now();
if (now > s_watchdog_next_check)
{
if (m_id == this_unit.id_main_worker)
{
m_alive.store(true, std::memory_order_relaxed);
bool all_alive = std::all_of(this_unit.ppWorkers, this_unit.ppWorkers + this_unit.nWorkers,
[](RoutingWorker* rw) {
return rw->m_alive.load(std::memory_order_relaxed);
});
if (all_alive)
{
s_watchdog_next_check = now + s_watchdog_interval;
#ifdef HAVE_SYSTEMD
sd_notify(false, "WATCHDOG=1");
#endif
std::for_each(this_unit.ppWorkers, this_unit.ppWorkers + this_unit.nWorkers,
[](RoutingWorker* rw) {
rw->m_alive.store(false, std::memory_order_relaxed);
});
}
}
else
{
if (m_alive.load(std::memory_order_relaxed) == false)
{
m_alive.store(true, std::memory_order_relaxed);
}
}
}
}
} }
size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
@ -1183,7 +1256,6 @@ public:
// Success if this is called. // Success if this is called.
} }
}; };
} }
void mxs_rworker_watchdog() void mxs_rworker_watchdog()

View File

@ -1703,7 +1703,7 @@ static void service_calculate_weights(SERVICE* service)
if (*weightby && service->dbref) if (*weightby && service->dbref)
{ {
// DEPRECATED in 2.3, remove in 2.4. // DEPRECATED in 2.3, remove in 2.4.
MXS_WARNING("Setting of server weigths (%s) has been deprecated" MXS_WARNING("Setting of server weights (%s) has been deprecated"
" and will be removed in a later version of MaxScale.", " and will be removed in a later version of MaxScale.",
weightby); weightby);

View File

@ -1065,11 +1065,13 @@ static int gw_read_normal_data(DCB* dcb, GWBUF* read_buffer, int nbytes_read)
/** Ask what type of input the router/filter chain expects */ /** Ask what type of input the router/filter chain expects */
capabilities = service_get_capabilities(session->service); capabilities = service_get_capabilities(session->service);
MySQLProtocol* proto = static_cast<MySQLProtocol*>(dcb->protocol);
/** If the router requires statement input we need to make sure that /** If the router requires statement input we need to make sure that
* a complete SQL packet is read before continuing. The current command * a complete SQL packet is read before continuing. The current command
* that is tracked by the protocol module is updated in route_by_statement() */ * that is tracked by the protocol module is updated in route_by_statement() */
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)
|| proto->current_command == MXS_COM_CHANGE_USER)
{ {
uint8_t pktlen[MYSQL_HEADER_LEN]; uint8_t pktlen[MYSQL_HEADER_LEN];
size_t n_copied = gwbuf_copy_data(read_buffer, 0, MYSQL_HEADER_LEN, pktlen); size_t n_copied = gwbuf_copy_data(read_buffer, 0, MYSQL_HEADER_LEN, pktlen);
@ -1121,7 +1123,6 @@ static int gw_read_normal_data(DCB* dcb, GWBUF* read_buffer, int nbytes_read)
qc_set_server_version(service_get_version(session->service, SERVICE_VERSION_MIN)); qc_set_server_version(service_get_version(session->service, SERVICE_VERSION_MIN));
spec_com_res_t res = RES_CONTINUE; spec_com_res_t res = RES_CONTINUE;
MySQLProtocol* proto = static_cast<MySQLProtocol*>(dcb->protocol);
if (!proto->changing_user) if (!proto->changing_user)
{ {
@ -1194,7 +1195,8 @@ static int gw_read_finish_processing(DCB* dcb, GWBUF* read_buffer, uint64_t capa
/** Reset error handler when routing of the new query begins */ /** Reset error handler when routing of the new query begins */
dcb->dcb_errhandle_called = false; dcb->dcb_errhandle_called = false;
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)
|| proto->current_command == MXS_COM_CHANGE_USER)
{ {
/** /**
* Feed each statement completely and separately to router. * Feed each statement completely and separately to router.

View File

@ -55,7 +55,7 @@ json_t* Cat::diagnostics_json() const
return NULL; return NULL;
} }
const uint64_t caps = RCAP_TYPE_STMT_OUTPUT | RCAP_TYPE_STMT_INPUT; const uint64_t caps = RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT | RCAP_TYPE_STMT_INPUT;
uint64_t Cat::getCapabilities() uint64_t Cat::getCapabilities()
{ {

View File

@ -104,6 +104,7 @@ void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb)
if (send) if (send)
{ {
// Increment the packet sequence number and send it to the client // Increment the packet sequence number and send it to the client
mxb_assert(modutil_count_packets(pPacket) > 0);
GWBUF_DATA(pPacket)[3] = m_packet_num++; GWBUF_DATA(pPacket)[3] = m_packet_num++;
MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
} }

View File

@ -400,6 +400,17 @@ void RWSplitSession::continue_large_session_write(GWBUF* querybuf, uint32_t type
*/ */
bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type) bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type)
{ {
if (mxs_mysql_is_ps_command(m_qc.current_route_info().command()))
{
/**
* Replace the ID with our internal one, the backends will replace it with their own ID
* when the packet is being written. We use the internal ID when we store the command
* to remove the need for extra conversions from external to internal form when the command
* is being replayed on a server.
*/
replace_binary_ps_id(querybuf, m_qc.current_route_info().stmt_id());
}
/** The SessionCommand takes ownership of the buffer */ /** The SessionCommand takes ownership of the buffer */
uint64_t id = m_sescmd_count++; uint64_t id = m_sescmd_count++;
mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id)); mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id));

View File

@ -79,7 +79,8 @@ SRWBackendVector::iterator best_score(SRWBackendVector& sBackends,
SRWBackendVector::iterator backend_cmp_router_conn(SRWBackendVector& sBackends) SRWBackendVector::iterator backend_cmp_router_conn(SRWBackendVector& sBackends)
{ {
static auto server_score = [](SERVER_REF* server) { static auto server_score = [](SERVER_REF* server) {
return server->server_weight ? (server->connections + 1) / server->server_weight : 0; return server->server_weight ? (server->connections + 1) / server->server_weight :
std::numeric_limits<double>::max();
}; };
return best_score(sBackends, server_score); return best_score(sBackends, server_score);
@ -89,7 +90,8 @@ SRWBackendVector::iterator backend_cmp_router_conn(SRWBackendVector& sBackends)
SRWBackendVector::iterator backend_cmp_global_conn(SRWBackendVector& sBackends) SRWBackendVector::iterator backend_cmp_global_conn(SRWBackendVector& sBackends)
{ {
static auto server_score = [](SERVER_REF* server) { static auto server_score = [](SERVER_REF* server) {
return server->server_weight ? (server->server->stats.n_current + 1) / server->server_weight : 0; return server->server_weight ? (server->server->stats.n_current + 1) / server->server_weight :
std::numeric_limits<double>::max();
}; };
return best_score(sBackends, server_score); return best_score(sBackends, server_score);
@ -99,7 +101,8 @@ SRWBackendVector::iterator backend_cmp_global_conn(SRWBackendVector& sBackends)
SRWBackendVector::iterator backend_cmp_behind_master(SRWBackendVector& sBackends) SRWBackendVector::iterator backend_cmp_behind_master(SRWBackendVector& sBackends)
{ {
static auto server_score = [](SERVER_REF* server) { static auto server_score = [](SERVER_REF* server) {
return server->server_weight ? server->server->rlag / server->server_weight : 0; return server->server_weight ? server->server->rlag / server->server_weight :
std::numeric_limits<double>::max();
}; };
return best_score(sBackends, server_score); return best_score(sBackends, server_score);
@ -109,8 +112,8 @@ SRWBackendVector::iterator backend_cmp_behind_master(SRWBackendVector& sBackends
SRWBackendVector::iterator backend_cmp_current_load(SRWBackendVector& sBackends) SRWBackendVector::iterator backend_cmp_current_load(SRWBackendVector& sBackends)
{ {
static auto server_score = [](SERVER_REF* server) { static auto server_score = [](SERVER_REF* server) {
return server->server_weight ? (server->server->stats.n_current_ops + 1) return server->server_weight ? (server->server->stats.n_current_ops + 1) / server->server_weight :
/ server->server_weight : 0; std::numeric_limits<double>::max();
}; };
return best_score(sBackends, server_score); return best_score(sBackends, server_score);