2022-01-04 15:47:38 +02:00

632 lines
17 KiB
C

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2026-01-04
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ini.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <mysql.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/**
 * One node of a doubly linked list of pending deliveries.
 *
 * In this file the list is only ever drained (see the cleanup in main());
 * nothing visible here pushes onto it.
 */
typedef struct delivery_t
{
    uint64_t           dtag;    /* presumably the AMQP delivery tag -- TODO confirm */
    amqp_message_t*    message; /* released with amqp_destroy_message() during cleanup */
    struct delivery_t* next;    /* following node, NULL at the tail */
    struct delivery_t* prev;    /* preceding node */
} DELIVERY;
/**
 * Runtime configuration of the consumer, filled in by the INI handler from
 * the [consumer] section. All strings are heap copies released at exit.
 */
typedef struct consumer_t
{
    char*     hostname;    /* RabbitMQ host passed to amqp_socket_open() */
    char*     vhost;       /* virtual host used for amqp_login() */
    char*     user;        /* RabbitMQ user name */
    char*     passwd;      /* RabbitMQ password */
    char*     queue;       /* queue consumed with amqp_basic_consume() */
    char*     dbserver;    /* SQL server host for mysql_real_connect() */
    char*     dbname;      /* database that holds the "pairs" table */
    char*     dbuser;      /* SQL user name */
    char*     dbpasswd;    /* SQL password */
    DELIVERY* query_stack; /* pending deliveries, freed on shutdown */
    int       port;        /* RabbitMQ port */
    int       dbport;      /* SQL server port */
} CONSUMER;
/**
 * Main-loop run flag. It is cleared from the SIGINT handler and polled by
 * main(), so it must be a volatile sig_atomic_t: a plain int written from a
 * signal handler is not guaranteed to be observed by the interrupted code.
 */
static volatile sig_atomic_t all_ok;
/** Log stream; set by the "logfile" key, main() falls back to stdout. */
static FILE* out_fd;
/** The single consumer configuration instance, allocated in main(). */
static CONSUMER* c_inst;
/** Creates the target database; the single %s is the database name. */
static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";

/** Creates the table that stores query/reply pairs keyed by correlation tag. */
static char* DB_TABLE =
    "CREATE TABLE IF NOT EXISTS pairs ("
    "tag VARCHAR(64) PRIMARY KEY NOT NULL, "
    "query VARCHAR(2048), "
    "reply VARCHAR(2048), "
    "date_in DATETIME NOT NULL, "
    "date_out DATETIME DEFAULT NULL, "
    "counter INT DEFAULT 1)";

/** Stores a query not seen before; arguments are tag, query text, timestamp. */
static char* DB_INSERT =
    "INSERT INTO pairs(tag, query, date_in) "
    "VALUES ('%s','%s',FROM_UNIXTIME(%s))";

/** Attaches a reply to its query; arguments are reply text, timestamp, tag. */
static char* DB_UPDATE =
    "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) "
    "WHERE tag='%s'";

/** Bumps the counter of an already stored query; arguments are timestamp, query text. */
static char* DB_INCREMENT =
    "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) "
    "WHERE query='%s'";
/**
 * Signal handler installed for SIGINT.
 *
 * Clears the run flag so the main loop stops, and arms a one second alarm.
 * No SIGALRM handler is installed in this file, so the alarm presumably acts
 * as a hard deadline for the shutdown -- TODO confirm intent.
 *
 * @param signum Number of the delivered signal; anything but SIGINT is ignored
 */
void sighndl(int signum)
{
    if (signum != SIGINT)
    {
        return;
    }

    all_ok = 0;
    alarm(1);
}
/**
 * INI parser callback: stores one key of the [consumer] section in c_inst.
 *
 * String values are duplicated onto the heap. A key that occurs more than
 * once replaces the earlier value, and the earlier copy is released instead
 * of being leaked. The "logfile" key opens the log stream in append mode and
 * closes a log file opened by an earlier occurrence of the key. Keys of other
 * sections and unknown keys are ignored.
 *
 * @param user    Unused user pointer supplied to ini_parse()
 * @param section Name of the section being parsed
 * @param name    Name of the key
 * @param value   Value of the key
 * @return Always 1, so parsing continues
 */
int handler(void* user,
            const char* section,
            const char* name,
            const char* value)
{
    char** text_slot = NULL;
    int* number_slot = NULL;

    (void)user;

    if (strcmp(section, "consumer") != 0)
    {
        return 1;
    }

    if (strcmp(name, "hostname") == 0)
    {
        text_slot = &c_inst->hostname;
    }
    else if (strcmp(name, "vhost") == 0)
    {
        text_slot = &c_inst->vhost;
    }
    else if (strcmp(name, "port") == 0)
    {
        number_slot = &c_inst->port;
    }
    else if (strcmp(name, "user") == 0)
    {
        text_slot = &c_inst->user;
    }
    else if (strcmp(name, "passwd") == 0)
    {
        text_slot = &c_inst->passwd;
    }
    else if (strcmp(name, "queue") == 0)
    {
        text_slot = &c_inst->queue;
    }
    else if (strcmp(name, "dbserver") == 0)
    {
        text_slot = &c_inst->dbserver;
    }
    else if (strcmp(name, "dbport") == 0)
    {
        number_slot = &c_inst->dbport;
    }
    else if (strcmp(name, "dbname") == 0)
    {
        text_slot = &c_inst->dbname;
    }
    else if (strcmp(name, "dbuser") == 0)
    {
        text_slot = &c_inst->dbuser;
    }
    else if (strcmp(name, "dbpasswd") == 0)
    {
        text_slot = &c_inst->dbpasswd;
    }
    else if (strcmp(name, "logfile") == 0)
    {
        /* Do not leak a log file opened by an earlier "logfile" key. */
        if (out_fd && out_fd != stdout && out_fd != stderr)
        {
            fclose(out_fd);
        }
        out_fd = fopen(value, "ab");
    }

    if (text_slot)
    {
        /* free(NULL) is a no-op, so this also covers the first assignment. */
        free(*text_slot);
        *text_slot = strdup(value);
    }
    else if (number_slot)
    {
        *number_slot = atoi(value);
    }

    return 1;
}
/**
 * Tell whether two messages carry the same correlation id.
 *
 * The ids are length-delimited byte strings without a terminating NUL, so
 * they are compared by length first and then byte by byte. Comparing over
 * the longer of the two lengths, as was done before, reads past the end of
 * the shorter buffer.
 *
 * @param a First message
 * @param b Second message
 * @return 1 if the correlation ids are identical, 0 otherwise
 */
int isPair(amqp_message_t* a, amqp_message_t* b)
{
    size_t a_len = a->properties.correlation_id.len;
    size_t b_len = b->properties.correlation_id.len;

    if (a_len != b_len)
    {
        return 0;
    }

    if (a_len == 0)
    {
        /* Two empty ids are equal; avoid handing possibly NULL pointers to memcmp. */
        return 1;
    }

    return memcmp(a->properties.correlation_id.bytes,
                  b->properties.correlation_id.bytes,
                  a_len) == 0 ? 1 : 0;
}
/**
 * Send one schema setup statement and report a failure on stderr.
 * Failures are logged only; the caller carries on with the next statement.
 */
static void runSetupQuery(MYSQL* server, const char* query)
{
    if (mysql_query(server, query))
    {
        fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server));
    }
}

/**
 * Connect to the SQL server described by c_inst and make sure the database
 * and the "pairs" table exist.
 *
 * The statement buffer is sized from the configured database name, so a long
 * name can no longer overflow a fixed size buffer.
 *
 * @param server Connection handle to initialise and connect
 * @return 1 once the connection is up (individual setup statements that fail
 *         are only logged), 0 if connecting or allocating memory failed
 */
int connectToServer(MYSQL* server)
{
    mysql_init(server);
    mysql_options(server, MYSQL_READ_DEFAULT_GROUP, "client");
    mysql_options(server, MYSQL_OPT_USE_REMOTE_CONNECTION, 0);
    my_bool tr = 1;
    mysql_options(server, MYSQL_OPT_RECONNECT, &tr);

    MYSQL* result = mysql_real_connect(server,
                                       c_inst->dbserver,
                                       c_inst->dbuser,
                                       c_inst->dbpasswd,
                                       NULL,
                                       c_inst->dbport,
                                       NULL,
                                       0);

    if (result == NULL)
    {
        fprintf(out_fd, "\33[31;1mError\33[0m: Could not connect to MySQL server: %s\n", mysql_error(server));
        return 0;
    }

    if (c_inst->dbname == NULL)
    {
        fprintf(stderr, "\33[31;1mError\33[0m: No database name configured.\n");
        return 0;
    }

    /* DB_DATABASE is the longer of the two templates that embed the name. */
    size_t bsz = strlen(DB_DATABASE) + strlen(c_inst->dbname) + 1;
    char* qstr = calloc(bsz, sizeof(char));

    if (!qstr)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        return 0;
    }

    /** Connection ok, check that the database and table exist */
    snprintf(qstr, bsz, DB_DATABASE, c_inst->dbname);
    runSetupQuery(server, qstr);

    snprintf(qstr, bsz, "USE %s;", c_inst->dbname);
    runSetupQuery(server, qstr);

    /* The table statement has no parameters and can be sent as it is. */
    runSetupQuery(server, DB_TABLE);

    free(qstr);
    return 1;
}
/**
 * Store one consumed message in the "pairs" table.
 *
 * The body is expected to look like "<unix timestamp>|<text>". A message
 * whose message id is exactly "query" increments the counter of an already
 * stored identical query or, failing that, inserts a new row. A message
 * whose message id is exactly "reply" updates the row that has the same
 * correlation id. Everything else is treated as malformed.
 *
 * @param server Connected SQL server handle
 * @param msg    Message read from the broker
 * @return 0 on success, non-zero if the message is malformed, memory ran out
 *         or the SQL statement failed
 */
int sendMessage(MYSQL* server, amqp_message_t* msg)
{
    const size_t bodylen = msg->body.len;
    const size_t taglen = msg->properties.correlation_id.len;
    const size_t idlen = msg->properties.message_id.len;
    const char* idbytes = msg->properties.message_id.bytes;
    int is_query, is_reply;
    int rval = 1;
    size_t tmplen, buffsz;
    char* saved = NULL;
    char* date;
    char* text;
    char* qstr = NULL;
    /* An escaped string needs at most 2 * n + 1 bytes; calloc(n + 1, 2) covers that. */
    char* body = calloc(bodylen + 1, sizeof(char));
    char* clnmsg = calloc(bodylen + 1, 2);
    char* clndate = calloc(bodylen + 1, 2);
    char* clntag = calloc(taglen + 1, 2);

    if (!body || !clnmsg || !clndate || !clntag)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        goto cleanup;
    }

    /* The body is not NUL terminated; work on a terminated private copy. */
    if (bodylen > 0)
    {
        memcpy(body, msg->body.bytes, bodylen);
    }

    fprintf(out_fd, "Received: %s\n", body);

    /* An empty body yields no first token, so check it before asking for the second. */
    date = strtok_r(body, "|", &saved);
    text = date ? strtok_r(NULL, "\n", &saved) : NULL;

    if (text == NULL)
    {
        fprintf(out_fd, "Message content not valid.\n");
        goto cleanup;
    }

    /*
     * The timestamp is placed unquoted inside FROM_UNIXTIME(), where string
     * escaping offers no protection. Accept a plain number only.
     */
    if (date[strspn(date, "0123456789.")] != '\0')
    {
        fprintf(out_fd, "Message content not valid.\n");
        goto cleanup;
    }

    mysql_real_escape_string(server, clnmsg, text, strlen(text));
    mysql_real_escape_string(server, clndate, date, strlen(date));

    if (taglen > 0)
    {
        const char* tagbytes = msg->properties.correlation_id.bytes;
        mysql_real_escape_string(server, clntag, tagbytes, strnlen(tagbytes, taglen));
    }

    /* Size the statement buffer for the longest template plus all escaped values. */
    tmplen = strlen(DB_INCREMENT);

    if (strlen(DB_INSERT) > tmplen)
    {
        tmplen = strlen(DB_INSERT);
    }

    if (strlen(DB_UPDATE) > tmplen)
    {
        tmplen = strlen(DB_UPDATE);
    }

    buffsz = tmplen + strlen(clnmsg) + strlen(clndate) + strlen(clntag) + 1;
    qstr = calloc(buffsz, sizeof(char));

    if (!qstr)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        goto cleanup;
    }

    /* Require an exact match; a bare prefix compare would accept an empty id as "query". */
    is_query = idlen == strlen("query") && strncmp(idbytes, "query", idlen) == 0;
    is_reply = idlen == strlen("reply") && strncmp(idbytes, "reply", idlen) == 0;

    if (is_query)
    {
        snprintf(qstr, buffsz, DB_INCREMENT, clndate, clnmsg);
        rval = mysql_query(server, qstr);

        /* No row was touched: this query text has not been stored yet. */
        if (rval == 0 && mysql_affected_rows(server) == 0)
        {
            snprintf(qstr, buffsz, DB_INSERT, clntag, clnmsg, clndate);
            rval = mysql_query(server, qstr);
        }
    }
    else if (is_reply)
    {
        snprintf(qstr, buffsz, DB_UPDATE, clnmsg, clndate, clntag);
        rval = mysql_query(server, qstr);
    }
    else
    {
        goto cleanup;
    }

    if (rval)
    {
        fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server));
    }

cleanup:
    free(qstr);
    free(body);
    free(clnmsg);
    free(clndate);
    free(clntag);
    return rval;
}
/**
 * Build an SQL-escaped, NUL terminated copy of a length-delimited field with
 * every newline replaced by a space.
 *
 * @return Heap string owned by the caller, or NULL if memory ran out
 */
static char* escapeField(MYSQL* server, const void* bytes, size_t len)
{
    char* raw = calloc(len + 1, sizeof(char));
    /* An escaped string needs at most 2 * n + 1 bytes; calloc(n + 1, 2) covers that. */
    char* cln = calloc(len + 1, 2);

    if (raw && cln)
    {
        size_t rawlen;

        if (len > 0)
        {
            memcpy(raw, bytes, len);
        }

        rawlen = strlen(raw);

        for (size_t i = 0; i < rawlen; i++)
        {
            if (raw[i] == '\n')
            {
                raw[i] = ' ';
            }
        }

        mysql_real_escape_string(server, cln, raw, rawlen);
    }
    else
    {
        free(cln);
        cln = NULL;
    }

    free(raw);
    return cln;
}

/**
 * Store a query/reply pair as a single row.
 *
 * Whichever of the two messages has the message id "query" is taken as the
 * query, the other one as the reply. The statement buffer is sized from the
 * escaped values; the earlier fixed buffer was enlarged at most once and
 * could be overrun by large messages.
 *
 * NOTE(review): the INSERT below supplies three values in the order (query,
 * reply, tag) without a column list, while the table created by DB_TABLE has
 * six columns starting with tag. The statement is kept as it was; confirm the
 * intended columns before relying on this function.
 *
 * @param server Connected SQL server handle
 * @param a      One message of the pair
 * @param b      The other message of the pair
 * @return 1 on success, 0 if memory ran out or the statement failed
 */
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b)
{
    static const char insert_fmt[] = "INSERT INTO pairs VALUES ('%s','%s','%s');";
    amqp_message_t* msg, * reply;
    char* clnmsg = NULL;
    char* clnrpl = NULL;
    char* clntag = NULL;
    char* qstr = NULL;
    size_t buffsz;
    int rval = 0;

    if (a->properties.message_id.len == strlen("query")
        && strncmp(a->properties.message_id.bytes,
                   "query",
                   a->properties.message_id.len) == 0)
    {
        msg = a;
        reply = b;
    }
    else
    {
        msg = b;
        reply = a;
    }

    printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
           (int)msg->properties.correlation_id.len,
           (char*)msg->properties.correlation_id.bytes,
           (int)msg->body.len,
           (char*)msg->body.bytes,
           (int)reply->body.len,
           (char*)reply->body.bytes);

    clnmsg = escapeField(server, msg->body.bytes, msg->body.len);
    clnrpl = escapeField(server, reply->body.bytes, reply->body.len);
    clntag = escapeField(server,
                         msg->properties.correlation_id.bytes,
                         msg->properties.correlation_id.len);

    if (!clnmsg || !clnrpl || !clntag)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        goto cleanup;
    }

    /* sizeof(insert_fmt) already accounts for the terminating NUL. */
    buffsz = sizeof(insert_fmt) + strlen(clnmsg) + strlen(clnrpl) + strlen(clntag);
    qstr = calloc(buffsz, sizeof(char));

    if (!qstr)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        goto cleanup;
    }

    snprintf(qstr, buffsz, insert_fmt, clnmsg, clnrpl, clntag);

    if (mysql_query(server, qstr))
    {
        fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server));
    }
    else
    {
        rval = 1;
    }

cleanup:
    free(qstr);
    free(clnmsg);
    free(clnrpl);
    free(clntag);
    return rval;
}
/**
 * Entry point of the consumer.
 *
 * Reads consumer.cnf from the directory given with -c (or the install
 * default, falling back to the working directory), connects to the SQL
 * server and to RabbitMQ, and then stores every delivered message with
 * sendMessage() until SIGINT arrives or the broker connection fails.
 * Stored messages are acknowledged, failed ones are rejected.
 *
 * @param argc Argument count
 * @param argv Argument vector; "-c <dir>" selects the configuration directory
 * @return The value of the run flag at exit: 1 when a start-up step failed,
 *         0 after a SIGINT shutdown or an error inside the consume loop
 */
int main(int argc, char** argv)
{
    int channel = 1, status = AMQP_STATUS_OK;
    /* getopt() returns int; a char never equals -1 where char is unsigned. */
    int ch;
    int need_sep;
    size_t dirlen, cnfsz;
    amqp_socket_t* socket = NULL;
    amqp_connection_state_t conn;
    amqp_rpc_reply_t ret;
    amqp_message_t* reply = NULL;
    amqp_frame_t frame;
    struct timeval timeout;
    MYSQL db_inst;
    char* cnfname = NULL;
    static const char* fname = "consumer.cnf";
    const char* default_path = "@CMAKE_INSTALL_PREFIX@/etc";
    const char* cnfdir = default_path;

    if ((c_inst = calloc(1, sizeof(CONSUMER))) == NULL)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        return 1;
    }

    /* Set before the handler is installed so an early SIGINT is not overwritten. */
    all_ok = 1;
    out_fd = NULL;

    if (signal(SIGINT, sighndl) == SIG_IGN)
    {
        signal(SIGINT, SIG_IGN);
    }

    while ((ch = getopt(argc, argv, "c:")) != -1)
    {
        switch (ch)
        {
        case 'c':
            /* optarg points into argv, which outlives its use here. */
            cnfdir = optarg;
            break;

        default:
            break;
        }
    }

    /* Reserve room for the separator too; it is needed unless the path ends in '/'. */
    dirlen = strlen(cnfdir);
    need_sep = dirlen > 0 && cnfdir[dirlen - 1] != '/';
    cnfsz = dirlen + (need_sep ? 1 : 0) + strlen(fname) + 1;
    cnfname = calloc(cnfsz, sizeof(char));

    if (cnfname == NULL)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        goto fatal_error;
    }

    snprintf(cnfname, cnfsz, "%s%s%s", cnfdir, need_sep ? "/" : "", fname);

    timeout.tv_sec = 1;
    timeout.tv_usec = 0;

    /** Parse the INI file */
    if (ini_parse(cnfname, handler, NULL) < 0)
    {
        /** Try to parse a config in the same directory */
        if (ini_parse(fname, handler, NULL) < 0)
        {
            fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
            goto fatal_error;
        }
    }

    if (out_fd == NULL)
    {
        out_fd = stdout;
    }

    fprintf(out_fd, "\n--------------------------------------------------------------\n");

    /** Confirm that all parameters were in the configuration file */
    if (!c_inst->hostname || !c_inst->vhost || !c_inst->user
        || !c_inst->passwd || !c_inst->dbpasswd || !c_inst->queue
        || !c_inst->dbserver || !c_inst->dbname || !c_inst->dbuser)
    {
        fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
        goto fatal_error;
    }

    /*
     * NOTE(review): the result is ignored, so the consumer keeps running when
     * the SQL server is unreachable and every message is then rejected.
     * Confirm whether start-up should stop here instead.
     */
    connectToServer(&db_inst);

    if ((conn = amqp_new_connection()) == NULL
        || (socket = amqp_tcp_socket_new(conn)) == NULL)
    {
        fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
        goto fatal_error;
    }

    if (amqp_socket_open(socket, c_inst->hostname, c_inst->port))
    {
        fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open socket.\n");
        goto error;
    }

    ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);

    if (ret.reply_type != AMQP_RESPONSE_NORMAL)
    {
        fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot login to server.\n");
        goto error;
    }

    amqp_channel_open(conn, channel);
    ret = amqp_get_rpc_reply(conn);

    if (ret.reply_type != AMQP_RESPONSE_NORMAL)
    {
        fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open channel.\n");
        goto error;
    }

    reply = malloc(sizeof(amqp_message_t));

    if (!reply)
    {
        fprintf(stderr, "Error: Cannot allocate enough memory.\n");
        goto error;
    }

    amqp_basic_consume(conn,
                       channel,
                       amqp_cstring_bytes(c_inst->queue),
                       amqp_empty_bytes,
                       0,
                       0,
                       0,
                       amqp_empty_table);

    while (all_ok)
    {
        status = amqp_simple_wait_frame_noblock(conn, &frame, &timeout);

        /** No frames to read from server, possibly out of messages */
        if (status == AMQP_STATUS_TIMEOUT)
        {
            sleep(timeout.tv_sec);
            continue;
        }

        /* On any other failure the frame was not filled in and must not be read. */
        if (status != AMQP_STATUS_OK)
        {
            if (!all_ok)
            {
                /* The run flag was cleared by SIGINT meanwhile: shut down normally. */
                break;
            }

            fprintf(stderr,
                    "\33[31;1mRabbitMQ Error\33[0m: Failed to read frame from server (status %d).\n",
                    status);
            all_ok = 0;
            goto error;
        }

        if (frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD)
        {
            amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;

            /* Without a successfully read message there is nothing valid to store or destroy. */
            ret = amqp_read_message(conn, channel, reply, 0);

            if (ret.reply_type != AMQP_RESPONSE_NORMAL)
            {
                fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot read message.\n");
                all_ok = 0;
                goto error;
            }

            if (sendMessage(&db_inst, reply))
            {
                fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Received malformed message.\n");
                amqp_basic_reject(conn, channel, decoded->delivery_tag, 0);
            }
            else
            {
                amqp_basic_ack(conn, channel, decoded->delivery_tag, 0);
            }

            amqp_destroy_message(reply);
        }
        else
        {
            fprintf(stderr,
                    "\33[31;1mRabbitMQ Error\33[0m: Received method from server: %s\n",
                    amqp_method_name(frame.payload.method.id));
            all_ok = 0;
            goto error;
        }
    }

    fprintf(out_fd, "Shutting down...\n");

error:

    mysql_close(&db_inst);
    mysql_library_end();

    if (c_inst && c_inst->query_stack)
    {
        while (c_inst->query_stack)
        {
            DELIVERY* d = c_inst->query_stack->next;
            amqp_destroy_message(c_inst->query_stack->message);
            free(c_inst->query_stack);
            c_inst->query_stack = d;
        }
    }

    amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

fatal_error:

    /* Both are NULL when this point is reached before they were allocated. */
    free(reply);
    free(cnfname);

    if (out_fd)
    {
        fclose(out_fd);
    }

    if (c_inst)
    {
        free(c_inst->hostname);
        free(c_inst->vhost);
        free(c_inst->user);
        free(c_inst->passwd);
        free(c_inst->queue);
        free(c_inst->dbserver);
        free(c_inst->dbname);
        free(c_inst->dbuser);
        free(c_inst->dbpasswd);
        free(c_inst);
    }

    return all_ok;
}