added state based block buffers to log manager

This commit is contained in:
Markus Makela
2014-09-10 12:09:00 +03:00
parent ae9d38025e
commit 4028c50fea
5 changed files with 68 additions and 71 deletions

View File

@ -48,7 +48,6 @@ extern char *program_invocation_short_name;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
static int write_index; static int write_index;
static int block_start_index; static int block_start_index;
static int block_end_index;
static int prevval; static int prevval;
static simple_mutex_t msg_mutex; static simple_mutex_t msg_mutex;
#endif #endif
@ -120,7 +119,7 @@ typedef struct blockbuf_st {
skygw_chk_t bb_chk_top; skygw_chk_t bb_chk_top;
#endif #endif
logfile_id_t bb_fileid; logfile_id_t bb_fileid;
bool bb_isfull; /**< closed for disk write */ blockbuf_state_t bb_state; /**State of the block buffer*/
simple_mutex_t bb_mutex; /**< bb_buf_used, bb_isfull */ simple_mutex_t bb_mutex; /**< bb_buf_used, bb_isfull */
int bb_refcount; /**< protected by list mutex. #of clients */ int bb_refcount; /**< protected by list mutex. #of clients */
// int bb_blankcount; /**< # of blanks used btw strings */ // int bb_blankcount; /**< # of blanks used btw strings */
@ -342,7 +341,6 @@ static bool logmanager_init_nomutex(
lm->lm_chk_tail = CHK_NUM_LOGMANAGER; lm->lm_chk_tail = CHK_NUM_LOGMANAGER;
write_index = 0; write_index = 0;
block_start_index = 0; block_start_index = 0;
block_end_index = 0;
prevval = -1; prevval = -1;
simple_mutex_init(&msg_mutex, "Message mutex"); simple_mutex_init(&msg_mutex, "Message mutex");
#endif #endif
@ -691,7 +689,7 @@ static int logmanager_write_log(
tok = strtok(NULL,"|"); tok = strtok(NULL,"|");
if(tok){ if(strstr(str,"message|") && tok){
tokval = atoi(tok); tokval = atoi(tok);
@ -811,7 +809,7 @@ static int logmanager_write_log(
wp_c[timestamp_len-1+str_len-1]='\n'; wp_c[timestamp_len-1+str_len-1]='\n';
/** lock-free unregistration, includes flush if /** lock-free unregistration, includes flush if
* bb_isfull */ * bb_state == BB_FULL */
blockbuf_unregister(bb_c); blockbuf_unregister(bb_c);
} }
} /* if (spread_down) */ } /* if (spread_down) */
@ -842,7 +840,7 @@ static void blockbuf_unregister(
/** /**
* if this is the last client in a full buffer, send write request. * if this is the last client in a full buffer, send write request.
*/ */
if (atomic_add(&bb->bb_refcount, -1) == 1 && bb->bb_isfull) { if (atomic_add(&bb->bb_refcount, -1) == 1 && bb->bb_state == BB_FULL) {
skygw_message_send(lf->lf_logmes); skygw_message_send(lf->lf_logmes);
} }
ss_dassert(bb->bb_refcount >= 0); ss_dassert(bb->bb_refcount >= 0);
@ -874,7 +872,6 @@ static char* blockbuf_get_writepos(
size_t str_len, size_t str_len,
bool flush) bool flush)
{ {
int depth = 0;
logfile_t* lf; logfile_t* lf;
mlist_t* bb_list; mlist_t* bb_list;
char* pos = NULL; char* pos = NULL;
@ -912,35 +909,24 @@ static char* blockbuf_get_writepos(
/** Lock buffer */ /** Lock buffer */
simple_mutex_lock(&bb->bb_mutex, true); simple_mutex_lock(&bb->bb_mutex, true);
if (bb->bb_isfull || bb->bb_buf_left < str_len) { if (bb->bb_state == BB_FULL || bb->bb_buf_left < str_len) {
/** /**
* This block buffer is too full. * This block buffer is too full.
* Send flush request to file writer thread. This causes * Send flush request to file writer thread. This causes
* flushing all buffers, and (eventually) frees buffer space. * flushing all buffers, and (eventually) frees buffer space.
*/ */
blockbuf_register(bb); blockbuf_register(bb);
bb->bb_state = BB_FULL;
bb->bb_isfull = true; blockbuf_unregister(bb);
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
/** Lock list */
simple_mutex_lock(&bb_list->mlist_mutex, true);
blockbuf_unregister(bb);
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
/** Lock list */
simple_mutex_lock(&bb_list->mlist_mutex, true);
/**Move the full buffer to the end of the list*/
if(node->mlnode_next){
bb_list->mlist_first = node->mlnode_next;
bb_list->mlist_last->mlnode_next = node;
node->mlnode_next = NULL;
bb_list->mlist_last = node;
node = bb_list->mlist_first;
continue;
}
/** /**
* If next node exists move forward. Else check if there is * If next node exists move forward. Else check if there is
@ -990,7 +976,28 @@ static char* blockbuf_get_writepos(
node = bb_list->mlist_first; node = bb_list->mlist_first;
continue; continue;
} }
} else {
}else if(bb->bb_state == BB_CLEARED){
/**
*Move the full buffer to the end of the list
*/
simple_mutex_unlock(&bb->bb_mutex);
simple_mutex_lock(&bb_list->mlist_mutex, true);
if(node->mlnode_next){
bb_list->mlist_first = node->mlnode_next;
bb_list->mlist_last->mlnode_next = node;
node->mlnode_next = NULL;
bb_list->mlist_last = node;
node = bb_list->mlist_first;
}
bb->bb_state = BB_READY;
}else if (bb->bb_state == BB_READY){
/** /**
* There is space for new log string. * There is space for new log string.
*/ */
@ -998,9 +1005,11 @@ static char* blockbuf_get_writepos(
} }
} /** while (true) */ } /** while (true) */
} else { } else {
/** /**
* Create the first block buffer to logfile's blockbuf list. * Create the first block buffer to logfile's blockbuf list.
*/ */
bb = blockbuf_init(id); bb = blockbuf_init(id);
CHK_BLOCKBUF(bb); CHK_BLOCKBUF(bb);
@ -1026,7 +1035,7 @@ static char* blockbuf_get_writepos(
} /* if (bb_list->mlist_nodecount > 0) */ } /* if (bb_list->mlist_nodecount > 0) */
ss_dassert(pos == NULL); ss_dassert(pos == NULL);
ss_dassert(!(bb->bb_isfull || bb->bb_buf_left < str_len)); ss_dassert(!(bb->bb_state == BB_FULL || bb->bb_buf_left < str_len));
ss_dassert(bb_list->mlist_nodecount <= bb_list->mlist_nodecount_max); ss_dassert(bb_list->mlist_nodecount <= bb_list->mlist_nodecount_max);
/** /**
@ -1065,7 +1074,7 @@ static char* blockbuf_get_writepos(
* If flush flag is set, set buffer full. As a consequence, no-one * If flush flag is set, set buffer full. As a consequence, no-one
* can write to it before it is flushed to disk. * can write to it before it is flushed to disk.
*/ */
bb->bb_isfull = (flush == true ? true : bb->bb_isfull); bb->bb_state = (flush == true ? BB_FULL : bb->bb_state);
/** Unlock buffer */ /** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex); simple_mutex_unlock(&bb->bb_mutex);
@ -1096,7 +1105,7 @@ static blockbuf_t* blockbuf_init(
bb->bb_buf_size = MAX_LOGSTRLEN; bb->bb_buf_size = MAX_LOGSTRLEN;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
sprintf(bb->bb_buf,"[start:%d]",atomic_add(&block_start_index,1)); sprintf(bb->bb_buf,"[block:%d]",atomic_add(&block_start_index,1));
bb->bb_buf_used += strlen(bb->bb_buf); bb->bb_buf_used += strlen(bb->bb_buf);
bb->bb_buf_left -= strlen(bb->bb_buf); bb->bb_buf_left -= strlen(bb->bb_buf);
#endif #endif
@ -2344,7 +2353,7 @@ static void filewriter_done(
* lists of each logfile object. * lists of each logfile object.
* *
* Block buffer is written to log file if * Block buffer is written to log file if
* 1. bb_isfull == true, * 1. bb_state == true,
* 2. logfile object's lf_flushflag == true, or * 2. logfile object's lf_flushflag == true, or
* 3. skygw_thread_must_exit returns true. * 3. skygw_thread_must_exit returns true.
* *
@ -2384,7 +2393,7 @@ static void* thr_filewriter_fun(
blockbuf_t* bb; blockbuf_t* bb;
mlist_node_t* node; mlist_node_t* node;
int i; int i;
bool flush_blockbuf; /**< flush single block buffer. */ blockbuf_state_t flush_blockbuf; /**< flush single block buffer. */
bool flush_logfile; /**< flush logfile */ bool flush_logfile; /**< flush logfile */
bool flushall_logfiles;/**< flush all logfiles */ bool flushall_logfiles;/**< flush all logfiles */
size_t vn1; size_t vn1;
@ -2443,10 +2452,10 @@ static void* thr_filewriter_fun(
/** Lock block buffer */ /** Lock block buffer */
simple_mutex_lock(&bb->bb_mutex, true); simple_mutex_lock(&bb->bb_mutex, true);
flush_blockbuf = bb->bb_isfull; flush_blockbuf = bb->bb_state;
if (bb->bb_buf_used != 0 && if (bb->bb_buf_used != 0 &&
(flush_blockbuf || (flush_blockbuf == BB_FULL ||
flush_logfile || flush_logfile ||
flushall_logfiles)) flushall_logfiles))
{ {
@ -2461,8 +2470,6 @@ static void* thr_filewriter_fun(
&bb->bb_mutex, &bb->bb_mutex,
true); true);
} }
skygw_file_write(file, skygw_file_write(file,
(void *)bb->bb_buf, (void *)bb->bb_buf,
@ -2476,9 +2483,9 @@ static void* thr_filewriter_fun(
bb->bb_buf_left = bb->bb_buf_size; bb->bb_buf_left = bb->bb_buf_size;
bb->bb_buf_used = 0; bb->bb_buf_used = 0;
memset(bb->bb_buf, 0, bb->bb_buf_size); memset(bb->bb_buf, 0, bb->bb_buf_size);
bb->bb_isfull = false; bb->bb_state = BB_CLEARED;
#if defined (SS_DEBUG) #if defined(SS_DEBUG)
sprintf(bb->bb_buf,"[clear:%d]",atomic_add(&block_start_index,1)); sprintf(bb->bb_buf,"[block:%d]",atomic_add(&block_start_index,1));
bb->bb_buf_used += strlen(bb->bb_buf); bb->bb_buf_used += strlen(bb->bb_buf);
bb->bb_buf_left -= strlen(bb->bb_buf); bb->bb_buf_left -= strlen(bb->bb_buf);
#endif #endif

View File

@ -22,6 +22,12 @@ typedef struct logfile_st logfile_t;
typedef struct fnames_conf_st fnames_conf_t; typedef struct fnames_conf_st fnames_conf_t;
typedef struct logmanager_st logmanager_t; typedef struct logmanager_st logmanager_t;
typedef enum {
BB_READY = 0x00,
BB_FULL,
BB_CLEARED
} blockbuf_state_t;
typedef enum { typedef enum {
LOGFILE_ERROR = 1, LOGFILE_ERROR = 1,
LOGFILE_FIRST = LOGFILE_ERROR, LOGFILE_FIRST = LOGFILE_ERROR,

View File

@ -1,6 +1,6 @@
#! /bin/bash #! /bin/bash
if [[ $# -lt 3 ]] if [[ $# -lt 4 ]]
then then
echo "Usage: logorder.sh <iterations> <frequency of flushes> <message size>" echo "Usage: logorder.sh <iterations> <frequency of flushes> <message size>"
echo "To disable log flushing, use 0 for flush frequency" echo "To disable log flushing, use 0 for flush frequency"
@ -12,31 +12,31 @@ rm *.log
#Create large messages #Create large messages
$PWD/testorder $1 $2 $3 $PWD/testorder $1 $2 $3
TESTLOG=$4
MCOUNT=$1 MCOUNT=$1
STARTS=`cat skygw_err1.log |tr -s ' '|grep -o 'start:[[:digit:]]\+'|cut -d ':' -f 2` BLOCKS=`cat skygw_err1.log |tr -s ' '|grep -o 'block:[[:digit:]]\+'|cut -d ':' -f 2`
MESSAGES=`cat skygw_err1.log |tr -s ' '|grep -o 'message|[[:digit:]]\+'|cut -d '|' -f 2` MESSAGES=`cat skygw_err1.log |tr -s ' '|grep -o 'message|[[:digit:]]\+'|cut -d '|' -f 2`
ENDS=`cat skygw_err1.log |tr -s ' '|grep -o 'end:[[:digit:]]\+'|cut -d ':' -f 2`
prev=0 prev=0
error=0 error=0
for i in $STARTS for i in $BLOCKS
do do
if [[ $i -le $prev ]] if [[ $i -le $prev ]]
then then
error=1 error=1
echo "start mismatch: $i was after $prev." echo "block mismatch: $i was after $prev." >> $TESTLOG
fi fi
prev=$i prev=$i
done done
if [[ error -eq 0 ]] if [[ error -eq 0 ]]
then then
echo "Block buffer starts were in ascending order" echo "Block buffers were in order" >> $TESTLOG
else else
echo "Error: block buffers were written in the wrong order" echo "Error: block buffers were written in the wrong order" >> $TESTLOG
fi fi
prev=0 prev=0
@ -48,34 +48,14 @@ do
if [[ $i -ne $(( prev + 1 )) ]] if [[ $i -ne $(( prev + 1 )) ]]
then then
error=1 error=1
echo "message mismatch: $i was after $prev." echo "message mismatch: $i was after $prev." >> $TESTLOG
fi fi
prev=$i prev=$i
done done
if [[ error -eq 0 ]] if [[ error -eq 0 ]]
then then
echo "Block buffer messages were in ascending order" echo "Block buffer messages were in order" >> $TESTLOG
else else
echo "Error: block buffer messages were written in the wrong order" echo "Error: block buffer messages were written in the wrong order" >> $TESTLOG
fi
prev=0
error=0
for i in $ENDS
do
if [[ $i -le $prev ]]
then
error=1
echo "end mismatch: $i was after $prev."
fi
prev=$i
done
if [[ error -eq 0 ]]
then
echo "Block buffer ends were in ascending order"
else
echo "Error: block buffers were written in the wrong order"
fi fi

View File

@ -28,6 +28,7 @@ testall:
cleantests: cleantests:
- $(DEL) *.o - $(DEL) *.o
- $(DEL) *.log
- $(DEL) testlog - $(DEL) testlog
- $(DEL) testorder - $(DEL) testorder
- $(DEL) *~ - $(DEL) *~
@ -73,6 +74,9 @@ runtests:
@echo "Use 16 threads" >> $(TESTLOG) @echo "Use 16 threads" >> $(TESTLOG)
@echo "" >> $(TESTLOG) @echo "" >> $(TESTLOG)
@-$(LAUNCH_DEBUGGER) $(TESTAPP) "-t 16" 2>>$(TESTLOG) @-$(LAUNCH_DEBUGGER) $(TESTAPP) "-t 16" 2>>$(TESTLOG)
@echo "Test Log Manager Message Order" >> $(TESTLOG)
@echo "" >> $(TESTLOG)
./logorder.sh 500 0 500 $(TESTLOG)
@echo "Log Manager PASSED" >> $(TESTLOG) @echo "Log Manager PASSED" >> $(TESTLOG)
@echo "" >> $(TESTLOG) @echo "" >> $(TESTLOG)

View File

@ -281,7 +281,7 @@ do
b=`$RUNCMD < $TINPUT 2>&1` b=`$RUNCMD < $TINPUT 2>&1`
if [[ "`echo "$b"|grep -i 'null|error'`" != "" ]] if [[ "`echo "$b"|grep -i 'null|error'`" != "" ]]
then then
err=`echo "$b" | grep -i null|error` err=`echo "$b" | grep -i 'null\|error'`
break break
fi fi
done done