525 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			525 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
#include <unistd.h>
 | 
						|
#include <stdlib.h>
 | 
						|
#include <stdio.h>
 | 
						|
#include <string.h>
 | 
						|
#include <ini.h>
 | 
						|
#include <stdint.h>
 | 
						|
#include <amqp_tcp_socket.h>
 | 
						|
#include <amqp.h>
 | 
						|
#include <amqp_framing.h>
 | 
						|
#include <mysql.h>
 | 
						|
#include <signal.h>
 | 
						|
#include <sys/types.h>
 | 
						|
#include <sys/stat.h>
 | 
						|
#include <fcntl.h>
 | 
						|
 | 
						|
typedef struct delivery_t
 | 
						|
{
 | 
						|
  uint64_t dtag;
 | 
						|
  amqp_message_t* message;
 | 
						|
  struct delivery_t *next,*prev;
 | 
						|
}DELIVERY;
 | 
						|
 | 
						|
typedef struct consumer_t
 | 
						|
{
 | 
						|
  char *hostname,*vhost,*user,*passwd,*queue,*dbserver,*dbname,*dbuser,*dbpasswd;
 | 
						|
  DELIVERY* query_stack;
 | 
						|
  int port,dbport;
 | 
						|
}CONSUMER;
 | 
						|
 | 
						|
static int all_ok;
 | 
						|
static FILE* out_fd;
 | 
						|
static CONSUMER* c_inst;
 | 
						|
static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
 | 
						|
static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
 | 
						|
static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
 | 
						|
static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
 | 
						|
static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";
 | 
						|
 | 
						|
void sighndl(int signum)
 | 
						|
{
 | 
						|
  if(signum == SIGINT){
 | 
						|
    all_ok = 0;
 | 
						|
    alarm(1);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
int handler(void* user, const char* section, const char* name,
 | 
						|
	    const char* value)
 | 
						|
{
 | 
						|
  if(strcmp(section,"consumer") == 0){
 | 
						|
    
 | 
						|
    if(strcmp(name,"hostname") == 0){
 | 
						|
      c_inst->hostname = strdup(value);
 | 
						|
    }else if(strcmp(name,"vhost") == 0){
 | 
						|
      c_inst->vhost = strdup(value);
 | 
						|
    }else if(strcmp(name,"port") == 0){
 | 
						|
      c_inst->port = atoi(value);
 | 
						|
    }else if(strcmp(name,"user") == 0){
 | 
						|
      c_inst->user = strdup(value);
 | 
						|
    }else if(strcmp(name,"passwd") == 0){
 | 
						|
      c_inst->passwd = strdup(value);
 | 
						|
    }else if(strcmp(name,"queue") == 0){
 | 
						|
      c_inst->queue = strdup(value);
 | 
						|
    }else if(strcmp(name,"dbserver") == 0){
 | 
						|
      c_inst->dbserver = strdup(value);
 | 
						|
    }else if(strcmp(name,"dbport") == 0){
 | 
						|
      c_inst->dbport = atoi(value);
 | 
						|
    }else if(strcmp(name,"dbname") == 0){
 | 
						|
      c_inst->dbname = strdup(value);
 | 
						|
    }else if(strcmp(name,"dbuser") == 0){
 | 
						|
      c_inst->dbuser = strdup(value);
 | 
						|
    }else if(strcmp(name,"dbpasswd") == 0){
 | 
						|
      c_inst->dbpasswd = strdup(value);
 | 
						|
    }else if(strcmp(name,"logfile") == 0){
 | 
						|
      out_fd = fopen(value,"ab");
 | 
						|
    }
 | 
						|
 | 
						|
  }
 | 
						|
 | 
						|
  return 1;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Check whether two messages carry the same correlation id.
 *
 * The ids are length-delimited byte buffers, not NUL-terminated strings, so
 * they are equal only when both the lengths and the bytes match. Comparing
 * over the longer of the two lengths would read past the shorter buffer.
 *
 * @param a First message
 * @param b Second message
 * @return 1 when the correlation ids are identical, 0 otherwise
 */
int isPair(amqp_message_t* a, amqp_message_t* b)
{
  size_t alen = a->properties.correlation_id.len;
  size_t blen = b->properties.correlation_id.len;

  if(alen != blen){
    return 0;
  }

  /* Two empty ids are equal; also avoids memcmp on possibly NULL buffers. */
  if(alen == 0){
    return 1;
  }

  return memcmp(a->properties.correlation_id.bytes,
		b->properties.correlation_id.bytes,
		alen) == 0 ? 1 : 0;
}
 | 
						|
 | 
						|
/**
 * Connect to the configured MySQL server and make sure the target database
 * and the `pairs` table exist.
 *
 * The handle is initialised with mysql_init(), so the caller should release
 * it with mysql_close() whether or not this function succeeds.
 *
 * Failures of the individual setup statements are reported on stderr but do
 * not abort the function, matching the best-effort behaviour of the setup
 * step (for example when the database already exists but the account may
 * not create databases).
 *
 * @param server MySQL handle to initialise and connect
 * @return 1 when the connection was established, 0 when connecting failed
 *         or the configured database name does not fit the statement buffer
 */
int connectToServer(MYSQL* server)
{
  char qstr[1024];
  my_bool tr = 1;
  int n;

  mysql_init(server);

  mysql_options(server,MYSQL_READ_DEFAULT_GROUP,"client");
  mysql_options(server,MYSQL_OPT_USE_REMOTE_CONNECTION,0);
  mysql_options(server,MYSQL_OPT_RECONNECT,&tr);

  MYSQL* result =  mysql_real_connect(server,
				      c_inst->dbserver,
				      c_inst->dbuser,
				      c_inst->dbpasswd,
				      NULL,
				      c_inst->dbport,
				      NULL,
				      0);

  if(result==NULL){
    fprintf(out_fd,"Error: Could not connect to MySQL server: %s\n",mysql_error(server));
    return 0;
  }

  /**Connection ok, check that the database and table exist*/

  /* The database name comes from the configuration file and has no length
   * limit, so the statement is built with a bounded write. */
  n = snprintf(qstr,sizeof(qstr),DB_DATABASE,c_inst->dbname);
  if(n < 0 || (size_t)n >= sizeof(qstr)){
    fprintf(stderr,"Error: Database name is too long.\n");
    return 0;
  }
  if(mysql_query(server,qstr)){
    fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
  }

  n = snprintf(qstr,sizeof(qstr),"USE %s;",c_inst->dbname);
  if(n < 0 || (size_t)n >= sizeof(qstr)){
    fprintf(stderr,"Error: Database name is too long.\n");
    return 0;
  }
  if(mysql_query(server,qstr)){
    fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
  }

  /* The table statement has no placeholders and can be sent as it is. */
  if(mysql_query(server,DB_TABLE)){
    fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
  }

  return 1;
}
 | 
						|
 | 
						|
/** Return 1 when the len-byte buffer holds exactly the C string word. */
static int bytesMatch(const void *bytes, size_t len, const char *word)
{
  return bytes != NULL && len == strlen(word) && memcmp(bytes,word,len) == 0;
}

/**
 * Return 1 when text is a non-empty decimal number made of digits and at
 * most one '.'; used to vet values that are placed unquoted into SQL.
 */
static int isUnixTime(const char *text)
{
  int digits = 0, dots = 0;

  for(; *text != '\0'; text++){
    if(*text >= '0' && *text <= '9'){
      digits++;
    }else if(*text == '.' && dots == 0){
      dots++;
    }else{
      return 0;
    }
  }

  return digits > 0;
}

/**
 * Store one received message in the `pairs` table.
 *
 * The message body has the form "<unix time>|<text>"; only the first line
 * of <text> is used. The message_id property selects the action:
 *  - "query": bump the counter of an existing row with the same query text,
 *    or insert a new row tagged with the correlation id when none exists
 *  - "reply": store the text as the reply of the row whose tag equals the
 *    correlation id
 * Any other message_id is rejected.
 *
 * The timestamp is placed unquoted inside FROM_UNIXTIME(), where string
 * escaping offers no protection, so it must be purely numeric. The
 * message_id must match exactly; an empty id or a mere prefix such as "q"
 * is not accepted.
 *
 * @param server Connected MySQL handle
 * @param msg    Message to store
 * @return 0 on success, non-zero when the message is malformed, memory ran
 *         out or the database rejected the statement
 */
int sendMessage(MYSQL* server, amqp_message_t* msg)
{
  const size_t bodylen = msg->body.len;
  const size_t taglen = msg->properties.correlation_id.len;
  char *body = NULL, *rawtag = NULL, *clnmsg = NULL,
    *clndate = NULL, *clntag = NULL, *qstr = NULL;
  char *rawdate, *rawmsg, *sep;
  size_t qsz;
  int rval = 1;

  body = calloc(bodylen + 1,sizeof(char));
  rawtag = calloc(taglen + 1,sizeof(char));

  if(!body || !rawtag){
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
    goto cleanup;
  }

  /* Make NUL-terminated working copies of the length-delimited buffers. */
  if(bodylen > 0 && msg->body.bytes != NULL){
    memcpy(body,msg->body.bytes,bodylen);
  }
  if(taglen > 0 && msg->properties.correlation_id.bytes != NULL){
    memcpy(rawtag,msg->properties.correlation_id.bytes,taglen);
  }

  fprintf(out_fd,"Received: %s\n",body);

  /* Split "<date>|<text>" in place: leading '|' are skipped, the date ends
   * at the next '|', the text is the first non-empty line after it. */
  rawdate = body + strspn(body,"|");
  sep = rawdate + strcspn(rawdate,"|");

  if(*sep == '\0'){
    fprintf(out_fd,"Message content not valid.\n");
    goto cleanup;
  }

  *sep = '\0';
  rawmsg = sep + 1;
  rawmsg += strspn(rawmsg,"\n");
  rawmsg[strcspn(rawmsg,"\n")] = '\0';

  if(*rawmsg == '\0' || !isUnixTime(rawdate)){
    fprintf(out_fd,"Message content not valid.\n");
    goto cleanup;
  }

  /* mysql_real_escape_string() needs room for 2 * length + 1 bytes. */
  clnmsg = calloc(strlen(rawmsg)*2 + 1,sizeof(char));
  clndate = calloc(strlen(rawdate)*2 + 1,sizeof(char));
  clntag = calloc(strlen(rawtag)*2 + 1,sizeof(char));

  if(!clnmsg || !clndate || !clntag){
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
    goto cleanup;
  }

  mysql_real_escape_string(server,clnmsg,rawmsg,strlen(rawmsg));
  mysql_real_escape_string(server,clndate,rawdate,strlen(rawdate));
  mysql_real_escape_string(server,clntag,rawtag,strlen(rawtag));

  /* Large enough for any of the three templates with all values filled in. */
  qsz = strlen(clnmsg) + strlen(clndate) + strlen(clntag) +
    strlen(DB_INSERT) + strlen(DB_UPDATE) + strlen(DB_INCREMENT) + 1;
  qstr = calloc(qsz,sizeof(char));

  if(!qstr){
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
    goto cleanup;
  }

  if(bytesMatch(msg->properties.message_id.bytes,
		msg->properties.message_id.len,"query")){

    snprintf(qstr,qsz,DB_INCREMENT,clndate,clnmsg);
    rval = mysql_query(server,qstr);

    /* No row had this query text yet: create it. */
    if(rval == 0 && mysql_affected_rows(server) == 0){
      snprintf(qstr,qsz,DB_INSERT,clntag,clnmsg,clndate);
      rval = mysql_query(server,qstr);
    }

  }else if(bytesMatch(msg->properties.message_id.bytes,
		      msg->properties.message_id.len,"reply")){

    snprintf(qstr,qsz,DB_UPDATE,clnmsg,clndate,clntag);
    rval = mysql_query(server,qstr);

  }else{
    /* Unknown message type; rval is still 1. */
    goto cleanup;
  }

  if(rval){
    fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
  }

 cleanup:
  free(qstr);
  free(clntag);
  free(clndate);
  free(clnmsg);
  free(rawtag);
  free(body);

  return rval;
}
 | 
						|
 | 
						|
/**
 * Return a NUL-terminated heap copy of a length-delimited buffer with every
 * newline replaced by a space, or NULL when memory runs out.
 */
static char *copyFlattened(const void *bytes, size_t len)
{
  char *copy = calloc(len + 1,sizeof(char));

  if(copy == NULL){
    return NULL;
  }

  if(len > 0 && bytes != NULL){
    memcpy(copy,bytes,len);
  }

  for(char *p = copy; *p != '\0'; p++){
    if(*p == '\n'){
      *p = ' ';
    }
  }

  return copy;
}

/**
 * Return a heap copy of raw escaped for use inside a quoted SQL string, or
 * NULL when memory runs out.
 */
static char *escapeForSql(MYSQL *server, const char *raw)
{
  size_t len = strlen(raw);
  /* mysql_real_escape_string() needs room for 2 * length + 1 bytes. */
  char *clean = calloc(len*2 + 1,sizeof(char));

  if(clean != NULL){
    mysql_real_escape_string(server,clean,raw,len);
  }

  return clean;
}

/**
 * Insert a query/reply pair into the `pairs` table with one statement.
 *
 * Whichever of the two messages has the message_id "query" is taken as the
 * query, the other one as the reply. The pair is also printed to stdout.
 * The statement buffer is sized from the escaped values, so messages of any
 * length are handled without overrunning it.
 *
 * NOTE(review): the statement supplies three values in the order
 * (query, reply, tag), which does not look like it matches the six-column
 * table created at connect time - confirm before relying on this function.
 *
 * @param server Connected MySQL handle
 * @param a      One half of the pair
 * @param b      The other half of the pair
 * @return 1 on success, 0 when memory ran out or the statement failed
 */
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b){

  static const char *stmt = "INSERT INTO pairs VALUES ('%s','%s','%s');";
  amqp_message_t *msg, *reply;
  char *rawmsg = NULL, *rawrpl = NULL, *rawtag = NULL,
    *clnmsg = NULL, *clnrpl = NULL, *clntag = NULL, *qstr = NULL;
  size_t qsz;
  int rval = 0;

  if( a->properties.message_id.len == strlen("query") &&
      strncmp(a->properties.message_id.bytes,"query",
	      a->properties.message_id.len) == 0){
    msg = a;
    reply = b;
  }else{
    msg = b;
    reply = a;
  }

  printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
	 (int)msg->properties.correlation_id.len,
	 (char *)msg->properties.correlation_id.bytes,
	 (int)msg->body.len,
	 (char *)msg->body.bytes,
	 (int)reply->body.len,
	 (char *)reply->body.bytes);

  rawmsg = copyFlattened(msg->body.bytes,msg->body.len);
  rawrpl = copyFlattened(reply->body.bytes,reply->body.len);
  rawtag = copyFlattened(msg->properties.correlation_id.bytes,
			 msg->properties.correlation_id.len);

  if(!rawmsg || !rawrpl || !rawtag){
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
    goto cleanup;
  }

  clnmsg = escapeForSql(server,rawmsg);
  clnrpl = escapeForSql(server,rawrpl);
  clntag = escapeForSql(server,rawtag);

  if(!clnmsg || !clnrpl || !clntag){
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
    goto cleanup;
  }

  qsz = strlen(stmt) + strlen(clnmsg) + strlen(clnrpl) + strlen(clntag) + 1;
  qstr = calloc(qsz,sizeof(char));

  if(!qstr){
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
    goto cleanup;
  }

  snprintf(qstr,qsz,stmt,clnmsg,clnrpl,clntag);

  if(mysql_query(server,qstr)){
    fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
    goto cleanup;
  }

  rval = 1;

 cleanup:
  free(qstr);
  free(clntag);
  free(clnrpl);
  free(clnmsg);
  free(rawtag);
  free(rawrpl);
  free(rawmsg);

  return rval;
}
 | 
						|
int main(int argc, char** argv)
 | 
						|
{
 | 
						|
  int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
 | 
						|
  amqp_socket_t *socket = NULL;
 | 
						|
  amqp_connection_state_t conn;
 | 
						|
  amqp_rpc_reply_t ret;
 | 
						|
  amqp_message_t *reply = NULL;
 | 
						|
  amqp_frame_t frame;
 | 
						|
  struct timeval timeout;
 | 
						|
  MYSQL db_inst;
 | 
						|
  char ch, *cnfname = NULL, *cnfpath = NULL;
 | 
						|
  static const char* fname = "consumer.cnf";
 | 
						|
 | 
						|
  if((c_inst = calloc(1,sizeof(CONSUMER))) == NULL){
 | 
						|
    fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
 | 
						|
    return 1;
 | 
						|
  }
 | 
						|
 | 
						|
  if(signal(SIGINT,sighndl) == SIG_IGN){
 | 
						|
    signal(SIGINT,SIG_IGN);
 | 
						|
  }
 | 
						|
 | 
						|
  while((ch = getopt(argc,argv,"c:"))!= -1){
 | 
						|
    switch(ch){
 | 
						|
    case 'c':
 | 
						|
      cnfnlen = strlen(optarg);
 | 
						|
      cnfpath = strdup(optarg);
 | 
						|
      break;
 | 
						|
    default:
 | 
						|
 | 
						|
      break;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  cnfname = calloc(cnfnlen + strlen(fname) + 1,sizeof(char));
 | 
						|
 | 
						|
  if(cnfpath){
 | 
						|
 | 
						|
    /**Config file path as argument*/
 | 
						|
    strcpy(cnfname,cnfpath);
 | 
						|
    if(cnfpath[cnfnlen-1] != '/'){
 | 
						|
      strcat(cnfname,"/");
 | 
						|
    }
 | 
						|
 | 
						|
  }  
 | 
						|
  
 | 
						|
  strcat(cnfname,fname);
 | 
						|
 | 
						|
  timeout.tv_sec = 1;
 | 
						|
  timeout.tv_usec = 0;
 | 
						|
  all_ok = 1;
 | 
						|
  out_fd = NULL;
 | 
						|
 | 
						|
 | 
						|
 | 
						|
  /**Parse the INI file*/
 | 
						|
  if(ini_parse(cnfname,handler,NULL) < 0){
 | 
						|
    
 | 
						|
    /**Try to parse a config in the same directory*/
 | 
						|
    if(ini_parse(fname,handler,NULL) < 0){
 | 
						|
      fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
 | 
						|
    goto fatal_error;
 | 
						|
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if(out_fd == NULL){
 | 
						|
    out_fd = stdout;
 | 
						|
  }
 | 
						|
 | 
						|
  fprintf(out_fd,"\n--------------------------------------------------------------\n");
 | 
						|
  
 | 
						|
  /**Confirm that all parameters were in the configuration file*/
 | 
						|
  if(!c_inst->hostname||!c_inst->vhost||!c_inst->user||
 | 
						|
     !c_inst->passwd||!c_inst->dbpasswd||!c_inst->queue||
 | 
						|
     !c_inst->dbserver||!c_inst->dbname||!c_inst->dbuser){
 | 
						|
    fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
 | 
						|
    goto fatal_error;    
 | 
						|
  }
 | 
						|
 | 
						|
  connectToServer(&db_inst);
 | 
						|
 | 
						|
  if((conn = amqp_new_connection()) == NULL || 
 | 
						|
     (socket = amqp_tcp_socket_new(conn)) == NULL){
 | 
						|
    fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
 | 
						|
    goto fatal_error;
 | 
						|
  }
 | 
						|
  
 | 
						|
  if(amqp_socket_open(socket, c_inst->hostname, c_inst->port)){
 | 
						|
    fprintf(stderr, "RabbitMQ Error: Cannot open socket.\n");
 | 
						|
    goto error;
 | 
						|
  }
 | 
						|
  
 | 
						|
  ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);
 | 
						|
 | 
						|
  if(ret.reply_type != AMQP_RESPONSE_NORMAL){
 | 
						|
    fprintf(stderr, "RabbitMQ Error: Cannot login to server.\n");
 | 
						|
    goto error;
 | 
						|
  }
 | 
						|
 | 
						|
  amqp_channel_open(conn, channel);
 | 
						|
  ret = amqp_get_rpc_reply(conn);
 | 
						|
 | 
						|
  if(ret.reply_type != AMQP_RESPONSE_NORMAL){
 | 
						|
    fprintf(stderr, "RabbitMQ Error: Cannot open channel.\n");
 | 
						|
    goto error;
 | 
						|
  }  
 | 
						|
 | 
						|
  reply = malloc(sizeof(amqp_message_t));
 | 
						|
  if(!reply){
 | 
						|
    fprintf(stderr, "Error: Cannot allocate enough memory.\n");
 | 
						|
    goto error;
 | 
						|
  }
 | 
						|
  amqp_basic_consume(conn,channel,amqp_cstring_bytes(c_inst->queue),amqp_empty_bytes,0,0,0,amqp_empty_table);
 | 
						|
 | 
						|
  while(all_ok){
 | 
						|
     
 | 
						|
    status = amqp_simple_wait_frame_noblock(conn,&frame,&timeout);
 | 
						|
 | 
						|
    /**No frames to read from server, possibly out of messages*/
 | 
						|
    if(status == AMQP_STATUS_TIMEOUT){ 
 | 
						|
      sleep(timeout.tv_sec);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
 | 
						|
    if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD){
 | 
						|
 | 
						|
      amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;
 | 
						|
	
 | 
						|
      amqp_read_message(conn,channel,reply,0);
 | 
						|
 | 
						|
      if(sendMessage(&db_inst,reply)){
 | 
						|
 | 
						|
	fprintf(stderr,"RabbitMQ Error: Received malformed message.\n");
 | 
						|
	amqp_basic_reject(conn,channel,decoded->delivery_tag,0);	
 | 
						|
	amqp_destroy_message(reply);
 | 
						|
 | 
						|
      }else{
 | 
						|
 | 
						|
	amqp_basic_ack(conn,channel,decoded->delivery_tag,0);
 | 
						|
	amqp_destroy_message(reply);	
 | 
						|
 | 
						|
      }
 | 
						|
      
 | 
						|
    }else{
 | 
						|
      fprintf(stderr,"RabbitMQ Error: Received method from server: %s\n",amqp_method_name(frame.payload.method.id));
 | 
						|
      all_ok = 0;
 | 
						|
      goto error;
 | 
						|
    }
 | 
						|
 | 
						|
  }
 | 
						|
 | 
						|
  fprintf(out_fd,"Shutting down...\n");
 | 
						|
 error:
 | 
						|
 | 
						|
  mysql_close(&db_inst);
 | 
						|
  mysql_library_end();
 | 
						|
  if(c_inst && c_inst->query_stack){
 | 
						|
 | 
						|
    while(c_inst->query_stack){
 | 
						|
      DELIVERY* d = c_inst->query_stack->next;
 | 
						|
      amqp_destroy_message(c_inst->query_stack->message);
 | 
						|
      free(c_inst->query_stack);
 | 
						|
      c_inst->query_stack = d;
 | 
						|
    }
 | 
						|
 | 
						|
  }
 | 
						|
  
 | 
						|
  amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
 | 
						|
  amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
 | 
						|
  amqp_destroy_connection(conn);
 | 
						|
 fatal_error:
 | 
						|
 | 
						|
  if(out_fd){
 | 
						|
    fclose(out_fd);
 | 
						|
  }
 | 
						|
 | 
						|
  
 | 
						|
  if(c_inst){
 | 
						|
 | 
						|
    free(c_inst->hostname);
 | 
						|
    free(c_inst->vhost);
 | 
						|
    free(c_inst->user);
 | 
						|
    free(c_inst->passwd);
 | 
						|
    free(c_inst->queue);
 | 
						|
    free(c_inst->dbserver);
 | 
						|
    free(c_inst->dbname);
 | 
						|
    free(c_inst->dbuser);
 | 
						|
    free(c_inst->dbpasswd);    
 | 
						|
    free(c_inst);
 | 
						|
    
 | 
						|
  }
 | 
						|
 | 
						|
  
 | 
						|
  
 | 
						|
  return all_ok;
 | 
						|
}
 |