Cleanup sharding test

Added a timeout to ensure shard info is updated.
Esa Korhonen 2019-03-29 13:09:20 +02:00
parent 738ae9178b
commit 9281220e91


@@ -38,134 +38,158 @@
int main(int argc, char* argv[])
{
    TestConnections test(argc, argv);
    test.set_timeout(30);

    const char opening_fmt[] = "Opening connection to sharding router using user '%s', "
                               "password '%s' and db '%s'.\n";
    const char common_db[] = "common_db";               // This db will be on all backends.
    const char shard_db_fmt[] = "shard_db_%i";          // Server-specific db name prefix
    const char shard_table_fmt[] = "shard_table_%i";
    const char user_name_fmt[] = "test_user_%i";
    const char user_pw_fmt[] = "test_passwd_%i";

    const int N = test.repl->N;     // Number of nodes and users.
    const int LEN = 20;

    // Generate various strings.
    char shard_dbs[N][LEN];
    char shard_tables[N][LEN];
    char user_names[N][LEN];
    char user_pws[N][LEN];
    for (int i = 0; i < N; i++)
    {
        sprintf(shard_dbs[i], shard_db_fmt, i);
        sprintf(shard_tables[i], shard_table_fmt, i);
        sprintf(user_names[i], user_name_fmt, i);
        sprintf(user_pws[i], user_pw_fmt, i);
    }

    test.repl->execute_query_all_nodes("STOP SLAVE");
    test.repl->connect();
    test.set_timeout(30);

    // On every node...
    for (int i = 0; i < N; i++)
    {
        test.tprintf("\nNode %i:\n----------\n", i);
        auto node = test.repl->nodes[i];

        // for every user...
        for (int j = 0; j < N; j++)     // users
        {
            // create users
            auto user = user_names[j];
            auto pass = user_pws[j];
            if (test.try_query(node, "CREATE OR REPLACE USER '%s'@'%%' IDENTIFIED BY '%s';", user, pass) == 0)
            {
                test.tprintf("Created user '%s'.", user);
            }
        }

        auto shard_db = shard_dbs[i];
        if (test.try_query(node, "CREATE OR REPLACE DATABASE %s", common_db) == 0
            && test.try_query(node, "CREATE OR REPLACE DATABASE %s", shard_db) == 0)
        {
            test.tprintf("Created databases '%s' and '%s'.", common_db, shard_db);
        }

        // Grant one user access to the common db on this node. Only the main test user has access to
        // server-specific db:s.
        test.try_query(node, "GRANT SELECT,USAGE,CREATE ON %s.* TO '%s'@'%%'", common_db, user_names[i]);
        test.try_query(node, "FLUSH PRIVILEGES");
    }
    test.tprintf("%s", "----------\n");

    test.repl->close_connections();
    test.stop_timeout();
    sleep(6);   // The router is configured to refresh the shard map if older than 5 seconds.

    // Generate a table for each user on the common db. The tables should be on different backends since
    // each user only has access to one node.
    // Here, 'rwsplit_port' etc are actually ports to the schemarouter.
    for (int i = 0; i < test.repl->N; i++)
    {
        auto user = user_names[i];
        auto pass = user_pws[i];
        auto table = shard_tables[i];
        test.tprintf(opening_fmt, user, pass, common_db);
        auto conn = open_conn_db(test.maxscales->rwsplit_port[0], test.maxscales->IP[0],
                                 common_db, user, pass, test.ssl);
        if (test.try_query(conn, "CREATE TABLE %s (x1 int, fl int);", table) == 0)
        {
            test.tprintf("Table '%s.%s' for user '%s' created.", common_db, table, user);
        }
        mysql_close(conn);
    }

    sleep(6);   // Again, wait for shard info update.

    for (int i = 0; i < N; i++)
    {
        auto user = user_names[i];
        auto pass = user_pws[i];
        auto table = shard_tables[i];
        test.tprintf(opening_fmt, user, pass, common_db);
        auto conn = open_conn_db(test.maxscales->rwsplit_port[0], test.maxscales->IP[0],
                                 common_db, user, pass, test.ssl);
        const char* query = "SHOW TABLES;";
        test.tprintf("Table should be %s\n", table);
        test.add_result(execute_query_check_one(conn, query, table), "check failed\n");
        mysql_close(conn);
    }

    // Test accessing all databases as the admin.
    test.maxscales->connect_rwsplit(0);
    auto conn = test.maxscales->conn_rwsplit[0];    // Is a schemarouter connection.
    test.try_query(conn, "USE %s", common_db);
    for (int i = 0; i < N; i++)
    {
        test.try_query(conn, "USE %s", shard_dbs[i]);
    }
    if (test.ok())
    {
        test.tprintf("%s", "All databases are present.");
    }
    test.maxscales->close_rwsplit(0);

    test.tprintf("Test connecting with empty database name for all users.\n");
    for (int i = 0; i < N; i++)
    {
        auto user = user_names[i];
        auto pass = user_pws[i];
        conn = open_conn_db(test.maxscales->rwsplit_port[0], test.maxscales->IP[0],
                            "", user, pass, test.ssl);
        test.expect(conn, "Connection failed for user '%s'.", user);
        mysql_close(conn);
    }
    if (test.ok())
    {
        test.tprintf("Connections succeeded.");
    }

    test.stop_timeout();
    test.log_excludes(0, "Length (0) is 0");
    test.log_excludes(0, "Unable to parse query");
    test.log_excludes(0, "query string allocation failed");

    test.repl->connect();
    /** Cleanup */
    for (int i = 0; i < N; i++)
    {
        conn = test.repl->nodes[i];
        for (int j = 0; j < N; j++)
        {
            test.try_query(conn, "DROP USER '%s'@'%%';", user_names[j]);
        }
        test.try_query(conn, "DROP DATABASE %s", common_db);
        test.try_query(conn, "DROP DATABASE %s", shard_dbs[i]);
    }

    test.repl->execute_query_all_nodes("START SLAVE");
    sleep(2);
    test.repl->fix_replication();
    return test.global_result;
}
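
Note on the added waits: the two sleep(6) calls above rely on the schemarouter refreshing its shard map once it is older than the 5 seconds mentioned in the test's own comment. If a less timing-sensitive wait were wanted, a bounded poll could be used instead. The sketch below is illustrative only, not part of the MaxScale test framework, and the helper name wait_for_table is hypothetical; it uses only the standard MySQL C API.

#include <mysql.h>
#include <unistd.h>
#include <cstring>

// Illustrative only: wait until the router exposes 'table' in the current default db,
// polling once per second for at most 'max_seconds' seconds.
static bool wait_for_table(MYSQL* conn, const char* table, int max_seconds)
{
    for (int elapsed = 0; elapsed < max_seconds; elapsed++)
    {
        if (mysql_query(conn, "SHOW TABLES") == 0)
        {
            MYSQL_RES* res = mysql_store_result(conn);
            bool found = false;
            if (res)
            {
                MYSQL_ROW row;
                while ((row = mysql_fetch_row(res)))
                {
                    if (row[0] && strcmp(row[0], table) == 0)
                    {
                        found = true;   // Shard map now includes the table.
                        break;
                    }
                }
                mysql_free_result(res);
            }
            if (found)
            {
                return true;
            }
        }
        sleep(1);   // Give the schemarouter time to refresh its shard map.
    }
    return false;
}

The committed test keeps the simpler fixed sleep, which is sufficient as long as the router's 5-second refresh threshold holds.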