632 lines
17 KiB
C
632 lines
17 KiB
C
/*
|
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2025-03-08
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
#include <unistd.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <ini.h>
|
|
#include <stdint.h>
|
|
#include <amqp_tcp_socket.h>
|
|
#include <amqp.h>
|
|
#include <amqp_framing.h>
|
|
#include <mysql.h>
|
|
#include <signal.h>
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
|
|
typedef struct delivery_t
|
|
{
|
|
uint64_t dtag;
|
|
amqp_message_t* message;
|
|
struct delivery_t* next, * prev;
|
|
} DELIVERY;
|
|
|
|
typedef struct consumer_t
|
|
{
|
|
char* hostname, * vhost, * user, * passwd, * queue, * dbserver, * dbname, * dbuser, * dbpasswd;
|
|
DELIVERY* query_stack;
|
|
int port, dbport;
|
|
} CONSUMER;
|
|
|
|
static int all_ok;
|
|
static FILE* out_fd;
|
|
static CONSUMER* c_inst;
|
|
static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
|
|
static char* DB_TABLE
|
|
=
|
|
"CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
|
|
static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
|
|
static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
|
|
static char* DB_INCREMENT =
|
|
"UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";
|
|
|
|
void sighndl(int signum)
|
|
{
|
|
if (signum == SIGINT)
|
|
{
|
|
all_ok = 0;
|
|
alarm(1);
|
|
}
|
|
}
|
|
|
|
int handler(void* user,
|
|
const char* section,
|
|
const char* name,
|
|
const char* value)
|
|
{
|
|
if (strcmp(section, "consumer") == 0)
|
|
{
|
|
|
|
if (strcmp(name, "hostname") == 0)
|
|
{
|
|
c_inst->hostname = strdup(value);
|
|
}
|
|
else if (strcmp(name, "vhost") == 0)
|
|
{
|
|
c_inst->vhost = strdup(value);
|
|
}
|
|
else if (strcmp(name, "port") == 0)
|
|
{
|
|
c_inst->port = atoi(value);
|
|
}
|
|
else if (strcmp(name, "user") == 0)
|
|
{
|
|
c_inst->user = strdup(value);
|
|
}
|
|
else if (strcmp(name, "passwd") == 0)
|
|
{
|
|
c_inst->passwd = strdup(value);
|
|
}
|
|
else if (strcmp(name, "queue") == 0)
|
|
{
|
|
c_inst->queue = strdup(value);
|
|
}
|
|
else if (strcmp(name, "dbserver") == 0)
|
|
{
|
|
c_inst->dbserver = strdup(value);
|
|
}
|
|
else if (strcmp(name, "dbport") == 0)
|
|
{
|
|
c_inst->dbport = atoi(value);
|
|
}
|
|
else if (strcmp(name, "dbname") == 0)
|
|
{
|
|
c_inst->dbname = strdup(value);
|
|
}
|
|
else if (strcmp(name, "dbuser") == 0)
|
|
{
|
|
c_inst->dbuser = strdup(value);
|
|
}
|
|
else if (strcmp(name, "dbpasswd") == 0)
|
|
{
|
|
c_inst->dbpasswd = strdup(value);
|
|
}
|
|
else if (strcmp(name, "logfile") == 0)
|
|
{
|
|
out_fd = fopen(value, "ab");
|
|
}
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
|
|
int isPair(amqp_message_t* a, amqp_message_t* b)
|
|
{
|
|
int keylen = a->properties.correlation_id.len
|
|
>= b->properties.correlation_id.len ?
|
|
a->properties.correlation_id.len :
|
|
b->properties.correlation_id.len;
|
|
|
|
return strncmp(a->properties.correlation_id.bytes,
|
|
b->properties.correlation_id.bytes,
|
|
keylen) == 0 ? 1 : 0;
|
|
}
|
|
|
|
int connectToServer(MYSQL* server)
|
|
{
|
|
|
|
|
|
mysql_init(server);
|
|
|
|
mysql_options(server, MYSQL_READ_DEFAULT_GROUP, "client");
|
|
mysql_options(server, MYSQL_OPT_USE_REMOTE_CONNECTION, 0);
|
|
my_bool tr = 1;
|
|
mysql_options(server, MYSQL_OPT_RECONNECT, &tr);
|
|
|
|
|
|
MYSQL* result = mysql_real_connect(server,
|
|
c_inst->dbserver,
|
|
c_inst->dbuser,
|
|
c_inst->dbpasswd,
|
|
NULL,
|
|
c_inst->dbport,
|
|
NULL,
|
|
0);
|
|
|
|
|
|
if (result == NULL)
|
|
{
|
|
fprintf(out_fd, "\33[31;1mError\33[0m: Could not connect to MySQL server: %s\n", mysql_error(server));
|
|
return 0;
|
|
}
|
|
|
|
int bsz = 1024;
|
|
char* qstr = calloc(bsz, sizeof(char));
|
|
|
|
|
|
if (!qstr)
|
|
{
|
|
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
|
|
return 0;
|
|
}
|
|
|
|
|
|
/**Connection ok, check that the database and table exist*/
|
|
|
|
memset(qstr, 0, bsz);
|
|
sprintf(qstr, DB_DATABASE, c_inst->dbname);
|
|
if (mysql_query(server, qstr))
|
|
{
|
|
fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server));
|
|
}
|
|
memset(qstr, 0, bsz);
|
|
sprintf(qstr, "USE %s;", c_inst->dbname);
|
|
if (mysql_query(server, qstr))
|
|
{
|
|
fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server));
|
|
}
|
|
|
|
memset(qstr, 0, bsz);
|
|
sprintf(qstr, "%s", DB_TABLE);
|
|
if (mysql_query(server, qstr))
|
|
{
|
|
fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server));
|
|
}
|
|
|
|
free(qstr);
|
|
return 1;
|
|
}
|
|
|
|
int sendMessage(MYSQL* server, amqp_message_t* msg)
|
|
{
|
|
int buffsz = (int)((msg->body.len + 1) * 2 + 1)
|
|
+ (int)((msg->properties.correlation_id.len + 1) * 2 + 1)
|
|
+ strlen(DB_INSERT),
|
|
rval = 0;
|
|
char* saved;
|
|
char* qstr = calloc(buffsz, sizeof(char)),
|
|
* rawmsg = calloc((msg->body.len + 1), sizeof(char)),
|
|
* clnmsg = calloc(((msg->body.len + 1) * 2 + 1), sizeof(char)),
|
|
* rawdate = calloc((msg->body.len + 1), sizeof(char)),
|
|
* clndate = calloc(((msg->body.len + 1) * 2 + 1), sizeof(char)),
|
|
* rawtag = calloc((msg->properties.correlation_id.len + 1), sizeof(char)),
|
|
* clntag = calloc(((msg->properties.correlation_id.len + 1) * 2 + 1), sizeof(char));
|
|
|
|
|
|
|
|
sprintf(qstr, "%.*s", (int)msg->body.len, (char*)msg->body.bytes);
|
|
fprintf(out_fd, "Received: %s\n", qstr);
|
|
char* ptr = strtok_r(qstr, "|", &saved);
|
|
sprintf(rawdate, "%s", ptr);
|
|
ptr = strtok_r(NULL, "\n\0", &saved);
|
|
if (ptr == NULL)
|
|
{
|
|
fprintf(out_fd, "Message content not valid.\n");
|
|
rval = 1;
|
|
goto cleanup;
|
|
}
|
|
sprintf(rawmsg, "%s", ptr);
|
|
sprintf(rawtag,
|
|
"%.*s",
|
|
(int)msg->properties.correlation_id.len,
|
|
(char*)msg->properties.correlation_id.bytes);
|
|
memset(qstr, 0, buffsz);
|
|
|
|
mysql_real_escape_string(server, clnmsg, rawmsg, strnlen(rawmsg, msg->body.len + 1));
|
|
mysql_real_escape_string(server, clndate, rawdate, strnlen(rawdate, msg->body.len + 1));
|
|
mysql_real_escape_string(server, clntag, rawtag, strnlen(rawtag, msg->properties.correlation_id.len + 1));
|
|
|
|
if (strncmp(msg->properties.message_id.bytes,
|
|
"query",
|
|
msg->properties.message_id.len) == 0)
|
|
{
|
|
|
|
sprintf(qstr, DB_INCREMENT, clndate, clnmsg);
|
|
rval = mysql_query(server, qstr);
|
|
|
|
if (mysql_affected_rows(server) == 0)
|
|
{
|
|
memset(qstr, 0, buffsz);
|
|
sprintf(qstr, DB_INSERT, clntag, clnmsg, clndate);
|
|
rval = mysql_query(server, qstr);
|
|
}
|
|
}
|
|
else if (strncmp(msg->properties.message_id.bytes,
|
|
"reply",
|
|
msg->properties.message_id.len) == 0)
|
|
{
|
|
|
|
sprintf(qstr, DB_UPDATE, clnmsg, clndate, clntag);
|
|
rval = mysql_query(server, qstr);
|
|
}
|
|
else
|
|
{
|
|
rval = 1;
|
|
goto cleanup;
|
|
}
|
|
|
|
|
|
if (rval)
|
|
{
|
|
fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server));
|
|
goto cleanup;
|
|
}
|
|
|
|
cleanup:
|
|
free(qstr);
|
|
free(rawmsg);
|
|
free(clnmsg);
|
|
free(rawdate);
|
|
free(clndate);
|
|
free(rawtag);
|
|
free(clntag);
|
|
|
|
return rval;
|
|
}
|
|
|
|
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b)
|
|
{
|
|
|
|
amqp_message_t* msg, * reply;
|
|
int buffsz = 2048;
|
|
char* qstr = calloc(buffsz, sizeof(char));
|
|
|
|
if (!qstr)
|
|
{
|
|
fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n");
|
|
free(qstr);
|
|
return 0;
|
|
}
|
|
|
|
if (a->properties.message_id.len == strlen("query")
|
|
&& strncmp(a->properties.message_id.bytes,
|
|
"query",
|
|
a->properties.message_id.len) == 0)
|
|
{
|
|
|
|
msg = a;
|
|
reply = b;
|
|
}
|
|
else
|
|
{
|
|
|
|
msg = b;
|
|
reply = a;
|
|
}
|
|
|
|
|
|
printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
|
|
(int)msg->properties.correlation_id.len,
|
|
(char*)msg->properties.correlation_id.bytes,
|
|
(int)msg->body.len,
|
|
(char*)msg->body.bytes,
|
|
(int)reply->body.len,
|
|
(char*)reply->body.bytes);
|
|
|
|
if ((int)msg->body.len
|
|
+ (int)reply->body.len
|
|
+ (int)msg->properties.correlation_id.len + 50 >= buffsz)
|
|
{
|
|
char* qtmp = calloc(buffsz * 2, sizeof(char));
|
|
free(qstr);
|
|
|
|
if (qtmp)
|
|
{
|
|
qstr = qtmp;
|
|
buffsz *= 2;
|
|
}
|
|
else
|
|
{
|
|
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
char* rawmsg = calloc((msg->body.len + 1), sizeof(char)),
|
|
* clnmsg = calloc(((msg->body.len + 1) * 2 + 1), sizeof(char)),
|
|
* rawrpl = calloc((reply->body.len + 1), sizeof(char)),
|
|
* clnrpl = calloc(((reply->body.len + 1) * 2 + 1), sizeof(char)),
|
|
* rawtag = calloc((msg->properties.correlation_id.len + 1), sizeof(char)),
|
|
* clntag = calloc(((msg->properties.correlation_id.len + 1) * 2 + 1), sizeof(char));
|
|
|
|
sprintf(rawmsg, "%.*s", (int)msg->body.len, (char*)msg->body.bytes);
|
|
sprintf(rawrpl, "%.*s", (int)reply->body.len, (char*)reply->body.bytes);
|
|
sprintf(rawtag,
|
|
"%.*s",
|
|
(int)msg->properties.correlation_id.len,
|
|
(char*)msg->properties.correlation_id.bytes);
|
|
|
|
char* ptr;
|
|
while ((ptr = strchr(rawmsg, '\n')))
|
|
{
|
|
*ptr = ' ';
|
|
}
|
|
while ((ptr = strchr(rawrpl, '\n')))
|
|
{
|
|
*ptr = ' ';
|
|
}
|
|
while ((ptr = strchr(rawtag, '\n')))
|
|
{
|
|
*ptr = ' ';
|
|
}
|
|
|
|
mysql_real_escape_string(server, clnmsg, rawmsg, strnlen(rawmsg, msg->body.len + 1));
|
|
mysql_real_escape_string(server, clnrpl, rawrpl, strnlen(rawrpl, reply->body.len + 1));
|
|
mysql_real_escape_string(server, clntag, rawtag, strnlen(rawtag, msg->properties.correlation_id.len + 1));
|
|
|
|
|
|
|
|
sprintf(qstr, "INSERT INTO pairs VALUES ('%s','%s','%s');", clnmsg, clnrpl, clntag);
|
|
free(rawmsg);
|
|
free(clnmsg);
|
|
free(rawrpl);
|
|
free(clnrpl);
|
|
free(rawtag);
|
|
free(clntag);
|
|
|
|
if (mysql_query(server, qstr))
|
|
{
|
|
fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server));
|
|
free(qstr);
|
|
return 0;
|
|
}
|
|
|
|
free(qstr);
|
|
return 1;
|
|
}
|
|
int main(int argc, char** argv)
|
|
{
|
|
int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
|
|
amqp_socket_t* socket = NULL;
|
|
amqp_connection_state_t conn;
|
|
amqp_rpc_reply_t ret;
|
|
amqp_message_t* reply = NULL;
|
|
amqp_frame_t frame;
|
|
struct timeval timeout;
|
|
MYSQL db_inst;
|
|
char ch, * cnfname = NULL, * cnfpath = NULL;
|
|
static const char* fname = "consumer.cnf";
|
|
const char* default_path = "@CMAKE_INSTALL_PREFIX@/etc";
|
|
|
|
if ((c_inst = calloc(1, sizeof(CONSUMER))) == NULL)
|
|
{
|
|
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
|
|
return 1;
|
|
}
|
|
|
|
if (signal(SIGINT, sighndl) == SIG_IGN)
|
|
{
|
|
signal(SIGINT, SIG_IGN);
|
|
}
|
|
|
|
while ((ch = getopt(argc, argv, "c:")) != -1)
|
|
{
|
|
switch (ch)
|
|
{
|
|
case 'c':
|
|
cnfnlen = strlen(optarg);
|
|
cnfpath = strdup(optarg);
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (cnfpath == NULL)
|
|
{
|
|
cnfpath = strdup(default_path);
|
|
cnfnlen = strlen(default_path);
|
|
}
|
|
|
|
cnfname = calloc(cnfnlen + strlen(fname) + 1, sizeof(char));
|
|
|
|
if (cnfpath)
|
|
{
|
|
|
|
/**Config file path as argument*/
|
|
strcpy(cnfname, cnfpath);
|
|
if (cnfpath[cnfnlen - 1] != '/')
|
|
{
|
|
strcat(cnfname, "/");
|
|
}
|
|
}
|
|
|
|
strcat(cnfname, fname);
|
|
|
|
timeout.tv_sec = 1;
|
|
timeout.tv_usec = 0;
|
|
all_ok = 1;
|
|
out_fd = NULL;
|
|
|
|
|
|
|
|
/**Parse the INI file*/
|
|
if (ini_parse(cnfname, handler, NULL) < 0)
|
|
{
|
|
|
|
/**Try to parse a config in the same directory*/
|
|
if (ini_parse(fname, handler, NULL) < 0)
|
|
{
|
|
fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
|
|
goto fatal_error;
|
|
}
|
|
}
|
|
|
|
if (out_fd == NULL)
|
|
{
|
|
out_fd = stdout;
|
|
}
|
|
|
|
fprintf(out_fd, "\n--------------------------------------------------------------\n");
|
|
|
|
/**Confirm that all parameters were in the configuration file*/
|
|
if (!c_inst->hostname || !c_inst->vhost || !c_inst->user
|
|
|| !c_inst->passwd || !c_inst->dbpasswd || !c_inst->queue
|
|
|| !c_inst->dbserver || !c_inst->dbname || !c_inst->dbuser)
|
|
{
|
|
fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
|
|
goto fatal_error;
|
|
}
|
|
|
|
connectToServer(&db_inst);
|
|
|
|
if ((conn = amqp_new_connection()) == NULL
|
|
|| (socket = amqp_tcp_socket_new(conn)) == NULL)
|
|
{
|
|
fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
|
|
goto fatal_error;
|
|
}
|
|
|
|
if (amqp_socket_open(socket, c_inst->hostname, c_inst->port))
|
|
{
|
|
fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open socket.\n");
|
|
goto error;
|
|
}
|
|
|
|
ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);
|
|
|
|
if (ret.reply_type != AMQP_RESPONSE_NORMAL)
|
|
{
|
|
fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot login to server.\n");
|
|
goto error;
|
|
}
|
|
|
|
amqp_channel_open(conn, channel);
|
|
ret = amqp_get_rpc_reply(conn);
|
|
|
|
if (ret.reply_type != AMQP_RESPONSE_NORMAL)
|
|
{
|
|
fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open channel.\n");
|
|
goto error;
|
|
}
|
|
|
|
reply = malloc(sizeof(amqp_message_t));
|
|
if (!reply)
|
|
{
|
|
fprintf(stderr, "Error: Cannot allocate enough memory.\n");
|
|
goto error;
|
|
}
|
|
amqp_basic_consume(conn,
|
|
channel,
|
|
amqp_cstring_bytes(c_inst->queue),
|
|
amqp_empty_bytes,
|
|
0,
|
|
0,
|
|
0,
|
|
amqp_empty_table);
|
|
|
|
while (all_ok)
|
|
{
|
|
|
|
status = amqp_simple_wait_frame_noblock(conn, &frame, &timeout);
|
|
|
|
/**No frames to read from server, possibly out of messages*/
|
|
if (status == AMQP_STATUS_TIMEOUT)
|
|
{
|
|
sleep(timeout.tv_sec);
|
|
continue;
|
|
}
|
|
|
|
if (frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD)
|
|
{
|
|
|
|
amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;
|
|
|
|
amqp_read_message(conn, channel, reply, 0);
|
|
|
|
if (sendMessage(&db_inst, reply))
|
|
{
|
|
|
|
fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Received malformed message.\n");
|
|
amqp_basic_reject(conn, channel, decoded->delivery_tag, 0);
|
|
amqp_destroy_message(reply);
|
|
}
|
|
else
|
|
{
|
|
|
|
amqp_basic_ack(conn, channel, decoded->delivery_tag, 0);
|
|
amqp_destroy_message(reply);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
fprintf(stderr,
|
|
"\33[31;1mRabbitMQ Error\33[0m: Received method from server: %s\n",
|
|
amqp_method_name(frame.payload.method.id));
|
|
all_ok = 0;
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
fprintf(out_fd, "Shutting down...\n");
|
|
error:
|
|
|
|
mysql_close(&db_inst);
|
|
mysql_library_end();
|
|
if (c_inst && c_inst->query_stack)
|
|
{
|
|
|
|
while (c_inst->query_stack)
|
|
{
|
|
DELIVERY* d = c_inst->query_stack->next;
|
|
amqp_destroy_message(c_inst->query_stack->message);
|
|
free(c_inst->query_stack);
|
|
c_inst->query_stack = d;
|
|
}
|
|
}
|
|
|
|
amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
|
|
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
|
|
amqp_destroy_connection(conn);
|
|
fatal_error:
|
|
|
|
if (out_fd)
|
|
{
|
|
fclose(out_fd);
|
|
}
|
|
|
|
|
|
if (c_inst)
|
|
{
|
|
|
|
free(c_inst->hostname);
|
|
free(c_inst->vhost);
|
|
free(c_inst->user);
|
|
free(c_inst->passwd);
|
|
free(c_inst->queue);
|
|
free(c_inst->dbserver);
|
|
free(c_inst->dbname);
|
|
free(c_inst->dbuser);
|
|
free(c_inst->dbpasswd);
|
|
free(c_inst);
|
|
}
|
|
|
|
|
|
|
|
return all_ok;
|
|
}
|