MXS-1506: Make heartbeat reads atomic
The old hkheartbeat variable was changed to the mxs_clock() function that simply wraps an atomic load of the variable. This allows it to be correctly read by MaxScale as well as opening up the possibility of converting the value load to a relaxed memory order read. Renamed the header and associated macros. Removed inclusion of the heartbeat header from the housekeeper header and added it to the files that were missing it.
This commit is contained in:
@ -17,20 +17,24 @@
|
||||
MXS_BEGIN_DECLS
|
||||
|
||||
/**
|
||||
* The global housekeeper heartbeat value. This value is incremented
|
||||
* every 100 milliseconds and may be used for crude timing etc.
|
||||
* The global clock
|
||||
*
|
||||
* This value is incremented roughly every 100 milliseconds and may be used for
|
||||
* very crude timing. The crudeness is due to the fact that the housekeeper
|
||||
* thread does the updating of this value.
|
||||
*
|
||||
* @return The current clock tick
|
||||
*/
|
||||
|
||||
extern int64_t hkheartbeat;
|
||||
int64_t mxs_clock();
|
||||
|
||||
/**
|
||||
* Convert heartbeats to seconds
|
||||
*/
|
||||
#define HB_TO_SEC(a) ((int64_t)a / 10)
|
||||
#define MXS_CLOCK_TO_SEC(a) ((int64_t)a / 10)
|
||||
|
||||
/**
|
||||
* Convert seconds to heartbeats
|
||||
*/
|
||||
#define SEC_TO_HB(a) ((int64_t)a * 10)
|
||||
#define MXS_SEC_TO_CLOCK(a) ((int64_t)a * 10)
|
||||
|
||||
MXS_END_DECLS
|
||||
@ -19,7 +19,6 @@
|
||||
#include <maxscale/cdefs.h>
|
||||
#include <time.h>
|
||||
#include <maxscale/dcb.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
|
||||
MXS_BEGIN_DECLS
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@
|
||||
#include <maxscale/thread.h>
|
||||
#include <maxscale/utils.h>
|
||||
#include <maxscale/config.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/http.hh>
|
||||
#include <maxscale/adminusers.h>
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@
|
||||
#include <maxscale/http.hh>
|
||||
#include <maxscale/version.h>
|
||||
#include <maxscale/maxscale.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
|
||||
#include "internal/config.h"
|
||||
#include "internal/filter.h"
|
||||
@ -4214,7 +4214,7 @@ json_t* config_maxscale_to_json(const char* host)
|
||||
|
||||
json_t* attr = json_object();
|
||||
time_t started = maxscale_started();
|
||||
time_t activated = started + HB_TO_SEC(cnf->promoted_at);
|
||||
time_t activated = started + MXS_CLOCK_TO_SEC(cnf->promoted_at);
|
||||
json_object_set_new(attr, CN_PARAMETERS, param);
|
||||
json_object_set_new(attr, "version", json_string(MAXSCALE_VERSION));
|
||||
json_object_set_new(attr, "commit", json_string(MAXSCALE_COMMIT));
|
||||
|
||||
@ -23,7 +23,7 @@
|
||||
#include <algorithm>
|
||||
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/jansson.hh>
|
||||
#include <maxscale/json_api.h>
|
||||
#include <maxscale/paths.h>
|
||||
@ -765,7 +765,7 @@ bool runtime_alter_maxscale(const char* name, const char* value)
|
||||
if (cnf.passive && !boolval)
|
||||
{
|
||||
// This MaxScale is being promoted to the active instance
|
||||
cnf.promoted_at = hkheartbeat;
|
||||
cnf.promoted_at = mxs_clock();
|
||||
}
|
||||
|
||||
cnf.passive = boolval;
|
||||
|
||||
@ -42,7 +42,7 @@
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/hashtable.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/limits.h>
|
||||
#include <maxscale/listener.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
@ -198,7 +198,7 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener)
|
||||
dcb_initialize(newdcb);
|
||||
newdcb->dcb_role = role;
|
||||
newdcb->listener = listener;
|
||||
newdcb->last_read = hkheartbeat;
|
||||
newdcb->last_read = mxs_clock();
|
||||
|
||||
if (role == DCB_ROLE_SERVICE_LISTENER)
|
||||
{
|
||||
@ -403,7 +403,7 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
|
||||
MXS_DEBUG("Reusing a persistent connection, dcb %p", dcb);
|
||||
dcb->persistentstart = 0;
|
||||
dcb->was_persistent = true;
|
||||
dcb->last_read = hkheartbeat;
|
||||
dcb->last_read = mxs_clock();
|
||||
atomic_add_uint64(&server->stats.n_from_pool, 1);
|
||||
return dcb;
|
||||
}
|
||||
@ -594,7 +594,7 @@ int dcb_read(DCB *dcb,
|
||||
else
|
||||
{
|
||||
GWBUF *buffer;
|
||||
dcb->last_read = hkheartbeat;
|
||||
dcb->last_read = mxs_clock();
|
||||
|
||||
buffer = dcb_basic_read(dcb, bytes_available, maxbytes, nreadtotal, &nsingleread);
|
||||
if (buffer)
|
||||
@ -748,7 +748,7 @@ dcb_read_SSL(DCB *dcb, GWBUF **head)
|
||||
dcb_drain_writeq(dcb);
|
||||
}
|
||||
|
||||
dcb->last_read = hkheartbeat;
|
||||
dcb->last_read = mxs_clock();
|
||||
buffer = dcb_basic_read_SSL(dcb, &nsingleread);
|
||||
if (buffer)
|
||||
{
|
||||
@ -757,7 +757,7 @@ dcb_read_SSL(DCB *dcb, GWBUF **head)
|
||||
|
||||
while (buffer)
|
||||
{
|
||||
dcb->last_read = hkheartbeat;
|
||||
dcb->last_read = mxs_clock();
|
||||
buffer = dcb_basic_read_SSL(dcb, &nsingleread);
|
||||
if (buffer)
|
||||
{
|
||||
@ -2904,11 +2904,11 @@ void dcb_enable_session_timeouts()
|
||||
*/
|
||||
void dcb_process_idle_sessions(int thr)
|
||||
{
|
||||
if (this_unit.check_timeouts && hkheartbeat >= this_thread.next_timeout_check)
|
||||
if (this_unit.check_timeouts && mxs_clock() >= this_thread.next_timeout_check)
|
||||
{
|
||||
/** Because the resolution of the timeout is one second, we only need to
|
||||
* check for it once per second. One heartbeat is 100 milliseconds. */
|
||||
this_thread.next_timeout_check = hkheartbeat + 10;
|
||||
this_thread.next_timeout_check = mxs_clock() + 10;
|
||||
|
||||
for (DCB *dcb = this_unit.all_dcbs[thr]; dcb; dcb = dcb->thread.next)
|
||||
{
|
||||
@ -2919,7 +2919,7 @@ void dcb_process_idle_sessions(int thr)
|
||||
|
||||
if (service->conn_idle_timeout && dcb->state == DCB_STATE_POLLING)
|
||||
{
|
||||
int64_t idle = hkheartbeat - dcb->last_read;
|
||||
int64_t idle = mxs_clock() - dcb->last_read;
|
||||
int64_t timeout = service->conn_idle_timeout * 10;
|
||||
|
||||
if (idle > timeout)
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/config.h>
|
||||
#include <maxscale/semaphore.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
@ -48,7 +49,6 @@ static SPINLOCK tasklock = SPINLOCK_INIT;
|
||||
|
||||
static bool do_shutdown = 0;
|
||||
|
||||
int64_t hkheartbeat = 0; /*< One heartbeat is 100 milliseconds */
|
||||
static THREAD hk_thr_handle;
|
||||
|
||||
static void hkthread(void *);
|
||||
@ -59,6 +59,14 @@ struct hkinit_result
|
||||
bool ok;
|
||||
};
|
||||
|
||||
// TODO: Move these into a separate file
|
||||
static int64_t mxs_clock_ticks = 0; /*< One clock tick is 100 milliseconds */
|
||||
|
||||
int64_t mxs_clock()
|
||||
{
|
||||
return atomic_load_int64(&mxs_clock_ticks);
|
||||
}
|
||||
|
||||
bool
|
||||
hkinit()
|
||||
{
|
||||
@ -240,7 +248,7 @@ void hkthread(void *data)
|
||||
for (i = 0; i < 10; i++)
|
||||
{
|
||||
thread_millisleep(100);
|
||||
atomic_add_int64(&hkheartbeat, 1);
|
||||
atomic_add_int64(&mxs_clock_ticks, 1);
|
||||
}
|
||||
now = time(0);
|
||||
spinlock_acquire(&tasklock);
|
||||
|
||||
@ -28,7 +28,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/json_api.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
@ -1759,7 +1759,7 @@ void mon_process_state_changes(MXS_MONITOR *monitor, const char *script, uint64_
|
||||
*/
|
||||
mxs_monitor_event_t event = mon_get_event_type(ptr);
|
||||
ptr->server->last_event = event;
|
||||
ptr->server->triggered_at = hkheartbeat;
|
||||
ptr->server->triggered_at = mxs_clock();
|
||||
ptr->server->active_event = !config_get_global_options()->passive;
|
||||
ptr->new_event = true;
|
||||
mon_log_state_change(ptr);
|
||||
|
||||
@ -31,7 +31,7 @@
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/config.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/platform.h>
|
||||
#include <maxscale/server.h>
|
||||
#include <maxscale/statistics.h>
|
||||
|
||||
@ -37,7 +37,7 @@
|
||||
#include <maxscale/utils.h>
|
||||
#include <maxscale/semaphore.hh>
|
||||
#include <maxscale/json_api.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/http.hh>
|
||||
#include <maxscale/maxscale.h>
|
||||
|
||||
@ -514,7 +514,7 @@ dprintServer(DCB *dcb, const SERVER *server)
|
||||
dcb_printf(dcb, "\tMaster Id: %ld\n", server->master_id);
|
||||
dcb_printf(dcb, "\tLast event: %s\n",
|
||||
mon_get_event_name((mxs_monitor_event_t)server->last_event));
|
||||
time_t t = maxscale_started() + HB_TO_SEC(server->triggered_at);
|
||||
time_t t = maxscale_started() + MXS_CLOCK_TO_SEC(server->triggered_at);
|
||||
dcb_printf(dcb, "\tTriggered at: %s\n", http_to_date(t).c_str());
|
||||
|
||||
if (server->slaves)
|
||||
@ -1421,7 +1421,7 @@ static json_t* server_json_attributes(const SERVER* server)
|
||||
json_object_set_new(attr, "replication_depth", json_integer(server->depth));
|
||||
|
||||
const char* event_name = mon_get_event_name((mxs_monitor_event_t)server->last_event);
|
||||
time_t t = maxscale_started() + HB_TO_SEC(server->triggered_at);
|
||||
time_t t = maxscale_started() + MXS_CLOCK_TO_SEC(server->triggered_at);
|
||||
json_object_set_new(attr, "last_event", json_string(event_name));
|
||||
json_object_set_new(attr, "triggered_at", json_string(http_to_date(t).c_str()));
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/dcb.h>
|
||||
#include <maxscale/housekeeper.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
@ -507,7 +508,7 @@ dprintSession(DCB *dcb, MXS_SESSION *print_session)
|
||||
|
||||
if (print_session->client_dcb && print_session->client_dcb->remote)
|
||||
{
|
||||
double idle = (hkheartbeat - print_session->client_dcb->last_read);
|
||||
double idle = (mxs_clock() - print_session->client_dcb->last_read);
|
||||
idle = idle > 0 ? idle / 10.f : 0;
|
||||
dcb_printf(dcb, "\tClient Address: %s%s%s\n",
|
||||
print_session->client_dcb->user ? print_session->client_dcb->user : "",
|
||||
@ -1073,7 +1074,7 @@ json_t* session_json_data(const MXS_SESSION *session, const char *host)
|
||||
|
||||
if (session->client_dcb->state == DCB_STATE_POLLING)
|
||||
{
|
||||
double idle = (hkheartbeat - session->client_dcb->last_read);
|
||||
double idle = (mxs_clock() - session->client_dcb->last_read);
|
||||
idle = idle > 0 ? idle / 10.f : 0;
|
||||
json_object_set_new(attr, "idle", json_real(idle));
|
||||
}
|
||||
|
||||
@ -24,7 +24,7 @@
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/config.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/platform.h>
|
||||
#include <maxscale/semaphore.hh>
|
||||
@ -1214,12 +1214,12 @@ void Worker::poll_waitevents()
|
||||
m_statistics.n_fds[(nfds < STATISTICS::MAXNFDS ? (nfds - 1) : STATISTICS::MAXNFDS - 1)]++;
|
||||
}
|
||||
|
||||
uint64_t cycle_start = hkheartbeat;
|
||||
uint64_t cycle_start = mxs_clock();
|
||||
|
||||
for (int i = 0; i < nfds; i++)
|
||||
{
|
||||
/** Calculate event queue statistics */
|
||||
int64_t started = hkheartbeat;
|
||||
int64_t started = mxs_clock();
|
||||
int64_t qtime = started - cycle_start;
|
||||
|
||||
if (qtime > STATISTICS::N_QUEUE_TIMES)
|
||||
@ -1263,7 +1263,7 @@ void Worker::poll_waitevents()
|
||||
}
|
||||
|
||||
/** Calculate event execution statistics */
|
||||
qtime = hkheartbeat - started;
|
||||
qtime = mxs_clock() - started;
|
||||
|
||||
if (qtime > STATISTICS::N_QUEUE_TIMES)
|
||||
{
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <sstream>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
#include "utilities.hh"
|
||||
|
||||
@ -1498,8 +1498,8 @@ bool MariaDBMonitor::mon_process_failover(bool* cluster_modified_out)
|
||||
* passive, we need to execute the failover script again if no new
|
||||
* masters have appeared.
|
||||
*/
|
||||
int64_t timeout = SEC_TO_HB(m_failover_timeout);
|
||||
int64_t t = hkheartbeat - ptr->server->triggered_at;
|
||||
int64_t timeout = MXS_SEC_TO_CLOCK(m_failover_timeout);
|
||||
int64_t t = mxs_clock() - ptr->server->triggered_at;
|
||||
|
||||
if (t > timeout)
|
||||
{
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
@ -953,7 +953,7 @@ bool read_complete_packet(DCB *dcb, GWBUF **readbuf)
|
||||
if (dcb_read(dcb, &localbuf, 0) >= 0)
|
||||
{
|
||||
rval = true;
|
||||
dcb->last_read = hkheartbeat;
|
||||
dcb->last_read = mxs_clock();
|
||||
GWBUF *packets = modutil_get_complete_packets(&localbuf);
|
||||
|
||||
if (packets)
|
||||
|
||||
@ -89,6 +89,7 @@
|
||||
#include <maxscale/alloc.h>
|
||||
#include <inttypes.h>
|
||||
#include <maxscale/utils.h>
|
||||
#include <maxscale/clock.h>
|
||||
|
||||
/**
|
||||
* This struct is used by sqlite3_exec callback routine
|
||||
@ -2540,13 +2541,12 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
/* Handle ROTATE_EVENT */
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
unsigned long beat1 = hkheartbeat;
|
||||
|
||||
int64_t beat1 = mxs_clock();
|
||||
blr_close_binlog(router, file);
|
||||
if (hkheartbeat - beat1 > 1)
|
||||
int64_t beat2 = mxs_clock();
|
||||
if (beat2 - beat1 > 1)
|
||||
{
|
||||
MXS_ERROR("blr_close_binlog took %lu maxscale beats",
|
||||
hkheartbeat - beat1);
|
||||
MXS_ERROR("blr_close_binlog took %ld maxscale beats", beat2 - beat1);
|
||||
}
|
||||
/* Set new file in slave->binlogfile */
|
||||
blr_slave_rotate(router, slave, GWBUF_DATA(record));
|
||||
@ -2555,7 +2555,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
MXS_FREE(slave->encryption_ctx);
|
||||
slave->encryption_ctx = NULL;
|
||||
|
||||
beat1 = hkheartbeat;
|
||||
beat1 = mxs_clock();
|
||||
|
||||
#ifdef BLFILE_IN_SLAVE
|
||||
if ((slave->file = blr_open_binlog(router,
|
||||
@ -2623,10 +2623,10 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
#ifdef BLFILE_IN_SLAVE
|
||||
file = slave->file;
|
||||
#endif
|
||||
if (hkheartbeat - beat1 > 1)
|
||||
if (mxs_clock() - beat1 > 1)
|
||||
{
|
||||
MXS_ERROR("blr_open_binlog took %lu beats",
|
||||
hkheartbeat - beat1);
|
||||
mxs_clock() - beat1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
#include <strings.h>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/hk_heartbeat.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/router.h>
|
||||
#include <maxscale/server.h>
|
||||
@ -81,12 +81,12 @@ void RWSplitSession::handle_connection_keepalive(SRWBackend& target)
|
||||
if (backend->in_use() && backend != target && !backend->is_waiting_result())
|
||||
{
|
||||
ss_debug(nserv++);
|
||||
int diff = hkheartbeat - backend->dcb()->last_read;
|
||||
int diff = mxs_clock() - backend->dcb()->last_read;
|
||||
|
||||
if (diff > keepalive)
|
||||
{
|
||||
MXS_INFO("Pinging %s, idle for %ld seconds",
|
||||
backend->name(), HB_TO_SEC(diff));
|
||||
backend->name(), MXS_CLOCK_TO_SEC(diff));
|
||||
modutil_ignorable_ping(backend->dcb());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user