Replication consistency with replication tree

Added replication consistency after replication tree computation
This commit is contained in:
MassimilianoPinto 2014-06-23 13:12:26 +02:00
parent 51d47accf7
commit d36100a99f

View File

@ -35,6 +35,7 @@
* 03/06/14 Mark Ridoch Add support for maintenance mode
* 17/06/14 Massimiliano Pinto Addition of getServerByNodeId routine
* and first implementation for depth of replication for nodes.
* 23/06/14 Massimiliano Pinto Added replication consistency after replication tree computation
*
* @endverbatim
*/
@ -678,6 +679,11 @@ monitorMain(void *arg)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr;
int replication_heartbeat = handle->replicationHeartbeat;
int depth = 0;
int node_id = -1;
int num_servers=0;
int depth_level = 0;
if (mysql_thread_init())
{
@ -700,8 +706,14 @@ MONITOR_SERVERS *ptr;
ptr = handle->databases;
while (ptr)
{
/* monitor current node */
monitorDatabase(handle, ptr);
/* reset the slave list of current node */
*ptr->server->slaves = '\0';
num_servers++;
if (mon_status_changed(ptr))
{
dcb_call_foreach(ptr->server, DCB_REASON_NOT_RESPONDING);
@ -727,38 +739,116 @@ MONITOR_SERVERS *ptr;
/** Reset this server's error count */
ptr->mon_err_count = 0;
}
ptr = ptr->next;
}
/* check the replication tree */
ptr = handle->databases;
while (ptr)
{
depth = 0;
current = ptr->server;
node_id = current->node_id;
if (node_id < 1)
continue;
while(1) {
backend = getServerByNodeId(handle->databases, node_id);
if (backend) {
node_id = backend->master_id;
} else
node_id = -1;
if (node_id > 0) {
current->depth = depth + 1;
depth++;
} else {
current->depth = depth - 1;
break;
}
/* ToDO: move these lines into monitorDatabase() */
if ((! SERVER_IN_MAINT(ptr->server)) && (! SERVER_IS_RUNNING(ptr->server))) {
/* clear M/S status */
server_clear_status(ptr->server, SERVER_SLAVE);
server_clear_status(ptr->server, SERVER_MASTER);
}
ptr = ptr->next;
}
/* replication depth_level is now set its maximum value: num_servers */
depth_level = num_servers;
/* Compute the replication tree */
ptr = handle->databases;
while (ptr && (! SERVER_IN_MAINT(ptr->server)) && (SERVER_IS_RUNNING(ptr->server)))
{
depth = 0;
current = ptr->server;
fprintf(stderr, "Current node to check is %d, depth %d, master %d\n", current->node_id, depth, current->master_id);
node_id = current->master_id;
if (node_id < 1) {
SERVER *find_slave;
find_slave = getSlaveOfNodeId(handle->databases, current->node_id);
if (find_slave == NULL) {
current->depth = -1;
ptr = ptr->next;
fprintf(stderr, "no slaves for %d, continue\n", current->node_id);
continue;
} else {
current->depth = 0;
fprintf(stderr, "Found slave for %d: %d. Master level is %d\n", current->node_id, find_slave->node_id, current->depth);
}
} else {
depth++;
}
while(depth <= num_servers) {
/* set the root master at lowest depth level */
if (current->depth > -1 && current->depth < root_level) {
root_level = current->depth;
handle->master = current;
}
fprintf(stderr, "Repl depth is %d, servers are %d\n", depth, num_servers);
fprintf(stderr, "Look for backend %d, depth %d\n", node_id, depth);
backend = getServerByNodeId(handle->databases, node_id);
if (backend) {
node_id = backend->master_id;
} else
node_id = -1;
if (node_id > 0) {
fprintf(stderr, "Setting Repl Level to %d for node %d\n", depth+1, current->node_id);
current->depth = depth + 1;
depth++;
} else {
SERVER *master;
current->depth = depth;
fprintf(stderr, "no backend, Setting Repl Level to %d for node %d\n", depth, current->node_id);
fprintf(stderr, "Node %d/%d is slave for %d: curr depth %d\n", current->node_id, current->depth, current->master_id, depth);
master = getServerByNodeId(handle->databases, current->master_id);
if (master && master->node_id > 0) {
char this_slave[5]="";
sprintf(this_slave, "%d", current->node_id);
if (strlen(master->slaves) && master->slaves[strlen(master->slaves)-1] != ',')
strcat(master->slaves, ", ");
strcat(master->slaves, this_slave);
master->depth = current->depth -1;
fprintf(stderr, "setting slaves [%s] for master %d, master level %d\n", master->slaves, master->node_id, master->depth);
server_set_status(master, SERVER_MASTER);
}
break;
}
}
ptr = ptr->next;
}
// do the Replication consistency
if (replication_heartbeat) {
ptr = handle->databases;
while (ptr && (! SERVER_IN_MAINT(ptr->server)) && SERVER_IS_RUNNING(ptr->server))
{
fprintf(stderr, "Node %d is M %d, S %d, R %d Repl depth %d\n", ptr->server->node_id, SERVER_IS_MASTER(ptr->server), SERVER_IS_SLAVE(ptr->server), SERVER_IS_RELAY_SERVER(ptr->server), ptr->server->depth);
if (ptr->server->node_id == handle->master->node_id) {
fprintf(stderr, "Set Master heartbeat for %d\n", handle->master->node_id);
set_master_heartbeat(handle, ptr);
} else {
fprintf(stderr, "Set Slave heartbeat for %d\n", ptr->server->node_id);
set_slave_heartbeat(handle, ptr);
}
ptr = ptr->next;
}
}
thread_millisleep(handle->interval);
}