Sync slaves and writer thread in binlog_change_master
The test uses a separate writer thread to insert data into the master. This thread must be halted before the blocking of the master happens as the slaves must catch up. Once slaves have caught up and the master is blocked, the writer thread can continue doing inserts. At the end of the test slaves must also be synchronized before the inserted data is validated. This prevents test failures due to slave lag.
This commit is contained in:
@ -31,6 +31,43 @@ int failed_transaction_num = 0;
|
|||||||
/** The amount of rows each transaction inserts */
|
/** The amount of rows each transaction inserts */
|
||||||
const int N_INSERTS = 100;
|
const int N_INSERTS = 100;
|
||||||
|
|
||||||
|
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
bool sync_servers(MYSQL* master, MYSQL* slave)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
int t = 240;
|
||||||
|
|
||||||
|
for (int i = 0; i < t; i++)
|
||||||
|
{
|
||||||
|
char master_log_file[80] = "";
|
||||||
|
char master_log_pos[80] = "";
|
||||||
|
char slave_log_file[80] = "";
|
||||||
|
char slave_log_pos[80] = "";
|
||||||
|
find_field(master, "SHOW MASTER STATUS", "File", master_log_file);
|
||||||
|
find_field(master, "SHOW MASTER STATUS", "Position", master_log_pos);
|
||||||
|
find_field(slave, "SHOW SLAVE STATUS", "Master_Log_File", slave_log_file);
|
||||||
|
find_field(slave, "SHOW SLAVE STATUS", "Read_Master_Log_Pos", slave_log_pos);
|
||||||
|
|
||||||
|
if (strcmp(slave_log_file, master_log_file) == 0 && strcmp(slave_log_pos, master_log_pos) == 0)
|
||||||
|
{
|
||||||
|
rval = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rval)
|
||||||
|
{
|
||||||
|
printf("WARNING: Slave has not caught up in %d seconds. Test will most likely fail.\n", t);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
int transaction(MYSQL * conn, int N)
|
int transaction(MYSQL * conn, int N)
|
||||||
{
|
{
|
||||||
int local_result = 0;
|
int local_result = 0;
|
||||||
@ -83,6 +120,7 @@ int main(int argc, char *argv[])
|
|||||||
|
|
||||||
Test->repl->connect();
|
Test->repl->connect();
|
||||||
execute_query(Test->repl->nodes[0], (char *) "DROP TABLE IF EXISTS t1;");
|
execute_query(Test->repl->nodes[0], (char *) "DROP TABLE IF EXISTS t1;");
|
||||||
|
Test->repl->sync_slaves();
|
||||||
Test->repl->close_connections();
|
Test->repl->close_connections();
|
||||||
sleep(5);
|
sleep(5);
|
||||||
|
|
||||||
@ -91,8 +129,6 @@ int main(int argc, char *argv[])
|
|||||||
Test->repl->execute_query_all_nodes((char *) "RESET SLAVE ALL");
|
Test->repl->execute_query_all_nodes((char *) "RESET SLAVE ALL");
|
||||||
Test->repl->execute_query_all_nodes((char *) "RESET MASTER");
|
Test->repl->execute_query_all_nodes((char *) "RESET MASTER");
|
||||||
|
|
||||||
Test->repl->verbose = true;
|
|
||||||
|
|
||||||
Test->tprintf("Starting binlog configuration\n");
|
Test->tprintf("Starting binlog configuration\n");
|
||||||
Test->start_binlog(0);
|
Test->start_binlog(0);
|
||||||
|
|
||||||
@ -120,11 +156,18 @@ int main(int argc, char *argv[])
|
|||||||
|
|
||||||
sleep(15);
|
sleep(15);
|
||||||
|
|
||||||
Test->tprintf("Blocking master\n");
|
pthread_mutex_lock(&mutex);
|
||||||
Test->repl->block_node(0);
|
|
||||||
Test->stop_timeout();
|
|
||||||
|
|
||||||
sleep(180);
|
sync_servers(Test->repl->nodes[0], Test->repl->nodes[3]);
|
||||||
|
Test->tprintf("Blocking master");
|
||||||
|
Test->repl->block_node(0);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
|
for (int i = 0; i < 180 && exit_flag == 0; i++)
|
||||||
|
{
|
||||||
|
sleep(i);
|
||||||
|
}
|
||||||
|
|
||||||
Test->tprintf("Done! Waiting for thread\n");
|
Test->tprintf("Done! Waiting for thread\n");
|
||||||
exit_flag = 1;
|
exit_flag = 1;
|
||||||
@ -135,10 +178,8 @@ int main(int argc, char *argv[])
|
|||||||
char rep[256];
|
char rep[256];
|
||||||
int rep_d;
|
int rep_d;
|
||||||
|
|
||||||
Test->tprintf("Sleeping to let replication happen\n");
|
|
||||||
sleep(30);
|
|
||||||
|
|
||||||
Test->repl->connect();
|
Test->repl->connect();
|
||||||
|
sync_servers(Test->repl->nodes[2], Test->repl->nodes[3]);
|
||||||
|
|
||||||
for (int i_n = 3; i_n < Test->repl->N; i_n++)
|
for (int i_n = 3; i_n < Test->repl->N; i_n++)
|
||||||
{
|
{
|
||||||
@ -321,6 +362,8 @@ void *transaction_thread( void *ptr )
|
|||||||
|
|
||||||
while ((exit_flag == 0) && i_trans < trans_max)
|
while ((exit_flag == 0) && i_trans < trans_max)
|
||||||
{
|
{
|
||||||
|
pthread_mutex_lock(&mutex);
|
||||||
|
|
||||||
trans_result = transaction(conn, i_trans);
|
trans_result = transaction(conn, i_trans);
|
||||||
if (trans_result != 0)
|
if (trans_result != 0)
|
||||||
{
|
{
|
||||||
@ -328,7 +371,7 @@ void *transaction_thread( void *ptr )
|
|||||||
failed_transaction_num = i_trans;
|
failed_transaction_num = i_trans;
|
||||||
Test->tprintf("Closing connection\n");
|
Test->tprintf("Closing connection\n");
|
||||||
mysql_close(conn);
|
mysql_close(conn);
|
||||||
Test->tprintf("Waiting for repication\n");
|
Test->tprintf("Waiting for replication");
|
||||||
sleep(15);
|
sleep(15);
|
||||||
Test->tprintf("Calling select_new_master()\n");
|
Test->tprintf("Calling select_new_master()\n");
|
||||||
select_new_master(Test);
|
select_new_master(Test);
|
||||||
@ -341,8 +384,11 @@ void *transaction_thread( void *ptr )
|
|||||||
i_trans--;
|
i_trans--;
|
||||||
}
|
}
|
||||||
i_trans++;
|
i_trans++;
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
}
|
}
|
||||||
i_trans--;
|
i_trans--;
|
||||||
|
|
||||||
|
exit_flag = 1;
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -819,7 +819,6 @@ int TestConnections::start_binlog(int m)
|
|||||||
binlog = open_conn_no_db(maxscales->binlog_port[m], maxscales->IP[m], repl->user_name, repl->password, ssl);
|
binlog = open_conn_no_db(maxscales->binlog_port[m], maxscales->IP[m], repl->user_name, repl->password, ssl);
|
||||||
execute_query(binlog, (char *) "stop slave");
|
execute_query(binlog, (char *) "stop slave");
|
||||||
execute_query(binlog, (char *) "reset slave all");
|
execute_query(binlog, (char *) "reset slave all");
|
||||||
execute_query(binlog, (char *) "reset master");
|
|
||||||
mysql_close(binlog);
|
mysql_close(binlog);
|
||||||
|
|
||||||
tprintf("Stopping maxscale\n");
|
tprintf("Stopping maxscale\n");
|
||||||
|
|||||||
Reference in New Issue
Block a user