Fixed bugs and cleaned code

This commit is contained in:
vraatikka
2013-06-27 12:18:25 +03:00
parent 70b19a0481
commit 9630ae588e
6 changed files with 153 additions and 128 deletions

View File

@ -30,10 +30,16 @@
#define MAX_SUFFIXLEN 250 #define MAX_SUFFIXLEN 250
#define MAX_PATHLEN 512 #define MAX_PATHLEN 512
/**
* Global log manager pointer and lock variable.
* lmlock protects logmanager access.
*/
static int lmlock; static int lmlock;
static logmanager_t* lm; static logmanager_t* lm;
/** Write buffer structure */ /**
* Log client's string is copied to write buffer, which is passed
* to file writer thread. Write */
typedef struct logfile_writebuf_st { typedef struct logfile_writebuf_st {
skygw_chk_t wb_chk_top; skygw_chk_t wb_chk_top;
size_t wb_bufsize; size_t wb_bufsize;
@ -133,17 +139,12 @@ static logfile_writebuf_t** get_or_create_writebuffers(
void* ctx, void* ctx,
size_t len, size_t len,
size_t bufsize); size_t bufsize);
static int logmanager_write( static int logmanager_write(void* ctx, logfile_id_t id, char* str, bool flush);
void* ctx, static logfile_t* logmanager_get_logfile(logmanager_t* lm, logfile_id_t id);
logfile_id_t id,
char* str,
bool flush);
static bool logmanager_register(bool writep); static bool logmanager_register(bool writep);
static void logmanager_unregister(void); static void logmanager_unregister(void);
static bool logmanager_init_nomutex(void** p_ctx, int argc, char* argv[]); static bool logmanager_init_nomutex(void** p_ctx, int argc, char* argv[]);
static void logmanager_done_nomutex(void** ctx); static void logmanager_done_nomutex(void** ctx);
static void acquire_lock(int* l);
static void release_lock(int* l);
static void logfile_write_buffers( static void logfile_write_buffers(
logfile_t* lf, logfile_t* lf,
logfile_writebuf_t** p_wb, logfile_writebuf_t** p_wb,
@ -195,27 +196,6 @@ const size_t get_bufsize_default(void)
return (size_t)256; return (size_t)256;
} }
static void acquire_lock(
int* l)
{
register short int misscount = 0;
while (atomic_add(l, 1) != 0) {
atomic_add(l, -1);
misscount += 1;
if (misscount > 10) {
usleep(rand()%100);
misscount = 0;
}
}
}
static void release_lock(
int* l)
{
atomic_add(l, -1);
}
static bool logmanager_init_nomutex( static bool logmanager_init_nomutex(
void** p_ctx, void** p_ctx,
int argc, int argc,
@ -325,26 +305,25 @@ static void logmanager_done_nomutex(
logmanager_t* lmgr; logmanager_t* lmgr;
filewriter_t* fwr; filewriter_t* fwr;
/** Set global pointer NULL */ fwr = &lm->lm_filewriter;
lmgr = lm;
lm = NULL;
fwr = &lmgr->lm_filewriter;
CHK_FILEWRITER(fwr); CHK_FILEWRITER(fwr);
/** Inform filewriter thread and wait until it has stopped. */ /** Inform filewriter thread and wait until it has stopped. */
skygw_thread_set_exitflag(fwr->fwr_thread, skygw_thread_set_exitflag(fwr->fwr_thread,
fwr->fwr_logmes, fwr->fwr_logmes,
fwr->fwr_clientmes); fwr->fwr_clientmes);
/** Set global pointer NULL to prevent access to freed data. */
lmgr = lm;
lm = NULL;
/** Free thread memory */ /** Free thread memory */
skygw_thread_done(fwr->fwr_thread); skygw_thread_done(fwr->fwr_thread);
/** Free filewriter memory. */ /** Free filewriter memory. */
filewriter_done(fwr); filewriter_done(fwr);
for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i++) { for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i++) {
lf = &lmgr->lm_logfile[i]; lf = logmanager_get_logfile(lmgr, (logfile_id_t)i);
CHK_LOGFILE(lf);
/** Release logfile memory */ /** Release logfile memory */
logfile_done(lf); logfile_done(lf);
} }
@ -412,11 +391,13 @@ static logfile_t* logmanager_get_logfile(
logmanager_t* lmgr, logmanager_t* lmgr,
logfile_id_t id) logfile_id_t id)
{ {
logfile_t* lf;
CHK_LOGMANAGER(lmgr); CHK_LOGMANAGER(lmgr);
ss_dassert(id >= LOGFILE_FIRST && id <= LOGFILE_LAST); ss_dassert(id >= LOGFILE_FIRST && id <= LOGFILE_LAST);
CHK_LOGFILE(lmgr->lm_logfile); lf = &lmgr->lm_logfile[id];
CHK_LOGFILE(lf);
return &lmgr->lm_logfile[id]; return lf;
} }
static int logmanager_write( static int logmanager_write(
@ -448,18 +429,19 @@ static int logmanager_write(
ss_dassert(FALSE); ss_dassert(FALSE);
goto return_err; goto return_err;
} }
/** Get logfile and check its correctness. */
lf = logmanager_get_logfile(lm, id); lf = logmanager_get_logfile(lm, id);
CHK_LOGFILE(lf);
if (lf == NULL) { /**
fprintf(stderr, "Could find valid logfile from the log manager.\n"); * String pointer is NULL in skygw_log_flush.
err = -1; */
goto return_err; if (str == NULL) {
ss_dassert(flush);
goto return_flush;
} }
/** skygw_log_flush doesn't pass any string */
if (str != NULL) { /**
/** Get or create array of constant-size buffers. */ * Get or create array of constant-size buffers.
*/
wb_arr = get_or_create_writebuffers( wb_arr = get_or_create_writebuffers(
ctx, ctx,
strlen(str), strlen(str),
@ -475,10 +457,11 @@ static int logmanager_write(
* to logfile. Free write buffer pointer array. * to logfile. Free write buffer pointer array.
*/ */
logfile_write_buffers(lf, wb_arr, str); logfile_write_buffers(lf, wb_arr, str);
} else {
ss_dassert(flush); return_flush:
} /**
/** Send notification to filewriter thread */ * Notification is sent to filewriter thread.
*/
if (flush) { if (flush) {
skygw_message_send(lf->lf_logmes); skygw_message_send(lf->lf_logmes);
} }
@ -561,6 +544,7 @@ return_p_wb:
return p_wb; return p_wb;
} }
static void logfile_write_buffers( static void logfile_write_buffers(
logfile_t* lf, logfile_t* lf,
logfile_writebuf_t** p_wb, logfile_writebuf_t** p_wb,
@ -579,7 +563,7 @@ static void logfile_write_buffers(
p = str; p = str;
/** Copy log string to write buffer(s) */ /** Copy log string to write buffer(s) */
while(slen > 0) { while (slen > 0) {
wb = *p_wb; wb = *p_wb;
copylen = MIN(wb->wb_bufsize, slen); copylen = MIN(wb->wb_bufsize, slen);
ss_dassert(*p_data != NULL); ss_dassert(*p_data != NULL);
@ -1190,68 +1174,69 @@ static void* thr_filewriter_fun(
{ {
skygw_thread_t* thr; skygw_thread_t* thr;
filewriter_t* fwr; filewriter_t* fwr;
logfile_t* lf;
skygw_file_t* file; skygw_file_t* file;
logfile_writebuf_t* wb; logfile_writebuf_t* wb;
char* writep; char* writep;
size_t nbytes; size_t nbytes;
logfile_id_t id;
mlist_t* wblist; mlist_t* wblist;
mlist_node_t* node; mlist_node_t* node;
mlist_node_t* anchor; mlist_node_t* prev_node;
logfile_id_t id;
int i; int i;
bool succp; bool succp;
thr = (skygw_thread_t *)data; thr = (skygw_thread_t *)data;
fwr = (filewriter_t *)skygw_thread_get_data(thr); fwr = (filewriter_t *)skygw_thread_get_data(thr);
CHK_FILEWRITER(fwr); CHK_FILEWRITER(fwr);
skygw_thread_set_state(thr, THR_RUNNING); ss_debug(skygw_thread_set_state(thr, THR_RUNNING));
/** Inform log manager about the state. */
skygw_message_send(fwr->fwr_clientmes); skygw_message_send(fwr->fwr_clientmes);
while(!skygw_thread_must_exit(thr)) { while(!skygw_thread_must_exit(thr)) {
/** Wait until new log arrival message appears. /**
* Reset message to avoid redundant calls. */ * Wait until new log arrival message appears.
* Reset message to avoid redundant calls.
*/
skygw_message_wait(fwr->fwr_logmes); skygw_message_wait(fwr->fwr_logmes);
skygw_message_reset(fwr->fwr_logmes); skygw_message_reset(fwr->fwr_logmes);
/** Test _all_ logfiles if they have buffered writes. */ /** Process all logfiles which have buffered writes. */
for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i++) { for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i++) {
id = (logfile_id_t)i; /**
lf = logmanager_get_logfile(fwr->fwr_logmgr, id); * Get file pointer of current logfile,
CHK_LOGFILE(lf); * and logfile's write buffer.
file = fwr->fwr_file[id]; */
wblist = &lf->lf_writebuf_list; file = fwr->fwr_file[i];
wblist = &lm->lm_logfile[(logfile_id_t)i].lf_writebuf_list;
simple_mutex_lock(&wblist->mlist_mutex, TRUE); /** Process non-empty write buffer lists only. */
/** Process non-empty write buffer lists. */
if (wblist->mlist_nodecount != 0) { if (wblist->mlist_nodecount != 0) {
/** Detach nodes from the list and release lock */
node = wblist->mlist_first;
wblist->mlist_first = NULL;
wblist->mlist_last = NULL;
wblist->mlist_nodecount = 0;
simple_mutex_unlock(&wblist->mlist_mutex);
/** Read data from write buf list and write it to file. */ /** Detach all nodes of the list */
simple_mutex_lock(&wblist->mlist_mutex, TRUE);
node = mlist_detach_nodes(wblist);
simple_mutex_unlock(&wblist->mlist_mutex);
/**
* Get string pointer and length, and pass them to file
* writer function.
*/
while(node != NULL) { while(node != NULL) {
wb = (logfile_writebuf_t*)node->mlnode_data; wb = (logfile_writebuf_t*)node->mlnode_data;
writep = wb->wb_buf; writep = wb->wb_buf;
nbytes = strlen(writep); nbytes = strlen(writep);
succp = skygw_file_write(file, (void *)writep, nbytes); succp = skygw_file_write(file, (void *)writep, nbytes);
ss_dassert(succp); ss_dassert(succp);
anchor = node; prev_node = node;
node = node->mlnode_next; node = node->mlnode_next;
mlist_node_done(anchor); mlist_node_done(prev_node);
} }
} else {
simple_mutex_unlock(&wblist->mlist_mutex);
} }
} /* for */ } /* for */
} /* while */ } /* while */
/** This informs client thread that file writer thread is stopped. */ ss_debug(skygw_thread_set_state(thr, THR_STOPPED));
skygw_thread_set_state(thr, THR_STOPPED); /** Inform log manager that file writer thread has stopped. */
skygw_message_send(fwr->fwr_clientmes); skygw_message_send(fwr->fwr_clientmes);
return NULL; return NULL;
} }

View File

@ -23,8 +23,8 @@ typedef struct fnames_conf_st fnames_conf_t;
typedef struct logmanager_st logmanager_t; typedef struct logmanager_st logmanager_t;
typedef enum { typedef enum {
LOGFILE_FIRST = 0, LOGFILE_TRACE = 0,
LOGFILE_TRACE = LOGFILE_FIRST, LOGFILE_FIRST = LOGFILE_TRACE,
LOGFILE_MESSAGE, LOGFILE_MESSAGE,
LOGFILE_ERROR, LOGFILE_ERROR,
LOGFILE_LAST = LOGFILE_ERROR LOGFILE_LAST = LOGFILE_ERROR

View File

@ -48,7 +48,6 @@ int main(int argc, char* argv[])
size_t nactive; size_t nactive;
thread_t* thr[NTHR]; thread_t* thr[NTHR];
skygw_logmanager_init(NULL, argc, argv); skygw_logmanager_init(NULL, argc, argv);
logstr = strdup("My name is Tracey"); logstr = strdup("My name is Tracey");
@ -73,7 +72,7 @@ int main(int argc, char* argv[])
logstr = strdup("I'm doing fine!"); logstr = strdup("I'm doing fine!");
err = skygw_log_write(NULL, LOGFILE_MESSAGE, logstr); err = skygw_log_write(NULL, LOGFILE_MESSAGE, logstr);
logstr = strdup("Rather more surprising, at least at first sight, is the fact that a reference to a[i] can also be written as *(a+i). In evaluating a[i], C converts it to *(a+i) immediately; the two forms are equivalent. Applying the operatos & to both parts of this equivalence, it follows that &a[i] and a+i are also identical: a+i is the address of the i-th element beyond a."); logstr = strdup("Rather more surprising, at least at first sight, is the fact that a reference to a[i] can also be written as *(a+i). In evaluating a[i], C converts it to *(a+i) immediately; the two forms are equivalent. Applying the operators & to both parts of this equivalence, it follows that &a[i] and a+i are also identical: a+i is the address of the i-th element beyond a.");
err = skygw_log_write(NULL, LOGFILE_ERROR, logstr); err = skygw_log_write(NULL, LOGFILE_ERROR, logstr);
logstr = strdup("I was wondering, you know, it has been such a lovely weather whole morning and I thought that would you like to come to my place and have a little piece of cheese with us. Just me and my mom - and you, of course. Then, if you wish, we could listen to the radio and keep company for our little Steven, my mom's cat, you know."); logstr = strdup("I was wondering, you know, it has been such a lovely weather whole morning and I thought that would you like to come to my place and have a little piece of cheese with us. Just me and my mom - and you, of course. Then, if you wish, we could listen to the radio and keep company for our little Steven, my mom's cat, you know.");

View File

@ -37,6 +37,7 @@
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
# define ss_debug(exp) exp
# define ss_dfprintf fprintf # define ss_dfprintf fprintf
# define ss_dfflush fflush # define ss_dfflush fflush
# define ss_dfwrite fwrite # define ss_dfwrite fwrite
@ -73,6 +74,7 @@
#else /* SS_DEBUG */ #else /* SS_DEBUG */
# define ss_debug(exp)
# define ss_dfprintf(a, b, ...) # define ss_dfprintf(a, b, ...)
# define ss_dfflush # define ss_dfflush
# define ss_dfwrite # define ss_dfwrite

View File

@ -64,7 +64,9 @@ struct skygw_thread_st {
pthread_t sth_parent; pthread_t sth_parent;
pthread_t sth_thr; pthread_t sth_thr;
int sth_errno; int sth_errno;
#if defined(SS_DEBUG)
skygw_thr_state_t sth_state; skygw_thr_state_t sth_state;
#endif
char* sth_name; char* sth_name;
void* (*sth_thrfun)(void* data); void* (*sth_thrfun)(void* data);
void* sth_data; void* sth_data;
@ -92,18 +94,10 @@ struct skygw_file_st {
static mlist_node_t* mlist_node_init(void* data, mlist_cursor_t* cursor); static mlist_node_t* mlist_node_init(void* data, mlist_cursor_t* cursor);
//static mlist_node_t* mlist_node_get_next(mlist_node_t* curr_node);
static mlist_node_t* mlist_node_get_next( //static mlist_node_t* mlist_get_first(mlist_t* list);
mlist_node_t* curr_node); //static mlist_cursor_t* mlist_get_cursor(mlist_t* list);
static mlist_node_t* mlist_get_first( static void mlist_add_node_nomutex(mlist_t* list, mlist_node_t* newnode);
mlist_t* list);
static mlist_cursor_t* mlist_get_cursor(
mlist_t* list);
static void mlist_add_node_nomutex(
mlist_t* list,
mlist_node_t* newnode);
#endif /* MLIST */ #endif /* MLIST */
@ -232,6 +226,31 @@ return_err:
return err; return err;
} }
/**
* @node Cut off nodes of the list.
*
* Parameters:
* @param ml - <usage>
* <description>
*
* @return Pointer to the first of the detached nodes.
*
*
* @details (write detailed description here)
*
*/
mlist_node_t* mlist_detach_nodes(
mlist_t* ml)
{
mlist_node_t* node;
CHK_MLIST(ml);
node = ml->mlist_first;
ml->mlist_first = NULL;
ml->mlist_last = NULL;
ml->mlist_nodecount = 0;
return node;
}
/** /**
* @node Create a list with rwlock and optional read-only cursor * @node Create a list with rwlock and optional read-only cursor
@ -587,7 +606,6 @@ static void slist_add_node(
static slist_node_t* slist_node_get_next( static slist_node_t* slist_node_get_next(
slist_node_t* curr_node) slist_node_t* curr_node)
{ {
@ -850,7 +868,7 @@ skygw_thread_t* skygw_thread_init(
th->sth_chk_top = CHK_NUM_THREAD; th->sth_chk_top = CHK_NUM_THREAD;
th->sth_chk_tail = CHK_NUM_THREAD; th->sth_chk_tail = CHK_NUM_THREAD;
th->sth_parent = pthread_self(); th->sth_parent = pthread_self();
th->sth_state = THR_INIT; ss_debug(th->sth_state = THR_INIT;)
th->sth_name = name; th->sth_name = name;
th->sth_mutex = simple_mutex_init(NULL, strdup(name)); th->sth_mutex = simple_mutex_init(NULL, strdup(name));
@ -892,7 +910,7 @@ void skygw_thread_done(
{ {
CHK_THREAD(th); CHK_THREAD(th);
ss_dassert(th->sth_state == THR_STOPPED); ss_dassert(th->sth_state == THR_STOPPED);
th->sth_state = THR_DONE; ss_debug(th->sth_state = THR_DONE;)
simple_mutex_done(th->sth_mutex); simple_mutex_done(th->sth_mutex);
thread_free_memory(th, th->sth_name); thread_free_memory(th, th->sth_name);
} }
@ -924,14 +942,14 @@ return_err:
return err; return err;
} }
#if defined(SS_DEBUG)
skygw_thr_state_t skygw_thread_get_state( skygw_thr_state_t skygw_thread_get_state(
skygw_thread_t* thr) skygw_thread_t* thr)
{ {
CHK_THREAD(thr); CHK_THREAD(thr);
return thr->sth_state; return thr->sth_state;
} }
#endif
/** /**
* @node Update thread state * @node Update thread state
@ -949,6 +967,7 @@ skygw_thr_state_t skygw_thread_get_state(
* @details Thread must check state with mutex. * @details Thread must check state with mutex.
* *
*/ */
#if defined(SS_DEBUG)
void skygw_thread_set_state( void skygw_thread_set_state(
skygw_thread_t* thr, skygw_thread_t* thr,
skygw_thr_state_t state) skygw_thr_state_t state)
@ -958,7 +977,7 @@ void skygw_thread_set_state(
thr->sth_state = state; thr->sth_state = state;
simple_mutex_unlock(thr->sth_mutex); simple_mutex_unlock(thr->sth_mutex);
} }
#endif
/** /**
* @node Set exit flag for thread from other thread * @node Set exit flag for thread from other thread
* *
@ -1011,6 +1030,28 @@ bool skygw_thread_must_exit(
return thr->sth_must_exit; return thr->sth_must_exit;
} }
void acquire_lock(
int* l)
{
register short int misscount = 0;
while (atomic_add(l, 1) != 0) {
atomic_add(l, -1);
misscount += 1;
if (misscount > 10) {
usleep(rand()%100);
misscount = 0;
}
}
}
void release_lock(
int* l)
{
atomic_add(l, -1);
}
/** /**
* @node Create a simple_mutex structure which encapsulates pthread_mutex. * @node Create a simple_mutex structure which encapsulates pthread_mutex.
* *
@ -1081,16 +1122,9 @@ int simple_mutex_done(
int err; int err;
CHK_SIMPLE_MUTEX(sm); CHK_SIMPLE_MUTEX(sm);
err = simple_mutex_lock(sm, FALSE);
if (err != 0) { if (atomic_add(&sm->sm_enabled, -1) != 1) {
goto return_err; atomic_add(&sm->sm_enabled, 1);
}
sm->sm_enabled = FALSE;
err = simple_mutex_unlock(sm);
if (err != 0) {
goto return_err;
} }
err = pthread_mutex_destroy(&sm->sm_mutex); err = pthread_mutex_destroy(&sm->sm_mutex);

View File

@ -20,7 +20,7 @@ typedef struct simple_mutex_st {
pthread_mutex_t sm_mutex; pthread_mutex_t sm_mutex;
pthread_t sm_lock_thr; pthread_t sm_lock_thr;
bool sm_locked; bool sm_locked;
bool sm_enabled; int sm_enabled; /**< defined as in to minimize mutexing */
bool sm_flat; bool sm_flat;
char* sm_name; char* sm_name;
skygw_chk_t sm_chk_tail; skygw_chk_t sm_chk_tail;
@ -87,6 +87,8 @@ mlist_t* mlist_init(mlist_t* mlist, mlist_cursor_t** cursor, char* name);
void mlist_done(mlist_t* list); void mlist_done(mlist_t* list);
void mlist_add_data_nomutex(mlist_t* list, void* data); void mlist_add_data_nomutex(mlist_t* list, void* data);
void* mlist_node_get_data(mlist_node_t* node); void* mlist_node_get_data(mlist_node_t* node);
mlist_node_t* mlist_detach_nodes(mlist_t* ml);
void mlist_node_done(mlist_node_t* n); void mlist_node_done(mlist_node_t* n);
int mlist_cursor_done(mlist_cursor_t* c); int mlist_cursor_done(mlist_cursor_t* c);
mlist_cursor_t* mlist_cursor_init(mlist_t* ml); mlist_cursor_t* mlist_cursor_init(mlist_t* ml);
@ -124,6 +126,9 @@ bool skygw_file_write(skygw_file_t* file, void* data, size_t nbytes);
EXTERN_C_BLOCK_BEGIN EXTERN_C_BLOCK_BEGIN
void acquire_lock(int* l);
void release_lock(int* l);
simple_mutex_t* simple_mutex_init(simple_mutex_t* mutexptr, char* name); simple_mutex_t* simple_mutex_init(simple_mutex_t* mutexptr, char* name);
int simple_mutex_done(simple_mutex_t* sm); int simple_mutex_done(simple_mutex_t* sm);
int simple_mutex_lock(simple_mutex_t* sm, bool block); int simple_mutex_lock(simple_mutex_t* sm, bool block);