RabbitMQ consumer client
This commit is contained in:
538
rabbitmq_consumer/consumer.c
Normal file
538
rabbitmq_consumer/consumer.c
Normal file
@ -0,0 +1,538 @@
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <ini.h>
|
||||
#include <stdint.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
#include <amqp.h>
|
||||
#include <amqp_framing.h>
|
||||
#include <mysql.h>
|
||||
#include <signal.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#ifdef CONFIG_IN_ETC
|
||||
#define CONFIG 1
|
||||
#else
|
||||
#define CONFIG 0
|
||||
#endif
|
||||
|
||||
#ifndef CONSUMER_CONFIG_PREFIX
|
||||
#define CONSUMER_CONFIG_PREFIX "/usr/share/consumer"
|
||||
#endif
|
||||
|
||||
typedef struct delivery_t
|
||||
{
|
||||
uint64_t dtag;
|
||||
amqp_message_t* message;
|
||||
struct delivery_t *next,*prev;
|
||||
}DELIVERY;
|
||||
|
||||
typedef struct consumer_t
|
||||
{
|
||||
char *hostname,*vhost,*user,*passwd,*queue,*dbserver,*dbname,*dbuser,*dbpasswd;
|
||||
DELIVERY* query_stack;
|
||||
int port,dbport;
|
||||
}CONSUMER;
|
||||
|
||||
static int all_ok;
|
||||
static FILE* out_fd;
|
||||
static CONSUMER* c_inst;
|
||||
static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
|
||||
static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
|
||||
static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
|
||||
static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
|
||||
static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";
|
||||
|
||||
void sighndl(int signum)
|
||||
{
|
||||
if(signum == SIGINT){
|
||||
all_ok = 0;
|
||||
alarm(1);
|
||||
}
|
||||
}
|
||||
|
||||
int handler(void* user, const char* section, const char* name,
|
||||
const char* value)
|
||||
{
|
||||
if(strcmp(section,"consumer") == 0){
|
||||
|
||||
if(strcmp(name,"hostname") == 0){
|
||||
c_inst->hostname = strdup(value);
|
||||
}else if(strcmp(name,"vhost") == 0){
|
||||
c_inst->vhost = strdup(value);
|
||||
}else if(strcmp(name,"port") == 0){
|
||||
c_inst->port = atoi(value);
|
||||
}else if(strcmp(name,"user") == 0){
|
||||
c_inst->user = strdup(value);
|
||||
}else if(strcmp(name,"passwd") == 0){
|
||||
c_inst->passwd = strdup(value);
|
||||
}else if(strcmp(name,"queue") == 0){
|
||||
c_inst->queue = strdup(value);
|
||||
}else if(strcmp(name,"dbserver") == 0){
|
||||
c_inst->dbserver = strdup(value);
|
||||
}else if(strcmp(name,"dbport") == 0){
|
||||
c_inst->dbport = atoi(value);
|
||||
}else if(strcmp(name,"dbname") == 0){
|
||||
c_inst->dbname = strdup(value);
|
||||
}else if(strcmp(name,"dbuser") == 0){
|
||||
c_inst->dbuser = strdup(value);
|
||||
}else if(strcmp(name,"dbpasswd") == 0){
|
||||
c_inst->dbpasswd = strdup(value);
|
||||
}else if(strcmp(name,"logfile") == 0){
|
||||
out_fd = fopen(value,"ab");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int isPair(amqp_message_t* a, amqp_message_t* b)
|
||||
{
|
||||
int keylen = a->properties.correlation_id.len >=
|
||||
b->properties.correlation_id.len ?
|
||||
a->properties.correlation_id.len :
|
||||
b->properties.correlation_id.len;
|
||||
|
||||
return strncmp(a->properties.correlation_id.bytes,
|
||||
b->properties.correlation_id.bytes,
|
||||
keylen) == 0 ? 1 : 0;
|
||||
}
|
||||
|
||||
int connectToServer(MYSQL* server)
|
||||
{
|
||||
|
||||
|
||||
mysql_init(server);
|
||||
|
||||
mysql_options(server,MYSQL_READ_DEFAULT_GROUP,"client");
|
||||
mysql_options(server,MYSQL_OPT_USE_REMOTE_CONNECTION,0);
|
||||
my_bool tr = 1;
|
||||
mysql_options(server,MYSQL_OPT_RECONNECT,&tr);
|
||||
|
||||
|
||||
MYSQL* result = mysql_real_connect(server,
|
||||
c_inst->dbserver,
|
||||
c_inst->dbuser,
|
||||
c_inst->dbpasswd,
|
||||
NULL,
|
||||
c_inst->dbport,
|
||||
NULL,
|
||||
0);
|
||||
|
||||
|
||||
if(result==NULL){
|
||||
fprintf(out_fd,"Error: Could not connect to MySQL server: %s\n",mysql_error(server));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int bsz = 1024;
|
||||
char *qstr = calloc(bsz,sizeof(char));
|
||||
|
||||
|
||||
if(!qstr){
|
||||
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/**Connection ok, check that the database and table exist*/
|
||||
|
||||
memset(qstr,0,bsz);
|
||||
sprintf(qstr,DB_DATABASE,c_inst->dbname);
|
||||
if(mysql_query(server,qstr)){
|
||||
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
|
||||
}
|
||||
memset(qstr,0,bsz);
|
||||
sprintf(qstr,"USE %s;",c_inst->dbname);
|
||||
if(mysql_query(server,qstr)){
|
||||
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
|
||||
}
|
||||
|
||||
memset(qstr,0,bsz);
|
||||
sprintf(qstr,DB_TABLE);
|
||||
if(mysql_query(server,qstr)){
|
||||
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
|
||||
}
|
||||
|
||||
free(qstr);
|
||||
return 1;
|
||||
}
|
||||
|
||||
int sendMessage(MYSQL* server, amqp_message_t* msg)
|
||||
{
|
||||
int buffsz = (int)((msg->body.len + 1)*2+1) +
|
||||
(int)((msg->properties.correlation_id.len + 1)*2+1) +
|
||||
strlen(DB_INSERT),
|
||||
rval = 0;
|
||||
char *qstr = calloc(buffsz,sizeof(char)),
|
||||
*rawmsg = calloc((msg->body.len + 1),sizeof(char)),
|
||||
*clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
|
||||
*rawdate = calloc((msg->body.len + 1),sizeof(char)),
|
||||
*clndate = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
|
||||
*rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
|
||||
*clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
|
||||
|
||||
|
||||
|
||||
sprintf(qstr,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
|
||||
fprintf(out_fd,"Received: %s\n",qstr);
|
||||
char *ptr = strtok(qstr,"|");
|
||||
sprintf(rawdate,"%s",ptr);
|
||||
ptr = strtok(NULL,"\n\0");
|
||||
if(ptr == NULL){
|
||||
fprintf(out_fd,"Message content not valid.\n");
|
||||
rval = 1;
|
||||
goto cleanup;
|
||||
}
|
||||
sprintf(rawmsg,"%s",ptr);
|
||||
sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
|
||||
memset(qstr,0,buffsz);
|
||||
|
||||
mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
|
||||
mysql_real_escape_string(server,clndate,rawdate,strnlen(rawdate,msg->body.len + 1));
|
||||
mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
|
||||
|
||||
if(strncmp(msg->properties.message_id.bytes,
|
||||
"query",msg->properties.message_id.len) == 0)
|
||||
{
|
||||
|
||||
sprintf(qstr,DB_INCREMENT,clndate,clnmsg);
|
||||
rval = mysql_query(server,qstr);
|
||||
|
||||
if(mysql_affected_rows(server) == 0){
|
||||
memset(qstr,0,buffsz);
|
||||
sprintf(qstr,DB_INSERT,clntag,clnmsg,clndate);
|
||||
rval = mysql_query(server,qstr);
|
||||
}
|
||||
|
||||
}else if(strncmp(msg->properties.message_id.bytes,
|
||||
"reply",msg->properties.message_id.len) == 0){
|
||||
|
||||
sprintf(qstr,DB_UPDATE,clnmsg,clndate,clntag);
|
||||
rval = mysql_query(server,qstr);
|
||||
|
||||
}else{
|
||||
rval = 1;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
|
||||
if(rval){
|
||||
fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
free(qstr);
|
||||
free(rawmsg);
|
||||
free(clnmsg);
|
||||
free(rawdate);
|
||||
free(clndate);
|
||||
free(rawtag);
|
||||
free(clntag);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b){
|
||||
|
||||
amqp_message_t *msg, *reply;
|
||||
int buffsz = 2048;
|
||||
char *qstr = calloc(buffsz,sizeof(char));
|
||||
|
||||
if(!qstr){
|
||||
fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n");
|
||||
free(qstr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if( a->properties.message_id.len == strlen("query") &&
|
||||
strncmp(a->properties.message_id.bytes,"query",
|
||||
a->properties.message_id.len) == 0){
|
||||
|
||||
msg = a;
|
||||
reply = b;
|
||||
|
||||
}else{
|
||||
|
||||
msg = b;
|
||||
reply = a;
|
||||
|
||||
}
|
||||
|
||||
|
||||
printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
|
||||
(int)msg->properties.correlation_id.len,
|
||||
(char *)msg->properties.correlation_id.bytes,
|
||||
(int)msg->body.len,
|
||||
(char *)msg->body.bytes,
|
||||
(int)reply->body.len,
|
||||
(char *)reply->body.bytes);
|
||||
|
||||
if((int)msg->body.len +
|
||||
(int)reply->body.len +
|
||||
(int)msg->properties.correlation_id.len + 50 >= buffsz)
|
||||
{
|
||||
char *qtmp = calloc(buffsz*2,sizeof(char));
|
||||
free(qstr);
|
||||
|
||||
if(qtmp){
|
||||
qstr = qtmp;
|
||||
buffsz *= 2;
|
||||
}else{
|
||||
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
char *rawmsg = calloc((msg->body.len + 1),sizeof(char)),
|
||||
*clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
|
||||
*rawrpl = calloc((reply->body.len + 1),sizeof(char)),
|
||||
*clnrpl = calloc(((reply->body.len + 1)*2+1),sizeof(char)),
|
||||
*rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
|
||||
*clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
|
||||
|
||||
sprintf(rawmsg,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
|
||||
sprintf(rawrpl,"%.*s",(int)reply->body.len,(char *)reply->body.bytes);
|
||||
sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
|
||||
|
||||
char *ptr;
|
||||
while((ptr = strchr(rawmsg,'\n'))){
|
||||
*ptr = ' ';
|
||||
}
|
||||
while((ptr = strchr(rawrpl,'\n'))){
|
||||
*ptr = ' ';
|
||||
}
|
||||
while((ptr = strchr(rawtag,'\n'))){
|
||||
*ptr = ' ';
|
||||
}
|
||||
|
||||
mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
|
||||
mysql_real_escape_string(server,clnrpl,rawrpl,strnlen(rawrpl,reply->body.len + 1));
|
||||
mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
|
||||
|
||||
|
||||
|
||||
sprintf(qstr,"INSERT INTO pairs VALUES ('%s','%s','%s');",clnmsg,clnrpl,clntag);
|
||||
free(rawmsg);
|
||||
free(clnmsg);
|
||||
free(rawrpl);
|
||||
free(clnrpl);
|
||||
free(rawtag);
|
||||
free(clntag);
|
||||
|
||||
if(mysql_query(server,qstr)){
|
||||
fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
|
||||
free(qstr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
free(qstr);
|
||||
return 1;
|
||||
}
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
|
||||
amqp_socket_t *socket = NULL;
|
||||
amqp_connection_state_t conn;
|
||||
amqp_rpc_reply_t ret;
|
||||
amqp_message_t *reply = NULL;
|
||||
amqp_frame_t frame;
|
||||
struct timeval timeout;
|
||||
MYSQL db_inst;
|
||||
char ch, *cnfname = NULL, *cnfpath = NULL;
|
||||
static const char* fname = "consumer.cnf";
|
||||
static const char* fprefix = CONSUMER_CONFIG_PREFIX;
|
||||
|
||||
if(signal(SIGINT,sighndl) == SIG_IGN){
|
||||
signal(SIGINT,SIG_IGN);
|
||||
}
|
||||
|
||||
while((ch = getopt(argc,argv,"c:"))!= -1){
|
||||
switch(ch){
|
||||
case 'c':
|
||||
cnfnlen = strlen(optarg);
|
||||
cnfpath = strdup(optarg);
|
||||
break;
|
||||
default:
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
cnfname = calloc(cnfnlen + strlen(fname) + 1,sizeof(char));
|
||||
|
||||
if(cnfpath){
|
||||
|
||||
/**Config file path as argument*/
|
||||
strcpy(cnfname,cnfpath);
|
||||
if(cnfpath[cnfnlen-1] != '/'){
|
||||
strcat(cnfname,"/");
|
||||
}
|
||||
|
||||
}else if(CONFIG){
|
||||
|
||||
/**Config file location was set at install*/
|
||||
strcat(cnfname,fprefix);
|
||||
if(cnfname[strlen(cnfname) - 1] != '/'){
|
||||
strcat(cnfname,"/");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
strcat(cnfname,fname);
|
||||
|
||||
timeout.tv_sec = 1;
|
||||
timeout.tv_usec = 0;
|
||||
all_ok = 1;
|
||||
out_fd = NULL;
|
||||
|
||||
if((c_inst = calloc(1,sizeof(CONSUMER))) == NULL){
|
||||
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**Parse the INI file*/
|
||||
if(ini_parse(cnfname,handler,NULL) < 0){
|
||||
|
||||
/**Try to parse a config in the same directory*/
|
||||
if(ini_parse(fname,handler,NULL) < 0){
|
||||
fprintf(out_fd, "Fatal Error: Error parsing configuration file!\n");
|
||||
goto fatal_error;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if(out_fd == NULL){
|
||||
out_fd = stdout;
|
||||
}
|
||||
|
||||
fprintf(out_fd,"\n--------------------------------------------------------------\n");
|
||||
|
||||
/**Confirm that all parameters were in the configuration file*/
|
||||
if(!c_inst->hostname||!c_inst->vhost||!c_inst->user||
|
||||
!c_inst->passwd||!c_inst->dbpasswd||!c_inst->queue||
|
||||
!c_inst->dbserver||!c_inst->dbname||!c_inst->dbuser){
|
||||
fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
|
||||
goto fatal_error;
|
||||
}
|
||||
|
||||
connectToServer(&db_inst);
|
||||
|
||||
if((conn = amqp_new_connection()) == NULL ||
|
||||
(socket = amqp_tcp_socket_new(conn)) == NULL){
|
||||
fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
|
||||
goto fatal_error;
|
||||
}
|
||||
|
||||
if(amqp_socket_open(socket, c_inst->hostname, c_inst->port)){
|
||||
fprintf(stderr, "RabbitMQ Error: Cannot open socket.\n");
|
||||
goto error;
|
||||
}
|
||||
|
||||
ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);
|
||||
|
||||
if(ret.reply_type != AMQP_RESPONSE_NORMAL){
|
||||
fprintf(stderr, "RabbitMQ Error: Cannot login to server.\n");
|
||||
goto error;
|
||||
}
|
||||
|
||||
amqp_channel_open(conn, channel);
|
||||
ret = amqp_get_rpc_reply(conn);
|
||||
|
||||
if(ret.reply_type != AMQP_RESPONSE_NORMAL){
|
||||
fprintf(stderr, "RabbitMQ Error: Cannot open channel.\n");
|
||||
goto error;
|
||||
}
|
||||
|
||||
reply = malloc(sizeof(amqp_message_t));
|
||||
if(!reply){
|
||||
fprintf(stderr, "Error: Cannot allocate enough memory.\n");
|
||||
goto error;
|
||||
}
|
||||
amqp_basic_consume(conn,channel,amqp_cstring_bytes(c_inst->queue),amqp_empty_bytes,0,0,0,amqp_empty_table);
|
||||
|
||||
while(all_ok){
|
||||
|
||||
status = amqp_simple_wait_frame_noblock(conn,&frame,&timeout);
|
||||
|
||||
/**No frames to read from server, possibly out of messages*/
|
||||
if(status == AMQP_STATUS_TIMEOUT){
|
||||
sleep(timeout.tv_sec);
|
||||
continue;
|
||||
}
|
||||
|
||||
if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD){
|
||||
|
||||
amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;
|
||||
|
||||
amqp_read_message(conn,channel,reply,0);
|
||||
|
||||
if(sendMessage(&db_inst,reply)){
|
||||
|
||||
fprintf(stderr,"RabbitMQ Error: Received malformed message.\n");
|
||||
amqp_basic_reject(conn,channel,decoded->delivery_tag,0);
|
||||
amqp_destroy_message(reply);
|
||||
|
||||
}else{
|
||||
|
||||
amqp_basic_ack(conn,channel,decoded->delivery_tag,0);
|
||||
amqp_destroy_message(reply);
|
||||
|
||||
}
|
||||
|
||||
}else{
|
||||
fprintf(stderr,"RabbitMQ Error: Received method from server: %s\n",amqp_method_name(frame.payload.method.id));
|
||||
all_ok = 0;
|
||||
goto error;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fprintf(out_fd,"Shutting down...\n");
|
||||
error:
|
||||
|
||||
mysql_close(&db_inst);
|
||||
mysql_library_end();
|
||||
if(c_inst && c_inst->query_stack){
|
||||
|
||||
while(c_inst->query_stack){
|
||||
DELIVERY* d = c_inst->query_stack->next;
|
||||
amqp_destroy_message(c_inst->query_stack->message);
|
||||
free(c_inst->query_stack);
|
||||
c_inst->query_stack = d;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
|
||||
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
|
||||
amqp_destroy_connection(conn);
|
||||
fatal_error:
|
||||
|
||||
fclose(out_fd);
|
||||
|
||||
if(c_inst){
|
||||
|
||||
free(c_inst->hostname);
|
||||
free(c_inst->vhost);
|
||||
free(c_inst->user);
|
||||
free(c_inst->passwd);
|
||||
free(c_inst->queue);
|
||||
free(c_inst->dbserver);
|
||||
free(c_inst->dbname);
|
||||
free(c_inst->dbuser);
|
||||
free(c_inst->dbpasswd);
|
||||
free(c_inst);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
return all_ok;
|
||||
}
|
Reference in New Issue
Block a user