#include #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct delivery_t { uint64_t dtag; amqp_message_t* message; struct delivery_t *next,*prev; }DELIVERY; typedef struct consumer_t { char *hostname,*vhost,*user,*passwd,*queue,*dbserver,*dbname,*dbuser,*dbpasswd; DELIVERY* query_stack; int port,dbport; }CONSUMER; static int all_ok; static FILE* out_fd; static CONSUMER* c_inst; static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;"; static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)"; static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))"; static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'"; static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'"; void sighndl(int signum) { if(signum == SIGINT){ all_ok = 0; alarm(1); } } int handler(void* user, const char* section, const char* name, const char* value) { if(strcmp(section,"consumer") == 0){ if(strcmp(name,"hostname") == 0){ c_inst->hostname = strdup(value); }else if(strcmp(name,"vhost") == 0){ c_inst->vhost = strdup(value); }else if(strcmp(name,"port") == 0){ c_inst->port = atoi(value); }else if(strcmp(name,"user") == 0){ c_inst->user = strdup(value); }else if(strcmp(name,"passwd") == 0){ c_inst->passwd = strdup(value); }else if(strcmp(name,"queue") == 0){ c_inst->queue = strdup(value); }else if(strcmp(name,"dbserver") == 0){ c_inst->dbserver = strdup(value); }else if(strcmp(name,"dbport") == 0){ c_inst->dbport = atoi(value); }else if(strcmp(name,"dbname") == 0){ c_inst->dbname = strdup(value); }else if(strcmp(name,"dbuser") == 0){ c_inst->dbuser = strdup(value); }else if(strcmp(name,"dbpasswd") == 0){ c_inst->dbpasswd = strdup(value); }else if(strcmp(name,"logfile") == 0){ out_fd = fopen(value,"ab"); } } return 1; } int isPair(amqp_message_t* a, amqp_message_t* b) { int keylen = a->properties.correlation_id.len >= b->properties.correlation_id.len ? a->properties.correlation_id.len : b->properties.correlation_id.len; return strncmp(a->properties.correlation_id.bytes, b->properties.correlation_id.bytes, keylen) == 0 ? 1 : 0; } int connectToServer(MYSQL* server) { mysql_init(server); mysql_options(server,MYSQL_READ_DEFAULT_GROUP,"client"); mysql_options(server,MYSQL_OPT_USE_REMOTE_CONNECTION,0); my_bool tr = 1; mysql_options(server,MYSQL_OPT_RECONNECT,&tr); MYSQL* result = mysql_real_connect(server, c_inst->dbserver, c_inst->dbuser, c_inst->dbpasswd, NULL, c_inst->dbport, NULL, 0); if(result==NULL){ fprintf(out_fd,"\33[31;1mError\33[0m: Could not connect to MySQL server: %s\n",mysql_error(server)); return 0; } int bsz = 1024; char *qstr = calloc(bsz,sizeof(char)); if(!qstr){ fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n"); return 0; } /**Connection ok, check that the database and table exist*/ memset(qstr,0,bsz); sprintf(qstr,DB_DATABASE,c_inst->dbname); if(mysql_query(server,qstr)){ fprintf(stderr,"\33[31;1mError\33[0m: Could not send query MySQL server: %s\n",mysql_error(server)); } memset(qstr,0,bsz); sprintf(qstr,"USE %s;",c_inst->dbname); if(mysql_query(server,qstr)){ fprintf(stderr,"\33[31;1mError\33[0m: Could not send query MySQL server: %s\n",mysql_error(server)); } memset(qstr,0,bsz); sprintf(qstr,"%s",DB_TABLE); if(mysql_query(server,qstr)){ fprintf(stderr,"\33[31;1mError\33[0m: Could not send query MySQL server: %s\n",mysql_error(server)); } free(qstr); return 1; } int sendMessage(MYSQL* server, amqp_message_t* msg) { int buffsz = (int)((msg->body.len + 1)*2+1) + (int)((msg->properties.correlation_id.len + 1)*2+1) + strlen(DB_INSERT), rval = 0; char* saved; char *qstr = calloc(buffsz,sizeof(char)), *rawmsg = calloc((msg->body.len + 1),sizeof(char)), *clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)), *rawdate = calloc((msg->body.len + 1),sizeof(char)), *clndate = calloc(((msg->body.len + 1)*2+1),sizeof(char)), *rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)), *clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char)); sprintf(qstr,"%.*s",(int)msg->body.len,(char *)msg->body.bytes); fprintf(out_fd,"Received: %s\n",qstr); char *ptr = strtok_r(qstr,"|",&saved); sprintf(rawdate,"%s",ptr); ptr = strtok_r(NULL,"\n\0",&saved); if(ptr == NULL){ fprintf(out_fd,"Message content not valid.\n"); rval = 1; goto cleanup; } sprintf(rawmsg,"%s",ptr); sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes); memset(qstr,0,buffsz); mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1)); mysql_real_escape_string(server,clndate,rawdate,strnlen(rawdate,msg->body.len + 1)); mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1)); if(strncmp(msg->properties.message_id.bytes, "query",msg->properties.message_id.len) == 0) { sprintf(qstr,DB_INCREMENT,clndate,clnmsg); rval = mysql_query(server,qstr); if(mysql_affected_rows(server) == 0){ memset(qstr,0,buffsz); sprintf(qstr,DB_INSERT,clntag,clnmsg,clndate); rval = mysql_query(server,qstr); } }else if(strncmp(msg->properties.message_id.bytes, "reply",msg->properties.message_id.len) == 0){ sprintf(qstr,DB_UPDATE,clnmsg,clndate,clntag); rval = mysql_query(server,qstr); }else{ rval = 1; goto cleanup; } if(rval){ fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server)); goto cleanup; } cleanup: free(qstr); free(rawmsg); free(clnmsg); free(rawdate); free(clndate); free(rawtag); free(clntag); return rval; } int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b){ amqp_message_t *msg, *reply; int buffsz = 2048; char *qstr = calloc(buffsz,sizeof(char)); if(!qstr){ fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n"); free(qstr); return 0; } if( a->properties.message_id.len == strlen("query") && strncmp(a->properties.message_id.bytes,"query", a->properties.message_id.len) == 0){ msg = a; reply = b; }else{ msg = b; reply = a; } printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n", (int)msg->properties.correlation_id.len, (char *)msg->properties.correlation_id.bytes, (int)msg->body.len, (char *)msg->body.bytes, (int)reply->body.len, (char *)reply->body.bytes); if((int)msg->body.len + (int)reply->body.len + (int)msg->properties.correlation_id.len + 50 >= buffsz) { char *qtmp = calloc(buffsz*2,sizeof(char)); free(qstr); if(qtmp){ qstr = qtmp; buffsz *= 2; }else{ fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n"); return 0; } } char *rawmsg = calloc((msg->body.len + 1),sizeof(char)), *clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)), *rawrpl = calloc((reply->body.len + 1),sizeof(char)), *clnrpl = calloc(((reply->body.len + 1)*2+1),sizeof(char)), *rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)), *clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char)); sprintf(rawmsg,"%.*s",(int)msg->body.len,(char *)msg->body.bytes); sprintf(rawrpl,"%.*s",(int)reply->body.len,(char *)reply->body.bytes); sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes); char *ptr; while((ptr = strchr(rawmsg,'\n'))){ *ptr = ' '; } while((ptr = strchr(rawrpl,'\n'))){ *ptr = ' '; } while((ptr = strchr(rawtag,'\n'))){ *ptr = ' '; } mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1)); mysql_real_escape_string(server,clnrpl,rawrpl,strnlen(rawrpl,reply->body.len + 1)); mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1)); sprintf(qstr,"INSERT INTO pairs VALUES ('%s','%s','%s');",clnmsg,clnrpl,clntag); free(rawmsg); free(clnmsg); free(rawrpl); free(clnrpl); free(rawtag); free(clntag); if(mysql_query(server,qstr)){ fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server)); free(qstr); return 0; } free(qstr); return 1; } int main(int argc, char** argv) { int channel = 1, status = AMQP_STATUS_OK, cnfnlen; amqp_socket_t *socket = NULL; amqp_connection_state_t conn; amqp_rpc_reply_t ret; amqp_message_t *reply = NULL; amqp_frame_t frame; struct timeval timeout; MYSQL db_inst; char ch, *cnfname = NULL, *cnfpath = NULL; static const char* fname = "consumer.cnf"; const char* default_path = "@CMAKE_INSTALL_PREFIX@/etc"; if((c_inst = calloc(1,sizeof(CONSUMER))) == NULL){ fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n"); return 1; } if(signal(SIGINT,sighndl) == SIG_IGN){ signal(SIGINT,SIG_IGN); } while((ch = getopt(argc,argv,"c:"))!= -1){ switch(ch){ case 'c': cnfnlen = strlen(optarg); cnfpath = strdup(optarg); break; default: break; } } if(cnfpath == NULL) { cnfpath = strdup(default_path); cnfnlen = strlen(default_path); } cnfname = calloc(cnfnlen + strlen(fname) + 1,sizeof(char)); if(cnfpath){ /**Config file path as argument*/ strcpy(cnfname,cnfpath); if(cnfpath[cnfnlen-1] != '/'){ strcat(cnfname,"/"); } } strcat(cnfname,fname); timeout.tv_sec = 1; timeout.tv_usec = 0; all_ok = 1; out_fd = NULL; /**Parse the INI file*/ if(ini_parse(cnfname,handler,NULL) < 0){ /**Try to parse a config in the same directory*/ if(ini_parse(fname,handler,NULL) < 0){ fprintf(stderr, "Fatal Error: Error parsing configuration file!\n"); goto fatal_error; } } if(out_fd == NULL){ out_fd = stdout; } fprintf(out_fd,"\n--------------------------------------------------------------\n"); /**Confirm that all parameters were in the configuration file*/ if(!c_inst->hostname||!c_inst->vhost||!c_inst->user|| !c_inst->passwd||!c_inst->dbpasswd||!c_inst->queue|| !c_inst->dbserver||!c_inst->dbname||!c_inst->dbuser){ fprintf(stderr, "Fatal Error: Inadequate configuration file!\n"); goto fatal_error; } connectToServer(&db_inst); if((conn = amqp_new_connection()) == NULL || (socket = amqp_tcp_socket_new(conn)) == NULL){ fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n"); goto fatal_error; } if(amqp_socket_open(socket, c_inst->hostname, c_inst->port)){ fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open socket.\n"); goto error; } ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd); if(ret.reply_type != AMQP_RESPONSE_NORMAL){ fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot login to server.\n"); goto error; } amqp_channel_open(conn, channel); ret = amqp_get_rpc_reply(conn); if(ret.reply_type != AMQP_RESPONSE_NORMAL){ fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open channel.\n"); goto error; } reply = malloc(sizeof(amqp_message_t)); if(!reply){ fprintf(stderr, "Error: Cannot allocate enough memory.\n"); goto error; } amqp_basic_consume(conn,channel,amqp_cstring_bytes(c_inst->queue),amqp_empty_bytes,0,0,0,amqp_empty_table); while(all_ok){ status = amqp_simple_wait_frame_noblock(conn,&frame,&timeout); /**No frames to read from server, possibly out of messages*/ if(status == AMQP_STATUS_TIMEOUT){ sleep(timeout.tv_sec); continue; } if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD){ amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded; amqp_read_message(conn,channel,reply,0); if(sendMessage(&db_inst,reply)){ fprintf(stderr,"\33[31;1mRabbitMQ Error\33[0m: Received malformed message.\n"); amqp_basic_reject(conn,channel,decoded->delivery_tag,0); amqp_destroy_message(reply); }else{ amqp_basic_ack(conn,channel,decoded->delivery_tag,0); amqp_destroy_message(reply); } }else{ fprintf(stderr,"\33[31;1mRabbitMQ Error\33[0m: Received method from server: %s\n",amqp_method_name(frame.payload.method.id)); all_ok = 0; goto error; } } fprintf(out_fd,"Shutting down...\n"); error: mysql_close(&db_inst); mysql_library_end(); if(c_inst && c_inst->query_stack){ while(c_inst->query_stack){ DELIVERY* d = c_inst->query_stack->next; amqp_destroy_message(c_inst->query_stack->message); free(c_inst->query_stack); c_inst->query_stack = d; } } amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(conn); fatal_error: if(out_fd){ fclose(out_fd); } if(c_inst){ free(c_inst->hostname); free(c_inst->vhost); free(c_inst->user); free(c_inst->passwd); free(c_inst->queue); free(c_inst->dbserver); free(c_inst->dbname); free(c_inst->dbuser); free(c_inst->dbpasswd); free(c_inst); } return all_ok; }