/* * Copyright (c) 2016 MariaDB Corporation Ab * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file and at www.mariadb.com/bsl11. * * Change Date: 2026-01-04 * * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2 or later of the General * Public License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct delivery_t { uint64_t dtag; amqp_message_t* message; struct delivery_t* next, * prev; } DELIVERY; typedef struct consumer_t { char* hostname, * vhost, * user, * passwd, * queue, * dbserver, * dbname, * dbuser, * dbpasswd; DELIVERY* query_stack; int port, dbport; } CONSUMER; static int all_ok; static FILE* out_fd; static CONSUMER* c_inst; static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;"; static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)"; static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))"; static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'"; static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'"; void sighndl(int signum) { if (signum == SIGINT) { all_ok = 0; alarm(1); } } int handler(void* user, const char* section, const char* name, const char* value) { if (strcmp(section, "consumer") == 0) { if (strcmp(name, "hostname") == 0) { c_inst->hostname = strdup(value); } else if (strcmp(name, "vhost") == 0) { c_inst->vhost = strdup(value); } else if (strcmp(name, "port") == 0) { c_inst->port = atoi(value); } else if (strcmp(name, "user") == 0) { c_inst->user = strdup(value); } else if (strcmp(name, "passwd") == 0) { c_inst->passwd = strdup(value); } else if (strcmp(name, "queue") == 0) { c_inst->queue = strdup(value); } else if (strcmp(name, "dbserver") == 0) { c_inst->dbserver = strdup(value); } else if (strcmp(name, "dbport") == 0) { c_inst->dbport = atoi(value); } else if (strcmp(name, "dbname") == 0) { c_inst->dbname = strdup(value); } else if (strcmp(name, "dbuser") == 0) { c_inst->dbuser = strdup(value); } else if (strcmp(name, "dbpasswd") == 0) { c_inst->dbpasswd = strdup(value); } else if (strcmp(name, "logfile") == 0) { out_fd = fopen(value, "ab"); } } return 1; } int isPair(amqp_message_t* a, amqp_message_t* b) { int keylen = a->properties.correlation_id.len >= b->properties.correlation_id.len ? a->properties.correlation_id.len : b->properties.correlation_id.len; return strncmp(a->properties.correlation_id.bytes, b->properties.correlation_id.bytes, keylen) == 0 ? 1 : 0; } int connectToServer(MYSQL* server) { mysql_init(server); mysql_options(server, MYSQL_READ_DEFAULT_GROUP, "client"); mysql_options(server, MYSQL_OPT_USE_REMOTE_CONNECTION, 0); my_bool tr = 1; mysql_options(server, MYSQL_OPT_RECONNECT, &tr); MYSQL* result = mysql_real_connect(server, c_inst->dbserver, c_inst->dbuser, c_inst->dbpasswd, NULL, c_inst->dbport, NULL, 0); if (result == NULL) { fprintf(out_fd, "\33[31;1mError\33[0m: Could not connect to MySQL server: %s\n", mysql_error(server)); return 0; } int bsz = 1024; char* qstr = calloc(bsz, sizeof(char)); if (!qstr) { fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n"); return 0; } /**Connection ok, check that the database and table exist*/ memset(qstr, 0, bsz); sprintf(qstr, DB_DATABASE, c_inst->dbname); if (mysql_query(server, qstr)) { fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server)); } memset(qstr, 0, bsz); sprintf(qstr, "USE %s;", c_inst->dbname); if (mysql_query(server, qstr)) { fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server)); } memset(qstr, 0, bsz); sprintf(qstr, "%s", DB_TABLE); if (mysql_query(server, qstr)) { fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server)); } free(qstr); return 1; } int sendMessage(MYSQL* server, amqp_message_t* msg) { int buffsz = (int)((msg->body.len + 1) * 2 + 1) + (int)((msg->properties.correlation_id.len + 1) * 2 + 1) + strlen(DB_INSERT), rval = 0; char* saved; char* qstr = calloc(buffsz, sizeof(char)), * rawmsg = calloc((msg->body.len + 1), sizeof(char)), * clnmsg = calloc(((msg->body.len + 1) * 2 + 1), sizeof(char)), * rawdate = calloc((msg->body.len + 1), sizeof(char)), * clndate = calloc(((msg->body.len + 1) * 2 + 1), sizeof(char)), * rawtag = calloc((msg->properties.correlation_id.len + 1), sizeof(char)), * clntag = calloc(((msg->properties.correlation_id.len + 1) * 2 + 1), sizeof(char)); sprintf(qstr, "%.*s", (int)msg->body.len, (char*)msg->body.bytes); fprintf(out_fd, "Received: %s\n", qstr); char* ptr = strtok_r(qstr, "|", &saved); sprintf(rawdate, "%s", ptr); ptr = strtok_r(NULL, "\n\0", &saved); if (ptr == NULL) { fprintf(out_fd, "Message content not valid.\n"); rval = 1; goto cleanup; } sprintf(rawmsg, "%s", ptr); sprintf(rawtag, "%.*s", (int)msg->properties.correlation_id.len, (char*)msg->properties.correlation_id.bytes); memset(qstr, 0, buffsz); mysql_real_escape_string(server, clnmsg, rawmsg, strnlen(rawmsg, msg->body.len + 1)); mysql_real_escape_string(server, clndate, rawdate, strnlen(rawdate, msg->body.len + 1)); mysql_real_escape_string(server, clntag, rawtag, strnlen(rawtag, msg->properties.correlation_id.len + 1)); if (strncmp(msg->properties.message_id.bytes, "query", msg->properties.message_id.len) == 0) { sprintf(qstr, DB_INCREMENT, clndate, clnmsg); rval = mysql_query(server, qstr); if (mysql_affected_rows(server) == 0) { memset(qstr, 0, buffsz); sprintf(qstr, DB_INSERT, clntag, clnmsg, clndate); rval = mysql_query(server, qstr); } } else if (strncmp(msg->properties.message_id.bytes, "reply", msg->properties.message_id.len) == 0) { sprintf(qstr, DB_UPDATE, clnmsg, clndate, clntag); rval = mysql_query(server, qstr); } else { rval = 1; goto cleanup; } if (rval) { fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server)); goto cleanup; } cleanup: free(qstr); free(rawmsg); free(clnmsg); free(rawdate); free(clndate); free(rawtag); free(clntag); return rval; } int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b) { amqp_message_t* msg, * reply; int buffsz = 2048; char* qstr = calloc(buffsz, sizeof(char)); if (!qstr) { fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n"); free(qstr); return 0; } if (a->properties.message_id.len == strlen("query") && strncmp(a->properties.message_id.bytes, "query", a->properties.message_id.len) == 0) { msg = a; reply = b; } else { msg = b; reply = a; } printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n", (int)msg->properties.correlation_id.len, (char*)msg->properties.correlation_id.bytes, (int)msg->body.len, (char*)msg->body.bytes, (int)reply->body.len, (char*)reply->body.bytes); if ((int)msg->body.len + (int)reply->body.len + (int)msg->properties.correlation_id.len + 50 >= buffsz) { char* qtmp = calloc(buffsz * 2, sizeof(char)); free(qstr); if (qtmp) { qstr = qtmp; buffsz *= 2; } else { fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n"); return 0; } } char* rawmsg = calloc((msg->body.len + 1), sizeof(char)), * clnmsg = calloc(((msg->body.len + 1) * 2 + 1), sizeof(char)), * rawrpl = calloc((reply->body.len + 1), sizeof(char)), * clnrpl = calloc(((reply->body.len + 1) * 2 + 1), sizeof(char)), * rawtag = calloc((msg->properties.correlation_id.len + 1), sizeof(char)), * clntag = calloc(((msg->properties.correlation_id.len + 1) * 2 + 1), sizeof(char)); sprintf(rawmsg, "%.*s", (int)msg->body.len, (char*)msg->body.bytes); sprintf(rawrpl, "%.*s", (int)reply->body.len, (char*)reply->body.bytes); sprintf(rawtag, "%.*s", (int)msg->properties.correlation_id.len, (char*)msg->properties.correlation_id.bytes); char* ptr; while ((ptr = strchr(rawmsg, '\n'))) { *ptr = ' '; } while ((ptr = strchr(rawrpl, '\n'))) { *ptr = ' '; } while ((ptr = strchr(rawtag, '\n'))) { *ptr = ' '; } mysql_real_escape_string(server, clnmsg, rawmsg, strnlen(rawmsg, msg->body.len + 1)); mysql_real_escape_string(server, clnrpl, rawrpl, strnlen(rawrpl, reply->body.len + 1)); mysql_real_escape_string(server, clntag, rawtag, strnlen(rawtag, msg->properties.correlation_id.len + 1)); sprintf(qstr, "INSERT INTO pairs VALUES ('%s','%s','%s');", clnmsg, clnrpl, clntag); free(rawmsg); free(clnmsg); free(rawrpl); free(clnrpl); free(rawtag); free(clntag); if (mysql_query(server, qstr)) { fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server)); free(qstr); return 0; } free(qstr); return 1; } int main(int argc, char** argv) { int channel = 1, status = AMQP_STATUS_OK, cnfnlen; amqp_socket_t* socket = NULL; amqp_connection_state_t conn; amqp_rpc_reply_t ret; amqp_message_t* reply = NULL; amqp_frame_t frame; struct timeval timeout; MYSQL db_inst; char ch, * cnfname = NULL, * cnfpath = NULL; static const char* fname = "consumer.cnf"; const char* default_path = "@CMAKE_INSTALL_PREFIX@/etc"; if ((c_inst = calloc(1, sizeof(CONSUMER))) == NULL) { fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n"); return 1; } if (signal(SIGINT, sighndl) == SIG_IGN) { signal(SIGINT, SIG_IGN); } while ((ch = getopt(argc, argv, "c:")) != -1) { switch (ch) { case 'c': cnfnlen = strlen(optarg); cnfpath = strdup(optarg); break; default: break; } } if (cnfpath == NULL) { cnfpath = strdup(default_path); cnfnlen = strlen(default_path); } cnfname = calloc(cnfnlen + strlen(fname) + 1, sizeof(char)); if (cnfpath) { /**Config file path as argument*/ strcpy(cnfname, cnfpath); if (cnfpath[cnfnlen - 1] != '/') { strcat(cnfname, "/"); } } strcat(cnfname, fname); timeout.tv_sec = 1; timeout.tv_usec = 0; all_ok = 1; out_fd = NULL; /**Parse the INI file*/ if (ini_parse(cnfname, handler, NULL) < 0) { /**Try to parse a config in the same directory*/ if (ini_parse(fname, handler, NULL) < 0) { fprintf(stderr, "Fatal Error: Error parsing configuration file!\n"); goto fatal_error; } } if (out_fd == NULL) { out_fd = stdout; } fprintf(out_fd, "\n--------------------------------------------------------------\n"); /**Confirm that all parameters were in the configuration file*/ if (!c_inst->hostname || !c_inst->vhost || !c_inst->user || !c_inst->passwd || !c_inst->dbpasswd || !c_inst->queue || !c_inst->dbserver || !c_inst->dbname || !c_inst->dbuser) { fprintf(stderr, "Fatal Error: Inadequate configuration file!\n"); goto fatal_error; } connectToServer(&db_inst); if ((conn = amqp_new_connection()) == NULL || (socket = amqp_tcp_socket_new(conn)) == NULL) { fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n"); goto fatal_error; } if (amqp_socket_open(socket, c_inst->hostname, c_inst->port)) { fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open socket.\n"); goto error; } ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd); if (ret.reply_type != AMQP_RESPONSE_NORMAL) { fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot login to server.\n"); goto error; } amqp_channel_open(conn, channel); ret = amqp_get_rpc_reply(conn); if (ret.reply_type != AMQP_RESPONSE_NORMAL) { fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open channel.\n"); goto error; } reply = malloc(sizeof(amqp_message_t)); if (!reply) { fprintf(stderr, "Error: Cannot allocate enough memory.\n"); goto error; } amqp_basic_consume(conn, channel, amqp_cstring_bytes(c_inst->queue), amqp_empty_bytes, 0, 0, 0, amqp_empty_table); while (all_ok) { status = amqp_simple_wait_frame_noblock(conn, &frame, &timeout); /**No frames to read from server, possibly out of messages*/ if (status == AMQP_STATUS_TIMEOUT) { sleep(timeout.tv_sec); continue; } if (frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD) { amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded; amqp_read_message(conn, channel, reply, 0); if (sendMessage(&db_inst, reply)) { fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Received malformed message.\n"); amqp_basic_reject(conn, channel, decoded->delivery_tag, 0); amqp_destroy_message(reply); } else { amqp_basic_ack(conn, channel, decoded->delivery_tag, 0); amqp_destroy_message(reply); } } else { fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Received method from server: %s\n", amqp_method_name(frame.payload.method.id)); all_ok = 0; goto error; } } fprintf(out_fd, "Shutting down...\n"); error: mysql_close(&db_inst); mysql_library_end(); if (c_inst && c_inst->query_stack) { while (c_inst->query_stack) { DELIVERY* d = c_inst->query_stack->next; amqp_destroy_message(c_inst->query_stack->message); free(c_inst->query_stack); c_inst->query_stack = d; } } amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(conn); fatal_error: if (out_fd) { fclose(out_fd); } if (c_inst) { free(c_inst->hostname); free(c_inst->vhost); free(c_inst->user); free(c_inst->passwd); free(c_inst->queue); free(c_inst->dbserver); free(c_inst->dbname); free(c_inst->dbuser); free(c_inst->dbpasswd); free(c_inst); } return all_ok; }