MXS-2900 Move testcore library files to a dedicated directory

The library is now named "maxtest". The related include-files are, for
now, usable without designating the full include path. This may be changed
later, but would require modifying every test.
This commit is contained in:
Esa Korhonen
2019-11-22 14:58:09 +02:00
parent e180c20055
commit 96ba2da40c
58 changed files with 43 additions and 19 deletions

View File

@ -0,0 +1,31 @@
add_library(maxtest SHARED
stopwatch.cpp
big_load.cpp
big_transaction.cpp
blob_test.cpp
config_operations.cpp
different_size.cpp
envv.cpp
execute_cmd.cpp
fw_copy_rules.cpp
get_com_select_insert.cpp
get_my_ip.cpp
keepalived_func.cpp
labels_table.cpp
mariadb_func.cpp
mariadb_nodes.cpp
maxadmin_operations.cpp
maxinfo_func.cpp
maxscales.cpp
nodes.cpp
rds_vpc.cpp
sql_t1.cpp
tcp_connection.cpp
test_binlog_fnc.cpp
testconnections.cpp
# Include the CDC connector in the core library
${CMAKE_SOURCE_DIR}/connectors/cdc-connector/cdc_connector.cpp)
target_link_libraries(maxtest ${MARIADB_CONNECTOR_LIBRARIES} ${JANSSON_LIBRARIES} z m pthread ssl dl rt crypto crypt maxbase)
set_target_properties(maxtest PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
install(TARGETS maxtest DESTINATION system-test)
add_dependencies(maxtest connector-c jansson maxbase)

View File

@ -0,0 +1,234 @@
#include "big_load.h"
#include <pthread.h>
void load(long int* new_inserts,
long int* new_selects,
long int* selects,
long int* inserts,
int threads_num,
TestConnections* Test,
long int* i1,
long int* i2,
int rwsplit_only,
bool galera,
bool report_errors)
{
char sql[1000000];
thread_data data;
Mariadb_nodes* nodes;
if (galera)
{
nodes = Test->galera;
}
else
{
nodes = Test->repl;
}
int sql_l = 20000;
int run_time = 100;
if (Test->smoke)
{
sql_l = 500;
run_time = 10;
}
nodes->connect();
Test->maxscales->connect_rwsplit(0);
data.i1 = 0;
data.i2 = 0;
data.exit_flag = 0;
data.Test = Test;
data.rwsplit_only = rwsplit_only;
// connect to the MaxScale server (rwsplit)
if (Test->maxscales->conn_rwsplit[0] == NULL)
{
if (report_errors)
{
Test->add_result(1, "Can't connect to MaxScale\n");
}
// Test->copy_all_logs();
exit(1);
}
else
{
create_t1(Test->maxscales->conn_rwsplit[0]);
create_insert_string(sql, sql_l, 1);
if ((execute_query(Test->maxscales->conn_rwsplit[0], "%s", sql) != 0) && (report_errors))
{
Test->add_result(1, "Query %s failed\n", sql);
}
// close connections
Test->maxscales->close_rwsplit(0);
Test->tprintf("Waiting for the table to replicate\n");
nodes->sync_slaves();
pthread_t thread1[threads_num];
pthread_t thread2[threads_num];
Test->tprintf("COM_INSERT and COM_SELECT before executing test\n");
Test->add_result(get_global_status_allnodes(&selects[0], &inserts[0], nodes, 0),
"get_global_status_allnodes failed\n");
data.exit_flag = 0;
/* Create independent threads each of them will execute function */
for (int i = 0; i < threads_num; i++)
{
pthread_create(&thread1[i], NULL, query_thread1, &data);
pthread_create(&thread2[i], NULL, query_thread2, &data);
}
Test->tprintf("Threads are running %d seconds \n", run_time);
sleep(run_time);
data.exit_flag = 1;
Test->tprintf("Waiting for all threads to exit\n");
Test->set_timeout(100);
for (int i = 0; i < threads_num; i++)
{
pthread_join(thread1[i], NULL);
pthread_join(thread2[i], NULL);
}
sleep(1);
Test->tprintf("COM_INSERT and COM_SELECT after executing test\n");
get_global_status_allnodes(&new_selects[0], &new_inserts[0], nodes, 0);
print_delta(&new_selects[0], &new_inserts[0], &selects[0], &inserts[0], nodes->N);
Test->tprintf("First group of threads did %d queries, second - %d \n", data.i1, data.i2);
}
nodes->close_connections();
*i1 = data.i1;
*i2 = data.i2;
}
void* query_thread1(void* ptr)
{
MYSQL* conn1;
MYSQL* conn2;
MYSQL* conn3;
int conn_err = 0;
thread_data* data = (thread_data*) ptr;
conn1 = open_conn_db_timeout(data->Test->maxscales->rwsplit_port[0],
data->Test->maxscales->IP[0],
(char*) "test",
data->Test->maxscales->user_name,
data->Test->maxscales->password,
20,
data->Test->ssl);
// conn1 = data->Test->maxscales->open_rwsplit_connection(0);
if (mysql_errno(conn1) != 0)
{
conn_err++;
}
if (data->rwsplit_only == 0)
{
// conn2 = data->Test->maxscales->open_readconn_master_connection(0);
conn2 = open_conn_db_timeout(data->Test->maxscales->readconn_master_port[0],
data->Test->maxscales->IP[0],
(char*) "test",
data->Test->maxscales->user_name,
data->Test->maxscales->password,
20,
data->Test->ssl);
if (mysql_errno(conn2) != 0)
{
conn_err++;
}
// conn3 = data->Test->maxscales->open_readconn_slave_connection(0);
conn3 = open_conn_db_timeout(data->Test->maxscales->readconn_slave_port[0],
data->Test->maxscales->IP[0],
(char*) "test",
data->Test->maxscales->user_name,
data->Test->maxscales->password,
20,
data->Test->ssl);
if (mysql_errno(conn3) != 0)
{
conn_err++;
}
}
if (conn_err == 0)
{
while (data->exit_flag == 0)
{
if (execute_query_silent(conn1, (char*) "SELECT * FROM t1;") == 0)
{
__sync_fetch_and_add(&data->i1, 1);
}
if (data->rwsplit_only == 0)
{
execute_query_silent(conn2, (char*) "SELECT * FROM t1;");
execute_query_silent(conn3, (char*) "SELECT * FROM t1;");
}
}
mysql_close(conn1);
if (data->rwsplit_only == 0)
{
mysql_close(conn2);
mysql_close(conn3);
}
}
return NULL;
}
void* query_thread2(void* ptr)
{
MYSQL* conn1;
MYSQL* conn2;
MYSQL* conn3;
thread_data* data = (thread_data*) ptr;
// conn1 = data->Test->maxscales->open_rwsplit_connection(0);
conn1 = open_conn_db_timeout(data->Test->maxscales->rwsplit_port[0],
data->Test->maxscales->IP[0],
(char*) "test",
data->Test->maxscales->user_name,
data->Test->maxscales->password,
20,
data->Test->ssl);
if (data->rwsplit_only == 0)
{
// conn2 = data->Test->maxscales->open_readconn_master_connection(0);
// conn3 = data->Test->maxscales->open_readconn_slave_connection(0);
conn2 = open_conn_db_timeout(data->Test->maxscales->readconn_master_port[0],
data->Test->maxscales->IP[0],
(char*) "test",
data->Test->maxscales->user_name,
data->Test->maxscales->password,
20,
data->Test->ssl);
// if (mysql_errno(conn2) != 0) { conn_err++; }
conn3 = open_conn_db_timeout(data->Test->maxscales->readconn_slave_port[0],
data->Test->maxscales->IP[0],
(char*) "test",
data->Test->maxscales->user_name,
data->Test->maxscales->password,
20,
data->Test->ssl);
// if (mysql_errno(conn3) != 0) { conn_err++; }
}
while (data->exit_flag == 0)
{
sleep(1);
if (execute_query_silent(conn1, (char*) "SELECT * FROM t1;") == 0)
{
__sync_fetch_and_add(&data->i2, 1);
}
if (data->rwsplit_only == 0)
{
execute_query_silent(conn2, (char*) "SELECT * FROM t1;");
execute_query_silent(conn3, (char*) "SELECT * FROM t1;");
}
}
mysql_close(conn1);
if (data->rwsplit_only == 0)
{
mysql_close(conn2);
mysql_close(conn3);
}
return NULL;
}

View File

@ -0,0 +1,23 @@
#include "big_transaction.h"
int big_transaction(MYSQL* conn, int N)
{
int local_result = 0;
char sql[1000000];
local_result += create_t1(conn);
local_result += execute_query(conn, (char*) "START TRANSACTION");
local_result += execute_query(conn, (char*) "SET autocommit = 0");
for (int i = 0; i < N; i++)
{
create_insert_string(sql, 10000, i);
local_result += execute_query(conn, "%s", sql);
local_result += execute_query(conn, "CREATE TABLE t2(id int);");
local_result += execute_query(conn, "%s", sql);
local_result += execute_query(conn, "DROP TABLE t2;");
local_result += execute_query(conn, "%s", sql);
}
local_result += execute_query(conn, (char*) "COMMIT");
return local_result;
}

View File

@ -0,0 +1,211 @@
#include "blob_test.h"
int test_longblob(TestConnections* Test,
MYSQL* conn,
char* blob_name,
unsigned long chunk_size,
int chunks,
int rows)
{
int size = chunk_size;
unsigned long* data;
int i, j;
MYSQL_BIND param[1];
char sql[256];
int global_res = Test->global_result;
// Test->tprintf("chunk size %lu chunks %d inserts %d\n", chunk_size, chunks, rows);
char* insert_stmt = (char*) "INSERT INTO long_blob_table(x, b) VALUES(1, ?)";
Test->tprintf("Creating table with %s\n", blob_name);
Test->try_query(conn, (char*) "DROP TABLE IF EXISTS long_blob_table");
sprintf(sql,
"CREATE TABLE long_blob_table(id int NOT NULL AUTO_INCREMENT, x INT, b %s, PRIMARY KEY (id))",
blob_name);
Test->try_query(conn, "%s", sql);
for (int k = 0; k < rows; k++)
{
Test->tprintf("Preparintg INSERT stmt\n");
MYSQL_STMT* stmt = mysql_stmt_init(conn);
if (stmt == NULL)
{
Test->add_result(1, "stmt init error: %s\n", mysql_error(conn));
}
Test->add_result(mysql_stmt_prepare(stmt, insert_stmt, strlen(insert_stmt)),
"Error preparing stmt: %s\n",
mysql_stmt_error(stmt));
param[0].buffer_type = MYSQL_TYPE_STRING;
param[0].is_null = 0;
Test->tprintf("Binding parameter\n");
Test->add_result(mysql_stmt_bind_param(stmt, param),
"Error parameter binding: %s\n",
mysql_stmt_error(stmt));
Test->tprintf("Filling buffer\n");
data = (unsigned long*) malloc(size * sizeof(long int));
if (data == NULL)
{
Test->add_result(1, "Memory allocation error\n");
}
Test->tprintf("Sending data in %d bytes chunks, total size is %d\n",
size * sizeof(unsigned long),
(size * sizeof(unsigned long)) * chunks);
for (i = 0; i < chunks; i++)
{
for (j = 0; j < size; j++)
{
data[j] = j + i * size;
}
Test->set_timeout(300);
Test->tprintf("Chunk #%d\n", i);
if (mysql_stmt_send_long_data(stmt, 0, (char*) data, size * sizeof(unsigned long)) != 0)
{
Test->add_result(1,
"Error inserting data, iteration %d, error %s\n",
i,
mysql_stmt_error(stmt));
return 1;
}
}
// for (int k = 0; k < rows; k++)
// {
Test->tprintf("Executing statement: %02d\n", k);
Test->set_timeout(3000);
Test->add_result(mysql_stmt_execute(stmt),
"INSERT Statement with %s failed, error is %s\n",
blob_name,
mysql_stmt_error(stmt));
// }
Test->add_result(mysql_stmt_close(stmt), "Error closing stmt\n");
}
if (global_res == Test->global_result)
{
Test->tprintf("%s is OK\n", blob_name);
}
else
{
Test->tprintf("%s FAILED\n", blob_name);
}
return 0;
}
int check_longblob_data(TestConnections* Test,
MYSQL* conn,
unsigned long chunk_size,
int chunks,
int rows)
{
// char *select_stmt = (char *) "SELECT id, x, b FROM long_blob_table WHERE id = ?";
char* select_stmt = (char*) "SELECT id, x, b FROM long_blob_table ";
MYSQL_STMT* stmt = mysql_stmt_init(Test->maxscales->conn_rwsplit[0]);
if (stmt == NULL)
{
Test->add_result(1, "stmt init error: %s\n", mysql_error(Test->maxscales->conn_rwsplit[0]));
}
Test->add_result(mysql_stmt_prepare(stmt, select_stmt, strlen(select_stmt)),
"Error preparing stmt: %s\n",
mysql_stmt_error(stmt));
MYSQL_BIND param[1], result[3];
int id = 1;
memset(param, 0, sizeof(param));
memset(result, 0, sizeof(result));
param[0].buffer_type = MYSQL_TYPE_LONG;
param[0].buffer = &id;
unsigned long* data = (unsigned long*) malloc(chunk_size * chunks * sizeof(long int));
int r_id;
int r_x;
unsigned long l_id;
unsigned long l_x;
my_bool b_id;
my_bool b_x;
my_bool e_id;
my_bool e_x;
result[0].buffer_type = MYSQL_TYPE_LONG;
result[0].buffer = &r_id;
result[0].buffer_length = 0;
result[0].length = &l_id;
result[0].is_null = &b_id;
result[0].error = &e_id;
result[1].buffer_type = MYSQL_TYPE_LONG;
result[1].buffer = &r_x;
result[1].buffer_length = 0;
result[1].length = &l_x;
result[1].is_null = &b_x;
result[1].error = &e_x;
result[2].buffer_type = MYSQL_TYPE_LONG_BLOB;
result[2].buffer = data;
result[2].buffer_length = chunk_size * chunks * sizeof(long int);
/*
* if (mysql_stmt_bind_param(stmt, param) != 0)
* {
* printf("Could not bind parameters\n");
* return 1;
* }
*/
if (mysql_stmt_bind_result(stmt, result) != 0)
{
printf("Could not bind results: %s\n", mysql_stmt_error(stmt));
return 1;
}
if (mysql_stmt_execute(stmt) != 0)
{
Test->tprintf("Error executing stmt %s\n", mysql_error(Test->maxscales->conn_rwsplit[0]));
}
if (mysql_stmt_store_result(stmt) != 0)
{
printf("Could not buffer result set: %s\n", mysql_stmt_error(stmt));
return 1;
}
int row = 0;
while (!mysql_stmt_fetch(stmt))
{
Test->tprintf("id=%d\tx=%d\n", r_id, r_x);
if (r_id != row + 1)
{
Test->add_result(1, "id field is wrong! Expected %d, but it is %d\n", row + 1, r_id);
}
for (int y = 0; y < (int)chunk_size * chunks; y++)
{
if ((int)data[y] != y)
{
Test->add_result(1, "expected %lu, got %d", data[y], y);
break;
}
}
row++;
}
if (row != rows)
{
Test->add_result(1, "Wrong number of rows in the table! Expected %d, but it is %d\n", rows, row);
}
mysql_stmt_free_result(stmt);
mysql_stmt_close(stmt);
return 0;
}

View File

@ -0,0 +1,235 @@
#include "config_operations.h"
// The configuration should use these names for the services, listeners and monitors
#define SERVICE_NAME1 "rwsplit-service"
#define SERVICE_NAME2 "read-connection-router-master"
#define SERVICE_NAME3 "read-connection-router-slave"
#define LISTENER_NAME1 "rwsplit-service-listener"
#define LISTENER_NAME2 "read-connection-router-master-listener"
#define LISTENER_NAME3 "read-connection-router-slave-listener"
struct
{
const char* service;
const char* listener;
int port;
} services[]
{
{SERVICE_NAME1, LISTENER_NAME1, 4006},
{SERVICE_NAME2, LISTENER_NAME2, 4008},
{SERVICE_NAME3, LISTENER_NAME3, 4009}
};
Config::Config(TestConnections* parent)
: test_(parent)
{
}
Config::~Config()
{
}
void Config::add_server(int num)
{
test_->tprintf("Adding the servers");
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0, true, "maxadmin add server server%d " SERVICE_NAME1, num);
test_->maxscales->ssh_node_f(0, true, "maxadmin add server server%d " SERVICE_NAME2, num);
test_->maxscales->ssh_node_f(0, true, "maxadmin add server server%d " SERVICE_NAME3, num);
for (auto& a : created_monitors_)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin add server server%d %s", num, a.c_str());
}
test_->stop_timeout();
}
void Config::remove_server(int num)
{
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0, true, "maxadmin remove server server%d " SERVICE_NAME1, num);
test_->maxscales->ssh_node_f(0, true, "maxadmin remove server server%d " SERVICE_NAME2, num);
test_->maxscales->ssh_node_f(0, true, "maxadmin remove server server%d " SERVICE_NAME3, num);
for (auto& a : created_monitors_)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin remove server server%d %s", num, a.c_str());
}
test_->stop_timeout();
}
void Config::add_created_servers(const char* object)
{
for (auto a : created_servers_)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin add server server%d %s", a, object);
}
}
void Config::destroy_server(int num)
{
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0, true, "maxadmin destroy server server%d", num);
created_servers_.erase(num);
test_->stop_timeout();
}
void Config::create_server(int num)
{
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0,
true,
"maxadmin create server server%d %s %d",
num,
test_->repl->IP_private[num],
test_->repl->port[num]);
created_servers_.insert(num);
test_->stop_timeout();
}
void Config::alter_server(int num, const char* key, const char* value)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin alter server server%d %s=%s", num, key, value);
}
void Config::alter_server(int num, const char* key, int value)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin alter server server%d %s=%d", num, key, value);
}
void Config::alter_server(int num, const char* key, float value)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin alter server server%d %s=%f", num, key, value);
}
void Config::create_monitor(const char* name, const char* module, int interval)
{
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0, true, "maxadmin create monitor %s %s", name, module);
alter_monitor(name, "monitor_interval", interval);
alter_monitor(name, "user", test_->maxscales->user_name);
alter_monitor(name, "password", test_->maxscales->password);
test_->maxscales->ssh_node_f(0, true, "maxadmin restart monitor %s", name);
test_->stop_timeout();
created_monitors_.insert(std::string(name));
}
void Config::alter_monitor(const char* name, const char* key, const char* value)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin alter monitor %s %s=%s", name, key, value);
}
void Config::alter_monitor(const char* name, const char* key, int value)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin alter monitor %s %s=%d", name, key, value);
}
void Config::alter_monitor(const char* name, const char* key, float value)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin alter monitor %s %s=%f", name, key, value);
}
void Config::start_monitor(const char* name)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin restart monitor %s", name);
}
void Config::destroy_monitor(const char* name)
{
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0, true, "maxadmin destroy monitor %s", name);
test_->stop_timeout();
created_monitors_.erase(std::string(name));
}
void Config::restart_monitors()
{
for (auto& a : created_monitors_)
{
test_->maxscales->ssh_node_f(0, true, "maxadmin shutdown monitor \"%s\"", a.c_str());
test_->maxscales->ssh_node_f(0, true, "maxadmin restart monitor \"%s\"", a.c_str());
}
}
void Config::create_listener(Config::Service service)
{
int i = static_cast<int>(service);
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0,
true,
"maxadmin create listener %s %s default %d",
services[i].service,
services[i].listener,
services[i].port);
test_->stop_timeout();
}
void Config::create_ssl_listener(Config::Service service)
{
int i = static_cast<int>(service);
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0,
true,
"maxadmin create listener %s %s default %d default default default "
"/home/vagrant/certs/server-key.pem "
"/home/vagrant/certs/server-cert.pem "
"/home/vagrant/certs/ca.pem ",
services[i].service,
services[i].listener,
services[i].port);
test_->stop_timeout();
}
void Config::destroy_listener(Config::Service service)
{
int i = static_cast<int>(service);
test_->set_timeout(120);
test_->maxscales->ssh_node_f(0,
true,
"maxadmin destroy listener %s %s",
services[i].service,
services[i].listener);
test_->stop_timeout();
}
void Config::create_all_listeners()
{
create_listener(SERVICE_RWSPLIT);
create_listener(SERVICE_RCONN_SLAVE);
create_listener(SERVICE_RCONN_MASTER);
}
void Config::reset()
{
/** Make sure the servers exist before checking that connectivity is OK */
for (int i = 0; i < test_->repl->N; i++)
{
if (created_servers_.find(i) == created_servers_.end())
{
create_server(i);
add_server(i);
}
}
}
bool Config::check_server_count(int expected)
{
bool rval = true;
if (test_->maxscales->ssh_node_f(0,
true,
"test \"`maxadmin list servers|grep 'server[0-9]'|wc -l`\" == \"%d\"",
expected))
{
test_->add_result(1, "Number of servers is not %d.", expected);
rval = false;
}
return rval;
}

View File

@ -0,0 +1,107 @@
#include <iostream>
#include <unistd.h>
#include "testconnections.h"
using namespace std;
char* create_event_size(unsigned long size)
{
char* prefix = (char*) "insert into test.large_event values (1, '";
unsigned long prefix_size = strlen(prefix);
char* postfix = (char*) "');";
char* event = (char*)malloc(size + 1);
strcpy(event, prefix);
unsigned long max = size - 55 - 45;
// printf("BLOB data size %lu\n", max);
for (unsigned long i = 0; i < max; i++)
{
event[i + prefix_size] = 'a';
}
strcpy((char*) event + max + prefix_size, postfix);
return event;
}
MYSQL* connect_to_serv(TestConnections* Test, bool binlog)
{
MYSQL* conn;
if (binlog)
{
conn = open_conn(Test->repl->port[0],
Test->repl->IP[0],
Test->repl->user_name,
Test->repl->password,
Test->ssl);
}
else
{
conn = Test->maxscales->open_rwsplit_connection(0);
}
return conn;
}
void set_max_packet(TestConnections* Test, bool binlog, char* cmd)
{
Test->tprintf("Setting maximum packet size ...");
if (binlog)
{
Test->repl->connect();
Test->try_query(Test->repl->nodes[0], "%s", cmd);
Test->repl->close_connections();
}
else
{
Test->maxscales->connect_maxscale(0);
Test->try_query(Test->maxscales->conn_rwsplit[0], "%s", cmd);
Test->maxscales->close_maxscale_connections(0);
}
Test->tprintf(".. done\n");
}
void different_packet_size(TestConnections* Test, bool binlog)
{
Test->set_timeout(180);
Test->tprintf("Set big max_allowed_packet\n");
set_max_packet(Test, binlog, (char*) "set global max_allowed_packet = 200000000;");
Test->set_timeout(120);
Test->tprintf("Create table\n");
MYSQL* conn = connect_to_serv(Test, binlog);
Test->try_query(conn,
"DROP TABLE IF EXISTS test.large_event;"
"CREATE TABLE test.large_event(id INT, data LONGBLOB);");
mysql_close(conn);
const int loops = 3;
const int range = 2;
for (int i = 1; i <= loops; i++)
{
for (int j = -range; j <= range; j++)
{
size_t size = 0x0ffffff * i + j;
Test->tprintf("Trying event app. %lu bytes", size);
Test->set_timeout(1000);
char* event = create_event_size(size);
conn = connect_to_serv(Test, binlog);
Test->expect(execute_query_silent(conn, event) == 0, "Query should succeed");
free(event);
execute_query_silent(conn, (char*) "DELETE FROM test.large_event");
mysql_close(conn);
}
}
Test->set_timeout(120);
Test->tprintf("Restoring max_allowed_packet");
set_max_packet(Test, binlog, (char*) "set global max_allowed_packet = 1048576;");
Test->set_timeout(1000);
conn = connect_to_serv(Test, binlog);
Test->try_query(conn, "DROP TABLE test.large_event");
mysql_close(conn);
}

View File

@ -0,0 +1,61 @@
#include <string.h>
#include <string>
#include "envv.h"
char * readenv(const char * name, const char *format, ...)
{
char * env = getenv(name);
if (!env)
{
va_list valist;
va_start(valist, format);
int message_len = vsnprintf(NULL, 0, format, valist);
va_end(valist);
if (message_len < 0)
{
return NULL;
}
env = (char*)malloc(message_len + 1);
va_start(valist, format);
vsnprintf(env, message_len + 1, format, valist);
va_end(valist);
setenv(name, env, 1);
}
return env;
}
int readenv_int(const char * name, int def)
{
int x;
char * env = getenv(name);
if (env)
{
sscanf(env, "%d", &x);
}
else
{
x = def;
setenv(name, (std::to_string(x).c_str()), 1);
}
return x;
}
bool readenv_bool(const char * name, bool def)
{
char * env = getenv(name);
if (env)
{
return ((strcasecmp(env, "yes") == 0) ||
(strcasecmp(env, "y") == 0) ||
(strcasecmp(env, "true") == 0));
}
else
{
setenv(name, def ? "true" : "false", 1);
return def;
}
}

View File

@ -0,0 +1,42 @@
#include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include "execute_cmd.h"
using namespace std;
int execute_cmd(char* cmd, char** res)
{
char* result;
FILE* output = popen(cmd, "r");
if (output == NULL)
{
printf("Error opening ssh %s\n", strerror(errno));
return -1;
}
char buffer[10240];
size_t rsize = sizeof(buffer);
result = (char*)calloc(rsize, sizeof(char));
while (fgets(buffer, sizeof(buffer), output))
{
result = (char*)realloc(result, sizeof(buffer) + rsize);
rsize += sizeof(buffer);
strcat(result, buffer);
}
* res = result;
int return_code = pclose(output);
if (WIFEXITED(return_code))
{
return WEXITSTATUS(return_code);
}
else
{
return -1;
}
}

View File

@ -0,0 +1,29 @@
#include "fw_copy_rules.h"
#include <sstream>
void copy_rules(TestConnections* Test, const char* rules_name, const char* rules_dir)
{
std::stringstream src;
std::stringstream dest;
Test->maxscales->ssh_node_f(0,
true,
"cd %s;"
"rm -rf rules;"
"mkdir rules;"
"chown %s:%s rules",
Test->maxscales->access_homedir[0],
Test->maxscales->access_user[0],
Test->maxscales->access_user[0]);
src << rules_dir << "/" << rules_name;
dest << Test->maxscales->access_homedir[0] << "/rules/rules.txt";
Test->set_timeout(30);
Test->maxscales->copy_to_node_legacy(src.str().c_str(), dest.str().c_str(), 0);
Test->maxscales->ssh_node_f(0,
true,
"chmod a+r %s",
dest.str().c_str());
Test->stop_timeout();
}

View File

@ -0,0 +1,105 @@
#include "testconnections.h"
/**
* Reads COM_SELECT and COM_INSERT variables from all nodes and stores into 'selects' and 'inserts'
*/
int get_global_status_allnodes(long int* selects, long int* inserts, Mariadb_nodes* nodes, int silent)
{
int i;
MYSQL_RES* res;
MYSQL_ROW row;
for (i = 0; i < nodes->N; i++)
{
if (nodes->nodes[i] != NULL)
{
if (mysql_query(nodes->nodes[i], "show global status like 'COM_SELECT';") != 0)
{
printf("Error: can't execute SQL-query\n");
printf("%s\n", mysql_error(nodes->nodes[i]));
return 1;
}
res = mysql_store_result(nodes->nodes[i]);
if (res == NULL)
{
printf("Error: can't get the result description\n");
return 1;
}
if (mysql_num_rows(res) > 0)
{
while ((row = mysql_fetch_row(res)) != NULL)
{
if (silent == 0)
{
printf("Node %d COM_SELECT=%s\n", i, row[1]);
}
sscanf(row[1], "%ld", &selects[i]);
}
}
mysql_free_result(res);
while (mysql_next_result(nodes->nodes[i]) == 0)
{
res = mysql_store_result(nodes->nodes[i]);
mysql_free_result(res);
}
if (mysql_query(nodes->nodes[i], "show global status like 'COM_INSERT';") != 0)
{
printf("Error: can't execute SQL-query\n");
}
res = mysql_store_result(nodes->nodes[i]);
if (res == NULL)
{
printf("Error: can't get the result description\n");
}
if (mysql_num_rows(res) > 0)
{
while ((row = mysql_fetch_row(res)) != NULL)
{
if (silent == 0)
{
printf("Node %d COM_INSERT=%s\n", i, row[1]);
}
sscanf(row[1], "%ld", &inserts[i]);
}
}
mysql_free_result(res);
while (mysql_next_result(nodes->nodes[i]) == 0)
{
res = mysql_store_result(nodes->nodes[i]);
mysql_free_result(res);
}
}
else
{
selects[i] = 0;
inserts[i] = 0;
}
}
return 0;
}
/**
* Prints difference in COM_SELECT and COM_INSERT
*/
int print_delta(long int* new_selects,
long int* new_inserts,
long int* selects,
long int* inserts,
int nodes_num)
{
int i;
for (i = 0; i < nodes_num; i++)
{
printf("COM_SELECT increase on node %d is %ld\n", i, new_selects[i] - selects[i]);
printf("COM_INSERT increase on node %d is %ld\n", i, new_inserts[i] - inserts[i]);
}
return 0;
}

View File

@ -0,0 +1,60 @@
/*
* Find local ip used as source ip in ip packets.
* Use getsockname and a udp connection
*/
#include <stdio.h> // printf
#include <string.h> // memset
#include <errno.h> // errno
#include <sys/socket.h> // socket
#include <netinet/in.h> // sockaddr_in
#include <arpa/inet.h> // getsockname
#include <unistd.h> // close
#include "get_my_ip.h"
int get_my_ip(char* remote_ip, char* my_ip)
{
int dns_port = 53;
struct sockaddr_in serv;
int sock = socket (AF_INET, SOCK_DGRAM, 0);
// Socket could not be created
if (sock < 0)
{
return 1;
}
memset(&serv, 0, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_addr.s_addr = inet_addr(remote_ip);
serv.sin_port = htons(dns_port);
connect(sock, (const struct sockaddr*) &serv, sizeof(serv));
struct sockaddr_in name;
socklen_t namelen = sizeof(name);
getsockname(sock, (struct sockaddr*) &name, &namelen);
char buffer[100];
const char* p = inet_ntop(AF_INET, &name.sin_addr, buffer, 100);
if (p != NULL)
{
// printf("Local ip is : %s \n" , buffer);
strcpy(my_ip, buffer);
close(sock);
return 0;
}
else
{
// Some error
printf ("Error number : %d . Error message : %s \n", errno, strerror(errno));
close(sock);
return 2;
}
}

View File

@ -0,0 +1,76 @@
#include "keepalived_func.h"
#include "get_my_ip.h"
char* print_version_string(TestConnections* Test)
{
MYSQL* keepalived_conn = open_conn(Test->maxscales->rwsplit_port[0],
virtual_ip,
Test->maxscales->user_name,
Test->maxscales->password,
Test->ssl);
const char* version_string;
mariadb_get_info(keepalived_conn, MARIADB_CONNECTION_SERVER_VERSION, (void*)&version_string);
Test->tprintf("%s\n", version_string);
mysql_close(keepalived_conn);
return (char*) version_string;
}
void configure_keepalived(TestConnections* Test, char* keepalived_file)
{
int i;
char client_ip[24];
char* last_dot;
// Test->get_client_ip(0, client_ip);
get_my_ip(Test->maxscales->IP[0], client_ip);
last_dot = client_ip;
Test->tprintf("My IP is %s\n", client_ip);
for (i = 0; i < 3; i++)
{
last_dot = strstr(last_dot, ".");
last_dot = &last_dot[1];
}
last_dot[0] = '\0';
Test->tprintf("First part of IP is %s\n", client_ip);
sprintf(virtual_ip, "%s253", client_ip);
for (i = 0; i < Test->maxscales->N; i++)
{
std::string src = std::string(test_dir)
+ "/keepalived_cnf/"
+ std::string(keepalived_file)
+ std::to_string(i + 1)
+ ".conf";
std::string cp_cmd = "cp "
+ std::string(Test->maxscales->access_homedir[i])
+ std::string(keepalived_file)
+ std::to_string(i + 1) + ".conf "
+ " /etc/keepalived/keepalived.conf";
Test->tprintf("%s\n", src.c_str());
Test->tprintf("%s\n", cp_cmd.c_str());
Test->maxscales->ssh_node(i, "yum install -y keepalived", true);
Test->maxscales->ssh_node(i, "service iptables stop", true);
Test->maxscales->copy_to_node(i, src.c_str(), Test->maxscales->access_homedir[i]);
Test->maxscales->ssh_node(i, cp_cmd.c_str(), true);
Test->maxscales->ssh_node_f(i,
true,
"sed -i \"s/###virtual_ip###/%s/\" /etc/keepalived/keepalived.conf",
virtual_ip);
std::string script_src = std::string(test_dir) + "/keepalived_cnf/*.sh";
std::string script_cp_cmd = "cp " + std::string(Test->maxscales->access_homedir[i])
+ "*.sh /usr/bin/";
Test->maxscales->copy_to_node(i, script_src.c_str(), Test->maxscales->access_homedir[i]);
Test->maxscales->ssh_node(i, script_cp_cmd.c_str(), true);
Test->maxscales->ssh_node(i, "sudo service keepalived restart", true);
}
}
void stop_keepalived(TestConnections* Test)
{
for (int i = 0; i < Test->maxscales->N; i++)
{
Test->maxscales->ssh_node(i, "sudo service keepalived stop", true);
Test->maxscales->ssh_node(i, "killall -9 keepalived", true);
}
}

View File

@ -0,0 +1,24 @@
#include <cstring>
#include <string>
#include <stdio.h>
#include "labels_table.h"
#include "testconnections.h"
std::string get_mdbci_lables(const char *labels_string)
{
std::string mdbci_labels("MAXSCALE");
for (size_t i = 0; i < sizeof(labels_table) / sizeof(labels_table_t); i++)
{
std::string test_label = std::string(";") + labels_table[i].test_label;
if (strstr(labels_string, test_label.c_str()))
{
mdbci_labels += "," + labels_table[i].mdbci_label;
}
}
if (TestConnections::verbose)
{
printf("mdbci labels %s\n", mdbci_labels.c_str());
}
return mdbci_labels;
}

View File

@ -0,0 +1,594 @@
/**
* @file mariadb_func.cpp - basic DB interaction routines
*
* @verbatim
* Revision History
*
* Date Who Description
* 17/11/14 Timofey Turenko Initial implementation
*
* @endverbatim
*/
#include "mariadb_func.h"
#include "templates.h"
#include <ctype.h>
#include <sstream>
int set_ssl(MYSQL* conn)
{
char client_key[1024];
char client_cert[1024];
char ca[1024];
sprintf(client_key, "%s/ssl-cert/client-key.pem", test_dir);
sprintf(client_cert, "%s/ssl-cert/client-cert.pem", test_dir);
sprintf(ca, "%s/ssl-cert/ca.pem", test_dir);
return mysql_ssl_set(conn, client_key, client_cert, ca, NULL, NULL);
}
MYSQL* open_conn_db_flags(int port,
std::string ip,
std::string db,
std::string user,
std::string password,
unsigned long flag,
bool ssl)
{
MYSQL* conn = mysql_init(NULL);
if (conn == NULL)
{
fprintf(stdout, "Error: can't create MySQL-descriptor\n");
return NULL;
}
if (ssl)
{
set_ssl(conn);
}
// MXS-2568: This fixes mxs1828_double_local_infile
mysql_optionsv(conn, MYSQL_OPT_LOCAL_INFILE, (void*)"1");
mysql_real_connect(conn,
ip.c_str(),
user.c_str(),
password.c_str(),
db.c_str(),
port,
NULL,
flag);
return conn;
}
MYSQL* open_conn_db_timeout(int port,
std::string ip,
std::string db,
std::string user,
std::string password,
unsigned int timeout,
bool ssl)
{
MYSQL* conn = mysql_init(NULL);
if (conn == NULL)
{
fprintf(stdout, "Error: can't create MySQL-descriptor\n");
return NULL;
}
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
// MXS-2568: This fixes mxs1828_double_local_infile
mysql_optionsv(conn, MYSQL_OPT_LOCAL_INFILE, (void*)"1");
if (ssl)
{
set_ssl(conn);
}
mysql_real_connect(conn,
ip.c_str(),
user.c_str(),
password.c_str(),
db.c_str(),
port,
NULL,
CLIENT_MULTI_STATEMENTS);
return conn;
}
int execute_query(MYSQL* conn, const char* format, ...)
{
va_list valist;
va_start(valist, format);
int message_len = vsnprintf(NULL, 0, format, valist);
va_end(valist);
char sql[message_len + 1];
va_start(valist, format);
vsnprintf(sql, sizeof(sql), format, valist);
va_end(valist);
return execute_query_silent(conn, sql, false);
}
int execute_query_from_file(MYSQL* conn, FILE* file)
{
int rc = -1;
char buf[4096];
if (fgets(buf, sizeof(buf), file))
{
char* nul = strchr(buf, '\0') - 1;
while (isspace(*nul))
{
*nul-- = '\0';
}
char* ptr = buf;
while (isspace(*ptr))
{
ptr++;
}
if (*ptr)
{
rc = execute_query_silent(conn, buf, false);
}
}
else if (!feof(file))
{
printf("Failed to read file: %d, %s", errno, strerror(errno));
rc = 1;
}
return rc;
}
int execute_query_silent(MYSQL* conn, const char* sql, bool silent)
{
MYSQL_RES* res;
if (conn != NULL)
{
if (mysql_query(conn, sql) != 0)
{
if (!silent)
{
int len = strlen(sql);
printf("Error: can't execute SQL-query: %.*s\n", len < 60 ? len : 60, sql);
printf("%s\n\n", mysql_error(conn));
}
return 1;
}
else
{
do
{
res = mysql_store_result(conn);
mysql_free_result(res);
}
while (mysql_next_result(conn) == 0);
return 0;
}
}
else
{
if (!silent)
{
printf("Connection is broken\n");
}
return 1;
}
}
int execute_query_check_one(MYSQL* conn, const char* sql, const char* expected)
{
int r = 1;
if (conn != NULL)
{
const int n_attempts = 3;
for (int i = 0; i < n_attempts && r != 0; i++)
{
if (i > 0)
{
sleep(1);
}
if (mysql_query(conn, sql) != 0)
{
printf("Error: can't execute SQL-query: %s\n", sql);
printf("%s\n\n", mysql_error(conn));
break;
}
else
{
do
{
MYSQL_RES* res = mysql_store_result(conn);
if (res)
{
if (mysql_num_rows(res) == 1)
{
MYSQL_ROW row = mysql_fetch_row(res);
if (row[0] != NULL)
{
if (strcmp(row[0], expected) == 0)
{
r = 0;
printf("First field is '%s' as expected\n", row[0]);
}
else
{
printf("First field is '%s', but expected '%s'\n", row[0], expected);
}
}
else
{
printf("First field is NULL\n");
}
}
else
{
printf("Number of rows is not 1, it is %llu\n", mysql_num_rows(res));
}
mysql_free_result(res);
}
}
while (mysql_next_result(conn) == 0);
}
}
}
else
{
printf("Connection is broken\n");
}
return r;
}
int execute_query_affected_rows(MYSQL* conn, const char* sql, my_ulonglong* affected_rows)
{
MYSQL_RES* res;
if (conn != NULL)
{
if (mysql_query(conn, sql) != 0)
{
printf("Error: can't execute SQL-query: %s\n", sql);
printf("%s\n\n", mysql_error(conn));
return 1;
}
else
{
do
{
*affected_rows = mysql_affected_rows(conn);
res = mysql_store_result(conn);
mysql_free_result(res);
}
while (mysql_next_result(conn) == 0);
return 0;
}
}
else
{
printf("Connection is broken\n");
return 1;
}
}
int execute_query_num_of_rows(MYSQL* conn,
const char* sql,
my_ulonglong* num_of_rows,
unsigned long long* i)
{
MYSQL_RES* res;
my_ulonglong N;
printf("%s\n", sql);
if (conn != NULL)
{
if (mysql_query(conn, sql) != 0)
{
printf("Error: can't execute SQL-query: %s\n", sql);
printf("%s\n\n", mysql_error(conn));
* i = 0;
return 1;
}
else
{
*i = 0;
do
{
res = mysql_store_result(conn);
if (res != NULL)
{
N = mysql_num_rows(res);
mysql_free_result(res);
}
else
{
N = 0;
}
num_of_rows[*i] = N;
*i = *i + 1;
}
while (mysql_next_result(conn) == 0);
return 0;
}
}
else
{
printf("Connection is broken\n");
* i = 0;
return 1;
}
}
int execute_stmt_num_of_rows(MYSQL_STMT* stmt, my_ulonglong* num_of_rows, unsigned long long* i)
{
my_ulonglong N;
/* This is debug hack; compatible only with t1 from t1_sql.h
* my_ulonglong k;
* MYSQL_BIND bind[2];
* my_ulonglong x1;
* my_ulonglong fl;
*
* unsigned long length[2];
* my_bool is_null[2];
* my_bool error[2];
*
* memset(bind, 0, sizeof(bind));
* bind[0].buffer = &x1;
* bind[0].buffer_type = MYSQL_TYPE_LONG;
* bind[0].length = &length[0];
* bind[0].is_null = &is_null[0];
* bind[0].error = &error[0];
*
* bind[1].buffer = &fl;
* bind[1].buffer_type = MYSQL_TYPE_LONG;
* bind[1].length = &length[0];
* bind[1].is_null = &is_null[0];
* bind[1].error = &error[0];
*/
if (mysql_stmt_execute(stmt) != 0)
{
printf("Error: can't execute prepared statement\n");
printf("%s\n\n", mysql_stmt_error(stmt));
* i = 0;
return 1;
}
else
{
*i = 0;
do
{
mysql_stmt_store_result(stmt);
N = mysql_stmt_num_rows(stmt);
/* This is debug hack; compatible only with t1 from t1_sql.h
* mysql_stmt_bind_result(stmt, bind);
* for (k = 0; k < N; k++)
* {
* mysql_stmt_fetch(stmt);
* printf("%04llu: x1 %llu, fl %llu\n", k, x1, fl);
* }
*/
num_of_rows[*i] = N;
*i = *i + 1;
}
while (mysql_stmt_next_result(stmt) == 0);
return 0;
}
return 1;
}
int execute_query_count_rows(MYSQL* conn, const char* sql)
{
int rval = -1;
unsigned long long num_of_rows[1024];
unsigned long long total;
if (execute_query_num_of_rows(conn, sql, num_of_rows, &total) == 0)
{
rval = 0;
for (unsigned int i = 0; i < total && i < 1024; i++)
{
rval += num_of_rows[i];
}
}
return rval;
}
int get_conn_num(MYSQL* conn, std::string ip, std::string hostname, std::string db)
{
MYSQL_RES* res;
MYSQL_ROW row;
unsigned long long int rows;
unsigned long long int i;
unsigned int conn_num = 0;
const char* hostname_internal;
if (ip == "127.0.0.1")
{
hostname_internal = "localhost";
}
else
{
hostname_internal = hostname.c_str();
}
if (conn != NULL)
{
if (mysql_query(conn, "show processlist;") != 0)
{
printf("Error: can't execute SQL-query: show processlist\n");
printf("%s\n\n", mysql_error(conn));
conn_num = 0;
}
else
{
res = mysql_store_result(conn);
if (res == NULL)
{
printf("Error: can't get the result description\n");
conn_num = -1;
}
else
{
mysql_num_fields(res);
rows = mysql_num_rows(res);
for (i = 0; i < rows; i++)
{
row = mysql_fetch_row(res);
if ((row[2] != NULL ) && (row[3] != NULL))
{
if ((strcmp(strtok(row[2], ":"), ip.c_str()) == 0) && strstr(row[3], db.c_str()))
{
conn_num++;
}
else if (strstr(row[2], hostname_internal) && strstr(row[3], db.c_str()))
{
conn_num++;
}
}
}
}
mysql_free_result(res);
}
}
if (ip == "127.0.0.1")
{
// one extra connection is visible in the process list
// output in case of local test
// (when MaxScale is on the same machine as backends)
conn_num--;
}
return conn_num;
}
int find_field(MYSQL* conn, const char* sql, const char* field_name, char* value)
{
MYSQL_RES* res;
MYSQL_ROW row;
MYSQL_FIELD* field;
unsigned int ret = 1;
unsigned long long int filed_i = 0;
unsigned long long int i = 0;
if (conn != NULL)
{
if (mysql_query(conn, sql) != 0)
{
printf("Error: can't execute SQL-query: %s\n", sql);
printf("%s\n\n", mysql_error(conn));
}
else
{
res = mysql_store_result(conn);
if (res == NULL)
{
printf("Error: can't get the result description\n");
}
else
{
mysql_num_fields(res);
while ((field = mysql_fetch_field(res)) && ret != 0)
{
if (strstr(field->name, field_name) != NULL)
{
filed_i = i;
ret = 0;
}
i++;
}
if (mysql_num_rows(res) > 0)
{
row = mysql_fetch_row(res);
sprintf(value, "%s", row[filed_i]);
}
else
{
sprintf(value, "%s", "");
ret = 1;
}
}
mysql_free_result(res);
do
{
res = mysql_store_result(conn);
mysql_free_result(res);
}
while (mysql_next_result(conn) == 0);
}
}
return ret;
}
Result get_result(MYSQL* conn, std::string sql)
{
Result rval;
MYSQL_RES* res;
if (mysql_query(conn, sql.c_str()) == 0 && (res = mysql_store_result(conn)))
{
MYSQL_ROW row = mysql_fetch_row(res);
while (row)
{
std::vector<std::string> tmp;
int n = mysql_num_fields(res);
for (int i = 0; i < n; ++i)
{
tmp.push_back(row[i] ? row[i] : "");
}
rval.push_back(tmp);
row = mysql_fetch_row(res);
}
mysql_free_result(res);
}
else
{
printf("Error: Query failed: %s\n", mysql_error(conn));
}
return rval;
}
Row get_row(MYSQL* conn, std::string sql)
{
Result res = get_result(conn, sql);
return res.empty() ? Row {} :
res[0];
}
int get_int_version(std::string version)
{
std::istringstream str(version);
int major = 0;
int minor = 0;
int patch = 0;
char dot;
str >> major >> dot >> minor >> dot >> patch;
return major * 10000 + minor * 100 + patch;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,280 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation Ab 2014
*/
#include "maxadmin_operations.h"
int connectMaxScale(char* hostname, char* port)
{
struct sockaddr_in addr;
int so;
int keepalive = 1;
if ((so = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
fprintf(stderr,
"Unable to create socket: %s\n",
strerror(errno));
return -1;
}
memset(&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
setipaddress(&addr.sin_addr, hostname);
addr.sin_port = htons(atoi(port));
if (connect(so, (struct sockaddr*)&addr, sizeof(addr)) < 0)
{
fprintf(stderr,
"Unable to connect to MaxScale at %s, %s: %s\n",
hostname,
port,
strerror(errno));
close(so);
return -1;
}
if (setsockopt(so,
SOL_SOCKET,
SO_KEEPALIVE,
&keepalive,
sizeof(keepalive )))
{
perror("setsockopt");
}
return so;
}
int setipaddress(struct in_addr* a, char* p)
{
#ifdef __USE_POSIX
struct addrinfo* ai = NULL, hint;
int rc;
struct sockaddr_in* res_addr;
memset(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_STREAM;
hint.ai_flags = AI_CANONNAME;
hint.ai_family = AF_INET;
if ((rc = getaddrinfo(p, NULL, &hint, &ai)) != 0)
{
return 0;
}
/* take the first one */
if (ai != NULL)
{
res_addr = (struct sockaddr_in*)(ai->ai_addr);
memcpy(a, &res_addr->sin_addr, sizeof(struct in_addr));
freeaddrinfo(ai);
return 1;
}
#else
struct hostent* h;
spinlock_acquire(&tmplock);
h = gethostbyname(p);
spinlock_release(&tmplock);
if (h == NULL)
{
if ((a->s_addr = inet_addr(p)) == -1)
{
return 0;
}
}
else
{
/* take the first one */
memcpy(a, h->h_addr, h->h_length);
return 1;
}
#endif
return 0;
}
int authMaxScale(int so, char* user, char* password)
{
char buf[20];
if (read(so, buf, 4) != 4)
{
return 0;
}
write(so, user, strlen(user));
if (read(so, buf, 8) != 8)
{
return 0;
}
write(so, password, strlen(password));
if (read(so, buf, 6) != 6)
{
return 0;
}
return strncmp(buf, "FAILED", 6);
}
int sendCommand(int so, char* cmd, char* buf)
{
char buf1[80];
int i, j, newline = 1;
int k = 0;
if (write(so, cmd, strlen(cmd)) == -1)
{
return 0;
}
while (1)
{
if ((i = read(so, buf1, 80)) <= 0)
{
return 0;
}
for (j = 0; j < i; j++)
{
if (newline == 1 && buf1[j] == 'O')
{
newline = 2;
}
else if (newline == 2 && buf1[j] == 'K' && j == i - 1)
{
return 1;
}
else if (newline == 2)
{
buf[k] = 'O';
k++;
buf[k] = buf1[j];
k++;
newline = 0;
}
else if (buf1[j] == '\n' || buf1[j] == '\r')
{
buf[k] = buf1[j];
k++;
newline = 1;
}
else
{
buf[k] = buf1[j];
k++;
newline = 0;
}
}
}
return 1;
}
int get_maxadmin_param_tcp(char* hostname, char* user, char* password, char* cmd, char* param, char* result)
{
char buf[10240];
char* port = (char*) "6603";
int so;
if ((so = connectMaxScale(hostname, port)) == -1)
{
return 1;
}
if (!authMaxScale(so, user, password))
{
fprintf(stderr,
"Failed to connect to MaxScale. "
"Incorrect username or password.\n");
close(so);
return 1;
}
sendCommand(so, cmd, buf);
// printf("%s\n", buf);
char* x = strstr(buf, param);
if (x == NULL)
{
return 1;
}
// char f_field[100];
int param_len = strlen(param);
int cnt = 0;
while (x[cnt + param_len] != '\n')
{
result[cnt] = x[cnt + param_len];
cnt++;
}
result[cnt] = '\0';
// sprintf(f_field, "%s %%s", param);
// sscanf(x, f_field, result);
close(so);
return 0;
}
int execute_maxadmin_command_tcp(char* hostname, char* user, char* password, char* cmd)
{
char buf[10240];
char* port = (char*) "6603";
int so;
if ((so = connectMaxScale(hostname, port)) == -1)
{
return 1;
}
if (!authMaxScale(so, user, password))
{
fprintf(stderr,
"Failed to connect to MaxScale. "
"Incorrect username or password.\n");
close(so);
return 1;
}
sendCommand(so, cmd, buf);
close(so);
return 0;
}
int execute_maxadmin_command_print_tcp(char* hostname, char* user, char* password, char* cmd)
{
char buf[10240];
char* port = (char*) "6603";
int so;
if ((so = connectMaxScale(hostname, port)) == -1)
{
return 1;
}
if (!authMaxScale(so, user, password))
{
fprintf(stderr,
"Failed to connect to MaxScale. "
"Incorrect username or password.\n");
close(so);
return 1;
}
sendCommand(so, cmd, buf);
printf("%s\n", buf);
close(so);
return 0;
}

View File

@ -0,0 +1,300 @@
#include <iostream>
#include <unistd.h>
#include "testconnections.h"
#include <stdio.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <netdb.h>
#include <string.h>
#include <openssl/sha.h>
#include "maxinfo_func.h"
#include <sys/epoll.h>
#include <jansson.h>
#include <fcntl.h>
using namespace std;
#define PORT 8080
#define USERAGENT "HTMLGET 1.1"
int create_tcp_socket()
{
int sock;
if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
perror("Can't create TCP socket");
return 0;
}
return sock;
}
char* get_ip(char* host)
{
struct hostent* hent;
int iplen = 16; // XXX.XXX.XXX.XXX
char* ip = (char*)malloc(iplen + 1);
memset(ip, 0, iplen + 1);
if ((hent = gethostbyname(host)) == NULL)
{
herror("Can't get IP");
return NULL;
}
if (inet_ntop(AF_INET, (void*)hent->h_addr_list[0], ip, iplen) == NULL)
{
perror("Can't resolve host");
return NULL;
}
return ip;
}
char* build_get_query(char* host, const char* page)
{
char* query;
const char* getpage = page;
char* tpl = (char*) "GET /%s HTTP/1.1\r\nHost: %s\r\nUser-Agent: %s\r\n\r\n";
if (getpage[0] == '/')
{
getpage = getpage + 1;
fprintf(stderr, "Removing leading \"/\", converting %s to %s\n", page, getpage);
}
// -5 is to consider the %s %s %s in tpl and the ending \0
query = (char*)malloc(strlen(host) + strlen(getpage) + strlen(USERAGENT) + strlen(tpl) - 5);
sprintf(query, tpl, getpage, host, USERAGENT);
return query;
}
char* get_maxinfo(const char* page, TestConnections* Test)
{
struct sockaddr_in* remote;
int sock;
int tmpres;
char* ip;
char* get;
char buf[BUFSIZ + 1];
sock = create_tcp_socket();
ip = get_ip(Test->maxscales->IP[0]);
if (ip == NULL)
{
Test->add_result(1, "Can't get IP\n");
return NULL;
}
remote = (struct sockaddr_in*)malloc(sizeof(struct sockaddr_in*));
remote->sin_family = AF_INET;
tmpres = inet_pton(AF_INET, ip, (void*)(&(remote->sin_addr.s_addr)));
if (tmpres < 0)
{
Test->add_result(1, "Can't set remote->sin_addr.s_addr\n");
return NULL;
}
else if (tmpres == 0)
{
Test->add_result(1, "%s is not a valid IP address\n", ip);
return NULL;
}
remote->sin_port = htons(PORT);
if (connect(sock, (struct sockaddr*)remote, sizeof(struct sockaddr)) < 0)
{
Test->add_result(1, "Could not connect\n");
return NULL;
}
get = build_get_query(Test->maxscales->IP[0], page);
// Test->tprintf("Query is:\n<<START>>\n%s<<END>>\n", get);
// Send the query to the server
size_t sent = 0;
while (sent < strlen(get))
{
tmpres = send(sock, get + sent, strlen(get) - sent, 0);
if (tmpres == -1)
{
Test->add_result(1, "Can't send query\n");
return NULL;
}
sent += tmpres;
}
// now it is time to receive the page
memset(buf, 0, sizeof(buf));
char* result = (char*)calloc(BUFSIZ, sizeof(char));
size_t rsize = sizeof(buf);
while ((tmpres = recv(sock, buf, BUFSIZ, MSG_WAITALL)) > 0)
{
result = (char*)realloc(result, tmpres + rsize);
rsize += tmpres;
strcat(result, buf);
memset(buf, 0, tmpres);
}
if (tmpres < 0)
{
Test->add_result(1, "Error receiving data\n");
return NULL;
}
free(get);
free(remote);
free(ip);
close(sock);
char* content = strstr(result, "[");
if (content == NULL)
{
Test->add_result(1, "Content not found\n");
free(result);
return NULL;
}
char* ret_content = (char*) calloc(strlen(content) + 1, sizeof(char));
mempcpy(ret_content, content, strlen(content));
free(result);
return ret_content;
// return(result);
}
char* read_sc(int sock)
{
char buf[BUFSIZ + 1];
int tmpres;
memset(buf, 0, sizeof(buf));
char* result = (char*)calloc(BUFSIZ, sizeof(char));
size_t rsize = sizeof(buf);
while ((tmpres = recv(sock, buf, BUFSIZ, 0)) > 0)
{
result = (char*)realloc(result, tmpres + rsize);
rsize += tmpres;
// printf("%s", buf);
strcat(result, buf);
memset(buf, 0, tmpres);
}
return result;
}
int send_so(int sock, char* data)
{
int tmpres;
size_t sent = 0;
while (sent < strlen(data))
{
tmpres = send(sock, data + sent, strlen(data) - sent, 0);
if (tmpres == -1)
{
return -1;
}
sent += tmpres;
}
return 0;
}
static char* bin2hex(const unsigned char* old, const size_t oldlen)
{
char* result = (char*) malloc(oldlen * 2 + 1);
size_t i, j;
for (i = j = 0; i < oldlen; i++)
{
result[j++] = hexconvtab[old[i] >> 4];
result[j++] = hexconvtab[old[i] & 15];
}
result[j] = '\0';
return result;
}
char* cdc_auth_srt(char* user, char* password)
{
unsigned char sha1pass[20];
char* str;
str = (char*) malloc(42 + strlen(user) * 2);
unsigned char* password_u;
unsigned char* user_u;
password_u = (unsigned char*) malloc(strlen(password));
user_u = (unsigned char*) malloc(strlen(user));
memcpy((void*)password_u, (void*)password, strlen(password));
memcpy((void*)user_u, (void*)user, strlen(user));
SHA1(password_u, strlen(password), sha1pass);
// char * sha1pass_hex = (char *) "454ac34c2999aacfebc6bf5fe9fa1db9b596f625";
char* sha1pass_hex = bin2hex(sha1pass, 20);
printf("password %s, len %lu, password sha1: %s\n", password, strlen(password), sha1pass_hex);
char* user_hex = bin2hex(user_u, strlen(user));
char* clmn_hex = bin2hex((unsigned char*)":", 1);
sprintf(str, "%s%s%s", user_hex, clmn_hex, sha1pass_hex);
free(clmn_hex);
free(user_hex);
free(sha1pass_hex);
free(user_u);
free(password_u);
printf("%s\n", str);
return str;
}
int setnonblocking(int sock)
{
int opts;
opts = fcntl(sock, F_GETFL);
if (opts < 0)
{
return -1;
}
opts = (opts | O_NONBLOCK);
if (fcntl(sock, F_SETFL, opts) < 0)
{
return -1;
}
return 0;
}
int get_x_fl_from_json(char* line, long long int* x1, long long int* fl)
{
json_t* root;
json_error_t error;
root = json_loads(line, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return 1;
}
json_t* x_json = json_object_get(root, "x1");
if (x_json == NULL)
{
return 1;
}
if (!json_is_integer(x_json))
{
printf("x1 is not int, type is %d\n", json_typeof(x_json));
return 1;
}
*x1 = json_integer_value(x_json);
json_t* fl_json = json_object_get(root, "fl");
if (fl_json == NULL)
{
return 1;
}
if (!json_is_integer(fl_json))
{
printf("fl is not int\n");
return 1;
}
*fl = json_integer_value(fl_json);
json_decref(x_json);
json_decref(fl_json);
json_decref(root);
return 0;
}

View File

@ -0,0 +1,492 @@
#include "maxscales.h"
#include <sstream>
#include <unordered_map>
#include <string>
#include "envv.h"
Maxscales::Maxscales(const char *pref, const char *test_cwd, bool verbose,
std::string network_config)
{
strcpy(prefix, pref);
this->verbose = verbose;
valgring_log_num = 0;
strcpy(test_dir, test_cwd);
this->network_config = network_config;
read_env();
if (this->use_valgrind)
{
for (int i = 0; i < N; i++)
{
ssh_node_f(i, true, "yum install -y valgrind gdb 2>&1");
ssh_node_f(i, true, "apt install -y --force-yes valgrind gdb 2>&1");
ssh_node_f(i, true, "zypper -n install valgrind gdb 2>&1");
ssh_node_f(i, true, "rm -rf /var/cache/maxscale/maxscale.lock");
}
}
}
int Maxscales::read_env()
{
char env_name[64];
read_basic_env();
if ((N > 0) && (N < 255))
{
for (int i = 0; i < N; i++)
{
sprintf(env_name, "%s_%03d_cnf", prefix, i);
maxscale_cnf[i] = readenv(env_name, DEFAULT_MAXSCALE_CNF);
sprintf(env_name, "%s_%03d_log_dir", prefix, i);
maxscale_log_dir[i] = readenv(env_name, DEFAULT_MAXSCALE_LOG_DIR);
sprintf(env_name, "%s_%03d_binlog_dir", prefix, i);
maxscale_binlog_dir[i] = readenv(env_name, DEFAULT_MAXSCALE_BINLOG_DIR);
sprintf(env_name, "%s_%03d_maxadmin_password", prefix, i);
maxadmin_password[i] = readenv(env_name, DEFAULT_MAXADMIN_PASSWORD);
rwsplit_port[i] = 4006;
readconn_master_port[i] = 4008;
readconn_slave_port[i] = 4009;
binlog_port[i] = 5306;
ports[i][0] = rwsplit_port[i];
ports[i][1] = readconn_master_port[i];
ports[i][2] = readconn_slave_port[i];
N_ports[0] = 3;
}
}
use_valgrind = readenv_bool("use_valgrind", false);
use_callgrind = readenv_bool("use_callgrind", false);
if (use_callgrind)
{
use_valgrind = true;
}
return 0;
}
int Maxscales::connect_rwsplit(int m, const std::string& db)
{
if (use_ipv6)
{
conn_rwsplit[m] = open_conn_db(rwsplit_port[m],
IP6[m],
db,
user_name,
password,
ssl);
}
else
{
conn_rwsplit[m] = open_conn_db(rwsplit_port[m],
IP[m],
db,
user_name,
password,
ssl);
}
routers[m][0] = conn_rwsplit[m];
int rc = 0;
int my_errno = mysql_errno(conn_rwsplit[m]);
if (my_errno)
{
if (verbose)
{
printf("Failed to connect to readwritesplit: %d, %s", my_errno, mysql_error(conn_rwsplit[m]));
}
rc = my_errno;
}
return rc;
}
int Maxscales::connect_readconn_master(int m, const std::string& db)
{
if (use_ipv6)
{
conn_master[m] = open_conn_db(readconn_master_port[m],
IP6[m],
db,
user_name,
password,
ssl);
}
else
{
conn_master[m] = open_conn_db(readconn_master_port[m],
IP[m],
db,
user_name,
password,
ssl);
}
routers[m][1] = conn_master[m];
int rc = 0;
int my_errno = mysql_errno(conn_master[m]);
if (my_errno)
{
if (verbose)
{
printf("Failed to connect to readwritesplit: %d, %s", my_errno, mysql_error(conn_master[m]));
}
rc = my_errno;
}
return rc;
}
int Maxscales::connect_readconn_slave(int m, const std::string& db)
{
if (use_ipv6)
{
conn_slave[m] = open_conn_db(readconn_slave_port[m],
IP6[m],
db,
user_name,
password,
ssl);
}
else
{
conn_slave[m] = open_conn_db(readconn_slave_port[m],
IP[m],
db,
user_name,
password,
ssl);
}
routers[m][2] = conn_slave[m];
int rc = 0;
int my_errno = mysql_errno(conn_slave[m]);
if (my_errno)
{
if (verbose)
{
printf("Failed to connect to readwritesplit: %d, %s", my_errno, mysql_error(conn_slave[m]));
}
rc = my_errno;
}
return rc;
}
int Maxscales::connect_maxscale(int m, const std::string& db)
{
return connect_rwsplit(m, db)
+ connect_readconn_master(m, db)
+ connect_readconn_slave(m, db);
}
int Maxscales::close_maxscale_connections(int m)
{
mysql_close(conn_master[m]);
mysql_close(conn_slave[m]);
mysql_close(conn_rwsplit[m]);
return 0;
}
int Maxscales::restart_maxscale(int m)
{
int res;
if (use_valgrind)
{
res = stop_maxscale(m);
res += start_maxscale(m);
}
else
{
res = ssh_node(m, "service maxscale restart", true);
}
fflush(stdout);
return res;
}
int Maxscales::start_maxscale(int m)
{
int res;
if (use_valgrind)
{
if (use_callgrind)
{
res = ssh_node_f(m, false,
"sudo --user=maxscale valgrind -d "
"--log-file=/%s/valgrind%02d.log --trace-children=yes "
" --tool=callgrind --callgrind-out-file=/%s/callgrind%02d.log "
" /usr/bin/maxscale",
maxscale_log_dir[m], valgring_log_num,
maxscale_log_dir[m], valgring_log_num);
}
else
{
res = ssh_node_f(m, false,
"sudo --user=maxscale valgrind --leak-check=full --show-leak-kinds=all "
"--log-file=/%s/valgrind%02d.log --trace-children=yes "
"--track-origins=yes /usr/bin/maxscale", maxscale_log_dir[m], valgring_log_num);
}
valgring_log_num++;
}
else
{
res =ssh_node(m, "service maxscale restart", true);
}
fflush(stdout);
return res;
}
int Maxscales::stop_maxscale(int m)
{
int res;
if (use_valgrind)
{
res = ssh_node_f(m, true, "sudo kill $(pidof valgrind) 2>&1 > /dev/null");
if ((res != 0) || atoi(ssh_node_output(m, "pidof valgrind", true, &res)) > 0)
{
res = ssh_node_f(m, true, "sudo kill -9 $(pidof valgrind) 2>&1 > /dev/null");
}
}
else
{
res = ssh_node(m, "service maxscale stop", true);
}
fflush(stdout);
return res;
}
int Maxscales::execute_maxadmin_command(int m, const char* cmd)
{
return ssh_node_f(m, true, "maxadmin %s", cmd);
}
int Maxscales::execute_maxadmin_command_print(int m, const char* cmd)
{
int exit_code;
printf("%s\n", ssh_node_output_f(m, true, &exit_code, "maxadmin %s", cmd));
return exit_code;
}
int Maxscales::check_maxadmin_param(int m, const char* command, const char* param, const char* value)
{
char result[1024];
int rval = 1;
if (get_maxadmin_param(m, (char*)command, (char*)param, (char*)result) == 0)
{
char* end = strchr(result, '\0') - 1;
while (isspace(*end))
{
*end-- = '\0';
}
char* start = result;
while (isspace(*start))
{
start++;
}
if (strcmp(start, value) == 0)
{
rval = 0;
}
else
{
printf("Expected %s, got %s\n", value, start);
}
}
return rval;
}
int Maxscales::get_maxadmin_param(int m, const char* command, const char* param, char* result)
{
char* buf;
int exit_code;
buf = ssh_node_output_f(m, true, &exit_code, "maxadmin %s", command);
// printf("%s\n", buf);
char* x = strstr(buf, param);
if (x == NULL)
{
return 1;
}
x += strlen(param);
// Skip any trailing parts of the parameter name
while (!isspace(*x))
{
x++;
}
// Trim leading whitespace
while (!isspace(*x))
{
x++;
}
char* end = strchr(x, '\n');
// Trim trailing whitespace
while (isspace(*end))
{
*end-- = '\0';
}
strcpy(result, x);
return exit_code;
}
int Maxscales::get_backend_servers_num(int m, const char* service)
{
char* buf;
int exit_code;
int i = 0;
buf = ssh_node_output_f(m, true, &exit_code, "maxadmin show service %s | grep Name: | grep Protocol: | wc -l", service);
if (buf && !exit_code)
{
sscanf(buf, "%d", &i);
}
return i;
}
long unsigned Maxscales::get_maxscale_memsize(int m)
{
int exit_code;
char* ps_out = ssh_node_output(m, "ps -e -o pid,vsz,comm= | grep maxscale", false, &exit_code);
long unsigned mem = 0;
pid_t pid;
sscanf(ps_out, "%d %lu", &pid, &mem);
return mem;
}
int Maxscales::find_master_maxadmin(Mariadb_nodes* nodes, int m)
{
bool found = false;
int master = -1;
for (int i = 0; i < nodes->N; i++)
{
char show_server[256];
char res[256];
sprintf(show_server, "show server server%d", i + 1);
get_maxadmin_param(m, show_server, (char*) "Status", res);
if (strstr(res, "Master"))
{
if (found)
{
master = -1;
}
else
{
master = i;
found = true;
}
}
}
return master;
}
int Maxscales::find_slave_maxadmin(Mariadb_nodes* nodes, int m)
{
int slave = -1;
for (int i = 0; i < nodes->N; i++)
{
char show_server[256];
char res[256];
sprintf(show_server, "show server server%d", i + 1);
get_maxadmin_param(m, show_server, (char*) "Status", res);
if (strstr(res, "Slave"))
{
slave = i;
}
}
return slave;
}
StringSet Maxscales::get_server_status(const char* name, int m)
{
std::set<std::string> rval;
int exit_code;
char* res = ssh_node_output_f(m, true, &exit_code, "maxadmin list servers|grep \'%s\'", name);
char* pipe = strrchr(res, '|');
if (res && pipe)
{
pipe++;
char* tok = strtok(pipe, ",");
while (tok)
{
char* p = tok;
char* end = strchr(tok, '\n');
if (!end)
{
end = strchr(tok, '\0');
}
// Trim leading whitespace
while (p < end && isspace(*p))
{
p++;
}
// Trim trailing whitespace
while (end > tok && isspace(*end))
{
*end-- = '\0';
}
rval.insert(p);
tok = strtok(NULL, ",\n");
}
free(res);
}
return rval;
}
int Maxscales::port(enum service type, int m) const
{
switch (type)
{
case RWSPLIT:
return rwsplit_port[m];
case READCONN_MASTER:
return readconn_master_port[m];
case READCONN_SLAVE:
return readconn_slave_port[m];
}
return -1;
}
void Maxscales::wait_for_monitor(int intervals, int m)
{
ssh_node_f(m, false, "for ((i=0;i<%d;i++)); do maxctrl api get maxscale/debug/monitor_wait; done", intervals);
}

View File

@ -0,0 +1,449 @@
#include "nodes.h"
#include <string>
#include <cstring>
#include <iostream>
#include <future>
#include <functional>
#include <algorithm>
#include <signal.h>
#include "envv.h"
Nodes::Nodes()
{
}
bool Nodes::check_node_ssh(int node)
{
bool res = true;
if (ssh_node(node, "ls > /dev/null", false) != 0)
{
std::cout << "Node " << node << " is not available" << std::endl;
res = false;
}
return res;
}
bool Nodes::check_nodes()
{
std::vector<std::future<bool>> f;
for (int i = 0; i < N; i++)
{
f.push_back(std::async(std::launch::async, &Nodes::check_node_ssh, this, i));
}
return std::all_of(f.begin(), f.end(), std::mem_fn(&std::future<bool>::get));
}
void Nodes::generate_ssh_cmd(char* cmd, int node, const char* ssh, bool sudo)
{
if (strcmp(IP[node], "127.0.0.1") == 0)
{
if (sudo)
{
sprintf(cmd,
"%s %s",
access_sudo[node],
ssh);
}
else
{
sprintf(cmd, "%s", ssh);
}
}
else
{
if (sudo)
{
sprintf(cmd,
"ssh -i %s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=quiet %s@%s '%s %s\'",
sshkey[node],
access_user[node],
IP[node],
access_sudo[node],
ssh);
}
else
{
sprintf(cmd,
"ssh -i %s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=quiet %s@%s '%s'",
sshkey[node],
access_user[node],
IP[node],
ssh);
}
}
}
char* Nodes::ssh_node_output_f(int node, bool sudo, int* exit_code, const char* format, ...)
{
va_list valist;
va_start(valist, format);
int message_len = vsnprintf(NULL, 0, format, valist);
va_end(valist);
if (message_len < 0)
{
return NULL;
}
char* sys = (char*)malloc(message_len + 1);
va_start(valist, format);
vsnprintf(sys, message_len + 1, format, valist);
va_end(valist);
char* result = ssh_node_output(node, sys, sudo, exit_code);
free(sys);
return result;
}
char* Nodes::ssh_node_output(int node, const char* ssh, bool sudo, int* exit_code)
{
char* cmd = (char*)malloc(strlen(ssh) + 1024);
generate_ssh_cmd(cmd, node, ssh, sudo);
FILE* output = popen(cmd, "r");
if (output == NULL)
{
printf("Error opening ssh %s\n", strerror(errno));
return NULL;
}
char buffer[1024];
size_t rsize = sizeof(buffer);
char* result = (char*)calloc(rsize, sizeof(char));
while (fgets(buffer, sizeof(buffer), output))
{
result = (char*)realloc(result, sizeof(buffer) + rsize);
rsize += sizeof(buffer);
strcat(result, buffer);
}
free(cmd);
int code = pclose(output);
if (WIFEXITED(code))
{
* exit_code = WEXITSTATUS(code);
}
else
{
* exit_code = 256;
}
return result;
}
int Nodes::ssh_node(int node, const char* ssh, bool sudo)
{
char* cmd = (char*)malloc(strlen(ssh) + 1024);
if (strcmp(IP[node], "127.0.0.1") == 0)
{
printf("starting bash\n");
sprintf(cmd, "bash");
}
else
{
sprintf(cmd,
"ssh -i %s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=quiet %s@%s%s",
sshkey[node],
access_user[node],
IP[node],
verbose ? "" : " > /dev/null");
}
if (verbose)
{
std::cout << ssh << std::endl;
}
int rc = 1;
FILE* in = popen(cmd, "w");
if (in)
{
if (sudo)
{
fprintf(in, "sudo su -\n");
fprintf(in, "cd /home/%s\n", access_user[node]);
}
fprintf(in, "%s\n", ssh);
rc = pclose(in);
}
free(cmd);
if (WIFEXITED(rc))
{
return WEXITSTATUS(rc);
}
else if (WIFSIGNALED(rc) && WTERMSIG(rc) == SIGHUP)
{
// SIGHUP appears to happen for SSH connections
return 0;
}
else
{
std::cout << strerror(errno) << std::endl;
return 256;
}
}
int Nodes::ssh_node_f(int node, bool sudo, const char* format, ...)
{
va_list valist;
va_start(valist, format);
int message_len = vsnprintf(NULL, 0, format, valist);
va_end(valist);
if (message_len < 0)
{
return -1;
}
char* sys = (char*)malloc(message_len + 1);
va_start(valist, format);
vsnprintf(sys, message_len + 1, format, valist);
va_end(valist);
int result = ssh_node(node, sys, sudo);
free(sys);
return result;
}
int Nodes::copy_to_node(int i, const char* src, const char* dest)
{
if (i >= N)
{
return 1;
}
char sys[strlen(src) + strlen(dest) + 1024];
if (strcmp(IP[i], "127.0.0.1") == 0)
{
sprintf(sys,
"cp %s %s",
src,
dest);
}
else
{
sprintf(sys,
"scp -q -r -i %s -o UserKnownHostsFile=/dev/null "
"-o StrictHostKeyChecking=no -o LogLevel=quiet %s %s@%s:%s",
sshkey[i],
src,
access_user[i],
IP[i],
dest);
}
if (verbose)
{
printf("%s\n", sys);
}
return system(sys);
}
int Nodes::copy_to_node_legacy(const char* src, const char* dest, int i)
{
return copy_to_node(i, src, dest);
}
int Nodes::copy_from_node(int i, const char* src, const char* dest)
{
if (i >= N)
{
return 1;
}
char sys[strlen(src) + strlen(dest) + 1024];
if (strcmp(IP[i], "127.0.0.1") == 0)
{
sprintf(sys,
"cp %s %s",
src,
dest);
}
else
{
sprintf(sys,
"scp -q -r -i %s -o UserKnownHostsFile=/dev/null "
"-o StrictHostKeyChecking=no -o LogLevel=quiet %s@%s:%s %s",
sshkey[i],
access_user[i],
IP[i],
src,
dest);
}
if (verbose)
{
printf("%s\n", sys);
}
return system(sys);
}
int Nodes::copy_from_node_legacy(const char* src, const char* dest, int i)
{
return copy_from_node(i, src, dest);
}
int Nodes::read_basic_env()
{
char env_name[64];
sprintf(env_name, "%s_user", prefix);
user_name = readenv(env_name, "skysql");
sprintf(env_name, "%s_password", prefix);
password = readenv(env_name, "skysql");
N = get_N();
if ((N > 0) && (N < 255))
{
for (int i = 0; i < N; i++)
{
// reading IPs
sprintf(env_name, "%s_%03d_network", prefix, i);
IP[i] = strdup(get_nc_item(env_name).c_str());
// reading private IPs
sprintf(env_name, "%s_%03d_private_ip", prefix, i);
IP_private[i] = strdup(get_nc_item(env_name).c_str());
if (IP_private[i] == NULL)
{
IP_private[i] = IP[i];
}
setenv(env_name, IP_private[i], 1);
// reading IPv6
sprintf(env_name, "%s_%03d_network6", prefix, i);
IP6[i] = strdup(get_nc_item(env_name).c_str());
if (IP6[i] == NULL)
{
IP6[i] = IP[i];
}
setenv(env_name, IP6[i], 1);
//reading sshkey
sprintf(env_name, "%s_%03d_keyfile", prefix, i);
sshkey[i] = strdup(get_nc_item(env_name).c_str());
sprintf(env_name, "%s_%03d_whoami", prefix, i);
access_user[i] = strdup(get_nc_item(env_name).c_str());
if (access_user[i] == NULL)
{
access_user[i] = (char *) "vagrant";
}
setenv(env_name, access_user[i], 1);
sprintf(env_name, "%s_%03d_access_sudo", prefix, i);
access_sudo[i] = readenv(env_name, " sudo ");
if (strcmp(access_user[i], "root") == 0)
{
access_homedir[i] = (char *) "/root/";
}
else
{
access_homedir[i] = (char *) malloc(strlen(access_user[i]) + 9);
sprintf(access_homedir[i], "/home/%s/", access_user[i]);
}
sprintf(env_name, "%s_%03d_hostname", prefix, i);
hostname[i] = strdup(get_nc_item(env_name).c_str());
if ((hostname[i] == NULL) || (strcmp(hostname[i], "") == 0))
{
hostname[i] = IP_private[i];
}
setenv(env_name, hostname[i], 1);
sprintf(env_name, "%s_%03d_start_vm_command", prefix, i);
start_vm_command[i] = readenv(env_name, "curr_dir=`pwd`; cd %s/%s;vagrant resume %s_%03d ; cd $curr_dir",
getenv("MDBCI_VM_PATH"), getenv("name"), prefix, i);
setenv(env_name, start_vm_command[i], 1);
sprintf(env_name, "%s_%03d_stop_vm_command", prefix, i);
stop_vm_command[i] = readenv(env_name, "curr_dir=`pwd`; cd %s/%s;vagrant suspend %s_%03d ; cd $curr_dir",
getenv("MDBCI_VM_PATH"), getenv("name"), prefix, i);
setenv(env_name, stop_vm_command[i], 1);
}
}
return 0;
}
const char* Nodes::ip(int i) const
{
return use_ipv6 ? IP6[i] : IP[i];
}
std::string Nodes::get_nc_item(const char* item_name)
{
size_t start = network_config.find(item_name);
if (start == std::string::npos)
{
return "";
}
size_t end = network_config.find("\n", start);
size_t equal = network_config.find("=", start);
if (end == std::string::npos)
{
end = network_config.length();
}
if (equal == std::string::npos)
{
return "";
}
std::string str = network_config.substr(equal + 1, end - equal - 1);
str.erase(remove(str.begin(), str.end(), ' '), str.end());
setenv(item_name, str.c_str(), 1);
return str;
}
int Nodes::get_N()
{
int N = 0;
char item[strlen(prefix) + 13];
do
{
sprintf(item, "%s_%03d_network", prefix, N);
N++;
}
while (network_config.find(item) != std::string::npos);
sprintf(item, "%s_N", prefix);
setenv(item, std::to_string(N).c_str(), 1);
return N - 1 ;
}
int Nodes::start_vm(int node)
{
return (system(start_vm_command[node]));
}
int Nodes::stop_vm(int node)
{
return (system(stop_vm_command[node]));
}

View File

@ -0,0 +1,817 @@
#include "execute_cmd.h"
#include "rds_vpc.h"
RDS::RDS(char* cluster)
{
cluster_name_intern = cluster;
subnets_intern = NULL;
N_intern = 0;
}
const char* RDS::get_instance_name(json_t* instance)
{
json_t* instance_name = json_object_get(instance, "DBInstanceIdentifier");
return json_string_value(instance_name);
}
json_t* RDS::get_cluster_descr(char* json)
{
json_t* root;
json_error_t error;
root = json_loads(json, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return NULL;
}
json_t* clusters = json_object_get(root, "DBClusters");
// cluster_intern =
return json_array_get(clusters, 0);
}
json_t* RDS::get_subnets_group_descr(char* json)
{
json_t* root;
json_error_t error;
root = json_loads(json, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return NULL;
}
json_t* subnets = json_object_get(root, "DBSubnetGroups");
return json_array_get(subnets, 0);
}
json_t* RDS::get_cluster_nodes()
{
return get_cluster_nodes(cluster_intern);
}
json_t* RDS::get_cluster_nodes(json_t* cluster)
{
json_t* members = json_object_get(cluster, "DBClusterMembers");
size_t members_N = json_array_size(members);
json_t* member;
json_t* node_names = json_array();
for (size_t i = 0; i < members_N; i++)
{
member = json_array_get(members, i);
json_array_append(node_names, json_string(get_instance_name(member)));
}
return node_names;
}
json_t* RDS::get_subnets()
{
char cmd[1024];
char* result;
sprintf(cmd, "aws rds describe-db-subnet-groups --db-subnet-group-name %s", subnets_group_name_intern);
if (execute_cmd(cmd, &result) != 0)
{
return NULL;
}
json_t* subnets_group = get_subnets_group_descr(result);
json_t* members = json_object_get(subnets_group, "Subnets");
vpc_id_intern = json_string_value(json_object_get(subnets_group, "VpcId"));
size_t members_N = json_array_size(members);
json_t* member;
json_t* subnets_names = json_array();
for (size_t i = 0; i < members_N; i++)
{
member = json_array_get(members, i);
json_array_append(subnets_names, json_object_get(member, "SubnetIdentifier"));
}
subnets_intern = subnets_names;
return subnets_names;
}
const char* RDS::get_subnetgroup_name()
{
if (cluster_intern != NULL)
{
subnets_group_name_intern = json_string_value(json_object_get(cluster_intern, "DBSubnetGroup"));
}
else
{
subnets_group_name_intern = cluster_name_intern;
}
return subnets_group_name_intern;
}
json_t* RDS::get_cluster()
{
char cmd[1024];
char* result;
sprintf(cmd, "aws rds describe-db-clusters --db-cluster-identifier=%s", cluster_name_intern);
execute_cmd(cmd, &result);
return get_cluster_descr(result);
}
int RDS::destroy_nodes(json_t* node_names)
{
size_t N = json_array_size(node_names);
char cmd[1024];
char* res;
json_t* node;
int err = 0;
for (size_t i = 0; i < N; i++)
{
node = json_array_get(node_names, i);
sprintf(cmd,
"aws rds delete-db-instance --skip-final-snapshot --db-instance-identifier=%s",
json_string_value(node));
printf("%s\n", cmd);
if (execute_cmd(cmd, &res) != 0)
{
err = -1;
fprintf(stderr, "error: can not delete node %s\n", json_string_value(node));
}
}
return err;
}
int RDS::destroy_subnets()
{
size_t N = json_array_size(subnets_intern);
char cmd[1024];
char* res;
json_t* subnet;
int err = 0;
for (size_t i = 0; i < N; i++)
{
subnet = json_array_get(subnets_intern, i);
sprintf(cmd, "aws ec2 delete-subnet --subnet-id=%s", json_string_value(subnet));
printf("%s\n", cmd);
execute_cmd(cmd, &res);
if (execute_cmd(cmd, &res) != 0)
{
err = -1;
fprintf(stderr, "error: can not delete subnet %s\n", json_string_value(subnet));
}
}
return err;
}
int RDS::destroy_route_tables()
{
json_t* root;
char cmd[1024];
char* json;
int res = 0;
sprintf(cmd, "aws ec2 describe-vpcs --vpc-ids=%s", vpc_id_intern);
if (execute_cmd(cmd, &json))
{
fprintf(stderr, "error: can not get internet gateways description\n");
return -1;
}
root = get_cluster_descr(json);
if (!root)
{
fprintf(stderr, "error: can not get cluster description\n");
return -1;
}
json_t* route_tables = json_object_get(root, "RouteTables");
size_t i;
json_t* route_table;
const char* rt_id;
const char* vpc_id;
json_array_foreach(route_tables, i, route_table)
{
rt_id = json_string_value(json_object_get(route_table, "RouteTableId"));
vpc_id = json_string_value(json_object_get(route_table, "VpcId"));
if (strcmp(vpc_id_intern, vpc_id) == 0)
{
sprintf(cmd, "aws ec2 delete-route-table --route-table-id %s", rt_id);
res += system(cmd);
}
}
return res;
}
int RDS::detach_and_destroy_gw()
{
json_t* root;
json_error_t error;
char cmd[1024];
char* json;
sprintf(cmd,
"aws ec2 describe-internet-gateways --filters Name=attachment.vpc-id,Values=%s",
vpc_id_intern);
if (execute_cmd(cmd, &json))
{
fprintf(stderr, "error: can not get internet gateways description\n");
return -1;
}
root = json_loads(json, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
json_t* gws = json_object_get(root, "InternetGateways");
if (gws == NULL)
{
fprintf(stderr, "error: can not parse internet gateways description\n");
return -1;
}
size_t i;
json_t* gw;
const char* gw_id;
json_array_foreach(gws, i, gw)
{
gw_id = json_string_value(json_object_get(gw, "InternetGatewayId"));
sprintf(cmd,
"aws ec2 detach-internet-gateway --internet-gateway-id=%s --vpc-id=%s",
gw_id,
vpc_id_intern);
printf("%s\n", cmd);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not detach gateway %s from vpc %s\n", gw_id, vpc_id_intern);
return -1;
}
sprintf(cmd, "aws ec2 delete-internet-gateway --internet-gateway-id=%s", gw_id);
printf("%s\n", cmd);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not delete gateway %s\n", gw_id);
return -1;
}
}
return 0;
}
int RDS::create_vpc(const char** vpc_id)
{
json_t* root;
json_error_t error;
char* result;
char cmd[1024];
if (execute_cmd((char*) "aws ec2 create-vpc --cidr-block 172.30.0.0/16", &result) != 0)
{
fprintf(stderr, "error: can not create VPC\n");
return -1;
}
root = json_loads(result, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
*vpc_id = json_string_value(json_object_get(json_object_get(root, "Vpc"), "VpcId"));
if (*vpc_id == NULL)
{
fprintf(stderr, "error: can not parse output of create-vpc command\n");
return -1;
}
vpc_id_intern = * vpc_id;
sprintf(cmd, "aws ec2 modify-vpc-attribute --enable-dns-support --vpc-id %s", *vpc_id);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not enable dns support\n");
return -1;
}
sprintf(cmd, "aws ec2 modify-vpc-attribute --enable-dns-hostnames --vpc-id %s", *vpc_id);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not enable dns hostnames\n");
return -1;
}
return 0;
}
int RDS::create_subnet(const char* az, const char* cidr, const char** subnet_id)
{
json_t* root;
json_error_t error;
char* result;
char cmd[1024];
*subnet_id = NULL;
sprintf(cmd,
"aws ec2 create-subnet --cidr-block %s --availability-zone %s --vpc-id %s",
cidr,
az,
vpc_id_intern);
puts(cmd);
if (execute_cmd(cmd, &result) != 0)
{
fprintf(stderr, "error: can not create subnet\n");
return -1;
}
root = json_loads(result, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
*subnet_id = json_string_value(json_object_get(json_object_get(root, "Subnet"), "SubnetId"));
if (*subnet_id == NULL)
{
fprintf(stderr, "error: can not parse output of create-vpc command\n");
return -1;
}
if (subnets_intern == NULL)
{
subnets_intern = json_array();
}
json_array_append(subnets_intern, json_string(*subnet_id));
sprintf(cmd, "aws ec2 modify-subnet-attribute --map-public-ip-on-launch --subnet-id %s", *subnet_id);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not modify subnet attribute\n");
return -1;
}
return 0;
}
int RDS::create_subnet_group()
{
char cmd[1024];
size_t i;
json_t* subnet;
sprintf(cmd,
"aws rds create-db-subnet-group --db-subnet-group-name %s --db-subnet-group-description maxscale --subnet-ids",
cluster_name_intern);
json_array_foreach(subnets_intern, i, subnet)
{
strcat(cmd, " ");
strcat(cmd, json_string_value(subnet));
}
subnets_group_name_intern = cluster_name_intern;
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not create subnets group\n");
return -1;
}
return 0;
}
int RDS::create_gw(const char** gw_id)
{
char* result;
char cmd[1024];
json_error_t error;
*gw_id = NULL;
gw_intern = NULL;
if (execute_cmd((char*) "aws ec2 create-internet-gateway", &result) != 0)
{
fprintf(stderr, "error: can not create internet gateway\n");
return -1;
}
json_t* root = json_loads(result, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
*gw_id =
json_string_value(json_object_get(json_object_get(root, "InternetGateway"), "InternetGatewayId"));
if (*gw_id == NULL)
{
fprintf(stderr, "error: can not parse output of create-internet-gateway command\n");
return -1;
}
gw_intern = *gw_id;
sprintf(cmd,
"aws ec2 attach-internet-gateway --internet-gateway-id %s --vpc-id %s",
*gw_id,
vpc_id_intern);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not attach gateway to VPC\n");
return -1;
}
return 0;
}
int RDS::configure_route_table(const char** rt)
{
char* result;
char cmd[1024];
json_error_t error;
*rt = NULL;
if (execute_cmd((char*) "aws ec2 describe-route-tables", &result) != 0)
{
fprintf(stderr, "error: can not get route tables description\n");
return -1;
}
json_t* root = json_loads(result, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
json_t* route_tables = json_object_get(root, "RouteTables");
if (route_tables == NULL)
{
fprintf(stderr, "error: can not parse route tables description\n");
return -1;
}
size_t i;
json_t* rtb;
const char* rt_vpc;
json_array_foreach(route_tables, i, rtb)
{
rt_vpc = json_string_value(json_object_get(rtb, "VpcId"));
if (strcmp(vpc_id_intern, rt_vpc) == 0)
{
// add route to route table which belongs to give VPC
*rt = json_string_value(json_object_get(rtb, "RouteTableId"));
sprintf(cmd,
"aws ec2 create-route --route-table-id %s --gateway-id %s --destination-cidr-block 0.0.0.0/0",
*rt,
gw_intern);
if (system(cmd) != 0)
{
fprintf(stderr, "error: can not create route\n");
return -1;
}
}
}
if (*rt == NULL)
{
fprintf(stderr, "error: can not find route table\n");
return -1;
}
return 0;
}
int RDS::create_cluster()
{
char cmd[1024];
char* result;
json_error_t error;
size_t i;
int res = 0;
sprintf(cmd,
"aws rds create-db-cluster --database-name=test --engine=aurora --master-username=skysql --master-user-password=skysqlrds --db-cluster-identifier=%s --db-subnet-group-name=%s",
cluster_name_intern,
cluster_name_intern);
execute_cmd(cmd, &result);
json_t* root = json_loads(result, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
json_t* cluster = json_object_get(root, "DBCluster");
cluster_intern = cluster;
json_t* security_groups = json_object_get(cluster, "VpcSecurityGroups");
json_t* sg;
const char* sg_id;
json_array_foreach(security_groups, i, sg)
{
sg_id = json_string_value(json_object_get(sg, "VpcSecurityGroupId"));
printf("Security group %s\n", sg_id);
sprintf(cmd,
"aws ec2 authorize-security-group-ingress --group-id %s --protocol tcp --port 3306 --cidr 0.0.0.0/0",
sg_id);
res += system(cmd);
}
sg_intern = sg_id;
for (size_t i = 0; i < N_intern; i++)
{
sprintf(cmd,
"aws rds create-db-instance --db-cluster-identifier=%s --engine=aurora --db-instance-class=db.t2.medium --publicly-accessible --db-instance-identifier=node%03lu",
cluster_name_intern,
i);
printf("%s\n", cmd);
res += system(cmd);
}
return res;
}
int RDS::get_writer(const char** writer_name)
{
char* json;
char cmd[1024];
sprintf(cmd, "aws rds describe-db-clusters --db-cluster-identifier=%s", cluster_name_intern);
execute_cmd(cmd, &json);
json_t* cluster = get_cluster_descr(json);
json_t* nodes = json_object_get(cluster, "DBClusterMembers");
// char * s = json_dumps(nodes, JSON_INDENT(4));
// puts(s);
bool writer;
json_t* node;
size_t i = 0;
do
{
node = json_array_get(nodes, i);
writer = json_is_true(json_object_get(node, "IsClusterWriter"));
i++;
}
while (!writer);
* writer_name = json_string_value(json_object_get(node, "DBInstanceIdentifier"));
return 0;
}
int RDS::destroy_vpc()
{
char cmd[1024];
sprintf(cmd, "aws ec2 delete-vpc --vpc-id=%s", vpc_id_intern);
return system(cmd);
}
int RDS::destroy_cluster()
{
char cmd[1024];
char* result;
sprintf(cmd,
"aws rds delete-db-cluster --db-cluster-identifier=%s --skip-final-snapshot",
cluster_name_intern);
return execute_cmd(cmd, &result);
}
int RDS::destroy_subnets_group()
{
char cmd[1024];
char* result;
sprintf(cmd, "aws rds delete-db-subnet-group --db-subnet-group-name %s", get_subnetgroup_name());
puts(cmd);
return execute_cmd(cmd, &result);
}
int RDS::create_rds_db(int N)
{
const char* vpc;
const char* subnet1;
const char* subnet2;
const char* gw;
const char* rt;
N_intern = N;
printf("Create VPC\n");
if (create_vpc(&vpc) != 0)
{
fprintf(stderr, "error: can not create VPC\n");
destroy_vpc();
return -1;
}
printf("vpc id: %s\n", vpc);
printf("Create subnets\n");
create_subnet("eu-west-1b", "172.30.0.0/24", &subnet1);
create_subnet("eu-west-1a", "172.30.1.0/24", &subnet2);
printf("Create subnets group\n");
if (create_subnet_group() != 0)
{
destroy_subnets();
destroy_subnets_group();
destroy_vpc();
return -1;
}
printf("Create internet gateway\n");
if (create_gw(&gw) != 0)
{
detach_and_destroy_gw();
destroy_subnets();
destroy_subnets_group();
destroy_vpc();
return -1;
}
printf("Gateway: %s\n", gw);
printf("Configure route table\n");
if (configure_route_table(&rt) != 0)
{
detach_and_destroy_gw();
destroy_subnets();
destroy_subnets_group();
destroy_vpc();
return -1;
}
printf("Route table: %s\n", rt);
printf("Create RDS cluster\n");
if (create_cluster() != 0)
{
destroy_nodes(get_cluster_nodes());
destroy_cluster();
detach_and_destroy_gw();
destroy_subnets();
destroy_subnets_group();
destroy_vpc();
return -1;
}
return 0;
}
int RDS::delete_rds_cluster()
{
char* result;
char cmd[1024];
json_t* current_cluster;
printf("Get cluster\n");
cluster_intern = get_cluster();
printf("Get cluster NODES\n");
json_t* nodes = get_cluster_nodes();
printf("Get subnets group: %s\n", get_subnetgroup_name());
printf("Get subnets\n");
get_subnets();
printf("Get VPC: %s\n", vpc_id_intern);
size_t alive_nodes = json_array_size(nodes);
printf("Destroy nodes\n");
destroy_nodes(nodes);
do
{
printf("Waiting for nodes to be deleted, now %lu nodes are still alive\n", alive_nodes);
sleep(5);
current_cluster = get_cluster();
nodes = get_cluster_nodes(current_cluster);
alive_nodes = json_array_size(nodes);
}
while (alive_nodes > 0);
printf("Destroy cluster\n");
destroy_cluster();
do
{
printf("Waiting for cluster to be deleted\n");
sleep(5);
sprintf(cmd, "aws rds describe-db-clusters --db-cluster-identifier=%s", cluster_name_intern);
execute_cmd(cmd, &result);
}
while (get_cluster_descr(result) != NULL);
printf("Destroy subnets\n");
destroy_subnets();
printf("Destroy subnet group\n");
destroy_subnets_group();
printf("Get and destroy Internet Gateways\n");
detach_and_destroy_gw();
printf("Destroy vpc\n");
return destroy_vpc();
}
int RDS::wait_for_nodes(size_t N)
{
char* result;
size_t active_nodes = 0;
size_t i = 0;
json_t* node;
char cmd[1024];
json_t* nodes;
json_t* instances;
json_t* instance;
json_error_t error;
do
{
printf("Waiting for nodes to be active, now %lu are active\n", active_nodes);
sleep(5);
cluster_intern = get_cluster();
nodes = get_cluster_nodes();
active_nodes = 0;
json_array_foreach(nodes, i, node)
{
sprintf(cmd,
"aws rds describe-db-instances --db-instance-identifier=%s",
json_string_value(node));
execute_cmd(cmd, &result);
instances = json_loads(result, 0, &error);
if (!instances)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return -1;
}
instance = json_array_get(json_object_get(instances, "DBInstances"), 0);
// puts(json_dumps(instance, JSON_INDENT(4)));
if (strcmp(json_string_value(json_object_get(instance, "DBInstanceStatus")), "available") == 0)
{
active_nodes++;
}
}
}
while (active_nodes != N);
return 0;
}
int RDS::do_failover()
{
char* result;
const char* writer;
const char* new_writer;
char cmd[1024];
if (get_writer(&writer) != 0)
{
return -1;
}
sprintf(cmd, "aws rds failover-db-cluster --db-cluster-identifier=%s", cluster_name_intern);
if (execute_cmd(cmd, &result) != 0)
{
return -1;
}
do
{
if (get_writer(&new_writer) != 0)
{
return -1;
}
printf("writer: %s\n", new_writer);
sleep(5);
}
while (strcmp(writer, new_writer) == 0);
return 0;
}
json_t* RDS::get_endpoints()
{
char cmd[1024];
char* result;
json_t* root;
json_error_t error;
json_t* node;
json_t* node_json;
json_t* endpoint;
json_t* endpoints;
endpoints = json_array();
cluster_intern = get_cluster();
json_t* nodes = get_cluster_nodes();
// puts(json_dumps(nodes, JSON_INDENT(4)));
size_t i;
json_array_foreach(nodes, i, node)
{
sprintf(cmd, "aws rds describe-db-instances --db-instance-identifier=%s", json_string_value(node));
if (execute_cmd(cmd, &result) != 0)
{
fprintf(stderr, "error: executing aws rds describe-db-instances\n");
return NULL;
}
root = json_loads(result, 0, &error);
if (!root)
{
fprintf(stderr, "error: on line %d: %s\n", error.line, error.text);
return NULL;
}
node_json = json_array_get(json_object_get(root, "DBInstances"), 0);
endpoint = json_object_get(node_json, "Endpoint");
json_array_append(endpoints, endpoint);
}
return endpoints;
}

View File

@ -0,0 +1,257 @@
#include "sql_t1.h"
#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static char** sql = NULL;
static size_t sql_size = 0;
int execute_select_query_and_check(MYSQL* conn, const char* sql, unsigned long long int rows)
{
MYSQL_RES* res;
MYSQL_ROW row;
unsigned long long int i;
unsigned long long int num_fields;
unsigned long long int int_res;
unsigned long long int row_i = 0;
int test_result = 0;
unsigned long long int rows_from_select = 0;
int wait_i = 0;
printf("Trying SELECT, num_of_rows=%llu\n", rows);
int res_alloc = 0;
if (conn != NULL)
{
rows_from_select = 0;
wait_i = 0;
while ((rows_from_select != rows) && (wait_i < 100))
{
if (mysql_query(conn, sql) != 0)
{
printf("Error: can't execute SQL-query: %s\n", mysql_error(conn));
}
res = mysql_store_result(conn);
res_alloc = 1;
if (res == NULL)
{
printf("Error: can't get the result description\n");
test_result = 1;
mysql_free_result(res);
res_alloc = 0;
wait_i++;
sleep(1);
}
else
{
rows_from_select = mysql_num_rows(res);
printf("rows=%llu\n", rows_from_select);
wait_i++;
if (rows_from_select != rows)
{
printf("Waiting 1 second and trying again...\n");
mysql_free_result(res);
res_alloc = 0;
sleep(1);
}
}
}
if (rows_from_select != rows)
{
printf("SELECT returned %llu rows instead of %llu!\n", rows_from_select, rows);
test_result = 1;
printf("sql was %s\n", sql);
}
else
{
num_fields = mysql_num_fields(res);
if (num_fields != 2)
{
printf("SELECT returned %llu fileds instead of 2!\n", num_fields);
test_result = 1;
}
if (mysql_num_rows(res) > 0)
{
while ((row = mysql_fetch_row(res)) != NULL)
{
for (i = 0; i < num_fields; i++)
{
sscanf(row[i], "%llu", &int_res);
if ((i == 0 ) && (int_res != row_i))
{
printf("SELECT returned wrong result! %llu instead of expected %llu\n",
int_res,
row_i);
test_result = 1;
printf("sql was %s\n", sql);
}
}
row_i++;
}
}
}
if (res_alloc != 0)
{
mysql_free_result(res);
}
}
else
{
printf("FAILED: broken connection\n");
test_result = 1;
}
return test_result;
}
int create_t1(MYSQL* conn)
{
int result = 0;
result += execute_query(conn, "DROP TABLE IF EXISTS t1;");
printf("Creating test table\n");
result += execute_query(conn, "CREATE TABLE t1 (x1 int, fl int);");
return result;
}
int create_t2(MYSQL* conn)
{
int result = 0;
result += execute_query(conn, "DROP TABLE IF EXISTS t2;");
printf("Creating test table\n");
result += execute_query(conn, "CREATE TABLE t2 (x1 int, fl int);");
return result;
}
static const char ins1[] = "INSERT INTO t1 (x1, fl) VALUES ";
int create_insert_string(char* sql, int N, int fl)
{
char* wptr = sql;
strcpy(wptr, ins1);
for (int i = 0; i < N; i++)
{
wptr = strchr(wptr, '\0');
sprintf(wptr, "(%d, %d),", i, fl);
}
wptr = strrchr(wptr, ',');
sprintf(wptr, ";");
return 0;
}
char* allocate_insert_string(int fl, int N)
{
char* rval = NULL;
pthread_mutex_lock(&mutex);
if (sql == NULL)
{
sql = (char**)calloc(16, sizeof(char*));
sql_size = 16;
}
if ((size_t)fl >= sql_size)
{
fprintf(stderr, "Insert index %d is too large, setting it to %lu", fl, sql_size - 1);
fl = sql_size - 1;
}
if (sql[fl] == NULL)
{
char tmpstr[256];
sprintf(tmpstr, "(%d, %d),", N, fl);
sql[fl] = (char*)malloc(sizeof(ins1) + N * strlen(tmpstr) + 60);
create_insert_string(sql[fl], N, fl);
}
rval = sql[fl];
pthread_mutex_unlock(&mutex);
return rval;
}
int insert_into_t1(MYSQL* conn, int N)
{
int x = 16;
int result = 0;
printf("Generating long INSERTs\n");
for (int i = 0; i < N; i++)
{
printf("sql %d, rows=%d\n", i, x);
char* sqlstr = allocate_insert_string(i, x);
printf("INSERT: rwsplitter\n");
printf("Trying INSERT, len=%d\n", x);
fflush(stdout);
result += execute_query(conn, "%s", sqlstr);
fflush(stdout);
x *= 16;
}
return result;
}
int select_from_t1(MYSQL* conn, int N)
{
int x = 16;
int result = 0;
int i;
char sq[100];
for (i = 0; i < N; i++)
{
sprintf(&sq[0], "select * from t1 where fl=%d;", i);
result += execute_select_query_and_check(conn, sq, x);
x = x * 16;
}
return result;
}
// 0 - if it does not exist
// -1 - in case of error
int check_if_t1_exists(MYSQL* conn)
{
MYSQL_RES* res;
MYSQL_ROW row;
int t1 = 0;
if (conn != NULL)
{
if (mysql_query(conn, "show tables;") != 0)
{
printf("Error: can't execute SQL-query: %s\n", mysql_error(conn));
t1 = 0;
}
else
{
res = mysql_store_result(conn);
if (res == NULL)
{
printf("Error: can't get the result description\n");
t1 = -1;
}
else
{
mysql_num_fields(res);
if (mysql_num_rows(res) > 0)
{
while ((row = mysql_fetch_row(res)) != NULL)
{
if ((row[0] != NULL ) && (strcmp(row[0], "t1") == 0 ))
{
t1 = 1;
}
}
}
}
mysql_free_result(res);
}
}
else
{
printf("FAILED: broken connection\n");
t1 = -1;
}
return t1;
}

View File

@ -0,0 +1,147 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2024-02-10
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "stopwatch.h"
#include <iomanip>
#include <iostream>
#include <sstream>
#include <ctime>
namespace base
{
StopWatch::StopWatch()
{
restart();
}
Duration StopWatch::lap() const
{
return {Clock::now() - m_start};
}
Duration StopWatch::restart()
{
TimePoint now = Clock::now();
Duration lap = now - m_start;
m_start = now;
return lap;
}
} // base
/********** OUTPUT ***********/
namespace
{
using namespace base;
struct TimeConvert
{
double div; // divide the value of the previous unit by this
std::string suffix; // milliseconds, hours etc.
double max_visual; // threashold to switch to the next unit
};
// Will never get to centuries because the duration is a long carrying nanoseconds
TimeConvert convert[]
{
{1, "ns", 1000}, {1000, "us", 1000}, {1000, "ms", 1000},
{1000, "s", 60}, {60, "min", 60}, {60, "hours", 24},
{24, "days", 365.25}, {365.25, "years", 10000},
{100, "centuries", std::numeric_limits<double>::max()}
};
int convert_size = sizeof(convert) / sizeof(convert[0]);
}
namespace base
{
std::pair<double, std::string> dur_to_human_readable(Duration dur)
{
using namespace std::chrono;
double time = duration_cast<nanoseconds>(dur).count();
bool negative = (time < 0) ? time = -time, true : false;
for (int i = 0; i <= convert_size; ++i)
{
if (i == convert_size)
{
return std::make_pair(negative ? -time : time,
convert[convert_size - 1].suffix);
}
time /= convert[i].div;
if (time < convert[i].max_visual)
{
return std::make_pair(negative ? -time : time, convert[i].suffix);
}
}
abort(); // should never get here
}
std::ostream& operator<<(std::ostream& os, Duration dur)
{
auto p = dur_to_human_readable(dur);
os << p.first << p.second;
return os;
}
// TODO: this will require some thought. time_point_to_string() for a system_clock is
// obvious, but not so for a steady_clock. Maybe TimePoint belongs to a system clock
// and sould be called something else here, and live in a time_measuring namespace.
std::string time_point_to_string(TimePoint tp, const std::string& fmt)
{
using namespace std::chrono;
std::time_t timet = system_clock::to_time_t(system_clock::now()
+ (tp - Clock::now()));
struct tm* ptm;
ptm = gmtime(&timet);
const int sz = 1024;
char buf[sz];
strftime(buf, sz, fmt.c_str(), ptm);
return buf;
}
std::ostream& operator<<(std::ostream& os, TimePoint tp)
{
os << time_point_to_string(tp);
return os;
}
void test_stopwatch_output(std::ostream& os)
{
long long dur[] =
{
400, // 400ns
5 * 1000, // 5us
500 * 1000, // 500us
1 * 1000000, // 1ms
700 * 1000000LL, // 700ms
5 * 1000000000LL, // 5s
200 * 1000000000LL, // 200s
5 * 60 * 1000000000LL, // 5m
45 * 60 * 1000000000LL, // 45m
130 * 60 * 1000000000LL, // 130m
24 * 60 * 60 * 1000000000LL, // 24 hours
3 * 24 * 60 * 60 * 1000000000LL, // 72 hours
180 * 24 * 60 * 60 * 1000000000LL, // 180 days
1000 * 24 * 60 * 60 * 1000000000LL // 1000 days
};
for (unsigned i = 0; i < sizeof(dur) / sizeof(dur[0]); ++i)
{
os << Duration(dur[i]) << std::endl;
}
}
} // base

View File

@ -0,0 +1,87 @@
#include "tcp_connection.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <string.h>
#include <netdb.h>
namespace
{
static void set_port(struct sockaddr_storage* addr, uint16_t port)
{
if (addr->ss_family == AF_INET)
{
struct sockaddr_in* ip = (struct sockaddr_in*)addr;
ip->sin_port = htons(port);
}
else if (addr->ss_family == AF_INET6)
{
struct sockaddr_in6* ip = (struct sockaddr_in6*)addr;
ip->sin6_port = htons(port);
}
}
int open_network_socket(struct sockaddr_storage* addr, const char* host, uint16_t port)
{
struct addrinfo* ai = NULL, hint = {};
int so = -1;
hint.ai_socktype = SOCK_STREAM;
hint.ai_family = AF_UNSPEC;
hint.ai_flags = AI_ALL;
/* Take the first one */
if (getaddrinfo(host, NULL, &hint, &ai) == 0 && ai)
{
if ((so = socket(ai->ai_family, SOCK_STREAM, 0)) != -1)
{
memcpy(addr, ai->ai_addr, ai->ai_addrlen);
set_port(addr, port);
freeaddrinfo(ai);
}
}
return so;
}
}
namespace tcp
{
Connection::~Connection()
{
if (m_so != -1)
{
close(m_so);
}
}
bool Connection::connect(const char* host, uint16_t port)
{
struct sockaddr_storage addr;
if ((m_so = open_network_socket(&addr, host, port)) != -1)
{
if (::connect(m_so, (struct sockaddr*)&addr, sizeof(addr)) != 0)
{
close(m_so);
m_so = -1;
}
}
return m_so != -1;
}
int Connection::write(void* buf, size_t size)
{
return ::write(m_so, buf, size);
}
int Connection::read(void* buf, size_t size)
{
return ::read(m_so, buf, size);
}
}

View File

@ -0,0 +1,279 @@
#include <iostream>
#include "testconnections.h"
#include "maxadmin_operations.h"
#include "sql_t1.h"
#include "test_binlog_fnc.h"
int check_sha1(TestConnections* Test)
{
if (Test->binlog_master_gtid || Test->binlog_slave_gtid)
{
Test->tprintf("GTID is in use, do not check sha1\n");
return 0;
}
else
{
char sys[1024];
char* x;
int local_result = 0;
int i;
int exit_code;
char* s_maxscale;
char* s;
Test->set_timeout(50);
Test->tprintf("ls before FLUSH LOGS");
Test->tprintf("Maxscale");
Test->maxscales->ssh_node_f(0,
true,
"ls -la %s/mar-bin.0000*",
Test->maxscales->maxscale_binlog_dir[0]);
Test->tprintf("Master");
Test->set_timeout(50);
Test->maxscales->ssh_node(0, "ls -la /var/lib/mysql/mar-bin.0000*", false);
Test->tprintf("FLUSH LOGS");
Test->set_timeout(100);
local_result += execute_query(Test->repl->nodes[0], (char*) "FLUSH LOGS");
Test->tprintf("Logs flushed");
Test->set_timeout(100);
Test->repl->sync_slaves();
Test->tprintf("ls after first FLUSH LOGS");
Test->tprintf("Maxscale");
Test->set_timeout(50);
Test->maxscales->ssh_node_f(0,
true,
"ls -la %s/mar-bin.0000*",
Test->maxscales->maxscale_binlog_dir[0]);
Test->tprintf("Master");
Test->set_timeout(50);
Test->maxscales->ssh_node(0, "ls -la /var/lib/mysql/mar-bin.0000*", false);
Test->set_timeout(100);
Test->tprintf("FLUSH LOGS");
local_result += execute_query(Test->repl->nodes[0], (char*) "FLUSH LOGS");
Test->tprintf("Logs flushed");
Test->set_timeout(50);
Test->repl->sync_slaves();
Test->set_timeout(50);
Test->tprintf("ls before FLUSH LOGS");
Test->tprintf("Maxscale");
Test->maxscales->ssh_node_f(0,
true,
"ls -la %s/mar-bin.0000*",
Test->maxscales->maxscale_binlog_dir[0]);
Test->tprintf("Master");
Test->set_timeout(50);
Test->maxscales->ssh_node(0, "ls -la /var/lib/mysql/mar-bin.0000*", false);
for (i = 1; i < 3; i++)
{
Test->tprintf("FILE: 000000%d", i);
Test->set_timeout(50);
s_maxscale = Test->maxscales->ssh_node_output_f(0,
true,
&exit_code,
"sha1sum %s/mar-bin.00000%d",
Test->maxscales->maxscale_binlog_dir[0],
i);
if (s_maxscale != NULL)
{
x = strchr(s_maxscale, ' ');
if (x != NULL)
{
x[0] = 0;
}
Test->tprintf("Binlog checksum from Maxscale %s", s_maxscale);
}
sprintf(sys, "sha1sum /var/lib/mysql/mar-bin.00000%d", i);
Test->set_timeout(50);
s = Test->repl->ssh_node_output(0, sys, true, &exit_code);
if (s != NULL)
{
x = strchr(s, ' ');
if (x != NULL)
{
x[0] = 0;
}
Test->tprintf("Binlog checksum from master %s", s);
}
if (strcmp(s_maxscale, s) != 0)
{
Test->tprintf("Binlog from master checksum is not equal to binlog checksum from Maxscale node");
local_result++;
}
}
return local_result;
}
}
int start_transaction(TestConnections* Test)
{
int local_result = 0;
Test->tprintf("Transaction test");
Test->tprintf("Start transaction");
execute_query(Test->repl->nodes[0], (char*) "DELETE FROM t1 WHERE fl=10;");
local_result += execute_query(Test->repl->nodes[0], (char*) "START TRANSACTION");
local_result += execute_query(Test->repl->nodes[0], (char*) "SET autocommit = 0");
Test->tprintf("INSERT data");
local_result += execute_query(Test->repl->nodes[0], (char*) "INSERT INTO t1 VALUES(111, 10)");
Test->set_timeout(120);
Test->repl->sync_slaves();
return local_result;
}
void test_binlog(TestConnections* Test)
{
int i;
MYSQL* binlog;
Test->repl->connect();
Test->set_timeout(100);
Test->try_query(Test->repl->nodes[0], (char*) "SET NAMES utf8mb4");
Test->try_query(Test->repl->nodes[0], (char*) "set autocommit=1");
Test->try_query(Test->repl->nodes[0], (char*) "select USER()");
Test->set_timeout(100);
create_t1(Test->repl->nodes[0]);
Test->add_result(insert_into_t1(Test->repl->nodes[0], 4), "Data inserting to t1 failed");
Test->stop_timeout();
Test->tprintf("Waiting for replication to catch up");
Row row = get_row(Test->repl->nodes[0], "SELECT @@gtid_current_pos");
for (int i = 1; i < Test->repl->N; i++)
{
std::string query = "SELECT MASTER_GTID_WAIT('" + row[0] + "', 120)";
get_row(Test->repl->nodes[i], query);
}
Test->repl->disconnect();
Test->repl->connect();
for (i = 0; i < Test->repl->N; i++)
{
Test->tprintf("Checking data from node %d (%s)", i, Test->repl->IP[i]);
Test->set_timeout(100);
Test->add_result(select_from_t1(Test->repl->nodes[i], 4), "Selecting from t1 failed");
Test->stop_timeout();
}
Test->set_timeout(10);
Test->tprintf("First transaction test (with ROLLBACK)");
start_transaction(Test);
Test->set_timeout(50);
Test->tprintf("SELECT * FROM t1 WHERE fl=10, checking inserted values");
Test->add_result(execute_query_check_one(Test->repl->nodes[0],
(char*) "SELECT * FROM t1 WHERE fl=10",
"111"),
"SELECT check failed");
Test->tprintf("ROLLBACK");
Test->try_query(Test->repl->nodes[0], (char*) "ROLLBACK");
Test->tprintf("INSERT INTO t1 VALUES(112, 10)");
Test->try_query(Test->repl->nodes[0], (char*) "INSERT INTO t1 VALUES(112, 10)");
Test->try_query(Test->repl->nodes[0], (char*) "COMMIT");
Test->set_timeout(120);
Test->repl->sync_slaves();
Test->set_timeout(20);
Test->tprintf("SELECT * FROM t1 WHERE fl=10, checking inserted values");
Test->add_result(execute_query_check_one(Test->repl->nodes[0],
(char*) "SELECT * FROM t1 WHERE fl=10",
"112"),
"SELECT check failed");
Test->tprintf("SELECT * FROM t1 WHERE fl=10, checking inserted values from slave");
Test->add_result(execute_query_check_one(Test->repl->nodes[2],
(char*) "SELECT * FROM t1 WHERE fl=10",
"112"),
"SELECT check failed");
Test->tprintf("DELETE FROM t1 WHERE fl=10");
Test->try_query(Test->repl->nodes[0], (char*) "DELETE FROM t1 WHERE fl=10");
Test->tprintf("Checking t1");
Test->add_result(select_from_t1(Test->repl->nodes[0], 4), "SELECT from t1 failed");
Test->tprintf("Second transaction test (with COMMIT)");
start_transaction(Test);
Test->tprintf("COMMIT");
Test->try_query(Test->repl->nodes[0], (char*) "COMMIT");
Test->tprintf("SELECT, checking inserted values");
Test->add_result(execute_query_check_one(Test->repl->nodes[0],
(char*) "SELECT * FROM t1 WHERE fl=10",
"111"),
"SELECT check failed");
Test->tprintf("SELECT, checking inserted values from slave");
Test->add_result(execute_query_check_one(Test->repl->nodes[2],
(char*) "SELECT * FROM t1 WHERE fl=10",
"111"),
"SELECT check failed");
Test->tprintf("DELETE FROM t1 WHERE fl=10");
Test->try_query(Test->repl->nodes[0], (char*) "DELETE FROM t1 WHERE fl=10");
Test->stop_timeout();
Test->set_timeout(50);
Test->add_result(check_sha1(Test), "sha1 check failed");
Test->repl->close_connections();
Test->stop_timeout();
// test SLAVE STOP/START
Test->tprintf("test SLAVE STOP/START");
Test->set_timeout(100);
Test->repl->connect();
Test->tprintf("Dropping and re-creating t1");
Test->try_query(Test->repl->nodes[0], (char*) "DROP TABLE IF EXISTS t1");
create_t1(Test->repl->nodes[0]);
Test->tprintf("Connecting to MaxScale binlog router");
binlog = open_conn(Test->maxscales->binlog_port[0],
Test->maxscales->IP[0],
Test->repl->user_name,
Test->repl->password,
Test->ssl);
Test->tprintf("STOP SLAVE against Maxscale binlog");
execute_query(binlog, (char*) "STOP SLAVE");
Test->tprintf("FLUSH LOGS on master");
execute_query(Test->repl->nodes[0], (char*) "FLUSH LOGS");
execute_query(Test->repl->nodes[0], (char*) "FLUSH LOGS");
execute_query(Test->repl->nodes[0], (char*) "FLUSH LOGS");
execute_query(Test->repl->nodes[0], (char*) "FLUSH LOGS");
Test->add_result(insert_into_t1(Test->repl->nodes[0], 4), "INSERT into t1 failed");
Test->tprintf("START SLAVE against Maxscale binlog");
Test->try_query(binlog, (char*) "START SLAVE");
Test->set_timeout(120);
Test->repl->sync_slaves();
for (i = 0; i < Test->repl->N; i++)
{
Test->set_timeout(50);
Test->tprintf("Checking data from node %d (%s)", i, Test->repl->IP[i]);
Test->add_result(select_from_t1(Test->repl->nodes[i], 4), "SELECT from t1 failed");
}
Test->set_timeout(100);
Test->add_result(check_sha1(Test), "sha1 check failed");
Test->repl->close_connections();
Test->stop_timeout();
}

File diff suppressed because it is too large Load Diff