525 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			525 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| #include <unistd.h>
 | |
| #include <stdlib.h>
 | |
| #include <stdio.h>
 | |
| #include <string.h>
 | |
| #include <ini.h>
 | |
| #include <stdint.h>
 | |
| #include <amqp_tcp_socket.h>
 | |
| #include <amqp.h>
 | |
| #include <amqp_framing.h>
 | |
| #include <mysql.h>
 | |
| #include <signal.h>
 | |
| #include <sys/types.h>
 | |
| #include <sys/stat.h>
 | |
| #include <fcntl.h>
 | |
| 
 | |
| typedef struct delivery_t
 | |
| {
 | |
|   uint64_t dtag;
 | |
|   amqp_message_t* message;
 | |
|   struct delivery_t *next,*prev;
 | |
| }DELIVERY;
 | |
| 
 | |
| typedef struct consumer_t
 | |
| {
 | |
|   char *hostname,*vhost,*user,*passwd,*queue,*dbserver,*dbname,*dbuser,*dbpasswd;
 | |
|   DELIVERY* query_stack;
 | |
|   int port,dbport;
 | |
| }CONSUMER;
 | |
| 
 | |
| static int all_ok;
 | |
| static FILE* out_fd;
 | |
| static CONSUMER* c_inst;
 | |
| static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
 | |
| static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
 | |
| static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
 | |
| static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
 | |
| static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";
 | |
| 
 | |
| void sighndl(int signum)
 | |
| {
 | |
|   if(signum == SIGINT){
 | |
|     all_ok = 0;
 | |
|     alarm(1);
 | |
|   }
 | |
| }
 | |
| 
 | |
| int handler(void* user, const char* section, const char* name,
 | |
| 	    const char* value)
 | |
| {
 | |
|   if(strcmp(section,"consumer") == 0){
 | |
|     
 | |
|     if(strcmp(name,"hostname") == 0){
 | |
|       c_inst->hostname = strdup(value);
 | |
|     }else if(strcmp(name,"vhost") == 0){
 | |
|       c_inst->vhost = strdup(value);
 | |
|     }else if(strcmp(name,"port") == 0){
 | |
|       c_inst->port = atoi(value);
 | |
|     }else if(strcmp(name,"user") == 0){
 | |
|       c_inst->user = strdup(value);
 | |
|     }else if(strcmp(name,"passwd") == 0){
 | |
|       c_inst->passwd = strdup(value);
 | |
|     }else if(strcmp(name,"queue") == 0){
 | |
|       c_inst->queue = strdup(value);
 | |
|     }else if(strcmp(name,"dbserver") == 0){
 | |
|       c_inst->dbserver = strdup(value);
 | |
|     }else if(strcmp(name,"dbport") == 0){
 | |
|       c_inst->dbport = atoi(value);
 | |
|     }else if(strcmp(name,"dbname") == 0){
 | |
|       c_inst->dbname = strdup(value);
 | |
|     }else if(strcmp(name,"dbuser") == 0){
 | |
|       c_inst->dbuser = strdup(value);
 | |
|     }else if(strcmp(name,"dbpasswd") == 0){
 | |
|       c_inst->dbpasswd = strdup(value);
 | |
|     }else if(strcmp(name,"logfile") == 0){
 | |
|       out_fd = fopen(value,"ab");
 | |
|     }
 | |
| 
 | |
|   }
 | |
| 
 | |
|   return 1;
 | |
| }
 | |
| 
 | |
| int isPair(amqp_message_t* a, amqp_message_t* b)
 | |
| {
 | |
|   int keylen = a->properties.correlation_id.len >=
 | |
|     b->properties.correlation_id.len ?
 | |
|     a->properties.correlation_id.len :
 | |
|     b->properties.correlation_id.len;
 | |
|   
 | |
|   return strncmp(a->properties.correlation_id.bytes,
 | |
| 		 b->properties.correlation_id.bytes,
 | |
| 		 keylen) == 0 ? 1 : 0;
 | |
| }
 | |
| 
 | |
| int connectToServer(MYSQL* server)
 | |
| {
 | |
| 
 | |
|   
 | |
|   mysql_init(server);
 | |
| 
 | |
|   mysql_options(server,MYSQL_READ_DEFAULT_GROUP,"client");
 | |
|   mysql_options(server,MYSQL_OPT_USE_REMOTE_CONNECTION,0);
 | |
|   my_bool tr = 1;
 | |
|   mysql_options(server,MYSQL_OPT_RECONNECT,&tr);
 | |
|   
 | |
| 
 | |
|   MYSQL* result =  mysql_real_connect(server,
 | |
| 				      c_inst->dbserver,
 | |
| 				      c_inst->dbuser,
 | |
| 				      c_inst->dbpasswd,
 | |
| 				      NULL,
 | |
| 				      c_inst->dbport,
 | |
| 				      NULL,
 | |
| 				      0);
 | |
|  
 | |
|   
 | |
|   if(result==NULL){
 | |
|     fprintf(out_fd,"Error: Could not connect to MySQL server: %s\n",mysql_error(server));
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   int bsz = 1024;
 | |
|   char *qstr = calloc(bsz,sizeof(char));
 | |
| 
 | |
|   
 | |
|   if(!qstr){
 | |
|     fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
| 
 | |
|   /**Connection ok, check that the database and table exist*/
 | |
| 
 | |
|   memset(qstr,0,bsz);
 | |
|   sprintf(qstr,DB_DATABASE,c_inst->dbname);
 | |
|   if(mysql_query(server,qstr)){
 | |
|     fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
 | |
|   }
 | |
|   memset(qstr,0,bsz);
 | |
|   sprintf(qstr,"USE %s;",c_inst->dbname);
 | |
|   if(mysql_query(server,qstr)){
 | |
|     fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
 | |
|   }
 | |
|   
 | |
|   memset(qstr,0,bsz);
 | |
|   sprintf(qstr,DB_TABLE);
 | |
|   if(mysql_query(server,qstr)){
 | |
|     fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
 | |
|   }
 | |
| 
 | |
|   free(qstr);
 | |
|   return 1;
 | |
| }
 | |
| 
 | |
| int sendMessage(MYSQL* server, amqp_message_t* msg)
 | |
| {
 | |
|   int buffsz = (int)((msg->body.len + 1)*2+1) + 
 | |
|     (int)((msg->properties.correlation_id.len + 1)*2+1) + 
 | |
|     strlen(DB_INSERT),
 | |
|     rval = 0;
 | |
|   char *qstr = calloc(buffsz,sizeof(char)),
 | |
|     *rawmsg = calloc((msg->body.len + 1),sizeof(char)),
 | |
|     *clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
 | |
|     *rawdate = calloc((msg->body.len + 1),sizeof(char)),
 | |
|     *clndate = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
 | |
|     *rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
 | |
|     *clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
 | |
| 
 | |
| 
 | |
|   
 | |
|   sprintf(qstr,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
 | |
|   fprintf(out_fd,"Received: %s\n",qstr);
 | |
|   char *ptr = strtok(qstr,"|");
 | |
|   sprintf(rawdate,"%s",ptr);
 | |
|   ptr = strtok(NULL,"\n\0");
 | |
|   if(ptr == NULL){
 | |
|     fprintf(out_fd,"Message content not valid.\n");
 | |
|     rval = 1;
 | |
|     goto cleanup;
 | |
|   }
 | |
|   sprintf(rawmsg,"%s",ptr);
 | |
|   sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
 | |
|   memset(qstr,0,buffsz);
 | |
|  
 | |
|   mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
 | |
|   mysql_real_escape_string(server,clndate,rawdate,strnlen(rawdate,msg->body.len + 1));
 | |
|   mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
 | |
| 	  
 | |
|   if(strncmp(msg->properties.message_id.bytes,
 | |
| 	     "query",msg->properties.message_id.len) == 0)
 | |
|     {
 | |
|       
 | |
|       sprintf(qstr,DB_INCREMENT,clndate,clnmsg);
 | |
|       rval = mysql_query(server,qstr);
 | |
| 
 | |
|       if(mysql_affected_rows(server) == 0){
 | |
| 	memset(qstr,0,buffsz);
 | |
| 	sprintf(qstr,DB_INSERT,clntag,clnmsg,clndate);
 | |
| 	rval = mysql_query(server,qstr);
 | |
|       }
 | |
| 
 | |
|     }else if(strncmp(msg->properties.message_id.bytes,
 | |
| 		     "reply",msg->properties.message_id.len) == 0){
 | |
| 
 | |
|     sprintf(qstr,DB_UPDATE,clnmsg,clndate,clntag);
 | |
|     rval = mysql_query(server,qstr);
 | |
| 
 | |
|   }else{
 | |
|     rval = 1;
 | |
|     goto cleanup;
 | |
|   }
 | |
|   
 | |
| 
 | |
|   if(rval){
 | |
|     fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
 | |
|     goto cleanup;
 | |
|   }
 | |
| 
 | |
|  cleanup:
 | |
|   free(qstr);
 | |
|   free(rawmsg);
 | |
|   free(clnmsg);
 | |
|   free(rawdate);
 | |
|   free(clndate);
 | |
|   free(rawtag);
 | |
|   free(clntag);
 | |
|   
 | |
|   return rval;
 | |
| }
 | |
| 
 | |
| int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b){
 | |
| 
 | |
|   amqp_message_t *msg, *reply;
 | |
|   int buffsz = 2048;
 | |
|   char *qstr = calloc(buffsz,sizeof(char));
 | |
| 
 | |
|   if(!qstr){
 | |
|     fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n");
 | |
|     free(qstr);
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   if( a->properties.message_id.len == strlen("query") &&
 | |
|       strncmp(a->properties.message_id.bytes,"query",
 | |
| 	      a->properties.message_id.len) == 0){
 | |
| 
 | |
|     msg = a;
 | |
|     reply = b;
 | |
| 
 | |
|   }else{
 | |
| 
 | |
|     msg = b;
 | |
|     reply = a;
 | |
| 
 | |
|   }
 | |
|   
 | |
| 
 | |
|   printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
 | |
| 	 (int)msg->properties.correlation_id.len,
 | |
| 	 (char *)msg->properties.correlation_id.bytes,
 | |
| 	 (int)msg->body.len,
 | |
| 	 (char *)msg->body.bytes,
 | |
| 	 (int)reply->body.len,
 | |
| 	 (char *)reply->body.bytes);
 | |
| 
 | |
|   if((int)msg->body.len +
 | |
|      (int)reply->body.len +
 | |
|      (int)msg->properties.correlation_id.len + 50 >= buffsz)
 | |
|     {
 | |
|       char *qtmp = calloc(buffsz*2,sizeof(char));
 | |
|       free(qstr);
 | |
| 
 | |
|       if(qtmp){
 | |
| 	qstr = qtmp;
 | |
| 	buffsz *= 2;
 | |
|       }else{
 | |
| 	fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
 | |
| 	return 0;
 | |
|       }
 | |
| 
 | |
|     }
 | |
|   
 | |
|   char *rawmsg = calloc((msg->body.len + 1),sizeof(char)),
 | |
|     *clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
 | |
|     *rawrpl = calloc((reply->body.len + 1),sizeof(char)),
 | |
|     *clnrpl = calloc(((reply->body.len + 1)*2+1),sizeof(char)),
 | |
|     *rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
 | |
|     *clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
 | |
| 
 | |
|   sprintf(rawmsg,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
 | |
|   sprintf(rawrpl,"%.*s",(int)reply->body.len,(char *)reply->body.bytes);
 | |
|   sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
 | |
|   
 | |
|   char *ptr;
 | |
|   while((ptr = strchr(rawmsg,'\n'))){
 | |
|     *ptr = ' ';
 | |
|   }
 | |
|   while((ptr = strchr(rawrpl,'\n'))){
 | |
|     *ptr = ' ';
 | |
|   }
 | |
|   while((ptr = strchr(rawtag,'\n'))){
 | |
|     *ptr = ' ';
 | |
|   }
 | |
| 
 | |
|   mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
 | |
|   mysql_real_escape_string(server,clnrpl,rawrpl,strnlen(rawrpl,reply->body.len + 1));
 | |
|   mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
 | |
| 	  
 | |
|   
 | |
| 
 | |
|   sprintf(qstr,"INSERT INTO pairs VALUES ('%s','%s','%s');",clnmsg,clnrpl,clntag); 
 | |
|   free(rawmsg);
 | |
|   free(clnmsg);
 | |
|   free(rawrpl);
 | |
|   free(clnrpl);
 | |
|   free(rawtag);
 | |
|   free(clntag);
 | |
|   
 | |
|   if(mysql_query(server,qstr)){
 | |
|     fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
 | |
|     free(qstr);
 | |
|     return 0;
 | |
|   }
 | |
|   
 | |
|   free(qstr);
 | |
|   return 1;
 | |
| }
 | |
| int main(int argc, char** argv)
 | |
| {
 | |
|   int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
 | |
|   amqp_socket_t *socket = NULL;
 | |
|   amqp_connection_state_t conn;
 | |
|   amqp_rpc_reply_t ret;
 | |
|   amqp_message_t *reply = NULL;
 | |
|   amqp_frame_t frame;
 | |
|   struct timeval timeout;
 | |
|   MYSQL db_inst;
 | |
|   char ch, *cnfname = NULL, *cnfpath = NULL;
 | |
|   static const char* fname = "consumer.cnf";
 | |
| 
 | |
|   if((c_inst = calloc(1,sizeof(CONSUMER))) == NULL){
 | |
|     fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
 | |
|     return 1;
 | |
|   }
 | |
| 
 | |
|   if(signal(SIGINT,sighndl) == SIG_IGN){
 | |
|     signal(SIGINT,SIG_IGN);
 | |
|   }
 | |
| 
 | |
|   while((ch = getopt(argc,argv,"c:"))!= -1){
 | |
|     switch(ch){
 | |
|     case 'c':
 | |
|       cnfnlen = strlen(optarg);
 | |
|       cnfpath = strdup(optarg);
 | |
|       break;
 | |
|     default:
 | |
| 
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   cnfname = calloc(cnfnlen + strlen(fname) + 1,sizeof(char));
 | |
| 
 | |
|   if(cnfpath){
 | |
| 
 | |
|     /**Config file path as argument*/
 | |
|     strcpy(cnfname,cnfpath);
 | |
|     if(cnfpath[cnfnlen-1] != '/'){
 | |
|       strcat(cnfname,"/");
 | |
|     }
 | |
| 
 | |
|   }  
 | |
|   
 | |
|   strcat(cnfname,fname);
 | |
| 
 | |
|   timeout.tv_sec = 1;
 | |
|   timeout.tv_usec = 0;
 | |
|   all_ok = 1;
 | |
|   out_fd = NULL;
 | |
| 
 | |
| 
 | |
| 
 | |
|   /**Parse the INI file*/
 | |
|   if(ini_parse(cnfname,handler,NULL) < 0){
 | |
|     
 | |
|     /**Try to parse a config in the same directory*/
 | |
|     if(ini_parse(fname,handler,NULL) < 0){
 | |
|       fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
 | |
|     goto fatal_error;
 | |
| 
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if(out_fd == NULL){
 | |
|     out_fd = stdout;
 | |
|   }
 | |
| 
 | |
|   fprintf(out_fd,"\n--------------------------------------------------------------\n");
 | |
|   
 | |
|   /**Confirm that all parameters were in the configuration file*/
 | |
|   if(!c_inst->hostname||!c_inst->vhost||!c_inst->user||
 | |
|      !c_inst->passwd||!c_inst->dbpasswd||!c_inst->queue||
 | |
|      !c_inst->dbserver||!c_inst->dbname||!c_inst->dbuser){
 | |
|     fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
 | |
|     goto fatal_error;    
 | |
|   }
 | |
| 
 | |
|   connectToServer(&db_inst);
 | |
| 
 | |
|   if((conn = amqp_new_connection()) == NULL || 
 | |
|      (socket = amqp_tcp_socket_new(conn)) == NULL){
 | |
|     fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
 | |
|     goto fatal_error;
 | |
|   }
 | |
|   
 | |
|   if(amqp_socket_open(socket, c_inst->hostname, c_inst->port)){
 | |
|     fprintf(stderr, "RabbitMQ Error: Cannot open socket.\n");
 | |
|     goto error;
 | |
|   }
 | |
|   
 | |
|   ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);
 | |
| 
 | |
|   if(ret.reply_type != AMQP_RESPONSE_NORMAL){
 | |
|     fprintf(stderr, "RabbitMQ Error: Cannot login to server.\n");
 | |
|     goto error;
 | |
|   }
 | |
| 
 | |
|   amqp_channel_open(conn, channel);
 | |
|   ret = amqp_get_rpc_reply(conn);
 | |
| 
 | |
|   if(ret.reply_type != AMQP_RESPONSE_NORMAL){
 | |
|     fprintf(stderr, "RabbitMQ Error: Cannot open channel.\n");
 | |
|     goto error;
 | |
|   }  
 | |
| 
 | |
|   reply = malloc(sizeof(amqp_message_t));
 | |
|   if(!reply){
 | |
|     fprintf(stderr, "Error: Cannot allocate enough memory.\n");
 | |
|     goto error;
 | |
|   }
 | |
|   amqp_basic_consume(conn,channel,amqp_cstring_bytes(c_inst->queue),amqp_empty_bytes,0,0,0,amqp_empty_table);
 | |
| 
 | |
|   while(all_ok){
 | |
|      
 | |
|     status = amqp_simple_wait_frame_noblock(conn,&frame,&timeout);
 | |
| 
 | |
|     /**No frames to read from server, possibly out of messages*/
 | |
|     if(status == AMQP_STATUS_TIMEOUT){ 
 | |
|       sleep(timeout.tv_sec);
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD){
 | |
| 
 | |
|       amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;
 | |
| 	
 | |
|       amqp_read_message(conn,channel,reply,0);
 | |
| 
 | |
|       if(sendMessage(&db_inst,reply)){
 | |
| 
 | |
| 	fprintf(stderr,"RabbitMQ Error: Received malformed message.\n");
 | |
| 	amqp_basic_reject(conn,channel,decoded->delivery_tag,0);	
 | |
| 	amqp_destroy_message(reply);
 | |
| 
 | |
|       }else{
 | |
| 
 | |
| 	amqp_basic_ack(conn,channel,decoded->delivery_tag,0);
 | |
| 	amqp_destroy_message(reply);	
 | |
| 
 | |
|       }
 | |
|       
 | |
|     }else{
 | |
|       fprintf(stderr,"RabbitMQ Error: Received method from server: %s\n",amqp_method_name(frame.payload.method.id));
 | |
|       all_ok = 0;
 | |
|       goto error;
 | |
|     }
 | |
| 
 | |
|   }
 | |
| 
 | |
|   fprintf(out_fd,"Shutting down...\n");
 | |
|  error:
 | |
| 
 | |
|   mysql_close(&db_inst);
 | |
|   mysql_library_end();
 | |
|   if(c_inst && c_inst->query_stack){
 | |
| 
 | |
|     while(c_inst->query_stack){
 | |
|       DELIVERY* d = c_inst->query_stack->next;
 | |
|       amqp_destroy_message(c_inst->query_stack->message);
 | |
|       free(c_inst->query_stack);
 | |
|       c_inst->query_stack = d;
 | |
|     }
 | |
| 
 | |
|   }
 | |
|   
 | |
|   amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
 | |
|   amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
 | |
|   amqp_destroy_connection(conn);
 | |
|  fatal_error:
 | |
| 
 | |
|   if(out_fd){
 | |
|     fclose(out_fd);
 | |
|   }
 | |
| 
 | |
|   
 | |
|   if(c_inst){
 | |
| 
 | |
|     free(c_inst->hostname);
 | |
|     free(c_inst->vhost);
 | |
|     free(c_inst->user);
 | |
|     free(c_inst->passwd);
 | |
|     free(c_inst->queue);
 | |
|     free(c_inst->dbserver);
 | |
|     free(c_inst->dbname);
 | |
|     free(c_inst->dbuser);
 | |
|     free(c_inst->dbpasswd);    
 | |
|     free(c_inst);
 | |
|     
 | |
|   }
 | |
| 
 | |
|   
 | |
|   
 | |
|   return all_ok;
 | |
| }
 | 
