MaxScale/utils/skygw_utils.cc
vraatikka d11741130d Changed log manager API to support variable length formatted log strings. New API is as follows:
/** No change in these */
bool skygw_logmanager_init(void** buf, int argc, char* argv[]);
void skygw_logmanager_done(void** buf);
void skygw_logmanager_exit(void);
int  skygw_log_flush(logfile_id_t id);

/** writebuf remains unused, but formatted string is now possible and in case 
 * of formatted string, arbitrary long argument list is supported too. Max 
 * length for a log string is defined to BUFSIZ, whose value depends on the 
 * system but typically is 4/8KB.
 */
int  skygw_log_write(void* writebuf, logfile_id_t id, char* format, ...);
int  skygw_log_write_flush(void* writebuf, logfile_id_t id, char* format, ...);

makefile.inc includes new CFLAG : SS_PROF, which is set if PROF=Y on make command line or in build_gateway.inc .
ss_debug.h includes corresponding ss_prof(exp) macro which equals to exp if SS_PROF is defined and to empty if in other case.

mlist_t now includes datadel function which is a callback and it is executed for mlnode_data on node exit.
2013-07-02 14:46:39 +03:00

1602 lines
38 KiB
C++

/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include "skygw_debug.h"
#include "skygw_types.h"
#include "skygw_utils.h"
/** Single-linked list for storing test cases */
struct slist_node_st {
skygw_chk_t slnode_chk_top;
slist_t* slnode_list;
slist_node_t* slnode_next;
void* slnode_data;
size_t slnode_cursor_refcount;
skygw_chk_t slnode_chk_tail;
};
struct slist_st {
skygw_chk_t slist_chk_top;
slist_node_t* slist_head;
slist_node_t* slist_tail;
size_t slist_nelems;
slist_t* slist_cursors_list;
skygw_chk_t slist_chk_tail;
};
struct slist_cursor_st {
skygw_chk_t slcursor_chk_top;
slist_t* slcursor_list;
slist_node_t* slcursor_pos;
skygw_chk_t slcursor_chk_tail;
};
struct skygw_thread_st {
skygw_chk_t sth_chk_top;
bool sth_must_exit;
simple_mutex_t* sth_mutex;
pthread_t sth_parent;
pthread_t sth_thr;
int sth_errno;
#if defined(SS_DEBUG)
skygw_thr_state_t sth_state;
#endif
char* sth_name;
void* (*sth_thrfun)(void* data);
void* sth_data;
skygw_chk_t sth_chk_tail;
};
struct skygw_message_st {
skygw_chk_t mes_chk_top;
bool mes_sent;
pthread_mutex_t mes_mutex;
pthread_cond_t mes_cond;
skygw_chk_t mes_chk_tail;
};
struct skygw_file_st {
skygw_chk_t sf_chk_top;
char* sf_fname;
FILE* sf_file;
skygw_chk_t sf_chk_tail;
};
/** End of structs and types */
#if defined(MLIST)
static mlist_node_t* mlist_node_init(void* data, mlist_cursor_t* cursor);
//static mlist_node_t* mlist_node_get_next(mlist_node_t* curr_node);
//static mlist_node_t* mlist_get_first(mlist_t* list);
//static mlist_cursor_t* mlist_get_cursor(mlist_t* list);
#endif /* MLIST */
static slist_cursor_t* slist_cursor_init(
slist_t* list);
static slist_t* slist_init_ex(
bool create_cursors);
static slist_node_t* slist_node_init(
void* data,
slist_cursor_t* cursor);
static void slist_add_node(
slist_t* list,
slist_node_t* node);
static slist_node_t* slist_node_get_next(
slist_node_t* curr_node);
static slist_node_t* slist_get_first(
slist_t* list);
static slist_cursor_t* slist_get_cursor(
slist_t* list);
static bool file_write_header(skygw_file_t* file);
static void simple_mutex_free_memory(simple_mutex_t* sm);
static void mlist_free_memory(mlist_t* ml, char* name);
static void thread_free_memory(skygw_thread_t* th, char* name);
/** End of static function declarations */
int atomic_add(
int *variable,
int value)
{
asm volatile(
"lock; xaddl %%eax, %2;"
:"=a" (value)
: "a" (value), "m" (*variable)
: "memory" );
return value;
}
/** mutexed list, mlist */
#if defined(MLIST)
int skygw_rwlock_rdlock(
skygw_rwlock_t* rwlock)
{
int err = pthread_rwlock_rdlock(rwlock->srw_rwlock);
if (err == 0) {
rwlock->srw_rwlock_thr = pthread_self();
} else {
rwlock->srw_rwlock_thr = 0;
ss_dfprintf(stderr, "pthread_rwlock_rdlock : %s\n", strerror(err));
}
return err;
}
int skygw_rwlock_wrlock(
skygw_rwlock_t* rwlock)
{
int err = pthread_rwlock_wrlock(rwlock->srw_rwlock);
if (err == 0) {
rwlock->srw_rwlock_thr = pthread_self();
} else {
rwlock->srw_rwlock_thr = 0;
ss_dfprintf(stderr, "pthread_rwlock_wrlock : %s\n", strerror(err));
}
return err;
}
int skygw_rwlock_unlock(
skygw_rwlock_t* rwlock)
{
int err = pthread_rwlock_rdlock(rwlock->srw_rwlock);
if (err == 0) {
rwlock->srw_rwlock_thr = 0;
} else {
ss_dfprintf(stderr, "pthread_rwlock_unlock : %s\n", strerror(err));
}
return err;
}
int skygw_rwlock_destroy(
skygw_rwlock_t* rwlock)
{
int err = pthread_rwlock_destroy(rwlock->srw_rwlock);
if (err == 0) {
rwlock->srw_rwlock_thr = 0;
rwlock->srw_rwlock = NULL;
} else {
ss_dfprintf(stderr, "pthread_rwlock_destroy : %s\n", strerror(err));
}
return err;
}
int skygw_rwlock_init(
skygw_rwlock_t** rwlock)
{
skygw_rwlock_t* rwl;
int err;
rwl = (skygw_rwlock_t *)calloc(1, sizeof(skygw_rwlock_t));
rwl->srw_chk_top = CHK_NUM_RWLOCK;
rwl->srw_chk_tail = CHK_NUM_RWLOCK;
err = pthread_rwlock_init(rwl->srw_rwlock, NULL);
ss_dassert(err == 0);
if (err != 0) {
ss_dfprintf(stderr,
"Creating pthread_rwlock failed : %s\n",
strerror(err));
goto return_err;
}
return_err:
return err;
}
/**
* @node Cut off nodes of the list.
*
* Parameters:
* @param ml - <usage>
* <description>
*
* @return Pointer to the first of the detached nodes.
*
*
* @details (write detailed description here)
*
*/
mlist_node_t* mlist_detach_nodes(
mlist_t* ml)
{
mlist_node_t* node;
CHK_MLIST(ml);
node = ml->mlist_first;
ml->mlist_first = NULL;
ml->mlist_last = NULL;
ml->mlist_nodecount = 0;
return node;
}
/**
* @node Create a list with rwlock and optional read-only cursor
*
* Parameters:
* @param listp - <usage>
* <description>
*
* @param cursor - <usage>
* <description>
*
* @param name - <usage>
* <description>
*
* @return Address of mlist_t struct.
*
*
* @details Cursor must protect its reads with read lock, and after acquiring
* read lock reader must check whether the list is deleted (mlist_deleted).
*
*/
mlist_t* mlist_init(
mlist_t* listp,
mlist_cursor_t** cursor,
char* name,
void (*datadel)(void*))
{
mlist_cursor_t* c;
mlist_t* list;
if (cursor != NULL) {
ss_dassert(*cursor == NULL);
}
/** listp is not NULL if caller wants flat list */
if (listp == NULL) {
list = (mlist_t*)calloc(1, sizeof(mlist_t));
} else {
/** Caller wants list flat, memory won't be freed */
list = listp;
list->mlist_flat = TRUE;
}
ss_dassert(list != NULL);
if (list == NULL) {
fprintf(stderr, "Allocating memory for mlist failed\n");
mlist_free_memory(list, name);
goto return_list;
}
list->mlist_chk_top = CHK_NUM_MLIST;
list->mlist_chk_tail = CHK_NUM_MLIST;
/** Set data deletion callback fun */
list->mlist_datadel = datadel;
if (name != NULL) {
list->mlist_name = name;
}
/** Create mutex, return NULL if fails. */
if (simple_mutex_init(
&list->mlist_mutex,
strdup("writebuf mutex")) == NULL)
{
ss_dfprintf(stderr, "Creating rwlock for mlist failed\n");
mlist_free_memory(list, name);
list = NULL;
goto return_list;
}
/** Create cursor for reading the list */
if (cursor != NULL) {
c = mlist_cursor_init(list);
if (c == NULL) {
simple_mutex_done(&list->mlist_mutex);
mlist_free_memory(list, name);
list = NULL;
goto return_list;
}
CHK_MLIST_CURSOR(c);
*cursor = c;
}
CHK_MLIST(list);
return_list:
return list;
}
/**
* @node Free mlist memory allocations. name must be explicitly
* set if mlist has one.
*
* Parameters:
* @param ml - <usage>
* <description>
*
* @param name - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void mlist_free_memory(
mlist_t* ml,
char* name)
{
mlist_node_t* node;
/** name */
if (name != NULL) {
free(name);
}
if (ml != NULL) {
/** list data */
while(ml->mlist_first != NULL) {
/** Scan list and free nodes and data inside nodes */
node = ml->mlist_first->mlnode_next;
mlist_node_done(ml->mlist_first);
ml->mlist_first = node;
}
/** list structure */
if (!ml->mlist_flat) {
free(ml);
}
}
}
void mlist_node_done(
mlist_node_t* n)
{
CHK_MLIST_NODE(n);
if (n->mlnode_data != NULL) {
if (n->mlnode_list->mlist_datadel != NULL) {
(n->mlnode_list->mlist_datadel(n->mlnode_data));
}
free(n->mlnode_data);
}
free(n);
}
void* mlist_node_get_data(
mlist_node_t* node)
{
CHK_MLIST_NODE(node);
return node->mlnode_data;
}
mlist_cursor_t* mlist_cursor_init(
mlist_t* list)
{
CHK_MLIST(list);
mlist_cursor_t* c;
/** acquire shared lock to the list */
simple_mutex_lock(&list->mlist_mutex, TRUE);
c = (mlist_cursor_t *)calloc(1, sizeof(mlist_cursor_t));
if (c == NULL) {
goto return_cursor;
}
c->mlcursor_chk_top = CHK_NUM_MLIST_CURSOR;
c->mlcursor_chk_tail = CHK_NUM_MLIST_CURSOR;
c->mlcursor_list = list;
/** Set cursor position if list is not empty */
if (list->mlist_first != NULL) {
c->mlcursor_pos = list->mlist_first;
}
simple_mutex_unlock(&list->mlist_mutex);
CHK_MLIST_CURSOR(c);
return_cursor:
return c;
}
/**
* @node Mark list as deleted and free the memory.
*
* Parameters:
* @param list - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
void mlist_done(
mlist_t* list)
{
CHK_MLIST(list);
simple_mutex_lock(&list->mlist_mutex, TRUE);
list->mlist_deleted = TRUE;
simple_mutex_unlock(&list->mlist_mutex);
simple_mutex_done(&list->mlist_mutex);
mlist_free_memory(list, list->mlist_name);
}
void* mlist_cursor_get_data_nomutex(
mlist_cursor_t* mc)
{
CHK_MLIST_CURSOR(mc);
return (mc->mlcursor_pos->mlnode_data);
}
void mlist_add_data_nomutex(
mlist_t* list,
void* data)
{
mlist_add_node_nomutex(list, mlist_node_init(data, NULL));
}
static mlist_node_t* mlist_node_init(
void* data,
mlist_cursor_t* cursor)
{
mlist_node_t* node;
node = (mlist_node_t*)calloc(1, sizeof(mlist_node_t));
node->mlnode_chk_top = CHK_NUM_MLIST_NODE;
node->mlnode_chk_tail = CHK_NUM_MLIST_NODE;
node->mlnode_data = data;
CHK_MLIST_NODE(node);
if (cursor != NULL) {
cursor->mlcursor_pos = node;
}
return node;
}
mlist_node_t* mlist_detach_first(
mlist_t* ml)
{
mlist_node_t* node;
CHK_MLIST(ml);
node = ml->mlist_first;
CHK_MLIST_NODE(node);
ml->mlist_first = node->mlnode_next;
node->mlnode_next = NULL;
ml->mlist_nodecount -= 1;
if (ml->mlist_nodecount == 0) {
ml->mlist_last = NULL;
} else {
CHK_MLIST_NODE(ml->mlist_first);
}
CHK_MLIST(ml);
return (node);
}
/**
* @node Add new node to end of list
*
* Parameters:
* @param list - <usage>
* <description>
*
* @param newnode - <usage>
* <description>
*
* @param add_last - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
void mlist_add_node_nomutex(
mlist_t* list,
mlist_node_t* newnode)
{
CHK_MLIST(list);
CHK_MLIST_NODE(newnode);
ss_dassert(!list->mlist_deleted);
/** Find location for new node */
if (list->mlist_last != NULL) {
ss_dassert(!list->mlist_last->mlnode_deleted);
CHK_MLIST_NODE(list->mlist_last);
CHK_MLIST_NODE(list->mlist_first);
ss_dassert(list->mlist_last->mlnode_next == NULL);
list->mlist_last->mlnode_next = newnode;
} else {
list->mlist_first = newnode;
}
list->mlist_last = newnode;
newnode->mlnode_list = list;
list->mlist_nodecount += 1;
CHK_MLIST(list);
}
bool mlist_cursor_move_to_first(
mlist_cursor_t* mc)
{
bool succp = FALSE;
mlist_t* list;
CHK_MLIST_CURSOR(mc);
list = mc->mlcursor_list;
CHK_MLIST(list);
simple_mutex_lock(&list->mlist_mutex, TRUE);
if (mc->mlcursor_list->mlist_deleted) {
return FALSE;
}
/** Set position point to first node */
mc->mlcursor_pos = list->mlist_first;
if (mc->mlcursor_pos != NULL) {
CHK_MLIST_NODE(mc->mlcursor_pos);
succp = TRUE;
}
simple_mutex_unlock(&list->mlist_mutex);
return succp;
}
#endif /* MLIST */
/** End of mlist */
static slist_t* slist_init_ex(
bool create_cursors)
{
slist_t* list;
list = (slist_t*)calloc(1, sizeof(slist_t));
list->slist_chk_top = CHK_NUM_SLIST;
list->slist_chk_tail = CHK_NUM_SLIST;
if (create_cursors) {
list->slist_cursors_list = slist_init_ex(FALSE);
}
return list;
}
static slist_node_t* slist_node_init(
void* data,
slist_cursor_t* cursor)
{
slist_node_t* node;
node = (slist_node_t*)calloc(1, sizeof(slist_node_t));
node->slnode_chk_top = CHK_NUM_SLIST_NODE;
node->slnode_chk_tail = CHK_NUM_SLIST_NODE;
node->slnode_data = data;
CHK_SLIST_NODE(node);
if (cursor != NULL) {
node->slnode_cursor_refcount += 1;
cursor->slcursor_pos = node;
}
return node;
}
static void slist_add_node(
slist_t* list,
slist_node_t* node)
{
CHK_SLIST(list);
CHK_SLIST_NODE(node);
if (list->slist_tail != NULL) {
CHK_SLIST_NODE(list->slist_tail);
CHK_SLIST_NODE(list->slist_head);
ss_dassert(list->slist_tail->slnode_next == NULL);
list->slist_tail->slnode_next = node;
} else {
list->slist_head = node;
}
list->slist_tail = node;
node->slnode_list = list;
list->slist_nelems += 1;
CHK_SLIST(list);
}
static slist_node_t* slist_node_get_next(
slist_node_t* curr_node)
{
CHK_SLIST_NODE(curr_node);
if (curr_node->slnode_next != NULL) {
CHK_SLIST_NODE(curr_node->slnode_next);
return (curr_node->slnode_next);
}
return NULL;
}
static slist_node_t* slist_get_first(
slist_t* list)
{
CHK_SLIST(list);
if (list->slist_head != NULL) {
CHK_SLIST_NODE(list->slist_head);
return list->slist_head;
}
return NULL;
}
static slist_cursor_t* slist_get_cursor(
slist_t* list)
{
CHK_SLIST(list);
slist_cursor_t* c;
c = slist_cursor_init(list);
return c;
}
static slist_cursor_t* slist_cursor_init(
slist_t* list)
{
CHK_SLIST(list);
slist_cursor_t* c;
c = (slist_cursor_t *)calloc(1, sizeof(slist_cursor_t));
c->slcursor_chk_top = CHK_NUM_SLIST_CURSOR;
c->slcursor_chk_tail = CHK_NUM_SLIST_CURSOR;
c->slcursor_list = list;
/** Set cursor position is list is not empty */
if (list->slist_head != NULL) {
list->slist_head->slnode_cursor_refcount += 1;
c->slcursor_pos = list->slist_head;
}
/** Add cursor to cursor list */
slist_add_node(list->slist_cursors_list, slist_node_init(c, NULL));
CHK_SLIST_CURSOR(c);
return c;
}
/**
* @node Create a cursor and a list with cursors supported. 19.6.2013 :
* supports only cursor per list.
*
* Parameters:
* @param void - <usage>
* <description>
*
* @return returns a pointer to cursor, which is not positioned
* because the list is empty.
*
*
* @details (write detailed description here)
*
*/
slist_cursor_t* slist_init(void)
{
slist_t* list;
slist_cursor_t* slc;
list = slist_init_ex(TRUE);
CHK_SLIST(list);
slc = slist_cursor_init(list);
CHK_SLIST_CURSOR(slc);
return slc;
}
/**
* @node moves cursor to the first node of list.
*
* Parameters:
* @param c - <usage>
* <description>
*
* @return TRUE if there is first node in the list
* FALSE is the list is empty.
*
*
* @details (write detailed description here)
*
*/
bool slcursor_move_to_begin(
slist_cursor_t* c)
{
bool succp = TRUE;
slist_t* list;
CHK_SLIST_CURSOR(c);
list = c->slcursor_list;
CHK_SLIST(list);
c->slcursor_pos = list->slist_head;
if (c->slcursor_pos == NULL) {
succp = FALSE;
}
return succp;
}
/**
* @node moves cursor to next node
*
* Parameters:
* @param c - <usage>
* <description>
*
* @return TRUE in success, FALSE is there is no next node on the list.
*
*
* @details (write detailed description here)
*
*/
bool slcursor_step_ahead(
slist_cursor_t* c)
{
bool succp = FALSE;
slist_node_t* node;
CHK_SLIST_CURSOR(c);
CHK_SLIST_NODE(c->slcursor_pos);
node = c->slcursor_pos->slnode_next;
if (node != NULL) {
CHK_SLIST_NODE(node);
c->slcursor_pos = node;
succp = TRUE;
}
return succp;
}
void* slcursor_get_data(
slist_cursor_t* c)
{
slist_node_t* node;
void* data = NULL;
CHK_SLIST_CURSOR(c);
node = c->slcursor_pos;
if (node != NULL) {
CHK_SLIST_NODE(node);
data = node->slnode_data;
}
return data;
}
/**
* @node Add data to the list by using cursor.
*
* Parameters:
* @param c - <usage>
* <description>
*
* @param data - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
void slcursor_add_data(
slist_cursor_t* c,
void* data)
{
slist_t* list;
slist_node_t* pos;
CHK_SLIST_CURSOR(c);
list = c->slcursor_list;
CHK_SLIST(list);
pos = c->slcursor_pos;
if (pos != NULL) {
CHK_SLIST_NODE(pos);
pos = list->slist_tail->slnode_next;
}
ss_dassert(pos == NULL);
pos = slist_node_init(data, c);
slist_add_node(list, pos);
CHK_SLIST(list);
CHK_SLIST_CURSOR(c);
}
void slist_done(
slist_cursor_t* c)
{
bool succp;
void* data;
succp = slcursor_move_to_begin(c);
while (succp) {
data = slcursor_get_data(c);
free(data);
succp = slcursor_step_ahead(c);
}
free(c->slcursor_list);
free(c);
}
/** End of list implementation */
/**
* @node Initialize thread data structure
*
* Parameters:
* @param void - <usage>
* <description>
*
* @param sth_thrfun - <usage>
* <description>
*
* @return
*
*
* @details (write detailed description here)
*
*/
skygw_thread_t* skygw_thread_init(
char* name,
void* (*sth_thrfun)(void* data),
void* data)
{
skygw_thread_t* th =
(skygw_thread_t *)calloc(1, sizeof(skygw_thread_t));
if (th == NULL) {
fprintf(stderr, "FATAL: memory allocation for thread failed\n");
goto return_th;
}
ss_dassert(th != NULL);
th->sth_chk_top = CHK_NUM_THREAD;
th->sth_chk_tail = CHK_NUM_THREAD;
th->sth_parent = pthread_self();
ss_debug(th->sth_state = THR_INIT;)
th->sth_name = name;
th->sth_mutex = simple_mutex_init(NULL, strdup(name));
if (th->sth_mutex == NULL) {
thread_free_memory(th, th->sth_name);
goto return_th;
}
th->sth_thrfun = sth_thrfun;
th->sth_data = data;
CHK_THREAD(th);
return_th:
return th;
}
static void thread_free_memory(
skygw_thread_t* th,
char* name)
{
if (name != NULL) {
free(name);
}
free(th);
}
/**
* @node Release skygw_thread data except filewriter.
*
* Parameters:
* @param th - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
void skygw_thread_done(
skygw_thread_t* th)
{
if (th != NULL) {
CHK_THREAD(th);
ss_dassert(th->sth_state == THR_STOPPED);
ss_debug(th->sth_state = THR_DONE;)
simple_mutex_done(th->sth_mutex);
pthread_join(th->sth_thr, NULL);
thread_free_memory(th, th->sth_name);
}
}
pthread_t skygw_thread_gettid(
skygw_thread_t* thr)
{
CHK_THREAD(thr);
return thr->sth_thr;
}
int skygw_thread_start(
skygw_thread_t* thr)
{
int err;
CHK_THREAD(thr);
err = pthread_create(&thr->sth_thr,
NULL,
thr->sth_thrfun,
thr);
ss_dassert(err == 0);
if (err != 0) {
fprintf(stderr,
"FATAL: starting file writer thread failed, "
"errno %d : %s\n",
err,
strerror(errno));
goto return_err;
}
ss_dfprintf(stderr, "Started %s thread\n", thr->sth_name);
return_err:
return err;
}
#if defined(SS_DEBUG)
skygw_thr_state_t skygw_thread_get_state(
skygw_thread_t* thr)
{
CHK_THREAD(thr);
return thr->sth_state;
}
#endif
/**
* @node Update thread state
*
* Parameters:
* @param thr - <usage>
* <description>
*
* @param state - <usage>
* <description>
*
* @return void
*
*
* @details Thread must check state with mutex.
*
*/
#if defined(SS_DEBUG)
void skygw_thread_set_state(
skygw_thread_t* thr,
skygw_thr_state_t state)
{
CHK_THREAD(thr);
simple_mutex_lock(thr->sth_mutex, TRUE);
thr->sth_state = state;
simple_mutex_unlock(thr->sth_mutex);
}
#endif
/**
* @node Set exit flag for thread from other thread
*
* Parameters:
* @param thr - <usage>
* <description>
*
* @return
*
*
* @details This call informs thread about exit flag and waits the response.
*
*/
bool skygw_thread_set_exitflag(
skygw_thread_t* thr,
skygw_message_t* sendmes,
skygw_message_t* recmes)
{
bool succp = FALSE;
/**
* If thread struct pointer is NULL there's running thread
* neither.
*/
if (thr == NULL) {
succp = TRUE;
goto return_succp;
}
CHK_THREAD(thr);
CHK_MESSAGE(sendmes);
CHK_MESSAGE(recmes);
simple_mutex_lock(thr->sth_mutex, TRUE);
succp = !thr->sth_must_exit;
thr->sth_must_exit = TRUE;
simple_mutex_unlock(thr->sth_mutex);
/** Inform thread and wait for response */
if (succp) {
skygw_message_send(sendmes);
skygw_message_wait(recmes);
}
ss_dassert(thr->sth_state == THR_STOPPED);
return_succp:
return succp;
}
void* skygw_thread_get_data(
skygw_thread_t* thr)
{
CHK_THREAD(thr);
return thr->sth_data;
}
bool skygw_thread_must_exit(
skygw_thread_t* thr)
{
CHK_THREAD(thr);
return thr->sth_must_exit;
}
void acquire_lock(
int* l)
{
register short int misscount = 0;
while (atomic_add(l, 1) != 0) {
atomic_add(l, -1);
misscount += 1;
if (misscount > 10) {
usleep(rand()%100);
misscount = 0;
}
}
}
void release_lock(
int* l)
{
atomic_add(l, -1);
}
/**
* @node Create a simple_mutex structure which encapsulates pthread_mutex.
*
* Parameters:
* @param name - <usage>
* <description>
*
* @return
*
*
* @details If mutex is flat, sm_enabled can be read if the memory is not freed.
* If flat mutex exists, sm_enabled is TRUE.
* If mutex allocates its own memory, the pointer is NULL if mutex doesn't
* exist.
*
*/
simple_mutex_t* simple_mutex_init(
simple_mutex_t* mutexptr,
char* name)
{
int err;
simple_mutex_t* sm;
/** Copy pointer only if flat, allocate memory otherwise. */
if (mutexptr != NULL) {
sm = mutexptr;
sm->sm_flat = TRUE;
} else {
sm = (simple_mutex_t *)calloc(1, sizeof(simple_mutex_t));
}
ss_dassert(sm != NULL);
sm->sm_chk_top = CHK_NUM_SIMPLE_MUTEX;
sm->sm_chk_tail = CHK_NUM_SIMPLE_MUTEX;
sm->sm_name = name;
/** Create pthread mutex */
err = pthread_mutex_init(&sm->sm_mutex, NULL);
if (err != 0) {
fprintf(stderr,
"FATAL : initializing simple mutex %s failed, "
"errno %d : %s\n",
name,
err,
strerror(errno));
perror("simple_mutex : ");
/** Write zeroes if flat, free otherwise. */
if (sm->sm_flat) {
memset(sm, 0, sizeof(sm));
} else {
simple_mutex_free_memory(sm);
sm = NULL;
}
goto return_sm;
}
sm->sm_enabled = TRUE;
CHK_SIMPLE_MUTEX(sm);
ss_dfprintf(stderr, "Initialized simple mutex %s.\n", name);
return_sm:
return sm;
}
int simple_mutex_done(
simple_mutex_t* sm)
{
int err;
CHK_SIMPLE_MUTEX(sm);
if (atomic_add(&sm->sm_enabled, -1) != 1) {
atomic_add(&sm->sm_enabled, 1);
}
err = pthread_mutex_destroy(&sm->sm_mutex);
if (err != 0) {
goto return_err;
}
simple_mutex_free_memory(sm);
return_err:
if (err != 0) {
fprintf(stderr,
"FATAL : destroying simple mutex %s failed, "
"errno %d : %s\n",
sm->sm_name,
err,
strerror(errno));
perror("simple_mutex : ");
}
return err;
}
static void simple_mutex_free_memory(
simple_mutex_t* sm)
{
if (sm->sm_name != NULL) {
free(sm->sm_name);
}
if (!sm->sm_flat) {
free(sm);
}
}
int simple_mutex_lock(
simple_mutex_t* sm,
bool block)
{
int err;
if (block) {
err = pthread_mutex_lock(&sm->sm_mutex);
} else {
err = pthread_mutex_trylock(&sm->sm_mutex);
}
if (err != 0) {
fprintf(stderr,
"INFO : Locking simple mutex %s failed, "
"errno %d : %s\n",
sm->sm_name,
err,
strerror(errno));
perror("simple_mutex : ");
} else {
sm->sm_locked = TRUE;
sm->sm_lock_thr = pthread_self();
}
return err;
}
int simple_mutex_unlock(
simple_mutex_t* sm)
{
int err;
err = pthread_mutex_unlock(&sm->sm_mutex);
if (err != 0) {
fprintf(stderr,
"INFO : locking simple mutex %s failed, "
"errno %d : %s\n",
sm->sm_name,
err,
strerror(errno));
perror("simple_mutex : ");
} else {
sm->sm_locked = FALSE;
sm->sm_lock_thr = 0;
}
return err;
}
skygw_message_t* skygw_message_init(void)
{
int err;
skygw_message_t* mes;
mes = (skygw_message_t*)calloc(1, sizeof(skygw_message_t));
mes->mes_chk_top = CHK_NUM_MESSAGE;
mes->mes_chk_tail = CHK_NUM_MESSAGE;
err = pthread_mutex_init(&(mes->mes_mutex), NULL);
if (err != 0) {
fprintf(stderr,
"FATAL : initializing pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
mes = NULL;
goto return_mes;
}
err = pthread_cond_init(&(mes->mes_cond), NULL);
if (err != 0) {
fprintf(stderr,
"FATAL : initializing pthread cond var failed, "
"errno %d : %s\n",
err,
strerror(errno));
mes = NULL;
goto return_mes;
}
CHK_MESSAGE(mes);
return_mes:
return mes;
}
void skygw_message_done(
skygw_message_t* mes)
{
int err;
/**
* If message struct pointer is NULL there's nothing to free.
*/
if (mes == NULL) {
return;
}
CHK_MESSAGE(mes);
err = pthread_cond_destroy(&(mes->mes_cond));
if (err != 0) {
fprintf(stderr,
"FATAL : destroying cond var failed, "
"errno %d : %s\n",
err,
strerror(errno));
}
ss_dassert(err == 0);
err = pthread_mutex_destroy(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"FATAL : destroying pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
}
ss_dassert(err == 0);
free(mes);
}
skygw_mes_rc_t skygw_message_send(
skygw_message_t* mes)
{
int err;
skygw_mes_rc_t rc = MES_RC_FAIL;
CHK_MESSAGE(mes);
err = pthread_mutex_lock(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Locking pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
goto return_mes_rc;
}
mes->mes_sent = TRUE;
err = pthread_cond_signal(&(mes->mes_cond));
if (err != 0) {
fprintf(stderr,
"INFO : Signaling pthread cond var failed, "
"errno %d : %s\n",
err,
strerror(errno));
goto return_mes_rc;
}
err = pthread_mutex_unlock(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Unlocking pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
goto return_mes_rc;
}
rc = MES_RC_SUCCESS;
return_mes_rc:
return rc;
}
void skygw_message_wait(
skygw_message_t* mes)
{
int err;
CHK_MESSAGE(mes);
err = pthread_mutex_lock(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Locking pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
}
ss_dassert(err == 0);
while (!mes->mes_sent) {
err = pthread_cond_wait(&(mes->mes_cond), &(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Locking pthread cond wait failed, "
"errno %d : %s\n",
err,
strerror(errno));
}
}
mes->mes_sent = FALSE;
err = pthread_mutex_unlock(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Unlocking pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
}
ss_dassert(err == 0);
}
void skygw_message_reset(
skygw_message_t* mes)
{
int err;
CHK_MESSAGE(mes);
err = pthread_mutex_lock(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Locking pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
goto return_mes_rc;
}
ss_dassert(err == 0);
mes->mes_sent = FALSE;
err = pthread_mutex_unlock(&(mes->mes_mutex));
if (err != 0) {
fprintf(stderr,
"INFO : Unlocking pthread mutex failed, "
"errno %d : %s\n",
err,
strerror(errno));
goto return_mes_rc;
}
return_mes_rc:
ss_dassert(err == 0);
}
static bool file_write_header(
skygw_file_t* file)
{
bool succp = FALSE;
size_t wbytes1;
size_t wbytes2;
size_t len1;
size_t len2;
const char* header_buf1;
char* header_buf2 = NULL;
time_t* t;
struct tm* tm;
t = (time_t *)malloc(sizeof(time_t));
tm = (struct tm *)malloc(sizeof(struct tm));
*t = time(NULL);
*tm = *localtime(t);
CHK_FILE(file);
header_buf1 = "\n----------\nSkySQL Gateway ";
header_buf2 = strdup(asctime(tm));
if (header_buf2 == NULL) {
goto return_succp;
}
len1 = strlen(header_buf1);
len2 = strlen(header_buf2);
wbytes1=fwrite((void*)header_buf1, len1, 1, file->sf_file);
wbytes2=fwrite((void*)header_buf2, len2, 1, file->sf_file);
if (wbytes1 != 1 || wbytes2 != 1) {
fprintf(stderr,
"Writing header %s %s to %s failed.\n",
header_buf1,
header_buf2,
file->sf_fname);
perror("Logfile header write.\n");
goto return_succp;
}
CHK_FILE(file);
succp = TRUE;
return_succp:
free(header_buf2);
free(t);
free(tm);
return succp;
}
bool skygw_file_write(
skygw_file_t* file,
void* data,
size_t nbytes)
{
size_t nwritten;
bool succp = FALSE;
CHK_FILE(file);
nwritten = fwrite(data, nbytes, 1, file->sf_file);
if (nwritten != 1) {
fprintf(stderr,
"Writing header %s to %s failed.\n",
(char *)data,
file->sf_fname);
perror("Logfile write.\n");
goto return_succp;
}
succp = TRUE;
CHK_FILE(file);
return_succp:
return succp;
}
skygw_file_t* skygw_file_init(
char* fname)
{
skygw_file_t* file;
file = (skygw_file_t *)calloc(1, sizeof(skygw_file_t));
if (file == NULL) {
fprintf(stderr, "Memory allocation for skygw file failed.\n");
perror("SkyGW file allocation\n");
}
ss_dassert(file != NULL);
file->sf_chk_top = CHK_NUM_FILE;
file->sf_chk_tail = CHK_NUM_FILE;
file->sf_fname = strdup(fname);
file->sf_file = fopen(file->sf_fname, "a");
if (file->sf_file == NULL) {
fprintf(stderr, "Opening file %s failed.\n", file->sf_fname);
perror("SkyGW file open\n");
free(file);
file = NULL;
goto return_file;
}
file_write_header(file);
CHK_FILE(file);
ss_dfprintf(stderr, "Opened %s\n", file->sf_fname);
return_file:
ss_dassert(file->sf_file != NULL);
return file;
}
void skygw_file_done(
skygw_file_t* file)
{
int fd;
int err;
if (file != NULL) {
CHK_FILE(file);
fd = fileno(file->sf_file);
fsync(fd);
err = fclose(file->sf_file);
if (err != 0) {
fprintf(stderr,
"Closing file %s failed : %s.\n",
file->sf_fname,
strerror(err));
}
ss_dassert(err == 0);
ss_dfprintf(stderr, "Closed %s\n", file->sf_fname);
free(file->sf_fname);
free(file);
}
}