Changed log_manager to use block-siuze buffers instead of small write buffers. Added new test cases and added iterations. Added possibility to test with dummy disk write which sleeps constantly for 5ms instead of performing disk write.

This commit is contained in:
vraatikka
2013-07-23 08:34:43 +03:00
parent c84f4e099a
commit baed0e846f
5 changed files with 628 additions and 398 deletions

View File

@ -30,9 +30,7 @@
#define MAX_PREFIXLEN 250 #define MAX_PREFIXLEN 250
#define MAX_SUFFIXLEN 250 #define MAX_SUFFIXLEN 250
#define MAX_PATHLEN 512 #define MAX_PATHLEN 512
#define MAX_WRITEBUFMEM (256*4096)L #define MAXNBLOCKBUFS 10
#define MAX_WRITEBUFCOUNT 4096L
#define DEFAULT_WBUFSIZE 256
/** /**
* BUFSIZ comes from the system. It equals with block size or * BUFSIZ comes from the system. It equals with block size or
* its multiplication. * its multiplication.
@ -44,13 +42,8 @@
* These counters may be inaccurate but give some idea of how * These counters may be inaccurate but give some idea of how
* things are going. * things are going.
*/ */
static size_t prof_freelist_get;
static size_t prof_writebuf_init;
static size_t prof_writebuf_done;
static size_t prof_writebuf_count;
#endif
static int writebuf_count; #endif
/** /**
* Global log manager pointer and lock variable. * Global log manager pointer and lock variable.
@ -59,27 +52,12 @@ static int writebuf_count;
static int lmlock; static int lmlock;
static logmanager_t* lm; static logmanager_t* lm;
/**
* Log client's string is copied to write buffer, which is passed
* to file writer thread. Write */
typedef struct logfile_writebuf_st {
skygw_chk_t wb_chk_top;
size_t wb_bufsize;
bool wb_recycle;
union {
char fixed[256];
char dynamic[1]; /** no zero length arrays in C++ */
} wb_buf;
skygw_chk_t wb_chk_tail;
} logfile_writebuf_t;
/** Writer thread structure */ /** Writer thread structure */
struct filewriter_st { struct filewriter_st {
skygw_chk_t fwr_chk_top; skygw_chk_t fwr_chk_top;
flat_obj_state_t fwr_state; flat_obj_state_t fwr_state;
logmanager_t* fwr_logmgr; logmanager_t* fwr_logmgr;
mlist_t fwr_freebuf_list;
/** Physical files */ /** Physical files */
skygw_file_t* fwr_file[LOGFILE_LAST+1]; skygw_file_t* fwr_file[LOGFILE_LAST+1];
/** fwr_logmes is for messages from log clients */ /** fwr_logmes is for messages from log clients */
@ -90,6 +68,24 @@ struct filewriter_st {
skygw_chk_t fwr_chk_tail; skygw_chk_t fwr_chk_tail;
}; };
/**
* Log client's string is copied to block-sized log buffer, which is passed
* to file writer thread.
*/
typedef struct blockbuf_st {
skygw_chk_t bb_chk_top;
logfile_id_t bb_fileid;
bool bb_isfull; /**< closed for disk write */
simple_mutex_t bb_mutex; /**< bb_buf_used, bb_isfull */
int bb_refcount; /**< protected by list mutex. #of clients */
// int bb_blankcount; /**< # of blanks used btw strings */
size_t bb_buf_size;
size_t bb_buf_left;
size_t bb_buf_used;
char bb_buf[BUFSIZ];
skygw_chk_t bb_chk_tail;
} blockbuf_t;
/** logfile object corresponds to physical file(s) where /** logfile object corresponds to physical file(s) where
* certain log is written. * certain log is written.
*/ */
@ -108,9 +104,10 @@ struct logfile_st {
char* lf_full_name; char* lf_full_name;
int lf_nfiles_max; int lf_nfiles_max;
size_t lf_file_size; size_t lf_file_size;
size_t lf_writebuf_size; /** list of block-sized log buffers */
/** Flat list for write buffers ready for disk writing */ mlist_t lf_blockbuf_list;
mlist_t lf_writebuf_list; bool lf_flushflag;
int lf_spinlock; /**< lf_flushflag */
int lf_npending_writes; int lf_npending_writes;
skygw_chk_t lf_chk_tail; skygw_chk_t lf_chk_tail;
}; };
@ -126,7 +123,6 @@ struct fnames_conf_st {
char* fn_err_prefix; char* fn_err_prefix;
char* fn_err_suffix; char* fn_err_suffix;
char* fn_logpath; char* fn_logpath;
size_t fn_bufsize;
skygw_chk_t fn_chk_tail; skygw_chk_t fn_chk_tail;
}; };
@ -154,6 +150,7 @@ static bool logfile_init(
logmanager_t* logmanager); logmanager_t* logmanager);
static void logfile_done(logfile_t* logfile); static void logfile_done(logfile_t* logfile);
static void logfile_free_memory(logfile_t* lf); static void logfile_free_memory(logfile_t* lf);
static void logfile_flush(logfile_t* lf);
static bool filewriter_init( static bool filewriter_init(
logmanager_t* logmanager, logmanager_t* logmanager,
filewriter_t* fw, filewriter_t* fw,
@ -165,7 +162,6 @@ static void fnames_conf_done(fnames_conf_t* fn);
static void fnames_conf_free_memory(fnames_conf_t* fn); static void fnames_conf_free_memory(fnames_conf_t* fn);
static char* fname_conf_get_prefix(fnames_conf_t* fn, logfile_id_t id); static char* fname_conf_get_prefix(fnames_conf_t* fn, logfile_id_t id);
static char* fname_conf_get_suffix(fnames_conf_t* fn, logfile_id_t id); static char* fname_conf_get_suffix(fnames_conf_t* fn, logfile_id_t id);
static size_t fname_conf_get_bufsize(fnames_conf_t* fn, logfile_id_t id);
static void* thr_filewriter_fun(void* data); static void* thr_filewriter_fun(void* data);
static logfile_t* logmanager_get_logfile(logmanager_t* lm, logfile_id_t id); static logfile_t* logmanager_get_logfile(logmanager_t* lm, logfile_id_t id);
static bool logmanager_register(bool writep); static bool logmanager_register(bool writep);
@ -181,13 +177,19 @@ static int logmanager_write_log(
char* str, char* str,
va_list valist); va_list valist);
static logfile_writebuf_t* writebuf_init(size_t buflen); static blockbuf_t* blockbuf_init(logfile_id_t id);
static logfile_writebuf_t* get_or_create_writebuffer( static char* blockbuf_get_writepos(
void* buf, #if 0
int** refcount,
#else
blockbuf_t** p_bb,
#endif
logfile_id_t id,
size_t str_len, size_t str_len,
bool forceinit); bool flush);
static void logmanager_print_profs(void); static void blockbuf_register(blockbuf_t* bb);
static void blockbuf_unregister(blockbuf_t* bb);
const char* get_suffix_default(void) const char* get_suffix_default(void)
{ {
@ -229,11 +231,6 @@ const char* get_logpath_default(void)
return "/tmp"; return "/tmp";
} }
const size_t get_bufsize_default(void)
{
return (size_t)256;
}
static bool logmanager_init_nomutex( static bool logmanager_init_nomutex(
void** p_ctx, void** p_ctx,
int argc, int argc,
@ -254,9 +251,6 @@ static bool logmanager_init_nomutex(
fn->fn_state = UNINIT; fn->fn_state = UNINIT;
fw->fwr_state = UNINIT; fw->fwr_state = UNINIT;
/** Clear counters */
writebuf_count = 0;
/** Initialize configuration including log file naming info */ /** Initialize configuration including log file naming info */
if (!fnames_conf_init(fn, argc, argv)) { if (!fnames_conf_init(fn, argc, argv)) {
goto return_succp; goto return_succp;
@ -422,10 +416,6 @@ void skygw_logmanager_done(
{ {
ss_dfprintf(stderr, ">> skygw_logmanager_done\n"); ss_dfprintf(stderr, ">> skygw_logmanager_done\n");
#if defined(SS_PROF)
/** print collected profiles to message log */
logmanager_print_profs();
#endif
acquire_lock(&lmlock); acquire_lock(&lmlock);
if (lm == NULL) { if (lm == NULL) {
@ -458,24 +448,6 @@ return_void:
ss_dfprintf(stderr, "<< skygw_logmanager_done\n"); ss_dfprintf(stderr, "<< skygw_logmanager_done\n");
} }
#if defined(SS_PROF)
static void logmanager_print_profs(void)
{
skygw_log_write(NULL,
LOGFILE_MESSAGE,
"---------------------\n"
"Write buffers : \n\n"
"Allocated\t%d\nReleased\t%d\nReused\t\t%d (%d\%)"
"\nTotal\t\t%d",
prof_writebuf_init,
prof_writebuf_done,
prof_freelist_get,
(prof_writebuf_init == 0 ? 0 :
(int)(prof_freelist_get*100)/prof_writebuf_init),
prof_writebuf_count);
}
#endif /* SS_PROF */
static logfile_t* logmanager_get_logfile( static logfile_t* logmanager_get_logfile(
logmanager_t* lmgr, logmanager_t* lmgr,
logfile_id_t id) logfile_id_t id)
@ -503,18 +475,20 @@ static int logmanager_write_log(
va_list valist) va_list valist)
{ {
logfile_t* lf; logfile_t* lf;
/** array of constan-size buffers */ char* wp;
logfile_writebuf_t* wb = NULL;
mlist_t* wblist;
int err = 0; int err = 0;
char* str_buf = NULL; #if 0
int* refcount;
#else
blockbuf_t* bb;
#endif
CHK_LOGMANAGER(lm); CHK_LOGMANAGER(lm);
if (id < LOGFILE_FIRST || id > LOGFILE_LAST) { if (id < LOGFILE_FIRST || id > LOGFILE_LAST) {
char* errstr = "Invalid logfile id argument."; char* errstr = "Invalid logfile id argument.";
/** invalid id, since we don't have logfile yet, /**
* recall logmanager_write. */ * invalid id, since we don't have logfile yet.
*/
err = logmanager_write_log(NULL, err = logmanager_write_log(NULL,
LOGFILE_ERROR, LOGFILE_ERROR,
TRUE, TRUE,
@ -538,181 +512,300 @@ static int logmanager_write_log(
*/ */
if (str == NULL) { if (str == NULL) {
ss_dassert(flush); ss_dassert(flush);
goto return_flush; logfile_flush(lf);
goto return_err;
} }
/** Check string length. */ /** Check string length. */
if (str_len > MAX_LOGSTRLEN) { if (str_len > MAX_LOGSTRLEN) {
err = -1; err = -1;
goto return_flush; if (flush) {
logfile_flush(lf);
} }
goto return_err;
if (str_len <= DEFAULT_WBUFSIZE) { }
wp = blockbuf_get_writepos(&bb, id, str_len, flush);
/** /**
* Get write buffer from freelist or create new. * Print formatted string to write position.
*/
wb = get_or_create_writebuffer(buf, str_len, FALSE);
if (wb != NULL) {
str_buf = wb->wb_buf.fixed;
} else {
err = -1;
goto return_flush;
}
} else {
/**
* Force creation of new write buffer for custom-size buffers
*/
wb = get_or_create_writebuffer(buf, str_len, TRUE);
if (wb != NULL) {
str_buf = wb->wb_buf.dynamic;
} else {
err = -1;
goto return_flush;
}
}
/**
* Print formatted string to write buffer.
*/ */
if (use_valist) { if (use_valist) {
vsnprintf(str_buf, str_len, str, valist); vsnprintf(wp, str_len, str, valist);
} else { } else {
snprintf(str_buf, str_len, str); snprintf(wp, str_len, str);
} }
str_buf[str_len-2]='\n'; wp[str_len-2]='\n';
CHK_WRITEBUF(wb);
wblist = &lf->lf_writebuf_list; /** lock-free unregistration */
/** blockbuf_unregister(bb);
* Add new write buffer to write buffer list where file
* writer thread finds it and writes to log file.
*/
simple_mutex_lock(&wblist->mlist_mutex, TRUE);
CHK_MLIST(wblist);
mlist_add_data_nomutex(wblist, wb);
simple_mutex_unlock(&wblist->mlist_mutex);
return_flush:
/**
* Notification is sent to filewriter thread.
*/
if (flush) {
skygw_message_send(lf->lf_logmes);
}
return_err: return_err:
return err; return err;
} }
static void blockbuf_register(
blockbuf_t* bb)
{
CHK_BLOCKBUF(bb);
ss_dassert(bb->bb_refcount >= 0);
atomic_add(&bb->bb_refcount, 1);
}
static void blockbuf_unregister(
blockbuf_t* bb)
{
logfile_t* lf;
CHK_BLOCKBUF(bb);
ss_dassert(bb->bb_refcount >= 1);
lf = &lm->lm_logfile[bb->bb_fileid];
CHK_LOGFILE(lf);
/**
* if this is the last client in a full buffer, send write request.
*/
if (atomic_add(&bb->bb_refcount, -1) == 1 && bb->bb_isfull) {
skygw_message_send(lf->lf_logmes);
}
ss_dassert(bb->bb_refcount >= 0);
}
/** /**
* @node Search available write buffer from freelist. * @node (write brief function description here)
* *
* Parameters: * Parameters:
* @param buf - <usage> * @param id - <usage>
* <description>
*
* @param str_len - <usage>
* <description> * <description>
* *
* @return * @return
* *
* *
* @details (write detailed description here) * @details List mutex now protects both the list and the contents of it.
* TODO : It should be so that adding and removing nodes of the list is protected
* by the list mutex. Buffer modifications should be protected by buffer
* mutex.
* *
*/ */
static logfile_writebuf_t* get_or_create_writebuffer( static char* blockbuf_get_writepos(
void* buf, blockbuf_t** p_bb,
size_t buflen, logfile_id_t id,
bool forceinit) size_t str_len,
bool flush)
{ {
logfile_writebuf_t* wb = NULL; logfile_t* lf;
mlist_t* freelist; mlist_t* bb_list;
char* pos = NULL;
mlist_node_t* node; mlist_node_t* node;
blockbuf_t* bb;
ss_debug(bool succp;)
ss_debug(int i=0;)
if (forceinit) {
wb = writebuf_init(buflen); CHK_LOGMANAGER(lm);
goto return_wb; lf = &lm->lm_logfile[id];
CHK_LOGFILE(lf);
bb_list = &lf->lf_blockbuf_list;
/** Lock list */
simple_mutex_lock(&bb_list->mlist_mutex, TRUE);
CHK_MLIST(bb_list);
if (bb_list->mlist_nodecount > 0) {
/**
* At least block buffer exists on the list.
*/
node = bb_list->mlist_first;
/** Loop over blockbuf list to find write position */
while (TRUE) {
CHK_MLIST_NODE(node);
/** Unlock list */
simple_mutex_unlock(&bb_list->mlist_mutex);
bb = (blockbuf_t *)node->mlnode_data;
CHK_BLOCKBUF(bb);
/** Lock buffer */
simple_mutex_lock(&bb->bb_mutex, TRUE);
if (bb->bb_isfull || bb->bb_buf_left < str_len) {
/**
* This block buffer is too full.
* Send flush request to file writer thread. This causes
* flushing all buffers, and (eventually) frees buffer space.
*/
blockbuf_register(bb);
bb->bb_isfull = TRUE;
blockbuf_unregister(bb);
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
/** Lock list */
simple_mutex_lock(&bb_list->mlist_mutex, TRUE);
/**
* If next node exists move forward. Else check if there is
* space for a new block buffer on the list.
*/
if (node != bb_list->mlist_last) {
node = node->mlnode_next;
continue;
} }
/**
* All buffers on the list are full.
*/
if (bb_list->mlist_nodecount <
bb_list->mlist_nodecount_max)
{
/**
* New node is created
*/
bb = blockbuf_init(id);
CHK_BLOCKBUF(bb);
freelist = &lm->lm_filewriter.fwr_freebuf_list; /**
simple_mutex_lock(&freelist->mlist_mutex, TRUE); * Increase version to odd to mark list update active update.
CHK_MLIST(freelist); */
bb_list->mlist_versno += 1;
ss_dassert(bb_list->mlist_versno%2 == 1);
if (freelist->mlist_nodecount > 0) { ss_debug(succp =)
node = mlist_detach_first(freelist); mlist_add_data_nomutex(bb_list, bb);
ss_dassert(succp);
simple_mutex_unlock(&freelist->mlist_mutex); /**
* Increase version to even to mark completion of update.
*/
bb_list->mlist_versno += 1;
ss_dassert(bb_list->mlist_versno%2 == 0);
CHK_MLIST_NODE(node); /** Unlock list */
ss_prof(prof_freelist_get += 1;) simple_mutex_unlock(&bb_list->mlist_mutex);
wb = (logfile_writebuf_t *)node->mlnode_data;
memset(wb->wb_buf.fixed, 0, DEFAULT_WBUFSIZE);
CHK_MLIST_NODE(node);
CHK_WRITEBUF(wb);
node->mlnode_data = NULL;
mlist_node_done(node);
} else { } else {
simple_mutex_unlock(&freelist->mlist_mutex); /**
wb = writebuf_init(buflen); * List and buffers are full.
* Reset to the beginning of the list, and wait until
* there is a block buffer with enough space.
*/
simple_mutex_unlock(&bb_list->mlist_mutex);
simple_mutex_lock(&bb_list->mlist_mutex, TRUE);
node = bb_list->mlist_first;
continue;
} }
return_wb: } else {
return wb; /**
* There is space for new log string.
*/
break;
} }
} /** while (TRUE) */
} else {
/**
* Create the first block buffer to logfile's blockbuf list.
*/
bb = blockbuf_init(id);
CHK_BLOCKBUF(bb);
/** Lock buffer */
simple_mutex_lock(&bb->bb_mutex, TRUE);
/**
* Increase version to odd to mark list update active update.
*/
bb_list->mlist_versno += 1;
ss_dassert(bb_list->mlist_versno%2 == 1);
ss_debug(succp =)mlist_add_data_nomutex(bb_list, bb);
ss_dassert(succp);
/**
* Increase version to even to mark completion of update.
*/
bb_list->mlist_versno += 1;
ss_dassert(bb_list->mlist_versno%2 == 0);
/** Unlock list */
simple_mutex_unlock(&bb_list->mlist_mutex);
} /* if (bb_list->mlist_nodecount > 0) */
ss_dassert(pos == NULL);
ss_dassert(!(bb->bb_isfull || bb->bb_buf_left < str_len));
ss_dassert(bb_list->mlist_nodecount <= nodecount_max);
/**
* Add reference for the write operation.
* It indicates that client has allocated space from the buffer,
* but not written yet. As long as refcount > 0 buffer can't be
* written to disk.
*/
blockbuf_register(bb);
*p_bb = bb;
/**
* At this point list mutex is held and bb points to a node with
* enough space available. pos is not yet set.
*/
pos = &bb->bb_buf[bb->bb_buf_used];
bb->bb_buf_used += str_len;
bb->bb_buf_left -= str_len;
ss_dassert(pos >= &bb->bb_buf[0] &&
pos <= &bb->bb_buf[BUFSIZ-str_len]);
/** read checkmark */
/** TODO: add buffer overflow checkmark
chk_val = (int)bb->bb_buf[bb->bb_buf_used-count_len];
ss_dassert(chk_val == bb->bb_strcount);
*/
/** TODO : write next checkmark
bb->bb_strcount += 1;
memcpy(&bb->bb_buf[bb->bb_buf_used], &bb->bb_strcount, count_len);
bb->bb_buf_used += count_len;
bb->bb_buf_left -= count_len;
*/
/** /**
* @node Allocate memory and initialize new write buffer struct. * If flush flag is set, set buffer full. As a consequence,
* * it will be flushed.
* Parameters:
* @param buflen - <usage>
* <description>
*
* @return
*
*
* @details write buffer can be recycled if there aren't too many of those
* already. Buffers other than default size aren't recycled.
*
*/ */
static logfile_writebuf_t* writebuf_init( bb->bb_isfull = (flush == TRUE ? TRUE : bb->bb_isfull);
size_t buflen)
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
#if 0
/** Release lock */
simple_mutex_unlock(&bb_list->mlist_mutex);
#endif
ss_dassert(bb_list->mlist_mutex->sm_lock_thr != pthread_self());
return pos;
}
static blockbuf_t* blockbuf_init(
logfile_id_t id)
{ {
logfile_writebuf_t* wb; blockbuf_t* bb;
int add_nbytes = 0;
bool recycle;
/** Increase global writebuf counter */ bb = (blockbuf_t *)calloc(1, sizeof(blockbuf_t));
if (atomic_add(&writebuf_count, 1) < MAX_WRITEBUFCOUNT) { bb->bb_fileid = id;
recycle = TRUE; bb->bb_chk_top = CHK_NUM_BLOCKBUF;
bb->bb_chk_tail = CHK_NUM_BLOCKBUF;
simple_mutex_init(&bb->bb_mutex, "Blockbuf mutex");
bb->bb_buf_left = BUFSIZ;
bb->bb_buf_size = BUFSIZ;
CHK_BLOCKBUF(bb);
return bb;
} }
if (buflen > DEFAULT_WBUFSIZE) {
add_nbytes = buflen-DEFAULT_WBUFSIZE;
recycle = FALSE;
}
wb = (logfile_writebuf_t*)
calloc(1, sizeof(logfile_writebuf_t)+add_nbytes);
if (wb == NULL) {
ss_dfprintf(stderr, "Memory allocation for write buffer failed.\n");
atomic_add(&writebuf_count, -1);
goto return_wb;
}
/** Increase profile counter */
ss_prof(prof_writebuf_init += 1;)
ss_prof(prof_writebuf_count = writebuf_count;)
ss_debug(wb->wb_chk_top = CHK_NUM_WRITEBUF;)
wb->wb_bufsize = MAX(buflen,DEFAULT_WBUFSIZE);
wb->wb_recycle = recycle;
CHK_WRITEBUF(wb);
return_wb:
return wb;
}
int skygw_log_write_flush( int skygw_log_write_flush(
void* ctx, void* ctx,
@ -892,7 +985,6 @@ static bool logmanager_register(
if (lm == NULL) { if (lm == NULL) {
succp = logmanager_init_nomutex(NULL, 0, NULL); succp = logmanager_init_nomutex(NULL, 0, NULL);
} }
} }
/** if logmanager existed or was succesfully restarted, increase link */ /** if logmanager existed or was succesfully restarted, increase link */
if (succp) { if (succp) {
@ -964,8 +1056,7 @@ static bool fnames_conf_init(
"-d <message suffix> ............(\".log\")\n" "-d <message suffix> ............(\".log\")\n"
"-e <error prefix> ............(\"skygw_err\")\n" "-e <error prefix> ............(\"skygw_err\")\n"
"-f <error suffix> ............(\".log\")\n" "-f <error suffix> ............(\".log\")\n"
"-g <log path> ............(\"/tmp\")\n" "-g <log path> ............(\"/tmp\")\n";
"-i <write buffer size in bytes> (256)\n";
/** /**
* When init_started is set, clean must be done for it. * When init_started is set, clean must be done for it.
@ -974,7 +1065,7 @@ static bool fnames_conf_init(
fn->fn_chk_top = CHK_NUM_FNAMES; fn->fn_chk_top = CHK_NUM_FNAMES;
fn->fn_chk_tail = CHK_NUM_FNAMES; fn->fn_chk_tail = CHK_NUM_FNAMES;
while ((opt = getopt(argc, argv, "+a:b:c:d:e:f:g:hi:")) != -1) { while ((opt = getopt(argc, argv, "+a:b:c:d:e:f:g:h")) != -1) {
switch (opt) { switch (opt) {
case 'a': case 'a':
fn->fn_trace_prefix = strndup(optarg, MAX_PREFIXLEN); fn->fn_trace_prefix = strndup(optarg, MAX_PREFIXLEN);
@ -1004,10 +1095,6 @@ static bool fnames_conf_init(
fn->fn_logpath = strndup(optarg, MAX_PATHLEN); fn->fn_logpath = strndup(optarg, MAX_PATHLEN);
break; break;
case 'i':
fn->fn_bufsize = strtoul(optarg, NULL, 10);
break;
case 'h': case 'h':
default: default:
fprintf(stderr, fprintf(stderr,
@ -1031,8 +1118,6 @@ static bool fnames_conf_init(
strdup(get_err_suffix_default()) : fn->fn_err_suffix; strdup(get_err_suffix_default()) : fn->fn_err_suffix;
fn->fn_logpath = (fn->fn_logpath == NULL) ? fn->fn_logpath = (fn->fn_logpath == NULL) ?
strdup(get_logpath_default()) : fn->fn_logpath; strdup(get_logpath_default()) : fn->fn_logpath;
fn->fn_bufsize = (fn->fn_bufsize == 0) ?
get_bufsize_default() : fn->fn_bufsize;
ss_dfprintf(stderr, "Command line : "); ss_dfprintf(stderr, "Command line : ");
for (i=0; i<argc; i++) { for (i=0; i<argc; i++) {
@ -1103,24 +1188,6 @@ static char* fname_conf_get_suffix(
} }
} }
static size_t fname_conf_get_bufsize(
fnames_conf_t* fn,
logfile_id_t id)
{
CHK_FNAMES_CONF(fn);
ss_dassert(id >= LOGFILE_FIRST && id <= LOGFILE_LAST);
switch (id) {
case LOGFILE_TRACE:
case LOGFILE_MESSAGE:
case LOGFILE_ERROR:
return fn->fn_bufsize;
break;
default:
return 0;
}
}
static bool logfiles_init( static bool logfiles_init(
logmanager_t* lmgr) logmanager_t* lmgr)
@ -1140,6 +1207,15 @@ static bool logfiles_init(
return succp; return succp;
} }
static void logfile_flush(
logfile_t* lf)
{
CHK_LOGFILE(lf);
acquire_lock(&lf->lf_spinlock);
lf->lf_flushflag = TRUE;
release_lock(&lf->lf_spinlock);
skygw_message_send(lf->lf_logmes);
}
static bool logfile_init( static bool logfile_init(
logfile_t* logfile, logfile_t* logfile,
@ -1162,6 +1238,8 @@ static bool logfile_init(
logfile->lf_npending_writes = 0; logfile->lf_npending_writes = 0;
logfile->lf_name_sequence = 1; logfile->lf_name_sequence = 1;
logfile->lf_lmgr = logmanager; logfile->lf_lmgr = logmanager;
logfile->lf_flushflag = FALSE;
logfile->lf_spinlock = 0;
/** Read existing files to logfile->lf_files_list and create /** Read existing files to logfile->lf_files_list and create
* new file for log named after <directory>/<prefix><counter><suffix> * new file for log named after <directory>/<prefix><counter><suffix>
*/ */
@ -1189,14 +1267,13 @@ static bool logfile_init(
logfile->lf_name_sequence, logfile->lf_name_sequence,
logfile->lf_name_suffix); logfile->lf_name_suffix);
logfile->lf_writebuf_size = fname_conf_get_bufsize(fn, logfile_id); if (mlist_init(&logfile->lf_blockbuf_list,
if (mlist_init(&logfile->lf_writebuf_list,
NULL, NULL,
strdup("logfile writebuf list"), strdup("logfile block buffer list"),
writebuf_clear) == NULL) NULL,
MAXNBLOCKBUFS) == NULL)
{ {
ss_dfprintf(stderr, "Initializing logfile writebuf list failed\n"); ss_dfprintf(stderr, "Initializing logfile blockbuf list failed\n");
logfile_free_memory(logfile); logfile_free_memory(logfile);
goto return_with_succp; goto return_with_succp;
} }
@ -1240,7 +1317,7 @@ static void logfile_done(
CHK_LOGFILE(lf); CHK_LOGFILE(lf);
ss_dassert(lf->lf_npending_writes == 0); ss_dassert(lf->lf_npending_writes == 0);
case INIT: case INIT:
mlist_done(&lf->lf_writebuf_list); mlist_done(&lf->lf_blockbuf_list);
logfile_free_memory(lf); logfile_free_memory(lf);
lf->lf_state = DONE; lf->lf_state = DONE;
case DONE: case DONE:
@ -1301,14 +1378,11 @@ static bool filewriter_init(
id = (logfile_id_t)i; id = (logfile_id_t)i;
lf = logmanager_get_logfile(logmanager, id); lf = logmanager_get_logfile(logmanager, id);
fw->fwr_file[id] = skygw_file_init(lf->lf_full_name); fw->fwr_file[id] = skygw_file_init(lf->lf_full_name);
}
if (mlist_init(&fw->fwr_freebuf_list, if (fw->fwr_file[id] == NULL) {
NULL,
strdup("Filewriter freebuf list"),
writebuf_clear) == NULL)
{
goto return_succp; goto return_succp;
} }
}
fw->fwr_state = RUN; fw->fwr_state = RUN;
CHK_FILEWRITER(fw); CHK_FILEWRITER(fw);
succp = TRUE; succp = TRUE;
@ -1320,41 +1394,6 @@ return_succp:
return succp; return succp;
} }
/**
* @node Clears write buffer but doesn't release memory.
*
* Parameters:
* @param data - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
void writebuf_clear(
void* data)
{
logfile_writebuf_t* wb;
wb = (logfile_writebuf_t *)data;
if (wb != NULL) {
CHK_WRITEBUF(wb);
if (wb->wb_bufsize == DEFAULT_WBUFSIZE) {
wb->wb_buf.fixed[0] = '\0';
} else {
wb->wb_buf.dynamic[0] = '\0';
}
/** Decrease counter */
atomic_add(&writebuf_count, -1);
ss_prof(prof_writebuf_done += 1;)
}
}
static void filewriter_done( static void filewriter_done(
filewriter_t* fw) filewriter_t* fw)
{ {
@ -1371,7 +1410,6 @@ static void filewriter_done(
id = (logfile_id_t)i; id = (logfile_id_t)i;
skygw_file_done(fw->fwr_file[id]); skygw_file_done(fw->fwr_file[id]);
} }
mlist_done(&fw->fwr_freebuf_list);
case DONE: case DONE:
case UNINIT: case UNINIT:
default: default:
@ -1380,41 +1418,26 @@ static void filewriter_done(
} }
/**
* @node File writer thread routine
*
* Parameters:
* @param data - <usage>
* <description>
*
* @return
*
*
* @details (write detailed description here)
*
*/
static void* thr_filewriter_fun( static void* thr_filewriter_fun(
void* data) void* data)
{ {
skygw_thread_t* thr; skygw_thread_t* thr;
filewriter_t* fwr; filewriter_t* fwr;
skygw_file_t* file; skygw_file_t* file;
logfile_t* lf;
logfile_writebuf_t* wb; mlist_t* bb_list;
char* writep; blockbuf_t* bb;
size_t nbytes;
mlist_t* wblist;
mlist_t* freelist;
mlist_node_t* node; mlist_node_t* node;
mlist_node_t* save_node;
int i; int i;
ss_debug(bool succp;) bool flush; /**< flush logfile */
bool flushall;/**< flush all logfiles */
size_t vn1;
size_t vn2;
thr = (skygw_thread_t *)data; thr = (skygw_thread_t *)data;
fwr = (filewriter_t *)skygw_thread_get_data(thr); fwr = (filewriter_t *)skygw_thread_get_data(thr);
CHK_FILEWRITER(fwr); CHK_FILEWRITER(fwr);
freelist = &fwr->fwr_freebuf_list;
CHK_MLIST(freelist);
ss_debug(skygw_thread_set_state(thr, THR_RUNNING)); ss_debug(skygw_thread_set_state(thr, THR_RUNNING));
/** Inform log manager about the state. */ /** Inform log manager about the state. */
@ -1426,61 +1449,85 @@ static void* thr_filewriter_fun(
* Reset message to avoid redundant calls. * Reset message to avoid redundant calls.
*/ */
skygw_message_wait(fwr->fwr_logmes); skygw_message_wait(fwr->fwr_logmes);
skygw_message_reset(fwr->fwr_logmes);
flushall = skygw_thread_must_exit(thr);
/** Process all logfiles which have buffered writes. */ /** Process all logfiles which have buffered writes. */
for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i++) { for (i=LOGFILE_FIRST; i<=LOGFILE_LAST; i++) {
/** /**
* Get file pointer of current logfile, * Get file pointer of current logfile.
* and logfile's write buffer.
*/ */
file = fwr->fwr_file[i]; file = fwr->fwr_file[i];
wblist = &lm->lm_logfile[(logfile_id_t)i].lf_writebuf_list; lf = &lm->lm_logfile[(logfile_id_t)i];
/** Process non-empty write buffer lists only. */
if (wblist->mlist_nodecount != 0) {
/** Detach all nodes of the list */
simple_mutex_lock(&wblist->mlist_mutex, TRUE);
node = mlist_detach_nodes(wblist);
simple_mutex_unlock(&wblist->mlist_mutex);
/** /**
* Get string pointer and length, and pass them to file * read and reset logfile's flushflag
* writer function.
*/ */
acquire_lock(&lf->lf_spinlock);
flush = lf->lf_flushflag;
lf->lf_flushflag = FALSE;
release_lock(&lf->lf_spinlock);
/**
* get logfile's block buffer list
*/
bb_list = &lf->lf_blockbuf_list;
/** Korvaa mutex version luvulla, luvulla ja uudella version luvulla.
* Joka kohdassa.
*/
#if 0
simple_mutex_lock(&bb_list->mlist_mutex, TRUE);
#endif
CHK_MLIST(bb_list);
node = bb_list->mlist_first;
while (node != NULL) { while (node != NULL) {
wb = (logfile_writebuf_t*)node->mlnode_data; CHK_MLIST_NODE(node);
CHK_WRITEBUF(wb); bb = (blockbuf_t *)node->mlnode_data;
CHK_BLOCKBUF(bb);
if (wb->wb_bufsize == DEFAULT_WBUFSIZE) { simple_mutex_lock(&bb->bb_mutex, TRUE);
writep = wb->wb_buf.fixed;
} else { if (bb->bb_buf_used != 0 &&
writep = wb->wb_buf.dynamic; (bb->bb_isfull || flush || flushall))
}
/** Call file write */
nbytes = strlen(writep);
ss_debug(succp = )
skygw_file_write(file, (void *)writep, nbytes);
ss_dassert(succp);
save_node = node;
node = node->mlnode_next;
save_node->mlnode_next = NULL;
/**
* Move nodes with default-size memory buffer to
* free list.
*/
if (wb->wb_bufsize == DEFAULT_WBUFSIZE &&
wb->wb_recycle)
{ {
simple_mutex_lock(&freelist->mlist_mutex, TRUE); /**
mlist_add_node_nomutex(freelist, save_node); * buffer is at least half-full -> write to disk
simple_mutex_unlock(&freelist->mlist_mutex); */
} else { while(bb->bb_refcount > 0) {
/** Custom-size buffers are freed */ simple_mutex_unlock(&bb->bb_mutex);
mlist_node_done(save_node); simple_mutex_lock(&bb->bb_mutex, TRUE);
} }
} /* while */
} /* if */ skygw_file_write(file,
(void *)bb->bb_buf,
bb->bb_buf_used);
/**
* Reset buffer
* TODO: it may be probably faster to free and calloc
* new buffer every time full one is locked for
* file write.
*/
bb->bb_buf_left = bb->bb_buf_size;
bb->bb_buf_used = 0;
memset(bb->bb_buf, 0, bb->bb_buf_size);
bb->bb_isfull = FALSE;
}
simple_mutex_unlock(&bb->bb_mutex);
/** Consistent lock-free read on the list */
do {
while ((vn1 = bb_list->mlist_versno)%2 != 0);
node = node->mlnode_next;
vn2 = bb_list->mlist_versno;
} while (vn1 != vn2);
} /* while (node != NULL) */
#if 0
simple_mutex_unlock(&bb_list->mlist_mutex);
#endif
} /* for */ } /* for */
} /* while (!skygw_thread_must_exit) */ } /* while (!skygw_thread_must_exit) */

View File

@ -35,8 +35,10 @@ typedef struct thread_st {
} thread_t; } thread_t;
static void* thr_run(void* data); static void* thr_run(void* data);
static void* thr_run_morelog(void* data);
#define NTHR 16 #define NTHR 256
#define NITER 100
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
@ -98,7 +100,11 @@ int main(int argc, char* argv[])
mes = skygw_message_init(); mes = skygw_message_init();
mtx = simple_mutex_init(NULL, strdup("testmtx")); mtx = simple_mutex_init(NULL, strdup("testmtx"));
/** Test starts */
fprintf(stderr, "\nStarting test #1 \n");
/** 1 */
for (i=0; i<NTHR; i++) { for (i=0; i<NTHR; i++) {
thr[i] = (thread_t*)calloc(1, sizeof(thread_t)); thr[i] = (thread_t*)calloc(1, sizeof(thread_t));
thr[i]->mes = mes; thr[i]->mes = mes;
@ -134,14 +140,67 @@ int main(int argc, char* argv[])
for (i=0; i<NTHR; i++) { for (i=0; i<NTHR; i++) {
free(thr[i]); free(thr[i]);
} }
fprintf(stderr, "\nStarting test #2 \n");
/** 2 */
for (i=0; i<NTHR; i++) {
thr[i] = (thread_t*)calloc(1, sizeof(thread_t));
thr[i]->mes = mes;
thr[i]->mtx = mtx;
thr[i]->nactive = &nactive;
}
nactive = NTHR;
fprintf(stderr,
"\nLaunching %d threads, each iterating %d times.",
NTHR,
NITER);
for (i=0; i<NTHR; i++) {
pthread_t p;
pthread_create(&p, NULL, thr_run_morelog, thr[i]);
thr[i]->tid = p;
}
fprintf(stderr, ".. done");
fprintf(stderr, "\nStarting to wait threads.\n");
do {
skygw_message_wait(mes);
simple_mutex_lock(mtx, TRUE);
if (nactive > 0) {
simple_mutex_unlock(mtx);
continue;
}
break;
} while(TRUE);
for (i=0; i<NTHR; i++) {
pthread_join(thr[i]->tid, NULL);
}
/** This is to release memory */
skygw_logmanager_done(NULL);
simple_mutex_unlock(mtx);
fprintf(stderr, "\nFreeing thread memory.");
for (i=0; i<NTHR; i++) {
free(thr[i]);
}
/** Test ended here */
skygw_message_done(mes); skygw_message_done(mes);
simple_mutex_done(mtx); simple_mutex_done(mtx);
fprintf(stderr, ".. done.\n");
return err; return err;
} }
void* thr_run( static void* thr_run(
void* data) void* data)
{ {
thread_t* td = (thread_t *)data; thread_t* td = (thread_t *)data;
@ -218,3 +277,57 @@ void* thr_run(
skygw_message_send(td->mes); skygw_message_send(td->mes);
return NULL; return NULL;
} }
static int nstr(
char** str_arr)
{
int i;
for (i=0; str_arr[i] != NULL; i++) {
}
return i;
}
char* logs[] = {
"foo",
"bar",
"done",
"critical test logging",
"longer test l o g g g i n g",
"reeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeally loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong line",
"shoorter one",
"two",
"scrap : 834nuft984pnw8ynup4598yp8wup8upwn48t5gpn45",
"more the same : f98uft5p8ut2p44449upnt5",
"asdasd987987asdasd987987asdasd987987asdasd987987asdasd987987asdasd987987asdasd987987asdasd98987",
NULL
};
static void* thr_run_morelog(
void* data)
{
thread_t* td = (thread_t *)data;
char* logstr;
int err;
int i;
int nmsg;
nmsg = nstr(logs);
for (i=0; i<NITER; i++) {
char* str = logs[rand()%nmsg];
err = skygw_log_write(NULL,
(logfile_id_t)(rand()%(LOGFILE_LAST+1)),
"%s - iteration # %d",
str,
i);
}
simple_mutex_lock(td->mtx, TRUE);
*td->nactive -= 1;
simple_mutex_unlock(td->mtx);
skygw_message_send(td->mes);
return NULL;
}

View File

@ -47,7 +47,7 @@
# define ss_prof(exp) # define ss_prof(exp)
#endif /* SS_DEBUG || SS_PROF */ #endif /* SS_DEBUG || SS_PROF */
#if defined(SS_DEBUG) #if defined(EI_SS_DEBUG)
# define ss_debug(exp) exp # define ss_debug(exp) exp
# define ss_dfprintf fprintf # define ss_dfprintf fprintf
# define ss_dfflush fflush # define ss_dfflush fflush
@ -113,7 +113,7 @@ typedef enum skygw_chk_t {
CHK_NUM_FNAMES, CHK_NUM_FNAMES,
CHK_NUM_LOGMANAGER, CHK_NUM_LOGMANAGER,
CHK_NUM_FILE, CHK_NUM_FILE,
CHK_NUM_WRITEBUF CHK_NUM_BLOCKBUF
} skygw_chk_t; } skygw_chk_t;
# define STRBOOL(b) ((b) ? "TRUE" : "FALSE") # define STRBOOL(b) ((b) ? "TRUE" : "FALSE")
@ -248,16 +248,12 @@ typedef enum skygw_chk_t {
ss_info_dassert(lf->lf_id >= LOGFILE_FIRST && \ ss_info_dassert(lf->lf_id >= LOGFILE_FIRST && \
lf->lf_id <= LOGFILE_LAST, \ lf->lf_id <= LOGFILE_LAST, \
"Invalid logfile id\n"); \ "Invalid logfile id\n"); \
ss_info_dassert(lf->lf_writebuf_size > 0, \
"Error, logfile's writebuf size is zero " \
"or negative\n"); \
(lf->lf_chk_top != CHK_NUM_LOGFILE || \ (lf->lf_chk_top != CHK_NUM_LOGFILE || \
lf->lf_chk_tail != CHK_NUM_LOGFILE ? \ lf->lf_chk_tail != CHK_NUM_LOGFILE ? \
FALSE : \ FALSE : \
(lf->lf_logpath == NULL || \ (lf->lf_logpath == NULL || \
lf->lf_name_prefix == NULL || \ lf->lf_name_prefix == NULL || \
lf->lf_name_suffix == NULL || \ lf->lf_name_suffix == NULL || \
lf->lf_writebuf_size == 0 || \
lf->lf_full_name == NULL ? FALSE : TRUE)); \ lf->lf_full_name == NULL ? FALSE : TRUE)); \
} }
@ -317,9 +313,10 @@ typedef enum skygw_chk_t {
"File struct under- or overflow"); \ "File struct under- or overflow"); \
} }
#define CHK_WRITEBUF(w) { \
ss_info_dassert(w->wb_chk_top == CHK_NUM_WRITEBUF, \ #define CHK_BLOCKBUF(bb) { \
"Writebuf under- or overflow"); \ ss_info_dassert(bb->bb_chk_top == CHK_NUM_BLOCKBUF, \
"Block buf under- or overflow"); \
} }
#endif /* SKYGW_DEBUG_H */ #endif /* SKYGW_DEBUG_H */

View File

@ -276,7 +276,8 @@ mlist_t* mlist_init(
mlist_t* listp, mlist_t* listp,
mlist_cursor_t** cursor, mlist_cursor_t** cursor,
char* name, char* name,
void (*datadel)(void*)) void (*datadel)(void*),
int maxnodes)
{ {
mlist_cursor_t* c; mlist_cursor_t* c;
mlist_t* list; mlist_t* list;
@ -301,6 +302,8 @@ mlist_t* mlist_init(
} }
list->mlist_chk_top = CHK_NUM_MLIST; list->mlist_chk_top = CHK_NUM_MLIST;
list->mlist_chk_tail = CHK_NUM_MLIST; list->mlist_chk_tail = CHK_NUM_MLIST;
/** Set size limit for list. 0 means unlimited */
list->mlist_nodecount_max = maxnodes;
/** Set data deletion callback fun */ /** Set data deletion callback fun */
list->mlist_datadel = datadel; list->mlist_datadel = datadel;
if (name != NULL) { if (name != NULL) {
@ -462,11 +465,31 @@ void* mlist_cursor_get_data_nomutex(
return (mc->mlcursor_pos->mlnode_data); return (mc->mlcursor_pos->mlnode_data);
} }
void mlist_add_data_nomutex( /**
* @node Adds data to list by allocating node for it. Checks list size limit.
*
* Parameters:
* @param list - <usage>
* <description>
*
* @param data - <usage>
* <description>
*
* @return TRUE, if succeed, FALSE, if list had node limit and it is full.
*
*
* @details (write detailed description here)
*
*/
bool mlist_add_data_nomutex(
mlist_t* list, mlist_t* list,
void* data) void* data)
{ {
mlist_add_node_nomutex(list, mlist_node_init(data, NULL)); bool succp;
succp = mlist_add_node_nomutex(list, mlist_node_init(data, NULL));
return succp;
} }
@ -512,7 +535,7 @@ mlist_node_t* mlist_detach_first(
} }
/** /**
* @node Add new node to end of list * @node Add new node to end of list if there is space for it.
* *
* Parameters: * Parameters:
* @param list - <usage> * @param list - <usage>
@ -524,21 +547,26 @@ mlist_node_t* mlist_detach_first(
* @param add_last - <usage> * @param add_last - <usage>
* <description> * <description>
* *
* @return void * @return TRUE, if succeede, FALSE, if list size limit was exceeded.
* *
* *
* @details (write detailed description here) * @details (write detailed description here)
* *
*/ */
void mlist_add_node_nomutex( bool mlist_add_node_nomutex(
mlist_t* list, mlist_t* list,
mlist_node_t* newnode) mlist_node_t* newnode)
{ {
bool succp = FALSE;
CHK_MLIST(list); CHK_MLIST(list);
CHK_MLIST_NODE(newnode); CHK_MLIST_NODE(newnode);
ss_dassert(!list->mlist_deleted); ss_dassert(!list->mlist_deleted);
/** List is full already. */
if (list->mlist_nodecount == list->mlist_nodecount_max) {
goto return_succp;
}
/** Find location for new node */ /** Find location for new node */
if (list->mlist_last != NULL) { if (list->mlist_last != NULL) {
ss_dassert(!list->mlist_last->mlnode_deleted); ss_dassert(!list->mlist_last->mlnode_deleted);
@ -552,7 +580,10 @@ void mlist_add_node_nomutex(
list->mlist_last = newnode; list->mlist_last = newnode;
newnode->mlnode_list = list; newnode->mlnode_list = list;
list->mlist_nodecount += 1; list->mlist_nodecount += 1;
succp = TRUE;
return_succp:
CHK_MLIST(list); CHK_MLIST(list);
return succp;
} }
@ -1102,14 +1133,13 @@ bool skygw_thread_must_exit(
void acquire_lock( void acquire_lock(
int* l) int* l)
{ {
register short int misscount = 0; register int misscount = 0;
while (atomic_add(l, 1) != 0) { while (atomic_add(l, 1) != 0) {
atomic_add(l, -1); atomic_add(l, -1);
misscount += 1; misscount += 1;
if (misscount > 10) { if (misscount > 10) {
usleep(rand()%100); usleep(rand()%misscount);
misscount = 0;
} }
} }
} }
@ -1188,30 +1218,37 @@ return_sm:
int simple_mutex_done( int simple_mutex_done(
simple_mutex_t* sm) simple_mutex_t* sm)
{ {
int err; int err = 0;
CHK_SIMPLE_MUTEX(sm); CHK_SIMPLE_MUTEX(sm);
if (atomic_add(&sm->sm_enabled, -1) != 1) { if (atomic_add(&sm->sm_enabled, -1) != 1) {
atomic_add(&sm->sm_enabled, 1); atomic_add(&sm->sm_enabled, 1);
} }
#if 0
assert(!pthread_mutex_trylock(&sm->sm_mutex));
assert(!pthread_mutex_unlock(&sm->sm_mutex));
assert((err = pthread_mutex_destroy(&sm->sm_mutex)) == 0);
#else
err = pthread_mutex_destroy(&sm->sm_mutex); err = pthread_mutex_destroy(&sm->sm_mutex);
#endif
#if 0
if (err != 0) { if (err != 0) {
goto return_err; goto return_err;
} }
#endif
simple_mutex_free_memory(sm); simple_mutex_free_memory(sm);
return_err: return_err:
if (err != 0) { if (err != 0) {
perror("simple_mutex : ");
fprintf(stderr, fprintf(stderr,
"FATAL : destroying simple mutex %s failed, " "FATAL : destroying simple mutex %s failed, "
"errno %d : %s\n", "errno %d : %s\n",
sm->sm_name, sm->sm_name,
err, err,
strerror(errno)); strerror(errno));
perror("simple_mutex : ");
} }
return err; return err;
} }
@ -1492,7 +1529,9 @@ static bool file_write_header(
} }
len1 = strlen(header_buf1); len1 = strlen(header_buf1);
len2 = strlen(header_buf2); len2 = strlen(header_buf2);
#if defined(LAPTOP_TEST)
usleep(DISKWRITE_LATENCY);
#else
wbytes1=fwrite((void*)header_buf1, len1, 1, file->sf_file); wbytes1=fwrite((void*)header_buf1, len1, 1, file->sf_file);
wbytes2=fwrite((void*)header_buf2, len2, 1, file->sf_file); wbytes2=fwrite((void*)header_buf2, len2, 1, file->sf_file);
@ -1505,7 +1544,7 @@ static bool file_write_header(
perror("Logfile header write.\n"); perror("Logfile header write.\n");
goto return_succp; goto return_succp;
} }
#endif
CHK_FILE(file); CHK_FILE(file);
succp = TRUE; succp = TRUE;
@ -1521,20 +1560,37 @@ bool skygw_file_write(
void* data, void* data,
size_t nbytes) size_t nbytes)
{ {
size_t nwritten;
bool succp = FALSE; bool succp = FALSE;
#if !defined(LAPTOP_TEST)
size_t nwritten;
int fd;
static int writecount;
#endif
CHK_FILE(file); CHK_FILE(file);
#if (LAPTOP_TEST)
usleep(DISKWRITE_LATENCY);
#else
nwritten = fwrite(data, nbytes, 1, file->sf_file); nwritten = fwrite(data, nbytes, 1, file->sf_file);
if (nwritten != 1) { if (nwritten != 1) {
perror("Logfile write.\n");
fprintf(stderr, fprintf(stderr,
"Writing header %s to %s failed.\n", "Writing %ld bytes, %s to %s failed.\n",
nbytes,
(char *)data, (char *)data,
file->sf_fname); file->sf_fname);
perror("Logfile write.\n");
goto return_succp; goto return_succp;
} }
writecount += 1;
if (writecount == FSYNCLIMIT) {
fd = fileno(file->sf_file);
fsync(fd);
writecount = 0;
}
#endif
succp = TRUE; succp = TRUE;
CHK_FILE(file); CHK_FILE(file);
return_succp: return_succp:
@ -1565,7 +1621,16 @@ skygw_file_t* skygw_file_init(
file = NULL; file = NULL;
goto return_file; goto return_file;
} }
file_write_header(file);
if (!file_write_header(file)) {
fprintf(stderr,
"Writing header of log file %s failed.\n",
file->sf_fname);
perror("SkyGW file open\n");
free(file);
file = NULL;
goto return_file;
}
CHK_FILE(file); CHK_FILE(file);
ss_dfprintf(stderr, "Opened %s\n", file->sf_fname); ss_dfprintf(stderr, "Opened %s\n", file->sf_fname);
return_file: return_file:

View File

@ -4,10 +4,13 @@
#define MLIST #define MLIST
#define MIN(a,b) (a<b ? a : b) #define MIN(a,b) (a<b ? a : b)
#define MAX(a,b) (a>b ? a : b) #define MAX(a,b) (a>b ? a : b)
#define FSYNCLIMIT 10
#include "skygw_types.h" #include "skygw_types.h"
#include "skygw_debug.h" #include "skygw_debug.h"
#define DISKWRITE_LATENCY (5*MSEC_USEC)
typedef struct slist_node_st slist_node_t; typedef struct slist_node_st slist_node_t;
typedef struct slist_st slist_t; typedef struct slist_st slist_t;
typedef struct slist_cursor_st slist_cursor_t; typedef struct slist_cursor_st slist_cursor_t;
@ -38,13 +41,16 @@ typedef struct skygw_rwlock_st {
typedef struct mlist_st { typedef struct mlist_st {
skygw_chk_t mlist_chk_top; skygw_chk_t mlist_chk_top;
char* mlist_name; char* mlist_name;
void (*mlist_datadel)(void *); void (*mlist_datadel)(void *); /**< clean-up function for data */
/** CREW concurrency, protects node updates and clean-up */ simple_mutex_t mlist_mutex; /**< protect node updates and clean-up */
simple_mutex_t mlist_mutex;
bool mlist_uselock; bool mlist_uselock;
bool mlist_islocked; bool mlist_islocked;
bool mlist_deleted; bool mlist_deleted;
size_t mlist_nodecount; size_t mlist_nodecount;
size_t mlist_nodecount_max; /**< size limit. 0 == no limit */
#if 1
size_t mlist_versno;
#endif
bool mlist_flat; bool mlist_flat;
mlist_node_t* mlist_first; mlist_node_t* mlist_first;
mlist_node_t* mlist_last; mlist_node_t* mlist_last;
@ -87,10 +93,12 @@ EXTERN_C_BLOCK_END
mlist_t* mlist_init(mlist_t* mlist, mlist_t* mlist_init(mlist_t* mlist,
mlist_cursor_t** cursor, mlist_cursor_t** cursor,
char* name, char* name,
void (*datadel)(void*)); void (*datadel)(void*),
int maxnodes);
void mlist_done(mlist_t* list); void mlist_done(mlist_t* list);
void mlist_add_data_nomutex(mlist_t* list, void* data); bool mlist_add_data_nomutex(mlist_t* list, void* data);
void mlist_add_node_nomutex(mlist_t* list, mlist_node_t* newnode); bool mlist_add_node_nomutex(mlist_t* list, mlist_node_t* newnode);
void* mlist_node_get_data(mlist_node_t* node); void* mlist_node_get_data(mlist_node_t* node);
mlist_node_t* mlist_detach_nodes(mlist_t* ml); mlist_node_t* mlist_detach_nodes(mlist_t* ml);
mlist_node_t* mlist_detach_first(mlist_t* ml); mlist_node_t* mlist_detach_first(mlist_t* ml);