add tests
This commit is contained in:

committed by
Markus Mäkelä

parent
dbfd631fed
commit
8c6ca38a8a
254
maxscale-system-test/cdc_client.cpp
Normal file
254
maxscale-system-test/cdc_client.cpp
Normal file
@ -0,0 +1,254 @@
|
||||
/**
|
||||
* @file cdc_client.cpp Test of CDC protocol (avro listener)
|
||||
* - configure binlog router setup, avro router, avro listener
|
||||
* - connect to avro listener
|
||||
* - start INSERT load thread
|
||||
* - read data from avro listener, comapre it with inserted data
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#include "testconnections.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <sys/socket.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <stdlib.h>
|
||||
#include <netdb.h>
|
||||
#include <string.h>
|
||||
#include <openssl/sha.h>
|
||||
#include "maxinfo_func.h"
|
||||
#include "sql_t1.h"
|
||||
#include <sys/epoll.h>
|
||||
|
||||
using namespace std;
|
||||
char reg_str[] = "REGISTER UUID=XXX-YYY_YYY, TYPE=JSON";
|
||||
char req_str[] = "REQUEST-DATA test.t1";
|
||||
int insert_val = 0;
|
||||
bool exit_flag = false;
|
||||
|
||||
void *query_thread(void *ptr);
|
||||
|
||||
/**
|
||||
* @brief cdc_com Connects to avro listenet by CDC protocal, read data, compare data with inserted data
|
||||
* @param Test TestConnections object
|
||||
* @return true if test PASSED
|
||||
*/
|
||||
bool cdc_com(TestConnections *Test)
|
||||
{
|
||||
int max_inserted_val = Test->smoke ? 25 : 100;
|
||||
int sock = create_tcp_socket();
|
||||
char *ip = get_ip(Test->maxscale_IP);
|
||||
|
||||
if (ip == NULL)
|
||||
{
|
||||
Test->tprintf("Can't get IP");
|
||||
return false;
|
||||
}
|
||||
|
||||
struct sockaddr_in *remote = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in *));
|
||||
remote->sin_family = AF_INET;
|
||||
int tmpres = inet_pton(AF_INET, ip, (void *)(&(remote->sin_addr.s_addr)));
|
||||
|
||||
if ( tmpres < 0)
|
||||
{
|
||||
Test->tprintf("Can't set remote->sin_addr.s_addr");
|
||||
return false;
|
||||
}
|
||||
else if (tmpres == 0)
|
||||
{
|
||||
Test->tprintf("%s is not a valid IP address", ip);
|
||||
return false;
|
||||
}
|
||||
|
||||
remote->sin_port = htons(4001);
|
||||
|
||||
if (connect(sock, (struct sockaddr *)remote, sizeof(struct sockaddr)) < 0)
|
||||
{
|
||||
Test->tprintf("Could not connect");
|
||||
return false;
|
||||
}
|
||||
|
||||
char *get = cdc_auth_srt((char *) "skysql", (char *) "skysql");
|
||||
Test->tprintf("Auth string: %s", get);
|
||||
|
||||
//Send the query to the server
|
||||
if (send_so(sock, get) != 0)
|
||||
{
|
||||
Test->tprintf("Cat't send data to scoket");
|
||||
return false;
|
||||
}
|
||||
|
||||
char buf1[1024];
|
||||
recv(sock, buf1, 1024, 0);
|
||||
|
||||
//Send the query to the server
|
||||
if (send_so(sock, reg_str) != 0)
|
||||
{
|
||||
Test->tprintf("Cat't send data to scoket");
|
||||
return false;
|
||||
}
|
||||
|
||||
recv(sock, buf1, 1024, 0);
|
||||
|
||||
//Send the query to the server
|
||||
if (send_so(sock, req_str) != 0)
|
||||
{
|
||||
Test->tprintf("Cat't send data to scoket");
|
||||
return false;
|
||||
}
|
||||
|
||||
Test->stop_timeout();
|
||||
int epfd = epoll_create(1);
|
||||
static struct epoll_event ev;
|
||||
ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
|
||||
ev.data.fd = sock;
|
||||
|
||||
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev) < 0)
|
||||
{
|
||||
Test->tprintf("Error in epoll_ctl! errno = %d, %s", errno, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
epoll_event events[2];
|
||||
setnonblocking(sock);
|
||||
|
||||
int inserted_val = 0;
|
||||
int ignore_first = 2;
|
||||
|
||||
while (inserted_val < max_inserted_val)
|
||||
{
|
||||
Test->set_timeout(60);
|
||||
// wait for something to do...
|
||||
Test->tprintf("epoll_wait");
|
||||
int nfds = epoll_wait(epfd, &events[0], 1, -1);
|
||||
if (nfds < 0)
|
||||
{
|
||||
Test->tprintf("Error in epoll_wait! errno = %d, %s", errno, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (nfds > 0)
|
||||
{
|
||||
// for each ready socket
|
||||
//for(int i = 0; i < nfds; i++)
|
||||
//{
|
||||
int fd = events[0].data.fd;
|
||||
char *json = read_sc(sock);
|
||||
Test->tprintf("%s", json);
|
||||
//}
|
||||
if (ignore_first > 0)
|
||||
{
|
||||
ignore_first--; // ignoring first reads
|
||||
if (ignore_first == 0)
|
||||
{
|
||||
// first reads done, starting inserting
|
||||
insert_val = 10;
|
||||
inserted_val = insert_val;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// trying to check JSON
|
||||
long long int x1;
|
||||
long long int fl;
|
||||
get_x_fl_from_json(json, &x1, &fl);
|
||||
Test->tprintf("data received, x1=%lld fl=%lld", x1, fl);
|
||||
|
||||
if (x1 != inserted_val || fl != inserted_val + 100)
|
||||
{
|
||||
Test->tprintf("wrong values in JSON");
|
||||
}
|
||||
|
||||
inserted_val++;
|
||||
insert_val = inserted_val;
|
||||
}
|
||||
|
||||
free(json);
|
||||
}
|
||||
else
|
||||
{
|
||||
Test->tprintf("waiting");
|
||||
}
|
||||
}
|
||||
|
||||
free(remote);
|
||||
free(ip);
|
||||
close(sock);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static TestConnections *Test;
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
|
||||
Test = new TestConnections(argc, argv);
|
||||
|
||||
Test->set_timeout(600);
|
||||
Test->stop_maxscale();
|
||||
|
||||
// Remove old data files and make sure that port 4001 is open
|
||||
Test->ssh_maxscale(true, "rm -rf /var/lib/maxscale/avro;"
|
||||
"iptables -n -L INPUT|grep 4001 || iptables -I INPUT -p tcp --dport 4001 -j ACCEPT;");
|
||||
|
||||
Test->repl->connect();
|
||||
execute_query(Test->repl->nodes[0], "DROP TABLE IF EXISTS t1;");
|
||||
Test->repl->close_connections();
|
||||
sleep(5);
|
||||
|
||||
Test->start_binlog();
|
||||
|
||||
Test->set_timeout(120);
|
||||
Test->stop_maxscale();
|
||||
|
||||
Test->ssh_maxscale(true, "rm -rf /var/lib/maxscale/avro");
|
||||
|
||||
Test->set_timeout(120);
|
||||
Test->start_maxscale();
|
||||
|
||||
Test->set_timeout(60);
|
||||
Test->repl->connect();
|
||||
create_t1(Test->repl->nodes[0]);
|
||||
execute_query(Test->repl->nodes[0], (char *) "INSERT INTO t1 VALUES (111, 222)");
|
||||
Test->repl->close_connections();
|
||||
|
||||
Test->tprintf("Waiting for binlogs to be processed...");
|
||||
Test->stop_timeout();
|
||||
sleep(15);
|
||||
|
||||
Test->set_timeout(120);
|
||||
|
||||
pthread_t thread;
|
||||
pthread_create(&thread, NULL, query_thread, NULL);
|
||||
|
||||
Test->add_result(!cdc_com(Test), "Failed to execute test");
|
||||
|
||||
exit_flag = true;
|
||||
|
||||
pthread_join(thread, NULL);
|
||||
|
||||
int rval = Test->global_result;
|
||||
delete Test;
|
||||
return rval;
|
||||
}
|
||||
|
||||
void *query_thread(void *ptr)
|
||||
{
|
||||
|
||||
Test->repl->connect();
|
||||
|
||||
while (!exit_flag)
|
||||
{
|
||||
if (insert_val != 0)
|
||||
{
|
||||
char str[256];
|
||||
sprintf(str, "INSERT INTO t1 VALUES (%d, %d)", insert_val, insert_val + 100);
|
||||
insert_val = 0;
|
||||
execute_query(Test->repl->nodes[0], str);
|
||||
}
|
||||
}
|
||||
|
||||
Test->repl->close_connections();
|
||||
}
|
Reference in New Issue
Block a user