From 51d47accf7c268abb52482db3960d2eab00d5b6f Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Mon, 23 Jun 2014 12:56:01 +0200 Subject: [PATCH] Added replication consistency routines Added replication consistency routines --- server/modules/monitor/mysql_mon.c | 196 ++++++++++++++++++++++++++++- 1 file changed, 195 insertions(+), 1 deletion(-) diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index ca5d63772..25e73d9e3 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -75,9 +75,12 @@ static void diagnostics(DCB *, void *); static void setInterval(void *, unsigned long); static void defaultId(void *, unsigned long); static void replicationHeartbeat(void *, int); -static SERVER *getServerByNodeId(MONITOR_SERVERS *, int); static bool mon_status_changed(MONITOR_SERVERS* mon_srv); static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv); +static SERVER *getServerByNodeId(MONITOR_SERVERS *, int); +static SERVER *getSlaveOfNodeId(MONITOR_SERVERS *, int); +static void set_master_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *database); +static void set_slave_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *database); static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; @@ -856,3 +859,194 @@ getServerByNodeId(MONITOR_SERVERS *ptr, int node_id) { return NULL; } +/** + * Fetch a MySQL slave node from a node_id + * + * @param ptr The list of servers to monitor + * @param node_id The MySQL server_id to fetch + */ +static SERVER *getSlaveOfNodeId(MONITOR_SERVERS *ptr, int node_id) { + SERVER *current; + while (ptr) + { + current = ptr->server; + if (current->master_id == node_id) { + return current; + } + ptr = ptr->next; + } + return NULL; +} + +static void set_master_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) { + unsigned long id = handle->id; + time_t heartbeat; + time_t purge_time; + char heartbeat_insert_query[128]=""; + char heartbeat_purge_query[128]=""; + + /* create the maxscale_schema database */ + if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS maxscale_schema")) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error creating maxscale_schema database in Master server" + ": %s", mysql_error(database->con)))); + + database->server->rlag = -1; + } + + /* create repl_heartbeat table in maxscale_schema database */ + if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS " + "maxscale_schema.replication_heartbeat " + "(maxscale_id INT NOT NULL, " + "master_server_id INT NOT NULL, " + "master_timestamp INT UNSIGNED NOT NULL, " + "PRIMARY KEY ( master_server_id, maxscale_id ) ) " + "ENGINE=MYISAM DEFAULT CHARSET=latin1")) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error creating maxscale_schema.replication_heartbeat table in Master server" + ": %s", mysql_error(database->con)))); + + database->server->rlag = -1; + } + + /* auto purge old values after 48 hours*/ + purge_time = time(0) - (3600 * 48); + + sprintf(heartbeat_purge_query, "DELETE FROM maxscale_schema.replication_heartbeat WHERE master_timestamp < %lu", purge_time); + + if (mysql_query(database->con, heartbeat_purge_query)) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error deleting from maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_purge_query, + mysql_error(database->con)))); + } + + heartbeat = time(0); + + /* set node_ts for master as time(0) */ + database->server->node_ts = heartbeat; + + sprintf(heartbeat_insert_query, "UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = %i AND maxscale_id = %lu", heartbeat, handle->master->node_id, id); + + /* Try to insert MaxScale timestamp into master */ + if (mysql_query(database->con, heartbeat_insert_query)) { + + database->server->rlag = -1; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error updating maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_insert_query, + mysql_error(database->con)))); + } else { + if (mysql_affected_rows(database->con) == 0) { + heartbeat = time(0); + sprintf(heartbeat_insert_query, "REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %i, %lu, %lu)", handle->master->node_id, id, heartbeat); + + if (mysql_query(database->con, heartbeat_insert_query)) { + + database->server->rlag = -1; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error inserting into maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_insert_query, + mysql_error(database->con)))); + } else { + /* Set replication lag to 0 for the master */ + database->server->rlag = 0; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: heartbeat table inserted data for %s:%i", database->server->name, database->server->port))); + } + } else { + /* Set replication lag as 0 for the master */ + database->server->rlag = 0; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: heartbeat table updated for %s:%i", database->server->name, database->server->port))); + } + } +} + +static void set_slave_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) { + unsigned long id = handle->id; + time_t heartbeat; + char select_heartbeat_query[256] = ""; + MYSQL_ROW row; + MYSQL_RES *result; + int num_fields; + + sprintf(select_heartbeat_query, "SELECT master_timestamp " + "FROM maxscale_schema.replication_heartbeat " + "WHERE maxscale_id = %lu AND master_server_id = %i", + id, handle->master->node_id); + + /* if there is a master then send the query to the slave with master_id*/ + if (handle->master !=NULL && (mysql_query(database->con, select_heartbeat_query) == 0 + && (result = mysql_store_result(database->con)) != NULL)) { + num_fields = mysql_num_fields(result); + + while ((row = mysql_fetch_row(result))) { + int rlag = -1; + time_t slave_read; + + heartbeat = time(0); + slave_read = strtoul(row[0], NULL, 10); + + if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 && slave_read == 0)) { + slave_read = 0; + } + + if (slave_read) { + /* set the replication lag */ + rlag = heartbeat - slave_read; + } + + /* set this node_ts as master_timestamp read from replication_heartbeat table */ + database->server->node_ts = slave_read; + + if (rlag >= 0) { + /* store rlag only if greater than monitor sampling interval */ + database->server->rlag = (rlag > (handle->interval / 1000)) ? rlag : 0; + } else { + database->server->rlag = -1; + } + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: replication heartbeat: " + "server %s:%i is %i seconds behind master", + database->server->name, + database->server->port, + database->server->rlag))); + } + mysql_free_result(result); + } else { + database->server->rlag = -1; + database->server->node_ts = 0; + + if (handle->master->node_id < 0) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: error: replication heartbeat: " + "master_server_id NOT available for %s:%i", + database->server->name, + database->server->port))); + } else { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: error: replication heartbeat: " + "failed selecting from hearthbeat table of %s:%i : [%s], %s", + database->server->name, + database->server->port, + select_heartbeat_query, + mysql_error(database->con)))); + } + } +}